Skip to content
Snippets Groups Projects
Commit 78a619c2 authored by Alexandre Vincent's avatar Alexandre Vincent
Browse files

Merge branch 'wdms_temp_dir_in_bulk_persistence' into 'master'

move get_wdms_temp_dir in bulk_persistence

See merge request !453
parents 70b7a4dc 15adb0c6
No related branches found
No related tags found
1 merge request!453move get_wdms_temp_dir in bulk_persistence
Pipeline #99278 passed with warnings
......@@ -18,7 +18,7 @@ The following software have components provided under the terms of this license:
- google-api-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python, https://github.com/googleapis/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
- google-cloud-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-cloud-core (from https://github.com/GoogleCloudPlatform/google-cloud-python, https://github.com/googleapis/python-cloud-core)
- google-cloud-monitoring (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-cloud-trace (from https://github.com/googleapis/googleapis)
- googleapis-common-protos (from https://github.com/googleapis/googleapis)
......
......@@ -19,4 +19,7 @@ from .json_orient import JSONOrient
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from .exceptions import UnknownChannelsException, InvalidBulkException, NoBulkException, NoDataException, RecordNotFoundException
from .consistency_checks import ConsistencyException, DataConsistencyChecks
\ No newline at end of file
from .consistency_checks import ConsistencyException, DataConsistencyChecks
# TMP: this should probably not be exposed outside of the bulk_persistence package
from .temp_dir import get_temp_dir
......@@ -37,7 +37,7 @@ import pyarrow.feather as feather
import pyarrow.parquet as pq
from app.pool_executor import get_pool_executor
from app.utils import get_wdms_temp_dir
from .temp_dir import get_temp_dir
from .dataframe_serializer import DataframeSerializerAsync
from .blob_bulk import BlobBulk
......@@ -201,7 +201,7 @@ async def create_and_write_blob(
# Build the output filename which will be used as bulk id
blob_id = blob_id or str(uuid.uuid4())
out_filename = blob_id + file_exporter.mime_type.extension
out_path = path.join(out_dir or get_wdms_temp_dir(), out_filename)
out_path = path.join(out_dir or get_temp_dir(), out_filename)
# Dump/Export the dataframe into a file format
export_to_file_function = custom_export_to_file_fn or file_exporter.writer_fn
......
from . import dask_config
import dask
from ..temp_dir import get_temp_dir
dask.config.set({'temporary_directory': get_temp_dir()})
......@@ -20,7 +20,7 @@ from contextlib import asynccontextmanager, contextmanager, suppress
from typing import Union, AsyncGenerator, AsyncContextManager
from dask.utils import format_bytes
from app.utils import get_wdms_temp_dir
from ..temp_dir import get_temp_dir
from app.helper.logger import get_logger
......@@ -116,7 +116,7 @@ class DaskLocalFileDataIPC:
log_usage_error_threshold = 5 * GiB
def __init__(self, base_folder=None, io_chunk_size=50*MiB):
self._base_folder = base_folder or get_wdms_temp_dir()
self._base_folder = base_folder or get_temp_dir()
self._io_chunk_size = io_chunk_size
class _AsyncSetterContextManager:
......
import tempfile
from os import path, makedirs
def _setup_temp_dir() -> str:
tmpdir = tempfile.gettempdir()
if not tmpdir.endswith('wdmsosdu'):
tmpdir = path.join(tmpdir, 'wdmsosdu')
makedirs(tmpdir, exist_ok=True)
tempfile.tempdir = tmpdir
return tmpdir
_TEMP_DIR = _setup_temp_dir()
def get_temp_dir():
return _TEMP_DIR
......@@ -29,6 +29,7 @@ from dask.distributed import Client as DaskDistributedClient
from distributed import system, LocalCluster
from distributed.deploy.utils import nprocesses_nthreads
from .bulk_persistence import get_temp_dir
from .context import Context
from app.model.user import User
from app.injector.app_injector import AppInjector
......@@ -71,6 +72,7 @@ class DaskClient:
from app.helper.logger import get_logger
logger = get_logger()
logger.info(f"Dask client initialization started...")
get_logger().info(f"Dask using temporary directory: {get_temp_dir()}")
n_workers, threads_per_worker, worker_memory_limit = DaskClient._get_dask_configuration(logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
......@@ -160,22 +162,6 @@ class DaskClient:
DaskClient.client = None
def _setup_temp_dir() -> str:
tmpdir = tempfile.gettempdir()
if not tmpdir.endswith('wdmsosdu'):
tmpdir = path.join(tmpdir, 'wdmsosdu')
makedirs(tmpdir, exist_ok=True)
tempfile.tempdir = tmpdir
return tmpdir
WDMS_TEMP_DIR = _setup_temp_dir()
def get_wdms_temp_dir():
return WDMS_TEMP_DIR
async def async_with_cache(cache, key: str, fn_coroutine, *args, **kwargs):
try:
return cache[key]
......@@ -328,4 +314,3 @@ class __OpenApiHandler:
OpenApiHandler = __OpenApiHandler()
dask.config.set({'temporary_directory': get_wdms_temp_dir()})
......@@ -57,7 +57,6 @@ from app.pool_executor import run_in_pool_executor
from app.utils import (
get_http_client_session,
OpenApiHandler,
get_wdms_temp_dir,
DaskClient,
POOL_EXECUTOR_MAX_WORKER)
from app.routers.bulk.utils import (
......@@ -137,7 +136,6 @@ async def startup_event():
assert sys.version_info.major == 3 and sys.version_info.minor >= 8, 'Python version required >=3.8'
check_environment(Config)
print('using temporary directory:', get_wdms_temp_dir())
MainInjector().configure(app_injector)
wdms_app.trace_exporter = traces.create_exporter(service_name=service_name)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment