Commit 62b9c32d authored by Alexandre Vincent

Merge branch 'master' into 'avincent/pip-tools-dependency-management'

# Conflicts:
#   README.md
parents c393dbb9 dd5c8192
......@@ -42,6 +42,7 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
- [pandas](https://pandas.pydata.org/) and [numpy](https://numpy.org/) for data manipulation
- [pyarrow](https://pypi.org/project/pyarrow/) to load and save data in Parquet format
- [opencensus](https://opencensus.io/guides/grpc/python/) for tracing and logging on cloud providers
- [dask](https://docs.dask.org/en/latest/) to manage large amounts of bulk data
### Library Dependencies
......@@ -60,6 +61,16 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
## Project Startup
### Dask Configuration - Locally
By default, Dask uses all available memory and CPU resources through its workers. The number of workers is determined by the number of cores on the local machine.
### Dask Configuration - In a cluster
In a container context, such as Kubernetes, we recommend setting the container limits to 3Gi of RAM and 4-8 CPUs.
The minimum is 1.2Gi of RAM and 1 CPU; performance will be reduced, but this is enough to handle WellLogs of 10 curves with 1M values each.
Note: container memory is not entirely dedicated to Dask workers; the FastAPI service and its process also require some.
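To make the sizing above concrete, here is a minimal sketch (not part of this change's code) estimating how many Dask workers fit in a given container, assuming the defaults introduced in this merge: a 512Mi recommended minimum per worker (`MIN_WORKER_MEMORY`) and roughly 600Mi of leeway kept for the FastAPI process. The actual logic lives in `DaskClient._get_dask_configuration`.

```python
# Illustrative sketch only (not the service code): estimates how many Dask
# workers fit in a container, given this change's defaults of 512Mi per worker
# and ~600Mi reserved for the FastAPI server itself.
from dask.utils import parse_bytes, format_bytes

MIN_WORKER_MEMORY = parse_bytes("512Mi")  # recommended minimum per worker
MEMORY_LEEWAY = parse_bytes("600Mi")      # reserved for the API server process


def estimate_workers(container_memory: str, cpu_based_workers: int):
    """Return (n_workers, per_worker_memory_bytes) for a given container limit."""
    available = max(0, parse_bytes(container_memory) - MEMORY_LEEWAY)
    n_workers = min(cpu_based_workers, available // MIN_WORKER_MEMORY)
    if n_workers < 1:
        raise RuntimeError("Not enough memory to start a single Dask worker")
    return n_workers, available // n_workers


n, mem = estimate_workers("3Gi", cpu_based_workers=4)
print(f"{n} workers with {format_bytes(mem)} each")  # 4 workers with ~618 MiB each
```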
### Run the service locally
1. Create a virtual environment in the wellbore project directory. This creates a folder inside the project directory, for example: ~/os-wellbore-ddms/nameofvirtualenv
......@@ -304,8 +315,9 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
```bash
LOCAL_PORT=<local_port>
docker run -d -p $LOCAL_PORT:8080 -e OS_WELLBORE_DDMS_DEV_MODE=1 -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH=1 $IMAGE_TAG
IMAGE_TAG=<image_name>
docker run -d -p $LOCAL_PORT:8080 -e CLOUD_PROVIDER=local -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH="/tmp" -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH="/tmp" -e OS_WELLBORE_DDMS_DEV_MODE=True -e USE_PARTITION_SERVICE=disabled $IMAGE_TAG
```
2. Access app on `http://127.0.0.1:<LOCAL_PORT>/api/os-wellbore-ddms/docs`
......@@ -318,6 +330,7 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
docker logs CONTAINER_ID
```
### Run Unit Tests Locally
```bash
......@@ -373,8 +386,8 @@ pip-compile
For more information: https://github.com/jazzband/pip-tools/
### Port Forward from Kubernetes
### Debugging:
#### Port Forward from Kubernetes
1. List the pods: `kubectl get pods`
2. Port forward: `kubectl port-forward pods/POD_NAME LOCAL_PORT:8080`
......
......@@ -37,16 +37,27 @@ from pyarrow.lib import ArrowException
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
from dask.distributed import WorkerPlugin
from dask.distributed import scheduler
def handle_pyarrow_exceptions(target):
def internal_bulk_exceptions(target):
"""
Decorator to handle exceptions that should not be exposed to the outside world, e.g. Pyarrow or Dask exceptions.
"""
@wraps(target)
async def async_inner(*args, **kwargs):
try:
return await target(*args, **kwargs)
except ArrowException:
get_logger().exception(f"{target} raised exception")
raise BulkNotProcessable("Unable to process bulk")
get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
raise BulkNotProcessable("Unable to process bulk - Arrow")
except scheduler.KilledWorker:
get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
raise BulkNotProcessable("Unable to process bulk - Dask")
except Exception:
get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
raise
return async_inner
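As a usage note (an illustrative sketch, not part of the diff; the class and method names are hypothetical), the decorator is meant to wrap async bulk operations so that low-level Arrow and Dask failures surface as `BulkNotProcessable`:

```python
# Hypothetical usage sketch of the decorator defined above.
class SomeBulkReader:
    @internal_bulk_exceptions
    async def read_chunk(self, path: str):
        # ArrowException        -> logged, re-raised as BulkNotProcessable("... - Arrow")
        # scheduler.KilledWorker -> logged, re-raised as BulkNotProcessable("... - Dask")
        # anything else          -> logged, re-raised unchanged
        ...
```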
......@@ -118,7 +129,6 @@ class DaskBulkStorage:
def base_directory(self) -> str:
return self._parameters.base_directory
def _encode_record_id(self, record_id: str) -> str:
return hashlib.sha1(record_id.encode()).hexdigest()
......@@ -189,7 +199,6 @@ class DaskBulkStorage:
engine='pyarrow',
storage_options=self._parameters.storage_options)
async def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
......@@ -199,8 +208,8 @@ class DaskBulkStorage:
return await self._submit_with_trace(pandas_to_parquet, f_pdf, path,
self._parameters.storage_options)
def _check_incoming_chunk(self, df):
@staticmethod
def _check_incoming_chunk(df):
# TODO: should we also test is_monotonic? unique?
if len(df.index) == 0:
raise BulkNotProcessable("Empty data")
......@@ -211,8 +220,9 @@ class DaskBulkStorage:
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
raise BulkNotProcessable("Index should be numeric or datetime")
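To illustrate the checks above (a sketch, not part of the diff; note the hunk elides additional checks between the empty-data and index-type tests), here are chunks that would pass or be rejected:

```python
# Illustrative only: sample chunks against the validation in _check_incoming_chunk.
import pandas as pd

accepted_numeric = pd.DataFrame({"GR": [10.0, 20.0]}, index=[1000.0, 1000.5])
accepted_datetime = pd.DataFrame({"GR": [10.0]}, index=pd.to_datetime(["2021-01-01"]))

rejected_empty = pd.DataFrame({"GR": []})                     # "Empty data"
rejected_index = pd.DataFrame({"GR": [1.0]}, index=["MD-1"])  # "Index should be numeric or datetime"
```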
@handle_pyarrow_exceptions
@internal_bulk_exceptions
@capture_timings('save_blob', handlers=worker_capture_timing_handlers)
@with_trace('save_blob')
async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None):
"""Write the data frame to the blob storage."""
bulk_id = bulk_id or BulkId.new_bulk_id()
......@@ -290,7 +300,7 @@ class DaskBulkStorage:
@capture_timings('session_commit')
@with_trace('session_commit')
@handle_pyarrow_exceptions
@internal_bulk_exceptions
async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
dfs = [self._load(pf) for pf in self._get_next_files_list(session)]
if from_bulk_id:
......
......@@ -188,6 +188,11 @@ class ConfigurationContainer:
description="""Comma separated list of module names to load.""",
default="log_recognition.routers.log_recognition") # Add modules to the list once they are refactored, so that they are included
min_worker_memory: EnvVar = EnvVar(
key='MIN_WORKER_MEMORY',
description='Minimum amount of memory for one Dask worker',
default="512Mi")
_environment_dict: Dict = os.environ
_contextual_loader: Callable = None
......
......@@ -23,13 +23,19 @@ import tempfile
import json
from asyncio import iscoroutinefunction
import dask
from dask.utils import parse_bytes, format_bytes
from dask.distributed import Client as DaskDistributedClient
from distributed import system
from distributed.deploy.utils import nprocesses_nthreads
from app.model.user import User
from app.injector.app_injector import AppInjector
from app.conf import Config
from time import perf_counter, process_time
from logging import INFO
import dask
from dask.distributed import Client as DaskDistributedClient
POOL_EXECUTOR_MAX_WORKER = 4
@lru_cache()
......@@ -37,14 +43,22 @@ def get_http_client_session(key: str = 'GLOBAL'):
return ClientSession(json_serialize=json.dumps)
POOL_EXECUTOR_MAX_WORKER = 4
class DaskException(Exception):
pass
class DaskClient:
# singleton of DaskDistributedClient class
client: DaskDistributedClient = None
""" Dask client """
# Ensure access to the critical section is limited to one coroutine at a time
lock_client: asyncio.Lock = None
""" Lock guarding creation and shutdown of the singleton client """
# Minimum amount of memory required for a Dask worker to avoid degraded performance
min_worker_memory_recommended = parse_bytes(Config.min_worker_memory.value)
# Amount of memory reserved for the FastAPI server and ProcessPoolExecutors
memory_leeway = parse_bytes('600Mi')
@staticmethod
async def create() -> DaskDistributedClient:
......@@ -55,23 +69,78 @@ class DaskClient:
async with DaskClient.lock_client:
if not DaskClient.client:
from app.helper.logger import get_logger
get_logger().info(f"Dask client initialization started...")
DaskClient.client = await DaskDistributedClient(asynchronous=True,
logger = get_logger()
logger.info(f"Dask client initialization started...")
n_workers, threads_per_worker, worker_memory_limit = DaskClient._get_dask_configuration(logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
f"{format_bytes(worker_memory_limit)} of RAM and {threads_per_worker} threads each")
DaskClient.client = await DaskDistributedClient(memory_limit=worker_memory_limit,
n_workers=n_workers,
threads_per_worker=threads_per_worker,
asynchronous=True,
processes=True,
dashboard_address=None,
diagnostics_port=None,
)
dashboard_address=None)
get_logger().info(f"Dask client initialized : {DaskClient.client}")
return DaskClient.client
@staticmethod
def _get_system_memory():
return system.MEMORY_LIMIT
@staticmethod
def _available_memory_for_workers():
""" Return amount of RAM available for Dask's workers after withdrawing RAM required by server itself """
return max(0, (DaskClient._get_system_memory() - DaskClient.memory_leeway))
@staticmethod
def _recommended_workers_and_threads():
""" Return the recommended numbers of worker and threads according the cpus available provided by Dask """
return nprocesses_nthreads()
@staticmethod
def _get_dask_configuration(logger):
"""
Return the recommended Dask worker configuration
"""
n_workers, threads_per_worker = DaskClient._recommended_workers_and_threads()
available_memory_bytes = DaskClient._available_memory_for_workers()
worker_memory_limit = int(available_memory_bytes / n_workers)
logger.info(f"Dask client - system.MEMORY_LIMIT: {format_bytes(DaskClient._get_system_memory())} "
f"- available_memory_bytes: {format_bytes(available_memory_bytes)} "
f"- min_worker_memory_recommended: {format_bytes(DaskClient.min_worker_memory_recommended)} "
f"- computed worker_memory_limit: {format_bytes(worker_memory_limit)} for {n_workers} workers")
if DaskClient.min_worker_memory_recommended > worker_memory_limit:
n_workers = available_memory_bytes // DaskClient.min_worker_memory_recommended
if not n_workers >= 1:
min_memory = DaskClient.min_worker_memory_recommended + DaskClient.memory_leeway
message = f'Not enough memory available to start a Dask worker. ' \
f'Please consider increasing container memory to {format_bytes(min_memory)}'
logger.error(f"Dask client - {message} - "
f'n_workers: {n_workers} threads_per_worker: {threads_per_worker}, '
f'available_memory_bytes: {available_memory_bytes} ')
raise DaskException(message)
worker_memory_limit = available_memory_bytes / n_workers
logger.warning(f"Dask client - available RAM is too low. Reducing number of workers "
f"to {n_workers} running with {format_bytes(worker_memory_limit)} of RAM")
return n_workers, threads_per_worker, worker_memory_limit
@staticmethod
async def close():
if not DaskClient.lock_client:
return
async with DaskClient.lock_client:
if DaskClient.client:
await DaskClient.client.close() # or shutdown
DaskClient.client = None
def get_pool_executor():
if get_pool_executor._pool is None:
get_pool_executor._pool = concurrent.futures.ProcessPoolExecutor(POOL_EXECUTOR_MAX_WORKER)
......@@ -356,7 +425,6 @@ class Context:
return json.dumps(self.__dict__())
def get_ctx() -> Context:
return Context.current()
......@@ -382,6 +450,7 @@ def load_schema_example(file_name: str):
def make_log_captured_timing_handler(level=INFO):
def log_captured_timing(tag, wall, cpu):
Context.current().logger.log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
return log_captured_timing
......@@ -390,6 +459,7 @@ default_capture_timing_handlers = [make_log_captured_timing_handler(INFO)]
def capture_timings(tag, handlers=default_capture_timing_handlers):
""" basic timing decorator, get both wall and cpu """
def decorate(target):
if iscoroutinefunction(target):
......@@ -434,7 +504,7 @@ class OpenApiResponse(NamedTuple):
example: Optional[dict] = None
#NOSONAR
# NOSONAR
class __OpenApiHandler:
def __init__(self):
self._handlers = {}
......
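As a usage note for the timing helper above (an illustrative sketch; the tag and function name are arbitrary), `capture_timings` works on both sync and async callables and reports wall and CPU time through its handlers:

```python
# Illustrative usage of capture_timings from app/utils.py (tag name is arbitrary).
@capture_timings('load_bulk')
async def load_bulk(path):
    ...
# Each call logs a line like: "Timing of load_bulk, wall=0.01234s, cpu=0.00120s"
```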
......@@ -84,6 +84,8 @@ spec:
- mountPath: /azure-keyvault
name: azure-keyvault
readOnly: true
resources:
{{ toYaml .Values.resources | nindent 10 }}
livenessProbe:
httpGet:
path: {{ include "os-wellbore-ddms.prefix" . }}/healthz
......
......@@ -26,6 +26,12 @@ deployment:
replicaCount: 2
resources:
limits:
memory: 3Gi
requests:
memory: 3Gi
annotations:
buildNumber: []
buildOrigin: AzureDevops build/#{Build.DefinitionName}#
......
......@@ -11,16 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from app.utils import DaskClient
import asyncio
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pytest
from tests.unit.test_utils import ctx_fixture, nope_logger_fixture
import mock
from app.utils import DaskException
from app.utils import DaskClient
from dask.utils import parse_bytes
from app.helper import logger
from app.bulk_persistence.dask.dask_bulk_storage import (BulkNotFound,
BulkNotProcessable,
DaskBulkStorage,
......@@ -28,8 +33,6 @@ from app.bulk_persistence.dask.dask_bulk_storage import (BulkNotFound,
from app.persistence.sessions_storage import (Session, SessionState,
SessionUpdateMode)
from tests.unit.test_utils import ctx_fixture, nope_logger_fixture
def generate_df(columns, index):
def gen_values(col_name, size):
......@@ -340,3 +343,33 @@ async def test_duplicate_index(test_session, dask_storage: DaskBulkStorage):
# with session
with pytest.raises(BulkNotProcessable):
await dask_storage.session_add_chunk(test_session, df_ref)
@pytest.mark.parametrize("system_memory, worker_created", [
(42, 0),
((DaskClient.min_worker_memory_recommended + DaskClient.memory_leeway), 1),
((DaskClient.min_worker_memory_recommended * 3 + DaskClient.memory_leeway), 3),
((DaskClient.min_worker_memory_recommended * 3 + DaskClient.memory_leeway) + 1000, 3)
])
@pytest.mark.asyncio
async def test_dask_workers_according_ram_available(system_memory, worker_created):
# clear existing Dask distributed client
await DaskClient.close()
logger._LOGGER = mock.MagicMock()
with mock.patch('app.utils.DaskClient._get_system_memory', mock.Mock(return_value=system_memory)):
with mock.patch('app.utils.DaskClient._recommended_workers_and_threads', mock.Mock(return_value=(10, 10))):
if DaskClient._available_memory_for_workers() < DaskClient.min_worker_memory_recommended:
with pytest.raises(DaskException):
await DaskClient.create()
else:
client = await DaskClient.create()
expected_worker_memory = (system_memory - DaskClient.memory_leeway) / worker_created
assert worker_created == len(client.cluster.scheduler.workers)
workers_has_expected_memory = [w.memory_limit == int(expected_worker_memory)
for _, w in client.cluster.scheduler.workers.items()]
assert all(workers_has_expected_memory)
await DaskClient.close()