
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (265)
Showing changes with 634 additions and 226 deletions
......@@ -90,6 +90,16 @@ verify_existing_requirements:
expire_in: 2 days
compile-and-unit-test:
artifacts:
when: always
paths:
- all-requirements.txt
- spec/generated/openapi.json
containerize:
extends: .skipForTriggeringMergeRequests
stage: containerize
......
......@@ -60,6 +60,7 @@ BSD-2-Clause
========================================================================
The following software have components provided under the terms of this license:
- colorama (from https://github.com/tartley/colorama)
- grpcio (from https://grpc.io)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
......@@ -78,10 +79,13 @@ BSD-3-Clause
The following software have components provided under the terms of this license:
- HeapDict (from http://stutzbachenterprises.com/)
- Jinja2 (from http://jinja.pocoo.org/)
- MarkupSafe (from https://palletsprojects.com/p/markupsafe/)
- adlfs (from https://github.com/hayesgb/adlfs/)
- asgiref (from http://github.com/django/asgiref/)
- click (from http://github.com/mitsuhiko/click)
- cloudpickle (from https://github.com/cloudpipe/cloudpickle)
- colorama (from https://github.com/tartley/colorama)
- cryptography (from https://github.com/pyca/cryptography)
- dask (from http://github.com/dask/dask/)
- decorator (from https://github.com/micheles/decorator)
......@@ -169,14 +173,6 @@ The following software have components provided under the terms of this license:
- grpcio (from https://grpc.io)
- pyparsing (from http://pyparsing.wikispaces.com/)
- rfc3986 (from https://rfc3986.readthedocs.org)
- strict-rfc3339 (from http://www.danielrichman.co.uk/libraries/strict-rfc3339.html)
========================================================================
GPL-3.0-or-later
========================================================================
The following software have components provided under the terms of this license:
- strict-rfc3339 (from http://www.danielrichman.co.uk/libraries/strict-rfc3339.html)
========================================================================
ISC
......@@ -234,7 +230,7 @@ The following software have components provided under the terms of this license:
- aiohttp (from https://github.com/aio-libs/aiohttp/)
- aioitertools (from https://github.com/jreese/aioitertools)
- aioredis (from https://github.com/aio-libs/aioredis)
- anyio (from https://pypi.org/project/anyio/3.3.4/)
- anyio (from https://pypi.org/project/anyio/3.4.0/)
- asgiref (from http://github.com/django/asgiref/)
- attrs (from https://attrs.readthedocs.io/)
- azure-common (from https://github.com/Azure/azure-sdk-for-python)
......@@ -285,7 +281,7 @@ The following software have components provided under the terms of this license:
- sniffio (from https://github.com/python-trio/sniffio)
- structlog (from http://www.structlog.org/)
- toml (from https://github.com/uiri/toml)
- tomli (from https://pypi.org/project/tomli/1.2.1/)
- tomli (from https://pypi.org/project/tomli/1.2.2/)
- urllib3 (from https://urllib3.readthedocs.io/)
- xmltodict (from https://github.com/martinblech/xmltodict)
- zipp (from https://github.com/jaraco/zipp)
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .bulk_id import BulkId
from .bulk_uri import BulkURI
from .dataframe_persistence import create_and_store_dataframe, get_dataframe
from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync
from .json_orient import JSONOrient
......
......@@ -13,27 +13,7 @@
# limitations under the License.
import uuid
from typing import Tuple, Optional
class BulkId:
@staticmethod
def new_bulk_id() -> str:
return str(uuid.uuid4())
@classmethod
def bulk_urn_encode(cls, bulk_id: str, prefix: str = None) -> str:
if prefix:
return f'urn:{prefix}:uuid:{uuid.UUID(bulk_id)}'
return uuid.UUID(bulk_id).urn
# Returns a tuple (<uuid> : str, <prefix> : str)
@classmethod
def bulk_urn_decode(cls, urn: str) -> Tuple[str, Optional[str]]:
if urn is None:
raise ValueError('attempted to decode empty urn')
parts = urn.split(":")
if len(parts) < 4:
return str(uuid.UUID(urn)), None
return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
def new_bulk_id() -> str:
return str(uuid.uuid4())
from typing import Optional, NamedTuple
class BulkStorageVersion(NamedTuple):
""" This is the version of the bulk storage engine """
version: str
""" unique version identifier """
uri_prefix: Optional[str]
""" associated uri prefix """
BulkStorageVersion_V0 = BulkStorageVersion(version='0', uri_prefix=None)
""" first bulk management implementation with direct management to blob storage with a single blob """
BulkStorageVersion_V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1")
""" version 1, using Dask to handle bulk manipulation and storage """
BulkStorageVersion_Invalid = BulkStorageVersion(version='', uri_prefix=None)
""" represent an invalid/undefined storage version """
from typing import Optional, Tuple
import uuid
from .bulk_storage_version import (
BulkStorageVersion, BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid)
class BulkURI:
"""
Bulk URI, contains the bulk identifier (bulk_id) and the storage engine version, which identifies how
the bulk is stored.
Usage:
- ctor from URI string value:
`bulk_uri = BulkURI.decode(uri_str)`
- ctor explicit given a bulk_id and a storage version:
`bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersion_V1)`
- explicit ctor using a class method:
`bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id_value)`
- encode to URI string value:
`uri_str: str = bulk_uri.encode()`
- check which storage engine version it is:
`bulk_uri.storage_version == BulkStorageVersion_V0`
`bulk_uri.is_bulk_storage_V0()`
"""
def __init__(self, bulk_id: str, version: BulkStorageVersion):
"""
make a new instance, or an invalid one if bulk_id/version are empty or invalid
To build from an encoded URI string, use `BulkURI.decode` instead
:param bulk_id: expected as a valid uuid
:param version: storage version
:throw: ValueError
"""
if not bulk_id or not version or version == BulkStorageVersion_Invalid:
bulk_id = ''
version = BulkStorageVersion_Invalid
else:
# ensure valid uuid
uuid.UUID(bulk_id)
self._bulk_id = bulk_id
self._storage_version = version
@classmethod
def invalid(cls):
""" make an invalid instance """
return cls('', BulkStorageVersion_Invalid)
@classmethod
def decode(cls, uri: str) -> 'BulkURI':
"""
construct a BulkURI from an encoded URI
:throw: ValueError
"""
if not uri:
return BulkURI.invalid()
bulk_id, prefix = cls._decode_uri(uri)
if not prefix:
version = BulkStorageVersion_V0
elif prefix == BulkStorageVersion_V1.uri_prefix:
version = BulkStorageVersion_V1
else:
raise ValueError('Unsupported prefix in bulk URI: ' + prefix)
return cls(bulk_id=bulk_id, version=version)
def is_bulk_storage_V0(self) -> bool:
""" convenient check that returns True is version == BulkStorageVersions.V0 """
return self._storage_version.version == BulkStorageVersion_V0.version
@classmethod
def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI':
""" construct a BulkURI for storage engine V0 given a bulk id """
return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0)
@classmethod
def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI':
""" construct a BulkURI for storage engine V1 given a bulk id """
return cls(bulk_id=bulk_id, version=BulkStorageVersion_V1)
@property
def bulk_id(self) -> str:
return self._bulk_id
@property
def storage_version(self) -> BulkStorageVersion:
return self._storage_version
def encode(self) -> str:
"""
encode to uri as string
If the prefix is not empty, the uri format is `urn:$prefix:uuid:$bulk_id`
If the prefix is empty or None, the uri format is `urn:uuid:$bulk_id`
:Throw: ValueError
"""
if self._storage_version.uri_prefix:
return f'urn:{self._storage_version.uri_prefix}:uuid:{self._bulk_id}'
return uuid.UUID(self._bulk_id).urn
@classmethod
def _decode_uri(cls, uri: str) -> Tuple[str, Optional[str]]:
"""
Decode urn into uuid and optional prefix. Returns tuple [uuid, prefix].
If urn is `urn:$prefix:uuid:$bulk_id`, will return [$bulk_id, $prefix]
If urn is `urn:uuid:$bulk_id`, will return [$bulk_id, None]
:throw: ValueError if urn empty or invalid UUID
"""
if uri is None:
raise ValueError('attempted to decode empty urn')
parts = uri.split(":")
if len(parts) < 4:
return str(uuid.UUID(uri)), None
return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
def is_valid(self) -> bool:
""" check invalid """
if self._bulk_id and self._storage_version.version:
return True
return False
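A minimal usage sketch of the new BulkURI (the bulk id value is hypothetical; BulkURI is exported by the updated app/bulk_persistence/__init__.py shown above):
from app.bulk_persistence import BulkURI
bulk_id = "0c5f0de2-8d4c-4a34-9d8a-1c3e4f5a6b7c"  # any valid uuid string
v1_uri = BulkURI.from_bulk_storage_V1(bulk_id).encode()
assert v1_uri == f"urn:wdms-1:uuid:{bulk_id}"  # storage engine V1 prefix
decoded = BulkURI.decode(v1_uri)
assert decoded.bulk_id == bulk_id and not decoded.is_bulk_storage_V0()
legacy = BulkURI.decode(f"urn:uuid:{bulk_id}")  # no prefix -> storage engine V0
assert legacy.is_bulk_storage_V0() and legacy.is_valid()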
......@@ -12,85 +12,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
import time
import uuid
from contextlib import suppress
from functools import wraps
from operator import attrgetter
from typing import List
import fsspec
import pandas as pd
from pyarrow.lib import ArrowException, ArrowInvalid
from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings, get_ctx
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
import pyarrow.parquet as pa
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
from dask.distributed import WorkerPlugin
from dask.distributed import scheduler
def internal_bulk_exceptions(target):
"""
Decorator to handle exceptions that should not be exposed to the outside world, e.g. Pyarrow or Dask exceptions
"""
@wraps(target)
async def async_inner(*args, **kwargs):
try:
return await target(*args, **kwargs)
except ArrowInvalid as e:
get_logger().exception(f"Pyarrow ArrowInvalid when running {target.__name__}")
raise BulkNotProcessable(f"Unable to process bulk - {str(e)}")
except ArrowException:
get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
raise BulkNotProcessable("Unable to process bulk - Arrow")
except scheduler.KilledWorker:
get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
raise BulkNotProcessable("Unable to process bulk- Dask")
except Exception:
get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
raise
return async_inner
class DefaultWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
self.worker = None
global _LOGGER
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
super().__init__()
get_logger().debug("WorkerPlugin initialised")
def setup(self, worker):
self.worker = worker
if self._register_fsspec_implementation:
self._register_fsspec_implementation()
from pyarrow.lib import ArrowException
import pyarrow.parquet as pa
def transition(self, key, start, finish, *args, **kwargs):
if finish == 'error':
# exc = self.worker.exceptions[key]
get_logger().exception(f"Task '{key}' has failed with exception")
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings, get_ctx
def pandas_to_parquet(pdf, path, opt):
return pdf.to_parquet(path, index=True, engine='pyarrow', storage_options=opt)
from .errors import BulkNotFound, BulkNotProcessable, internal_bulk_exceptions
from .traces import wrap_trace_process, _create_func_key
from .utils import by_pairs, do_merge, worker_capture_timing_handlers
from .dask_worker_plugin import DaskWorkerPlugin
from ..dataframe_validators import assert_df_validate, validate_index, columns_not_in_reserved_names
from .session_file_meta import SessionFileMeta
from .. import DataframeSerializerSync
from . import storage_path_builder as pathBuilder
from ..bulk_id import new_bulk_id
class DaskBulkStorage:
......@@ -116,7 +66,7 @@ class DaskBulkStorage:
parameters.register_fsspec_implementation()
await DaskBulkStorage.client.register_worker_plugin(
DefaultWorkerPlugin,
DaskWorkerPlugin,
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
......@@ -134,22 +84,6 @@ class DaskBulkStorage:
def base_directory(self) -> str:
return self._parameters.base_directory
def _encode_record_id(self, record_id: str) -> str:
return hashlib.sha1(record_id.encode()).hexdigest()
def _get_base_directory(self, protocol=True):
return f'{self.protocol}://{self.base_directory}' if protocol else self.base_directory
def _get_blob_path(self, record_id: str, bulk_id: str, with_protocol=True) -> str:
"""Return the bulk path from the bulk_id."""
encoded_id = self._encode_record_id(record_id)
return f'{self._get_base_directory(with_protocol)}/{encoded_id}/bulk/{bulk_id}/data'
def _build_path_from_session(self, session: Session, with_protocol=True) -> str:
"""Return the session path."""
encoded_id = self._encode_record_id(session.recordId)
return f'{self._get_base_directory(with_protocol)}/{encoded_id}/session/{session.id}/data'
def _load(self, path, **kwargs) -> dd.DataFrame:
"""Read a Parquet file into a Dask DataFrame
path : string or list
......@@ -171,27 +105,37 @@ class DaskBulkStorage:
"""Return a dask Dataframe of a record at the specified version.
returns a Future<dd.DataFrame>
"""
return self._load(self._get_blob_path(record_id, bulk_id), columns=columns)
blob_path = pathBuilder.record_bulk_path(
self.base_directory, record_id, bulk_id, self.protocol)
return self._load(blob_path, columns=columns)
@with_trace('read_stat')
def read_stat(self, record_id: str, bulk_id: str):
"""Return some meta data about the bulk."""
file_path = self._get_blob_path(record_id, bulk_id, with_protocol=False)
dataset = pa.ParquetDataset(file_path, filesystem=self._fs)
schema = dataset.read_pandas().schema
schema_dict = {x: str(y) for (x, y) in zip(schema.names, schema.types)}
return {
"num_rows": dataset.metadata.num_rows,
"schema": schema_dict
}
try:
file_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id)
dataset = pa.ParquetDataset(file_path, filesystem=self._fs)
schema = dataset.read_pandas().schema
schema_dict = {x: str(y) for (x, y) in zip(schema.names, schema.types)}
return {
"num_rows": dataset.metadata.num_rows,
"schema": schema_dict
}
except OSError:
raise BulkNotFound(record_id, bulk_id)
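For reference, a standalone sketch of the same metadata extraction, runnable against any local parquet dataset (the path is hypothetical); it reads the row count from the materialized table rather than dataset.metadata, which keeps it independent of the pyarrow dataset flavour:
import pyarrow.parquet as pq
dataset = pq.ParquetDataset("/tmp/bulk/0c5f0de2/data")  # hypothetical local bulk path
table = dataset.read_pandas()  # pyarrow Table, including pandas index metadata
schema_dict = {name: str(typ) for name, typ in zip(table.schema.names, table.schema.types)}
stats = {"num_rows": table.num_rows, "schema": schema_dict}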
def _submit_with_trace(self, target_func, *args, **kwargs):
"""
Submit the given target_func to distributed Dask workers and add the required tracing context
Note: 'dask_task_key' is created manually to ease reading of Dask's running tasks: it displays
the effective target function instead of 'wrap_trace_process', which is used to enable tracing inside Dask workers.
"""
dask_task_key = _create_func_key(target_func, *args, **kwargs)
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return self.client.submit(wrap_trace_process, *args, **kwargs)
return self.client.submit(wrap_trace_process, *args, key=dask_task_key, **kwargs)
def _map_with_trace(self, target_func, *args, **kwargs):
"""
......@@ -236,34 +180,22 @@ class DaskBulkStorage:
returns a Future<None>
"""
f_pdf = await self.client.scatter(pdf)
return await self._submit_with_trace(pandas_to_parquet, f_pdf, path,
self._parameters.storage_options)
@staticmethod
def _check_incoming_chunk(df):
# TODO should we test if is_monotonic?, unique ?
if len(df.index) == 0:
raise BulkNotProcessable("Empty data")
if not df.index.is_unique:
raise BulkNotProcessable("Duplicated index found")
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
raise BulkNotProcessable("Index should be numeric or datetime")
return await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, path,
storage_options=self._parameters.storage_options)
@internal_bulk_exceptions
@capture_timings('save_blob', handlers=worker_capture_timing_handlers)
@with_trace('save_blob')
async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None):
"""Write the data frame to the blob storage."""
bulk_id = bulk_id or BulkId.new_bulk_id()
bulk_id = bulk_id or new_bulk_id()
if isinstance(ddf, pd.DataFrame):
self._check_incoming_chunk(ddf)
ddf = dd.from_pandas(ddf, npartitions=1)
assert_df_validate(dataframe=ddf, validation_funcs=[validate_index, columns_not_in_reserved_names])
ddf = dd.from_pandas(ddf, npartitions=1, name=f"from_pandas-{uuid.uuid4()}")
ddf = await self.client.scatter(ddf)
path = self._get_blob_path(record_id, bulk_id)
path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
try:
await self._save_with_dask(path, ddf)
except OSError:
......@@ -273,34 +205,27 @@ class DaskBulkStorage:
@capture_timings('session_add_chunk')
@with_trace('session_add_chunk')
async def session_add_chunk(self, session: Session, pdf: pd.DataFrame):
self._check_incoming_chunk(pdf)
assert_df_validate(dataframe=pdf, validation_funcs=[validate_index, columns_not_in_reserved_names])
# sort column by names
pdf = pdf[sorted(pdf.columns)]
# generate a file name sorted by starting index
# dask reads and sorts files by 'natural_key', so the file name impacts the final result
first_idx, last_idx = pdf.index[0], pdf.index[-1]
if isinstance(pdf.index, pd.DatetimeIndex):
first_idx, last_idx = pdf.index[0].value, pdf.index[-1].value
idx_range = f'{first_idx}_{last_idx}'
shape = hashlib.sha1('_'.join(map(str, pdf)).encode()).hexdigest()
t = round(time.time() * 1000)
filename = f'{idx_range}_{t}.{shape}'
session_path_wo_protocol = self._build_path_from_session(session, with_protocol=False)
self._fs.mkdirs(session_path_wo_protocol, exist_ok=True)
with self._fs.open(f'{session_path_wo_protocol}/{filename}.meta', 'w') as outfile:
json.dump({"columns": list(pdf)}, outfile)
filename = pathBuilder.build_chunk_filename(pdf)
session_path = pathBuilder.record_session_path(
self.base_directory, session.id, session.recordId)
session_path = self._build_path_from_session(session)
self._fs.mkdirs(session_path, exist_ok=True)
with self._fs.open(f'{session_path}/{filename}.meta', 'w') as outfile:
json.dump({"columns": list(pdf.columns)}, outfile)
session_path = pathBuilder.add_protocol(session_path, self.protocol)
await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)
@capture_timings('get_session_parquet_files')
@with_trace('get_session_parquet_files')
def get_session_parquet_files(self, session):
session_path = self._build_path_from_session(session, with_protocol=False)
session_path = pathBuilder.record_session_path(
self.base_directory, session.id, session.recordId)
with suppress(FileNotFoundError):
session_files = [f for f in self._fs.ls(session_path) if f.endswith(".parquet")]
return session_files
......
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dask.distributed import WorkerPlugin
from app.helper.logger import get_logger
class DaskWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
self.worker = None
global _LOGGER
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
super().__init__()
get_logger().debug("WorkerPlugin initialised")
def setup(self, worker):
self.worker = worker
if self._register_fsspec_implementation:
self._register_fsspec_implementation()
def transition(self, key, start, finish, *args, **kwargs):
if finish == 'error':
# exc = self.worker.exceptions[key]
get_logger().exception(f"Task '{key}' has failed with exception")
\ No newline at end of file
......@@ -13,24 +13,75 @@
# limitations under the License.
from fastapi import status, HTTPException
from dask.distributed import scheduler
from pyarrow.lib import ArrowException, ArrowInvalid
from functools import wraps
from app.helper.logger import get_logger
class BulkError(Exception):
http_status: int
def raise_as_http(self):
raise HTTPException(status_code=self.http_status, detail=str(self))
raise HTTPException(status_code=self.http_status, detail=str(self)) from self
class BulkNotFound(BulkError):
http_status = status.HTTP_404_NOT_FOUND
def __init__(self, record_id, bulk_id):
self.message = f'bulk {bulk_id} for record {record_id} not found'
def __init__(self, record_id=None, bulk_id=None, message=None):
ex_message = 'bulk '
if bulk_id:
ex_message += f'{bulk_id} '
if record_id:
ex_message += f'for record {record_id} '
ex_message += 'not found'
if message:
ex_message += ': ' + message
super().__init__(ex_message)
class BulkNotProcessable(BulkError):
http_status = status.HTTP_422_UNPROCESSABLE_ENTITY
def __init__(self, bulk_id):
self.message = f'bulk {bulk_id} not processable'
def __init__(self, bulk_id=None, message=None):
ex_message = 'bulk '
if bulk_id:
ex_message += f'{bulk_id} '
ex_message += 'not processable'
if message:
ex_message += ': ' + message
super().__init__(ex_message)
class FilterError(BulkError):
http_status = status.HTTP_400_BAD_REQUEST
def __init__(self, reason):
self.message = f'filter error: {reason}'
def internal_bulk_exceptions(target):
"""
Decorator to handle exceptions that should not be exposed to the outside world, e.g. Pyarrow or Dask exceptions
"""
@wraps(target)
async def async_inner(*args, **kwargs):
try:
return await target(*args, **kwargs)
except ArrowInvalid as e:
get_logger().exception(f"Pyarrow ArrowInvalid when running {target.__name__}")
raise BulkNotProcessable(f"Unable to process bulk - {str(e)}")
except ArrowException:
get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
raise BulkNotProcessable("Unable to process bulk - Arrow")
except scheduler.KilledWorker:
get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
raise BulkNotProcessable("Unable to process bulk- Dask")
except Exception:
get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
raise
return async_inner
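A minimal sketch of how the relocated decorator is used (the decorated coroutine below is hypothetical): Arrow and Dask failures surface as BulkNotProcessable, which raise_as_http turns into an HTTP 422 at the API layer.
from app.bulk_persistence.dask.errors import internal_bulk_exceptions, BulkError
@internal_bulk_exceptions
async def load_chunk(path: str):  # hypothetical bulk operation
    import pyarrow.parquet as pq
    return pq.read_table(path)  # may raise ArrowInvalid on corrupt data
async def handler(path: str):
    try:
        return await load_chunk(path)
    except BulkError as e:  # BulkNotProcessable and BulkNotFound derive from BulkError
        e.raise_as_http()  # -> HTTPException(status_code=422, detail=...)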
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from typing import List
from app.bulk_persistence.dask.utils import share_items
class SessionFileMeta:
"""The class extract information about chunks."""
def __init__(self, fs, file_path: str) -> None:
self._fs = fs
file_name = os.path.basename(file_path)
start, end, tail = file_name.split('_')
self.start = float(start) # data time support ?
self.end = float(end)
self.time, self.shape, tail = tail.split('.')
self._meta = None
self.path = file_path
def _read_meta(self):
if not self._meta:
path, _ = os.path.splitext(self.path)
with self._fs.open(path + '.meta') as meta_file:
self._meta = json.load(meta_file)
return self._meta
@property
def columns(self) -> List[str]:
"""Return the column names"""
return self._read_meta()['columns']
def overlap(self, other: 'SessionFileMeta') -> bool:
"""Returns True if indexes overlap."""
return self.end >= other.start and other.end >= self.start
def has_common_columns(self, other: 'SessionFileMeta') -> bool:
"""Returns True if contains common columns with others."""
return share_items(self.columns, other.columns)
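As a small worked example of the overlap test above (index bounds are hypothetical values parsed from two chunk file names): two chunks overlap when each one starts before the other ends.
# chunk A covers index [100, 200], chunk B covers [150, 300]
a_start, a_end = 100.0, 200.0
b_start, b_end = 150.0, 300.0
assert a_end >= b_start and b_end >= a_start  # same predicate as SessionFileMeta.overlap
# disjoint chunks, e.g. [100, 200] vs [250, 300], fail the first comparison
assert not (200.0 >= 250.0)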
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions that gather methods to build paths for bulk storage
"""
import hashlib
from os.path import join
from time import time
from typing import Optional, Tuple
import pandas as pd
def hash_record_id(record_id: str) -> str:
"""encode the record_id to be a valid path name"""
return hashlib.sha1(record_id.encode()).hexdigest()
def build_base_path(base_directory: str, protocol: Optional[str] = None) -> str:
"""return the base directory, add the protocol if requested"""
return f'{protocol}://{base_directory}' if protocol else base_directory
def add_protocol(path: str, protocol: str) -> str:
"""add protocole to the path"""
prefix = protocol + '://'
if not path.startswith(prefix):
return prefix + path
return path
def remove_protocol(path: str) -> Tuple[str, str]:
"""remove protocol for path if any, return tuple[path, protocol].
If no protocol in path then protocol='' """
if '://' not in path:
return path, ''
sep_idx = path.index('://')
return path[sep_idx + 3:], path[:sep_idx]
def record_path(base_directory: str, record_id, protocol: Optional[str] = None) -> str:
"""Return the entity path.
(path where all data relateed to an entity are saved"""
encoded_id = hash_record_id(record_id)
base_path = build_base_path(base_directory, protocol)
return join(base_path, encoded_id)
def record_bulks_root_path(
base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
"""return the path where blob are stored for the specified entity"""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'bulk')
def record_sessions_root_path(
base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
"""return the path where sessions are stored for the specified entity"""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'session')
def record_bulk_path(
base_directory: str, record_id: str, bulk_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified bulk."""
entity_blob_path = record_bulks_root_path(base_directory, record_id, protocol)
return join(entity_blob_path, bulk_id, 'data')
def record_session_path(
base_directory: str, session_id: str, record_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified session."""
entity_session_path = record_sessions_root_path(base_directory, record_id, protocol)
return join(entity_session_path, session_id, 'data')
def build_chunk_filename(dataframe: pd.DataFrame) -> str:
"""Return chunk file name sorted by starting index
Note 1: do not change the name without updating SessionFileMeta
Note 2: dask reads and sort files by 'natural_key' so the filenames impacts the final result
"""
first_idx, last_idx = dataframe.index[0], dataframe.index[-1]
if isinstance(dataframe.index, pd.DatetimeIndex):
first_idx, last_idx = dataframe.index[0].value, dataframe.index[-1].value
#shape_str = '_'.join(f'{cn}:{dt}' for cn, dt in dataframe.dtypes.items())
shape_str = '_'.join(f'{cn}' for cn, dt in dataframe.dtypes.items())
shape = hashlib.sha1(shape_str.encode()).hexdigest()
cur_time = round(time() * 1000)
return f'{first_idx}_{last_idx}_{cur_time}.{shape}'
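A short sketch of the resulting layout, mirroring hash_record_id and record_bulk_path above (base directory, protocol and ids are hypothetical):
import hashlib
from os.path import join
base_directory = "wdms-bulk-container"  # hypothetical
record_id, bulk_id = "opendes:wellbore:wb-001", "0c5f0de2-8d4c-4a34-9d8a-1c3e4f5a6b7c"
encoded_id = hashlib.sha1(record_id.encode()).hexdigest()  # hash_record_id
bulk_path = join(f"az://{base_directory}", encoded_id, "bulk", bulk_id, "data")
# -> 'az://wdms-bulk-container/<sha1 of record id>/bulk/<bulk id>/data'
# record_session_path follows the same pattern, ending in 'session/<session id>/data'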
from dask.utils import funcname
from dask.base import tokenize
from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
......@@ -23,6 +26,13 @@ def wrap_trace_process(*args, **kwargs):
sampler=AlwaysOnSampler(),
exporter=_EXPORTER)
with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
with tracer.span(name=f"Dask Worker - {funcname(target_func)}") as span:
span.span_kind = SpanKind.CLIENT
return target_func(*args, **kwargs)
def _create_func_key(func, *args, **kwargs):
"""
Inspired by Dask code, it returns a hashed key based on function name and given arguments
"""
return funcname(func) + "-" + tokenize(func, kwargs, *args)
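A standalone sketch of the key produced by _create_func_key (the target function is hypothetical): the key starts with the real function name, so the Dask dashboard lists 'load_bulk-<token>' instead of 'wrap_trace_process-<token>'.
from dask.base import tokenize
from dask.utils import funcname
def load_bulk(path, columns=None):  # hypothetical target function
    ...
key = funcname(load_bulk) + "-" + tokenize(load_bulk, {"columns": ["GR"]}, "some/path")
print(key)  # e.g. 'load_bulk-2f0c...', deterministic for the same function and arguments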
......@@ -25,16 +25,17 @@ import dask.dataframe as dd
def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
def log_captured_timing(tag, wall, cpu):
logger = get_logger()
if logger:
logger.log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
return log_captured_timing
worker_capture_timing_handlers = [worker_make_log_captured_timing_handler(INFO)]
##
def share_items(seq1, seq2):
"""Returns True if seq1 contains common items with seq2."""
......@@ -47,31 +48,6 @@ def by_pairs(iterable):
return zip_longest(*[iter(iterable)] * 2, fillvalue=None)
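For illustration, the pairing behaviour the merge step relies on (a tiny, self-contained check of the one-liner above):
from itertools import zip_longest
def by_pairs(iterable):
    return zip_longest(*[iter(iterable)] * 2, fillvalue=None)
assert list(by_pairs([1, 2, 3])) == [(1, 2), (3, None)]  # the odd element is paired with None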
class SessionFileMeta:
def __init__(self, fs, file_path: str) -> None:
self._fs = fs
file_name = os.path.basename(file_path)
start, end, tail = file_name.split('_')
self.start = float(start) # data time support ?
self.end = float(end)
self.time, self.shape, tail = tail.split('.')
self.columns = self._get_columns(file_path) # TODO lazy load
self.path = file_path
def _get_columns(self, file_path):
path, _ = os.path.splitext(file_path)
with self._fs.open(path + '.meta') as meta_file:
return json.load(meta_file)['columns']
def overlap(self, other: 'SessionFileMeta'):
"""Returns True if indexes overlap."""
return self.end >= other.start and other.end >= self.start
def has_common_columns(self, other):
"""Returns True if contains common columns with others."""
return share_items(self.columns, other.columns)
@capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf: dd.DataFrame):
"""Set index of the dask dataFrame only if needed."""
......@@ -94,4 +70,3 @@ def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
ddf = df1.join(df2, how='outer') # join seems faster when there are no columns in common
return ddf
......@@ -25,7 +25,7 @@ from .blob_storage import (
create_and_write_blob,
read_blob,
)
from .bulk_id import BulkId
from .bulk_id import new_bulk_id
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from ..helper.traces import with_trace
......@@ -33,10 +33,10 @@ from ..helper.traces import with_trace
async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
"""Store bulk on a blob storage"""
new_bulk_id = BulkId.new_bulk_id()
bulk_id = new_bulk_id()
tenant = await resolve_tenant(ctx.partition_id)
async with create_and_write_blob(
df, file_exporter=BlobFileExporters.PARQUET, blob_id=new_bulk_id
df, file_exporter=BlobFileExporters.PARQUET, blob_id=bulk_id
) as bulkblob:
storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
await storage.upload(
......
......@@ -15,13 +15,11 @@
import asyncio
from functools import partial
from io import BytesIO
from typing import Union, AnyStr, IO, Optional, List, Dict
from typing import Union, Optional, List, Dict
from pathlib import Path
import numpy as np
import pandas as pd
from pydantic import BaseModel
from pandas import DataFrame as DataframeClass
from .json_orient import JSONOrient
from .mime_types import MimeTypes
......@@ -35,9 +33,7 @@ class DataframeSerializerSync:
then provide a unified way to handle various topics: float/double precision, compression, etc.
"""
# todo may be unified with the work from storage.blob_storage
SupportedFormat = [MimeTypes.JSON] # , MimeTypes.MSGPACK]
SupportedFormat = [MimeTypes.JSON, MimeTypes.PARQUET]
""" these are supported format through wellbore ddms APIs """
@classmethod
......@@ -60,7 +56,7 @@ class DataframeSerializerSync:
@classmethod
def to_json(cls,
df: DataframeClass,
df: pd.DataFrame,
orient: Union[str, JSONOrient] = JSONOrient.split,
**kwargs) -> Optional[str]:
"""
......@@ -74,7 +70,17 @@ class DataframeSerializerSync:
return df.fillna("NaN").to_json(orient=orient.value, **kwargs)
@classmethod
def read_parquet(cls, data) -> DataframeClass:
def to_parquet(cls, df: pd.DataFrame, path_or_buf=None, *, storage_options=None):
"""
:param df: dataframe to dump
:param path_or_buf: str or file-like object, default None, see pandas.DataFrame.to_parquet
:param storage_options: storage_options, default None
:return: None or buffer
"""
return df.to_parquet(path_or_buf, index=True, engine='pyarrow', storage_options=storage_options)
@classmethod
def read_parquet(cls, data) -> pd.DataFrame:
"""
:param data: bytes, path object or file-like object
:return: dataframe
......@@ -86,15 +92,46 @@ class DataframeSerializerSync:
return pd.read_parquet(data)
@classmethod
def read_json(cls, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> DataframeClass:
def read_json(cls, data, orient: Union[str, JSONOrient]) -> pd.DataFrame:
"""
:param data: bytes str content (valid JSON str), path object or file-like object
:param data: bytes, str content (valid JSON str), path object or file-like object. It won't convert axes. In the case
of orient='columns', since the index type is lost, it will still try to coerce the index to 'int'
then 'float'. Columns will remain as string type.
For orient 'split' there is no conversion at all.
:param orient:
:return: dataframe
"""
orient = JSONOrient.get(orient)
return pd.read_json(path_or_buf=data, orient=orient.value, convert_axes=convert_axes).replace("NaN", np.NaN)
df = pd.read_json(
path_or_buf=data, orient=orient.value, convert_axes=False
).replace("NaN", np.NaN)
# this is a corner case: orient 'columns' implies that all column and index values are passed as strings
# in the JSON content.
# In that case, their original types are lost. Since parameter 'convert_axes' is set to False, pandas won't
# try to infer the types of the columns and index.
# Regarding columns, this remains OK since WDMS enforces them to be strings, so orient 'columns' casts
# them to string 'by design'.
# For the index values it's problematic. In most cases, those are integer values and it matters to get them
# back to the original type if possible.
#
# Here's the tradeoff to handle the case orient='columns':
# - no convert on columns, so remains as string type
# - try to coerce index to 'float64' or 'int64'
#
# This is similar to what is done in Pandas but only for index:
# see https://github.com/pandas-dev/pandas/blob/master/pandas/io/json/_json.py#L916
if orient == JSONOrient.columns:
for dtype in ['int64', 'float64']:
try:
# try to coerce index type as int then float
df.index = df.index.astype(dtype)
return df
except (TypeError, ValueError, OverflowError):
continue
return df
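A small illustration of the tradeoff described above (the JSON payload is made up): with convert_axes=False the index of an orient='columns' payload comes back as strings, and the explicit astype restores the numeric type.
from io import StringIO
import pandas as pd
raw = '{"GR": {"100": 1.0, "200": 2.0}}'  # hypothetical orient='columns' payload
df = pd.read_json(StringIO(raw), orient="columns", convert_axes=False)
assert df.index.dtype == object  # index values stayed as strings
df.index = df.index.astype("int64")  # the coercion applied above
assert list(df.index) == [100, 200]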
class DataframeSerializerAsync:
......@@ -102,14 +139,13 @@ class DataframeSerializerAsync:
self.executor = pool_executor
@with_trace("Parquet bulk serialization")
async def to_parquet(self, df: DataframeClass, *args, **kwargs) -> DataframeClass:
func = partial(df.to_parquet, *args, **kwargs)
async def to_parquet(self, df: pd.DataFrame, *, storage_options=None) -> pd.DataFrame:
func = partial(DataframeSerializerSync.to_parquet, df, storage_options=storage_options)
return await asyncio.get_event_loop().run_in_executor(self.executor, func)
@with_trace("JSON bulk serialization")
async def to_json(self,
df: DataframeClass,
df: pd.DataFrame,
orient: Union[str, JSONOrient] = JSONOrient.split,
*args, **kwargs) -> Optional[str]:
......@@ -117,19 +153,19 @@ class DataframeSerializerAsync:
return await asyncio.get_event_loop().run_in_executor(self.executor, func)
@with_trace("CSV bulk serialization")
async def to_csv(self, df: DataframeClass, *args, **kwargs) -> Optional[str]:
async def to_csv(self, df: pd.DataFrame, *args, **kwargs) -> Optional[str]:
df = df.fillna("NaN")
func = partial(df.to_csv, *args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(self.executor, func)
@with_trace("Parquet bulk deserialization")
async def read_parquet(self, data) -> DataframeClass:
async def read_parquet(self, data) -> pd.DataFrame:
return await asyncio.get_event_loop().run_in_executor(
self.executor, DataframeSerializerSync.read_parquet, data
)
@with_trace("Parquet JSON deserialization")
async def read_json(self, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> DataframeClass:
async def read_json(self, data, orient: Union[str, JSONOrient]) -> pd.DataFrame:
return await asyncio.get_event_loop().run_in_executor(
self.executor, DataframeSerializerSync.read_json, data, orient, convert_axes
self.executor, DataframeSerializerSync.read_json, data, orient
)
from typing import Tuple, Callable, Iterable, List
import re
import pandas as pd
from app.bulk_persistence.dask.errors import BulkNotProcessable
ValidationResult = Tuple[bool, str] # Tuple (is_dataframe_valid, failure_reason)
ValidationSuccess = (True, '')
DataFrameValidationFunc = Callable[[pd.DataFrame], ValidationResult]
def assert_df_validate(dataframe: pd.DataFrame,
validation_funcs: List[DataFrameValidationFunc]):
""" call one or more validation function and throw BulkNotProcessable in case of invalid, run all validation before
returning """
if not validation_funcs:
return
all_validity, all_reasons = zip(*[fn(dataframe) for fn in validation_funcs])
if not all(all_validity):
# raise exception with all invalid reasons
raise BulkNotProcessable(message=",".join([msg for ok, msg in zip(all_validity, all_reasons) if not ok]))
# the following functions are stateless and have no side effects, so they can easily be used in parallel / cross-process contexts
def no_validation(_) -> ValidationResult:
"""
Always validates the given dataframe, without error or warning
always returns ValidationSuccess, i.e. (True, '')
"""
return ValidationSuccess
def auto_cast_columns_to_string(df: pd.DataFrame) -> ValidationResult:
"""
If the given dataframe contains column names that are not strings, cast them to string
always returns validation success
"""
df.columns = df.columns.astype(str)
return ValidationSuccess
def columns_type_must_be_string(df: pd.DataFrame) -> ValidationResult:
""" Ensure given dataframe contains columns name as string only as described by WellLog schemas """
if all((type(t) is str for t in df.columns)):
return ValidationSuccess
return False, 'All columns type should be string'
def validate_index(df: pd.DataFrame) -> ValidationResult:
""" Ensure index """
if len(df.index) == 0:
return False, "Empty data"
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
return False, "Index should be numeric or datetime"
if not df.index.is_unique:
return False, "Duplicated index found"
return ValidationSuccess
PandasReservedIndexColRegexp = re.compile(r'__index_level_\d+__')
def any_reserved_column_name(names: Iterable[str]) -> bool:
"""
There are reserved names for columns which are used internally by Pandas/Dask with PyArrow to save the index.
Saving a df that contains a reserved name as a regular column makes the resulting parquet file unreadable.
At this stage, columns used as index are already marked as index and are not considered as columns by Pandas.
return: True if any column uses a reserved name
"""
return any((PandasReservedIndexColRegexp.match(name) or name == '__null_dask_index__' for name in names))
def columns_not_in_reserved_names(df: pd.DataFrame) -> ValidationResult:
if any_reserved_column_name(df.columns):
return False, 'Invalid column name'
return ValidationSuccess
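A quick usage sketch of the validators above (module path assumed from the imports earlier in this merge request):
import pandas as pd
from app.bulk_persistence.dataframe_validators import (
    assert_df_validate, validate_index, columns_not_in_reserved_names)
df = pd.DataFrame({"GR": [1.0, 2.0]}, index=[1000.0, 1000.5])
assert_df_validate(dataframe=df, validation_funcs=[validate_index, columns_not_in_reserved_names])
bad = pd.DataFrame({"GR": [1.0, 2.0]}, index=[1000.0, 1000.0])  # duplicated index
# assert_df_validate(dataframe=bad, validation_funcs=[validate_index])  -> raises BulkNotProcessable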
......@@ -23,6 +23,8 @@ class MimeType(NamedTuple):
alternative_types: List[str] = []
def match(self, str_value: str) -> bool:
if not str_value:
return False
normalized_value = str_value.lower()
return any(
(
......
......@@ -75,6 +75,8 @@ async def client_middleware(request, call_next):
request.headers[conf.CORRELATION_ID_HEADER_NAME] = ctx.correlation_id
if ctx.app_key:
request.headers[conf.APP_KEY_HEADER_NAME] = ctx.app_key
if ctx.x_user_id:
request.headers[conf.X_USER_ID_HEADER_NAME] = ctx.x_user_id
response = None
try:
......
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import uuid
from asyncio import gather, iscoroutinefunction
from typing import List
......@@ -64,8 +65,8 @@ class StorageRecordServiceBlobStorage:
@staticmethod
def _get_record_folder(id: str, data_partition: str):
encoded_id = hash(id)
def _get_record_folder(record_id: str, data_partition: str):
encoded_id = hashlib.md5(record_id.encode()).hexdigest()
folder = f'{data_partition or "global"}_r_{encoded_id}'
return folder
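For illustration, the folder naming after this fix (record id and partition are hypothetical); unlike the builtin hash(), which is randomized per Python process, md5 yields a stable folder name across workers and restarts.
import hashlib
record_id, data_partition = "opendes:wellbore:wb-001", "opendes"
encoded_id = hashlib.md5(record_id.encode()).hexdigest()
folder = f'{data_partition or "global"}_r_{encoded_id}'
# e.g. 'opendes_r_<32 hex chars>', identical on every worker and after every restart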
......
......@@ -101,13 +101,6 @@ class ConfigurationContainer:
default='undefined'
)
# TODO: based on environment name, hardcoded values here are temporary until chunking feature release
alpha_feature_enabled: EnvVar = EnvVar(
key='ENVIRONMENT_NAME',
description='enable alpha features',
default='',
factory=lambda x: x.lower() in ['evd', 'dev', 'qa'])
cloud_provider: EnvVar = EnvVar(
key='CLOUD_PROVIDER',
description='Short name of the current cloud provider environment, must be "aws" or "gcp" or "az" or "ibm"',
......@@ -394,3 +387,4 @@ CORRELATION_ID_HEADER_NAME = 'correlation-id'
REQUEST_ID_HEADER_NAME = 'Request-ID'
PARTITION_ID_HEADER_NAME = 'data-partition-id'
MODULES_PATH_PREFIX = 'app.modules'
X_USER_ID_HEADER_NAME = 'x-user-id'
\ No newline at end of file