# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
__all__ = ['Config',
'ConfigurationContainer',
'check_environment',
'cloud_provider_additional_environment',
'validator_path_must_exist']
logger = logging.getLogger('configuration')
@dataclass
class EnvVar:
key: str
description: str = ''
secret: bool = False
default: Optional[str] = None
value: Optional[Any] = None
allowed_values: Optional[List[Any]] = None # if the loaded value is not in this list, it is discarded and value stays None
is_mandatory: bool = False
factory: Optional[Callable[[str], Any]] = None # transform the raw string value into the target type
validator: Optional[Callable[[Any], bool]] = None # only called on non-None values; a None value is always treated as invalid
def load(self, environment_dict):
value = environment_dict.get(self.key, self.default)
if self.factory is not None and value is not None:
value = self.factory(value)
if self.allowed_values is None or value in self.allowed_values:
self.value = value
def __call__(self):
return self.value
def __str__(self):
return f'{self.key} = {self.printable_value}'
def __bool__(self):
if self.value is None:
return False
return True if self.validator is None else self.validator(self.value)
@property
def printable_value(self) -> str:
if self.value is None:
return 'UNDEFINED'
if self.secret:
return '*****'
return str(self.value)
def validator_path_must_exist(path: str):
return os.path.exists(path)
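# Usage sketch of EnvVar (illustrative only, kept as a comment; the key and
# values below are made up for demonstration):
#
#     var = EnvVar(key='MY_TIMEOUT', default='30', factory=int, is_mandatory=True)
#     var.load({'MY_TIMEOUT': '60'})
#     assert var.value == 60              # factory converted the raw string
#     assert bool(var)                    # non-None value and no validator => truthy
#     var.load({})                        # key absent: falls back to default='30' -> 30
#     secret_var = EnvVar(key='TOKEN', secret=True)
#     secret_var.load({'TOKEN': 'abc'})
#     assert secret_var.printable_value == '*****'    # safe to log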
@dataclass(repr=False, eq=False)
class ConfigurationContainer:
"""
Gathers static environment variables and other settings. More can be added at runtime, and existing ones can be overridden.
The add method also sets the new value as an attribute of the current instance.
Environment variables are declared as EnvVar instances, so the value is accessed through:
config.env_var_attribute.value
An environment variable can also be looked up by its key. For instance, if declared as:
path_env_var: EnvVar = EnvVar(key='PATH')
then the value can be accessed as:
path_value = config.path_env_var.value
path_value = config['path_env_var']
path_value = config['PATH']
Use env_var.printable_value instead of env_var.value when the goal is to log/display it.
"""
service_name: EnvVar = EnvVar(
key='SERVICE_NAME',
description='Display name of the service when exporting entries for logging and tracing',
default='os-wellbore-ddms---local'
)
environment_name: EnvVar = EnvVar(
key='ENVIRONMENT_NAME',
description='Environment name',
default='undefined'
)
cloud_provider: EnvVar = EnvVar(
key='CLOUD_PROVIDER',
description='Short name of the current cloud provider environment, must be "aws" or "gcp" or "az" or "ibm"',
allowed_values=['aws', 'gcp', 'az', 'local', 'ibm'],
is_mandatory=True)
service_host_search: EnvVar = EnvVar(
key='SERVICE_HOST_SEARCH',
description='Back-end for search service',
is_mandatory=True)
service_host_storage: EnvVar = EnvVar(
key='SERVICE_HOST_STORAGE',
description='Back-end for storage service',
is_mandatory=True)
de_client_config_timeout: EnvVar = EnvVar(
key='DE_CLIENT_CFG_TIMEOUT',
description='set connect, read, write, and pool timeouts (in seconds) for all DE clients.',
factory=lambda x: int(x))
de_client_config_max_connection: EnvVar = EnvVar(
key='DE_CLIENT_CFG_MAX_CONNECTION',
description='maximum number of allowable connections, 0 to always allow.',
factory=lambda x: int(x))
de_client_config_max_keepalive: EnvVar = EnvVar(
key='DE_CLIENT_CFG_MAX_KEEPALIVE',
description='number of allowable keep-alive connections, 0 to always allow.',
factory=lambda x: int(x))
de_client_backoff_max_tries: EnvVar = EnvVar(
key='DE_CLIENT_BACKOFF_MAX_RETRIES',
description="""The maximum number of attempts to make before giving
up. Once exhausted, the exception will be allowed to escape.
The default value of None means their is no limit to the
number of tries.""",
default='4',
factory=lambda x: int(x))
de_client_backoff_max_wait: EnvVar = EnvVar(
key='DE_CLIENT_BACKOFF_MAX_WAIT',
description="""The maximum wait in second between retry. """,
default='5',
factory=lambda x: int(x))
build_details: EnvVar = EnvVar(
key='OS_WELLBORE_DDMS_BUILD_DETAILS',
description='contains optional extra information about the build; the format is multiple "key=value" pairs separated by ";"',
default='')
dev_mode: EnvVar = EnvVar(
key='OS_WELLBORE_DDMS_DEV_MODE',
description='dev mode',
default='false',
factory=lambda x: x.lower() == 'true' or x == '1')
openapi_prefix: EnvVar = EnvVar(
key='OPENAPI_PREFIX',
description='specify the base path for the openapi doc, in case the service is deployed behind a proxy',
default='/api/os-wellbore-ddms')
custom_catalog_timeout: EnvVar = EnvVar(
key='CUSTOM_CATALOG_TIMEOUT',
description='Timeout to invalidate custom catalog in seconds',
default='300',
factory=lambda x: int(x))
modules: EnvVar = EnvVar(
key='MODULES',
description="""Comma separated list of module names to load.""",
default="log_recognition.routers.log_recognition") # Add modules to the list once they are refactored, so that they are included
min_worker_memory: EnvVar = EnvVar(
key='MIN_WORKER_MEMORY',
description='Min amount of memory for one worker',
default="512Mi")
dask_data_ipc: EnvVar = EnvVar(
key='DASK_DATA_IPC',
description='Specify data IPC type between main process and dask workers',
default='dask_native',
allowed_values=['dask_native', 'local_file'],
factory=lambda x: x.lower()
)
max_columns_return: EnvVar = EnvVar(
key='MAX_COLUMNS_RETURN',
description='Max number of columns that can be returned per data request',
default="500",
factory=lambda x: int(x))
max_columns_per_chunk_write: EnvVar = EnvVar(
key='MAX_COLUMNS_PER_CHUNK_WRITE',
description='Max number of columns that can be written per chunk',
default="500",
factory=lambda x: int(x))
_environment_dict: Dict = os.environ
_contextual_loader: Optional[Callable] = None
def add(self, name: str, value: Any, *, override: bool = False):
""" add a custom """
if not override and name in self.__dict__:
raise KeyError(name + ' already exists')
self.__setattr__(name, value)
def add_from_env(self,
env_var_key: str,
attribute_name: Optional[str] = None,
description: str = '',
secret: bool = False,
default: Optional[str] = None,
factory: Optional[Callable[[str], Any]] = None,
allowed_values: Optional[List[Any]] = None,
is_mandatory: bool = False,
validator: Optional[Callable[[Any], bool]] = None,
*, override: bool = False) -> Optional[Any]:
env_var = EnvVar(key=env_var_key,
description=description,
secret=secret,
default=default,
factory=factory,
allowed_values=allowed_values,
is_mandatory=is_mandatory,
validator=validator)
env_var.load(self._environment_dict)
self.add(attribute_name or env_var_key, env_var, override=override)
return env_var.value
@classmethod
def with_load_all(cls, environment_dict=os.environ, contextual_loader=None):
inst = cls(_environment_dict=environment_dict,
_contextual_loader=contextual_loader)
inst.reload()
return inst
def reload(self, environment_dict=None):
if environment_dict is not None:
self._environment_dict = environment_dict
for var in self.env_vars():
var.load(self._environment_dict)
if self._contextual_loader is not None:
self._contextual_loader(self)
def __getitem__(self, name):
""" look for any declared attribute and env var key """
attribute = self.get_env_or_attribute(name)
if attribute is None: # fallback into environment dict
return self._environment_dict[name]
return attribute.value if isinstance(attribute, EnvVar) else attribute
def get_env_or_attribute(self, name) -> Optional[Any]:
""" return the attribute or the EnvVar matching the given name or env var key, None if not found """
if name in self.__dict__:
return self.__getattribute__(name)
return next((v for v in self.env_vars() if v.key == name), None)
def get(self, name, default=None):
""" like __getitem__ but returns default instead of raising when nothing matches """
try:
return self[name]
except (KeyError, AttributeError):
return default
def __contains__(self, name) -> bool:
if name in self.__dict__:
return True
return any(v.key == name for v in self.env_vars())
def __repr__(self):
return ', '.join([f'{k}={v}' for k, v in self.as_printable_dict().items()])
def as_printable_dict(self) -> Dict[str, str]:
return {name: att.printable_value if isinstance(att, EnvVar) else att
for name, att in self.__dict__.items()
if not name.startswith('_')}
def env_vars(self):
""" generator of all env vars only """
for name, attribute in self.__dict__.items():
if isinstance(attribute, EnvVar):
yield attribute
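# Usage sketch of ConfigurationContainer with a custom environment dict
# (illustrative only; the dict content below is made up, with_load_all defaults
# to os.environ):
#
#     cfg = ConfigurationContainer.with_load_all(
#         environment_dict={'SERVICE_HOST_STORAGE': 'http://storage', 'CLOUD_PROVIDER': 'local'})
#     cfg.service_host_storage.value   # -> 'http://storage'
#     cfg['SERVICE_HOST_SEARCH']       # -> None, declared EnvVar but unset and without default
#     cfg.add_from_env(env_var_key='MY_FLAG', attribute_name='my_flag',
#                      default='false', factory=lambda x: x.lower() == 'true')
#     cfg.my_flag.value                # -> False
#     cfg.reload({'SERVICE_HOST_STORAGE': 'http://other'})   # re-reads every declared EnvVar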
def cloud_provider_additional_environment(config: ConfigurationContainer):
provider = config.cloud_provider.value
if provider == 'az':
config.add_from_env(attribute_name='az_ai_instrumentation_key',
env_var_key='AZ_AI_INSTRUMENTATION_KEY',
description='azure app insights instrumentation key',
secret=True,
is_mandatory=True,
override=True)
config.add_from_env(attribute_name='az_logger_level',
env_var_key='AZ_LOGGER_LEVEL',
description='azure logger level',
default='INFO',
secret=False,
is_mandatory=False,
override=True)
config.az_bulk_container = 'wdms-osdu'
if provider == 'gcp':
config.add_from_env(attribute_name='default_data_tenant_project_id',
env_var_key='OS_WELLBORE_DDMS_DATA_PROJECT_ID',
description='GCP data tenant ID',
default='logstore-dev',
is_mandatory=True,
override=True)
config.add_from_env(attribute_name='default_data_tenant_credentials',
env_var_key='OS_WELLBORE_DDMS_DATA_PROJECT_CREDENTIALS',
description='path to the key file of the SA to access the data tenant',
validator=validator_path_must_exist,
default=None)
config.add_from_env(attribute_name='service_host_storage',
env_var_key='SERVICE_HOST_STORAGE',
description='Back-end for storage service',
is_mandatory=False,
override=True,
default='http://storage/api/storage')
config.add_from_env(attribute_name='service_host_search',
env_var_key='SERVICE_HOST_SEARCH',
description='Back-end for search service',
is_mandatory=False,
override=True,
default='http://search/api/search')
if provider == 'ibm':
config.add_from_env(attribute_name='default_data_tenant_project_id',
env_var_key='OS_WELLBORE_DDMS_DATA_PROJECT_ID',
description='IBM data tenant ID',
default='logstore-ibm',
is_mandatory=True,
override=True)
if provider == 'aws':
config.add_from_env(attribute_name='aws_region',
env_var_key='AWS_REGION',
description='AWS data tenant ID',
default='us-east-1',
is_mandatory=True,
override=True)
config.add_from_env(attribute_name='aws_env',
env_var_key='ENVIRONMENT',
description='AWS ResourcePrefix',
default='osdu-',
is_mandatory=True,
override=True)
Config = ConfigurationContainer.with_load_all(contextual_loader=cloud_provider_additional_environment)
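# Usage sketch of the module-level Config instance (illustrative only; the
# values implied below depend on the actual runtime environment):
#
#     storage_url = Config.service_host_storage.value       # attribute access
#     provider = Config['CLOUD_PROVIDER']                    # lookup by env var key
#     logger.info(f'search backend: {Config.service_host_search.printable_value}')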
def check_environment(configuration):
"""
The goal is to fail fast and provide a meaningful report in case of error, to ease any fix/debug.
We may generalize and isolate this in each module (some implementations may need a specific setup,
e.g. some Azure impl may require a dedicated env var pointing to a valid file).
For now, keep every rule here and review it later.
"""
logger.info('Environment configuration:')
for k, v in configuration.as_printable_dict().items():
logger.info(f' - {k} = {v}')
mandatory_variables = [v for v in configuration.env_vars()
if v.is_mandatory and not v]
errors = [f'env var {v.key} ({v.description}) is undefined or invalid, current value={os.environ.get(v.key)}'
for v in mandatory_variables]
logger_level = logger.warning if configuration.dev_mode.value else logger.error
for err in errors:
logger_level(err)
# handle errors: when not in dev mode, abort immediately
if any(errors):
if configuration.dev_mode.value:
logger.error('!!! The current environment is not correctly setup to run the service, see logs !!!')
else: # just abort
raise RuntimeError('Incorrect environment: ' + ', '.join(errors))
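# Usage sketch of wiring check_environment into service startup (illustrative
# only; the FastAPI app object and startup hook below are assumptions, they are
# not defined in this module):
#
#     @app.on_event('startup')
#     async def startup_event():
#         # logs the whole configuration and raises RuntimeError on missing
#         # mandatory variables when dev mode is disabled
#         check_environment(Config)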
AUTHORIZATION_HEADER_NAME = 'Authorization'
APP_KEY_HEADER_NAME = 'appKey'
CORRELATION_ID_HEADER_NAME = 'correlation-id'
REQUEST_ID_HEADER_NAME = 'Request-ID'
PARTITION_ID_HEADER_NAME = 'data-partition-id'
MODULES_PATH_PREFIX = 'app.modules'
X_USER_ID_HEADER_NAME = 'x-user-id'