Commit f7f9f8e4 authored by Spencer Sutton's avatar Spencer Sutton
Browse files

Consolidated libs code

commit f726af9f 
Author: Spencer Sutton <spencerpsutton@yahoo.com> 
Date: Thu Apr 22 2021 14:33:11 GMT-0500 (Central Daylight Time) 

    DS store removal


commit cbcf9507 
Author: Spencer Sutton <spencerpsutton@yahoo.com> 
Date: Thu Apr 22 2021 14:05:50 GMT-0500 (Central Daylight Time) 

    Updating blob storage relative import


commit 4834c533 
Author: Spencer Sutton <spencerpsutton@yahoo.com> 
Date: Thu Apr 22 2021 13:54:29 GMT-0500 (Central Daylight Time) 

    Updating paths again


commit 15521278 
Author: Spencer Sutton <spencerpsutton@yahoo.com> 
Date: Thu Apr 22 2021 13:28:38 GMT-0500 (Central Daylight Time) 

    Fixing more broken paths


commit 0bc1f97d 
Author: Spencer Sutton <spencerpsutton@yahoo.com> 
Date: Thu Apr 22 2021 13:13:20 GMT-0500 (Central Daylight Time) 

    Updating paths for providers folder


commit 2399eb21 
Author: Spencer Sutton <spencerpsutton@yahoo.com> 
Date: Thu Apr 22 2021 11:17:26 GMT-0500 (Central Daylight Time) 

    Bad imports leftover

...
parent 54c74176
......@@ -6,3 +6,4 @@ __pycache__
**/venv/**
**/.idea/**
.vscode/
.DS_Store
\ No newline at end of file
......@@ -54,7 +54,7 @@ phases:
# publish new artifact to code artifact
- aws codeartifact login --tool twine --domain osdu-dev --domain-owner 888733619319 --repository osdu-python
- python setup.py sdist bdist_wheel
- twine upload --skip-existing --repository codeartifact dist/osdu_api-0.0.4.tar.gz
- twine upload --skip-existing --repository codeartifact dist/osdu_api-0.0.5.tar.gz
artifacts:
......
# Copyright 2020 Google LLC
# Copyright © 2020 Amazon Web Services
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys, os
import importlib
from configparser import SafeConfigParser
import requests
from osdu_api.libs.configuration.config_manager import ConfigManager
from osdu_api.libs.context.context import Context
from osdu_api.libs.exceptions.exceptions import UnknownRequestMethodError, ConfigurationError
from osdu_api.libs.auth.authorization import TokenRefresher, authorize
from osdu_api.model.http_method import HttpMethod
'''
Base client that is meant to be extended by service specific clients
'''
class BaseClient:
"""
Base client that is meant to be extended by service specific clients.
"""
def __init__(self, token_refresher: TokenRefresher, context: Context):
self._config_manager = ConfigManager()
self.token_refresher = token_refresher
self.data_partition_id = context.data_partition_id
'''
Base client gets initialized with configuration values and a bearer token
based on provider-specific logic
'''
def __init__(self):
self._parse_config()
self.unauth_retries = 0
if self.use_service_principal == 'True' or self.use_service_principal == 'true':
self._refresh_service_principal_token()
'''
Example config file:
[environment]
data_partition_id=opendes
storage_url=https://[STORAGE_ENDPOINT]/api/storage/v2
search_url=https://[SEARCH_ENDPOINT]/api/search/v2
data_workflow_url=https://[WORKFLOW_ENDPOINT]/api/data-workflow/v1
file_dms_url=https://[FILE_DMS_ENDPOINT]/api/filedms/v2
dataset_registry_url=https://[DATASET_REGISTRY_URL]/api/dataset-registry/v1
def get_config_value(self, value: str) -> str:
return self._config_manager.get_config_value(value)
[provider]
name=aws
entitlements_module_name=entitlements_client
'''
def _parse_config(self):
config_parser = SafeConfigParser(os.environ)
config_file_name = 'osdu_api.ini'
found_names = config_parser.read(config_file_name)
if config_file_name not in found_names:
raise Exception('Could not find osdu_api.ini config file')
@authorize()
def _send_request(self, headers: dict, url: str, params: dict, data: str, method: HttpMethod) -> requests.Response:
if method is HttpMethod.GET:
response = requests.get(url=url, params=params, headers=headers)
elif method is HttpMethod.POST:
response = requests.post(url=url, params=params, data=data, headers=headers)
elif method is HttpMethod.PUT:
response = requests.put(url=url, params=params, data=data, headers=headers)
else:
raise UnknownRequestMethodError
return response
self.data_partition_id = config_parser.get('environment', 'data_partition_id')
self.storage_url = config_parser.get('environment', 'storage_url')
self.search_url = config_parser.get('environment', 'search_url')
self.data_workflow_url = config_parser.get('environment', 'data_workflow_url')
self.file_dms_url = config_parser.get('environment', 'file_dms_url')
self.legal_url = config_parser.get('environment', 'legal_url')
self.entitlements_url = config_parser.get('environment', 'entitlements_url')
self.dataset_url = config_parser.get('environment', 'dataset_url')
self.use_service_principal = config_parser.get('environment', 'use_service_principal')
self.provider = config_parser.get('provider', 'name')
self.service_principal_module_name = config_parser.get('provider', 'service_principal_module_name')
def make_request(
self,
method: HttpMethod,
url: str,
data: str = '',
add_headers: dict = None,
params: dict = None
) -> requests.Response:
"""
Makes a request using python's built in requests library. Takes additional headers if
necessary
"""
params = params or {}
add_headers = add_headers or {}
'''
The path to the logic to get a valid bearer token is dynamically injected based on
what provider and entitlements module name is provided in the configuration yaml
'''
def _refresh_service_principal_token(self):
entitlements_client = importlib.import_module('osdu_api.providers.%s.%s' % (self.provider, self.service_principal_module_name))
self.service_principal_token = entitlements_client.get_service_principal_token()
'''
Makes a request using python's built in requests library. Takes additional headers if
necessary
'''
def make_request(self, method: HttpMethod, url: str, data = '', add_headers = {}, params = {}, bearer_token = None):
if bearer_token is not None and 'Bearer ' not in bearer_token:
bearer_token = 'Bearer ' + bearer_token
headers = {
'content-type': 'application/json',
'data-partition-id': self.data_partition_id,
'Authorization': bearer_token if bearer_token is not None else self.service_principal_token
}
for key, value in add_headers.items():
headers[key] = value
response = self._send_request(headers, url, params, data, method)
if (len(add_headers) > 0):
for key, value in add_headers:
headers[key] = value
response = object
if (method == HttpMethod.GET):
response = requests.get(url=url, params=params, headers=headers, verify=False)
elif (method == HttpMethod.DELETE):
response = requests.delete(url=url, params=params, headers=headers, verify=False)
elif (method == HttpMethod.POST):
response = requests.post(url=url, params=params, data=data, headers=headers, verify=False)
elif (method == HttpMethod.PUT):
response = requests.put(url=url, params=params, data=data, headers=headers, verify=False)
if (response.status_code == 401 or response.status_code == 403) and self.unauth_retries < 1:
if self.use_service_principal == 'True' or self.use_service_principal == 'true':
self.unauth_retries += 1
self._refresh_service_principal_token()
self.make_request(method, url, data, add_headers, params, None)
self.unauth_retries = 0
return response
......@@ -21,7 +21,7 @@
dataset_url=%(BASE_URL)s/api/dataset-registry/v1
entitlements_url=%(BASE_URL)s/api/entitlements/v1
schema_url=%(BASE_URL)s/api/schema-service/v1
use_service_principal=False
use_service_principal=True
[provider]
name=aws
......
......@@ -20,7 +20,7 @@ from functools import partial
from http import HTTPStatus
import requests
from osdu_api.libs.exceptions.exceptions import TokenRefresherNotPresentError
from osdu_api.libs.exceptions import TokenRefresherNotPresentError
logger = logging.getLogger()
......
......@@ -17,7 +17,7 @@ import enum
import os
import yaml
from osdu_api.libs.exceptions.exceptions import ConfigurationError
from osdu_api.libs.exceptions import ConfigurationError
"""
This module is used for initializing configurations, such as OSDU API endpoints, vendor info etc.
......
# Copyright 2020 Google LLC
# Copyright © 2020 Amazon Web Services
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -13,26 +13,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List
from osdu_api.base_client import BaseClient
from osdu_api.model.http_method import HttpMethod
from osdu_api.model.search.query_response import QueryResponse
"""Constants module."""
'''
Holds the logic for interfacing with Search's query api
'''
class SearchClient(BaseClient):
RETRIES = 3
TIMEOUT = 1
WAIT = 10
'''
Used to hit search's api endpoint "queryRecords"
'''
def query_records_from_dict(self, query_request: dict):
query_request_data = json.dumps(query_request)
FIRST_STORED_RECORD_INDEX = 0
response = self.make_request(method=HttpMethod.POST, url=self.search_url, data=query_request_data)
response_content = json.loads(response.content)
query_response = QueryResponse(response_content['results'], response_content['aggregations'])
return query_response
# Paths to extend schema fields with surrogate keys
DATA_TYPES_WITH_SURROGATE_KEYS = ("dataset", "work-product", "work-product-component")
SURROGATE_KEYS_PATHS = [
("definitions", "{{data-partition-id}}:wks:AbstractWPCGroupType:1.0.0", "properties", "Datasets",
"items"),
("definitions", "osdu:wks:AbstractWPCGroupType:1.0.0", "properties", "Artefacts",
"items", "properties", "ResourceID"),
("properties", "data", "allOf", 1, "properties", "Components", "items"),
]
DATA_SECTION = "Data"
DATASETS_SECTION = "Datasets"
MASTER_DATA_SECTION ="MasterData"
REFERENCE_DATA_SECTION ="ReferenceData"
WORK_PRODUCT_SECTION = "WorkProduct"
WORK_PRODUCT_COMPONENTS_SECTION = "WorkProductComponents"
......@@ -13,23 +13,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Context module."""
import dataclasses
@dataclasses.dataclass
class Context(object):
"""
Store data-partition-id and AppKey passed via Payload field of dagrun.conf.
"""
class Context:
"""Class to store data-partition-id and AppKey."""
data_partition_id: str
app_key: str
@classmethod
def populate(cls, ctx: dict) -> 'Context':
"""
Populates Context dataclass from dagrun.conf dict.
:return: populated Context
:rtype: Context
"""
ctx_payload = ctx.pop('Payload')
ctx_obj = cls(
app_key=ctx_payload['AppKey'],
data_partition_id=ctx_payload['data-partition-id']
)
ctx_obj = cls(app_key=ctx_payload['AppKey'],
data_partition_id=ctx_payload['data-partition-id'])
return ctx_obj
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exceptions module."""
class RecordsNotSearchableError(Exception):
    """Raised when the expected totalCount of records differs from the actual one."""
class PipelineFailedError(Exception):
    """Raised when the pipeline failed."""
class EmptyManifestError(Exception):
    """Raised when the manifest field is empty."""
class GetSchemaError(Exception):
    """Raised when a schema cannot be found."""
class SRNNotFound(Exception):
    """Raised when an SRN cannot be found."""
class NotOSDUSchemaFormatError(Exception):
    """Raised when a schema does not correspond to the OSDU format."""
class FileSourceError(Exception):
    """Raised when no file exists under the given URI path."""
class UploadFileError(Exception):
    """Raised on an error while uploading a file into OSDU."""
class TokenRefresherNotPresentError(Exception):
    """Raised when no token refresher is present in the "refresh_token" decorator."""
class NoParentEntitySystemSRNError(Exception):
    """Raised when a parent entity has no system-generated SRN."""
class NoParentEntitySystemSRNError(Exception):
    """Raised when a parent entity has no system-generated SRN."""
class InvalidFileRecordData(Exception):
    """Raised when file data lacks mandatory fields."""
class GenericManifestSchemaError(Exception):
    """Raised when a generic manifest schema is invalid."""
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides cloud specific File Handler implementations."""
import dataclasses
import io
import json
import logging
import uuid
from typing import List, Tuple, TypeVar
import requests
import tenacity
from osdu_api.libs.constants import RETRIES, WAIT
from osdu_api.libs.context import Context
from osdu_api.libs.exceptions import InvalidFileRecordData
from osdu_api.libs.mixins import HeadersMixin
from osdu_api.libs.auth.authorization import TokenRefresher, authorize
from osdu_api.providers import blob_storage
from osdu_api.providers.types import BlobStorageClient, FileLikeObject
logger = logging.getLogger()
# Shared retry policy for File service HTTP calls: retry up to RETRIES attempts
# with a fixed WAIT-second pause between them (both values come from
# osdu_api.libs.constants). Unpacked into @tenacity.retry below.
RETRY_SETTINGS = {
    "stop": tenacity.stop_after_attempt(RETRIES),
    "wait": tenacity.wait_fixed(WAIT),
}
@dataclasses.dataclass
class FileUploadUrlResponse:
    """Simple class to store File service uploadURL response values."""
    file_id: str      # "FileID" field of the uploadURL response
    signed_url: str   # "Location"."SignedURL" — pre-signed URL the file is PUT to
    file_source: str  # "Location"."FileSource" — source path recorded for the file
@dataclasses.dataclass
class FileDownloadUrlResponse:
    """Simple class to store File service downloadURL response values."""
    signed_url: str    # pre-signed URL to download the file content
    unsigned_url: str  # unsigned blob URL; None when a legacy File service responded
    kind: str          # record kind; None when a legacy File service responded
class FileHandler(HeadersMixin):
"""Class to perform operations using OSDU File Service."""
    def __init__(self, file_service_host: str, token_refresher: TokenRefresher, context: Context,
                 blob_storage_client: BlobStorageClient = None):
        """File handler.

        :param file_service_host: Base OSDU File service url
        :type file_service_host: str
        :param token_refresher: Object to refresh tokens
        :type token_refresher: TokenRefresher
        :param context: The tenant context data
        :type context: Context
        :param blob_storage_client: Optional pre-built blob storage client;
            when omitted, a provider-specific one is obtained from
            osdu_api.providers.blob_storage.get_client()
        :type blob_storage_client: BlobStorageClient
        """
        super().__init__(context)
        self._file_service_host = file_service_host
        self.token_refresher = token_refresher
        # Fall back to the provider-resolved client only when none was injected.
        self._blob_storage_client = blob_storage_client or blob_storage.get_client()
    def _get_file_from_preload_path(self, preload_file_path: str,
                                    file: FileLikeObject) -> Tuple[FileLikeObject, str]:
        """Get file from a preloaded path.

        :param preload_file_path: Full URI of the file to obtain
        :type preload_file_path: str
        :param file: Writable file-like object the blob content is downloaded into
        :type file: FileLikeObject
        :return: Raw file data and content-type
        :rtype: Tuple[FileLikeObject, str]
        """
        return self._blob_storage_client.download_to_file(preload_file_path, file)
@staticmethod
def _verify_file_record_data(file_record_data: dict):
"""Perform simple verification of mandatory fields according to OSDU
File Service.
:param file_record_data: Data field of file_record
:type file_record_data: dict
:raises InvalidFileRecordData: When some of the mandatory fields is
missing or empty
"""
endian = file_record_data.get("Endian")
file_source = file_record_data["DatasetProperties"]["FileSourceInfo"].get("FileSource")
if not (endian and file_source):
raise InvalidFileRecordData(f"Mandatory fields: Endian-{endian}"
f"FileSource-{file_source}")
@staticmethod
def _handle_download_url_response(response: dict) -> FileDownloadUrlResponse:
"""
Handle downloadURL according to file service version
:param response: The response already load from json
:type response: dict
:return: FileDownloadUrlResponse filled properly
:rtype: FileDownloadUrlResponse
"""
try:
# response got from latest version of File service
return FileDownloadUrlResponse(signed_url=response["signedUrl"],
unsigned_url=response["unsignedUrl"],
kind=response["kind"])
except KeyError:
# response got from a legacy version of File service
return FileDownloadUrlResponse(signed_url=response["SignedUrl"],
unsigned_url=None,
kind=None)
    @tenacity.retry(**RETRY_SETTINGS)
    @authorize()
    def _send_post_request(self, headers: dict, url: str, request_body: str) -> requests.Response:
        """POST request_body to url and return the raw response.

        Retried per RETRY_SETTINGS; @authorize presumably supplies/refreshes the
        Authorization header via self.token_refresher — confirm against
        osdu_api.libs.auth.authorization.
        """
        logger.debug(f"{request_body}")
        response = requests.post(url, request_body, headers=headers)
        logger.debug(response.content)
        return response
    @tenacity.retry(**RETRY_SETTINGS)
    @authorize()
    def _send_get_request(self, headers: dict, url: str) -> requests.Response:
        """GET url and return the raw response.

        Retried per RETRY_SETTINGS; @authorize presumably supplies/refreshes the
        Authorization header via self.token_refresher — confirm against
        osdu_api.libs.auth.authorization.
        """
        response = requests.get(url, headers=headers)
        logger.debug(response)
        return response
def _get_upload_signed_url(self, headers: dict) -> FileUploadUrlResponse:
"""Get FileID, SignedURL and FileSource using File Service uploadURL
endpoint.
:param headers: Request headers to pass to the final request issuer
:type headers: dict
:return: FileUploadUrlResponse with data from service
:rtype: FileUploadUrlResponse
"""
logger.debug("Getting upload signed url.")
endpoint = f"{self._file_service_host}/v1/files/uploadURL"
response = self._send_get_request(headers, endpoint).json()
logger.debug("Signed url got.")
upload_url_response = FileUploadUrlResponse(file_id=response["FileID"],
signed_url=response["Location"]["SignedURL"],
file_source=response["Location"]["FileSource"])
return upload_url_response
def _get_download_signed_url(self, headers: dict, record_id: str) -> FileDownloadUrlResponse:
"""Get signedURL, unsignedURL and kind using File Service downloadURL
endpoint.
:param headers: Request headers to pass to the final request issuer
:type headers: dict
:param record_id: Unique id of the file record saved in the osdu system
:type record_id: str
:return: FileDownloadUrlResponse with signed and unsigned urls
:rtype: FileDownloadUrlResponse
"""
logger.debug("Getting download signed url.")
endpoint = f"{self._file_service_host}/v1/files/{record_id}/downloadURL"
response = self._send_get_request(headers, endpoint).json()
logger.debug("Signed url got.")
download_url_response = self._handle_download_url_response(response)
return download_url_response
    @tenacity.retry(**RETRY_SETTINGS)
    def _upload_file_request(self, headers: dict, signed_url: str, buffer: FileLikeObject):
        """Upload file via File service using signed_url.

        :param headers: Request headers to pass to the final request issuer
        :type headers: dict
        :param signed_url: SignedURL to authenticate request
        :type signed_url: str
        :param buffer: Raw file data
        :type buffer: FileLikeObject
        """
        logger.debug("Uploading file.")
        # Rewind so the whole buffer is sent even if it was read before.
        buffer.seek(0)
        # NOTE(review): the PUT response status is never checked, so a rejected
        # upload passes silently unless requests itself raises — confirm intended.
        requests.put(signed_url, buffer.read(), headers=headers)
        logger.debug("File uploaded.")
def _get_file_location_request(self, headers: dict, file_id: str) -> str:
"""Get file location using File Service.
:param headers: Request headers to pass to the final request issuer
:type headers: dict
:param file_id: String identifier of the file
:type file_id: str
:return: Full URI of the located file
:rtype: str
"""
logger.debug("Getting file location.")
request_body = json.dumps({"FileID": file_id})
endpoint = f"{self._file_service_host}/getFileLocation"
response = self._send_post_request(headers, endpoint, request_body)
logger.debug("File location got.")
return response.json()["Location"]
def upload_file(self, preload_file_path: str) -> str:
"""Copy file from preload_file_path location to Landing Zone in OSDU
platform using File service. Get Content-Type of this file, refresh
Content-Type with this value in headers while this file is being
uploaded onto OSDU platform.
:param preload_file_path: The URI of the preloaded file
:type preload_file_path: str
:return: FileSource obtained via File service
:rtype: str