Commit 603f3a28 authored by Yannick's avatar Yannick
Browse files

trace exporter moved inside dask plugin setup

global variable Config usage removed
parent 0bcab85f
Pipeline #114640 failed with stage
in 46 seconds
......@@ -14,21 +14,36 @@
from dask.distributed import WorkerPlugin
from app.helper.logger import get_logger, init_logger
from app.conf import Config as AppConfig
from app.conf import ConfigurationContainer
from .traces import init_trace_exporter
class DaskWorkerPlugin(WorkerPlugin):
    def __init__(self, service_name,
                 logger=None, register_fsspec_implementation=None,
                 app_config: ConfigurationContainer = None) -> None:
        """
        Dask worker plugin used to setup a Dask worker. It will be used to setup logging,
        tracing and the fsspec params (especially storage account and credentials).
        @param service_name: service name used when initializing logging/tracing in the worker
        @param logger: to use inside worker, if not a default one is created at worker setup based on config
        @param register_fsspec_implementation: see DaskStorageParameters
        @param app_config: specific application config to use, if none will use environment variables
        """
        self.worker = None
        self._register_fsspec_implementation = register_fsspec_implementation
        self._service_name = service_name
        self._app_config = app_config
        # logger defaults to None, so guard the debug call to avoid an AttributeError
        # when no logger was supplied at construction time.
        if logger is not None:
            logger.debug("WorkerPlugin initialised")
def setup(self, worker):
init_logger(service_name=self._service_name, config=AppConfig)
# Note: this setup is run inside the dask worker process
app_config = self._app_config or ConfigurationContainer.with_cloud_provider_env()
init_logger(service_name=self._service_name, config=app_config)
self.worker = worker
if self._register_fsspec_implementation:
from typing import Callable, Union
from enum import Enum
from dask.distributed import Client
import pandas as pd
from dask.utils import funcname
from dask.base import tokenize
from dask.distributed import Client
from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
from app.conf import Config
from app.helper import traces
from app.context import get_ctx
from opencensus.trace import execution_context
......@@ -19,17 +18,24 @@ from . import dask_worker_write_bulk as bulk_writer
def init_trace_exporter(app_config):
    """Create the module-global bulk trace exporter from the given application config.

    If an exporter was already created, it is replaced and a warning is printed.
    @param app_config: configuration container providing service_name and exporter settings
    """
    # TODO should we disable it if cloud_provider == local ?
    global _EXPORTER
    if _EXPORTER is not None:
        # Fixed message typo: "race" -> "trace"
        print("Warning: bulk trace exporter already created")
    _EXPORTER = traces.create_exporter(service_name=app_config.service_name, config=app_config)
def _wrap_trace_process(*args, **kwargs):
target_func = kwargs.pop('target_func')
if _EXPORTER is None:
return target_func(*args, **kwargs)
tracing_headers = kwargs.pop('tracing_headers')
if not tracing_headers or not target_func:
raise AttributeError("Keyword arguments should contain 'target_func' and 'tracing_headers'")
if _EXPORTER is None:
_EXPORTER = traces.create_exporter(service_name=Config.service_name, config=Config)
span_context = traces.get_trace_propagator().from_headers(tracing_headers)
tracer = open_tracer.Tracer(span_context=span_context,
......@@ -59,7 +65,7 @@ def submit_with_trace(dask_client: Client, target_func: Callable, *args, **kwarg
dask_task_key = _create_func_key(target_func, *args, **kwargs)
return dask_client.submit(wrap_trace_process, *args, key=dask_task_key, **kwargs)
return dask_client.submit(_wrap_trace_process, *args, key=dask_task_key, **kwargs)
def map_with_trace(dask_client: Client, target_func: Callable, *args, **kwargs):
......@@ -74,7 +80,7 @@ def map_with_trace(dask_client: Client, target_func: Callable, *args, **kwargs):
dask_task_key = _create_func_key(target_func, *args, **kwargs)
return, *args, key=dask_task_key, **kwargs)
return, *args, key=dask_task_key, **kwargs)
class TracingMode(Enum):
......@@ -234,6 +234,10 @@ class ConfigurationContainer:
return inst
def with_cloud_provider_env(cls, environment_dict=os.environ):
    """Build a ConfigurationContainer with all values loaded, including the
    cloud-provider specific entries, from *environment_dict* (defaults to os.environ).

    NOTE(review): takes ``cls`` and calls ``cls.with_load_all`` — presumably decorated
    with ``@classmethod`` on a line not visible in this hunk; confirm in the full file.
    NOTE(review): ``os.environ`` as a default binds the live mapping at definition
    time — intentional here, but verify callers expect the live environment.
    """
    return cls.with_load_all(environment_dict, cloud_provider_additional_environment)
def reload(self, environment_dict=None):
if environment_dict is not None:
self._environment_dict = environment_dict
......@@ -370,6 +374,8 @@ def cloud_provider_additional_environment(config: ConfigurationContainer):
# Global config instance
Config = ConfigurationContainer.with_load_all(contextual_loader=cloud_provider_additional_environment)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment