Draft: trace exporter moved inside dask pluging setup
2 unresolved threads
2 unresolved threads
global variable Config usage removed
Merge request reports
Activity
assigned to @Vernet
14 14 15 15 from dask.distributed import WorkerPlugin 16 16 from app.helper.logger import get_logger, init_logger 17 from app.conf import Config as AppConfig 17 from app.conf import ConfigurationContainer 18 19 from .traces import init_trace_exporter 18 20 19 21 20 22 class DaskWorkerPlugin(WorkerPlugin): 21 23 22 def __init__(self, service_name, logger=None, register_fsspec_implementation=None) -> None: 24 def __init__(self, service_name, @Vernet Very good idea to explicitly create the traces exporter and logger at Worker initialization!
30 @param logger: to use inside worker, if not a default one is created at worker setup based on config 31 @param register_fsspec_implementation: see DaskStorageParameters 32 @param app_config: specific application config to use, if none will use environment variables 33 """ 23 34 self.worker = None 24 35 self._register_fsspec_implementation = register_fsspec_implementation 25 36 self._service_name = service_name 37 self._app_config = app_config 26 38 27 39 super().__init__() 28 40 logger.debug("WorkerPlugin initialised") 29 41 30 42 def setup(self, worker): 31 init_logger(service_name=self._service_name, config=AppConfig) 43 # Note: this setup is run inside the dask worker process 44 app_config = self._app_config or ConfigurationContainer.with_cloud_provider_env()
Please register or sign in to reply