
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (10)
Showing 427 additions and 226 deletions
......@@ -276,7 +276,7 @@ The following software have components provided under the terms of this license:
- pydantic (from https://github.com/samuelcolvin/pydantic)
- pyparsing (from http://pyparsing.wikispaces.com/)
- pyrsistent (from http://github.com/tobgu/pyrsistent/)
- pytest (from https://docs.pytest.org/en/latest/)
- pytest (from http://pytest.org, https://docs.pytest.org/en/latest/)
- pytest-cov (from https://github.com/pytest-dev/pytest-cov)
- pytest-httpx (from https://colin-b.github.io/pytest_httpx/)
- pytest-mock (from https://github.com/pytest-dev/pytest-mock/)
......
......@@ -20,6 +20,8 @@ from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from .exceptions import UnknownChannelsException, InvalidBulkException, NoBulkException, NoDataException, RecordNotFoundException
from .consistency_checks import ConsistencyException, DataConsistencyChecks
from .dask.client import DaskClient
from .dask.localcluster import DaskException
# TMP: this should probably not be exposed outside of the bulk_persistence package
from .temp_dir import get_temp_dir
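With DaskClient and DaskException now re-exported from this package, callers no longer import them from app.utils. A minimal sketch of the new import path (the old import, shown for contrast, is the one removed later in this comparison):

# Before this change (removed from app.utils):
#   from app.utils import DaskClient
# After this change:
from app.bulk_persistence import DaskClient, DaskException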
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import dask
from dask.utils import format_bytes
from dask.distributed import Client as DaskDistributedClient
from distributed import LocalCluster
from app.conf import Config
from .localcluster import get_dask_configuration
HOUR = 3600 # in seconds
class DaskClient:
# singleton of DaskDistributedClient class
client: DaskDistributedClient = None
# Ensure only one coroutine at a time enters the critical section
lock_client: asyncio.Lock = None
@staticmethod
async def create() -> DaskDistributedClient:
if not DaskClient.lock_client:
DaskClient.lock_client = asyncio.Lock()
if not DaskClient.client:
async with DaskClient.lock_client:
if not DaskClient.client:
from app.helper.logger import get_logger
logger = get_logger()
logger.info(f"Dask client initialization started...")
n_workers, threads_per_worker, worker_memory_limit = get_dask_configuration(config=Config, logger=logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
f"{format_bytes(worker_memory_limit)} of RAM and {threads_per_worker} threads each")
# Ensure memory used by workers is freed regularly despite potential memory leaks
dask.config.set({'distributed.worker.lifetime.duration': HOUR * 24})
dask.config.set({'distributed.worker.lifetime.stagger': HOUR * 1})
dask.config.set({'distributed.worker.lifetime.restart': True})
logger.info(f"Dask cluster configuration - "
f"worker lifetime: {dask.config.get('distributed.worker.lifetime.duration')}s. "
f"stagger: {dask.config.get('distributed.worker.lifetime.stagger')}s.")
cluster = await LocalCluster(
asynchronous=True,
processes=True,
threads_per_worker=threads_per_worker,
n_workers=n_workers,
memory_limit=worker_memory_limit,
dashboard_address=None
)
# A worker could be killed while executing a task if its lifetime duration has elapsed;
# "cluster.adapt(min=N, max=N)" ensures workers are respawned if that happens
cluster.adapt(minimum=n_workers, maximum=n_workers)
DaskClient.client = await DaskDistributedClient(cluster, asynchronous=True)
get_logger().info(f"Dask client initialized : {DaskClient.client}")
return DaskClient.client
@staticmethod
async def close():
if not DaskClient.lock_client:
return
async with DaskClient.lock_client:
if DaskClient.client:
# closing the cluster (started independently from the client)
cluster = await DaskClient.client.cluster
await cluster.close()
await DaskClient.client.close() # or shutdown
DaskClient.client = None
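For orientation, a minimal usage sketch of this singleton, assuming it is driven from the application's startup and shutdown hooks (the hook names below are illustrative, not part of this change):

from app.bulk_persistence import DaskClient

async def on_startup():
    # First call builds the LocalCluster and the distributed client; later calls reuse them.
    await DaskClient.create()

async def on_shutdown():
    # Closes the cluster and the client, then resets the singleton.
    await DaskClient.close()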
......@@ -27,10 +27,10 @@ from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings
from app.utils import capture_timings
from app.conf import Config
from .client import DaskClient
from .dask_worker_plugin import DaskWorkerPlugin
from .errors import BulkRecordNotFound, BulkNotProcessable, internal_bulk_exceptions
from .traces import map_with_trace, submit_with_trace, trace_attributes_root_span
......
from logging import Logger
from dask.utils import format_bytes, parse_bytes
from distributed import system
from distributed.deploy.utils import nprocesses_nthreads
from app.conf import Config
class DaskException(Exception):
pass
# Amount of memory reserved for the FastAPI server + ProcessPoolExecutors
memory_leeway = parse_bytes("600Mi")
def min_worker_memory_recommended(config: Config):
"""Minimal amount of memory required for a Dask worker to not get bad performances"""
return parse_bytes(config.min_worker_memory.value)
def system_memory():
"""returns the detected memory limit for this system (done by distributed)"""
return system.MEMORY_LIMIT
def available_memory_for_workers():
"""Return amount of RAM available for Dask's workers after withdrawing RAM required by server itself"""
return max(0, (system_memory() - memory_leeway))
def recommended_workers_and_threads():
""" Return the recommended numbers of worker and threads according the cpus available provided by Dask """
return nprocesses_nthreads()
def get_dask_configuration(*, config: Config, logger: Logger):
"""
Return recommended Dask workers configuration
"""
n_workers, threads_per_worker = recommended_workers_and_threads()
available_memory_bytes = available_memory_for_workers()
worker_memory_limit = int(available_memory_bytes / n_workers)
logger.info(
f"Dask client - system.MEMORY_LIMIT: {format_bytes(system_memory())} "
f"- available_memory_bytes: {format_bytes(available_memory_bytes)} "
f"- min_worker_memory_recommended: {format_bytes(min_worker_memory_recommended(config))} "
f"- computed worker_memory_limit: {format_bytes(worker_memory_limit)} for {n_workers} workers"
)
if min_worker_memory_recommended(config) > worker_memory_limit:
n_workers = available_memory_bytes // min_worker_memory_recommended(config)
if not n_workers >= 1:
min_memory = min_worker_memory_recommended(config) + memory_leeway
message = (
f"Not enough memory available to start Dask worker. "
f"Please, consider upgrading container memory to {format_bytes(min_memory)}"
)
logger.error(
f"Dask client - {message} - "
f"n_workers: {n_workers} threads_per_worker: {threads_per_worker}, "
f"available_memory_bytes: {available_memory_bytes} "
)
raise DaskException(message)
worker_memory_limit = available_memory_bytes / n_workers
logger.warning(
f"Dask client - available RAM is too low. Reducing number of workers "
f"to {n_workers} running with {format_bytes(worker_memory_limit)} of RAM"
)
return n_workers, threads_per_worker, worker_memory_limit
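To make the sizing rule above concrete, here is a hedged sketch under assumed values (a 4 GiB container where Dask recommends 4 workers with 1 thread each, and Config.min_worker_memory already populated; the unit tests use the local_dev_config fixture instead): the 600 MiB leeway is subtracted from system memory and the remainder split evenly across workers; if a worker's share would drop below min_worker_memory the worker count is reduced, and a DaskException is raised when not even one worker fits.

import logging
from unittest import mock

from dask.utils import format_bytes, parse_bytes

from app.bulk_persistence.dask.localcluster import get_dask_configuration
from app.conf import Config

# Assumed figures: 4 GiB of system memory, 4 processes x 1 thread recommended by Dask.
with mock.patch("app.bulk_persistence.dask.localcluster.system_memory",
                mock.Mock(return_value=parse_bytes("4GiB"))), \
     mock.patch("app.bulk_persistence.dask.localcluster.recommended_workers_and_threads",
                mock.Mock(return_value=(4, 1))):
    n_workers, threads_per_worker, worker_memory_limit = get_dask_configuration(
        config=Config, logger=logging.getLogger(__name__))
    # (4 GiB - 600 MiB leeway) / 4 workers = 874 MiB per worker,
    # unless Config.min_worker_memory forces fewer workers.
    print(n_workers, threads_per_worker, format_bytes(worker_memory_limit))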
......@@ -17,7 +17,6 @@ import odes_search
import odes_storage
from odes_search.api_client import AsyncSearchApi
from odes_storage.api_client import AsyncRecordsApi
from app.conf import Config
from dataclasses import dataclass
from typing import Optional
......@@ -40,26 +39,26 @@ class Limits:
keepalive_expiry: Optional[float] = 5.0
def make_search_client(host) -> SearchServiceClient:
def make_search_client(*, host, timeout, max_connections=None, max_keepalive_connections=None) -> SearchServiceClient:
search_client = odes_search.ApiClient(
host=host,
timeout=Config.de_client_config_timeout.value,
timeout=timeout,
limits=Limits(
max_connections=Config.de_client_config_max_connection.value or None,
max_keepalive_connections=Config.de_client_config_max_keepalive.value or None)
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections)
)
search_client.add_middleware(middleware=client_middleware)
search_client.add_middleware(middleware=backoff_middleware)
return odes_search.AsyncApis(search_client).search_api
def make_storage_record_client(host) -> StorageRecordServiceClient:
def make_storage_record_client(*, host, timeout, max_connections=None, max_keepalive_connections=None) -> StorageRecordServiceClient:
storage_client = odes_storage.ApiClient(
host=host,
timeout=Config.de_client_config_timeout.value,
timeout=timeout,
limits=Limits(
max_connections=Config.de_client_config_max_connection.value or None,
max_keepalive_connections=Config.de_client_config_max_keepalive.value or None)
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections)
)
storage_client.add_middleware(middleware=client_middleware)
storage_client.add_middleware(middleware=backoff_middleware)
......
......@@ -15,7 +15,7 @@
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from app.conf import *
from app.conf import Config
from app.helper.logger import get_logger
from .app_injector import AppInjector, AppInjectorModule, WithLifeTime
......@@ -131,8 +131,24 @@ class MainInjector(AppInjectorModule):
@staticmethod
async def build_storage_service_client(host=None, *args, **kwargs) -> StorageRecordServiceClient:
return make_storage_record_client(host or Config.service_host_storage.value)
if host is None:
host = Config.service_host_storage.value
return make_storage_record_client(
host=host,
timeout=Config.de_client_config_timeout.value,
max_connections=Config.de_client_config_max_connection.value,
max_keepalive_connections=Config.de_client_config_max_keepalive.value
)
@staticmethod
async def build_search_service_client(host=None, *args, **kwargs) -> SearchServiceClient:
return make_search_client(host or Config.service_host_search.value)
if host is None:
host = Config.service_host_search.value
return make_search_client(
host=host,
timeout=Config.de_client_config_timeout.value,
max_connections=Config.de_client_config_max_connection.value,
max_keepalive_connections=Config.de_client_config_max_keepalive.value
)
......@@ -24,19 +24,11 @@ from logging import INFO
from aiohttp import ClientSession
import dask
from dask.utils import parse_bytes, format_bytes
from dask.distributed import Client as DaskDistributedClient
from distributed import system, LocalCluster
from distributed.deploy.utils import nprocesses_nthreads
from .bulk_persistence import get_temp_dir
from .context import Context
from app.model.user import User
from app.injector.app_injector import AppInjector
from app.conf import Config
POOL_EXECUTOR_MAX_WORKER = 4
HOUR = 3600 # in seconds
@lru_cache()
......@@ -44,124 +36,6 @@ def get_http_client_session(key: str = 'GLOBAL'):
return ClientSession(json_serialize=json.dumps)
class DaskException(Exception):
pass
class DaskClient:
# singleton of DaskDistributedClient class
client: DaskDistributedClient = None
# Ensure only one coroutine at a time enters the critical section
lock_client: asyncio.Lock = None
# Minimum amount of memory required for a Dask worker to avoid poor performance
min_worker_memory_recommended = parse_bytes(Config.min_worker_memory.value)
# Amount of memory reserved for the FastAPI server + ProcessPoolExecutors
memory_leeway = parse_bytes('600Mi')
@staticmethod
async def create() -> DaskDistributedClient:
if not DaskClient.lock_client:
DaskClient.lock_client = asyncio.Lock()
if not DaskClient.client:
async with DaskClient.lock_client:
if not DaskClient.client:
from app.helper.logger import get_logger
logger = get_logger()
logger.info(f"Dask client initialization started...")
get_logger().info(f"Dask using temporary directory: {get_temp_dir()}")
n_workers, threads_per_worker, worker_memory_limit = DaskClient._get_dask_configuration(logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
f"{format_bytes(worker_memory_limit)} of RAM and {threads_per_worker} threads each")
# Ensure memory used by workers is freed regularly despite potential memory leaks
dask.config.set({'distributed.worker.lifetime.duration': HOUR * 24})
dask.config.set({'distributed.worker.lifetime.stagger': HOUR * 1})
dask.config.set({'distributed.worker.lifetime.restart': True})
logger.info(f"Dask cluster configuration - "
f"worker lifetime: {dask.config.get('distributed.worker.lifetime.duration')}s. "
f"stagger: {dask.config.get('distributed.worker.lifetime.stagger')}s.")
cluster = await LocalCluster(
asynchronous=True,
processes=True,
threads_per_worker=threads_per_worker,
n_workers=n_workers,
memory_limit=worker_memory_limit,
dashboard_address=None
)
# A worker could be killed while executing a task if its lifetime duration has elapsed;
# "cluster.adapt(min=N, max=N)" ensures workers are respawned if that happens
cluster.adapt(minimum=n_workers, maximum=n_workers)
DaskClient.client = await DaskDistributedClient(cluster, asynchronous=True)
get_logger().info(f"Dask client initialized : {DaskClient.client}")
return DaskClient.client
@staticmethod
def _get_system_memory():
return system.MEMORY_LIMIT
@staticmethod
def _available_memory_for_workers():
""" Return amount of RAM available for Dask's workers after withdrawing RAM required by server itself """
return max(0, (DaskClient._get_system_memory() - DaskClient.memory_leeway))
@staticmethod
def _recommended_workers_and_threads():
""" Return the recommended numbers of worker and threads according the cpus available provided by Dask """
return nprocesses_nthreads()
@staticmethod
def _get_dask_configuration(logger):
"""
Return recommended Dask workers configuration
"""
n_workers, threads_per_worker = DaskClient._recommended_workers_and_threads()
available_memory_bytes = DaskClient._available_memory_for_workers()
worker_memory_limit = int(available_memory_bytes / n_workers)
logger.info(f"Dask client - system.MEMORY_LIMIT: {format_bytes(DaskClient._get_system_memory())} "
f"- available_memory_bytes: {format_bytes(available_memory_bytes)} "
f"- min_worker_memory_recommended: {format_bytes(DaskClient.min_worker_memory_recommended)} "
f"- computed worker_memory_limit: {format_bytes(worker_memory_limit)} for {n_workers} workers")
if DaskClient.min_worker_memory_recommended > worker_memory_limit:
n_workers = available_memory_bytes // DaskClient.min_worker_memory_recommended
if not n_workers >= 1:
min_memory = DaskClient.min_worker_memory_recommended + DaskClient.memory_leeway
message = f'Not enough memory available to start Dask worker. ' \
f'Please, consider upgrading container memory to {format_bytes(min_memory)}'
logger.error(f"Dask client - {message} - "
f'n_workers: {n_workers} threads_per_worker: {threads_per_worker}, '
f'available_memory_bytes: {available_memory_bytes} ')
raise DaskException(message)
worker_memory_limit = available_memory_bytes / n_workers
logger.warning(f"Dask client - available RAM is too low. Reducing number of workers "
f"to {n_workers} running with {format_bytes(worker_memory_limit)} of RAM")
return n_workers, threads_per_worker, worker_memory_limit
@staticmethod
async def close():
if not DaskClient.lock_client:
return
async with DaskClient.lock_client:
if DaskClient.client:
# closing the cluster (started independently from the client)
cluster = await DaskClient.client.cluster
await cluster.close()
await DaskClient.client.close() # or shutdown
DaskClient.client = None
async def async_with_cache(cache, key: str, fn_coroutine, *args, **kwargs):
try:
return cache[key]
......
......@@ -57,8 +57,8 @@ from app.pool_executor import run_in_pool_executor
from app.utils import (
get_http_client_session,
OpenApiHandler,
DaskClient,
POOL_EXECUTOR_MAX_WORKER)
from app.bulk_persistence import DaskClient
from app.routers.bulk.utils import (
update_operation_ids,
set_v3_input_dataframe_check,
......
from unittest import mock
import pytest
from distributed.deploy.utils import nprocesses_nthreads
from app.bulk_persistence.dask.localcluster import (
min_worker_memory_recommended,
memory_leeway,
get_dask_configuration,
DaskException,
)
def test_get_dask_configuration_not_enough_memory(
local_dev_config, nope_logger_fixture
):
def mock_system_memory():
return 42
with mock.patch(
"app.bulk_persistence.dask.localcluster.system_memory", mock_system_memory
):
with pytest.raises(DaskException) as exc:
get_dask_configuration(config=local_dev_config, logger=nope_logger_fixture)
assert exc.value.args[0].startswith("Not enough memory")
@pytest.mark.parametrize("memory_space_for_worker", [1, 2, 3, 4, 5, 6, 7, 8])
def test_get_dask_configuration_just_enough_memory(
memory_space_for_worker, local_dev_config, nope_logger_fixture
):
def mock_system_memory():
return (
min_worker_memory_recommended(local_dev_config) * memory_space_for_worker
+ memory_leeway
)
with mock.patch(
"app.bulk_persistence.dask.localcluster.system_memory", mock_system_memory
):
n_workers, threads_per_worker, worker_memory_limit = get_dask_configuration(
config=local_dev_config, logger=nope_logger_fixture
)
# we should have as many workers as memory space allows,
# but no more than the available processes
assert n_workers == min(memory_space_for_worker, nprocesses_nthreads()[0])
# the memory limit per worker should be the total (minus leeway) divided by the number of workers
assert worker_memory_limit == (mock_system_memory() - memory_leeway) / n_workers
......@@ -10,14 +10,6 @@ from dask.distributed import Client
from app.bulk_persistence.dask.dask_data_ipc import DaskNoneDataIPC, DaskLocalFileDataIPC, DaskNativeDataIPC
@pytest.fixture
async def dask_client(init_fixtures, event_loop):
# use a single-process, single-thread async client with one worker
client = await Client(processes=False, asynchronous=True, direct_to_workers=True, n_workers=1, threads_per_worker=1)
yield client
await client.close()
async def data_async_gen(data=b"123456789", chunk_size=3):
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
......@@ -67,22 +59,26 @@ async def test_dask_native_ipc_handle_async_generator_and_bytes(in_data):
@pytest.mark.asyncio
async def test_dask_native_ipc_basic_usage(dask_client):
client = dask_client
ipc_obj = DaskNativeDataIPC(dask_client=client)
# worker function, simply read and return the data
def worker_func(ipc_data_ref, ipc_data_get_func):
with ipc_data_get_func(ipc_data_ref) as file_like_data:
return file_like_data.read()
with dask_client(autoclose_asynccontext=True) as dask_client_asynccontext:
async with dask_client_asynccontext() as client_starter:
client = await client_starter()
# set in IPC
async with ipc_obj.set(b"123456789") as (data_ref, getter):
ipc_obj = DaskNativeDataIPC(dask_client=client)
# worker function, simply read and return the data
def worker_func(ipc_data_ref, ipc_data_get_func):
with ipc_data_get_func(ipc_data_ref) as file_like_data:
return file_like_data.read()
# set in IPC
async with ipc_obj.set(b"123456789") as (data_ref, getter):
# WHEN submit task to dask client
result = await client.submit(worker_func, data_ref, getter)
# WHEN submit task to dask client
result = await client.submit(worker_func, data_ref, getter)
# THEN the worker has fetched and read the expected data
assert result == b"123456789"
# THEN the worker has fetched and read the expected data
assert result == b"123456789"
@pytest.mark.asyncio
......
......@@ -36,13 +36,13 @@ from app.conf import Config
from tests.unit.test_utils import ctx_fixture
@pytest.mark.asyncio
async def test_make_storage_client(httpx_mock: HTTPXMock, ctx_fixture):
host = 'http://my_host:81234'
async with make_storage_record_client(host) as client:
async def test_make_storage_client(local_dev_config, httpx_mock: HTTPXMock, ctx_fixture):
async with make_storage_record_client(host=local_dev_config.service_host_storage.value,
timeout=local_dev_config.de_client_config_timeout.value) as client:
assert isinstance(client, StorageRecordServiceClient)
# ensure host
assert client.api_client.host == host
assert client.api_client.host == local_dev_config.service_host_storage.value
# using literal here to make config change visible
assert client.api_client._async_client.timeout == httpx.Timeout(timeout=10)
......@@ -54,13 +54,13 @@ async def test_make_storage_client(httpx_mock: HTTPXMock, ctx_fixture):
@pytest.mark.asyncio
async def test_make_search_client(httpx_mock: HTTPXMock, ctx_fixture):
host = 'http://my_host:81234'
async with make_search_client(host) as client:
async def test_make_search_client(local_dev_config, httpx_mock: HTTPXMock, ctx_fixture):
async with make_search_client(host=local_dev_config.service_host_search.value,
timeout=local_dev_config.de_client_config_timeout.value) as client:
assert isinstance(client, SearchServiceClient)
# ensure host
assert client.api_client.host == host
assert client.api_client.host == local_dev_config.service_host_search.value
assert client.api_client._async_client.timeout == httpx.Timeout(timeout=10)
get_or_create_ctx()
......
......@@ -20,8 +20,7 @@ from app.context import Context, get_or_create_ctx
from tests.unit.test_utils import ctx_fixture
@pytest.mark.asyncio
async def test_fwd_correlation_id_to_outgoing_request_to_storage(ctx_fixture: Context, httpx_mock: HTTPXMock):
storage_url = "http://example.com" # well formed url required
async def test_fwd_correlation_id_to_outgoing_request_to_storage(local_dev_config, ctx_fixture: Context, httpx_mock: HTTPXMock):
expected_correlation_id = 'some-correlation-id'
ctx = ctx_fixture.with_correlation_id(expected_correlation_id).with_auth("foobar")
......@@ -30,7 +29,8 @@ async def test_fwd_correlation_id_to_outgoing_request_to_storage(ctx_fixture: Co
# safety: make sure no methods on tracer have been called yet
assert ctx.tracer.method_calls == []
async with make_storage_record_client(storage_url) as storage_client:
async with make_storage_record_client(host=local_dev_config.service_host_search.value,
timeout=local_dev_config.de_client_config_timeout.value) as storage_client:
httpx_mock.add_response(match_headers={'correlation-id': expected_correlation_id})
# force to use endpoint which does not return a response to skip model validation
response = await storage_client.delete_record(id="123", data_partition_id="test")
......@@ -43,8 +43,7 @@ async def test_fwd_correlation_id_to_outgoing_request_to_storage(ctx_fixture: Co
)
@pytest.mark.asyncio
async def test_fwd_correlation_id_to_outgoing_request_to_search(ctx_fixture: Context, httpx_mock: HTTPXMock):
storage_url = "http://example.com" # well formed url required
async def test_fwd_correlation_id_to_outgoing_request_to_search(local_dev_config, ctx_fixture: Context, httpx_mock: HTTPXMock):
expected_correlation_id = 'some-correlation-id'
ctx = ctx_fixture.with_correlation_id(expected_correlation_id).with_auth("foobar")
......@@ -53,7 +52,8 @@ async def test_fwd_correlation_id_to_outgoing_request_to_search(ctx_fixture: Con
# safety: make sure no methods on tracer have been called yet
assert ctx.tracer.method_calls == []
async with make_search_client(storage_url) as search_client:
async with make_search_client(host=local_dev_config.service_host_search.value,
timeout=local_dev_config.de_client_config_timeout.value) as search_client:
httpx_mock.add_response(match_headers={'correlation-id': expected_correlation_id})
# force to use endpoint which does not return a response to skip model validation
response = await search_client.delete_index(kind="kind", data_partition_id="test")
......
......@@ -22,7 +22,7 @@ import app.conf as conf
import pytest
from app.conf import ConfigurationContainer
from app.context import Context
from app.utils import DaskClient
from fastapi import Header
from hypothesis import settings, Verbosity, HealthCheck
......@@ -40,6 +40,10 @@ from .fixtures import (
mock_storage_client_holding_data
)
from .fixtures_pkg import (
dask_client
)
@pytest.fixture(autouse=False)
def top_fixture(monkeypatch):
......@@ -89,12 +93,13 @@ def pytest_unconfigure(config):
del os.environ['SERVICE_HOST_PARTITION']
# all tests with pytest-asyncio will share the same loop
# Ref: https://github.com/pytest-dev/pytest-asyncio#event_loop
@pytest.fixture(scope="session")
def event_loop(): # all tests will share the same loop
def event_loop():
loop = asyncio.get_event_loop()
yield loop
# teardown
loop.run_until_complete(DaskClient.close())
loop.close()
......
import asyncio
import contextlib
import copy
import types
from typing import List
......@@ -50,13 +51,14 @@ def mock_storage_client_holding_data(local_dev_config, nope_logger_fixture):
For usage examples, see fixtures_test.py in this directory
We depend on:
- local_dev_config to ha ve a valid configuration, but also avoid doing unexpected network requests
- local_dev_config to have a valid configuration, but also avoid doing unexpected network requests
- nope_logger_fixture because configuring this will mount middlewares, and they need a logger
"""
def setup_data_for_mock(data):
template_client = make_storage_record_client(
local_dev_config.service_host_storage
host=local_dev_config.service_host_storage.value,
timeout=local_dev_config.de_client_config_timeout.value
)
# Note: we want to be able to modify the mock to handle get_record and get_record_version specifically
......@@ -106,18 +108,27 @@ def mock_storage_client_holding_data(local_dev_config, nope_logger_fixture):
@pytest.fixture(scope="module")
def app_initialized_with_testclient(local_dev_config, request):
def app_initialized_with_testclient(local_dev_config, dask_client):
"""
Fixture providing wdms_app started, along with a test client
"""
global base_app, wdms_app
# this app, initialized, and as part of a hierarchy of apps
with TestClient(
base_app
): # TOFIX: currently necessary because base_app and wdms_app are interdependent
with TestClient(wdms_app) as client:
yield wdms_app, client
# retrieve the dask_client starter, but let the app close it.
# CAREFUL about the fixture scope
with dask_client(autoclose_asynccontext=False) as dask_client_starter:
# Mock DaskClient.create so the app uses the test's dask_client
with mock.patch('app.bulk_persistence.dask.client.DaskClient.create', dask_client_starter):
# this app, initialized, and as part of a hierarchy of apps
with TestClient(
base_app
): # TOFIX: currently necessary because base_app and wdms_app are interdependent
with TestClient(wdms_app) as client:
yield wdms_app, client
# slb_app shutdown event should call DaskClient.close()
@pytest.fixture
......
from .dask_client import dask_client
import contextlib
from unittest import mock
import pytest
from app.bulk_persistence import DaskClient
from app.bulk_persistence.dask.localcluster import memory_leeway, min_worker_memory_recommended
@pytest.fixture(scope="module")
def dask_client(event_loop, local_dev_config):
# a context manager to handle the mocks to configure the singleton
@contextlib.contextmanager
def configure(*,
system_memory_mock=min_worker_memory_recommended(local_dev_config) + memory_leeway,
worker_threads_mock=(2, 1),
autoclose_asynccontext=True
):
with mock.patch("app.bulk_persistence.dask.localcluster.system_memory",
mock.Mock(return_value=system_memory_mock)):
with mock.patch("app.bulk_persistence.dask.localcluster.recommended_workers_and_threads",
mock.Mock(return_value=worker_threads_mock)):
if autoclose_asynccontext:
# an async context manager to handle the DaskClient close() coroutine
@contextlib.asynccontextmanager
async def start():
try:
# CAREFUL: this is for the test to await (required to be usable in the app fixture).
yield DaskClient.create  # TODO: call with parameters here to specify the test env for Dask?
# because close() is async, we need a coroutine,
# and therefore an async context manager,
# to ensure Dask is properly closed by the test using it
finally:
# we also need a try/finally in case create() itself raises an exception.
# As the context manager will not take care of this,
# we should still call close() to clean up whatever needs cleaning.
await DaskClient.close()
yield start
else:
# return a coroutine; it is the responsibility of the test to await it
# within its own event loop
yield DaskClient.create
# the caller must also call DaskClient.close()
return configure
import pytest
from app.bulk_persistence import DaskException
from app.bulk_persistence.dask.localcluster import min_worker_memory_recommended, memory_leeway
@pytest.mark.asyncio
async def test_dask_workers_not_enough_ram_available(dask_client, nope_logger_fixture):
with dask_client(system_memory_mock=42,
worker_threads_mock=(10, 10),
autoclose_asynccontext=True
) as dask_client_asynccontext:
with pytest.raises(DaskException) as exc:
# creating the client should throw an exception
async with dask_client_asynccontext() as client_starter:
await client_starter()
assert exc.value.args[0].startswith("Not enough memory")
@pytest.mark.parametrize("expected_workers", [
1, 3, 5
])
@pytest.mark.asyncio
async def test_dask_workers_enough_ram_available(local_dev_config, expected_workers, dask_client, nope_logger_fixture):
system_memory = min_worker_memory_recommended(local_dev_config) * expected_workers + memory_leeway
with dask_client(system_memory_mock=system_memory,
worker_threads_mock=(10, 10),
autoclose_asynccontext=True
) as dask_client_asynccontext:
# the workers should use the expected amount of memory
async with dask_client_asynccontext() as client_starter:
client = await client_starter()
expected_worker_memory = (system_memory - memory_leeway) / expected_workers
assert expected_workers == len(client.cluster.scheduler.workers)
workers_has_expected_memory = [w.memory_limit == int(expected_worker_memory)
for _, w in client.cluster.scheduler.workers.items()]
assert all(workers_has_expected_memory)
......@@ -13,11 +13,11 @@ from app.injector.main_injector import MainInjector
@pytest.mark.asyncio
async def test_fwd_correlation_id_to_outgoing_request_to_storage(httpx_mock: HTTPXMock, ctx_fixture):
storage_url = "http://example.com" # well formed url required
async def test_fwd_correlation_id_to_outgoing_request_to_storage(local_dev_config, httpx_mock: HTTPXMock, ctx_fixture):
expected_correlation_id = 'some-correlation-id'
async with make_storage_record_client(storage_url) as storage_client:
async with make_storage_record_client(host=local_dev_config.service_host_storage.value,
timeout=local_dev_config.de_client_config_timeout.value) as storage_client:
httpx_mock.add_response(match_headers={'correlation-id': expected_correlation_id})
ctx_fixture.set_current_with_value(correlation_id=expected_correlation_id)
......@@ -28,11 +28,11 @@ async def test_fwd_correlation_id_to_outgoing_request_to_storage(httpx_mock: HTT
@pytest.mark.asyncio
async def test_fwd_correlation_id_to_outgoing_request_to_search(httpx_mock: HTTPXMock, ctx_fixture):
storage_url = "http://example.com" # well formed url required
async def test_fwd_correlation_id_to_outgoing_request_to_search(local_dev_config, httpx_mock: HTTPXMock, ctx_fixture):
expected_correlation_id = 'some-correlation-id'
async with make_search_client(storage_url) as search_client:
async with make_search_client(host=local_dev_config.service_host_search.value,
timeout=local_dev_config.de_client_config_timeout.value) as search_client:
httpx_mock.add_response(match_headers={'correlation-id': expected_correlation_id})
ctx_fixture.set_current_with_value(correlation_id=expected_correlation_id)
......@@ -42,36 +42,14 @@ async def test_fwd_correlation_id_to_outgoing_request_to_search(httpx_mock: HTTP
assert response is not None
@pytest.fixture()
async def wdms_app_mocked(nope_logger_fixture):
def test_outgoing_tracing_headers_with_incoming_headers(local_dev_config, app_configurable_with_testclient, httpx_mock):
from fastapi.testclient import TestClient
from app.wdms_app import wdms_app, app_injector
from app.clients import StorageRecordServiceClient
# we do not want dev mode, so that we are able to actually send http requests
conf.Config.dev_mode.value = False
conf.Config.service_host_search.value = "http://localhost:8888"
conf.Config.service_host_storage.value = "http://localhost:9999"
wdms_app.dependency_overrides[require_opendes_authorized_user] = lambda: True
wdms_app.dependency_overrides[require_data_partition_id] = lambda: True
wdms_app.trace_exporter = traces.CombinedExporter(service_name='tested-ddms')
client = TestClient(wdms_app)
MainInjector().configure(app_injector)
yield client
wdms_app.dependency_overrides = {}
# explicit close client in teardown
storage_client = await app_injector.get(StorageRecordServiceClient)
if storage_client is not None:
await storage_client.api_client.close()
def test_outgoing_tracing_headers_with_incoming_headers(wdms_app_mocked, httpx_mock):
app, client = app_configurable_with_testclient(
storage_client_mock=make_storage_record_client(host=local_dev_config.service_host_storage.value,
timeout=local_dev_config.de_client_config_timeout.value),
fake_opendes_authorized_user=True,
fake_data_partition_id=True
)
version = '00'
trace_id = '80f22fa582f64d2584e76b4aac231f12'
......@@ -96,11 +74,18 @@ def test_outgoing_tracing_headers_with_incoming_headers(wdms_app_mocked, httpx_m
httpx_mock.add_callback(custom_response)
response = wdms_app_mocked.delete(f'/ddms/v2/logs/123456', headers=input_headers)
response = client.delete(f'/ddms/v2/logs/123456', headers=input_headers)
assert response.status_code == 204
def test_outgoing_tracing_headers_without_headers(wdms_app_mocked, httpx_mock):
def test_outgoing_tracing_headers_without_headers(local_dev_config, app_configurable_with_testclient, httpx_mock):
app, client = app_configurable_with_testclient(
storage_client_mock=make_storage_record_client(host=local_dev_config.service_host_storage.value,
timeout=local_dev_config.de_client_config_timeout.value),
fake_opendes_authorized_user=True,
fake_data_partition_id=True
)
def custom_response(request: httpx.Request, *args, **kwargs):
assert request.headers['traceparent'], "check if tracing header is present"
......@@ -116,5 +101,5 @@ def test_outgoing_tracing_headers_without_headers(wdms_app_mocked, httpx_mock):
httpx_mock.add_callback(custom_response)
response = wdms_app_mocked.delete('/ddms/v2/logs/123456')
response = client.delete('/ddms/v2/logs/123456')
assert response.status_code == 204