Commit 940aa1d0 authored by Luc Yriarte

Merge branch 'master' into container-reqs

parents f6502548 232ea2d7
......@@ -28,7 +28,7 @@ variables:
AWS_INT_TEST_TYPE: python
OSDU_GCP_PROJECT_NAME: nice-etching-277309
OSDU_GCP_CLUSTER: primary
OSDU_GCP_CLUSTER: asm-primary
OSDU_GCP_ZONE: us-central1-c
OSDU_GCP_SERVICE: wellbore
OSDU_GCP_HELM_DEPLOYMENT_DIR: devops/gcp/osdu-helm
......@@ -180,7 +180,7 @@ osdu-gcp-test-python:
- gcloud config set project $OSDU_GCP_PROJECT_NAME
- >
python gen_postman_env.py
--token $(gcloud auth print-identity-token)
--token $(gcloud auth print-access-token)
--base_url ${OSDU_GCP_URL}${OSDU_GCP_SERVICE_PATH}
--cloud_provider $OSDU_GCP_VENDOR
--data_partition $OSDU_GCP_TENANT
......
......@@ -20,7 +20,7 @@ The following software have components provided under the terms of this license:
- google-cloud-monitoring (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-cloud-trace (from https://github.com/googleapis/googleapis)
- googleapis-common-protos (from https://github.com/googleapis/googleapis)
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- importlib-metadata (from http://importlib-metadata.readthedocs.io/)
- jsonpath-ng (from https://github.com/h2non/jsonpath-ng)
- msgpack (from http://msgpack.org/)
......@@ -38,7 +38,7 @@ The following software have components provided under the terms of this license:
- pandas (from http://pandas.pydata.org)
- pyarrow (from https://arrow.apache.org/)
- pytest-asyncio (from https://github.com/pytest-dev/pytest-asyncio)
- pytest-dependency (from )
- pytest-dependency (from https://github.com/RKrahl/pytest-dependency)
- python-dateutil (from https://dateutil.readthedocs.org)
- python-multipart (from http://github.com/andrew-d/python-multipart)
- requests (from http://python-requests.org)
......@@ -58,7 +58,7 @@ BSD-2-Clause
The following software have components provided under the terms of this license:
- colorama (from https://github.com/tartley/colorama)
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
- numpy (from http://www.numpy.org)
......@@ -86,7 +86,7 @@ The following software have components provided under the terms of this license:
- distributed (from https://distributed.readthedocs.io/en/latest/)
- fsspec (from http://github.com/intake/filesystem_spec)
- gcsfs (from https://github.com/dask/gcsfs)
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- hiredis (from https://github.com/redis/hiredis-py)
- httpcore (from https://github.com/encode/httpcore)
- httpx (from https://github.com/encode/httpx)
......@@ -145,14 +145,14 @@ GPL-2.0-only
The following software have components provided under the terms of this license:
- coverage (from https://coverage.readthedocs.io)
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
========================================================================
GPL-2.0-or-later
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
========================================================================
GPL-3.0-only
......@@ -160,7 +160,7 @@ GPL-3.0-only
The following software have components provided under the terms of this license:
- coverage (from https://coverage.readthedocs.io)
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- pyparsing (from http://pyparsing.wikispaces.com/)
- rfc3986 (from https://rfc3986.readthedocs.org)
......@@ -169,7 +169,7 @@ ISC
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- requests-oauthlib (from https://github.com/requests/requests-oauthlib)
========================================================================
......@@ -177,7 +177,7 @@ Info-ZIP
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
========================================================================
JSON
......@@ -237,7 +237,7 @@ The following software have components provided under the terms of this license:
- cffi (from http://cffi.readthedocs.org)
- coverage (from https://coverage.readthedocs.io)
- fastapi (from https://github.com/tiangolo/fastapi)
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- h11 (from https://github.com/python-hyper/h11)
- iniconfig (from http://github.com/RonnyPfannschmidt/iniconfig)
- jmespath (from https://github.com/jmespath/jmespath.py)
......@@ -256,7 +256,7 @@ The following software have components provided under the terms of this license:
- pyrsistent (from http://github.com/tobgu/pyrsistent/)
- pytest (from http://pytest.org)
- pytest-cov (from https://github.com/pytest-dev/pytest-cov)
- pytest-httpx (from )
- pytest-httpx (from https://colin-b.github.io/pytest_httpx/)
- pytest-mock (from https://github.com/pytest-dev/pytest-mock/)
- python-rapidjson (from https://github.com/python-rapidjson/python-rapidjson)
- python-ulid (from https://github.com/mdomke/python-ulid)
......@@ -296,7 +296,7 @@ OpenSSL
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
========================================================================
Python-2.0
......@@ -331,7 +331,7 @@ Unlicense
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
========================================================================
WTFPL
......@@ -359,7 +359,7 @@ Zlib
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- numpy (from http://www.numpy.org)
========================================================================
......@@ -368,7 +368,7 @@ public-domain
The following software have components provided under the terms of this license:
- botocore (from https://github.com/boto/botocore)
- grpcio (from http://www.grpc.io)
- grpcio (from https://grpc.io)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- py (from http://pylib.readthedocs.org/)
......
......@@ -18,26 +18,27 @@ import json
import time
from contextlib import suppress
from functools import wraps
from logging import getLogger
from operator import attrgetter
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
import fsspec
import pandas as pd
from pyarrow.lib import ArrowException
import dask
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge, set_index,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings, get_wdms_temp_dir
from pyarrow.lib import ArrowException
import dask
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
from app.utils import capture_timings, get_wdms_temp_dir, get_ctx
dask.config.set({'temporary_directory': get_wdms_temp_dir()})
......@@ -55,9 +56,12 @@ def handle_pyarrow_exceptions(target):
class DefaultWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
self.worker = None
global _LOGGER
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
get_logger().debug("WorkerPlugin initialised")
super().__init__()
......@@ -69,8 +73,8 @@ class DefaultWorkerPlugin(WorkerPlugin):
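# Dask WorkerPlugin hook: called on every task state transition; tasks finishing in 'error' are logged with their exception.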
def transition(self, key, start, finish, *args, **kwargs):
if finish == 'error':
exc = self.worker.exceptions[key]
getLogger().exception("Task '%s' has failed with exception: %s" % (key, str(exc)))
# exc = self.worker.exceptions[key]
get_logger().exception(f"Task '{key}' has failed with exception")
class DaskBulkStorage:
......@@ -84,7 +88,7 @@ class DaskBulkStorage:
""" use `create` to create instance """
self._parameters = None
self._fs = None
@classmethod
async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
instance = cls()
......@@ -94,7 +98,7 @@ class DaskBulkStorage:
async with DaskBulkStorage.lock_client:
if not DaskBulkStorage.client:
DaskBulkStorage.client = dask_client or await DaskDistributedClient(asynchronous=True, processes=True)
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
......@@ -103,8 +107,8 @@ class DaskBulkStorage:
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
get_logger().debug(f"dask client initialized : {DaskBulkStorage.client}")
get_logger().info(f"Distributed Dask client initialized : {DaskBulkStorage.client}")
instance._fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
return instance
......@@ -146,9 +150,10 @@ class DaskBulkStorage:
**kwargs: dict (of dicts) Passthrough key-word arguments for read backend.
"""
get_logger().debug(f"loading bulk : {path}")
return self.client.submit(dd.read_parquet, path, engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
**kwargs)
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version.
......@@ -156,7 +161,24 @@ class DaskBulkStorage:
"""
return self._load(self._get_blob_path(record_id, bulk_id))
def _submit_with_trace(self, target_func, *args, **kwargs):
"""
Submit the given target_func to the distributed Dask workers, propagating the current tracing span context.
"""
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return self.client.submit(wrap_trace_process, *args, **kwargs)
def _map_with_trace(self, target_func, *args, **kwargs):
"""
Map the given target_func over the arguments on the distributed Dask workers, propagating the current tracing span context.
"""
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return self.client.map(wrap_trace_process, *args, **kwargs)
@capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk')
async def load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version."""
try:
......@@ -172,17 +194,19 @@ class DaskBulkStorage:
we should be able to change or support other format easily ?
schema={} instead of 'infer' fixes wrong inference for columns of type string starting with nan values
"""
return self.client.submit(dd.to_parquet, ddf, path, schema={}, engine='pyarrow',
storage_options=self._parameters.storage_options)
return self._submit_with_trace(dd.to_parquet, ddf, path,
schema={},
engine='pyarrow',
storage_options=self._parameters.storage_options)
def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
returns a Future<None>
"""
return self.client.submit(pdf.to_parquet, path,
engine='pyarrow',
storage_options=self._parameters.storage_options)
return self._submit_with_trace(pdf.to_parquet, path,
engine='pyarrow',
storage_options=self._parameters.storage_options)
def _check_incoming_chunk(self, df):
# TODO should we test if is_monotonic?, unique ?
......@@ -289,13 +313,12 @@ class DaskBulkStorage:
if not dfs:
raise BulkNotProcessable("No data to commit")
dfs = self.client.map(set_index, dfs)
dfs = self._map_with_trace(set_index, dfs)
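# Merge the chunk dataframes pairwise (tree reduction) until a single dataframe remains.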
while len(dfs) > 1:
dfs = [self.client.submit(do_merge, a, b) for a, b in by_pairs(dfs)]
dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
return await self.save_blob(dfs[0], record_id=session.recordId)
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
params = DaskStorageParameters(protocol='file',
......
from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
from app.helper.traces import create_exporter
from app.conf import Config
_EXPORTER = None
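# Runs on a Dask worker: re-creates a tracer from the span context propagated via kwargs,
# so the wrapped call is recorded as a child span of the submitting request.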
def wrap_trace_process(*args, **kwargs):
global _EXPORTER
target_func = kwargs.pop('target_func')
span_context = kwargs.pop('span_context')
if not span_context or not target_func:
raise AttributeError("Keyword arguments should contain 'target_func' and 'span_context'")
if _EXPORTER is None:
_EXPORTER = create_exporter(service_name=Config.service_name.value)
tracer = open_tracer.Tracer(span_context=span_context,
sampler=AlwaysOnSampler(),
exporter=_EXPORTER)
with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
span.span_kind = SpanKind.CLIENT
return target_func(*args, **kwargs)
......@@ -15,7 +15,7 @@
import json
import asyncio
from io import BytesIO
from typing import Union, AnyStr, IO, Optional, List
from typing import Union, AnyStr, IO, Optional, List, Dict
from pathlib import Path
import numpy as np
......@@ -48,23 +48,12 @@ class DataframeSerializerSync:
columns: List[Union[str, int, float]] = None
index: List[Union[str, int, float]] = None
class IndexFormat(BaseModel):
# TODO
pass
class ColumnFormat(BaseModel):
# TODO
pass
class RecordsFormat(BaseModel):
# TODO
pass
__root__: Dict[str, Dict[Union[str, int, float], Union[str, int, float]]]
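# 'columns' orient maps {column -> {index -> value}}, matching the pandas to_json convention.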
schema_dict = {
JSONOrient.split: SplitFormat.schema(),
JSONOrient.index: IndexFormat.schema(),
JSONOrient.columns: ColumnFormat.schema(),
JSONOrient.records: RecordsFormat.schema()
JSONOrient.columns: ColumnFormat.schema()
}
return schema_dict[JSONOrient.get(orient)]
......
......@@ -17,12 +17,8 @@ from typing import Union
class JSONOrient(str, Enum):
# not allow 'table' because very verbose then comes with significant overhead
# not allow 'values' because cannot carry index nor column
split = "split"
index = "index"
columns = "columns"
records = "records"
@classmethod
def get(cls, orient: Union[str, "JSONOrient"]) -> "JSONOrient":
......
......@@ -183,10 +183,10 @@ class ConfigurationContainer:
default='300',
factory=lambda x: int(x))
extension_modules: EnvVar = EnvVar(
key='EXTENSION_MODULES',
description="""Comma separated list of extension module names to load.""",
default=None)
modules: EnvVar = EnvVar(
key='MODULES',
description="""Comma separated list of module names to load.""",
default="") # Add modules to the list once they are refactored, so that they are included
_environment_dict: Dict = os.environ
......@@ -369,6 +369,7 @@ def check_environment(configuration):
AUTHORIZATION_HEADER_NAME = 'Authorization'
APP_KEY_HEADER_NAME = 'appKey'
APP_ID_HEADER_NAME = 'x-app-id'
CORRELATION_ID_HEADER_NAME = 'correlation-id'
REQUEST_ID_HEADER_NAME = 'Request-ID'
PARTITION_ID_HEADER_NAME = 'data-partition-id'
# Wellbore Domain Services Extensions
> :warning: **This is an alpha feature**
> Implementation changes are expected, and will not be announced.
[[_TOC_]]
## Use cases
### Include new routers
1. Add a new directory `<extension package name>/routers` under `app/extensions`
2. Add a new python module `<router module name>.py`
```
/wellbore-domain-services
- app
  |- extensions
     |- <extension package name>
        |- __init__.py
        |- routers
           |- <router module name>.py
           |- __init__.py
```
3. In `<router module name>.py` include:
```python
from fastapi import APIRouter
router = APIRouter()
router.prefix = '<extension router prefix>'
router.tags = ['<extension router tags>']
def can_run() -> (bool, str):
    # Include the extension router specific checks if applicable.
    # WDMS main app will skip loading the extension module routers if it returns False.
    return True
```
4. Add new service endpoints to `router` in `<router module name>.py`.
- Use the `@router.[get|post|delete|put|patch|...]` FastAPI decorators to add new methods to the router
- Check the WDMS routers implementation under `app/routers`; the same pattern is encouraged in the extension routers.
```python
from fastapi import APIRouter, Depends, status
from app.utils import Context, get_ctx
from app.model.model_curated import *
from app.clients.storage_service_client import get_storage_record_service
from app.model.model_utils import from_record
router = APIRouter() # From previous step
# E.g.: Including a GET method API
@router.get('/markers/{marker_id}',
            response_model=marker,
            summary="Get the marker using wks:marker:1.0.4 schema",
            description="""Get the Marker object using its **id**.""",
            operation_id="get_marker",
            responses={status.HTTP_404_NOT_FOUND: {"description": "marker not found"}},
            response_model_exclude_unset=True)
async def get_marker(
    marker_id: str,
    ctx: Context = Depends(get_ctx)
) -> marker:
    storage_client = await get_storage_record_service(ctx)
    marker_record = await storage_client.get_record(id=marker_id, data_partition_id=ctx.partition_id)
    return from_record(marker, marker_record)
```
5. Include the full router module name in the `EXTENSION_MODULES` environment variable,
e.g. `app.extensions.<extension package name>.routers.<router module name>`
6. Update the deployment scripts accordingly.
7. Run the WDMS service and check the logs for the messages:
```
Loading `app.extensions.<extension package>.routers.<router module>` extension
Done. `app.extensions.<extension package>.routers.<router module>` loaded
```
8. Verify the new API endpoints were added under the provided tag and with the given prefix: `https://{base_url}/api/os-wellbore-ddms/docs/{Tag}`
9. Include tests (a minimal unit-test sketch follows this list)
- Unit tests are to be placed under `tests/unit/extensions/<extension package name>`
- Integration tests are to be placed under `tests/integration/functional/extensions/<extension package name>`
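For instance, a minimal unit-test sketch for an extension router could look like the following; the package name `my_extension` and router module `markers` are hypothetical placeholders, and only the wiring from steps 3 and 4 is checked.
```python
# Minimal sketch, assuming a hypothetical extension package `my_extension`
# exposing the router module `markers` described in steps 2-4.
from fastapi import FastAPI

from app.extensions.my_extension.routers import markers  # hypothetical module


def test_router_is_configured():
    # The loader refuses extension routers with an empty prefix or tags (see troubleshooting D).
    assert markers.router.prefix
    assert markers.router.tags


def test_get_marker_route_is_registered():
    app = FastAPI()
    app.include_router(markers.router)
    assert any(route.path.endswith("/markers/{marker_id}") for route in app.routes)
```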
#### Troubleshooting
##### A. Wrong router module configuration
Check step 3 above
```shell
Loading `app.extensions.{}.routers.{}` extension
Failed to load `app.extensions.{}.routers.{}` extension.
Module not configured properly. module 'app.extensions.{}.routers.{}' has no attribute 'router'
```
##### B. Wrong module name
Review steps 2 and 4 above
```shell
Loading `app.extensions.{}.routers.{}` extension
Failed to load `app.extensions.{}.routers.{}` extension.
Module not found.
No module named 'app.extensions.{}.routers.{}'
```
##### C. Trailing comma in `EXTENSION_MODULES` list
Review step 5 above; make sure there is no trailing comma in the `EXTENSION_MODULES` list.
```shell
Loading `` extension
Failed to load `` extension. Empty module name
```
##### D. Empty router prefix or tags
Check step 3 above
```shell
Loading `app.extensions.{}.routers.{}` extension
Failed to load `app.extensions.{}.routers.{}` extension.
Module not configured properly. Router prefix cannot be empty.
```
\ No newline at end of file
......@@ -6,6 +6,7 @@ from app.utils import Context
from .app_injector import AppInjector, AppInjectorModule
from app.bulk_persistence import resolve_tenant
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from osdu_ibm.storage.dask_storage_parameters import get_dask_storage_parameters as ibm_parameters
class IBMInjector(AppInjectorModule):
......@@ -25,4 +26,8 @@ class IBMInjector(AppInjectorModule):
@staticmethod
async def build_ibm_dask_blob_storage() -> DaskBulkStorage:
raise NotImplementedError()
# raise NotImplementedError()
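# Resolve the tenant from the current partition and build the IBM-backed Dask bulk storage from its storage parameters.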
ctx: Context = Context.current()
tenant = await resolve_tenant(ctx.partition_id)
params = await ibm_parameters(tenant)
return await DaskBulkStorage.create(params)
......@@ -87,6 +87,13 @@ class TracingMiddleware(BaseHTTPMiddleware):
tracer.add_attribute_to_current_span(
attribute_key=conf.CORRELATION_ID_HEADER_NAME,
attribute_value=correlation_id)
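# Prefer the partition id already resolved on the request context; fall back to the data-partition-id header.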
ctx_partition_id = get_or_create_ctx().partition_id
partition_id = ctx_partition_id if ctx_partition_id is not None \
else request.headers.get(conf.PARTITION_ID_HEADER_NAME)
tracer.add_attribute_to_current_span(
attribute_key=conf.PARTITION_ID_HEADER_NAME,
attribute_value=partition_id)
request_content_type = request.headers.get("Content-type")
tracer.add_attribute_to_current_span(attribute_key="request.header Content-type",
......@@ -96,6 +103,10 @@ class TracingMiddleware(BaseHTTPMiddleware):
tracer.add_attribute_to_current_span(attribute_key="request.header Content-length",
attribute_value=request_content_length)
app_id = request.headers.get(conf.APP_ID_HEADER_NAME)
tracer.add_attribute_to_current_span(attribute_key=conf.APP_ID_HEADER_NAME,
attribute_value=app_id)
@staticmethod
def _after_request(request: Request, response: Response, tracer):
......
......@@ -40,6 +40,37 @@ class LogServiceDateInterval(DDMSBaseModel):
EndDate: Optional[datetime] = None
class AvailableTrajectoryStationProperty(DDMSBaseModel):
"""
A set of properties describing a trajectory station property which is available for this instance of a WellboreTrajectory.
"""
TrajectoryStationPropertyTypeID: Optional[
constr(
regex=r'^[\w\-\.]+:reference-data\-\-TrajectoryStationPropertyType:[\w\-\.\:\%]+:[0-9]*$'
)
] = Field(
None,
description='The reference to a trajectory station property type - or, if interpreted as channels, the curve or channel name type, identifying e.g. MD, Inclination, Azimuth. This is a relationship to a reference-data--TrajectoryStationPropertyType record id.',
example='partition-id:reference-data--TrajectoryStationPropertyType:AzimuthTN:',
title='Trajectory Station Property Type ID',
)
StationPropertyUnitID: Optional[
constr(regex=r'^[\w\-\.]+:reference-data\-\-UnitOfMeasure:[\w\-\.\:\%]+:[0-9]*$')
] = Field(
None,
description='Unit of Measure for the station properties of type TrajectoryStationPropertyType.',
example='partition-id:reference-data--UnitOfMeasure:dega:',
title='Station Property Unit ID',
)
Name: Optional[str] = Field(
None,
description='The name of the curve (e.g. column in a CSV document) as originally found. If absent, the name of the TrajectoryCurveType is intended to be used.',
example='AzimuthTN',
title='Name',
)
class Owner(DDMSBaseModel):
__root__: constr(
regex