Commit 9fc53c3c authored by Luc Yriarte

Merge branch 'master' into basic-apis-v2

parents ebf745db fd1cb4bb
Pipeline #57211 passed with stages in 17 minutes and 21 seconds
......@@ -14,7 +14,7 @@
# limitations under the License.
variables:
PIP_REQUIREMENTS: "frozenrequirements_dev.txt requirements.txt requirements_dev.txt"
PIP_REQUIREMENTS: "requirements.txt requirements_dev.txt"
AZURE_SERVICE: wellbore-ddms
AZURE_DOCKER_SUBDIR: build/Dockerfile
......@@ -172,8 +172,7 @@ osdu-gcp-test-python:
- source env/bin/activate
- pip install --upgrade pip
- pip install wheel pytest pytest-cov
- pip install -r requirements.txt
- pip install -r requirements_dev.txt
- pip install -r $PIP_REQUIREMENTS
- cd tests/integration
- echo $OSDU_GCP_INTEGRATION_TESTER | base64 -d > file.json
- gcloud auth activate-service-account --key-file file.json
......
......@@ -42,6 +42,7 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
- [pandas](https://pandas.pydata.org/) and [numpy](https://numpy.org/) for data manipulation
- [pyarrow](https://pypi.org/project/pyarrow/) for load and save data into parquet format
- [opencensus](https://opencensus.io/guides/grpc/python/) for tracing and logging on cloud provider
- [dask](https://docs.dask.org/en/latest/) to manage huge amount of bulk data
### Library Dependencies
......@@ -60,6 +61,16 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
## Project Startup
### Dask Configuration - Locally
By default, Dask will use all available memory and CPU resources through its workers. The number of workers is determined by the number of cores on the local machine.
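For illustration, here is a minimal sketch (not part of the service) that inspects those defaults using the same helpers the service relies on, `nprocesses_nthreads` and `system.MEMORY_LIMIT` from `distributed`:

```python
# Minimal sketch: inspect the defaults Dask derives from the local machine.
# Worker and thread counts come from the CPU count; the memory budget comes
# from the total system memory.
from distributed import system
from distributed.deploy.utils import nprocesses_nthreads
from dask.utils import format_bytes

n_workers, threads_per_worker = nprocesses_nthreads()  # based on local CPU cores
print(f"{n_workers} workers x {threads_per_worker} threads, "
      f"sharing {format_bytes(system.MEMORY_LIMIT)} of RAM")
```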
### Dask Configuration - In a cluster
In a container context such as Kubernetes, we recommend setting the container memory limit to 3Gi of RAM and 4-8 CPUs.
The minimum is 1.2Gi and 1 CPU; performance will be reduced, but this is enough to handle WellLogs of 10 curves with 1M values each.
Note: container memory is not entirely dedicated to Dask workers; the FastAPI service and its processes also require some.
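As a rough illustration of how that memory budget is split, the sketch below mirrors the worker sizing logic introduced in this change; the 3Gi container limit and 4 workers are example values assuming 4 CPU cores, not service constants:

```python
# Simplified sketch of the worker sizing logic, assuming a 3Gi container
# limit and 4 CPU cores (hence 4 workers by default).
from dask.utils import parse_bytes, format_bytes

container_memory = parse_bytes("3Gi")
memory_leeway = parse_bytes("600Mi")      # reserved for FastAPI + process pool
min_worker_memory = parse_bytes("512Mi")  # MIN_WORKER_MEMORY default
n_workers = 4

available = container_memory - memory_leeway
per_worker = available // n_workers
if per_worker < min_worker_memory:
    # not enough RAM per worker: run fewer workers so each keeps the minimum
    n_workers = available // min_worker_memory
    per_worker = available // n_workers

print(f"{n_workers} workers with {format_bytes(per_worker)} of RAM each")
```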
### Run the service locally
1. Create a virtual environment in the wellbore project directory. This creates a folder inside the project directory, for example: ~/os-wellbore-ddms/nameofvirtualenv
......@@ -88,6 +99,12 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
pip install -r requirements.txt
```
Or, for a developer setup, this also installs tools that help you work with the code:
```bash
pip install -r requirements.txt -r requirements_dev.txt
```
6. Run the service
```bash
......@@ -298,8 +315,9 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
```bash
LOCAL_PORT=<local_port>
docker run -d -p $LOCAL_PORT:8080 -e OS_WELLBORE_DDMS_DEV_MODE=1 -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH=1 $IMAGE_TAG
IMAGE_TAG=<image_name>
docker run -d -p $LOCAL_PORT:8080 -e CLOUD_PROVIDER=local -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH="/tmp" -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH="/tmp" -e OS_WELLBORE_DDMS_DEV_MODE=True -e USE_PARTITION_SERVICE=disabled $IMAGE_TAG
```
2. Access app on `http://127.0.0.1:<LOCAL_PORT>/api/os-wellbore-ddms/docs`
......@@ -312,11 +330,12 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
docker logs CONTAINER_ID
```
### Run Unit Tests Locally
```bash
# Install test dependencies
pip install -r requirements_dev.txt
pip install -r requirements.txt -r requirements_dev.txt
python -m pytest --junit-xml=unit_tests_report.xml --cov=app --cov-report=html --cov-report=xml ./tests/unit
```
......@@ -345,7 +364,36 @@ pytest ./functional --environment="./generated/postman_environment.json" --filte
For more information see the [integration tests README](tests/integration/README.md)
### Port Forward from Kubernetes
### Manage package dependencies
At any time, you may want to ensure your virtual environment is in sync with your requirements specification.
For this, you can use:
```bash
pip-sync
```
If you want to work with other requirements files, you can specify them:
```bash
pip-sync requirements.txt requirements_dev.txt
```
If you want to update `requirements.txt` to retrieve the most recent versions, while respecting the bounds set in `requirements.in`, you can use:
```bash
pip-compile
```
If you want to update the version of only one dependency, for instance fastapi:
```bash
pip-compile --upgrade-package fastapi
```
For more information: https://github.com/jazzband/pip-tools/
### Debugging
#### Port Forward from Kubernetes
1. List the pods: `kubectl get pods`
2. Port forward: `kubectl port-forward pods/POD_NAME LOCAL_PORT:8080`
......
......@@ -25,7 +25,7 @@ from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge, set_index,
do_merge,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
......@@ -37,16 +37,27 @@ from pyarrow.lib import ArrowException
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
from dask.distributed import WorkerPlugin
from dask.distributed import scheduler
def handle_pyarrow_exceptions(target):
def internal_bulk_exceptions(target):
"""
Decorator to handle exceptions that should not be exposed to the outside world, e.g. Pyarrow or Dask exceptions.
"""
@wraps(target)
async def async_inner(*args, **kwargs):
try:
return await target(*args, **kwargs)
except ArrowException:
get_logger().exception(f"{target} raised exception")
raise BulkNotProcessable("Unable to process bulk")
get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
raise BulkNotProcessable("Unable to process bulk - Arrow")
except scheduler.KilledWorker:
get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
raise BulkNotProcessable("Unable to process bulk- Dask")
except Exception:
get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
raise
return async_inner
......@@ -95,7 +106,7 @@ class DaskBulkStorage:
dask_client = dask_client or await DaskClient.create()
if DaskBulkStorage.client is not dask_client: # executed only once per dask client
DaskBulkStorage.client = dask_client
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
......@@ -118,7 +129,6 @@ class DaskBulkStorage:
def base_directory(self) -> str:
return self._parameters.base_directory
def _encode_record_id(self, record_id: str) -> str:
return hashlib.sha1(record_id.encode()).hexdigest()
......@@ -139,10 +149,17 @@ class DaskBulkStorage:
"""Read a Parquet file into a Dask DataFrame
path : string or list
**kwargs: dict (of dicts) Passthrough key-word arguments for read backend.
read_parquet parameters:
chunksize='25M': if chunks are too small, we aggregate them until we reach chunksize
aggregate_files=True: because we are passing a list of paths when committing a session,
aggregate_files is needed when the paths are different
"""
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
chunksize='25M',
aggregate_files=True,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
......@@ -189,7 +206,6 @@ class DaskBulkStorage:
engine='pyarrow',
storage_options=self._parameters.storage_options)
async def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
......@@ -199,8 +215,8 @@ class DaskBulkStorage:
return await self._submit_with_trace(pandas_to_parquet, f_pdf, path,
self._parameters.storage_options)
def _check_incoming_chunk(self, df):
@staticmethod
def _check_incoming_chunk(df):
# TODO should we test if is_monotonic? unique?
if len(df.index) == 0:
raise BulkNotProcessable("Empty data")
......@@ -211,8 +227,9 @@ class DaskBulkStorage:
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
raise BulkNotProcessable("Index should be numeric or datetime")
@handle_pyarrow_exceptions
@internal_bulk_exceptions
@capture_timings('save_blob', handlers=worker_capture_timing_handlers)
@with_trace('save_blob')
async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None):
"""Write the data frame to the blob storage."""
bulk_id = bulk_id or BulkId.new_bulk_id()
......@@ -290,17 +307,14 @@ class DaskBulkStorage:
@capture_timings('session_commit')
@with_trace('session_commit')
@handle_pyarrow_exceptions
@internal_bulk_exceptions
async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
dfs = [self._load(pf) for pf in self._get_next_files_list(session)]
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
if not dfs:
raise BulkNotProcessable("No data to commit")
if len(dfs) > 1: # set_index is not needed if no merge operations are done
dfs = self._map_with_trace(set_index, dfs)
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
while len(dfs) > 1:
dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
......
......@@ -20,6 +20,8 @@ from logging import INFO
from app.helper.logger import get_logger
from app.utils import capture_timings
import dask.dataframe as dd
def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
......@@ -71,25 +73,25 @@ class SessionFileMeta:
@capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf): # TODO
def set_index(ddf: dd.DataFrame):
"""Set index of the dask dataFrame only if needed."""
if not ddf.known_divisions or '_idx' not in ddf:
if '_idx' not in ddf:
ddf['_idx'] = ddf.index # we need to create a temporary variable to set it as index
ddf['_idx'] = ddf['_idx'].astype(ddf.index.dtype)
return ddf.set_index('_idx', sorted=True)
if not ddf.known_divisions:
return ddf.set_index(ddf.index, sorted=True)
return ddf
@capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1, df2):
def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
"""Combine the 2 dask dataframe. Updates df1 with df2 values if overlap."""
if df2 is None:
return df1
df1 = set_index(df1)
df2 = set_index(df2)
if share_items(df1.columns, df2.columns):
ddf = df2.combine_first(df1)
else:
ddf = df1.join(df2, how='outer') # join seems faster when there are no columns in common
return ddf[sorted(ddf.columns)]
return ddf
......@@ -188,6 +188,11 @@ class ConfigurationContainer:
description="""Comma separated list of module names to load.""",
default="log_recognition.routers.log_recognition") # Add modules to the list once they are refactored, so that they are included
min_worker_memory: EnvVar = EnvVar(
key='MIN_WORKER_MEMORY',
description='Minimum amount of memory for one worker',
default="512Mi")
_environment_dict: Dict = os.environ
_contextual_loader: Callable = None
......
......@@ -23,13 +23,19 @@ import tempfile
import json
from asyncio import iscoroutinefunction
import dask
from dask.utils import parse_bytes, format_bytes
from dask.distributed import Client as DaskDistributedClient
from distributed import system
from distributed.deploy.utils import nprocesses_nthreads
from app.model.user import User
from app.injector.app_injector import AppInjector
from app.conf import Config
from time import perf_counter, process_time
from logging import INFO
import dask
from dask.distributed import Client as DaskDistributedClient
POOL_EXECUTOR_MAX_WORKER = 4
@lru_cache()
......@@ -37,14 +43,22 @@ def get_http_client_session(key: str = 'GLOBAL'):
return ClientSession(json_serialize=json.dumps)
POOL_EXECUTOR_MAX_WORKER = 4
class DaskException(Exception):
pass
class DaskClient:
# singleton of DaskDistributedClient class
client: DaskDistributedClient = None
""" Dask client """
# Ensure access to critical section is done for only one coroutine
lock_client: asyncio.Lock = None
""" used to ensure """
# Minimal amount of memory required for a Dask worker to avoid degraded performance
min_worker_memory_recommended = parse_bytes(Config.min_worker_memory.value)
# Amount of memory reserved for the FastAPI server + ProcessPoolExecutors
memory_leeway = parse_bytes('600Mi')
@staticmethod
async def create() -> DaskDistributedClient:
......@@ -55,23 +69,78 @@ class DaskClient:
async with DaskClient.lock_client:
if not DaskClient.client:
from app.helper.logger import get_logger
get_logger().info(f"Dask client initialization started...")
DaskClient.client = await DaskDistributedClient(asynchronous=True,
logger = get_logger()
logger.info(f"Dask client initialization started...")
n_workers, threads_per_worker, worker_memory_limit = DaskClient._get_dask_configuration(logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
f"{format_bytes(worker_memory_limit)} of RAM and {threads_per_worker} threads each")
DaskClient.client = await DaskDistributedClient(memory_limit=worker_memory_limit,
n_workers=n_workers,
threads_per_worker=threads_per_worker,
asynchronous=True,
processes=True,
dashboard_address=None,
diagnostics_port=None,
)
dashboard_address=None)
get_logger().info(f"Dask client initialized : {DaskClient.client}")
return DaskClient.client
@staticmethod
def _get_system_memory():
return system.MEMORY_LIMIT
@staticmethod
def _available_memory_for_workers():
""" Return amount of RAM available for Dask's workers after withdrawing RAM required by server itself """
return max(0, (DaskClient._get_system_memory() - DaskClient.memory_leeway))
@staticmethod
def _recommended_workers_and_threads():
""" Return the recommended numbers of worker and threads according the cpus available provided by Dask """
return nprocesses_nthreads()
@staticmethod
def _get_dask_configuration(logger):
"""
Return recommended Dask workers configuration
"""
n_workers, threads_per_worker = DaskClient._recommended_workers_and_threads()
available_memory_bytes = DaskClient._available_memory_for_workers()
worker_memory_limit = int(available_memory_bytes / n_workers)
logger.info(f"Dask client - system.MEMORY_LIMIT: {format_bytes(DaskClient._get_system_memory())} "
f"- available_memory_bytes: {format_bytes(available_memory_bytes)} "
f"- min_worker_memory_recommended: {format_bytes(DaskClient.min_worker_memory_recommended)} "
f"- computed worker_memory_limit: {format_bytes(worker_memory_limit)} for {n_workers} workers")
if DaskClient.min_worker_memory_recommended > worker_memory_limit:
n_workers = available_memory_bytes // DaskClient.min_worker_memory_recommended
if not n_workers >= 1:
min_memory = DaskClient.min_worker_memory_recommended + DaskClient.memory_leeway
message = f'Not enough memory available to start a Dask worker. ' \
          f'Please consider upgrading the container memory to {format_bytes(min_memory)}'
logger.error(f"Dask client - {message} - "
f'n_workers: {n_workers} threads_per_worker: {threads_per_worker}, '
f'available_memory_bytes: {available_memory_bytes} ')
raise DaskException(message)
worker_memory_limit = available_memory_bytes / n_workers
logger.warning(f"Dask client - available RAM is too low. Reducing number of workers "
f"to {n_workers} running with {format_bytes(worker_memory_limit)} of RAM")
return n_workers, threads_per_worker, worker_memory_limit
@staticmethod
async def close():
if not DaskClient.lock_client:
return
async with DaskClient.lock_client:
if DaskClient.client:
await DaskClient.client.close() # or shutdown
DaskClient.client = None
def get_pool_executor():
if get_pool_executor._pool is None:
get_pool_executor._pool = concurrent.futures.ProcessPoolExecutor(POOL_EXECUTOR_MAX_WORKER)
......@@ -356,7 +425,6 @@ class Context:
return json.dumps(self.__dict__())
def get_ctx() -> Context:
return Context.current()
......@@ -382,6 +450,7 @@ def load_schema_example(file_name: str):
def make_log_captured_timing_handler(level=INFO):
def log_captured_timing(tag, wall, cpu):
Context.current().logger.log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
return log_captured_timing
......@@ -390,6 +459,7 @@ default_capture_timing_handlers = [make_log_captured_timing_handler(INFO)]
def capture_timings(tag, handlers=default_capture_timing_handlers):
""" basic timing decorator, get both wall and cpu """
def decorate(target):
if iscoroutinefunction(target):
......@@ -434,7 +504,7 @@ class OpenApiResponse(NamedTuple):
example: Optional[dict] = None
#NOSONAR
# NOSONAR
class __OpenApiHandler:
def __init__(self):
self._handlers = {}
......
......@@ -14,9 +14,8 @@
FROM python:3.7-slim-buster
COPY requirements.txt frozenrequirements.txt ./
COPY requirements.txt ./
RUN pip install -r frozenrequirements.txt
RUN pip install -r requirements.txt
COPY ./app /app
......
......@@ -84,6 +84,8 @@ spec:
- mountPath: /azure-keyvault
name: azure-keyvault
readOnly: true
resources:
{{ toYaml .Values.resources | nindent 10 }}
livenessProbe:
httpGet:
path: {{ include "os-wellbore-ddms.prefix" . }}/healthz
......
......@@ -26,6 +26,12 @@ deployment:
replicaCount: 2
resources:
limits:
memory: 3Gi
requests:
memory: 3Gi
annotations:
buildNumber: []
buildOrigin: AzureDevops build/#{Build.DefinitionName}#
......
......@@ -140,7 +140,7 @@ steps:
appUrl=${{ parameters.baseUrl }}${{ parameters.basePath }}
echo "Testing App on ${appUrl}"
python tests/integration/gen_postman_env.py --token $(token) --base_url ${appUrl} --cloud_provider ${{ parameters.cloudProvider }} --acl_domain ${{ parameters.aclDomain }} --legal_tag ${{ parameters.legalTag }} --data_partition ${{ parameters.dataPartition }}
pip install virtualenv
......
adal==1.2.7
adlfs==0.7.7
aiohttp==3.7.4.post0
aioredis==1.3.1
anyio==3.2.1
asgiref==3.4.1
async-timeout==3.0.1
attrs==21.2.0
azure-common==1.1.27
azure-core==1.16.0
azure-datalake-store==0.0.52
azure-identity==1.6.0
azure-keyvault==4.1.0
azure-keyvault-certificates==4.3.0
azure-keyvault-keys==4.4.0
azure-keyvault-secrets==4.3.0
azure-storage-blob==12.8.1
backoff==1.11.1
boto3==1.18.1
botocore==1.21.1
cachetools==4.2.2
certifi==2021.5.30
cffi==1.14.6
chardet==4.0.0
charset-normalizer==2.0.3
click==8.0.1
cloudpickle==1.6.0
colorama==0.4.4
cryptography==3.4.7
dask==2021.6.2
decorator==5.0.9
distributed==2021.6.2
fastapi==0.66.0
fsspec==2021.7.0
gcsfs==2021.7.0
google-api-core==1.31.0
google-auth==1.33.0
google-auth-oauthlib==0.4.4
google-cloud-core==1.7.1
google-cloud-monitoring==0.36.0
google-cloud-trace==0.24.0
googleapis-common-protos==1.53.0
grpcio==1.38.1
h11==0.12.0
HeapDict==1.0.1
hiredis==2.0.0
httpcore==0.13.6
httpx==0.18.2
idna==3.2
isodate==0.6.0
jmespath==0.10.0
JSONBender==0.9.3
jsonpath-ng==1.5.3
locket==0.2.1
msal==1.12.0
msal-extensions==0.3.0
msgpack==1.0.2
msrest==0.6.21
multidict==5.1.0
numpy==1.21.1
oauthlib==3.1.1
opencensus==0.7.13
opencensus-context==0.1.2
opencensus-ext-azure==1.0.8
opencensus-ext-logging==0.1.0
opencensus-ext-ocagent==0.7.1
opencensus-ext-stackdriver==0.7.4
opencensus-proto==0.1.0
packaging==21.0
pandas==1.2.4
partd==1.2.0
ply==3.11
portalocker==1.7.1
protobuf==3.17.3
psutil==5.8.0
pyarrow==4.0.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydantic==1.8.2
PyJWT==2.1.0
pyparsing==2.4.7
python-dateutil==2.8.2
python-multipart==0.0.5
python-rapidjson==1.4
python-ulid==1.0.3
pytz==2021.1
PyYAML==5.4.1
requests==2.26.0
requests-oauthlib==1.3.0
rfc3986==1.5.0
rsa==4.7.2
s3transfer==0.5.0
six==1.16.0
sniffio==1.2.0
sortedcontainers==2.4.0
starlette==0.14.2
structlog==21.1.0
tblib==1.7.0
toolz==0.11.1