Commit 55e32e97 authored by Luc Yriarte

Merge branch 'master' into gitlab-forbid-failure-gcp

parents 0d008e4c dfe809c2
Pipeline #57422 failed with stages in 12 minutes and 15 seconds
......@@ -14,7 +14,7 @@
# limitations under the License.
variables:
PIP_REQUIREMENTS: "frozenrequirements_dev.txt requirements.txt requirements_dev.txt"
PIP_REQUIREMENTS: "requirements.txt requirements_dev.txt"
AZURE_SERVICE: wellbore-ddms
AZURE_DOCKER_SUBDIR: build/Dockerfile
......@@ -171,8 +171,10 @@ osdu-gcp-test-python:
- source env/bin/activate
- pip install --upgrade pip
- pip install wheel pytest pytest-cov
- pip install -r requirements.txt
- pip install -r requirements_dev.txt
- >
for REQ in $PIP_REQUIREMENTS ; do
pip install -r $REQ
done
- cd tests/integration
- echo $OSDU_GCP_INTEGRATION_TESTER | base64 -d > file.json
- gcloud auth activate-service-account --key-file file.json
......
......@@ -36,6 +36,7 @@ The following software have components provided under the terms of this license:
- opencensus-proto (from https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-python)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pyarrow (from https://arrow.apache.org/)
- pytest-asyncio (from https://github.com/pytest-dev/pytest-asyncio)
- pytest-dependency (from https://github.com/RKrahl/pytest-dependency)
......@@ -100,6 +101,7 @@ The following software have components provided under the terms of this license:
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- pip-tools (from http://pypi.python.org/pypi/pip-tools/1.8.1rc3)
- ply (from http://www.dabeaz.com/ply/)
- protobuf (from https://developers.google.com/protocol-buffers/)
- psutil (from https://github.com/giampaolo/psutil)
......@@ -219,7 +221,7 @@ The following software have components provided under the terms of this license:
- adal (from https://github.com/AzureAD/azure-activedirectory-library-for-python)
- aiohttp (from https://github.com/aio-libs/aiohttp/)
- aioredis (from https://github.com/aio-libs/aioredis)
- anyio (from )
- anyio (from https://pypi.org/project/anyio/3.3.0/)
- asgiref (from http://github.com/django/asgiref/)
- atomicwrites (from https://github.com/untitaker/python-atomicwrites)
- attrs (from https://attrs.readthedocs.io/)
......@@ -251,6 +253,7 @@ The following software have components provided under the terms of this license:
- natsort (from https://github.com/SethMMorton/natsort)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pluggy (from https://github.com/pytest-dev/pluggy)
- py (from http://pylib.readthedocs.org/)
- pyarrow (from https://arrow.apache.org/)
......@@ -269,6 +272,7 @@ The following software have components provided under the terms of this license:
- sniffio (from https://github.com/python-trio/sniffio)
- structlog (from http://www.structlog.org/)
- toml (from https://github.com/uiri/toml)
- tomli (from https://pypi.org/project/tomli/1.1.0/)
- urllib3 (from https://urllib3.readthedocs.io/)
- xmltodict (from https://github.com/martinblech/xmltodict)
- zipp (from https://github.com/jaraco/zipp)
......
......@@ -42,6 +42,7 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
- [pandas](https://pandas.pydata.org/) and [numpy](https://numpy.org/) for data manipulation
- [pyarrow](https://pypi.org/project/pyarrow/) to load and save data in Parquet format
- [opencensus](https://opencensus.io/guides/grpc/python/) for tracing and logging on cloud providers
- [dask](https://docs.dask.org/en/latest/) to manage huge amounts of bulk data (see the sketch below)
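The following is a minimal, hypothetical sketch (not taken from the service code) of how these libraries fit together: pandas holds curve values in memory, pyarrow serializes them to Parquet, and dask reads them back lazily for out-of-core processing. The file path and column names are illustrative only.

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

# Build a small WellLog-like frame in pandas and persist it as Parquet via pyarrow.
df = pd.DataFrame({"MD": np.arange(1_000_000, dtype="float64"),
                   "GR": np.random.rand(1_000_000)})
df.to_parquet("/tmp/welllog.parquet", engine="pyarrow", index=True)

# Read it back lazily with dask; computation only happens on .compute().
ddf = dd.read_parquet("/tmp/welllog.parquet", engine="pyarrow")
print(ddf["GR"].mean().compute())
```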
### Library Dependencies
......@@ -60,6 +61,16 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
## Project Startup
### Dask Configuration - Locally
By default, Dask will use all available memory and CPU resources through its workers. The number of workers is determined by the number of cores on the local machine.
### Dask Configuration - In a cluster
In a container context, such as Kubernetes, we recommend setting the container memory limit to 3Gi of RAM and 4-8 CPUs.
At a minimum, 1.2Gi and 1 CPU: performance will be reduced, but it is enough to handle WellLogs of 10 curves with 1M values each.
Note: container memory is not entirely dedicated to Dask workers; the FastAPI service process also requires some.
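As an illustration only (this is not the service's own startup code, which initializes Dask through its internal DaskClient helper), a local Dask client can be bounded explicitly so that workers stay within the limits above; the figures simply mirror the minimum sizing.

```python
from dask.distributed import Client

# Minimal sketch: cap Dask resources to match the container limits described above
# (values mirror the documented 1 CPU / 1.2Gi minimum).
client = Client(
    n_workers=1,            # one worker for the 1 CPU minimum
    threads_per_worker=1,
    memory_limit="1.2GiB",  # per-worker memory cap
    processes=True,
)
print(client)
```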
### Run the service locally
1. Create a virtual environment in the wellbore project directory. This will create a folder inside the wellbore project directory, for example: ~/os-wellbore-ddms/nameofvirtualenv
......@@ -88,6 +99,12 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
pip install -r requirements.txt
```
Or, for a developer setup, this will also install the tools that help you work with the code:
```bash
pip install -r requirements.txt -r requirements_dev.txt
```
6. Run the service
```bash
......@@ -298,8 +315,9 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
```bash
LOCAL_PORT=<local_port>
docker run -d -p $LOCAL_PORT:8080 -e OS_WELLBORE_DDMS_DEV_MODE=1 -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH=1 $IMAGE_TAG
IMAGE_TAG=<image_name>
docker run -d -p $LOCAL_PORT:8080 -e CLOUD_PROVIDER=local -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH="/tmp" -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH="/tmp" -e OS_WELLBORE_DDMS_DEV_MODE=True -e USE_PARTITION_SERVICE=disabled $IMAGE_TAG
```
2. Access app on `http://127.0.0.1:<LOCAL_PORT>/api/os-wellbore-ddms/docs`
......@@ -312,11 +330,12 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_WHEEL_DI
docker logs CONTAINER_ID
```
### Run Unit Tests Locally
```bash
# Install test dependencies
pip install -r requirements_dev.txt
pip install -r requirements.txt -r requirements_dev.txt
python -m pytest --junit-xml=unit_tests_report.xml --cov=app --cov-report=html --cov-report=xml ./tests/unit
```
......@@ -345,7 +364,36 @@ pytest ./functional --environment="./generated/postman_environment.json" --filte
For more information see the [integration tests README](tests/integration/README.md)
### Port Forward from Kubernetes
### Manage package dependencies
At any time, you may want to ensure your virtual environment is in sync with your requirements specification.
For this you can use:
```bash
pip-sync
```
If you want to work with other requirements files, you can specify them:
```bash
pip-sync requirements.txt requirements_dev.txt
```
If you want to update `requirements.txt` to retrieve the most recent versions, respecting the bounds set in `requirements.in`, you can use:
```bash
pip-compile
```
If you want to update the version of only one dependency, for instance fastapi:
```bash
pip-compile --upgrade-package fastapi
```
For more information, see https://github.com/jazzband/pip-tools/.
### Debugging
#### Port Forward from Kubernetes
1. List the pods: `kubectl get pods`
2. Port forward: `kubectl port-forward pods/POD_NAME LOCAL_PORT:8080`
......
......@@ -12,45 +12,52 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import hashlib
import json
import time
from contextlib import suppress
from functools import wraps
from operator import attrgetter
import fsspec
import pandas as pd
from pyarrow.lib import ArrowException
import dask
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge, set_index,
do_merge,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings, get_wdms_temp_dir, get_ctx
from app.utils import DaskClient, capture_timings, get_ctx
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from pyarrow.lib import ArrowException
dask.config.set({'temporary_directory': get_wdms_temp_dir()})
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
from dask.distributed import WorkerPlugin
from dask.distributed import scheduler
def handle_pyarrow_exceptions(target):
def internal_bulk_exceptions(target):
"""
Decorator to handle exceptions that should not be exposed to the outside world, e.g. Pyarrow or Dask exceptions
"""
@wraps(target)
async def async_inner(*args, **kwargs):
try:
return await target(*args, **kwargs)
except ArrowException:
get_logger().exception(f"{target} raised exception")
raise BulkNotProcessable("Unable to process bulk")
get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
raise BulkNotProcessable("Unable to process bulk - Arrow")
except scheduler.KilledWorker:
get_logger().exception(f"Dask worker raised exception when running '{target.__name__}'")
raise BulkNotProcessable("Unable to process bulk- Dask")
except Exception:
get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
raise
return async_inner
......@@ -63,8 +70,8 @@ class DefaultWorkerPlugin(WorkerPlugin):
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
get_logger().debug("WorkerPlugin initialised")
super().__init__()
get_logger().debug("WorkerPlugin initialised")
def setup(self, worker):
self.worker = worker
......@@ -77,13 +84,14 @@ class DefaultWorkerPlugin(WorkerPlugin):
get_logger().exception(f"Task '{key}' has failed with exception")
def pandas_to_parquet(pdf, path, opt):
return pdf.to_parquet(path, index=True, engine='pyarrow', storage_options=opt)
class DaskBulkStorage:
client = None
client: DaskDistributedClient = None
""" Dask client """
lock_client = asyncio.Lock()
""" used to ensure """
def __init__(self):
""" use `create` to create instance """
self._parameters = None
......@@ -95,20 +103,20 @@ class DaskBulkStorage:
instance._parameters = parameters
# Initialise the dask client.
async with DaskBulkStorage.lock_client:
if not DaskBulkStorage.client:
DaskBulkStorage.client = dask_client or await DaskDistributedClient(asynchronous=True, processes=True)
dask_client = dask_client or await DaskClient.create()
if DaskBulkStorage.client is not dask_client: # executed only once per dask client
DaskBulkStorage.client = dask_client
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
await DaskBulkStorage.client.register_worker_plugin(
DefaultWorkerPlugin,
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
await DaskBulkStorage.client.register_worker_plugin(
DefaultWorkerPlugin,
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
get_logger().info(f"Distributed Dask client initialized : {DaskBulkStorage.client}")
get_logger().info(f"Distributed Dask client initialized : {DaskBulkStorage.client}")
instance._fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
return instance
......@@ -121,13 +129,6 @@ class DaskBulkStorage:
def base_directory(self) -> str:
return self._parameters.base_directory
@staticmethod
async def close():  # TODO check whether this is needed, currently unused
async with DaskBulkStorage.lock_client:
if DaskBulkStorage.client:
await DaskBulkStorage.client.close() # or shutdown
DaskBulkStorage.client = None
def _encode_record_id(self, record_id: str) -> str:
return hashlib.sha1(record_id.encode()).hexdigest()
......@@ -148,11 +149,17 @@ class DaskBulkStorage:
"""Read a Parquet file into a Dask DataFrame
path : string or list
**kwargs: dict (of dicts) Passthrough keyword arguments for the read backend.
read_parquet parameters:
chunksize='25M': if chunks are too small, they are aggregated until chunksize is reached
aggregate_files=True: because we pass a list of paths when committing a session,
aggregate_files is needed when the paths differ
"""
get_logger().debug(f"loading bulk : {path}")
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
chunksize='25M',
aggregate_files=True,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
......@@ -199,16 +206,17 @@ class DaskBulkStorage:
engine='pyarrow',
storage_options=self._parameters.storage_options)
def _save_with_pandas(self, path, pdf: dd.DataFrame):
async def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
returns a Future<None>
"""
return self._submit_with_trace(pdf.to_parquet, path,
engine='pyarrow',
storage_options=self._parameters.storage_options)
f_pdf = await self.client.scatter(pdf)
return await self._submit_with_trace(pandas_to_parquet, f_pdf, path,
self._parameters.storage_options)
def _check_incoming_chunk(self, df):
@staticmethod
def _check_incoming_chunk(df):
# TODO should we test is_monotonic? unique?
if len(df.index) == 0:
raise BulkNotProcessable("Empty data")
......@@ -219,17 +227,17 @@ class DaskBulkStorage:
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
raise BulkNotProcessable("Index should be numeric or datetime")
@handle_pyarrow_exceptions
@internal_bulk_exceptions
@capture_timings('save_blob', handlers=worker_capture_timing_handlers)
@with_trace('save_blob')
async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None):
"""Write the data frame to the blob storage."""
# TODO: The new bulk_id should contain information about the way we store the bulk
# In the future, if we change the way we store chunks, it could be useful to deduce it from the bulk_uri
bulk_id = bulk_id or BulkId.new_bulk_id()
if isinstance(ddf, pd.DataFrame):
self._check_incoming_chunk(ddf)
ddf = dd.from_pandas(ddf, npartitions=1)
ddf = await self.client.scatter(ddf)
path = self._get_blob_path(record_id, bulk_id)
try:
......@@ -261,14 +269,9 @@ class DaskBulkStorage:
with self._fs.open(f'{session_path_wo_protocol}/{filename}.meta', 'w') as outfile:
json.dump({"columns": list(pdf)}, outfile)
# could be done asynchronously in the workers but it has a cost
# we may want to be async if the dataFrame is big
session_path = self._build_path_from_session(session)
# await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)
# TODO: Warning this is a sync CPU bound operation
pdf.to_parquet(f'{session_path}/{filename}.parquet', index=True,
storage_options=self._parameters.storage_options, engine='pyarrow')
await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)
@capture_timings('get_session_parquet_files')
@with_trace('get_session_parquet_files')
......@@ -304,16 +307,15 @@ class DaskBulkStorage:
@capture_timings('session_commit')
@with_trace('session_commit')
@handle_pyarrow_exceptions
@internal_bulk_exceptions
async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
dfs = [self._load(pf) for pf in self._get_next_files_list(session)]
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
if not dfs:
raise BulkNotProcessable("No data to commit")
dfs = self._map_with_trace(set_index, dfs)
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
while len(dfs) > 1:
dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
......
......@@ -20,6 +20,8 @@ from logging import INFO
from app.helper.logger import get_logger
from app.utils import capture_timings
import dask.dataframe as dd
def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
......@@ -71,24 +73,25 @@ class SessionFileMeta:
@capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf): # TODO
def set_index(ddf: dd.DataFrame):
"""Set index of the dask dataFrame only if needed."""
if not ddf.known_divisions or '_idx' not in ddf:
ddf['_idx'] = ddf.index # we need to create a temporary variable to set it as index
ddf['_idx'] = ddf['_idx'].astype(ddf.index.dtype)
return ddf.set_index('_idx', sorted=True)
if not ddf.known_divisions:
return ddf.set_index(ddf.index, sorted=True)
return ddf
@capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1, df2):
def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
"""Combine the 2 dask dataframe. Updates df1 with df2 values if overlap."""
if df2 is None:
return df1
df1 = set_index(df1)
df2 = set_index(df2)
if share_items(df1.columns, df2.columns):
ddf = df2.combine_first(df1)
else:
ddf = df1.join(df2, how='outer')  # join seems faster when there are no columns in common
return ddf[sorted(ddf.columns)]
return ddf
......@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import asyncio
from io import BytesIO
from typing import Union, AnyStr, IO, Optional, List, Dict
......
......@@ -14,10 +14,12 @@
import uuid
from asyncio import gather, iscoroutinefunction
from typing import List
from app.model import model_utils
from fastapi import FastAPI, HTTPException, status
from odes_storage.models import *
from fastapi import HTTPException, status
from odes_storage.models import (CreateUpdateRecordsResponse, Record,
RecordVersions)
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.exceptions import ResourceNotFoundException
from osdu.core.api.storage.tenant import Tenant
......@@ -116,11 +118,11 @@ class StorageRecordServiceBlobStorage:
skipped_record_ids=[])
async def get_record_version(self,
id: str,
version: int,
data_partition_id: str = None,
appkey: str = None,
token: str = None) -> Record:
id: str,
version: int,
data_partition_id: str = None,
appkey: str = None,
token: str = None) -> Record:
await self._check_auth(appkey, token)
try:
object_name = await self._build_record_path(id, data_partition_id, version=version)
......
......@@ -188,6 +188,11 @@ class ConfigurationContainer:
description="""Comma separated list of module names to load.""",
default="log_recognition.routers.log_recognition") # Add modules to the list once they are refactored, so that they are included
min_worker_memory: EnvVar = EnvVar(
key='MIN_WORKER_MEMORY',
description='Minimum amount of memory for one worker',
default="512Mi")
_environment_dict: Dict = os.environ
_contextual_loader: Callable = None
......
......@@ -33,9 +33,9 @@ class LogBulkHelper:
return record.data
@classmethod
def _set_bulk_id_in_wks(cls, record: Record, bulk_id) -> None:
def _set_bulk_id_in_wks(cls, record: Record, bulk_id, prefix: str) -> None:
""" for now it used externalIds, to _get_bulk_id_from_wksbe updated once schema is fixed with log.data.bulkId """
bulk_urn = BulkId.bulk_urn_encode(bulk_id)
bulk_urn = BulkId.bulk_urn_encode(bulk_id, prefix=prefix)
cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = bulk_urn
@classmethod
......@@ -49,7 +49,7 @@ class LogBulkHelper:
@classmethod
def update_bulk_id(
cls, record: Record, bulk_id, custom_bulk_id_path: Optional[str] = None
cls, record: Record, bulk_id, custom_bulk_id_path: Optional[str] = None, prefix: Optional[str] = None
):
"""
Update bulk id within a log record. Note that the custom path cannot be applied when using a strict structured model
......@@ -59,7 +59,7 @@ class LogBulkHelper:
:param custom_bulk_id_path: !! incompatible with log model
"""
if custom_bulk_id_path is None: # what about empty string ?
cls._set_bulk_id_in_wks(record, bulk_id)
cls._set_bulk_id_in_wks(record, bulk_id, prefix)
else:
record_dict = {"data": record.data}
......@@ -69,7 +69,7 @@ class LogBulkHelper:
json_exp.find(record_dict)[0].value[
field_name
] = BulkId.bulk_urn_encode(bulk_id)
] = BulkId.bulk_urn_encode(bulk_id, prefix=prefix)
# if only support existing field, it can be done with a simple update call
# parse_jsonpath(custom_bulk_id_path).update(record, bulk_ref)
record.data = record_dict["data"]
......
......@@ -34,7 +34,7 @@ class GetDataParams:
example=100),
curves: Optional[str] = Query(
default=None,
description='Filters curves. List of curves to be returned.',
description='Filters curves. List of curves to be returned. Curves are returned in the same order as they are given.',
example='MD,GR'),
describe: Optional[bool] = Query(
default=False,
......
......@@ -22,7 +22,6 @@ from app.bulk_persistence.dask.errors import BulkError, BulkNotFound
from app.bulk_persistence.mime_types import MimeTypes
from app.model.model_chunking import GetDataParams
from app.model.log_bulk import LogBulkHelper
from app.utils import Context, OpenApiHandler, get_ctx
from app.persistence.sessions_storage import (Session, SessionException, SessionState, SessionUpdateMode)
from app.routers.common_parameters import (
......@@ -33,11 +32,15 @@ from app.routers.common_parameters import (
from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSessionStateValue,
WithSessionStorages, get_session_dependencies)
from app.routers.record_utils import fetch_record
from app.routers.bulk.utils import (
with_dask_blob_storage, get_check_input_df_func, get_df_from_request,get_bulk_uri_osdu,
set_bulk_field_and_send_record, BULK_URN_PREFIX_VERSION, DataFrameRender)
from app.routers.bulk.utils import (with_dask_blob_storage, get_check_input_df_func, get_df_from_request,
set_bulk_field_and_send_record, DataFrameRender)
from app.routers.bulk.bulk_uri_dependencies import (get_bulk_id_access, BulkIdAccess,
BULK_URN_PREFIX_VERSION)
from app.helper.traces import with_trace
from osdu.core.api.storage.exceptions import ResourceNotFoundException
router = APIRouter() # router dedicated to bulk APIs
OPERATION_IDS = {"record_data": "write_record_data",
......@@ -66,6 +69,7 @@ async def post_data(record_id: str,
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
check_input_df_func=Depends(get_check_input_df_func),
bulk_uri_access: BulkIdAccess = Depends(get_bulk_id_access),
):
@with_trace("save_blob")
async def save_blob():
......@@ -77,7 +81,8 @@ async def post_data(record_id: str,
fetch_record(ctx, record_id),
save_blob()
)
return await set_bulk_field_and_send_record(ctx=ctx, bulk_id=bulk_id, record=record)
return await set_bulk_field_and_send_record(ctx=ctx, bulk_id=bulk_id, record=record, bulk_uri_access=bulk_uri_access)
@OpenApiHandler.set(operation_id=OPERATION_IDS["chunk_data"], request_body=REQUEST_DATA_BODY_SCHEMA)
......@@ -136,14 +141,10 @@ async def get_data_version(
orient: JSONOrient = Depends(json_orient_parameter),
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
bulk_uri_access: BulkIdAccess = Depends(get_bulk_id_access)
):
record = await fetch_record(ctx, record_id, version)
bulk_urn = get_bulk_uri_osdu(record)
if bulk_urn is not None: