Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing 887 additions and 1137 deletions
@@ -101,13 +101,6 @@ class ConfigurationContainer:
default='undefined'
)
# TODO: based on environment name, hardcoded values here are temporary until chunking feature release
alpha_feature_enabled: EnvVar = EnvVar(
key='ENVIRONMENT_NAME',
description='enable alpha features',
default='',
factory=lambda x: x.lower() in ['evd', 'dev', 'qa'])
cloud_provider: EnvVar = EnvVar(
key='CLOUD_PROVIDER',
description='Short name of the current cloud provider environment, must be "aws" or "gcp" or "az" or "ibm"',
@@ -130,7 +123,7 @@ class ConfigurationContainer:
de_client_config_timeout: EnvVar = EnvVar(
key='DE_CLIENT_CFG_TIMEOUT',
description='set connect, read, write, and pool timeouts (in seconds) for all DE client.',
default='45', # gateway timeout is 30s, greater value ensure the async client won't be the bottleneck.
default='10',
factory=lambda x: int(x))
de_client_config_max_connection: EnvVar = EnvVar(
@@ -193,6 +186,26 @@ class ConfigurationContainer:
description='Min amount of memory for one worker',
default="512Mi")
dask_data_ipc: EnvVar = EnvVar(
key='DASK_DATA_IPC',
description='Specify data IPC type between main process and dask workers',
default='dask_native',
allowed_values=['dask_native', 'local_file'],
factory=lambda x: x.lower()
)
max_columns_return: EnvVar = EnvVar(
key='MAX_COLUMNS_RETURN',
description='Max number of columns that can be returned per data request',
default="500",
factory=lambda x: int(x))
max_columns_per_chunk_write: EnvVar = EnvVar(
key='MAX_COLUMNS_PER_CHUNK_WRITE',
description='Max number of columns that can be written per chunk',
default="500",
factory=lambda x: int(x))
_environment_dict: Dict = os.environ
_contextual_loader: Callable = None
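For illustration only, a minimal sketch (not part of the diff) of what the factory and allowed_values settings declared above amount to once the raw environment strings are read; the environment values here are made up and the EnvVar plumbing itself is not reproduced:

import os

os.environ.setdefault("MAX_COLUMNS_RETURN", "500")
os.environ.setdefault("DASK_DATA_IPC", "Local_File")

max_columns_return = int(os.environ["MAX_COLUMNS_RETURN"])   # factory: str -> int
dask_data_ipc = os.environ["DASK_DATA_IPC"].lower()          # factory: normalize case
assert dask_data_ipc in ("dask_native", "local_file")        # allowed_values constraint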
@@ -303,6 +316,8 @@ def cloud_provider_additional_environment(config: ConfigurationContainer):
is_mandatory=False,
override=True)
config.az_bulk_container = 'wdms-osdu'
if provider == 'gcp':
config.add_from_env(attribute_name='default_data_tenant_project_id',
env_var_key='OS_WELLBORE_DDMS_DATA_PROJECT_ID',
@@ -314,9 +329,24 @@ def cloud_provider_additional_environment(config: ConfigurationContainer):
config.add_from_env(attribute_name='default_data_tenant_credentials',
env_var_key='OS_WELLBORE_DDMS_DATA_PROJECT_CREDENTIALS',
description='path to the key file of the SA to access the data tenant',
is_mandatory=True,
is_mandatory=False,
override=True,
validator=validator_path_must_exist,
default=None)
config.add_from_env(attribute_name='service_host_storage',
env_var_key='SERVICE_HOST_STORAGE',
description='Back-end for storage service',
is_mandatory=False,
override=True,
default='http://storage/api/storage')
config.add_from_env(attribute_name='service_host_search',
env_var_key='SERVICE_HOST_SEARCH',
description='Back-end for search service',
is_mandatory=False,
override=True,
validator=validator_path_must_exist)
default='http://search/api/search')
if provider == 'ibm':
config.add_from_env(attribute_name='default_data_tenant_project_id',
@@ -379,3 +409,4 @@ CORRELATION_ID_HEADER_NAME = 'correlation-id'
REQUEST_ID_HEADER_NAME = 'Request-ID'
PARTITION_ID_HEADER_NAME = 'data-partition-id'
MODULES_PATH_PREFIX = 'app.modules'
X_USER_ID_HEADER_NAME = 'x-user-id'
from .no_consistency import NoConsistencyChecks
from .welllog_consistency import (
check_welllog_consistency, WelllogDataConsistencyChecks, DuplicatedCurveIdException,
ReferenceCurveIdNotFoundException, ColumnDoesNotMatchCurveIdException
)
from .reference_check import ReferenceCurveException
from .trajectory_consistency import (
check_trajectory_consistency,
TrajectoryDataConsistencyChecks,
DuplicatedStationProperties,
)
import pandas as pd
from app.bulk_persistence import DataConsistencyChecks
class NoConsistencyChecks(DataConsistencyChecks):
@classmethod
async def check_bulk_consistency_on_commit_session(cls, record: "Record", new_bulk_id):
return
@classmethod
def check_bulk_consistency_on_post_bulk(cls, record: "Record", df: pd.DataFrame):
return
import pandas as pd
import math
from pydantic import BaseModel
from app.bulk_persistence.consistency_checks import ConsistencyException, DataConsistencyChecks
class ReferenceCurveException(ConsistencyException):
"""raised when column doesn't match any CurveID"""
def check_reference_is_strictly_monotonic(ref: pd.Series):
# check unique values because is_monotonic_increasing & is_monotonic_decreasing are not strict
if ref.duplicated().any():
raise ReferenceCurveException("Repeated values in a reference curve aren't allowed.")
if not ref.is_monotonic_increasing and not ref.is_monotonic_decreasing:
# NaN values
if ref.isnull().values.any():
raise ReferenceCurveException("NaN values in a reference curve are not allowed.")
else:
raise ReferenceCurveException("Reference must be monotonically increasing or decreasing.")
def raise_if_attr_value_is_different(
record_data: BaseModel,
attr_name: str,
reference_value: float,
error_msg: str,
):
attr_value = getattr(record_data, attr_name, None)
if attr_value is not None and not math.isclose(attr_value, reference_value):
raise ReferenceCurveException(
error_msg.format(
attr_name=attr_name,
attr_value=attr_value,
reference_value=reference_value,
)
)
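A minimal usage sketch with a made-up pydantic model; note that math.isclose is used with its default relative tolerance (1e-9), so tiny floating-point noise is accepted:

from pydantic import BaseModel

class FakeWellLogData(BaseModel):   # hypothetical stand-in for a record's data block
    SamplingStart: float = 1000.0

msg = "{attr_name}={attr_value} does not match the reference value {reference_value}"

# Within tolerance: no exception is raised
raise_if_attr_value_is_different(FakeWellLogData(), "SamplingStart", 1000.0 + 1e-9, msg)

try:
    # Out of tolerance: ReferenceCurveException is raised with the formatted message
    raise_if_attr_value_is_different(FakeWellLogData(), "SamplingStart", 1001.0, msg)
except ReferenceCurveException as exc:
    print(exc)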
import pandas as pd
from dask.dataframe.core import DataFrame as DaskDataFrame
from typing import List
import math
from odes_storage.models import Record
from app.model.osdu_model import WellboreTrajectory110
from app.helper.traces import with_trace
from app.bulk_persistence.consistency_checks import ConsistencyException, DataConsistencyChecks
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage, BulkRecordNotFound
from app.bulk_persistence.dask.traces import submit_with_trace
from app.model.model_utils import from_record
from app.context import get_ctx
from .unique import get_unique_attr_values
from .reference_check import check_reference_is_strictly_monotonic, raise_if_attr_value_is_different
class DuplicatedStationProperties(RuntimeError):
"""raised if all trajectoryStationProperties names are not unique"""
class ColumnDoesNotMatchTrajectoryStationException(ConsistencyException):
"""raised when column doesn't match any AvailableTrajectoryStationProperties"""
def check_trajectory_consistency(traj: WellboreTrajectory110):
"""Check if trajectory is consistent.
Each station in data.AvailableTrajectoryStationProperties must have a unique name.
Args:
traj (WellboreTrajectory110): trajectory object to be verified
Returns:
Raises:
DuplicatedStationProperties: raised if AvailableTrajectoryStationProperties names are not all unique.
"""
if not traj.data or not traj.data.AvailableTrajectoryStationProperties:
return
# All names must be unique
station_name, duplicated_error = get_unique_attr_values(
traj.data.AvailableTrajectoryStationProperties,
"Name"
)
if duplicated_error:
raise DuplicatedStationProperties()
class TrajectoryDataConsistencyChecks(DataConsistencyChecks):
"""Check welllogTrajectory data consistency
bulk columns and TrajectoryStationProperty names must match.
MD column should be strictly monotonic increasing or strictly monotonic decreasing
MD Top & bottom values should match WelllogTrajectory
"""
reference_trajectory_station_property_type_id = ":reference-data--TrajectoryStationPropertyType:MD:"
@staticmethod
def get_reference_name(traj: WellboreTrajectory110):
if not traj.data.AvailableTrajectoryStationProperties:
return None
for station in traj.data.AvailableTrajectoryStationProperties:
if station and hasattr(station, "TrajectoryStationPropertyTypeID"):
if (
station.TrajectoryStationPropertyTypeID
and TrajectoryDataConsistencyChecks.reference_trajectory_station_property_type_id
in station.TrajectoryStationPropertyTypeID
):
if hasattr(station, "Name") and station.Name:
return station.Name
return None
@classmethod
@with_trace("bulk_consistency")
def check_bulk_consistency_on_post_bulk(cls, record: Record, df: pd.DataFrame):
"""Perform trajectory consistency checks of a bulk dataframe against welllogTrajectory record
Called when post a whole bulk (not chunking apis)
Args:
record (Record): WelllogTrajectory record to check
df (pandas.DataFrame): bulk data to check against the record
Raises: ConsistencyException
Returns: None
"""
if not record.data:
return
traj = from_record(WellboreTrajectory110, record)
cls._check_columns_consistency(traj, df.columns)
reference_name = TrajectoryDataConsistencyChecks.get_reference_name(traj)
if not reference_name:
return
if reference_name in df:
ref = df[reference_name]
check_reference_is_strictly_monotonic(ref)
cls._check_top_bottom_reference(traj, ref)
@classmethod
@with_trace("bulk_consistency")
async def check_bulk_consistency_on_commit_session(cls, record: Record, bulk_id: str):
traj = from_record(WellboreTrajectory110, record)
# check columns match TrajectoryStationProperties names
dask_blob_storage = await get_ctx().app_injector.get(DaskBulkStorage)
stats = await dask_blob_storage.read_stat(record.id, bulk_id)
schema = stats.get("schema")
cls._check_columns_consistency(traj, schema.keys())
reference_name = TrajectoryDataConsistencyChecks.get_reference_name(traj)
if not reference_name:
return
try:
ref_ddf = await dask_blob_storage.load_bulk(record.id, bulk_id, columns=[reference_name])
except BulkRecordNotFound:
return
# wrap what should be called in dask workers
def check_reference(traj: WellboreTrajectory110, ref_ddf_: DaskDataFrame):
ref = ref_ddf_[reference_name].compute()
check_reference_is_strictly_monotonic(ref)
cls._check_top_bottom_reference(traj, ref)
await submit_with_trace(dask_blob_storage.client, check_reference, traj, ref_ddf)
@staticmethod
def _check_columns_consistency(traj: WellboreTrajectory110, col_labels: List[str]):
error_msg = "do(es) not match any AvailableTrajectoryStationProperties name in the WellboreTrajectory record."
curve_ids, _ = get_unique_attr_values(traj.data.AvailableTrajectoryStationProperties, "Name")
col_names = DataConsistencyChecks._get_data_columns_name(col_labels)
not_matching_col_name = [col_name for col_name in col_names if col_name not in curve_ids]
if any(not_matching_col_name):
raise ColumnDoesNotMatchTrajectoryStationException(
f"Column(s) {', '.join(not_matching_col_name)} {error_msg}"
)
@staticmethod
def _check_top_bottom_reference(traj: WellboreTrajectory110, ref: pd.Series):
raise_if_attr_value_is_different(
record_data=traj.data,
attr_name="TopDepthMeasuredDepth",
reference_value=ref.iloc[0],
error_msg="First value ({reference_value}) of the measured depth is different from {attr_name} value ({attr_value}) of the WellboreTrajectory record."
)
raise_if_attr_value_is_different(
record_data=traj.data,
attr_name="BaseDepthMeasuredDepth",
reference_value=ref.iloc[-1],
error_msg="Last value ({reference_value}) of the measured depth is different from {attr_name} value ({attr_value}) of the WellboreTrajectory record."
)
from typing import Any, List, Optional, Set, Tuple
class DuplicatedIdError:
pass
def get_unique_attr_values(object_list: List[Any], attr_name: str) -> Tuple[Set[str], Optional[DuplicatedIdError]]:
"""Get the values of an attribute of all objects if all values are unique
Args:
object_list (List[Any]): objects whose specified attribute must be unique
attr_name: the name of the attribute to get for each object
Returns:
The values of the attribute specified for each object if they are all unique,
otherwise the values collected so far and a DuplicatedIdError
"""
values = set()
# check all attribute values are unique
if object_list:
for obj in object_list:
value = getattr(obj, attr_name)
if value:
if value in values:
return values, DuplicatedIdError()
else:
values.add(value)
return values, None
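A minimal usage sketch with made-up objects (SimpleNamespace stands in for the OSDU model objects):

from types import SimpleNamespace

curves = [SimpleNamespace(CurveID="MD"), SimpleNamespace(CurveID="GR")]
values, error = get_unique_attr_values(curves, "CurveID")
assert values == {"MD", "GR"} and error is None

curves.append(SimpleNamespace(CurveID="GR"))                 # introduce a duplicate
values, error = get_unique_attr_values(curves, "CurveID")
assert isinstance(error, DuplicatedIdError)                  # duplicate detected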
import math
from typing import Iterable
import pandas as pd
from dask.dataframe.core import DataFrame as DaskDataFrame
from odes_storage.models import Record
from app.helper.traces import with_trace
from app.bulk_persistence.consistency_checks import ConsistencyException, DataConsistencyChecks
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage, BulkRecordNotFound
from app.bulk_persistence.dask.traces import submit_with_trace
from app.model.model_utils import from_record
from app.model.osdu_model import WellLog110
from app.context import get_ctx
from .reference_check import check_reference_is_strictly_monotonic, raise_if_attr_value_is_different
from .unique import get_unique_attr_values
class DuplicatedCurveIdException(ConsistencyException):
"""raised if all curveID values are not unique"""
class ReferenceCurveIdNotFoundException(ConsistencyException):
"""raised when there is no curve with a curveID value equal to the ReferenceCurveID value"""
class ColumnDoesNotMatchCurveIdException(ConsistencyException):
"""raised when column doesn't match any CurveID"""
@with_trace('welllog_consistency')
def check_welllog_consistency(wl: WellLog110):
"""Check wellLog metadata.
Curve ids in data.Curves must be unique.
The welllog must have a curve whose CurveID is equal to the welllog's ReferenceCurveID value, if any.
Args:
wl (WellLog110): wellLog object to be verified
Returns:
None
Raises:
DuplicatedCurveIdException: raised if CurveID values are not all unique.
ReferenceCurveIdNotFoundException: raised when no curve has a CurveID equal to the ReferenceCurveID value.
"""
# There are no Curves or ReferenceCurveID
if not wl.data:
return
if not wl.data.Curves and not wl.data.ReferenceCurveID:
return
# Can't define a ReferenceCurveID when welllog doesn't have any Curve
if not wl.data.Curves and wl.data.ReferenceCurveID:
raise ReferenceCurveIdNotFoundException()
# All curve ids must be unique
curve_ids, duplicated_error = get_unique_attr_values(wl.data.Curves, "CurveID")
if duplicated_error:
raise DuplicatedCurveIdException()
# ReferenceCurveID should match a curve
if wl.data.ReferenceCurveID and wl.data.ReferenceCurveID not in curve_ids:
raise ReferenceCurveIdNotFoundException()
class WelllogDataConsistencyChecks(DataConsistencyChecks):
"""Check welllog data consistency
bulk columns and welllog curvesIDs must match.
welllog referenceCurveID must match a welllog curve
Reference should be strictly monotonic increasing or strictly monotonic decreasing
Top & bottom reference values should match welllog metadata ie:
SamplingStart is close to top reference value with 1e-9% tolerance
SamplingStop is close to bottom reference value with 1e-9% tolerance
"""
@classmethod
@with_trace('bulk_consistency')
def check_bulk_consistency_on_post_bulk(cls, record: Record, df: pd.DataFrame):
""" Perform welllog consistency checks of a bulk dataframe against a welllog record
used by bulk_persistence when posting a whole bulk (not the chunking APIs)
Args:
record (Record): welllog record to check
df (pandas.DataFrame): bulk data to check against the record
Raises: ConsistencyException
Returns: None
"""
wl = from_record(WellLog110, record)
cls._check_columns_consistency(wl, df.columns)
if not (wl.data and wl.data.ReferenceCurveID):
return
if wl.data.ReferenceCurveID in df:
ref = df[wl.data.ReferenceCurveID]
check_reference_is_strictly_monotonic(ref)
cls._check_top_bottom_reference(wl, ref)
@classmethod
@with_trace('bulk_consistency')
async def check_bulk_consistency_on_commit_session(cls, record: Record, bulk_id: str):
""" Perform welllog consistency checks of a bulk against a welllog record
used by bulk_persistence when committing a session (chunking APIs)
Args:
record (Record): welllog record to check
bulk_id (str): id of the bulk to check against the record
Raises: ConsistencyException
Returns: None
"""
wl = from_record(WellLog110, record)
# check columns match the record's curves
dask_blob_storage = await get_ctx().app_injector.get(DaskBulkStorage)
stats = await dask_blob_storage.read_stat(record.id, bulk_id)
schema = stats.get("schema")
cls._check_columns_consistency(wl, schema.keys())
# check reference
if not (wl.data and wl.data.ReferenceCurveID):
return
try:
ref_ddf = await dask_blob_storage.load_bulk(record.id, bulk_id, columns=[wl.data.ReferenceCurveID])
except BulkRecordNotFound:
return
# wrap what should be called in dask workers
def check_welllog_reference(wl: WellLog110, ref_ddf: DaskDataFrame):
ref = ref_ddf[wl.data.ReferenceCurveID].compute()
check_reference_is_strictly_monotonic(ref)
cls._check_top_bottom_reference(wl, ref)
await submit_with_trace(dask_blob_storage.client, check_welllog_reference, wl, ref_ddf)
@staticmethod
def _check_columns_consistency(wl: WellLog110, col_labels: Iterable[str]):
"""Checks bulk data column names match welllog record curves ids
Args:
wl(WellLog110): welllog record
col_labels: column's labels to check against the record
Returns: None
Raises:
ColumnDoesNotMatchCurveIdException: column and record's curves doesn't match
"""
if (not wl.data or not wl.data.Curves) and len(col_labels) > 0:
raise ColumnDoesNotMatchCurveIdException(f"Column(s) do(es) not match any CurveID of the WellLog record.")
curve_ids, _ = get_unique_attr_values(wl.data.Curves, "CurveID")
col_names = DataConsistencyChecks._get_data_columns_name(col_labels)
not_matching_col_name = [col_name for col_name in col_names if col_name not in curve_ids]
if any(not_matching_col_name):
raise ColumnDoesNotMatchCurveIdException(
f"Column(s) {', '.join(not_matching_col_name)} do(es) not match any CurveID of the WellLog record."
)
@staticmethod
def _check_top_bottom_reference(wl: WellLog110, ref: pd.Series):
raise_if_attr_value_is_different(
record_data=wl.data,
attr_name="SamplingStart",
reference_value=ref.iloc[0],
error_msg="Reference top value ({reference_value}) is different from {attr_name} value ({attr_value}) of the WellLog record."
)
raise_if_attr_value_is_different(
record_data=wl.data,
attr_name="SamplingStop",
reference_value=ref.iloc[-1],
error_msg="Reference bottom value ({reference_value}) is different from {attr_name} value ({attr_value}) of the WellLog record."
)
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextvars
from typing import Optional
import json
from app.model.user import User
from app.injector.app_injector import AppInjector
class Context:
"""
Immutable object that provides contextual information throughout request processing
"""
__slots__ = [
'_tracer',
'_correlation_id',
'_request_id',
'_dev_mode',
'_auth',
'_partition_id',
'_app_key',
'_api_key',
'_user',
'_app_injector',
'_attr_dict',
'_x_user_id'
]
def __init__(self,
tracer=None,
correlation_id: Optional[str] = None,
request_id: Optional[str] = None,
dev_mode: Optional[bool] = None,
auth=None,
partition_id: Optional[str] = None,
app_key: Optional[str] = None,
api_key: Optional[str] = None,
user: Optional[User] = None,
app_injector: Optional[AppInjector] = None,
x_user_id: Optional[str] = None,
**keys):
self._tracer = tracer
self._correlation_id = correlation_id
self._request_id = request_id
self._dev_mode = dev_mode
self._auth = auth
self._partition_id = partition_id
self._app_key = app_key
self._api_key = api_key
self._user = user
self._app_injector = app_injector
self._x_user_id = x_user_id
# pass
self._attr_dict = keys or {}
__ctx_var = contextvars.ContextVar('_wdms_internal_context_var')
"""
contextvars are natively supported in asyncio, so we can take advantage of this to easily get the current
context (although it may hide the potential dependency)
"""
@classmethod
def current(cls) -> 'Context':
return cls.__ctx_var.get()
@classmethod
def clear_current(cls):
cls.__ctx_var.set(Context())
def set_current(self):
Context.__ctx_var.set(self)
@classmethod
def set_current_with_value(cls, tracer=None, correlation_id=None, request_id=None, auth=None,
partition_id=None, app_key=None, api_key=None, user=None, app_injector=None,
dev_mode=None, x_user_id=None,
**keys) -> 'Context':
"""
Clone the current context with the given values, set the new context as current, and return it.
:return:
"""
current = cls.current()
assert current is not None, 'no existing current context'
new_ctx = current.with_value(tracer=tracer,
correlation_id=correlation_id,
request_id=request_id,
auth=auth,
partition_id=partition_id,
app_key=app_key,
api_key=api_key,
user=user,
app_injector=app_injector,
dev_mode=dev_mode,
x_user_id=x_user_id,
**keys)
new_ctx.set_current()
return new_ctx
def get(self, key, default=None):
if key in self._attr_dict:
return self._attr_dict[key]
if hasattr(self, '_' + key):
return getattr(self, '_' + key)
return default
def __getitem__(self, key):
if key in self._attr_dict:
return self._attr_dict[key]
if hasattr(self, '_' + key):
return getattr(self, '_' + key)
raise KeyError(key + ' is unknown')
def __copy__(self):
return self.__class__(
tracer=self._tracer,
correlation_id=self._correlation_id,
request_id=self._request_id,
dev_mode=self._dev_mode,
auth=self._auth,
partition_id=self._partition_id,
app_key=self._app_key,
api_key=self._api_key,
user=self._user,
app_injector=self._app_injector,
x_user_id=self._x_user_id,
**self._attr_dict)
def with_correlation_id(self, correlation_id):
clone = self.__copy__()
clone._correlation_id = correlation_id
return clone
def with_request_id(self, request_id):
clone = self.__copy__()
clone._request_id = request_id
return clone
def with_auth(self, auth):
clone = self.__copy__()
clone._auth = auth
return clone
def with_partition_id(self, partition_id):
clone = self.__copy__()
clone._partition_id = partition_id
return clone
def with_x_user_id(self, x_user_id):
clone = self.__copy__()
clone._x_user_id = x_user_id
return clone
def with_user(self, user):
clone = self.__copy__()
clone._user = user
return clone
def with_app_key(self, app_key):
clone = self.__copy__()
clone._app_key = app_key
return clone
def with_api_key(self, api_key):
clone = self.__copy__()
clone._api_key = api_key
return clone
def with_injector(self, app_injector):
clone = self.__copy__()
clone._app_injector = app_injector
return clone
def with_value(self, tracer=None, correlation_id=None, request_id=None, auth=None,
partition_id=None, app_key=None, api_key=None, user=None, app_injector=None,
dev_mode=None, x_user_id=None, **keys) -> 'Context':
""" Clone context, adding all keys in future logs """
cloned = self.__class__(
tracer=tracer or self._tracer,
correlation_id=correlation_id or self._correlation_id,
request_id=request_id or self._request_id,
dev_mode=dev_mode or self._dev_mode,
auth=auth or self._auth,
partition_id=partition_id or self._partition_id,
app_key=app_key or self._app_key,
api_key=api_key or self._api_key,
user=user or self._user,
app_injector=app_injector or self._app_injector,
x_user_id=x_user_id or self._x_user_id,
**self._attr_dict)
if keys is not None:
cloned._attr_dict.update(keys)
return cloned
@property
def tracer(self):
return self._tracer
@property
def correlation_id(self) -> Optional[str]:
return self._correlation_id
@property
def request_id(self) -> Optional[str]:
return self._request_id
@property
def dev_mode(self) -> Optional[bool]:
return self._dev_mode
@property
def auth(self):
return self._auth
@property
def partition_id(self) -> Optional[str]:
return self._partition_id
@property
def api_key(self) -> Optional[str]:
return self._api_key
@property
def app_key(self) -> Optional[str]:
return self._app_key
@property
def user(self) -> Optional[User]:
return self._user
@property
def app_injector(self) -> Optional[AppInjector]:
return self._app_injector
@property
def x_user_id(self) -> Optional[str]:
return self._x_user_id
def __dict__(self):
return {
"tracer": self.tracer,
"correlation_id": self.correlation_id,
"request_id": self.request_id,
"dev_mode": self.dev_mode,
"partition_id": self.partition_id,
"app_key": self.app_key,
"api_key": self.api_key,
"x_user_id": self.x_user_id,
}
def __repr__(self):
return json.dumps(self.__dict__())
def get_ctx() -> Context:
return Context.current()
def get_or_create_ctx() -> Context:
"""
This method is intended to be used in middleware, where the order of Context creation is not guaranteed
:return: the current Context if one exists, otherwise a new Context with default values, set as current
"""
try:
return get_ctx()
except LookupError:
ctx = Context()
ctx.set_current()
return ctx
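A minimal usage sketch of the Context API above (the ids and partition name are made up):

# Create and install a context for the current asyncio task
ctx = Context(request_id="req-001", partition_id="opendes")
ctx.set_current()

# Anywhere later in the same task, fetch it back
assert get_ctx().partition_id == "opendes"

# Clone-and-set pattern, as done by the request middlewares
child = Context.set_current_with_value(correlation_id="corr-123", x_user_id="user-42")
assert child.correlation_id == "corr-123"
assert child.request_id == "req-001"   # inherited from the parent context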
import urllib
import dateutil.parser
SEP = "benderseparator"
EMPTY = "benderempty"
BENDINGCONTEXT = "bending_context"
WDMS_FRAGMENT = "wdms"
DELFI_SOURCE = "delfi_source_entity"
class ConverterUtils:
@staticmethod
def decode_id(osdu_id: str) -> str:
"""
decode osdu style id to delfi id
"""
if osdu_id is None:
return None
return bytes.fromhex(osdu_id.split(":")[2]).decode()
@staticmethod
def fix_id(delfi_id: str, osdu_type: str) -> str:
if delfi_id is None:
return None
# The id will have the OSDU style but will not point to an actual OSDU item;
# it will still be a delfi item - to be used with a get_as api
# TODO find a way to make delfi id match the osdu format
# TODO encode the delfi
id_as_list = delfi_id.split(sep=":")
encoded_str = delfi_id.encode().hex()
res_as_list = []
res_as_list.append(id_as_list[0])
res_as_list.append(osdu_type)
res_as_list.append(encoded_str)
res_as_list.append("")
return ":".join(res_as_list)
@staticmethod
def lookup(input_params: str, osdu_type: str) -> str:
# returns the id of the corresponding osdu type
# TODO implement this lookup with a cache
# Some of the lookups have fixed values such as WellboreTrajectoryType (Vertical, Directional, Horizontal),
# reference-data--VerticalMeasurementType,UnitOfMeasure
# Others have to be found in storage and stored in a cache
if input_params is None:
return None
input_params = input_params.split(SEP)
namespace = input_params[0]
delfi_value = input_params[1]
if delfi_value == EMPTY:
return None
# TODO lookup in catalog, put results in cache, insert if needed
ret = f"{namespace}:{osdu_type}:{urllib.parse.quote(delfi_value)}:"
return ret
@staticmethod
def find_in_meta(metas: [dict], search_attribute: str, search_value: str, returned_attribute: str) -> str:
if metas is None:
return EMPTY
for meta_item in metas:
if meta_item.get(search_attribute, EMPTY) == search_value:
return meta_item.get(returned_attribute, EMPTY)
return EMPTY
@staticmethod
def date_to_datetime(in_date: str) -> str:
return (
dateutil.parser.parse(in_date).strftime("%Y-%m-%dT%H:%M:%S.%f")
if in_date
else None
)
@staticmethod
def remove_none_from_dict(in_dict: dict) -> dict:
new_dict = {}
for k, v in in_dict.items():
if isinstance(v, dict):
v = ConverterUtils.remove_none_from_dict(v)
if v is not None:
new_dict[k] = v
return new_dict or None
@staticmethod
def _is_value_keepable(value):
"""
Utility function returning True if the value should be kept.
Value can be bool, string, numerical, list, dict, ...
Rejected values are EMPTY, None, [], {}, ().
None, [], {} and () evaluate as false (but are not equal to False);
the `value == False` check keeps boolean False values.
:param value: the value to test
:return: True if the value should be kept
"""
return value != EMPTY and (value or value == False)
@staticmethod
def remove_none(in_obj):
# This method could be optimized in Python 3.8+ with assignment expressions (PEP 572): https://stackoverflow.com/questions/4097518/intermediate-variable-in-a-list-comprehension-for-simultaneous-filtering-and-tra
# (x or x == False) keeps x if it is not None, is a non-empty container, or is False
if isinstance(in_obj, (list, tuple, set)):
return type(in_obj)(ConverterUtils.remove_none(x) for x in in_obj if ConverterUtils._is_value_keepable(x) and (
ConverterUtils.remove_none(x) or ConverterUtils.remove_none(x) == False))
elif isinstance(in_obj, dict):
return type(in_obj)(
(ConverterUtils.remove_none(k), ConverterUtils.remove_none(v)) for k, v in in_obj.items()
if k is not None and ConverterUtils._is_value_keepable(v)
and (ConverterUtils.remove_none(k) or ConverterUtils.remove_none(k) == False)
and (ConverterUtils.remove_none(v) or ConverterUtils.remove_none(v) == False)
)
else:
return in_obj
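A small round-trip sketch of the helpers above (the Delfi id, OSDU type and date are made up):

delfi_id = "common:wellbore:abc-123"
osdu_id = ConverterUtils.fix_id(delfi_id, "master-data--Wellbore")
# osdu_id looks like "common:master-data--Wellbore:<hex-encoded delfi id>:" and decodes back
assert ConverterUtils.decode_id(osdu_id) == delfi_id

assert ConverterUtils.date_to_datetime("2021-06-01") == "2021-06-01T00:00:00.000000"
assert ConverterUtils.remove_none({"a": None, "b": {"c": None}, "d": "kept"}) == {"d": "kept"}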
@@ -32,7 +32,8 @@ from osdu_az.exceptions.data_access_error import (DataAccessError as OSDUPartiti
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
from app.utils import get_ctx
from app.context import get_ctx
from app.helper.logger import get_logger
OSDU_DATA_ECOSYSTEM_SEARCH = "osdu-data-ecosystem-search"
OSDU_DATA_ECOSYSTEM_STORAGE = "osdu-data-ecosystem-storage"
@@ -54,7 +55,7 @@ async def http_search_error_handler(request: Request, exc: OSDUSearchException)
"""
Catches and handles Exceptions raised by os-python-client
"""
get_ctx().logger.exception(f"http_search_error_handler - url: '{request.url}'")
get_logger().exception(f"http_search_error_handler - url: '{request.url}'")
if isinstance(exc, OSDUSearchUnexpectedResponse):
status = exc.status_code
errors = [load_content(exc.content)]
@@ -75,7 +76,7 @@ async def http_storage_error_handler(request: Request, exc: OSDUStorageException
"""
Catches and handles Exceptions raised by os-python-client
"""
get_ctx().logger.exception(f"http_storage_error_handler - url: '{request.url}'")
get_logger().exception(f"http_storage_error_handler - url: '{request.url}'")
if isinstance(exc, OSDUStorageUnexpectedResponse) or isinstance(exc, OSDUStorageResponseValidationError):
status = exc.status_code
errors = [load_content(exc.content)]
@@ -93,7 +94,7 @@ async def http_partition_error_handler(request: Request, exc: OSDUPartitionExcep
"""
Catches and handles Exceptions raised by os-python-client
"""
get_ctx().logger.exception(f"http_partition_error_handler - url: '{request.url}'")
get_logger().exception(f"http_partition_error_handler - url: '{request.url}'")
return JSONResponse({"origin": OSDU_DATA_ECOSYSTEM_PARTITION, "errors": [exc.message]},
status_code=exc.status_code)
@@ -11,11 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic import ValidationError
from odes_search.exceptions import ApiException as OSDUSearchException
from odes_storage.exceptions import ApiException as OSDUStorageException
from osdu_az.exceptions.data_access_error import DataAccessError as OSDUPartitionException
from starlette import status
from .unhandled_error import unhandled_error_handler
from .validation_error import http422_error_handler
@@ -24,11 +24,27 @@ from .client_error import (
http_storage_error_handler,
http_partition_error_handler
)
from fastapi import HTTPException
from fastapi.exception_handlers import http_exception_handler
__all__ = ['add_exception_handlers']
def create_custom_http_exception_handler(app, logger):
"""
Overwrite the default FastAPI HTTPException handler to log every 5xx exception.
https://fastapi.tiangolo.com/tutorial/handling-errors/
This exception handler has to be registered in a separate function and called from the startup event,
because an initialized logger is not yet available inside add_exception_handlers.
"""
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc: HTTPException):
if exc.status_code >= status.HTTP_500_INTERNAL_SERVER_ERROR:
logger.get_logger().exception(f"Internal server error - url: '{request.url}'")
return await http_exception_handler(request, exc)
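A hedged sketch of how this registration could be wired at startup, following the docstring above; the `app.helper.logger` module path is taken from the other changes in this revision, the rest of the wiring is illustrative only:

from fastapi import FastAPI
from app.helper import logger   # module exposing get_logger(), as used in the handler above

app = FastAPI()
add_exception_handlers(app)

@app.on_event("startup")
async def _register_http_exception_logging():
    # at this point the logger is initialized, so the handler can safely call logger.get_logger()
    create_custom_http_exception_handler(app, logger)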
def add_exception_handlers(app):
app.add_exception_handler(ValidationError, http422_error_handler)
app.add_exception_handler(OSDUSearchException, http_search_error_handler)
@@ -15,13 +15,11 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
from app.helper.logger import get_logger
async def unhandled_error_handler(request: Request, exc: Exception) -> JSONResponse:
"""
Handle wild exceptions not caught by other exception handlers.
Logging of wild exceptions is done by the TracingMiddleware.
"""
get_logger().exception(f"unhandled_error_handler - {request.url}")
return JSONResponse({"error": [str(exc)]}, status_code=HTTP_500_INTERNAL_SERVER_ERROR)
@@ -18,6 +18,28 @@ def rename_cloud_role_func(service_name):
return callback_func
_maximum_azure_attribute_length = 2048
def azure_traces_processing(envelope):
"""
Telemetry processor that reduces the size of the 'request.url' field when it is too big
to be sent.
It's used by AzureLogHandler and AzureExporter.
https://docs.microsoft.com/en-us/azure/azure-monitor/app/api-filtering-sampling#opencensus-python-telemetry-processors
"""
if hasattr(envelope.data, 'baseData'):
url = envelope.data.baseData.get('url')
if url and len(url) >= _maximum_azure_attribute_length:
suffix = '...'
truncated_url = url[:_maximum_azure_attribute_length-len(suffix)]
envelope.data.baseData['url'] = f'{truncated_url}{suffix}'
return True
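A small illustration of the truncation rule above, using a plain string in place of the telemetry envelope:

_max = 2048                      # mirrors _maximum_azure_attribute_length
url = "https://example.com/query?" + "x" * 3000
suffix = '...'
if len(url) >= _max:
    url = url[:_max - len(suffix)] + suffix
assert len(url) == _max and url.endswith(suffix)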
def add_fields(**kwargs):
"""
Add key-value pairs to our homemade logger
@@ -14,7 +14,9 @@
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu_aws.storage.storage_aws import AwsStorage
from osdu_aws.storage.dask_storage_parameters import get_dask_storage_parameters as aws_parameters
from app.context import Context
from app.bulk_persistence import resolve_tenant
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from .app_injector import AppInjector, AppInjectorModule
from app.conf import Config
@@ -34,4 +36,8 @@ class AwsInjector(AppInjectorModule):
@staticmethod
async def build_aws_dask_blob_storage() -> DaskBulkStorage:
raise NotImplementedError()
ctx: Context = Context.current()
tenant = await resolve_tenant(ctx.partition_id)
service_account_file=f'{Config.aws_region.value}$${Config.aws_env.value}'
params = await aws_parameters(tenant, service_account_file)
return await DaskBulkStorage.create(params)