Skip to content
Snippets Groups Projects
Commit 30cf3d5a authored by Alexandre Vincent's avatar Alexandre Vincent
Browse files

rename wdms_temp_dir to temp_dir

parent 6035addc
No related branches found
No related tags found
1 merge request!453move get_wdms_temp_dir in bulk_persistence
Pipeline #99117 failed
......@@ -21,4 +21,4 @@ from .tenant_provider import resolve_tenant
from .exceptions import UnknownChannelsException, InvalidBulkException, NoBulkException, NoDataException, RecordNotFoundException
from .consistency_checks import ConsistencyException, DataConsistencyChecks
from .wdms_temp_dir import get_wdms_temp_dir
\ No newline at end of file
from .temp_dir import get_temp_dir
......@@ -37,7 +37,7 @@ import pyarrow.feather as feather
import pyarrow.parquet as pq
from app.pool_executor import get_pool_executor
from .wdms_temp_dir import get_wdms_temp_dir
from .temp_dir import get_temp_dir
from .dataframe_serializer import DataframeSerializerAsync
from .blob_bulk import BlobBulk
......@@ -201,7 +201,7 @@ async def create_and_write_blob(
# Build the output filename which will be used as bulk id
blob_id = blob_id or str(uuid.uuid4())
out_filename = blob_id + file_exporter.mime_type.extension
out_path = path.join(out_dir or get_wdms_temp_dir(), out_filename)
out_path = path.join(out_dir or get_temp_dir(), out_filename)
# Dump/Export the dataframe into a file format
export_to_file_function = custom_export_to_file_fn or file_exporter.writer_fn
......
import dask
from ..wdms_temp_dir import get_wdms_temp_dir
from ..temp_dir import get_temp_dir
dask.config.set({'temporary_directory': get_wdms_temp_dir()})
dask.config.set({'temporary_directory': get_temp_dir()})
......@@ -20,7 +20,7 @@ from contextlib import asynccontextmanager, contextmanager, suppress
from typing import Union, AsyncGenerator, AsyncContextManager
from dask.utils import format_bytes
from ..wdms_temp_dir import get_wdms_temp_dir
from ..temp_dir import get_temp_dir
from app.helper.logger import get_logger
......@@ -116,7 +116,7 @@ class DaskLocalFileDataIPC:
log_usage_error_threshold = 5 * GiB
def __init__(self, base_folder=None, io_chunk_size=50*MiB):
self._base_folder = base_folder or get_wdms_temp_dir()
self._base_folder = base_folder or get_temp_dir()
self._io_chunk_size = io_chunk_size
class _AsyncSetterContextManager:
......
import tempfile
from os import path, makedirs
def _setup_temp_dir() -> str:
tmpdir = tempfile.gettempdir()
if not tmpdir.endswith('wdmsosdu'):
......@@ -10,8 +11,8 @@ def _setup_temp_dir() -> str:
return tmpdir
WDMS_TEMP_DIR = _setup_temp_dir()
_TEMP_DIR = _setup_temp_dir()
def get_wdms_temp_dir():
return WDMS_TEMP_DIR
def get_temp_dir():
return _TEMP_DIR
......@@ -29,7 +29,7 @@ from dask.distributed import Client as DaskDistributedClient
from distributed import system, LocalCluster
from distributed.deploy.utils import nprocesses_nthreads
from .bulk_persistence import get_wdms_temp_dir
from .bulk_persistence import get_temp_dir
from .context import Context
from app.model.user import User
from app.injector.app_injector import AppInjector
......@@ -72,7 +72,7 @@ class DaskClient:
from app.helper.logger import get_logger
logger = get_logger()
logger.info(f"Dask client initialization started...")
get_logger().info(f"Dask using temporary directory: {get_wdms_temp_dir()}")
get_logger().info(f"Dask using temporary directory: {get_temp_dir()}")
n_workers, threads_per_worker, worker_memory_limit = DaskClient._get_dask_configuration(logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment