Compare revisions
Project: osdu/platform/domain-data-mgmt-services/wellbore/wellbore-domain-services

Commits on Source (7)
@@ -15,13 +15,13 @@
 This module groups function related to bulk catalog.
 A catalog contains metadata of the chunks
 """
+import asyncio
+import functools
 import json
 from contextlib import suppress
 from dataclasses import dataclass
 from typing import Dict, Iterable, List, NamedTuple, Optional, Set
-from dask.distributed import get_client
-from .traces import submit_with_trace, trace_attributes_root_span
 from app.helper.traces import with_trace
 from app.utils import capture_timings
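
Note: the new asyncio and functools imports support moving the blocking JSON (de)serialization off the event loop. run_in_executor only accepts a callable plus positional arguments, which is why functools.partial is used below to bind the keyword argument indent=0. A self-contained sketch of that pattern, with illustrative names that are not from the repository:

import asyncio
import functools
import json

async def dumps_off_loop(payload: dict) -> str:
    # json.dumps is synchronous and can be slow for large catalogs; running it in
    # the default thread-pool executor keeps the event loop free to serve requests.
    func = functools.partial(json.dumps, payload, indent=0)
    return await asyncio.get_running_loop().run_in_executor(None, func)

if __name__ == '__main__':
    print(asyncio.run(dumps_off_loop({'columns': [], 'nb_rows': 0})))
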
@@ -179,32 +179,26 @@ CATALOG_FILE_NAME = 'bulk_catalog.json'
 @capture_timings('save_bulk_catalog', handlers=worker_capture_timing_handlers)
 @with_trace('save_bulk_catalog')
-def save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
+async def save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
     """save a bulk catalog to a json file in the given folder path"""
     folder_path, _ = remove_protocol(folder_path)
     meta_path = join(folder_path, CATALOG_FILE_NAME)
     with filesystem.open(meta_path, 'w') as outfile:
-        data = json.dumps(catalog.as_dict(), indent=0)
+        _func = functools.partial(json.dumps, catalog.as_dict(), indent=0)
+        data = await asyncio.get_running_loop().run_in_executor(None, _func)
         outfile.write(data)
         # json.dump(catalog.as_dict(), outfile) # don't know why json.dump is slower (local windows)


 @capture_timings('load_bulk_catalog', handlers=worker_capture_timing_handlers)
 @with_trace('load_bulk_catalog')
-def load_bulk_catalog(filesystem, folder_path: str) -> Optional[BulkCatalog]:
+async def load_bulk_catalog(filesystem, folder_path: str) -> Optional[BulkCatalog]:
     """load a bulk catalog from a json file in the given folder path"""
     folder_path, _ = remove_protocol(folder_path)
     meta_path = join(folder_path, CATALOG_FILE_NAME)
     with suppress(FileNotFoundError):
         with filesystem.open(meta_path) as json_file:
-            data = json.load(json_file)
+            data = await asyncio.get_running_loop().run_in_executor(None, json.load, json_file)
             return BulkCatalog.from_dict(data)
     return None


-async def async_load_bulk_catalog(filesystem, folder_path: str) -> BulkCatalog:
-    return await submit_with_trace(get_client(), load_bulk_catalog, filesystem, folder_path)


-async def async_save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
-    return await submit_with_trace(get_client(), save_bulk_catalog, filesystem, folder_path, catalog)
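
Note: once save_bulk_catalog and load_bulk_catalog are coroutines, the async_load_bulk_catalog / async_save_bulk_catalog wrappers that submitted the blocking versions to the Dask client become redundant, which is why they are deleted above. A hypothetical caller, sketched under that assumption (refresh_catalog and its fallback argument are illustrative, not repository code):

from typing import Optional

async def refresh_catalog(filesystem, folder_path: str, fallback: 'BulkCatalog') -> 'BulkCatalog':
    # The coroutines are awaited directly; no Dask client round-trip is involved.
    existing: Optional['BulkCatalog'] = await load_bulk_catalog(filesystem, folder_path)
    if existing is not None:
        return existing
    # bulk_catalog.json was absent (FileNotFoundError suppressed, None returned),
    # so persist the provided fallback catalog instead.
    await save_bulk_catalog(filesystem, folder_path, fallback)
    return fallback
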
@@ -33,7 +33,7 @@ from app.conf import Config
 from .client import DaskClient
 from .dask_worker_plugin import DaskWorkerPlugin
 from .errors import BulkRecordNotFound, BulkNotProcessable, internal_bulk_exceptions
-from .traces import map_with_trace, submit_with_trace, trace_attributes_root_span
+from .traces import map_with_trace, submit_with_trace, trace_attributes_root_span, trace_attributes_current_span
 from .utils import (WDMS_INDEX_NAME, by_pairs, do_merge, join_dataframes, worker_capture_timing_handlers,
                     get_num_rows, set_index, index_union)
 from ..dataframe_validators import is_reserved_column_name, DataFrameValidationFunc
@@ -42,13 +42,14 @@ from . import storage_path_builder as pathBuilder
 from . import session_file_meta as session_meta
 from ..bulk_id import new_bulk_id
 from .bulk_catalog import (BulkCatalog, ChunkGroup,
-                           async_load_bulk_catalog,
-                           async_save_bulk_catalog)
+                           load_bulk_catalog,
+                           save_bulk_catalog)
 from ..mime_types import MimeType
 from .dask_data_ipc import DaskNativeDataIPC, DaskLocalFileDataIPC
 from . import dask_worker_write_bulk as bulk_writer
 from ..consistency_checks import DataConsistencyChecks


 def read_with_dask(path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
     """call dask.dataframe.read_parquet with default parameters

     Dask read_parquet parameters:
@@ -180,6 +181,11 @@ class DaskBulkStorage:
         if index_df:
             dfs.append(index_df)

+        trace_attributes_current_span({
+            'parquet-files-to-load-count': len(files_to_load),
+            'df-to-merge-count': len(dfs)
+        })

         if not dfs:
             raise RuntimeError("cannot find requested columns")
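
Note: trace_attributes_current_span is a repository helper (imported from .traces in the hunk further up); from its use here it appears to attach the given key/value pairs to whichever span is currently active, whereas trace_attributes_root_span targets the request's root span. That reading is an assumption. With a plain OpenTelemetry tracer, the raw equivalent of such a helper would look roughly like this:

from opentelemetry import trace

def set_attributes_on_current_span(attributes: dict) -> None:
    # Sketch of a 'current span' attribute helper; the actual .traces
    # implementation in this repository may differ.
    span = trace.get_current_span()
    for key, value in attributes.items():
        span.set_attribute(key, value)

set_attributes_on_current_span({
    'parquet-files-to-load-count': 3,  # illustrative values
    'df-to-merge-count': 2,
})
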
@@ -250,7 +256,7 @@ class DaskBulkStorage:
     @with_trace('get_bulk_catalog')
     async def get_bulk_catalog(self, record_id: str, bulk_id: str, generate_if_not_exists=True) -> BulkCatalog:
         bulk_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id)
-        catalog = await async_load_bulk_catalog(self._fs, bulk_path)
+        catalog = await load_bulk_catalog(self._fs, bulk_path)
         if catalog:
             return catalog
@@ -467,8 +473,7 @@ class DaskBulkStorage:
             self._fill_catalog_columns_info(catalog, chunk_metas, bulk_id)
         )

-        fcatalog = await self.client.scatter(catalog)
-        await async_save_bulk_catalog(self._fs, commit_path, fcatalog)
+        await save_bulk_catalog(self._fs, commit_path, catalog)

         trace_attributes_root_span({
             'catalog-row-count': catalog.nb_rows,
             'catalog-col-count': catalog.all_columns_count
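
Note: client.scatter pre-ships an object to the Dask cluster so that a task submitted later receives a lightweight future instead of a large pickled argument. Since save_bulk_catalog is no longer submitted to a worker but awaited on the service event loop, there is nothing left to scatter and the catalog is passed as-is. A minimal illustration of the two patterns, assuming an asynchronous Dask client and hypothetical save functions:

import asyncio
from dask.distributed import Client

def save_on_worker(payload: dict) -> None:
    """Hypothetical blocking save executed on a Dask worker."""
    _ = len(payload)  # stand-in for real I/O

async def save_in_process(payload: dict) -> None:
    """Hypothetical coroutine save executed on the service event loop."""
    await asyncio.sleep(0)  # stand-in for real async I/O

async def old_style(client: Client, payload: dict) -> None:
    # former pattern: scatter the payload, then run the save on a worker
    remote_payload = await client.scatter(payload)
    await client.submit(save_on_worker, remote_payload)

async def new_style(payload: dict) -> None:
    # current pattern: nothing runs on a worker, so there is nothing to scatter
    await save_in_process(payload)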