From 71efd29607b3628719cc903cbd0d5a9627a6fdf0 Mon Sep 17 00:00:00 2001 From: Luc Yriarte Date: Wed, 13 Apr 2022 14:50:23 +0200 Subject: [PATCH] Revert "Merge branch 'bulk-persistence-module-exports' into 'master'" This reverts commit c90403733469a5b93e4ed6dac2ae080a7f1cfad9, reversing changes made to 3cff952c7cf3af4699b95b67eb656e1d20775290. --- app/bulk_persistence/__init__.py | 6 +---- app/bulk_persistence/blob_storage.py | 2 +- app/bulk_persistence/capture_timings.py | 2 +- app/bulk_persistence/dask/__init__.py | 1 + app/bulk_persistence/dask/bulk_catalog.py | 2 +- .../dask/session_file_meta.py | 4 ++-- app/bulk_persistence/dask/utils.py | 2 +- app/bulk_persistence/dataframe_persistence.py | 2 +- app/bulk_persistence/dataframe_serializer.py | 2 +- app/consistency/reference_check.py | 2 +- app/consistency/trajectory_consistency.py | 7 +++--- app/consistency/welllog_consistency.py | 6 +++-- app/model/log_bulk.py | 2 +- app/routers/bulk/bulk_routes.py | 22 ++++++++++--------- app/routers/bulk/bulk_uri_dependencies.py | 2 +- app/routers/bulk/utils.py | 15 ++++++++----- app/routers/common_parameters.py | 2 +- app/routers/ddms_v2/log_ddms_v2.py | 2 +- app/routers/ddms_v2/persistence.py | 4 +++- app/routers/delete/delete_bulk_data.py | 3 ++- tests/unit/model/log_bulk_test.py | 2 +- tests/unit/routers/filter_test.py | 3 ++- 22 files changed, 53 insertions(+), 42 deletions(-) diff --git a/app/bulk_persistence/__init__.py b/app/bulk_persistence/__init__.py index d75320c6..3be6223f 100644 --- a/app/bulk_persistence/__init__.py +++ b/app/bulk_persistence/__init__.py @@ -17,13 +17,10 @@ from .bulk_filter import BulkReadFilters, BulkReadFilterOperator from .model_chunking import GetDataParams, DataframeBasicDescribe, DataframeDescribe from .dask.dask_bulk_storage import DaskBulkStorage from .dask.dask_bulk_storage_local import make_local_dask_bulk_storage -from .dask.storage_path_builder import hash_record_id -from .dask.traces import trace_dataframe_attributes, submit_with_trace, trace_attributes_root_span from .dataframe_persistence import create_and_store_dataframe, get_dataframe, download_bulk from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync -from .dataframe_validators import auto_cast_columns_to_string, columns_type_must_be_string, DataFrameValidationFunc, no_validation from .json_orient import JSONOrient -from .mime_types import MimeTypes, MimeType +from .mime_types import MimeTypes from .tenant_provider import resolve_tenant from .exceptions import UnknownChannelsException, InvalidBulkException, NoBulkException, NoDataException, RecordNotFoundException from .consistency_checks import ConsistencyException, DataConsistencyChecks @@ -33,7 +30,6 @@ from .capture_timings import capture_timings from .sessions_storage import Session, SessionsStorage, \ SessionNotFound, SessionInvalidState, SessionUpdatedEtagUnmatched, SessionException, \ SessionState, SessionUpdateMode, SessionInternal, CommitSessionResponse -from .dask.errors import BulkError, BulkRecordNotFound, BulkCurvesNotFound, TooManyColumnsRequested, FilterError, internal_bulk_exceptions # TMP: this should probably not be exposed outside of the bulk_persistence package from .temp_dir import get_temp_dir diff --git a/app/bulk_persistence/blob_storage.py b/app/bulk_persistence/blob_storage.py index 95d69411..0bfd476a 100644 --- a/app/bulk_persistence/blob_storage.py +++ b/app/bulk_persistence/blob_storage.py @@ -52,7 +52,7 @@ from .mime_types import MimeType, MimeTypes # - using faster format, e.g. hd5 # - threshold about the busyness of the service (if not busy and not huge data -> direct write) # - better proc fork and arg serialization -from app.helper.traces import with_trace +from ..helper.traces import with_trace def export_to_parquet( diff --git a/app/bulk_persistence/capture_timings.py b/app/bulk_persistence/capture_timings.py index fafe58ff..10e3f1e7 100644 --- a/app/bulk_persistence/capture_timings.py +++ b/app/bulk_persistence/capture_timings.py @@ -4,7 +4,7 @@ from functools import wraps import asyncio from time import perf_counter, process_time -from app.helper.logger import get_logger +from ..helper.logger import get_logger def make_log_captured_timing_handler(level=INFO): diff --git a/app/bulk_persistence/dask/__init__.py b/app/bulk_persistence/dask/__init__.py index e69de29b..ab4e198a 100644 --- a/app/bulk_persistence/dask/__init__.py +++ b/app/bulk_persistence/dask/__init__.py @@ -0,0 +1 @@ +from . import dask_config diff --git a/app/bulk_persistence/dask/bulk_catalog.py b/app/bulk_persistence/dask/bulk_catalog.py index 22e2b2d1..3f091266 100644 --- a/app/bulk_persistence/dask/bulk_catalog.py +++ b/app/bulk_persistence/dask/bulk_catalog.py @@ -22,7 +22,7 @@ from contextlib import suppress from dataclasses import dataclass from typing import Dict, Iterable, List, NamedTuple, Optional, Set -from app.helper.traces import with_trace +from ...helper.traces import with_trace from ..capture_timings import capture_timings from .storage_path_builder import join, remove_protocol from .utils import worker_capture_timing_handlers diff --git a/app/bulk_persistence/dask/session_file_meta.py b/app/bulk_persistence/dask/session_file_meta.py index ae8f2faa..448bd1d4 100644 --- a/app/bulk_persistence/dask/session_file_meta.py +++ b/app/bulk_persistence/dask/session_file_meta.py @@ -23,8 +23,8 @@ from distributed.worker import get_client import pandas as pd from .utils import share_items -from app.helper.logger import get_logger -from app.helper.traces import with_trace +from ...helper.logger import get_logger +from ...helper.traces import with_trace from ..sessions_storage import Session from ..capture_timings import capture_timings diff --git a/app/bulk_persistence/dask/utils.py b/app/bulk_persistence/dask/utils.py index 06750ef6..350f1d73 100644 --- a/app/bulk_persistence/dask/utils.py +++ b/app/bulk_persistence/dask/utils.py @@ -20,7 +20,7 @@ import dask.dataframe as dd import pandas as pd import pyarrow.parquet as pa -from app.helper.logger import get_logger +from ...helper.logger import get_logger from ..capture_timings import capture_timings diff --git a/app/bulk_persistence/dataframe_persistence.py b/app/bulk_persistence/dataframe_persistence.py index 89d5585a..6f677a59 100644 --- a/app/bulk_persistence/dataframe_persistence.py +++ b/app/bulk_persistence/dataframe_persistence.py @@ -30,7 +30,7 @@ from .bulk_id import new_bulk_id from .dask.errors import internal_bulk_exceptions from .mime_types import MimeTypes, MimeType from .tenant_provider import resolve_tenant -from app.helper.traces import with_trace +from ..helper.traces import with_trace async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str: diff --git a/app/bulk_persistence/dataframe_serializer.py b/app/bulk_persistence/dataframe_serializer.py index 84cdee1c..a2b059ea 100644 --- a/app/bulk_persistence/dataframe_serializer.py +++ b/app/bulk_persistence/dataframe_serializer.py @@ -24,7 +24,7 @@ from pydantic import BaseModel from .json_orient import JSONOrient from .mime_types import MimeTypes, MimeType from app.pool_executor import get_pool_executor -from app.helper.traces import with_trace +from ..helper.traces import with_trace class DataframeSerializerSync: diff --git a/app/consistency/reference_check.py b/app/consistency/reference_check.py index 40eb75f0..a473d4ee 100644 --- a/app/consistency/reference_check.py +++ b/app/consistency/reference_check.py @@ -1,7 +1,7 @@ import pandas as pd import math from pydantic import BaseModel -from app.bulk_persistence import ConsistencyException, DataConsistencyChecks +from app.bulk_persistence.consistency_checks import ConsistencyException, DataConsistencyChecks class ReferenceCurveException(ConsistencyException): diff --git a/app/consistency/trajectory_consistency.py b/app/consistency/trajectory_consistency.py index 205599fe..982ee048 100644 --- a/app/consistency/trajectory_consistency.py +++ b/app/consistency/trajectory_consistency.py @@ -7,9 +7,10 @@ from odes_storage.models import Record from app.model.osdu_model import WellboreTrajectory110 from app.helper.traces import with_trace -from app.bulk_persistence import ConsistencyException, DataConsistencyChecks, \ - BulkRecordNotFound, \ - DaskBulkStorage, submit_with_trace +from app.bulk_persistence.consistency_checks import ConsistencyException, DataConsistencyChecks +from app.bulk_persistence.dask.dask_bulk_storage import BulkRecordNotFound +from app.bulk_persistence import DaskBulkStorage +from app.bulk_persistence.dask.traces import submit_with_trace from app.model.model_utils import from_record from app.context import get_ctx diff --git a/app/consistency/welllog_consistency.py b/app/consistency/welllog_consistency.py index 5409a847..650d7c7a 100644 --- a/app/consistency/welllog_consistency.py +++ b/app/consistency/welllog_consistency.py @@ -7,8 +7,10 @@ from dask.dataframe.core import DataFrame as DaskDataFrame from odes_storage.models import Record from app.helper.traces import with_trace -from app.bulk_persistence import BulkRecordNotFound, \ - DaskBulkStorage, ConsistencyException, DataConsistencyChecks, submit_with_trace +from app.bulk_persistence.consistency_checks import ConsistencyException, DataConsistencyChecks +from app.bulk_persistence.dask.dask_bulk_storage import BulkRecordNotFound +from app.bulk_persistence import DaskBulkStorage +from app.bulk_persistence.dask.traces import submit_with_trace from app.model.model_utils import from_record from app.model.osdu_model import WellLog110 from app.context import get_ctx diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py index acf15efa..1ab79edc 100644 --- a/app/model/log_bulk.py +++ b/app/model/log_bulk.py @@ -18,7 +18,7 @@ from jsonpath_ng import parse as parse_jsonpath from jsonpath_ng.jsonpath import Parent as JsonParent from odes_storage.models import Record -from app.bulk_persistence import BulkURI +from app.bulk_persistence.bulk_uri import BulkURI class LogBulkHelper: diff --git a/app/routers/bulk/bulk_routes.py b/app/routers/bulk/bulk_routes.py index 08f23c18..6f1c3d1e 100644 --- a/app/routers/bulk/bulk_routes.py +++ b/app/routers/bulk/bulk_routes.py @@ -57,16 +57,18 @@ from app.routers.sessions import ( ) # imports from bulk persistence -from app.bulk_persistence import (auto_cast_columns_to_string, - DataFrameValidationFunc, no_validation, - JSONOrient, - get_dataframe, download_bulk, - DaskBulkStorage, - MimeTypes, MimeType, - trace_dataframe_attributes, trace_attributes_root_span, - BulkError, BulkRecordNotFound, FilterError, TooManyColumnsRequested, - DataConsistencyChecks - ) +from app.bulk_persistence.dataframe_validators import (auto_cast_columns_to_string, + DataFrameValidationFunc, + no_validation) +from app.bulk_persistence import JSONOrient, get_dataframe, download_bulk +from app.bulk_persistence import DaskBulkStorage +from app.bulk_persistence.dask.errors import BulkError, BulkRecordNotFound, FilterError, TooManyColumnsRequested +from app.bulk_persistence.mime_types import MimeTypes, MimeType +from app.bulk_persistence.dask.traces import trace_dataframe_attributes, trace_attributes_root_span + + +from app.bulk_persistence import DataConsistencyChecks + router = APIRouter(route_class=TracingRoute) # router dedicated to bulk APIs diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index 7840d07a..cc592398 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -3,7 +3,7 @@ from abc import ABC from typing import Optional from fastapi import Request -from app.bulk_persistence import BulkURI +from app.bulk_persistence.bulk_uri import BulkURI from app.model.log_bulk import LogBulkHelper diff --git a/app/routers/bulk/utils.py b/app/routers/bulk/utils.py index 2d501245..3351b638 100644 --- a/app/routers/bulk/utils.py +++ b/app/routers/bulk/utils.py @@ -11,11 +11,16 @@ import dask.dataframe as dd import pandas as pd from pyarrow.lib import ArrowInvalid -from app.bulk_persistence import DaskBulkStorage, DataframeSerializerAsync, \ - MimeTypes, MimeType, JSONOrient, trace_dataframe_attributes, capture_timings, \ - auto_cast_columns_to_string, columns_type_must_be_string, \ - no_validation, DataFrameValidationFunc, \ - FilterError, internal_bulk_exceptions, BulkCurvesNotFound +from app.bulk_persistence.dask.errors import FilterError, internal_bulk_exceptions, BulkCurvesNotFound +from app.bulk_persistence.dask.traces import trace_dataframe_attributes +from app.bulk_persistence import DaskBulkStorage +from app.bulk_persistence.dataframe_validators import auto_cast_columns_to_string, columns_type_must_be_string, \ + no_validation, DataFrameValidationFunc +from app.bulk_persistence import DataframeSerializerAsync +from app.bulk_persistence.mime_types import MimeTypes, MimeType +from app.bulk_persistence import JSONOrient +from app.bulk_persistence import capture_timings + from app.clients.storage_service_client import get_storage_record_service from app.context import get_ctx, Context from app.utils import OpenApiHandler diff --git a/app/routers/common_parameters.py b/app/routers/common_parameters.py index 4ab2d78f..bfdf16b3 100644 --- a/app/routers/common_parameters.py +++ b/app/routers/common_parameters.py @@ -1,7 +1,7 @@ from fastapi import Query, Request, HTTPException from pandas import DataFrame -from app.bulk_persistence import MimeType, MimeTypes +from app.bulk_persistence.mime_types import MimeType, MimeTypes from app.bulk_persistence import JSONOrient diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py index d05107bb..bb30a694 100644 --- a/app/routers/ddms_v2/log_ddms_v2.py +++ b/app/routers/ddms_v2/log_ddms_v2.py @@ -38,7 +38,7 @@ from odes_storage.models import ( from pydantic import BaseModel, Field from app.bulk_persistence import DataframeSerializerAsync, DataframeSerializerSync, JSONOrient, MimeTypes, get_dataframe -from app.bulk_persistence import BulkURI +from app.bulk_persistence.bulk_uri import BulkURI from app.clients.storage_service_client import get_storage_record_service from app.model.log_bulk import LogBulkHelper from app.model.model_curated import log diff --git a/app/routers/ddms_v2/persistence.py b/app/routers/ddms_v2/persistence.py index 4d6ea54d..ce5eb4af 100644 --- a/app/routers/ddms_v2/persistence.py +++ b/app/routers/ddms_v2/persistence.py @@ -16,10 +16,12 @@ import pandas as pd from odes_storage.models import Record -from app.bulk_persistence import create_and_store_dataframe, get_dataframe, trace_dataframe_attributes +from app.bulk_persistence import create_and_store_dataframe +from app.bulk_persistence import get_dataframe from app.context import Context from app.model.log_bulk import LogBulkHelper +from app.bulk_persistence.dask.traces import trace_dataframe_attributes from app.helper.traces import with_trace from app.helper.logger import get_logger diff --git a/app/routers/delete/delete_bulk_data.py b/app/routers/delete/delete_bulk_data.py index 53ef5836..e1544396 100644 --- a/app/routers/delete/delete_bulk_data.py +++ b/app/routers/delete/delete_bulk_data.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from app.bulk_persistence import resolve_tenant, hash_record_id +from app.bulk_persistence import resolve_tenant from osdu.core.api.storage.blob_storage_base import BlobStorageBase import asyncio +from app.bulk_persistence.dask.storage_path_builder import hash_record_id from app.clients import StorageRecordServiceClient from app.clients.storage_service_client import get_storage_record_service from app.routers.bulk.bulk_uri_dependencies import BulkIdAccess diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index 31fd7ca4..b5483639 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from app.model.log_bulk import LogBulkHelper -from app.bulk_persistence import BulkURI +from app.bulk_persistence.bulk_uri import BulkURI from tests.unit.test_utils import basic_record import uuid import pytest diff --git a/tests/unit/routers/filter_test.py b/tests/unit/routers/filter_test.py index 2cebb338..b7a76ba3 100644 --- a/tests/unit/routers/filter_test.py +++ b/tests/unit/routers/filter_test.py @@ -1,7 +1,8 @@ import pytest +from app.bulk_persistence.dask.errors import FilterError from app.bulk_persistence import GetDataParams, \ - BulkReadFilterOperator, BulkReadFilters, FilterError + BulkReadFilterOperator, BulkReadFilters @pytest.mark.parametrize("filters, expected", [ -- GitLab