Commit 92877de6 authored by Yunhua Koglin

merge from master to dev

parents 2eae1daf 0933cca7
......@@ -14,7 +14,7 @@
# limitations under the License.
variables:
PIP_REQUIREMENTS: "frozenrequirements_dev.txt requirements.txt requirements_dev.txt"
PIP_REQUIREMENTS: "requirements.txt requirements_dev.txt"
AZURE_SERVICE: wellbore-ddms
AZURE_DOCKER_SUBDIR: build/Dockerfile
......@@ -162,7 +162,6 @@ osdu-gcp-test-python:
stage: integration
image: gcr.io/google.com/cloudsdktool/cloud-sdk
needs: ['osdu-gcp-deploy']
allow_failure: true
only:
variables:
- $OSDU_GCP == 'true' && $OSDU_GCP_INT_TEST_TYPE == 'python'
......@@ -172,8 +171,10 @@ osdu-gcp-test-python:
- source env/bin/activate
- pip install --upgrade pip
- pip install wheel pytest pytest-cov
- pip install -r requirements.txt
- pip install -r requirements_dev.txt
- >
for REQ in $PIP_REQUIREMENTS ; do
pip install -r $REQ
done
- cd tests/integration
- echo $OSDU_GCP_INTEGRATION_TESTER | base64 -d > file.json
- gcloud auth activate-service-account --key-file file.json
......@@ -191,9 +192,6 @@ osdu-gcp-test-python:
# Allow failure on deployments
osdu-gcp-deploy:
allow_failure: true
ibm-deploy:
allow_failure: true
......@@ -201,6 +199,3 @@ ibm-deploy:
ibm-test:
allow_failure: true
aws-test-python:
allow_failure: true
......@@ -36,6 +36,7 @@ The following software have components provided under the terms of this license:
- opencensus-proto (from https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-python)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pyarrow (from https://arrow.apache.org/)
- pytest-asyncio (from https://github.com/pytest-dev/pytest-asyncio)
- pytest-dependency (from https://github.com/RKrahl/pytest-dependency)
......@@ -100,6 +101,7 @@ The following software have components provided under the terms of this license:
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- pip-tools (from http://pypi.python.org/pypi/pip-tools/1.8.1rc3)
- ply (from http://www.dabeaz.com/ply/)
- protobuf (from https://developers.google.com/protocol-buffers/)
- psutil (from https://github.com/giampaolo/psutil)
......@@ -251,6 +253,7 @@ The following software have components provided under the terms of this license:
- natsort (from https://github.com/SethMMorton/natsort)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pluggy (from https://github.com/pytest-dev/pluggy)
- py (from http://pylib.readthedocs.org/)
- pyarrow (from https://arrow.apache.org/)
......@@ -269,6 +272,7 @@ The following software have components provided under the terms of this license:
- sniffio (from https://github.com/python-trio/sniffio)
- structlog (from http://www.structlog.org/)
- toml (from https://github.com/uiri/toml)
- tomli (from https://pypi.org/project/tomli/1.1.0/)
- urllib3 (from https://urllib3.readthedocs.io/)
- xmltodict (from https://github.com/martinblech/xmltodict)
- zipp (from https://github.com/jaraco/zipp)
......
......@@ -42,6 +42,7 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
- [pandas](https://pandas.pydata.org/) and [numpy](https://numpy.org/) for data manipulation
- [pyarrow](https://pypi.org/project/pyarrow/) for load and save data into parquet format
- [opencensus](https://opencensus.io/guides/grpc/python/) for tracing and logging on cloud provider
- [dask](https://docs.dask.org/en/latest/) to manage huge amounts of bulk data
### Library Dependencies
......@@ -60,6 +61,16 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
## Project Startup
### Dask Configuration - Locally
By default, Dask will use all available memory and CPU resources through its workers. The number of workers is determined by the number of cores on the local machine.
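As an illustration, these defaults roughly correspond to the local Dask setup below. This is a minimal sketch: the service creates its own client, and the explicit `LocalCluster` arguments only make the defaults visible (`nprocesses_nthreads` is the same helper the service code imports later in this merge).

```python
from dask.distributed import Client, LocalCluster
from distributed.deploy.utils import nprocesses_nthreads

# Split the local cores into a (workers, threads-per-worker) pair.
n_workers, threads_per_worker = nprocesses_nthreads()

# memory_limit='auto' divides the machine's total memory across the workers,
# so collectively all available memory can be used.
cluster = LocalCluster(n_workers=n_workers,
                       threads_per_worker=threads_per_worker,
                       memory_limit='auto')
client = Client(cluster)
print(client)
```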
### Dask Configuration - In a cluster
In a container context, such as Kubernetes, we recommend setting the container memory limit to 3Gi of RAM and 4-8 CPUs.
The minimum is 1.2Gi and 1 CPU; performance will be reduced, but this is enough to handle WellLogs of 10 curves with 1M values each.
Note: container memory is not entirely dedicated to the Dask workers; the FastAPI service and its process also require some.
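To make the sizing concrete, below is a hypothetical helper (not the service's actual logic) showing how the container memory limit, the `MIN_WORKER_MEMORY` setting introduced in this merge (default `512Mi`), and a reserve for the API process bound the number of Dask workers. It only uses `parse_bytes`/`format_bytes` from `dask.utils`, which the service code also imports; the `reserved_for_api` figure is an assumption for illustration.

```python
from dask.utils import parse_bytes, format_bytes

def estimate_workers(container_memory: str = "3Gi",
                     min_worker_memory: str = "512Mi",
                     reserved_for_api: str = "512Mi") -> int:
    """Illustrative only: how many workers fit in the container memory budget."""
    # Keep some memory for the FastAPI process, spend the rest on Dask workers.
    budget = parse_bytes(container_memory) - parse_bytes(reserved_for_api)
    workers = max(1, budget // parse_bytes(min_worker_memory))
    print(f"{format_bytes(budget)} left for Dask -> {workers} worker(s)")
    return workers

estimate_workers()  # with the recommended 3Gi limit: 5 worker(s)
```

With the recommended 3Gi limit this leaves room for five 512Mi workers; with the 1.2Gi minimum, only one.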
### Run the service locally
1. Create virtual environment in the wellbore project directory. This will create a folder inside of the wellbore project directory. For example: ~/os-wellbore-ddms/nameofvirtualenv
......@@ -88,6 +99,12 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
pip install -r requirements.txt
```
Or, for a developer setup, this will install tools to help you work with the code.
```bash
pip install -r requirements.txt -r requirements_dev.txt
```
6. Run the service
```bash
......@@ -298,8 +315,9 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
```bash
LOCAL_PORT=<local_port>
docker run -d -p $LOCAL_PORT:8080 -e OS_WELLBORE_DDMS_DEV_MODE=1 -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH=1 $IMAGE_TAG
IMAGE_TAG=<image_name>
docker run -d -p $LOCAL_PORT:8080 -e CLOUD_PROVIDER=local -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH="/tmp" -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH="/tmp" -e OS_WELLBORE_DDMS_DEV_MODE=True -e USE_PARTITION_SERVICE=disabled $IMAGE_TAG
```
2. Access app on `http://127.0.0.1:<LOCAL_PORT>/api/os-wellbore-ddms/docs`
......@@ -312,11 +330,12 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
docker logs CONTAINER_ID
```
### Run Unit Tests Locally
```bash
# Install test dependencies
pip install -r requirements_dev.txt
pip install -r requirements.txt -r requirements_dev.txt
python -m pytest --junit-xml=unit_tests_report.xml --cov=app --cov-report=html --cov-report=xml ./tests/unit
```
......@@ -345,7 +364,36 @@ pytest ./functional --environment="./generated/postman_environment.json" --filte
For more information see the [integration tests README](tests/integration/README.md)
### Port Forward from Kubernetes
### Manage package dependencies
At any time, you may want to ensure your virtual environment is in sync with your requirements specification.
For this you can use:
```bash
pip-sync
```
If you want to work with other requirements files, you can specify them:
```bash
pip-sync requirements.txt requirements_dev.txt
```
If you want to update `requirements.txt` to retrieve the most recent versions, respecting the bounds set in `requirements.in`, you can use:
```bash
pip-compile
```
If you want to update the version of only one dependency, for instance fastapi:
```bash
pip-compile --upgrade-package fastapi
```
For more information: https://github.com/jazzband/pip-tools/
### Debugging
#### Port Forward from Kubernetes
1. List the pods: `kubectl get pods`
2. Port forward: `kubectl port-forward pods/POD_NAME LOCAL_PORT:8080`
......
......@@ -25,7 +25,7 @@ from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge, set_index,
do_merge,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
......@@ -37,16 +37,27 @@ from pyarrow.lib import ArrowException
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
from dask.distributed import WorkerPlugin
from dask.distributed import scheduler
def handle_pyarrow_exceptions(target):
def internal_bulk_exceptions(target):
"""
Decorator to handle exceptions that should not be exposed to the outside world, e.g. Pyarrow or Dask exceptions
"""
@wraps(target)
async def async_inner(*args, **kwargs):
try:
return await target(*args, **kwargs)
except ArrowException:
get_logger().exception(f"{target} raised exception")
raise BulkNotProcessable("Unable to process bulk")
get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
raise BulkNotProcessable("Unable to process bulk - Arrow")
except scheduler.KilledWorker:
get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
raise BulkNotProcessable("Unable to process bulk- Dask")
except Exception:
get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
raise
return async_inner
......@@ -95,7 +106,7 @@ class DaskBulkStorage:
dask_client = dask_client or await DaskClient.create()
if DaskBulkStorage.client is not dask_client: # executed only once per dask client
DaskBulkStorage.client = dask_client
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
......@@ -118,7 +129,6 @@ class DaskBulkStorage:
def base_directory(self) -> str:
return self._parameters.base_directory
def _encode_record_id(self, record_id: str) -> str:
return hashlib.sha1(record_id.encode()).hexdigest()
......@@ -139,10 +149,17 @@ class DaskBulkStorage:
"""Read a Parquet file into a Dask DataFrame
path : string or list
**kwargs: dict (of dicts) Passthrough keyword arguments for the read backend.
read_parquet parameters:
chunksize='25M': if chunks are too small, we aggregate them until we reach chunksize
aggregate_files=True: because we pass a list of paths when committing a session,
aggregate_files is needed when the paths differ
"""
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
chunksize='25M',
aggregate_files=True,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
......@@ -189,7 +206,6 @@ class DaskBulkStorage:
engine='pyarrow',
storage_options=self._parameters.storage_options)
async def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
......@@ -199,8 +215,8 @@ class DaskBulkStorage:
return await self._submit_with_trace(pandas_to_parquet, f_pdf, path,
self._parameters.storage_options)
def _check_incoming_chunk(self, df):
@staticmethod
def _check_incoming_chunk(df):
# TODO should we test if is_monotonic?, unique ?
if len(df.index) == 0:
raise BulkNotProcessable("Empty data")
......@@ -211,8 +227,9 @@ class DaskBulkStorage:
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
raise BulkNotProcessable("Index should be numeric or datetime")
@handle_pyarrow_exceptions
@internal_bulk_exceptions
@capture_timings('save_blob', handlers=worker_capture_timing_handlers)
@with_trace('save_blob')
async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None):
"""Write the data frame to the blob storage."""
bulk_id = bulk_id or BulkId.new_bulk_id()
......@@ -290,17 +307,14 @@ class DaskBulkStorage:
@capture_timings('session_commit')
@with_trace('session_commit')
@handle_pyarrow_exceptions
@internal_bulk_exceptions
async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
dfs = [self._load(pf) for pf in self._get_next_files_list(session)]
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
if not dfs:
raise BulkNotProcessable("No data to commit")
if len(dfs) > 1: # set_index is not needed if no merge operations are done
dfs = self._map_with_trace(set_index, dfs)
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
while len(dfs) > 1:
dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
......
......@@ -20,6 +20,8 @@ from logging import INFO
from app.helper.logger import get_logger
from app.utils import capture_timings
import dask.dataframe as dd
def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
......@@ -71,25 +73,25 @@ class SessionFileMeta:
@capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf): # TODO
def set_index(ddf: dd.DataFrame):
"""Set index of the dask dataFrame only if needed."""
if not ddf.known_divisions or '_idx' not in ddf:
if '_idx' not in ddf:
ddf['_idx'] = ddf.index # we need to create a temporary variable to set it as index
ddf['_idx'] = ddf['_idx'].astype(ddf.index.dtype)
return ddf.set_index('_idx', sorted=True)
if not ddf.known_divisions:
return ddf.set_index(ddf.index, sorted=True)
return ddf
@capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1, df2):
def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
"""Combine the 2 dask dataframe. Updates df1 with df2 values if overlap."""
if df2 is None:
return df1
df1 = set_index(df1)
df2 = set_index(df2)
if share_items(df1.columns, df2.columns):
ddf = df2.combine_first(df1)
else:
ddf = df1.join(df2, how='outer') # join seems faster when there are no columns in common
return ddf[sorted(ddf.columns)]
return ddf
......@@ -188,6 +188,11 @@ class ConfigurationContainer:
description="""Comma separated list of module names to load.""",
default="log_recognition.routers.log_recognition") # Add modules to the list once they are refactored, so that they are included
min_worker_memory: EnvVar = EnvVar(
key='MIN_WORKER_MEMORY',
description='Min amount of memory for one worker',
default="512Mi")
_environment_dict: Dict = os.environ
_contextual_loader: Callable = None
......
......@@ -12,12 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from fastapi import APIRouter, Depends
from odes_search.models import (
QueryRequest,
CursorQueryResponse,
)
CursorQueryResponse)
from app.clients.search_service_client import get_search_service
from ..common_parameters import REQUIRED_ROLES_READ
from app.utils import Context
......@@ -29,14 +27,16 @@ from .search import (
query_type_returned_fields,
basic_query_request,
basic_query_request_with_cursor)
router = APIRouter()
#osdu kind
# osdu kind
OSDU_WELLBORE_KIND = '*:wks:master-data--Wellbore:*'
OSDU_WELLLOG_KIND = '*:wks:work-product-component--WellLog:*'
OSDU_WELLBOREMARKERSET_KIND = '*:wks:work-product-component--WellboreMarkerSet:*'
WELLBORE_RELATIONSHIP = "WellboreID"
def create_relationships_id_str(data_type: str, id: str):
return f'data.{data_type}:\"{id}\"'
......@@ -73,7 +73,7 @@ async def query_request_with_specific_attribute(query_type: str, attribute: str,
return query_result
relationships_ids = [create_relationships_id_str(data_type, r["id"]) for r in response.results]
id_list = ' OR '.join(relationships_ids) # [a, b, c] => 'a OR b OR c'
id_list = ' OR '.join(relationships_ids) # [a, b, c] => 'a OR b OR c'
query = added_query(id_list, query)
......@@ -94,13 +94,14 @@ async def query_request_with_specific_attribute(query_type: str, attribute: str,
async def query_wellbores(body: SearchQuery = None, ctx: Context = Depends(get_ctx)):
return await basic_query_request_with_cursor(query_type, OSDU_WELLBORE_KIND, ctx, body.query)
@router.post('/query/wellbores/{wellboreId}/welllogs', summary='Query with cursor, search WellLogs by wellbore ID',
description=f"""Get all WellLogs object using its relationship Wellbore ID. <p>All WellLogs linked to this
specific ID will be returned</p>
<p>The WellLogs kind is {OSDU_WELLLOG_KIND} returns all records directly based on existing schemas</p>{REQUIRED_ROLES_READ}""",
response_model=CursorQueryResponse)
async def query_welllogs_bywellbore(wellboreId: str, body: SearchQuery = None,
ctx: Context = Depends(get_ctx)):
ctx: Context = Depends(get_ctx)):
query = added_relationships_query(wellboreId, WELLBORE_RELATIONSHIP, body.query)
return await basic_query_request(query_type, OSDU_WELLLOG_KIND, ctx, query)
......@@ -112,13 +113,15 @@ async def query_welllogs_bywellbore(wellboreId: str, body: SearchQuery = None,
<p>The WellLogs kind is {OSDU_WELLLOG_KIND} returns all records directly based on existing schemas</p>{REQUIRED_ROLES_READ}""",
response_model=CursorQueryResponse)
async def query_welllogs_bywellboreattribute(wellboreAttribute: str, body: SearchQuery = None,
ctx: Context = Depends(get_ctx)):
return await query_request_with_specific_attribute(query_type, wellboreAttribute, OSDU_WELLBORE_KIND, OSDU_WELLLOG_KIND,
ctx: Context = Depends(get_ctx)):
return await query_request_with_specific_attribute(query_type, wellboreAttribute, OSDU_WELLBORE_KIND,
OSDU_WELLLOG_KIND,
WELLBORE_RELATIONSHIP, ctx,
body.query)
@router.post('/query/wellbores/{wellboreId}/wellboremarkersets', summary='Query with cursor, search wellbore markersets by wellbore ID',
@router.post('/query/wellbores/{wellboreId}/wellboremarkersets',
summary='Query with cursor, search wellbore markersets by wellbore ID',
description=f"""Get all Wellbore Markersets objects using its relationship Wellbore ID. <p>All Markers linked to this
specific ID will be returned</p>
<p>The Wellbore Markerset kind is {OSDU_WELLBOREMARKERSET_KIND} returns all records directly based on existing schemas</p>{REQUIRED_ROLES_READ}""",
......
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fastapi import APIRouter, Depends
from odes_search.models import (
QueryRequest,
CursorQueryResponse,
CursorQueryRequest,
BaseModel,
Field,
Optional)
from app.clients.search_service_client import get_search_service
from app.utils import Context
from .search_v3 import (
added_query,
create_relationships_id_str,
query_type_returned_fields,
OSDU_WELLBORE_KIND,
get_ctx,
query_type)
from ..common_parameters import REQUIRED_ROLES_READ
router = APIRouter()
def update_query_with_names_based_search(names: str = None, user_query: str = None) -> str:
generated_query = f"data.FacilityName:{names}"
return added_query(generated_query, user_query)
def escape_forbidden_characters_for_search(input_str: str) -> str:
# Reserved character are listed here https://community.opengroup.org/osdu/documentation/-/blob/master/platform/tutorials/core-services/SearchService.md
# ? and * are allowed for wildcard search
reserved_char_list = ['+', '-', '=', '>', '<', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~',
':', '\\', '/']
def escape_char(input_char: str, reserved_char_list: [str]) -> str:
return input_char if input_char not in reserved_char_list else f"\\{input_char}"
result_str = ''.join([escape_char(char, reserved_char_list) for char in input_str])
return result_str
def added_relationships_query(id: str, data_type: str, query: str = None):
relationships_id = create_relationships_id_str(data_type, id)
return added_query(relationships_id, query)
class SearchQueryRequest(BaseModel):
# Used as input, w/o kind, etc.
limit: "Optional[int]" = Field(None, alias="limit")
query: "Optional[str]" = Field(None, alias="query")
cursor: "Optional[str]" = Field(None, alias="cursor")
offset: "Optional[int]" = Field(None, alias="offset")
SearchQueryRequest.update_forward_refs()
DEFAULT_SEARCHQUERYREQUEST = SearchQueryRequest(limit=None, query=None, cursor=None, offset=None)
class SimpleCursorQueryRequest(BaseModel):
# Used as input, w/o kind, etc.
limit: "Optional[int]" = Field(None, alias="limit")
query: "Optional[str]" = Field(None, alias="query")
cursor: "Optional[str]" = Field(None, alias="cursor")
SimpleCursorQueryRequest.update_forward_refs()
DEFAULT_CURSORQUERYREQUEST = SimpleCursorQueryRequest(limit=None, query=None, cursor=None)
class SimpleOffsetQueryRequest(BaseModel):
limit: "Optional[int]" = Field(None, alias="limit")
query: "Optional[str]" = Field(None, alias="query")
offset: "Optional[int]" = Field(None, alias="offset")
SimpleOffsetQueryRequest.update_forward_refs()
DEFAULT_QUERYREQUEST = SimpleOffsetQueryRequest(limit=None, query=None, offset=None)
async def query_request_with_cursor(query_type: str, kind: str, ctx: Context, query: SimpleCursorQueryRequest = None):
returned_fields = query_type_returned_fields(query_type)
query_request = CursorQueryRequest(kind=kind,
limit=query.limit or 1000,
query=query.query,
returnedFields=[returned_fields],
cursor=query.cursor)
client = await get_search_service(ctx)
return await client.query_with_cursor(
data_partition_id=ctx.partition_id,
cursor_query_request=query_request)
async def query_request_with_offset(query_type: str, kind: str, ctx: Context, query: SimpleOffsetQueryRequest = None):
returned_fields = query_type_returned_fields(query_type)
query_request = QueryRequest(kind=kind,
limit=query.limit or 1000,
query=query.query,
returnedFields=[returned_fields],
offset=query.offset)
client = await get_search_service(ctx)
return await client.query(
data_partition_id=ctx.partition_id,
query_request=query_request)
async def query_request(query_type: str, kind: str, ctx: Context, query: SearchQueryRequest = None):
# use offset if it is not None, else use cursor
query_as_dict = query.dict(exclude_none=True, exclude_unset=True)
if query.offset is not None:
cursor_query = SimpleOffsetQueryRequest(**query_as_dict)
return await query_request_with_offset(query_type, kind, ctx, cursor_query)
cursor_query = SimpleCursorQueryRequest(**query_as_dict)
return await query_request_with_cursor(query_type, kind, ctx, cursor_query)
@router.post('/query/wellbores/byname', summary='Query with cursor or offset, get wellbores',
description=f"""Get Wellbores object by name. <p>The wellbore kind is {OSDU_WELLBORE_KIND}
returns all records directly based on existing schemas. The query is done on data.FacilityName field</p>{REQUIRED_ROLES_READ}""",
response_model=CursorQueryResponse)
async def query_wellbores_by_name(names: str, body: SearchQueryRequest = DEFAULT_QUERYREQUEST,
ctx: Context = Depends(get_ctx)):
names = escape_forbidden_characters_for_search(names)
body.query = update_query_with_names_based_search(names=names, user_query=body.query)
return await query_request(query_type, OSDU_WELLBORE_KIND, ctx, body)
......@@ -23,13 +23,19 @@ import tempfile
import json
from asyncio import iscoroutinefunction
import dask
from dask.utils import parse_bytes, format_bytes
from dask.distributed import Client as DaskDistributedClient
from distributed import system
from distributed.deploy.utils import nprocesses_nthreads
from app.model.user import User
from app.injector.app_injector import AppInjector
from app.conf import Config
from time import perf_counter, process_time
from logging import INFO
import dask
from dask.distributed import Client as DaskDistributedClient
POOL_EXECUTOR_MAX_WORKER = 4
@lru_cache()
......@@ -37,14 +43,22 @@ def get_http_client_session(key: str = 'GLOBAL'):
return ClientSession(json_serialize=json.dumps)
POOL_EXECUTOR_MAX_WORKER = 4
class DaskException(Exception):
pass
class DaskClient:
# singleton of DaskDistributedClient class
client: DaskDistributedClient = None
""" Dask client """
# Ensure access to critical section is done for only one coroutine
lock_client