Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • osdu/platform/domain-data-mgmt-services/wellbore/wellbore-domain-services
  • Vkamani/vkamani-wellbore-domain-services
  • Yan_Sushchynski/wellbore-domain-services-comm-impl
3 results
Show changes
Showing
with 726 additions and 726 deletions
......@@ -22,7 +22,8 @@ from app.clients.storage_service_client import get_storage_record_service
from app.routers.bulk.bulk_uri_dependencies import BulkIdAccess
from app.routers.record_utils import fetch_record
from app.context import Context, get_ctx
from app.context import Context
from app.helper.logger import get_logger
async def _get_bulk_uri_from_version(ctx: Context, bulk_uri_access: BulkIdAccess, record_id: str, index: int,
......@@ -78,7 +79,7 @@ async def delete_record(
delete_result = asyncio.ensure_future(task)
def task_done(future_result):
if future_result.exception():
get_ctx().logger.exception(
get_logger().exception(
f"Exception on bulk versions deletion: {future_result.exception().detail}")
delete_result.add_done_callback(task_done)
......@@ -24,19 +24,11 @@ from logging import INFO
from aiohttp import ClientSession
import dask
from dask.utils import parse_bytes, format_bytes
from dask.distributed import Client as DaskDistributedClient
from distributed import system, LocalCluster
from distributed.deploy.utils import nprocesses_nthreads
from .helper.logger import get_logger
from .bulk_persistence import get_temp_dir
from .context import Context
from app.model.user import User
from app.injector.app_injector import AppInjector
from app.conf import Config
POOL_EXECUTOR_MAX_WORKER = 4
HOUR = 3600 # in seconds
@lru_cache()
......@@ -44,124 +36,6 @@ def get_http_client_session(key: str = 'GLOBAL'):
return ClientSession(json_serialize=json.dumps)
class DaskException(Exception):
pass
class DaskClient:
# singleton of DaskDistributedClient class
client: DaskDistributedClient = None
# Ensure access to critical section is done for only one coroutine
lock_client: asyncio.Lock = None
# Minimal amount of memory required for a Dask worker to not get bad performances
min_worker_memory_recommended = parse_bytes(Config.min_worker_memory.value)
# Amount of memory Reserved for fastApi server + ProcessPoolExecutors
memory_leeway = parse_bytes('600Mi')
@staticmethod
async def create() -> DaskDistributedClient:
if not DaskClient.lock_client:
DaskClient.lock_client = asyncio.Lock()
if not DaskClient.client:
async with DaskClient.lock_client:
if not DaskClient.client:
from app.helper.logger import get_logger
logger = get_logger()
logger.info(f"Dask client initialization started...")
get_logger().info(f"Dask using temporary directory: {get_temp_dir()}")
n_workers, threads_per_worker, worker_memory_limit = DaskClient._get_dask_configuration(logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
f"{format_bytes(worker_memory_limit)} of RAM and {threads_per_worker} threads each")
# Ensure memory used by workers is freed regularly despite memory leak
dask.config.set({'distributed.worker.lifetime.duration': HOUR * 24})
dask.config.set({'distributed.worker.lifetime.stagger': HOUR * 1})
dask.config.set({'distributed.worker.lifetime.restart': True})
logger.info(f"Dask cluster configuration - "
f"worker lifetime: {dask.config.get('distributed.worker.lifetime.duration')}s. "
f"stagger: {dask.config.get('distributed.worker.lifetime.stagger')}s.")
cluster = await LocalCluster(
asynchronous=True,
processes=True,
threads_per_worker=threads_per_worker,
n_workers=n_workers,
memory_limit=worker_memory_limit,
dashboard_address=None
)
# A worker could be killed when executing a task if lifetime duration elapsed,
# "cluster.adapt(min=N, max=N)" ensure the respawn of workers if it happens
cluster.adapt(minimum=n_workers, maximum=n_workers)
DaskClient.client = await DaskDistributedClient(cluster, asynchronous=True)
get_logger().info(f"Dask client initialized : {DaskClient.client}")
return DaskClient.client
@staticmethod
def _get_system_memory():
return system.MEMORY_LIMIT
@staticmethod
def _available_memory_for_workers():
""" Return amount of RAM available for Dask's workers after withdrawing RAM required by server itself """
return max(0, (DaskClient._get_system_memory() - DaskClient.memory_leeway))
@staticmethod
def _recommended_workers_and_threads():
""" Return the recommended numbers of worker and threads according the cpus available provided by Dask """
return nprocesses_nthreads()
@staticmethod
def _get_dask_configuration(logger):
"""
Return recommended Dask workers configuration
"""
n_workers, threads_per_worker = DaskClient._recommended_workers_and_threads()
available_memory_bytes = DaskClient._available_memory_for_workers()
worker_memory_limit = int(available_memory_bytes / n_workers)
logger.info(f"Dask client - system.MEMORY_LIMIT: {format_bytes(DaskClient._get_system_memory())} "
f"- available_memory_bytes: {format_bytes(available_memory_bytes)} "
f"- min_worker_memory_recommended: {format_bytes(DaskClient.min_worker_memory_recommended)} "
f"- computed worker_memory_limit: {format_bytes(worker_memory_limit)} for {n_workers} workers")
if DaskClient.min_worker_memory_recommended > worker_memory_limit:
n_workers = available_memory_bytes // DaskClient.min_worker_memory_recommended
if not n_workers >= 1:
min_memory = DaskClient.min_worker_memory_recommended + DaskClient.memory_leeway
message = f'Not enough memory available to start Dask worker. ' \
f'Please, consider upgrading container memory to {format_bytes(min_memory)}'
logger.error(f"Dask client - {message} - "
f'n_workers: {n_workers} threads_per_worker: {threads_per_worker}, '
f'available_memory_bytes: {available_memory_bytes} ')
raise DaskException(message)
worker_memory_limit = available_memory_bytes / n_workers
logger.warning(f"Dask client - available RAM is too low. Reducing number of workers "
f"to {n_workers} running with {format_bytes(worker_memory_limit)} of RAM")
return n_workers, threads_per_worker, worker_memory_limit
@staticmethod
async def close():
if not DaskClient.lock_client:
return
async with DaskClient.lock_client:
if DaskClient.client:
# closing the cluster (started independently from the client)
cluster = await DaskClient.client.cluster
await cluster.close()
await DaskClient.client.close() # or shutdown
DaskClient.client = None
async def async_with_cache(cache, key: str, fn_coroutine, *args, **kwargs):
try:
return cache[key]
......@@ -182,7 +56,7 @@ def load_schema_example(file_name: str):
def make_log_captured_timing_handler(level=INFO):
def log_captured_timing(tag, wall, cpu):
Context.current().logger.log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
get_logger().log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
return log_captured_timing
......
......@@ -57,8 +57,8 @@ from app.pool_executor import run_in_pool_executor
from app.utils import (
get_http_client_session,
OpenApiHandler,
DaskClient,
POOL_EXECUTOR_MAX_WORKER)
from app.bulk_persistence import DaskClient
from app.routers.bulk.utils import (
update_operation_ids,
set_v3_input_dataframe_check,
......@@ -130,14 +130,14 @@ def make_entity_type_dependency(entity_type: Entity, version: str):
async def startup_event():
service_name = Config.service_name.value
logger.init_logger(service_name=service_name)
logger.init_logger(service_name=service_name, config=Config)
#check python version >=3.8
assert sys.version_info.major == 3 and sys.version_info.minor >= 8, 'Python version required >=3.8'
check_environment(Config)
MainInjector().configure(app_injector)
wdms_app.trace_exporter = traces.create_exporter(service_name=service_name)
wdms_app.trace_exporter = traces.create_exporter(service_name=service_name, config=Config)
# seems that the lock is not in the same event loop as requests
# so we need to wait instead of just fire a task
......@@ -315,7 +315,7 @@ update_operation_ids(wdms_app)
wdms_app.add_middleware(TracingMiddleware)
# must be added last to be executed first, it's responsible to clean and create WDMS Context
wdms_app.add_middleware(CreateBasicContextMiddleware, injector=app_injector)
wdms_app.add_middleware(CreateBasicContextMiddleware, config=Config, injector=app_injector)
# adding exception handling
......
......@@ -16,17 +16,22 @@ import pytest
import os
import uuid
from unittest import mock
from app.context import Context
import app.conf as conf
from app.helper.traces import create_exporter
from app.conf import ConfigurationContainer, Config, check_environment, validator_path_must_exist, \
from app.conf import ConfigurationContainer, check_environment, validator_path_must_exist, \
cloud_provider_additional_environment
@pytest.fixture
def testing_config():
return Config
config = ConfigurationContainer.with_load_all()
# patching Config in app.conf module, so it is found by other modules
with mock.patch('app.conf.Config', config):
# returning the config for explicit use in tests.
yield config
# mock.patch will restore original Config on exiting context, after fixture use.
@pytest.fixture()
......@@ -38,14 +43,16 @@ def gcp_config_fixture():
environment_dict['SERVICE_HOST_STORAGE'] = 'https://test-endpoint/api/storage'
environment_dict['SERVICE_HOST_SEARCH'] = 'https://test-endpoint/api/search'
conf.Config = ConfigurationContainer.with_load_all(
config = ConfigurationContainer.with_load_all(
environment_dict=environment_dict,
contextual_loader=cloud_provider_additional_environment)
yield conf.Config, provider_name
# patching Config in app.conf module, so it is found by other modules
with mock.patch('app.conf.Config', config):
# returning the config for explicit use in tests.
yield config
# restore initial config
ConfigurationContainer.with_load_all(environment_dict=os.environ, contextual_loader=None)
# mock.patch will restore original Config on exiting context, after fixture use.
@pytest.fixture()
......@@ -59,20 +66,22 @@ def azure_config_fixture():
environment_dict['SERVICE_HOST_SEARCH'] = 'https://test-endpoint/api/search'
environment_dict['USE_PARTITION_SERVICE'] = 'disabled'
conf.Config = ConfigurationContainer.with_load_all(
config = ConfigurationContainer.with_load_all(
environment_dict=environment_dict,
contextual_loader=cloud_provider_additional_environment)
yield conf.Config, provider_name
# patching Config in app.conf module, so it is found by other modules
with mock.patch('app.conf.Config', config):
# returning the config for explicit use in tests.
yield config
# restore initial config
ConfigurationContainer.with_load_all(environment_dict=os.environ, contextual_loader=None)
# mock.patch will restore original Config on exiting context, after fixture use.
def test_gcp_configuration_checker(gcp_config_fixture):
gcp_config, provider_name = gcp_config_fixture
gcp_config = gcp_config_fixture
assert gcp_config.cloud_provider.value == provider_name
assert gcp_config.cloud_provider.value == "gcp"
variables_dict = gcp_config.as_printable_dict()
assert "default_data_tenant_project_id" in variables_dict.keys()
......@@ -82,13 +91,15 @@ def test_gcp_configuration_checker(gcp_config_fixture):
def test_azure_configuration_checker(azure_config_fixture):
azure_config, provider_name = azure_config_fixture
azure_config = azure_config_fixture
assert azure_config.cloud_provider.value == 'az'
variables_dict = azure_config.as_printable_dict().keys()
check_environment(azure_config)
assert azure_config.az_bulk_container == 'wdms-osdu'
# below attribute are gcp only
assert "default_data_tenant_project_id" not in variables_dict
assert "default_data_tenant_credentials" not in variables_dict
......@@ -101,7 +112,7 @@ def test_azure_trace_exporter_created(azure_config_fixture):
mock_exporter.configure_mock(**{'exporter_name': exporter_name})
with mock.patch('app.helper.traces._create_azure_exporter', mock.Mock(return_value=mock_exporter)):
exporter = create_exporter('test-service')
exporter = create_exporter(service_name='test-service', config=azure_config_fixture)
assert len(exporter.exporters) == 1
# ensure called method is azure exporter
azure_exporter = exporter.exporters[0]
......@@ -115,7 +126,7 @@ def test_gcp_trace_exporter_created(gcp_config_fixture):
mock_exporter.configure_mock(**{'exporter_name': exporter_name})
with mock.patch('app.helper.traces._create_gcp_exporter', mock.Mock(return_value=mock_exporter)):
exporter = create_exporter('test-service')
exporter = create_exporter(service_name='test-service', config=gcp_config_fixture)
assert len(exporter.exporters) == 1
# ensure called method is gcp exporter
gcp_exporter = exporter.exporters[0]
......
from unittest import mock
import pytest
from distributed.deploy.utils import nprocesses_nthreads
from app.bulk_persistence.dask.localcluster import (
min_worker_memory_recommended,
memory_leeway,
get_dask_configuration,
DaskException,
)
def test_get_dask_configuration_not_enough_memory(
local_dev_config, nope_logger_fixture
):
def mock_system_memory():
return 42
with mock.patch(
"app.bulk_persistence.dask.localcluster.system_memory", mock_system_memory
):
with pytest.raises(DaskException) as exc:
get_dask_configuration(config=local_dev_config, logger=nope_logger_fixture)
assert exc.value.args[0].startswith("Not enough memory")
@pytest.mark.parametrize("memory_space_for_worker", [1, 2, 3, 4, 5, 6, 7, 8])
def test_get_dask_configuration_just_enough_memory(
memory_space_for_worker, local_dev_config, nope_logger_fixture
):
def mock_system_memory():
return (
min_worker_memory_recommended(local_dev_config) * memory_space_for_worker
+ memory_leeway
)
with mock.patch(
"app.bulk_persistence.dask.localcluster.system_memory", mock_system_memory
):
n_workers, threads_per_worker, worker_memory_limit = get_dask_configuration(
config=local_dev_config, logger=nope_logger_fixture
)
# we should have as many worker as memory space allow,
# but no more than available processes
assert n_workers == min(memory_space_for_worker, nprocesses_nthreads()[0])
# the memory limit per worker should be the total (minus leeway) divided by number of workers
assert worker_memory_limit == (mock_system_memory() - memory_leeway) / n_workers
......@@ -10,14 +10,6 @@ from dask.distributed import Client
from app.bulk_persistence.dask.dask_data_ipc import DaskNoneDataIPC, DaskLocalFileDataIPC, DaskNativeDataIPC
@pytest.fixture
async def dask_client(init_fixtures, event_loop):
# use a mono process, mono thread async client with a single worker
client = await Client(processes=False, asynchronous=True, direct_to_workers=True, n_workers=1, threads_per_worker=1)
yield client
await client.close()
async def data_async_gen(data=b"123456789", chunk_size=3):
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
......@@ -67,22 +59,26 @@ async def test_dask_native_ipc_handle_async_generator_and_bytes(in_data):
@pytest.mark.asyncio
async def test_dask_native_ipc_basic_usage(dask_client):
client = dask_client
ipc_obj = DaskNativeDataIPC(dask_client=client)
# worker function, simply read and return the data
def worker_func(ipc_data_ref, ipc_data_get_func):
with ipc_data_get_func(ipc_data_ref) as file_like_data:
return file_like_data.read()
with dask_client(autoclose_asynccontext=True) as dask_client_asynccontext:
async with dask_client_asynccontext() as client_starter:
client = await client_starter()
# set in IPC
async with ipc_obj.set(b"123456789") as (data_ref, getter):
ipc_obj = DaskNativeDataIPC(dask_client=client)
# worker function, simply read and return the data
def worker_func(ipc_data_ref, ipc_data_get_func):
with ipc_data_get_func(ipc_data_ref) as file_like_data:
return file_like_data.read()
# set in IPC
async with ipc_obj.set(b"123456789") as (data_ref, getter):
# WHEN submit task to dask client
result = await client.submit(worker_func, data_ref, getter)
# WHEN submit task to dask client
result = await client.submit(worker_func, data_ref, getter)
# THEN worker as fetch and read the expected data
assert result == b"123456789"
# THEN worker as fetch and read the expected data
assert result == b"123456789"
@pytest.mark.asyncio
......
......@@ -22,7 +22,7 @@ import app.conf as conf
import pytest
from app.conf import ConfigurationContainer
from app.context import Context
from app.utils import DaskClient
from fastapi import Header
from hypothesis import settings, Verbosity, HealthCheck
......@@ -40,6 +40,10 @@ from .fixtures import (
mock_storage_client_holding_data
)
from .fixtures_pkg import (
dask_client
)
@pytest.fixture(autouse=False)
def top_fixture(monkeypatch):
......@@ -89,12 +93,13 @@ def pytest_unconfigure(config):
del os.environ['SERVICE_HOST_PARTITION']
# all tests with pytest-asyncio will share the same loop
# Ref: https://github.com/pytest-dev/pytest-asyncio#event_loop
@pytest.fixture(scope="session")
def event_loop(): # all tests will share the same loop
def event_loop():
loop = asyncio.get_event_loop()
yield loop
# teardown
loop.run_until_complete(DaskClient.close())
loop.close()
......
......@@ -31,15 +31,13 @@ def context_base():
def test_context_repr(context_base):
expected = '{"tracer": null, "logger": "logger", "correlation_id": "correlation_id", "request_id": "request_id", "dev_mode": true, "partition_id": "partition_id", "app_key": "app_key", "api_key": "api_key", "x_user_id": null}'
expected = '{"tracer": null, "correlation_id": "correlation_id", "request_id": "request_id", "dev_mode": true, "partition_id": "partition_id", "app_key": "app_key", "api_key": "api_key", "x_user_id": null}'
assert str(context_base) == expected
assert repr(context_base) == expected
def test_context_basic(context_base):
assert context_base.logger == 'logger'
assert context_base['logger'] == 'logger'
assert context_base.correlation_id == 'correlation_id'
assert context_base.request_id == 'request_id'
......@@ -64,7 +62,6 @@ def test_context_basic(context_base):
def test_context_clone(context_base):
new_context = context_base.with_value(correlation_id='new_correlation_id', custom1='new_c1', custom3='added_c3')
assert new_context.logger == context_base.logger
assert new_context.correlation_id == 'new_correlation_id'
assert new_context.request_id == context_base.request_id
assert new_context.dev_mode == context_base.dev_mode
......
import asyncio
import contextlib
import copy
import types
from typing import List
......@@ -12,6 +13,7 @@ from unittest.mock import AsyncMock, create_autospec
from fastapi.testclient import TestClient
from app.conf import ConfigurationContainer, cloud_provider_additional_environment
from app.auth.auth import require_opendes_authorized_user
from app.middleware.basic_context_middleware import require_data_partition_id
from app.clients import SearchServiceClient, StorageRecordServiceClient, make_storage_record_client
......@@ -22,23 +24,23 @@ from app.wdms_app import base_app, wdms_app, app_injector
@pytest.fixture(scope="module")
def local_dev_config():
# local import
from app.conf import Config
# set config to a local dev config (assumption for running unit tests)
Config.dev_mode.value = True
Config.cloud_provider.value = "local"
Config.service_host_search.value = "https://test-endpoint/api/search"
Config.service_host_storage.value = "https://test-endpoint/api/storage"
Config.modules.value = "log_recognition.routers.log_recognition"
# This one is necessary as long as we have can_run() in modules dependending on it
Config.environment_name.value = "evd"
config = ConfigurationContainer.with_load_all(environment_dict={
# set config to a local dev config (assumption for running unit tests)
"OS_WELLBORE_DDMS_DEV_MODE": "True",
"CLOUD_PROVIDER": "local",
"SERVICE_HOST_STORAGE": "https://test-endpoint/api/storage",
"SERVICE_HOST_SEARCH": "https://test-endpoint/api/search",
"MODULES": "log_recognition.routers.log_recognition",
# This one is necessary as long as we have can_run() in modules dependending on it
"ENVIRONMENT_NAME": "evd"
}, contextual_loader=cloud_provider_additional_environment)
# patching Config in app.conf module, so it is found by other modules
with mock.patch('app.conf') as app_conf:
app_conf.Config = Config
with mock.patch('app.conf.Config', config):
# returning the config for explicit use in tests.
yield config
yield Config
# mock.patch will restore original Config on exiting context, after fixture use.
@pytest.fixture
......@@ -107,18 +109,27 @@ def mock_storage_client_holding_data(local_dev_config, nope_logger_fixture):
@pytest.fixture(scope="module")
def app_initialized_with_testclient(local_dev_config, request):
def app_initialized_with_testclient(local_dev_config, dask_client):
"""
Fixture providing wdms_app started, along with a test client
"""
global base_app, wdms_app
# this app, initialized, and as part of a hierarchy of apps
with TestClient(
base_app
): # TOFIX: currently necessary because base_app and wdms_app are interdependent
with TestClient(wdms_app) as client:
yield wdms_app, client
# retrieve the dask_client starter, but let the app close it.
# CAREFUL about the fixture scope
with dask_client(autoclose_asynccontext=False) as dask_client_starter:
# Mocking dask_client for app to use it
with mock.patch('app.bulk_persistence.dask.client.DaskClient.create', dask_client_starter):
# this app, initialized, and as part of a hierarchy of apps
with TestClient(
base_app
): # TOFIX: currently necessary because base_app and wdms_app are interdependent
with TestClient(wdms_app) as client:
yield wdms_app, client
# slb_app shutdown event should call DaskClient.close()
@pytest.fixture
......
from .dask_client import dask_client
import contextlib
from unittest import mock
import pytest
from app.bulk_persistence import DaskClient
from app.bulk_persistence.dask.localcluster import memory_leeway, min_worker_memory_recommended
@pytest.fixture(scope="module")
def dask_client(event_loop, local_dev_config):
# a context manager to handle the mocks to configure the singleton
@contextlib.contextmanager
def configure(*,
system_memory_mock=min_worker_memory_recommended(local_dev_config) + memory_leeway,
worker_threads_mock=(2, 1),
autoclose_asynccontext=True
):
with mock.patch("app.bulk_persistence.dask.localcluster.system_memory",
mock.Mock(return_value=system_memory_mock)):
with mock.patch("app.bulk_persistence.dask.localcluster.recommended_workers_and_threads",
mock.Mock(return_value=worker_threads_mock)):
if autoclose_asynccontext:
# an async context manager to handle the daskclient close() coroutine function
@contextlib.asynccontextmanager
async def start():
try:
# CAREFUL: this is for the test to await for it (required to be usable in app fixture).
yield DaskClient.create #TODO : call with parameters here to specify test env for dask ?
# because of the async close, we need a coroutine,
# and therefore an async context manager
# to ensure dask is properly closed in the test using it
finally:
# we also need a try finally in case the create() itself is triggering an exception.
# As the context manager will not take care of this,
# we still should call close() to cleanup what should be cleaned.
await DaskClient.close()
yield start
else:
# return a coroutine, it is the responsibility of the test to await on it
# within its own eventloop
yield DaskClient.create
# the caller must also call Daskclient.close()
return configure
import pytest
from app.bulk_persistence import DaskException
from app.bulk_persistence.dask.localcluster import min_worker_memory_recommended, memory_leeway
@pytest.mark.asyncio
async def test_dask_workers_not_enough_ram_available(dask_client, nope_logger_fixture):
with dask_client(system_memory_mock=42,
worker_threads_mock=(10, 10),
autoclose_asynccontext=True
) as dask_client_asynccontext:
with pytest.raises(DaskException) as exc:
# creating the client should throw an exception
async with dask_client_asynccontext() as client_starter:
await client_starter()
assert exc.value.args[0].startswith("Not enough memory")
@pytest.mark.parametrize("expected_workers", [
1, 3, 5
])
@pytest.mark.asyncio
async def test_dask_workers_enough_ram_available(local_dev_config, expected_workers, dask_client, nope_logger_fixture):
system_memory = min_worker_memory_recommended(local_dev_config) * expected_workers + memory_leeway
with dask_client(system_memory_mock=system_memory,
worker_threads_mock=(10, 10),
autoclose_asynccontext=True
) as dask_client_asynccontext:
# the workers should use expected memory amount
async with dask_client_asynccontext() as client_starter:
client = await client_starter()
expected_worker_memory = (system_memory - memory_leeway) / expected_workers
assert expected_workers == len(client.cluster.scheduler.workers)
workers_has_expected_memory = [w.memory_limit == int(expected_worker_memory)
for _, w in client.cluster.scheduler.workers.items()]
assert all(workers_has_expected_memory)
......@@ -12,15 +12,15 @@ def test_ensure_basic_context_middleware_is_first():
@pytest.mark.asyncio
async def test_should_start_and_leave_cleared_context():
middleware = CreateBasicContextMiddleware(None, app=None)
async def test_should_start_and_leave_cleared_context(local_dev_config):
middleware = CreateBasicContextMiddleware(config=local_dev_config, injector=None, app=None)
request_mock = Mock()
type(request_mock).headers = PropertyMock(return_value={})
type(request_mock).scope = PropertyMock(return_value={})
type(request_mock).url = PropertyMock(return_value=URL())
properties_to_check = [
'tracer', 'logger', 'correlation_id', 'request_id', 'auth', 'partition_id',
'tracer', 'correlation_id', 'request_id', 'dev_mode', 'auth', 'partition_id',
'app_key', 'api_key', 'user', 'app_injector', 'x_user_id']
# GIVEN set current with values and request with no headers
......@@ -54,15 +54,15 @@ async def test_should_start_and_leave_cleared_context():
@pytest.mark.asyncio
async def test_should_leave_cleared_context_in_case_of_exception():
middleware = CreateBasicContextMiddleware(None, app=None)
async def test_should_leave_cleared_context_in_case_of_exception(local_dev_config):
middleware = CreateBasicContextMiddleware(config=local_dev_config, injector=None, app=None)
request_mock = Mock()
type(request_mock).headers = PropertyMock(return_value={})
type(request_mock).scope = PropertyMock(return_value={})
type(request_mock).url = PropertyMock(return_value=URL())
properties_to_check = [
'tracer', 'logger', 'correlation_id', 'request_id', 'auth', 'partition_id',
'tracer', 'correlation_id', 'request_id', 'dev_mode', 'auth', 'partition_id',
'app_key', 'api_key', 'user', 'app_injector', 'x_user_id']
# GIVEN set current with values and request with no headers
......
......@@ -11,19 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
import types
from typing import Optional
from unittest.mock import AsyncMock
from opencensus.trace import base_exporter
from fastapi.testclient import TestClient
import pytest
from starlette.routing import Router, Route, Mount
from app.wdms_app import DDMS_V2_PATH
from app.clients import SearchServiceClient, StorageRecordServiceClient
from app.wdms_app import wdms_app, base_app, DDMS_V2_PATH, app_injector
from ..test_utils import gen_all_routes_request
# Initialize traces exporter in app with a custom one to allow validating our traces
......@@ -111,21 +105,6 @@ def test_about_call_traces_request_header(app_configurable_with_testclient, head
assert spandata.attributes[header_name] == "some value"
def gen_all_routes_request(rtr: Router, prefix: Optional[str] = None):
if prefix is None:
prefix = ""
for route in rtr.routes:
if isinstance(route, Mount):
# if this is a Mount, we need to recurse on the route
yield from gen_all_routes_request(route.app, route.path)
elif isinstance(route, Route):
for method in route.methods:
yield method, prefix + route.path
else:
RuntimeError(f"{route} routes retrieval not implemented")
def test_call_trace_url(app_configurable_with_testclient, mock_storage_client_holding_data, well_v2_record_list, well_v3_record_list):
# empty storage client mock required because we use get_record result in route.
storage_client_mock = mock_storage_client_holding_data(data=[])
......@@ -157,7 +136,7 @@ def test_call_trace_url(app_configurable_with_testclient, mock_storage_client_ho
call_count = 0
# startup event has been called (client has been called in a contest), so all routers should be mounted
# startup event has been called (client has been called in a context), so all routers should be mounted
for method, path in gen_all_routes_request(client_after_startup.app):
# skip routes created on app instantiation
......
......@@ -2,10 +2,12 @@ import pytest
from app.routers.bulk.bulk_routes import router
from app.routers.ddms_v2 import log_ddms_v2
from app.routers.ddms_v3 import wellbore_trajectory_ddms_v3, welllog_ddms_v3
from app.wdms_app import ALPHA_APIS_PREFIX, DDMS_V2_PATH, DDMS_V3_PATH
from app.wdms_app import wdms_app, ALPHA_APIS_PREFIX, DDMS_V2_PATH, DDMS_V3_PATH
from fastapi.testclient import TestClient
from tests.unit.routers.chunking_test import dasked_test_app
from ..test_utils import gen_all_routes_request
base_paths = [
DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
......@@ -31,12 +33,6 @@ def dependencies_check_app(dasked_test_app):
dasked_test_app.dependency_overrides = {}
def _get_all_wdms_app_routes():
""" Retrieve all routes of wdms app """
from app.wdms_app import wdms_app
return [(route.path, method) for route in wdms_app.routes for method in route.methods]
def _is_trajectories_v3_route(route_url: str):
""" Return true if given route_url is OSDU Trajectory v3 api """
return route_url.startswith(DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH)
......@@ -47,7 +43,7 @@ def _is_welllogs_v3_route(route_url: str):
return route_url.startswith(DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH)
@pytest.mark.parametrize("route_url,method", _get_all_wdms_app_routes())
@pytest.mark.parametrize("route_url,method", list(gen_all_routes_request(wdms_app)))
def test_ensure_bulk_apis_dependencies_injection(dependencies_check_app, route_url, method):
client = dependencies_check_app
......
......@@ -64,6 +64,7 @@ def with_patched_get_record(well_record):
@pytest.mark.asyncio
async def test_delete_recursive_only_delete_entity_provided(ctx_fixture,
nope_logger_fixture,
authority,
data_partition,
entity_source,
......@@ -111,6 +112,7 @@ async def test_delete_recursive_only_delete_entity_provided(ctx_fixture,
@pytest.mark.asyncio
async def test_delete_failure_on_parent_dont_delete_children(ctx_fixture,
nope_logger_fixture,
authority,
data_partition,
entity_source,
......@@ -151,14 +153,12 @@ async def test_delete_failure_on_parent_dont_delete_children(ctx_fixture,
@pytest.mark.asyncio
async def test_delete_should_keep_delete_heterogeneous_failure(
ctx_fixture,
nope_logger_fixture,
authority,
data_partition,
entity_source,
well_record,
with_patched_get_record):
moc_logger = mock.MagicMock()
ctx = Context.set_current_with_value(logger=moc_logger)
# in case of exception on delete call, should still call delete on all of them
sub_ids = [f'id:{i}' for i in range(10)]
......@@ -184,7 +184,7 @@ async def test_delete_should_keep_delete_heterogeneous_failure(
side_effect=delete_success_only_well) as moc_storage_delete_record:
with pytest.raises(fastApiHTTPException) as exp_info: # expect to raise
await StorageHelper.delete_recursively(
ctx,
ctx_fixture,
well_record.id, 'well',
[Entity.LOGSET],
data_partition,
......@@ -200,21 +200,19 @@ async def test_delete_should_keep_delete_heterogeneous_failure(
assert set(expect_delete_ids) == actual_deleted_id
# check error are logged
assert moc_logger.error.call_count == len(sub_ids)
assert nope_logger_fixture.error.call_count == len(sub_ids)
@pytest.mark.asyncio
async def test_delete_should_keep_delete_homogenous_failure(
ctx_fixture,
nope_logger_fixture,
authority,
data_partition,
entity_source,
well_record,
with_patched_get_record):
moc_logger = mock.MagicMock()
ctx = Context.set_current_with_value(logger=moc_logger)
# in case of exception on delete call, should still call delete on all of them
sub_ids = [f'id:{i}' for i in range(10)]
......@@ -238,7 +236,7 @@ async def test_delete_should_keep_delete_homogenous_failure(
side_effect=delete_success_only_well) as moc_storage_delete_record:
with pytest.raises(fastApiHTTPException) as exp_info: # expect to raise
await StorageHelper.delete_recursively(
ctx,
ctx_fixture,
well_record.id, 'well',
[Entity.LOGSET],
data_partition,
......@@ -254,7 +252,7 @@ async def test_delete_should_keep_delete_homogenous_failure(
assert set(expect_delete_ids) == actual_deleted_id
# check error are logged
assert moc_logger.error.call_count == len(sub_ids)
assert nope_logger_fixture.error.call_count == len(sub_ids)
@pytest.mark.asyncio
......@@ -265,6 +263,7 @@ async def test_delete_should_keep_delete_homogenous_failure(
content=b'',
headers={})])
async def test_delete_404_of_sub_delete_is_valid(ctx_fixture,
nope_logger_fixture,
data_partition,
authority,
entity_source,
......@@ -306,6 +305,7 @@ async def test_delete_404_of_sub_delete_is_valid(ctx_fixture,
fastApiHTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR),
RuntimeError()])
async def test_delete_failure_get_record(ctx_fixture,
nope_logger_fixture,
data_partition,
entity_source,
well_record,
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
import pytest
from unittest import mock
......@@ -21,6 +22,7 @@ from contextlib import contextmanager
from opencensus.trace.span_context import SpanContext
from odes_storage.models import Record, StorageAcl, Legal
from starlette.routing import Mount, Router, Route
from app.model.model_utils import record_to_dict
from app.context import get_or_create_ctx
......@@ -145,3 +147,18 @@ def format_routes(app, prefix, tags):
def side_effect_raise(*args, **kwargs):
raise ValueError("side effect")
def gen_all_routes_request(rtr: Router, prefix: Optional[str] = None):
if prefix is None:
prefix = ""
for route in rtr.routes:
if isinstance(route, Mount):
# if this is a Mount, we need to recurse on the route
yield from gen_all_routes_request(route.app, route.path)
elif isinstance(route, Route):
for method in route.methods:
yield method, prefix + route.path
else:
RuntimeError(f"{route} routes retrieval not implemented")