Commit 09d08fa8 authored by Cyril Monmouton's avatar Cyril Monmouton
Browse files

Feature/remove feature flag data chunking

parent c5c5c634
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from app.conf import Config
from app.injector.app_injector import AppInjector, AppInjectorModule
from app.bulk_persistence.dask.blob_storage import (DaskBlobStorageBase,
DaskBlobStorageLocal)
from app.bulk_persistence.dask.azure import DaskBlobStorageAzure
from app.bulk_persistence.dask.google import DaskBlobStorageGoogle
class DaskStorageInjector(AppInjectorModule):
    """WORK IN PROGRESS: registers the factory producing the Dask blob storage.

    Selection order: an explicit local-filesystem override wins, then the
    configured cloud provider ('az' or 'gcp'); anything else is unsupported.
    """

    def configure(self, app_injector: AppInjector):
        # Local-FS override (dev/test): short-circuits provider selection.
        local_fs_path: str = Config.get('USE_LOCALFS_BLOB_STORAGE_WITH_PATH')
        if local_fs_path:
            async def _make_local_dask_storage():
                return DaskBlobStorageLocal(base_directory=local_fs_path)

            app_injector.register(DaskBlobStorageBase, _make_local_dask_storage)
            return

        provider = Config.cloud_provider.value
        if provider == 'az':
            async def _make_az_dask_storage() -> DaskBlobStorageBase:
                return DaskBlobStorageAzure()

            app_injector.register(DaskBlobStorageBase, _make_az_dask_storage)
        elif provider == 'gcp':
            async def _make_gcp_dask_storage() -> DaskBlobStorageBase:
                return DaskBlobStorageGoogle()

            app_injector.register(DaskBlobStorageBase, _make_gcp_dask_storage)
        else:
            raise NotImplementedError(f"dask storage not available for provider {Config.cloud_provider.value}")
......@@ -14,12 +14,16 @@
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu_aws.storage.storage_aws import AwsStorage
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
from .app_injector import AppInjector, AppInjectorModule
from app.conf import Config
class AwsInjector(AppInjectorModule):
    """Registers the AWS-backed factories for the storage abstractions."""

    def configure(self, app_injector: AppInjector):
        # Bind each storage base class to its AWS builder, in declaration order.
        bindings = (
            (BlobStorageBase, AwsInjector.build_aws_storage),
            (DaskBlobStorageBase, AwsInjector.build_aws_dask_blob_storage),
        )
        for base_type, factory in bindings:
            app_injector.register(base_type, factory)
@staticmethod
async def build_aws_storage() -> BlobStorageBase:
......@@ -27,3 +31,7 @@ class AwsInjector(AppInjectorModule):
session=None,
service_account_file=f'{Config.aws_region.value}$${Config.aws_env.value}'
)
@staticmethod
async def build_aws_dask_blob_storage() -> DaskBlobStorageBase:
raise NotImplementedError()
......@@ -16,11 +16,21 @@ from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
from .app_injector import AppInjector, AppInjectorModule
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
# Below import should be pull out to dedicated Azure package osdu.core.api.storage
from app.bulk_persistence.dask.azure import DaskBlobStorageAzure
class AzureInjector(AppInjectorModule):
    """Registers the Azure-backed factories for the storage abstractions."""

    def configure(self, app_injector: AppInjector):
        # Bind each storage base class to its Azure builder, in declaration order.
        bindings = (
            (BlobStorageBase, AzureInjector.build_az_blob_storage),
            (DaskBlobStorageBase, AzureInjector.build_dask_az_blob_storage),
        )
        for base_type, factory in bindings:
            app_injector.register(base_type, factory)

    @staticmethod
    async def build_az_blob_storage() -> BlobStorageBase:
        """Factory for the async Azure record blob storage."""
        return AzureAioBlobStorage()

    @staticmethod
    async def build_dask_az_blob_storage() -> DaskBlobStorageBase:
        """Factory for the Azure Dask (bulk) blob storage."""
        return DaskBlobStorageAzure()
......@@ -13,16 +13,20 @@
# limitations under the License.
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from app.utils import get_http_client_session
from osdu_gcp.storage.blob_storage_gcp import GCloudAioStorage
from app.utils import get_http_client_session
from .app_injector import AppInjector, AppInjectorModule
from app.utils import Context
from app.bulk_persistence import resolve_tenant
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
from app.bulk_persistence.dask.google import DaskBlobStorageGoogle
class GCPInjector(AppInjectorModule):
    """Registers the GCP-backed factories for the storage abstractions."""

    def configure(self, app_injector: AppInjector):
        # Bind each storage base class to its GCP builder, in declaration order.
        bindings = (
            (BlobStorageBase, GCPInjector.build_gcp_blob_storage),
            (DaskBlobStorageBase, GCPInjector.build_dask_gcp_blob_storage),
        )
        for base_type, factory in bindings:
            app_injector.register(base_type, factory)
@staticmethod
async def build_gcp_blob_storage(*args, **kwargs) -> BlobStorageBase:
......@@ -33,3 +37,8 @@ class GCPInjector(AppInjectorModule):
session=get_http_client_session(),
service_account_file=tenant.credentials
)
@staticmethod
async def build_dask_gcp_blob_storage() -> DaskBlobStorageBase:
    """Factory for the GCP Dask (bulk) blob storage."""
    return DaskBlobStorageGoogle()
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from app.utils import get_http_client_session
from osdu_ibm.storage.blob_storage_ibm import IBMObjectStorage
from .app_injector import AppInjector, AppInjectorModule
from app.utils import get_http_client_session
from app.utils import Context
from .app_injector import AppInjector, AppInjectorModule
from app.bulk_persistence import resolve_tenant
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
class IBMInjector(AppInjectorModule):
    """Registers the IBM-backed factories for the storage abstractions."""

    def configure(self, app_injector: AppInjector):
        # Bind each storage base class to its IBM builder, in declaration order.
        bindings = (
            (BlobStorageBase, IBMInjector.build_ibm_blob_storage),
            (DaskBlobStorageBase, IBMInjector.build_ibm_dask_blob_storage),
        )
        for base_type, factory in bindings:
            app_injector.register(base_type, factory)
@staticmethod
async def build_ibm_blob_storage(*args, **kwargs) -> BlobStorageBase:
......@@ -19,3 +22,7 @@ class IBMInjector(AppInjectorModule):
session=get_http_client_session(),
service_account_file=tenant.credentials
)
@staticmethod
async def build_ibm_dask_blob_storage() -> DaskBlobStorageBase:
raise NotImplementedError()
......@@ -13,20 +13,26 @@
# limitations under the License.
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from app.conf import *
from app.helper.logger import get_logger
from .app_injector import AppInjector, AppInjectorModule, WithLifeTime
from app.injector.az_injector import AzureInjector
from app.injector.aws_injector import AwsInjector
from app.injector.gcp_injector import GCPInjector
from app.injector.ibm_injector import IBMInjector
from app.clients import StorageRecordServiceClient
from app.clients.storage_service_blob_storage import StorageRecordServiceBlobStorage
from app.clients.search_service_client import SearchServiceClient
from app.clients import make_search_client, make_storage_record_client
from app.persistence.sessions_storage import SessionsStorage
from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from app.helper.logger import get_logger
from app.injector.ibm_injector import IBMInjector
from app.bulk_persistence.dask.blob_storage import (DaskBlobStorageBase,
DaskBlobStorageLocal)
class MainInjector(AppInjectorModule):
......@@ -94,6 +100,12 @@ class MainInjector(AppInjectorModule):
logger.warning(f'overriding blob storage to use local fs on path ' + blob_storage_localfs)
app_injector.register(BlobStorageBase, _blob_storage_builder)
async def _dask_blob_storage_builder():
return DaskBlobStorageLocal(base_directory=blob_storage_localfs)
app_injector.register(DaskBlobStorageBase, _dask_blob_storage_builder)
logger.warning(f'overriding DASK blob storage to use local fs on path ' + blob_storage_localfs)
@staticmethod
def make_storage_service_on_localfs_blob_storage_builder(path: str):
......
......@@ -45,7 +45,6 @@ from app.routers.search import search, fast_search
from app.clients import StorageRecordServiceClient, SearchServiceClient
from app.utils import get_http_client_session, OpenApiHandler, get_wdms_temp_dir
base_app = FastAPI()
# The sub application which contains all the routers
......@@ -170,23 +169,20 @@ wdms_app.include_router(log_recognition.router, prefix='/log-recognition', tags=
Depends(require_opendes_authorized_user, use_cache=False)])
# Shared auth/partition dependencies and tag for the alpha bulk-data (chunking) routes.
dependencies = [Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)]
tags = ['ALPHA feature: bulk data chunking']
# Mount the session and bulk-data routers under the /alpha prefix.
wdms_app.include_router(sessions.router, prefix='/alpha/ddms/v3/welllogs', tags=tags, dependencies=dependencies)
wdms_app.include_router(welllog_ddms_v3.router_bulk, prefix='/alpha/ddms/v3', tags=tags, dependencies=dependencies)
# ------------- add alpha feature
def enable_alpha_feature():
    """Enable and activate the alpha (bulk chunking) feature.

    Registers the Dask storage factory and mounts the session and bulk
    routers on the app; must be called explicitly to opt in.
    """
    # Imported lazily so the alpha code path is only loaded when enabled.
    from app.in_dev.injector.dask_storage_injector import DaskStorageInjector

    logger.get_logger().warning("Enabling alpha feature: chunking")

    # register dask storage factory
    DaskStorageInjector().configure(app_injector)

    alpha_dependencies = [
        Depends(require_data_partition_id, use_cache=False),
        Depends(require_opendes_authorized_user, use_cache=False),
    ]
    wdms_app.include_router(sessions.router, prefix='/ddms/v3/welllogs',
                            tags=['ALPHA chunking'], dependencies=alpha_dependencies)
    wdms_app.include_router(welllog_ddms_v3.router_bulk, prefix='/ddms/v3',
                            tags=['ALPHA chunking'], dependencies=alpha_dependencies)
    # include alpha routers down below #
# order is last executed first
......
This diff is collapsed.
......@@ -51,13 +51,12 @@ def read_parquet(parquet_bytes):
return pd.read_parquet(f)
WELLLOG_URL_PREFIX = 'ddms/v3/welllogs'
WELLLOG_URL_PREFIX = 'alpha/ddms/v3/welllogs'
# todo get data json
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.alpha_feature
def test_send_one_chunk_without_session(with_wdms_env, with_welllog):
record_id = with_welllog
......@@ -77,7 +76,6 @@ def test_send_one_chunk_without_session(with_wdms_env, with_welllog):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.alpha_feature
def test_send_one_chunk_with_session_commit(with_wdms_env, with_welllog):
record_id = with_welllog
......@@ -111,7 +109,6 @@ def test_send_one_chunk_with_session_commit(with_wdms_env, with_welllog):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.alpha_feature
def test_send_multiple_chunks_with_session_commit(with_wdms_env, with_welllog):
record_id = with_welllog
......@@ -154,7 +151,6 @@ def test_send_multiple_chunks_with_session_commit(with_wdms_env, with_welllog):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.alpha_feature
def test_get_data_with_ofset_filter(with_wdms_env, with_welllog):
record_id = with_welllog
......@@ -204,7 +200,6 @@ def test_get_data_with_ofset_filter(with_wdms_env, with_welllog):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.alpha_feature
def test_get_data_with_column_filter(with_wdms_env, with_welllog):
record_id = with_welllog
......@@ -233,7 +228,6 @@ def test_get_data_with_column_filter(with_wdms_env, with_welllog):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.alpha_feature
def test_get_data_with_limit_filter(with_wdms_env, with_welllog):
record_id = with_welllog
......@@ -280,7 +274,6 @@ def test_get_data_with_limit_filter(with_wdms_env, with_welllog):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.alpha_feature
def test_get_data_with_limit_and_offset_filter(with_wdms_env, with_welllog):
record_id = with_welllog
......
......@@ -24,7 +24,7 @@ from ..request_builders.wdms.session import (
build_list_session)
SESSION_URL_PREFIX = 'ddms/v3/welllogs'
SESSION_URL_PREFIX = 'alpha/ddms/v3/welllogs'
@pytest.fixture
......@@ -50,7 +50,6 @@ def create_session(env, record_id, meta=None):
@pytest.mark.tag('session', 'smoke', 'chunking')
@pytest.mark.alpha_feature
def test_create_get_session(with_wdms_env, with_welllog):
record_id = with_welllog
session_obj = create_session(with_wdms_env, record_id, meta={'custom': 'from_e2e'})
......@@ -70,7 +69,6 @@ def test_create_get_session(with_wdms_env, with_welllog):
@pytest.mark.tag('session', 'smoke', 'chunking')
@pytest.mark.alpha_feature
def test_list_session(with_wdms_env, with_welllog):
r_id = with_welllog
......
This diff is collapsed.
......@@ -38,8 +38,6 @@ def client(ctx_fixture):
wdms_app.dependency_overrides = {}
# TODO reactivate once feature chunking is released
@pytest.mark.skip(reason="TEMPORARY due to toggle on/off alpha feature")
def test_api_spec(client):
# get the openapi spec
response = client.get("/openapi.json")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment