
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (7)
bulk_catalog.py
@@ -15,13 +15,13 @@
 This module groups function related to bulk catalog.
 A catalog contains metadata of the chunks
 """
+import asyncio
+import functools
 import json
 from contextlib import suppress
 from dataclasses import dataclass
 from typing import Dict, Iterable, List, NamedTuple, Optional, Set
 
-from dask.distributed import get_client
-
 from .traces import submit_with_trace, trace_attributes_root_span
 from app.helper.traces import with_trace
 from app.utils import capture_timings
@@ -179,32 +179,26 @@ CATALOG_FILE_NAME = 'bulk_catalog.json'
 @capture_timings('save_bulk_catalog', handlers=worker_capture_timing_handlers)
 @with_trace('save_bulk_catalog')
-def save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
+async def save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
     """save a bulk catalog to a json file in the given folder path"""
     folder_path, _ = remove_protocol(folder_path)
     meta_path = join(folder_path, CATALOG_FILE_NAME)
     with filesystem.open(meta_path, 'w') as outfile:
-        data = json.dumps(catalog.as_dict(), indent=0)
+        _func = functools.partial(json.dumps, catalog.as_dict(), indent=0)
+        data = await asyncio.get_running_loop().run_in_executor(None, _func)
         outfile.write(data)
-        # json.dump(catalog.as_dict(), outfile)  # don't know why json.dump is slower (local windows)
 
 
 @capture_timings('load_bulk_catalog', handlers=worker_capture_timing_handlers)
 @with_trace('load_bulk_catalog')
-def load_bulk_catalog(filesystem, folder_path: str) -> Optional[BulkCatalog]:
+async def load_bulk_catalog(filesystem, folder_path: str) -> Optional[BulkCatalog]:
     """load a bulk catalog from a json file in the given folder path"""
     folder_path, _ = remove_protocol(folder_path)
     meta_path = join(folder_path, CATALOG_FILE_NAME)
     with suppress(FileNotFoundError):
         with filesystem.open(meta_path) as json_file:
-            data = json.load(json_file)
+            data = await asyncio.get_running_loop().run_in_executor(None, json.load, json_file)
             return BulkCatalog.from_dict(data)
     return None
-
-
-async def async_load_bulk_catalog(filesystem, folder_path: str) -> BulkCatalog:
-    return await submit_with_trace(get_client(), load_bulk_catalog, filesystem, folder_path)
-
-
-async def async_save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
-    return await submit_with_trace(get_client(), save_bulk_catalog, filesystem, folder_path, catalog)
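Note on the hunk above: save_bulk_catalog and load_bulk_catalog become coroutines that push the blocking JSON (de)serialization onto the event loop's default thread-pool executor, which is why the async_* wrappers that used to submit the sync functions to the Dask client can be deleted. A minimal self-contained sketch of the pattern, with illustrative names that are not part of the diff:

import asyncio
import functools
import json

async def dumps_off_loop(obj) -> str:
    # Run json.dumps in the default ThreadPoolExecutor (executor=None) so
    # the event loop stays free to serve other requests during serialization.
    loop = asyncio.get_running_loop()
    func = functools.partial(json.dumps, obj, indent=0)
    return await loop.run_in_executor(None, func)

print(asyncio.run(dumps_off_loop({'rows': 1000})))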
dask_bulk_storage.py (file name inferred from the DaskBulkStorage class below)
@@ -33,7 +33,7 @@ from app.conf import Config
 from .client import DaskClient
 from .dask_worker_plugin import DaskWorkerPlugin
 from .errors import BulkRecordNotFound, BulkNotProcessable, internal_bulk_exceptions
-from .traces import map_with_trace, submit_with_trace, trace_attributes_root_span
+from .traces import map_with_trace, submit_with_trace, trace_attributes_root_span, trace_attributes_current_span
 from .utils import (WDMS_INDEX_NAME, by_pairs, do_merge, join_dataframes, worker_capture_timing_handlers,
                     get_num_rows, set_index, index_union)
 from ..dataframe_validators import is_reserved_column_name, DataFrameValidationFunc
@@ -42,13 +42,14 @@ from . import storage_path_builder as pathBuilder
 from . import session_file_meta as session_meta
 from ..bulk_id import new_bulk_id
 from .bulk_catalog import (BulkCatalog, ChunkGroup,
-                           async_load_bulk_catalog,
-                           async_save_bulk_catalog)
+                           load_bulk_catalog,
+                           save_bulk_catalog)
 from ..mime_types import MimeType
 from .dask_data_ipc import DaskNativeDataIPC, DaskLocalFileDataIPC
 from . import dask_worker_write_bulk as bulk_writer
 from ..consistency_checks import DataConsistencyChecks
+
 
 def read_with_dask(path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
     """call dask.dataframe.read_parquet with default parameters
     Dask read_parquet parameters:
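read_with_dask is a thin wrapper over dask.dataframe.read_parquet (its docstring is cut off by the collapsed diff). For reference, a minimal usage sketch of the underlying Dask call; the path and column names are illustrative, not from this repository:

import dask.dataframe as dd

# Build a lazy task graph over all matching parquet chunks; nothing is read yet.
ddf = dd.read_parquet('bulk/chunk_*.parquet', columns=['MD', 'GR'], engine='pyarrow')
df = ddf.compute()  # materialize the selected columns into a pandas DataFrame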
@@ -180,6 +181,11 @@ class DaskBulkStorage:
         if index_df:
             dfs.append(index_df)
 
+        trace_attributes_current_span({
+            'parquet-files-to-load-count': len(files_to_load),
+            'df-to-merge-count': len(dfs)
+        })
+
         if not dfs:
             raise RuntimeError("cannot find requested columns")
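The new trace_attributes_current_span call annotates the active span with how many parquet files a read touches and how many dataframes must be merged, which makes slow loads easier to diagnose in traces. The helper lives in .traces and is not shown in this diff; a minimal sketch of what it plausibly does, assuming an OpenTelemetry backend:

from opentelemetry import trace

def trace_attributes_current_span(attributes: dict) -> None:
    # Attach each key/value pair to the span active in the current context.
    # set_attribute is a no-op on the NonRecordingSpan returned when no span
    # is active, so this is safe to call unconditionally.
    span = trace.get_current_span()
    for key, value in attributes.items():
        span.set_attribute(key, value)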
@@ -250,7 +256,7 @@
     @with_trace('get_bulk_catalog')
     async def get_bulk_catalog(self, record_id: str, bulk_id: str, generate_if_not_exists=True) -> BulkCatalog:
         bulk_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id)
-        catalog = await async_load_bulk_catalog(self._fs, bulk_path)
+        catalog = await load_bulk_catalog(self._fs, bulk_path)
         if catalog:
             return catalog
@@ -467,8 +473,7 @@
             self._fill_catalog_columns_info(catalog, chunk_metas, bulk_id)
         )
 
-        fcatalog = await self.client.scatter(catalog)
-        await async_save_bulk_catalog(self._fs, commit_path, fcatalog)
+        await save_bulk_catalog(self._fs, commit_path, catalog)
         trace_attributes_root_span({
             'catalog-row-count': catalog.nb_rows,
             'catalog-col-count': catalog.all_columns_count
...
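Net effect of the last hunk: the catalog no longer has to be scattered to the Dask cluster before saving, because save_bulk_catalog is now a coroutine that runs in the service process rather than a task submitted to a worker. Sketched with the names from this diff (the surrounding method body is abridged):

# Before: ship the catalog to the cluster, then run the sync save on a worker.
#   fcatalog = await self.client.scatter(catalog)
#   await async_save_bulk_catalog(self._fs, commit_path, fcatalog)

# After: await the coroutine directly; the JSON dump runs in a local thread.
await save_bulk_catalog(self._fs, commit_path, catalog)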