Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (265)
Showing with 634 additions and 226 deletions
@@ -90,6 +90,16 @@ verify_existing_requirements:
expire_in: 2 days
compile-and-unit-test:
artifacts:
when: always
paths:
- all-requirements.txt
- spec/generated/openapi.json
containerize:
extends: .skipForTriggeringMergeRequests
stage: containerize
...
@@ -60,6 +60,7 @@ BSD-2-Clause
========================================================================
The following software have components provided under the terms of this license:
- colorama (from https://github.com/tartley/colorama)
- grpcio (from https://grpc.io)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
@@ -78,10 +79,13 @@ BSD-3-Clause
The following software have components provided under the terms of this license:
- HeapDict (from http://stutzbachenterprises.com/)
- Jinja2 (from http://jinja.pocoo.org/)
- MarkupSafe (from https://palletsprojects.com/p/markupsafe/)
- adlfs (from https://github.com/hayesgb/adlfs/)
- asgiref (from http://github.com/django/asgiref/)
- click (from http://github.com/mitsuhiko/click)
- cloudpickle (from https://github.com/cloudpipe/cloudpickle)
- colorama (from https://github.com/tartley/colorama)
- cryptography (from https://github.com/pyca/cryptography)
- dask (from http://github.com/dask/dask/)
- decorator (from https://github.com/micheles/decorator)
@@ -169,14 +173,6 @@ The following software have components provided under the terms of this license:
- grpcio (from https://grpc.io)
- pyparsing (from http://pyparsing.wikispaces.com/)
- rfc3986 (from https://rfc3986.readthedocs.org)
- strict-rfc3339 (from http://www.danielrichman.co.uk/libraries/strict-rfc3339.html)
========================================================================
GPL-3.0-or-later
========================================================================
The following software have components provided under the terms of this license:
- strict-rfc3339 (from http://www.danielrichman.co.uk/libraries/strict-rfc3339.html)
========================================================================
ISC
@@ -234,7 +230,7 @@ The following software have components provided under the terms of this license:
- aiohttp (from https://github.com/aio-libs/aiohttp/)
- aioitertools (from https://github.com/jreese/aioitertools)
- aioredis (from https://github.com/aio-libs/aioredis)
- anyio (from https://pypi.org/project/anyio/3.3.4/)
- anyio (from https://pypi.org/project/anyio/3.4.0/)
- asgiref (from http://github.com/django/asgiref/)
- attrs (from https://attrs.readthedocs.io/)
- azure-common (from https://github.com/Azure/azure-sdk-for-python)
@@ -285,7 +281,7 @@ The following software have components provided under the terms of this license:
- sniffio (from https://github.com/python-trio/sniffio)
- structlog (from http://www.structlog.org/)
- toml (from https://github.com/uiri/toml)
- tomli (from https://pypi.org/project/tomli/1.2.1/)
- tomli (from https://pypi.org/project/tomli/1.2.2/)
- urllib3 (from https://urllib3.readthedocs.io/)
- xmltodict (from https://github.com/martinblech/xmltodict)
- zipp (from https://github.com/jaraco/zipp)
...
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from .bulk_id import BulkId
+from .bulk_uri import BulkURI
from .dataframe_persistence import create_and_store_dataframe, get_dataframe
from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync
from .json_orient import JSONOrient
...
@@ -13,27 +13,7 @@
# limitations under the License.
import uuid
-from typing import Tuple, Optional
-
-
-class BulkId:
-    @staticmethod
-    def new_bulk_id() -> str:
-        return str(uuid.uuid4())
-
-    @classmethod
-    def bulk_urn_encode(cls, bulk_id: str, prefix: str = None) -> str:
-        if prefix:
-            return f'urn:{prefix}:uuid:{uuid.UUID(bulk_id)}'
-        return uuid.UUID(bulk_id).urn
-
-    # Returns a tuple (<uuid> : str, <prefix> : str)
-    @classmethod
-    def bulk_urn_decode(cls, urn: str) -> Tuple[str, Optional[str]]:
-        if urn is None:
-            raise ValueError('attempted to decode empty urn')
-        parts = urn.split(":")
-        if len(parts) < 4:
-            return str(uuid.UUID(urn)), None
-        return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
+
+
+def new_bulk_id() -> str:
+    return str(uuid.uuid4())
from typing import Optional, NamedTuple
class BulkStorageVersion(NamedTuple):
""" This is the version of the bulk storage engine """
version: str
""" unique version identifier """
uri_prefix: Optional[str]
""" associated uri prefix """
BulkStorageVersion_V0 = BulkStorageVersion(version='0', uri_prefix=None)
""" first bulk management implementation with direct management to blob storage with a single blob """
BulkStorageVersion_V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1")
""" version 1, using Dask to handle bulk manipulation and storage """
BulkStorageVersion_Invalid = BulkStorageVersion(version='', uri_prefix=None)
""" represent an invalid/undefined storage version """
from typing import Optional, Tuple
import uuid
from .bulk_storage_version import (
BulkStorageVersion, BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid)
class BulkURI:
"""
Bulk URI, contains the bulk identifier (bulk_id) and Storage engine version which identifies how
the bulk is stored.
Usage:
- ctor from URI string value:
`bulk_uri = BulkURI.decode(uri_str)`
- ctor explicit given a bulk_id and a storage version:
`bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersion_V1)`
- ctor explict using class method:
`bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id_value)`
- encode to URI string value:
`uri_str: str = bulk_uri.encode()`
- check which storage engine version is:
`bulk_uri.storage_version == BulkStorageVersion_V0`
`bulk_uri.is_bulk_storage_V0()`
"""
def __init__(self, bulk_id: str, version: BulkStorageVersion):
"""
make an new one or invalid
Either pass uri alone or bulk_id, version
:param bulk_id: expected as a valid uuid
:param version: storage version
:throw: ValueError
"""
if not bulk_id or not version or version == BulkStorageVersion_Invalid:
bulk_id = ''
version = BulkStorageVersion_Invalid
else:
# ensure valid uuid
uuid.UUID(bulk_id)
self._bulk_id = bulk_id
self._storage_version = version
@classmethod
def invalid(cls):
""" make an invalid instance """
return cls('', BulkStorageVersion_Invalid)
@classmethod
def decode(cls, uri: str) -> 'BulkURI':
"""
construct a BulkURI from an encoded URI
:throw: ValueError
"""
if not uri:
return BulkURI.invalid()
bulk_id, prefix = cls._decode_uri(uri)
if not prefix:
version = BulkStorageVersion_V0
elif prefix == BulkStorageVersion_V1.uri_prefix:
version = BulkStorageVersion_V1
else:
raise ValueError('Unsupported prefix in bulk URI: ' + prefix)
return cls(bulk_id=bulk_id, version=version)
def is_bulk_storage_V0(self) -> bool:
""" convenient check that returns True is version == BulkStorageVersions.V0 """
return self._storage_version.version == BulkStorageVersion_V0.version
@classmethod
def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI':
""" construct a BulkURI for storage engine V0 given a bulk id """
return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0)
@classmethod
def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI':
""" construct a BulkURI for storage engine V1 given a bulk id """
return cls(bulk_id=bulk_id, version=BulkStorageVersion_V1)
@property
def bulk_id(self) -> str:
return self._bulk_id
@property
def storage_version(self) -> BulkStorageVersion:
return self._storage_version
def encode(self) -> str:
"""
encode to uri as string
If the prefix is not empty returns, uri format = `urn:$prefix:uuid:$bulk_id`
If the prefix is empty or None, uri format = `urn:uuid:$bulk_id`
:Throw: ValueError
"""
if self._storage_version.uri_prefix:
return f'urn:{self._storage_version.uri_prefix}:uuid:{self._bulk_id}'
return uuid.UUID(self._bulk_id).urn
@classmethod
def _decode_uri(cls, uri: str) -> Tuple[str, Optional[str]]:
"""
Decode urn into uuid and optional prefix. Returns tuple [uuid, prefix].
If urn is `urn:$prefix:uuid:$bulk_id`, will return [$bulk_id, $prefix]
If urn is `urn:uuid:$bulk_id`, will return [$bulk_id, None]
:throw: ValueError if urn empty or invalid UUID
"""
if uri is None:
raise ValueError('attempted to decode empty urn')
parts = uri.split(":")
if len(parts) < 4:
return str(uuid.UUID(uri)), None
return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
def is_valid(self) -> bool:
""" check invalid """
if self._bulk_id and self._storage_version.version:
return True
return False
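As an illustration of the URI scheme implemented above, here is a minimal round-trip sketch. It assumes the package layout of this merge request, where BulkURI is re-exported from app.bulk_persistence and the storage-version constants live in app.bulk_persistence.bulk_storage_version; the UUID value is arbitrary.

from app.bulk_persistence import BulkURI
from app.bulk_persistence.bulk_storage_version import BulkStorageVersion_V1

# a V1 (Dask-managed) bulk is encoded with the 'wdms-1' prefix
uri = BulkURI.from_bulk_storage_V1('0f36c7d1-3a95-4a28-97ac-0b00a27916c6')
assert uri.encode() == 'urn:wdms-1:uuid:0f36c7d1-3a95-4a28-97ac-0b00a27916c6'
assert BulkURI.decode(uri.encode()).storage_version == BulkStorageVersion_V1

# a legacy URN without prefix decodes to storage engine V0
legacy = BulkURI.decode('urn:uuid:0f36c7d1-3a95-4a28-97ac-0b00a27916c6')
assert legacy.is_bulk_storage_V0() and legacy.is_valid()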
@@ -12,85 +12,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import hashlib
import json
-import time
+import uuid
from contextlib import suppress
-from functools import wraps
from operator import attrgetter
from typing import List

import fsspec
import pandas as pd
-from pyarrow.lib import ArrowException, ArrowInvalid
-
-from app.bulk_persistence import BulkId
-from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
-from app.bulk_persistence.dask.traces import wrap_trace_process
-from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
-                                             do_merge,
-                                             worker_capture_timing_handlers)
-from app.helper.logger import get_logger
-from app.helper.traces import with_trace
-from app.persistence.sessions_storage import Session
-from app.utils import DaskClient, capture_timings, get_ctx
-from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
-import pyarrow.parquet as pa
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
-from dask.distributed import WorkerPlugin
-from dask.distributed import scheduler
+from pyarrow.lib import ArrowException
+import pyarrow.parquet as pa

-def internal_bulk_exceptions(target):
-    """
-    Decoration to handler exceptions that should be not exposed to outside world. e.g. Pyarrow or Dask exceptions
-    """
-    @wraps(target)
-    async def async_inner(*args, **kwargs):
-        try:
-            return await target(*args, **kwargs)
-        except ArrowInvalid as e:
-            get_logger().exception(f"Pyarrow ArrowInvalid when running {target.__name__}")
-            raise BulkNotProcessable(f"Unable to process bulk - {str(e)}")
-        except ArrowException:
-            get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
-            raise BulkNotProcessable("Unable to process bulk - Arrow")
-        except scheduler.KilledWorker:
-            get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
-            raise BulkNotProcessable("Unable to process bulk- Dask")
-        except Exception:
-            get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
-            raise
-    return async_inner

-class DefaultWorkerPlugin(WorkerPlugin):
-    def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
-        self.worker = None
-        global _LOGGER
-        _LOGGER = logger
-        self._register_fsspec_implementation = register_fsspec_implementation
-        super().__init__()
-        get_logger().debug("WorkerPlugin initialised")
-
-    def setup(self, worker):
-        self.worker = worker
-        if self._register_fsspec_implementation:
-            self._register_fsspec_implementation()
-
-    def transition(self, key, start, finish, *args, **kwargs):
-        if finish == 'error':
-            # exc = self.worker.exceptions[key]
-            get_logger().exception(f"Task '{key}' has failed with exception")

-def pandas_to_parquet(pdf, path, opt):
-    return pdf.to_parquet(path, index=True, engine='pyarrow', storage_options=opt)
+from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
+
+from app.helper.logger import get_logger
+from app.helper.traces import with_trace
+from app.persistence.sessions_storage import Session
+from app.utils import DaskClient, capture_timings, get_ctx
+
+from .errors import BulkNotFound, BulkNotProcessable, internal_bulk_exceptions
+from .traces import wrap_trace_process, _create_func_key
+from .utils import by_pairs, do_merge, worker_capture_timing_handlers
+from .dask_worker_plugin import DaskWorkerPlugin
+from ..dataframe_validators import assert_df_validate, validate_index, columns_not_in_reserved_names
+from .session_file_meta import SessionFileMeta
+from .. import DataframeSerializerSync
+from . import storage_path_builder as pathBuilder
+from ..bulk_id import new_bulk_id


class DaskBulkStorage:
@@ -116,7 +66,7 @@ class DaskBulkStorage:
        parameters.register_fsspec_implementation()
        await DaskBulkStorage.client.register_worker_plugin(
-            DefaultWorkerPlugin,
+            DaskWorkerPlugin,
            name="LoggerWorkerPlugin",
            logger=get_logger(),
            register_fsspec_implementation=parameters.register_fsspec_implementation)
@@ -134,22 +84,6 @@ class DaskBulkStorage:
    def base_directory(self) -> str:
        return self._parameters.base_directory

-    def _encode_record_id(self, record_id: str) -> str:
-        return hashlib.sha1(record_id.encode()).hexdigest()
-
-    def _get_base_directory(self, protocol=True):
-        return f'{self.protocol}://{self.base_directory}' if protocol else self.base_directory
-
-    def _get_blob_path(self, record_id: str, bulk_id: str, with_protocol=True) -> str:
-        """Return the bulk path from the bulk_id."""
-        encoded_id = self._encode_record_id(record_id)
-        return f'{self._get_base_directory(with_protocol)}/{encoded_id}/bulk/{bulk_id}/data'
-
-    def _build_path_from_session(self, session: Session, with_protocol=True) -> str:
-        """Return the session path."""
-        encoded_id = self._encode_record_id(session.recordId)
-        return f'{self._get_base_directory(with_protocol)}/{encoded_id}/session/{session.id}/data'
    def _load(self, path, **kwargs) -> dd.DataFrame:
        """Read a Parquet file into a Dask DataFrame
        path : string or list
@@ -171,27 +105,37 @@ class DaskBulkStorage:
        """Return a dask Dataframe of a record at the specified version.
        returns a Future<dd.DataFrame>
        """
-        return self._load(self._get_blob_path(record_id, bulk_id), columns=columns)
+        blob_path = pathBuilder.record_bulk_path(
+            self.base_directory, record_id, bulk_id, self.protocol)
+        return self._load(blob_path, columns=columns)
    @with_trace('read_stat')
    def read_stat(self, record_id: str, bulk_id: str):
        """Return some meta data about the bulk."""
-        file_path = self._get_blob_path(record_id, bulk_id, with_protocol=False)
-        dataset = pa.ParquetDataset(file_path, filesystem=self._fs)
-        schema = dataset.read_pandas().schema
-        schema_dict = {x: str(y) for (x, y) in zip(schema.names, schema.types)}
-        return {
-            "num_rows": dataset.metadata.num_rows,
-            "schema": schema_dict
-        }
+        try:
+            file_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id)
+            dataset = pa.ParquetDataset(file_path, filesystem=self._fs)
+            schema = dataset.read_pandas().schema
+            schema_dict = {x: str(y) for (x, y) in zip(schema.names, schema.types)}
+            return {
+                "num_rows": dataset.metadata.num_rows,
+                "schema": schema_dict
+            }
+        except OSError:
+            raise BulkNotFound(record_id, bulk_id)
    def _submit_with_trace(self, target_func, *args, **kwargs):
        """
        Submit given target_func to Distributed Dask workers and add tracing required stuff
+        Note: 'dask_task_key' is manually created to easy reading of Dask's running tasks: it will display
+        the effective targeted function instead of 'wrap_trace_process' used to enable tracing into Dask workers.
        """
+        dask_task_key = _create_func_key(target_func, *args, **kwargs)
        kwargs['span_context'] = get_ctx().tracer.span_context
        kwargs['target_func'] = target_func
-        return self.client.submit(wrap_trace_process, *args, **kwargs)
+        return self.client.submit(wrap_trace_process, *args, key=dask_task_key, **kwargs)
    def _map_with_trace(self, target_func, *args, **kwargs):
        """
@@ -236,34 +180,22 @@ class DaskBulkStorage:
        returns a Future<None>
        """
        f_pdf = await self.client.scatter(pdf)
-        return await self._submit_with_trace(pandas_to_parquet, f_pdf, path,
-                                             self._parameters.storage_options)
+        return await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, path,
+                                             storage_options=self._parameters.storage_options)
-    @staticmethod
-    def _check_incoming_chunk(df):
-        # TODO should we test if is_monotonic?, unique ?
-        if len(df.index) == 0:
-            raise BulkNotProcessable("Empty data")
-        if not df.index.is_unique:
-            raise BulkNotProcessable("Duplicated index found")
-        if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
-            raise BulkNotProcessable("Index should be numeric or datetime")

    @internal_bulk_exceptions
    @capture_timings('save_blob', handlers=worker_capture_timing_handlers)
    @with_trace('save_blob')
    async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None):
        """Write the data frame to the blob storage."""
-        bulk_id = bulk_id or BulkId.new_bulk_id()
+        bulk_id = bulk_id or new_bulk_id()
        if isinstance(ddf, pd.DataFrame):
-            self._check_incoming_chunk(ddf)
-            ddf = dd.from_pandas(ddf, npartitions=1)
+            assert_df_validate(dataframe=ddf, validation_funcs=[validate_index, columns_not_in_reserved_names])
+            ddf = dd.from_pandas(ddf, npartitions=1, name=f"from_pandas-{uuid.uuid4()}")
        ddf = await self.client.scatter(ddf)
-        path = self._get_blob_path(record_id, bulk_id)
+        path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
        try:
            await self._save_with_dask(path, ddf)
        except OSError:
@@ -273,34 +205,27 @@ class DaskBulkStorage:
    @capture_timings('session_add_chunk')
    @with_trace('session_add_chunk')
    async def session_add_chunk(self, session: Session, pdf: pd.DataFrame):
-        self._check_incoming_chunk(pdf)
+        assert_df_validate(dataframe=pdf, validation_funcs=[validate_index, columns_not_in_reserved_names])
        # sort column by names
        pdf = pdf[sorted(pdf.columns)]
-        # generate a file name sorted by starting index
-        # dask reads and sort files by 'natural_key' So the file name impact the final result
-        first_idx, last_idx = pdf.index[0], pdf.index[-1]
-        if isinstance(pdf.index, pd.DatetimeIndex):
-            first_idx, last_idx = pdf.index[0].value, pdf.index[-1].value
-        idx_range = f'{first_idx}_{last_idx}'
-        shape = hashlib.sha1('_'.join(map(str, pdf)).encode()).hexdigest()
-        t = round(time.time() * 1000)
-        filename = f'{idx_range}_{t}.{shape}'
-        session_path_wo_protocol = self._build_path_from_session(session, with_protocol=False)
-        self._fs.mkdirs(session_path_wo_protocol, exist_ok=True)
-        with self._fs.open(f'{session_path_wo_protocol}/{filename}.meta', 'w') as outfile:
-            json.dump({"columns": list(pdf)}, outfile)
-        session_path = self._build_path_from_session(session)
+        filename = pathBuilder.build_chunk_filename(pdf)
+        session_path = pathBuilder.record_session_path(
+            self.base_directory, session.id, session.recordId)
+        self._fs.mkdirs(session_path, exist_ok=True)
+        with self._fs.open(f'{session_path}/{filename}.meta', 'w') as outfile:
+            json.dump({"columns": list(pdf.columns)}, outfile)
+        session_path = pathBuilder.add_protocol(session_path, self.protocol)
        await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)

    @capture_timings('get_session_parquet_files')
    @with_trace('get_session_parquet_files')
    def get_session_parquet_files(self, session):
-        session_path = self._build_path_from_session(session, with_protocol=False)
+        session_path = pathBuilder.record_session_path(
+            self.base_directory, session.id, session.recordId)
        with suppress(FileNotFoundError):
            session_files = [f for f in self._fs.ls(session_path) if f.endswith(".parquet")]
            return session_files
...
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dask.distributed import WorkerPlugin
from app.helper.logger import get_logger
class DaskWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
self.worker = None
global _LOGGER
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
super().__init__()
get_logger().debug("WorkerPlugin initialised")
def setup(self, worker):
self.worker = worker
if self._register_fsspec_implementation:
self._register_fsspec_implementation()
def transition(self, key, start, finish, *args, **kwargs):
if finish == 'error':
# exc = self.worker.exceptions[key]
get_logger().exception(f"Task '{key}' has failed with exception")
\ No newline at end of file
@@ -13,24 +13,75 @@
# limitations under the License.
from fastapi import status, HTTPException
+from dask.distributed import scheduler
+from pyarrow.lib import ArrowException, ArrowInvalid
+from functools import wraps
+from app.helper.logger import get_logger


class BulkError(Exception):
    http_status: int

    def raise_as_http(self):
-        raise HTTPException(status_code=self.http_status, detail=str(self))
+        raise HTTPException(status_code=self.http_status, detail=str(self)) from self


class BulkNotFound(BulkError):
    http_status = status.HTTP_404_NOT_FOUND

-    def __init__(self, record_id, bulk_id):
-        self.message = f'bulk {bulk_id} for record {record_id} not found'
+    def __init__(self, record_id=None, bulk_id=None, message=None):
+        ex_message = 'bulk '
+        if bulk_id:
+            ex_message += f'{bulk_id} '
+        if record_id:
+            ex_message += f'for record {record_id} '
+        ex_message += 'not found'
+        if message:
+            ex_message += ': ' + message
+        super().__init__(ex_message)


class BulkNotProcessable(BulkError):
    http_status = status.HTTP_422_UNPROCESSABLE_ENTITY

-    def __init__(self, bulk_id):
-        self.message = f'bulk {bulk_id} not processable'
+    def __init__(self, bulk_id=None, message=None):
+        ex_message = 'bulk '
+        if bulk_id:
+            ex_message += f'{bulk_id} '
+        ex_message += 'not processable'
+        if message:
+            ex_message += ': ' + message
+        super().__init__(ex_message)


+class FilterError(BulkError):
+    http_status = status.HTTP_400_BAD_REQUEST
+
+    def __init__(self, reason):
+        self.message = f'filter error: {reason}'
+
+
+def internal_bulk_exceptions(target):
+    """
+    Decoration to handler exceptions that should be not exposed to outside world. e.g. Pyarrow or Dask exceptions
+    """
+    @wraps(target)
+    async def async_inner(*args, **kwargs):
+        try:
+            return await target(*args, **kwargs)
+        except ArrowInvalid as e:
+            get_logger().exception(f"Pyarrow ArrowInvalid when running {target.__name__}")
+            raise BulkNotProcessable(f"Unable to process bulk - {str(e)}")
+        except ArrowException:
+            get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
+            raise BulkNotProcessable("Unable to process bulk - Arrow")
+        except scheduler.KilledWorker:
+            get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
+            raise BulkNotProcessable("Unable to process bulk- Dask")
+        except Exception:
+            get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
+            raise
+    return async_inner
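For context, a small sketch of how the relocated decorator and the richer exception constructors are meant to be used. The coroutine name is made up for illustration, and the module path is assumed from the imports shown earlier in this diff.

from app.bulk_persistence.dask.errors import BulkNotFound, internal_bulk_exceptions

@internal_bulk_exceptions
async def read_bulk_stats_example(record_id: str, bulk_id: str):
    # Arrow/Dask failures raised in here are logged and surfaced to callers
    # as BulkNotProcessable (HTTP 422) instead of leaking internals.
    raise BulkNotFound(record_id=record_id, bulk_id=bulk_id, message='no parquet files')

# In a FastAPI route, BulkError subclasses are then translated to HTTP errors:
#     except BulkError as e: e.raise_as_http()   # -> HTTPException(404/422, detail=str(e))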
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from typing import List
from app.bulk_persistence.dask.utils import share_items
class SessionFileMeta:
"""The class extract information about chunks."""
def __init__(self, fs, file_path: str) -> None:
self._fs = fs
file_name = os.path.basename(file_path)
start, end, tail = file_name.split('_')
self.start = float(start) # data time support ?
self.end = float(end)
self.time, self.shape, tail = tail.split('.')
self._meta = None
self.path = file_path
def _read_meta(self):
if not self._meta:
path, _ = os.path.splitext(self.path)
with self._fs.open(path + '.meta') as meta_file:
self._meta = json.load(meta_file)
return self._meta
@property
def columns(self) -> List[str]:
"""Return the column names"""
return self._read_meta()['columns']
def overlap(self, other: 'SessionFileMeta') -> bool:
"""Returns True if indexes overlap."""
return self.end >= other.start and other.end >= self.start
def has_common_columns(self, other: 'SessionFileMeta') -> bool:
"""Returns True if contains common columns with others."""
return share_items(self.columns, other.columns)
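The file-name parsing above can be exercised without touching a real filesystem, since the side-car .meta file is only read lazily. A small sketch follows; the names use the '<first>_<last>_<time>.<shape>' convention produced by build_chunk_filename below and are invented here.

from app.bulk_persistence.dask.session_file_meta import SessionFileMeta

# fs is only needed once .columns triggers a read of the .meta file
chunk_a = SessionFileMeta(None, '0_99_1700000000000.abc123.parquet')
chunk_b = SessionFileMeta(None, '50_149_1700000000001.abc123.parquet')
chunk_c = SessionFileMeta(None, '200_299_1700000000002.abc123.parquet')

assert chunk_a.overlap(chunk_b)      # index ranges [0, 99] and [50, 149] intersect
assert not chunk_a.overlap(chunk_c)  # [0, 99] and [200, 299] are disjoint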
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions that gathers method to build path for bulk storage
"""
import hashlib
from os.path import join
from time import time
from typing import Optional, Tuple
import pandas as pd
def hash_record_id(record_id: str) -> str:
"""encode the record_id to be a valid path name"""
return hashlib.sha1(record_id.encode()).hexdigest()
def build_base_path(base_directory: str, protocol: Optional[str] = None) -> str:
"""return the base directory, add the protocol if requested"""
return f'{protocol}://{base_directory}' if protocol else base_directory
def add_protocol(path: str, protocol: str) -> str:
"""add protocole to the path"""
prefix = protocol + '://'
if not path.startswith(prefix):
return prefix + path
return path
def remove_protocol(path: str) -> Tuple[str, str]:
"""remove protocol for path if any, return tuple[path, protocol].
If no protocol in path then protocol='' """
if '://' not in path:
return path, ''
sep_idx = path.index('://')
return path[sep_idx + 3:], path[:sep_idx]
def record_path(base_directory: str, record_id, protocol: Optional[str] = None) -> str:
"""Return the entity path.
(path where all data relateed to an entity are saved"""
encoded_id = hash_record_id(record_id)
base_path = build_base_path(base_directory, protocol)
return join(base_path, encoded_id)
def record_bulks_root_path(
base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
"""return the path where blob are stored for the specified entity"""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'bulk')
def record_sessions_root_path(
base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
"""return the path where sessions are stored for the specified entity"""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'session')
def record_bulk_path(
base_directory: str, record_id: str, bulk_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified bulk."""
entity_blob_path = record_bulks_root_path(base_directory, record_id, protocol)
return join(entity_blob_path, bulk_id, 'data')
def record_session_path(
base_directory: str, session_id: str, record_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified session."""
entity_session_path = record_sessions_root_path(base_directory, record_id, protocol)
return join(entity_session_path, session_id, 'data')
def build_chunk_filename(dataframe: pd.DataFrame) -> str:
"""Return chunk file name sorted by starting index
Note 1: do not change the name without updating SessionFileMeta
Note 2: dask reads and sort files by 'natural_key' so the filenames impacts the final result
"""
first_idx, last_idx = dataframe.index[0], dataframe.index[-1]
if isinstance(dataframe.index, pd.DatetimeIndex):
first_idx, last_idx = dataframe.index[0].value, dataframe.index[-1].value
#shape_str = '_'.join(f'{cn}:{dt}' for cn, dt in dataframe.dtypes.items())
shape_str = '_'.join(f'{cn}' for cn, dt in dataframe.dtypes.items())
shape = hashlib.sha1(shape_str.encode()).hexdigest()
cur_time = round(time() * 1000)
return f'{first_idx}_{last_idx}_{cur_time}.{shape}'
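A short usage sketch of the helpers above. The module path, the 'az' protocol and the base directory are assumptions made for illustration only.

from app.bulk_persistence.dask import storage_path_builder as path_builder

record_id = 'opendes:work-product-component--WellLog:abc123'

# record ids are sha1-hashed so they are always safe as directory names
bulk_path = path_builder.record_bulk_path('wdms-bulk', record_id, 'some-bulk-id', protocol='az')
# -> az://wdms-bulk/<sha1(record_id)>/bulk/some-bulk-id/data

stripped, protocol = path_builder.remove_protocol(bulk_path)
assert protocol == 'az' and stripped.startswith('wdms-bulk')
assert path_builder.add_protocol(stripped, 'az') == bulk_path

session_path = path_builder.record_session_path('wdms-bulk', 'session-1', record_id)
# -> wdms-bulk/<sha1(record_id)>/session/session-1/data  (no protocol requested)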
+from dask.utils import funcname
+from dask.base import tokenize
from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
@@ -23,6 +26,13 @@ def wrap_trace_process(*args, **kwargs):
        sampler=AlwaysOnSampler(),
        exporter=_EXPORTER)

-    with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
+    with tracer.span(name=f"Dask Worker - {funcname(target_func)}") as span:
        span.span_kind = SpanKind.CLIENT
        return target_func(*args, **kwargs)
+
+
+def _create_func_key(func, *args, **kwargs):
+    """
+    Inspired by Dask code, it returns a hashed key based on function name and given arguments
+    """
+    return funcname(func) + "-" + tokenize(func, kwargs, *args)
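The key construction mirrors what Dask itself does: the task key starts with the real function name, so the Dask dashboard shows e.g. 'do_merge-<hash>' instead of 'wrap_trace_process-<hash>'. A tiny sketch of the same recipe, with a stand-in function and arbitrary arguments:

from dask.utils import funcname
from dask.base import tokenize

def do_merge_example(df1, df2):  # stand-in for a real worker-side function
    return None

key = funcname(do_merge_example) + "-" + tokenize(do_merge_example, {}, 'arg1', 'arg2')
assert key.startswith('do_merge_example-')  # deterministic for identical (func, args, kwargs)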
@@ -25,16 +25,17 @@ import dask.dataframe as dd
def worker_make_log_captured_timing_handler(level=INFO):
    """log captured timing from the worker subprocess (no access to context)"""
    def log_captured_timing(tag, wall, cpu):
        logger = get_logger()
        if logger:
            logger.log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
    return log_captured_timing


worker_capture_timing_handlers = [worker_make_log_captured_timing_handler(INFO)]

+##


def share_items(seq1, seq2):
    """Returns True if seq1 contains common items with seq2."""
@@ -47,31 +48,6 @@ def by_pairs(iterable):
    return zip_longest(*[iter(iterable)] * 2, fillvalue=None)
-class SessionFileMeta:
-    def __init__(self, fs, file_path: str) -> None:
-        self._fs = fs
-        file_name = os.path.basename(file_path)
-        start, end, tail = file_name.split('_')
-        self.start = float(start)  # data time support ?
-        self.end = float(end)
-        self.time, self.shape, tail = tail.split('.')
-        self.columns = self._get_columns(file_path)  # TODO lazy load
-        self.path = file_path
-
-    def _get_columns(self, file_path):
-        path, _ = os.path.splitext(file_path)
-        with self._fs.open(path + '.meta') as meta_file:
-            return json.load(meta_file)['columns']
-
-    def overlap(self, other: 'SessionFileMeta'):
-        """Returns True if indexes overlap."""
-        return self.end >= other.start and other.end >= self.start
-
-    def has_common_columns(self, other):
-        """Returns True if contains common columns with others."""
-        return share_items(self.columns, other.columns)


@capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf: dd.DataFrame):
    """Set index of the dask dataFrame only if needed."""
@@ -94,4 +70,3 @@ def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
    ddf = df1.join(df2, how='outer')  # join seems faster when there no columns in common
    return ddf
@@ -25,7 +25,7 @@ from .blob_storage import (
    create_and_write_blob,
    read_blob,
)
-from .bulk_id import BulkId
+from .bulk_id import new_bulk_id
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from ..helper.traces import with_trace
@@ -33,10 +33,10 @@ from ..helper.traces import with_trace
async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
    """Store bulk on a blob storage"""
-    new_bulk_id = BulkId.new_bulk_id()
+    bulk_id = new_bulk_id()
    tenant = await resolve_tenant(ctx.partition_id)
    async with create_and_write_blob(
-        df, file_exporter=BlobFileExporters.PARQUET, blob_id=new_bulk_id
+        df, file_exporter=BlobFileExporters.PARQUET, blob_id=bulk_id
    ) as bulkblob:
        storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
        await storage.upload(
...
@@ -15,13 +15,11 @@
import asyncio
from functools import partial
from io import BytesIO
-from typing import Union, AnyStr, IO, Optional, List, Dict
-from pathlib import Path
+from typing import Union, Optional, List, Dict

import numpy as np
import pandas as pd
from pydantic import BaseModel
-from pandas import DataFrame as DataframeClass

from .json_orient import JSONOrient
from .mime_types import MimeTypes
@@ -35,9 +33,7 @@ class DataframeSerializerSync:
    then provide unified the way to handle various topics float/double precision, compression etc...
    """

-    # todo may be unified with the work from storage.blob_storage
-    SupportedFormat = [MimeTypes.JSON]  # , MimeTypes.MSGPACK]
+    SupportedFormat = [MimeTypes.JSON, MimeTypes.PARQUET]
    """ these are supported format through wellbore ddms APIs """

    @classmethod
@@ -60,7 +56,7 @@
    @classmethod
    def to_json(cls,
-                df: DataframeClass,
+                df: pd.DataFrame,
                orient: Union[str, JSONOrient] = JSONOrient.split,
                **kwargs) -> Optional[str]:
        """
@@ -74,7 +70,17 @@
        return df.fillna("NaN").to_json(orient=orient.value, **kwargs)

    @classmethod
-    def read_parquet(cls, data) -> DataframeClass:
+    def to_parquet(cls, df: pd.DataFrame, path_or_buf=None, *, storage_options=None):
+        """
+        :param df: dataframe to dump
+        :param path_or_buf: str or file-like object, default None, see Pandas.Dataframe.to_parquet
+        :param storage_options: storage_options, default None
+        :return: None or buffer
+        """
+        return df.to_parquet(path_or_buf, index=True, engine='pyarrow', storage_options=storage_options)
+
+    @classmethod
+    def read_parquet(cls, data) -> pd.DataFrame:
        """
        :param data: bytes, path object or file-like object
        :return: dataframe
@@ -86,15 +92,46 @@
        return pd.read_parquet(data)

    @classmethod
-    def read_json(cls, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> DataframeClass:
+    def read_json(cls, data, orient: Union[str, JSONOrient]) -> pd.DataFrame:
        """
-        :param data: bytes str content (valid JSON str), path object or file-like object
+        :param data: bytes str content (valid JSON str), path object or file-like object. It won't convert axes. In case
+        of orient='columns' since the indexes type is lost, it will still try to coerce index into 'int'
+        then 'float' then try convert to date time. 'Columns' will remain as string type.
+        For orient 'split' no convert at all.
        :param orient:
        :return: dataframe
        """
        orient = JSONOrient.get(orient)

-        return pd.read_json(path_or_buf=data, orient=orient.value, convert_axes=convert_axes).replace("NaN", np.NaN)
+        df = pd.read_json(
+            path_or_buf=data, orient=orient.value, convert_axes=False
+        ).replace("NaN", np.NaN)
+
+        # this is a conner case, orient 'columns' implies to have all columns and index values to be passed as string
+        # in JSON content.
+        # In that case, their original types are lost. Since parameter 'convert_axes' is set to False, pandas won't
+        # try to infer the types of the columns and index.
+        # Regarding columns, it remains OK since WDMS enforces them to be string. Then, using orient 'columns' will cast
+        # them to string 'by design'.
+        # For the index values it's problematic. In main cases, those are integer values and it matters to have them
+        # back to the original type if possible.
+        #
+        # Here's the tradeoff to handle the case orient='columns':
+        # - no convert on columns, so remains as string type
+        # - try to coerce index to 'float64' or 'int64'
+        #
+        # This is similar to what is done in Pandas but only for index:
+        # see https://github.com/pandas-dev/pandas/blob/master/pandas/io/json/_json.py#L916
+        if orient == JSONOrient.columns:
+            for dtype in ['int64', 'float64']:
+                try:
+                    # try to coerce index type as int then float
+                    df.index = df.index.astype(dtype)
+                    return df
+                except (TypeError, ValueError, OverflowError):
+                    continue
+
+        return df
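The tradeoff described in the comments above can be seen on a tiny payload. The column name and values are illustrative, and DataframeSerializerSync is assumed to be importable from app.bulk_persistence as re-exported earlier in this diff.

from app.bulk_persistence import DataframeSerializerSync

payload = '{"GR": {"1000": 90.5, "1001": 91.2}}'   # orient='columns': keys arrive as JSON strings
df = DataframeSerializerSync.read_json(payload, orient='columns')

assert list(df.columns) == ['GR']        # column names stay strings, as WDMS requires
assert str(df.index.dtype) == 'int64'    # index coerced back from '1000'/'1001' to integers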
class DataframeSerializerAsync:
@@ -102,14 +139,13 @@ class DataframeSerializerAsync:
        self.executor = pool_executor

    @with_trace("Parquet bulk serialization")
-    async def to_parquet(self, df: DataframeClass, *args, **kwargs) -> DataframeClass:
-        func = partial(df.to_parquet, *args, **kwargs)
+    async def to_parquet(self, df: pd.DataFrame, *, storage_options=None) -> pd.DataFrame:
+        func = partial(DataframeSerializerSync.to_parquet, df, storage_options=storage_options)
        return await asyncio.get_event_loop().run_in_executor(self.executor, func)

    @with_trace("JSON bulk serialization")
    async def to_json(self,
-                      df: DataframeClass,
+                      df: pd.DataFrame,
                      orient: Union[str, JSONOrient] = JSONOrient.split,
                      *args, **kwargs) -> Optional[str]:
@@ -117,19 +153,19 @@ class DataframeSerializerAsync:
        return await asyncio.get_event_loop().run_in_executor(self.executor, func)

    @with_trace("CSV bulk serialization")
-    async def to_csv(self, df: DataframeClass, *args, **kwargs) -> Optional[str]:
+    async def to_csv(self, df: pd.DataFrame, *args, **kwargs) -> Optional[str]:
        df = df.fillna("NaN")
        func = partial(df.to_csv, *args, **kwargs)
        return await asyncio.get_event_loop().run_in_executor(self.executor, func)

    @with_trace("Parquet bulk deserialization")
-    async def read_parquet(self, data) -> DataframeClass:
+    async def read_parquet(self, data) -> pd.DataFrame:
        return await asyncio.get_event_loop().run_in_executor(
            self.executor, DataframeSerializerSync.read_parquet, data
        )

    @with_trace("Parquet JSON deserialization")
-    async def read_json(self, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> DataframeClass:
+    async def read_json(self, data, orient: Union[str, JSONOrient]) -> pd.DataFrame:
        return await asyncio.get_event_loop().run_in_executor(
-            self.executor, DataframeSerializerSync.read_json, data, orient, convert_axes
+            self.executor, DataframeSerializerSync.read_json, data, orient
        )
from typing import Tuple, Callable, Iterable, List
import re
import pandas as pd
from app.bulk_persistence.dask.errors import BulkNotProcessable
ValidationResult = Tuple[bool, str] # Tuple (is_dataframe_valid, failure_reason)
ValidationSuccess = (True, '')
DataFrameValidationFunc = Callable[[pd.DataFrame], ValidationResult]
def assert_df_validate(dataframe: pd.DataFrame,
validation_funcs: List[DataFrameValidationFunc]):
""" call one or more validation function and throw BulkNotProcessable in case of invalid, run all validation before
returning """
if not validation_funcs:
return
all_validity, all_reasons = zip(*[fn(dataframe) for fn in validation_funcs])
if not all(all_validity):
# raise exception with all invalid reasons
raise BulkNotProcessable(message=",".join([msg for ok, msg in zip(all_validity, all_reasons) if not ok]))
# the following functions are stateless and without side-effect so can be easily used in parallel/cross process context
def no_validation(_) -> ValidationResult:
"""
Always validate the given dataframe without error/warning
return True, ''
"""
return ValidationSuccess
def auto_cast_columns_to_string(df: pd.DataFrame) -> ValidationResult:
"""
If given dataframe contains columns name which is not a string, cast it
return always returns validation success
"""
df.columns = df.columns.astype(str)
return ValidationSuccess
def columns_type_must_be_string(df: pd.DataFrame) -> ValidationResult:
""" Ensure given dataframe contains columns name as string only as described by WellLog schemas """
if all((type(t) is str for t in df.columns)):
return ValidationSuccess
return False, 'All columns type should be string'
def validate_index(df: pd.DataFrame) -> ValidationResult:
""" Ensure index """
if len(df.index) == 0:
return False, "Empty data"
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
return False, "Index should be numeric or datetime"
if not df.index.is_unique:
return False, "Duplicated index found"
return ValidationSuccess
PandasReservedIndexColRegexp = re.compile(r'__index_level_\d+__')
def any_reserved_column_name(names: Iterable[str]) -> bool:
"""
There are reserved name for columns which are internally used by Pandas/Dask with PyArrow to save the index.
Save a df containing reserved name as regular columns lead to inability to read parquet file then.
At this stage, columns used as index are already marked as index and it's not considered as columns by Pandas.
return: True is any column uses a reserved name
"""
return any((PandasReservedIndexColRegexp.match(name) or name == '__null_dask_index__' for name in names))
def columns_not_in_reserved_names(df: pd.DataFrame) -> ValidationResult:
if any_reserved_column_name(df.columns):
return False, 'Invalid column name'
return ValidationSuccess
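To make the contract concrete, a usage sketch of the validator pipeline. Module paths are assumed from the relative imports shown above, and the dataframes are invented.

import pandas as pd

from app.bulk_persistence.dask.errors import BulkNotProcessable
from app.bulk_persistence.dataframe_validators import (
    assert_df_validate, columns_not_in_reserved_names, validate_index)

ok = pd.DataFrame({'GR': [90.5, 91.2]}, index=[1000, 1001])
assert_df_validate(dataframe=ok, validation_funcs=[validate_index, columns_not_in_reserved_names])

bad = pd.DataFrame({'__null_dask_index__': [1.0]}, index=['a'])  # reserved column + non-numeric index
try:
    assert_df_validate(dataframe=bad, validation_funcs=[validate_index, columns_not_in_reserved_names])
except BulkNotProcessable as err:
    # all failure reasons are aggregated into a single message
    print(err)  # bulk not processable: Index should be numeric or datetime,Invalid column name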
@@ -23,6 +23,8 @@ class MimeType(NamedTuple):
    alternative_types: List[str] = []

    def match(self, str_value: str) -> bool:
+        if not str_value:
+            return False
        normalized_value = str_value.lower()
        return any(
            (
...
@@ -75,6 +75,8 @@ async def client_middleware(request, call_next):
        request.headers[conf.CORRELATION_ID_HEADER_NAME] = ctx.correlation_id
        if ctx.app_key:
            request.headers[conf.APP_KEY_HEADER_NAME] = ctx.app_key
+        if ctx.x_user_id:
+            request.headers[conf.X_USER_ID_HEADER_NAME] = ctx.x_user_id
        response = None
        try:
...
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import hashlib
import uuid
from asyncio import gather, iscoroutinefunction
from typing import List
@@ -64,8 +65,8 @@ class StorageRecordServiceBlobStorage:
    @staticmethod
-    def _get_record_folder(id: str, data_partition: str):
-        encoded_id = hash(id)
+    def _get_record_folder(record_id: str, data_partition: str):
+        encoded_id = hashlib.md5(record_id.encode()).hexdigest()
        folder = f'{data_partition or "global"}_r_{encoded_id}'
        return folder
...
@@ -101,13 +101,6 @@ class ConfigurationContainer:
        default='undefined'
    )

-    # TODO: based on environment name, hardcoded values here are temporary until chunking feature release
-    alpha_feature_enabled: EnvVar = EnvVar(
-        key='ENVIRONMENT_NAME',
-        description='enable alpha features',
-        default='',
-        factory=lambda x: x.lower() in ['evd', 'dev', 'qa'])
-
    cloud_provider: EnvVar = EnvVar(
        key='CLOUD_PROVIDER',
        description='Short name of the current cloud provider environment, must be "aws" or "gcp" or "az" or "ibm',
@@ -394,3 +387,4 @@ CORRELATION_ID_HEADER_NAME = 'correlation-id'
REQUEST_ID_HEADER_NAME = 'Request-ID'
PARTITION_ID_HEADER_NAME = 'data-partition-id'
MODULES_PATH_PREFIX = 'app.modules'
+X_USER_ID_HEADER_NAME = 'x-user-id'
\ No newline at end of file