Commit 30fde309 authored by Sutton's avatar Sutton
Browse files

Mergin

parents 1b461899 a467c9e7
......@@ -13,95 +13,59 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sys, os
import importlib
import yaml # MIT license
import requests
from airflow.models import Variable
from osdu_api.libs.configuration.config_manager import ConfigManager
from osdu_api.libs.context.context import Context
from osdu_api.libs.exceptions.exceptions import UnknownRequestMethodError, ConfigurationError
from osdu_api.libs.refresh_token.refresh_token import TokenRefresher, authorize
from osdu_api.model.http_method import HttpMethod
'''
Base client that is meant to be extended by service specific clients
'''
class BaseClient:
"""
Base client that is meant to be extended by service specific clients.
"""
'''
Base client gets initialized with configuration values and a bearer token
based on provider-specific logic
'''
def __init__(self):
    """
    Read the client settings from Airflow variables, then fetch a bearer token
    through the provider-specific entitlements module.
    """
    # The variables come first: _get_bearer_token() builds its import path from
    # self.provider and self.entitlements_module_name.
    self._read_variables()
    token = self._get_bearer_token()
    self.bearer_token = token
'''
The path to the logic to get a valid bearer token is dynamically injected based on
what provider and entitlements module name is provided in the configuration yaml
'''
def _get_bearer_token(self):
    """
    Import the entitlements module named by self.provider and
    self.entitlements_module_name and return the bearer token it provides.

    :return: whatever the module's get_bearer_token() returns
    """
    module_path = f"osdu_api.provider.{self.provider}.{self.entitlements_module_name}"
    provider_module = importlib.import_module(module_path)
    return provider_module.get_bearer_token()
def __init__(self, token_refresher: TokenRefresher, context: Context):
    """
    Set up the client.

    :param token_refresher: kept on the instance as 'token_refresher'
    :param context: request context; its data_partition_id is copied onto the client
    """
    # The configuration manager is created first, exactly as before, so any
    # configuration problem surfaces before the other attributes are set.
    self._config_manager = ConfigManager()
    self.data_partition_id = context.data_partition_id
    self.token_refresher = token_refresher
'''
Parses a yaml file named osdu_api.yaml. All config values listed below are meant to
be required except URLs to specific services which may or may not be used depending
on the specific script
'''
def _parse_config(self):
    """
    Parse 'osdu_api.yaml', located in the directory given by sys.path[0], and copy
    its values onto the client.

    'data_partition_id', 'provider' and 'entitlements_module_name' are required;
    'storage_url' and 'search_url' are optional.

    :raises Exception: (from _parse_config_value) when a required value is missing
    """
    config_file_location = os.path.join(sys.path[0], 'osdu_api.yaml')
    with open(config_file_location, 'r') as config_file:
        # safe_load instead of load: yaml.load() without an explicit Loader can
        # construct arbitrary Python objects, is deprecated since PyYAML 5.1 and
        # is rejected by PyYAML 6.
        config = yaml.safe_load(config_file)

    self.data_partition_id = self._parse_config_value(config, 'data_partition_id', True)
    self.storage_url = self._parse_config_value(config, 'storage_url', False)
    self.search_url = self._parse_config_value(config, 'search_url', False)
    self.provider = self._parse_config_value(config, 'provider', True)
    self.entitlements_module_name = self._parse_config_value(config, 'entitlements_module_name', True)
def get_config_value(self, value: str) -> str:
    """
    Look up a configuration value through the client's configuration manager.

    :param value: name of the configuration value to fetch
    :return: whatever the configuration manager returns for that name
    """
    config_value = self._config_manager.get_config_value(value)
    return config_value
'''
Read Airflow variables
'''
def _read_variables(self):
    """
    Copy the Airflow variables 'storage_url', 'search_url', 'provider' and
    'entitlements_module_name' onto attributes of the same name.
    """
    variable_names = ('storage_url', 'search_url', 'provider', 'entitlements_module_name')
    for variable_name in variable_names:
        setattr(self, variable_name, Variable.get(variable_name))
'''
Used during parsing of the yaml config file. Will raise an exception if a required config
value is missing
'''
def _parse_config_value(self, config, config_name, is_required):
    """
    Return one value from the parsed yaml config.

    :param config: parsed yaml content (a mapping, or None for an empty file)
    :param config_name: key to look up
    :param is_required: whether a missing value is an error
    :return: the configured value, or '' when an optional value is missing
    :raises Exception: when the value is missing and is_required is true
    """
    try:
        return config[config_name]
    except (KeyError, TypeError):
        # KeyError: the key is absent from the mapping.
        # TypeError: config is not subscriptable (e.g. None).
        # The original caught only TypeError, so an absent key escaped as a raw KeyError.
        if is_required:
            raise Exception('Config value %s missing and is required' % config_name)
        print('Config value %s missing' % config_name)
        return ''
@authorize()
def _send_request(self, headers: dict, url: str, params: dict, data: str, method: HttpMethod) -> requests.Response:
    """
    Dispatch a single HTTP request with the 'requests' library.

    The 'authorize' decorator supplies the Authorization header and uses
    self.token_refresher to do so.

    :param headers: request headers
    :param url: target URL
    :param params: query-string parameters
    :param data: request body; only sent for POST and PUT
    :param method: which HTTP verb to use
    :return: the response object produced by 'requests'
    :raises UnknownRequestMethodError: when 'method' is not GET, POST, PUT or DELETE
    """
    if method is HttpMethod.GET:
        return requests.get(url=url, params=params, headers=headers)
    if method is HttpMethod.POST:
        return requests.post(url=url, params=params, data=data, headers=headers)
    if method is HttpMethod.PUT:
        return requests.put(url=url, params=params, data=data, headers=headers)
    if method is HttpMethod.DELETE:
        # HttpMethod declares DELETE, but it used to fall through to the error below.
        return requests.delete(url=url, params=params, headers=headers)
    raise UnknownRequestMethodError
'''
Makes a request using python's built in requests library. Takes additional headers if
necessary
'''
def make_request(
        self,
        method: HttpMethod,
        url: str,
        data: str = '',
        add_headers: dict = None,
        params: dict = None
) -> requests.Response:
    """
    Makes a request using python's built in requests library. Takes additional headers if
    necessary.

    :param method: HTTP method to use
    :param url: target URL
    :param data: request body
    :param add_headers: extra headers; they override the defaults on key clash
    :param params: query-string parameters
    :return: the response returned by self._send_request
    """
    # None defaults instead of {}: a mutable default would be shared between calls.
    params = params or {}
    # No 'Authorization' entry here: _send_request is wrapped by 'authorize',
    # which adds that header itself.
    headers = {
        'content-type': 'application/json',
        'data-partition-id': self.data_partition_id,
    }
    # update() merges key/value pairs; iterating the dict directly would yield keys only.
    headers.update(add_headers or {})
    return self._send_request(headers, url, params, data, method)
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import os
import yaml
from osdu_api.libs.exceptions.exceptions import ConfigurationError
"""
This module is used for initializing configurations, such as OSDU API endpoints, vendor info etc.
Requires the environmental variable 'OSDU_API_CONFIG' to be specified as a path to yaml-config file.
If there is 'airflow_vars' in this yaml-file, then we get configs from Airflow Variables, otherwise we get all the configs
from this yaml-file.
"""
class ConfigSource(enum.Enum):
    """Where ConfigManager takes configuration values from."""
    # Values are fetched from Airflow Variables.
    AIRFLOW = enum.auto()
    # Values are read from the yaml config file itself.
    YAML = enum.auto()
class ConfigManager(object):
    """
    This configuration manager is used for getting different configurations for OSDU clients. Depending on the
    configuration yaml-file, it can return configurations specified either in Airflow or in the yaml-file.
    """

    def __init__(self):
        """
        Load the yaml file named by the 'OSDU_API_CONFIG' environment variable and
        decide which configuration source to use.

        :raises ConfigurationError: when the file is not specified, is not a mapping,
            or selects neither 'airflow_vars' nor 'yaml_vars'
        """
        self._config_source = None
        self.configs = None
        # Only set when the Airflow source is selected.
        self.airflow_variables = None
        self._parse_yaml_config()

    def get_config_value(self, value: str) -> str:
        """
        :param value: configuration value we attempt to get. Example: 'storage_api_url'.
        :return: configuration value.
        :raises ConfigurationError: when the value can't be read from the selected source.
        """
        if self._config_source is ConfigSource.YAML:
            return self._read_yaml_variable(value)
        if self._config_source is ConfigSource.AIRFLOW:
            return self._read_airflow_variable(value)
        # Not reachable after a successful __init__; guards against a silent None.
        raise ConfigurationError(f"Can't get value '{value}': configuration source is not set.")

    def _parse_yaml_config(self):
        """
        Open "OSDU_API_CONFIG" yaml-file.
        If the variable 'airflow_vars' is specified in this file, then we assume, that configs are specified in Airflow variables.
        Else if the variable 'yaml_vars' is specified, then we assume that all configs are stored in this file.

        :raises ConfigurationError: when the file location is missing, the file is not
            a mapping, or neither option is set.
        """
        config_file_location = os.environ.get("OSDU_API_CONFIG")
        if not config_file_location:
            raise ConfigurationError(
                "Configuration file is not specified in the environment variable 'OSDU_API_CONFIG'")

        with open(config_file_location, 'r') as config_file:
            # safe_load instead of load: yaml.load() without a Loader can build
            # arbitrary Python objects, is deprecated since PyYAML 5.1 and is
            # rejected by PyYAML 6.
            loaded = yaml.safe_load(config_file)

        # An empty file parses to None; treat it as an empty mapping so the
        # "no option" error below is raised instead of an AttributeError.
        if loaded is None:
            loaded = {}
        if not isinstance(loaded, dict):
            raise ConfigurationError(
                f"Configuration file '{config_file_location}' must contain a mapping at the top level.")
        self.configs = loaded

        if self.configs.get("airflow_vars"):
            # Imported lazily so that yaml-only setups don't need Airflow. The
            # submodule is imported explicitly: 'import airflow' alone does not
            # guarantee that 'airflow.models' is loaded.
            from airflow.models import Variable
            self.airflow_variables = Variable
            self._config_source = ConfigSource.AIRFLOW
        elif self.configs.get("yaml_vars"):
            self._config_source = ConfigSource.YAML
        else:
            raise ConfigurationError(
                "There is no option for getting osdu_api configs. Either 'airflow_vars' or 'yaml_vars' must be set."
            )

    def _read_yaml_variable(self, value: str) -> str:
        """
        :param value: configuration value we attempt to get from yaml-file. Example: 'storage_api_url'.
        :return: configuration value
        :raises ConfigurationError: when the value is not present in the loaded configs.
        """
        try:
            return self.configs[value]
        except (KeyError, TypeError) as error:
            # KeyError: key absent; TypeError: configs not loaded or unhashable key.
            raise ConfigurationError(f"Can't get value '{value}' from configs.") from error

    def _read_airflow_variable(self, value: str) -> str:
        """
        :param value: configuration value we attempt to get from Airflow variables. Example: 'storage_api_url'.
        :return: configuration value
        :raises ConfigurationError: when Airflow can't provide the variable.
        """
        try:
            return self.airflow_variables.get(value)
        except Exception as error:
            # 'Exception' rather than a bare except, so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            raise ConfigurationError(f"Can't get value '{value}' from Airflow configs.") from error
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
@dataclasses.dataclass
class Context(object):
    """
    Store data-partition-id and AppKey passed via Payload field of dagrun.conf.
    """
    data_partition_id: str
    app_key: str

    @classmethod
    def populate(cls, ctx: dict) -> 'Context':
        """
        Build a Context from a dict that carries a 'Payload' entry.

        Note that the 'Payload' entry is removed from 'ctx' in the process.

        :param ctx: dict whose 'Payload' holds 'AppKey' and 'data-partition-id'
        :return: the populated Context
        :raises KeyError: when 'Payload' or one of its two keys is missing
        """
        payload = ctx.pop('Payload')
        return cls(
            app_key=payload['AppKey'],
            data_partition_id=payload['data-partition-id'],
        )
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class TokenRefresherNotPresentError(Exception):
    """
    Raised when no token refresher is available: neither passed into the
    authorization decorator nor set on the decorated object.
    """
class UnknownRequestMethodError(Exception):
    """
    Raised when a request is attempted with an unknown request method.
    """
class ConfigurationError(Exception):
    """
    Raised when there is an error with the configuration file.
    """
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Callable, Union
from abc import ABC, abstractmethod
from functools import partial
from http import HTTPStatus
import requests
from osdu_api.libs.exceptions.exceptions import TokenRefresherNotPresentError
logger = logging.getLogger()
class TokenRefresher(ABC):
    """
    Interface for objects that expose an access token and know how to renew it.
    """

    @abstractmethod
    def refresh_token(self) -> str:
        """
        Implement the token refreshing logic here.
        """

    @property
    @abstractmethod
    def access_token(self) -> str:
        """
        Implementations return the current access token.
        """

    @property
    def authorization_header(self) -> dict:
        """
        Authorization header, ready to be merged into a request's headers dict.
        E.g. {"Authorization": "Bearer <access_token>"}
        """
        bearer_value = f"Bearer {self.access_token}"
        return {"Authorization": bearer_value}
def make_callable_request(obj: Union[object, None], request_function: Callable, headers: dict,
                          *args, **kwargs) -> Callable:
    """
    Bind a request function to its arguments and return a zero-argument callable.

    :param obj: instance to pass as first argument when request_function is a
        method; None when it is a plain function
    :param request_function: the function or (unbound) method to call
    :param headers: headers dict passed right after 'obj' (or first, for functions)
    :return: a functools.partial that performs the call when invoked
    """
    # Identity test, not truthiness: an instance defining __bool__ or __len__
    # may be falsy and must still be passed as 'self'.
    if obj is not None:
        return partial(request_function, obj, headers, *args, **kwargs)
    return partial(request_function, headers, *args, **kwargs)
def _validate_headers_type(headers: dict):
    """
    Ensure the request headers are a dict.

    :param headers: value to check
    :raises TypeError: when 'headers' is not a dict
    """
    if not isinstance(headers, dict):
        logger.error(f"Got headers {headers}")
        # Report the offending type; the old message interpolated the builtin
        # 'dict' and so always claimed to have got a dict.
        raise TypeError(f"Request's headers type expected to be 'dict'. Got {type(headers)}")
def _validate_response_type(response: requests.Response, request_function: Callable):
    """
    Ensure the wrapped request function returned a requests.Response.

    :param response: value returned by the request function
    :param request_function: the function that produced it (used in the log message)
    :raises TypeError: when 'response' is not a requests.Response
    """
    if isinstance(response, requests.Response):
        return
    logger.error(f"Function or method {request_function}"
                 f" must return values of type 'requests.Response'. "
                 f"Got {type(response)} instead")
    raise TypeError
def _validate_token_refresher_type(token_refresher: TokenRefresher):
    """
    Ensure the given object is a TokenRefresher.

    :param token_refresher: value to check
    :raises TypeError: when it is not a TokenRefresher instance
    """
    if isinstance(token_refresher, TokenRefresher):
        return
    raise TypeError(
        f"Token refresher must be of type {TokenRefresher}. Got {type(token_refresher)}"
    )
def _get_object_token_refresher(
    token_refresher: TokenRefresher,
    obj: object = None
) -> TokenRefresher:
    """
    Return the token refresher to use: the one passed into the decorator, or,
    failing that, the object's 'token_refresher' attribute.

    :param token_refresher: refresher passed into the decorator, or None
    :param obj: instance the decorated method is bound to, or None
    :return: the selected refresher (None when neither argument provides one)
    :raises TokenRefresherNotPresentError: when 'obj' is given but has no
        'token_refresher' attribute
    """
    # Identity test, not truthiness: a falsy instance (custom __bool__ or
    # __len__) must still be asked for its refresher.
    if token_refresher is None and obj is not None:
        try:
            # Plain attribute access: read once, and __getattr__ fallbacks are
            # honoured (obj.__getattribute__(...) bypasses them).
            token_refresher = obj.token_refresher
        except AttributeError as error:
            raise TokenRefresherNotPresentError(
                "Token refresher must be passed into decorator or "
                "set as object's 'token_refresher' attribute."
            ) from error
    return token_refresher
def send_request_with_auth_header(token_refresher: TokenRefresher, *args,
                                  **kwargs) -> requests.Response:
    """
    Send a request carrying the refresher's authorization header. When the
    response status is HTTPStatus.UNAUTHORIZED or HTTPStatus.FORBIDDEN, refresh
    the token and send the request one more time.

    Expected keyword arguments: 'request_function', 'headers' and, optionally,
    'obj'; all remaining arguments are forwarded to the request function.

    :raises TypeError: when headers are not a dict or the first response is not
        a requests.Response
    :raises requests.HTTPError: when the final response is still an error
    """
    target_obj = kwargs.pop("obj", None)
    request_function = kwargs.pop("request_function")
    headers = kwargs.pop("headers")
    _validate_headers_type(headers)

    def _attempt():
        # Put the current token into the headers, then build and fire the call.
        headers.update(token_refresher.authorization_header)
        return make_callable_request(target_obj, request_function, headers, *args, **kwargs)()

    response = _attempt()
    _validate_response_type(response, request_function)
    if response.ok:
        return response

    if response.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
        token_refresher.refresh_token()
        response = _attempt()
    try:
        response.raise_for_status()
    except requests.HTTPError as e:
        logger.error(f"{response.text}")
        raise e
    return response
def authorize(token_refresher: TokenRefresher = None) -> Callable:
    """
    Wrap a request function and check response. If response's error status code
    is about Authorization, refresh token and invoke this function once again.
    If response is not ok and not about Authorization, then raises HTTPError.

    Expects function:
    request_func(header: dict, *args, **kwargs) -> requests.Response
    Or method:
    request_method(self, header: dict, *args, **kwargs) -> requests.Response

    :param token_refresher: refresher to use; when omitted for a method, the
        instance's 'token_refresher' attribute is used instead
    :return: the decorator
    """
    # Local import: this module only imports 'partial' from functools.
    from functools import wraps

    def refresh_token_wrapper(request_function: Callable) -> Callable:
        # A method's qualified name looks like 'Class.method'. A function nested
        # in another function looks like 'outer.<locals>.fn' and must NOT be
        # treated as a method, so inspect the enclosing scope rather than just
        # checking for a dot.
        enclosing_scope = request_function.__qualname__.rpartition(".")[0]
        is_method = bool(enclosing_scope) and not enclosing_scope.endswith("<locals>")

        if is_method:
            @wraps(request_function)
            def _wrapper(obj: object, headers: dict, *args, **kwargs) -> requests.Response:
                _token_refresher = _get_object_token_refresher(token_refresher, obj)
                _validate_token_refresher_type(_token_refresher)
                return send_request_with_auth_header(_token_refresher,
                                                     *args,
                                                     request_function=request_function,
                                                     obj=obj,
                                                     headers=headers,
                                                     **kwargs)
        else:
            @wraps(request_function)
            def _wrapper(headers: dict, *args, **kwargs) -> requests.Response:
                _validate_token_refresher_type(token_refresher)
                return send_request_with_auth_header(token_refresher,
                                                     *args,
                                                     request_function=request_function,
                                                     headers=headers,
                                                     **kwargs)
        return _wrapper

    return refresh_token_wrapper
......@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
import enum
class HttpMethod(Enum):
    """HTTP methods known to the clients, with explicitly numbered values."""
    GET = 0
    POST = 1
    PUT = 2
    DELETE = 3
class HttpMethod(enum.Enum):
    """HTTP methods known to the clients; member values are assigned by enum.auto()."""
    GET = enum.auto()
    POST = enum.auto()
    PUT = enum.auto()
    DELETE = enum.auto()
......@@ -13,107 +13,95 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Holds the logic for interfacing with Storage's record api.
"""
import json
import logging
from typing import List
import requests
from osdu_api.base_client import BaseClient
from osdu_api.libs.context.context import Context
from osdu_api.libs.refresh_token.refresh_token import TokenRefresher
from osdu_api.model.record import Record
from osdu_api.model.http_method import HttpMethod
'''
Holds the logic for interfacing with Storage's record api
'''
class RecordClient(BaseClient):
logger = logging.getLogger()
'''
Calls storage's api endpoint createOrUpdateRecords taking a list of record objects and constructing
the body of the request
Returns the response object for the call
Example of code to new up a record:
acl = Acl(['data.test1@opendes.testing.com'], ['data.test1@opendes.testing.com'])
legal = Legal(['opendes-storage-1579034803194'], ['US'], LegalCompliance.compliant)
ancestry = RecordAncestry([])
id = 'opendes:welldb:123456'
kind = 'opendes:welldb:wellbore:1.0.0'
meta = [{}]
version = 0
data = {'id': 'test'}
record = Record(id, version, kind, acl, legal, data, ancestry, meta)
'''
def create_update_records(self, records: List[Record], headers: dict):