Commit 4ebc2d05 authored by Cyril Monmouton's avatar Cyril Monmouton
Browse files

Tracing/Enh: add tracing decorator to trace read_blob and get_dataframe methods

parent 1a0814d2
Pipeline #49107 passed with stages
in 5 minutes and 22 seconds
......@@ -50,6 +50,7 @@ from .mime_types import MimeType, MimeTypes
# - using faster format, e.g. hd5
# - threshold about the busyness of the service (if not busy and not huge data -> direct write)
# - better proc fork and arg serialization
from ..helper.traces import with_trace
def export_to_parquet(
......@@ -228,6 +229,7 @@ async def create_and_write_blob(
raise RuntimeError(f'unexpected type {source} returned by bulk exporter function')
@with_trace('read_blob')
async def read_blob(blob: BlobBulk):
importer = BlobFileImporters.from_string(blob.content_type)
# TODO: run in executor?
......
......@@ -28,6 +28,7 @@ from .blob_storage import (
from .bulk_id import BulkId
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from ..helper.traces import with_trace
async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
......@@ -48,12 +49,13 @@ async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
return bulkblob.id
@with_trace('get_dataframe')
async def get_dataframe(ctx: Context, bulk_id: str) -> pd.DataFrame:
""" fetch bulk from a blob storage, provide column major """
tenant = await resolve_tenant(ctx.partition_id)
storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
bytes_data = await storage.download(tenant, bulk_id)
bytes_data = await storage.download(tenant, bulk_id)
# for now use fix parquet format saving one call
# meta_data = await storage.download_metadata(tenant.project_id, tenant.bucket_name, bulk_id)
# content_type = meta_data.metadata["content_type"]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment