Commit e454e27f authored by Spencer Sutton's avatar Spencer Sutton
Browse files

Updated with data workflow framework

commit a0c29c73 
Author: Sutton <suttonsp@147dda3a90de.ant.amazon.com> 
Date: Tue Nov 17 2020 15:09:41 GMT-0600 (Central Standard Time) 

    Moving osdu ini config to env vars


commit 3ff47579 
Author: Sutton <suttonsp@147dda3a90de.ant.amazon.com> 
Date: Tue Nov 17 2020 15:07:26 GMT-0600 (Central Standard Time) 

    Missed removing some unneeded code from record class


commit b8f0732f 
Author: Sutton <suttonsp@147dda3a90de.ant.amazon.com> 
Date: Tue Nov 17 2020 15:05:27 GMT-0600 (Central Standard Time) 

    Updating to better practices and a fuller api including data workflow framework


commit 4b381090 
Author: Sutton <suttonsp@147dda3a90de.ant.amazon.com> 
Date: Mon Nov 16 2020 08:51:59 GMT-0600 (Central Standard Time) 

    Deprecating unused services' classes
parent 307a879c
import sys, os
import importlib
import yaml # MIT license
from configparser import ConfigParser
import requests
from osdu_api.model.http_method import HttpMethod
......@@ -15,56 +15,63 @@ class BaseClient:
'''
def __init__(self):
self._parse_config()
self.bearer_token = self._get_bearer_token()
self.unauth_retries = 0
if self.use_service_principal == 'True' or self.use_service_principal == 'true':
self._refresh_service_principal_token()
'''
The path to the logic to get a valid bearer token is dynamically injected based on
what provider and entitlements module name is provided in the configuration yaml
'''
def _get_bearer_token(self):
entitlements_client = importlib.import_module('osdu_api.provider.%s.%s' % (self.provider, self.entitlements_module_name))
return entitlements_client.get_bearer_token()
Example config file:
[environment]
data_partition_id=opendes
storage_url=https://[STORAGE_ENDPOINT]/api/storage/v2
search_url=https://[SEARCH_ENDPOINT]/api/search/v2
data_workflow_url=https://[WORKFLOW_ENDPOINT]/api/data-workflow/v1
file_dms_url=https://[FILE_DMS_ENDPOINT]/api/filedms/v2
dataset_registry_url=https://[DATASET_REGISTRY_URL]/api/dataset-registry/v1
'''
Parses a yaml file named osdu_api.yaml. All config values listed below are meant to
be required except URLs to specific services which may or may not be used depending
on the specific script
[provider]
name=aws
entitlements_module_name=entitlements_client
'''
def _parse_config(self):
config_file_location = os.path.join(sys.path[0], 'osdu_api.yaml')
with open(config_file_location, 'r') as config_file:
config = yaml.load(config_file)
self.data_partition_id = self._parse_config_value(config, 'data_partition_id', True)
self.storage_url = self._parse_config_value(config, 'storage_url', False)
self.search_url = self._parse_config_value(config, 'search_url', False)
self.provider = self._parse_config_value(config, 'provider', True)
self.entitlements_module_name = self._parse_config_value(config, 'entitlements_module_name', True)
self.workflow_url = self._parse_config_value(config, 'workflow_url', False)
config_parser = ConfigParser()
config_file_name = 'osdu_api.ini'
found_names = config_parser.read(config_file_name)
if config_file_name not in found_names:
raise Exception('Could not find osdu_api.ini config file')
self.data_partition_id = config_parser.get('environment', 'data_partition_id')
self.storage_url = config_parser.get('environment', 'storage_url')
self.search_url = config_parser.get('environment', 'search_url')
self.data_workflow_url = config_parser.get('environment', 'data_workflow_url')
self.file_dms_url = config_parser.get('environment', 'file_dms_url')
self.dataset_registry_url = config_parser.get('environment', 'dataset_registry_url')
self.use_service_principal = config_parser.get('environment', 'use_service_principal')
self.provider = config_parser.get('provider', 'name')
self.service_principal_module_name = config_parser.get('provider', 'service_principal_module_name')
'''
Used during parsing of the yaml config file. Will raise an exception if a required config
value is missing
The path to the logic to get a valid bearer token is dynamically injected based on
what provider and entitlements module name is provided in the configuration yaml
'''
def _parse_config_value(self, config, config_name, is_required):
config_value = ''
try:
config_value = config[config_name]
except TypeError:
if(is_required):
raise Exception('Config value %s missing and is required' % config_name)
else:
print('Config value %s missing' % config_name)
return config_value
def _refresh_service_principal_token(self):
    '''
    Imports the module osdu_api.provider.<provider>.<service_principal_module_name>
    and stores the result of its get_service_principal_token() on
    self.service_principal_token.
    '''
    module_path = 'osdu_api.provider.%s.%s' % (self.provider, self.service_principal_module_name)
    provider_module = importlib.import_module(module_path)
    self.service_principal_token = provider_module.get_service_principal_token()
'''
Makes a request using python's built in requests library. Takes additional headers if
necessary
'''
def make_request(self, method: HttpMethod, url: str, data = '', add_headers = {}, params = {}):
def make_request(self, method: HttpMethod, url: str, data = '', add_headers = {}, params = {}, bearer_token = None):
if bearer_token is not None and 'Bearer ' not in bearer_token:
bearer_token = 'Bearer ' + bearer_token
headers = {
'content-type': 'application/json',
'data-partition-id': self.data_partition_id,
'Authorization': self.bearer_token
'Authorization': bearer_token if bearer_token is not None else self.service_principal_token
}
if (len(add_headers) > 0):
......@@ -79,5 +86,13 @@ class BaseClient:
response = requests.post(url=url, params=params, data=data, headers=headers)
elif (method == HttpMethod.PUT):
response = requests.put(url=url, params=params, data=data, headers=headers)
if (response.status_code == 401 or response.status_code == 403) and self.unauth_retries < 1:
if self.use_service_principal == 'True' or self.use_service_principal == 'true':
self.unauth_retries += 1
self._refresh_service_principal_token()
self.make_request(method, url, data, add_headers, params, None)
self.unauth_retries = 0
return response
\ No newline at end of file
import json
from typing import List
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.data_workflow.start_workflow import StartWorkflow
from osdu_api.model.http_method import HttpMethod
'''
Holds the logic for interfacing with Data Workflow's api
'''
class DataWorkflowClient(BaseClient):
    '''
    Client for the Data Workflow service; requests are issued through make_request.
    '''

    def start_workflow(self, start_workflow: StartWorkflow, bearer_token=None):
        '''
        POSTs start_workflow.to_JSON() to <data_workflow_url>/startWorkflow and
        returns the result of make_request. bearer_token is forwarded unchanged.
        '''
        http_method = HttpMethod.POST
        endpoint = '{}{}'.format(self.data_workflow_url, '/startWorkflow')
        payload = start_workflow.to_JSON()
        return self.make_request(method=http_method, url=endpoint, data=payload, bearer_token=bearer_token)
\ No newline at end of file
import json
from typing import List
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.dataset_registry.create_dataset_registries import CreateDatasetRegistries
from osdu_api.model.http_method import HttpMethod
'''
Holds the logic for interfacing with Data Registry Service's api
'''
class DatasetRegistryClient(BaseClient):
    '''
    Client for the Dataset Registry service; requests are issued through make_request.
    '''

    def create_registries(self, dataset_registries: CreateDatasetRegistries, bearer_token=None):
        '''
        PUTs dataset_registries.to_JSON() to <dataset_registry_url>/registry and
        returns the result of make_request. bearer_token is forwarded unchanged.
        '''
        http_method = HttpMethod.PUT
        endpoint = '{}{}'.format(self.dataset_registry_url, '/registry')
        payload = dataset_registries.to_JSON()
        return self.make_request(method=http_method, url=endpoint, data=payload, bearer_token=bearer_token)
import json
from typing import List
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.file_dms.register_files import RegisterFiles
from osdu_api.model.file_dms.get_files_delivery import GetFilesDelivery
from osdu_api.model.http_method import HttpMethod
'''
Holds the logic for interfacing with File DMS's api
'''
class FileDMSClient(BaseClient):
    '''
    Client for the File DMS service. Every method builds its endpoint from
    file_dms_url and returns the result of make_request unchanged.
    '''

    def get_file_upload_location(self, bearer_token=None):
        '''
        GETs <file_dms_url>/getFileUploadLocation.
        '''
        http_method = HttpMethod.GET
        endpoint = '{}{}'.format(self.file_dms_url, '/getFileUploadLocation')
        return self.make_request(method=http_method, url=endpoint, bearer_token=bearer_token)

    def register_files(self, register_files: RegisterFiles, bearer_token=None):
        '''
        POSTs register_files.to_JSON() to <file_dms_url>/registerFiles.
        '''
        http_method = HttpMethod.POST
        endpoint = '{}{}'.format(self.file_dms_url, '/registerFiles')
        payload = register_files.to_JSON()
        return self.make_request(method=http_method, url=endpoint, data=payload, bearer_token=bearer_token)

    def get_file_delivery_instructions(self, get_files_delivery: GetFilesDelivery, bearer_token=None):
        '''
        POSTs get_files_delivery.to_JSON() to <file_dms_url>/getFileDeliveryInstructions.
        '''
        http_method = HttpMethod.POST
        endpoint = '{}{}'.format(self.file_dms_url, '/getFileDeliveryInstructions')
        payload = get_files_delivery.to_JSON()
        return self.make_request(method=http_method, url=endpoint, data=payload, bearer_token=bearer_token)
import json
from typing import List
from osdu_api.base_client import BaseClient
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.http_method import HttpMethod
from osdu_api.model.search.query_response import QueryResponse
from osdu_api.model.search.query_request import QueryRequest
'''
Holds the logic for interfacing with Search's query api
......@@ -12,12 +13,6 @@ class SearchClient(BaseClient):
'''
Used to hit search's api endpoint "queryRecords"
'''
def query_records_from_dict(self, query_request: dict):
    '''
    Serializes query_request to JSON, POSTs it to search_url and wraps the
    'results' and 'aggregations' entries of the JSON response in a QueryResponse.
    '''
    request_body = json.dumps(query_request)
    search_resp = self.make_request(method=HttpMethod.POST, url=self.search_url, data=request_body)
    parsed = json.loads(search_resp.content)
    return QueryResponse(parsed['results'], parsed['aggregations'])
def query_records(self, query_request: QueryRequest, bearer_token = None):
    '''
    POSTs query_request.to_JSON() to <search_url>/query and returns the result
    of make_request. bearer_token is forwarded unchanged.
    '''
    http_method = HttpMethod.POST
    endpoint = '{}{}'.format(self.search_url, '/query')
    payload = query_request.to_JSON()
    return self.make_request(method=http_method, url=endpoint, data=payload, bearer_token=bearer_token)
import json
from typing import List
from osdu_api.base_client import BaseClient
from osdu_api.model.record import Record
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.storage.record import Record
from osdu_api.model.http_method import HttpMethod
'''
......@@ -25,80 +25,39 @@ class RecordClient(BaseClient):
data = {'id': 'test'}
record = Record(id, version, kind, acl, legal, data, ancestry, meta)
'''
def create_update_records(self, records: List[Record]):
records_data = [record.convert_to_dict() for record in records]
print(records_data)
# def create_update_records(self, records: List[Record], bearer_token = None):
# records_data = [record.convert_to_dict() for record in records]
# return self.create_update_records_from_dict(records_data, bearer_token)
return self.create_update_records_from_dict(records_data)
def create_update_records(self, records: List[Record], bearer_token = None):
    '''
    Serializes every record with to_JSON(), joins them into one JSON array and
    PUTs that array to <storage_url>/records.

    Returns the result of make_request. bearer_token is forwarded unchanged.
    An empty records list is sent as '[]'.
    '''
    # Array elements must be comma-separated; plain concatenation only produced
    # valid JSON when the list held at most one record.
    serialized = [str(record.to_JSON()) for record in records]
    records_data = '[{}]'.format(','.join(serialized))
    return self.make_request(method=HttpMethod.PUT, url='{}{}'.format(self.storage_url, '/records'), data=records_data, bearer_token=bearer_token)
'''
Calls storage's api endpoint createOrUpdateRecords taking individual attributes and constructing
the body of the request
Returns the response object for the call
Example of records_data:
[
{
"acl": {
"owners":[
"data.test1@opendes.testing.com"
],
"viewers":[
"data.test1@opendes.testing.com"
]
},
"ancestry":{
"parents":[]
},
"data":{"id":"test"},
"id":"opendes:welldb:123456",
"kind":"opendes:welldb:wellbore:1.0.0",
"legal":{
"legaltags":["opendes-storage-1579034803194"],
"otherRelevantDataCountries":["US"],
"status":"compliant"
},
"meta":[
{}
],
"version":0
}
]
'''
def create_update_records_from_dict(self, records: dict):
    '''
    Dumps records to JSON and PUTs the result to storage_url.
    Returns the result of make_request.
    '''
    request_body = json.dumps(records)
    return self.make_request(method=HttpMethod.PUT, url=self.storage_url, data=request_body)
'''
Calls storage's api endpoint getLatestRecordVersion taking the required attributes
Returns the content for the response object
'''
def get_latest_record(self, recordId: str, attributes: List[str] = []):
def get_latest_record(self, recordId: str, attributes: List[str] = [], bearer_token = None):
request_params = {'attribute': attributes}
response = self.make_request(method=HttpMethod.GET, params=request_params, url=(self.storage_url + '/%s' % (recordId)))
response_content = json.loads(response.content)
return Record.from_dict(response_content)
return self.make_request(method=HttpMethod.GET, params=request_params, url=('{}{}/{}'.format(self.storage_url, '/records', recordId)), bearer_token=bearer_token)
'''
Calls storage's api endpoint getSpecificRecordVersion taking the required attributes
Returns the content for the response object
'''
def get_specific_record(self, recordId: str, version: str, attributes: List[str] = []):
def get_specific_record(self, recordId: str, version: str, attributes: List[str] = [], bearer_token = None):
request_params = {'attribute': attributes}
response = self.make_request(method=HttpMethod.GET, params=request_params, url=(self.storage_url + '/%s/%s' % (recordId, version)))
response_content = json.loads(response.content)
return Record.from_dict(response_content)
return self.make_request(method=HttpMethod.GET, params=request_params, url=('{}{}/{}/{}'.format(self.storage_url, '/records', recordId, version)), bearer_token=bearer_token)
'''
Calls storage's api endpoint getRecordVersions taking the one required parameter record id
Returns the content for the response object for the call containing the list of versions.
Find the versions in the response.content attribute
'''
def get_record_versions(self, recordId: str):
    '''
    GETs <storage_url>/versions/<recordId>, decodes the response body as UTF-8
    JSON and returns its 'versions' entry.
    '''
    versions_url = self.storage_url + '/versions/%s' % (recordId)
    versions_resp = self.make_request(method=HttpMethod.GET, url=versions_url)
    decoded_body = versions_resp.content.decode("utf-8")
    return json.loads(decoded_body)['versions']
def get_record_versions(self, recordId: str, bearer_token = None):
    '''
    GETs <storage_url>/records/versions/<recordId> and returns the result of
    make_request. bearer_token is forwarded unchanged.
    '''
    http_method = HttpMethod.GET
    versions_url = '{}{}/{}'.format(self.storage_url, '/records/versions', recordId)
    return self.make_request(method=http_method, url=versions_url, bearer_token=bearer_token)
\ No newline at end of file
'''
Basic example on creating a record and searching on it to get it back
'''
import os
import time
from osdu_api.storage.record_client import RecordClient
from osdu_api.search.search_client import SearchClient
# temporarily set environment variables in windows
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
os.environ['AWS_COGNITO_CLIENT_ID'] = ''
os.environ['AWS_COGNITO_AUTH_PARAMS_USER'] = ''
os.environ['AWS_COGNITO_AUTH_PARAMS_PASSWORD'] = ''
record_client = RecordClient()
search_client = SearchClient()
records = [
{
"data": {
"ResourceID": "srn:file/ascii:043253514614681856:",
"Data.GroupTypeProperties.PreLoadFilePath": "s3://osdu-test-data-918358651390/volve/horizons/Balder_Fm+ST0202R08_PS_PSDM_FULL_PP+STAT+TIME.dat",
"Data.GroupTypeProperties.FileSource": "",
"ResourceTypeID": "srn:type:file/ascii:",
"ResourceSecurityClassification": "srn:reference-data/ResourceSecurityClassification:RESTRICTED:",
"Data.GroupTypeProperties.EncodingFormatTypeID": "srn:reference-data/EncodingFormatType:Landmark:",
"AssociativeID": "f-1"
},
"kind": "opendes:osdu:file:0.2.0",
"namespace": "opendes:osdu",
"legal": {
"legaltags": [
"opendes-public-usa-dataset-5"
],
"otherRelevantDataCountries": [
"US"
],
"status": "compliant"
},
"id": "opendes:doc:59f9718a05e349c6b5ba38adb87a0317",
"acl": {
"viewers": [
"data.default.viewers@opendes.amazon.com"
],
"owners": [
"data.default.owners@opendes.amazon.com"
]
},
"type": "file",
"version": 0
}
]
query = {
"kind": "opendes:osdu:file:0.2.0",
"query": "data.Data.GroupTypeProperties.PreLoadFilePath: \"*Balder_Fm+ST0202R08_PS_PSDM_FULL_PP+STAT+TIME.dat\"",
"limit": 1000,
"aggregateBy": "kind"
}
# Create the record, then stop if the storage service did not answer 201.
create_record_resp = record_client.create_update_records_from_dict(records)
if create_record_resp.status_code != 201:
    print("Record failed to create")
    # A bare `exit` only names the builtin without calling it, so the script
    # used to carry on; raise SystemExit to actually stop with a failure code.
    raise SystemExit(1)

# give the system 10 seconds to index the record
time.sleep(10)

search_record_resp = search_client.query_records_from_dict(query)
print(search_record_resp.results)
\ No newline at end of file
'''
Basic example on creating a record and searching on it to get it back
'''
import os
import time
from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.clients.search.search_client import SearchClient
from osdu_api.model.storage.record import Record
from osdu_api.model.storage.acl import Acl
from osdu_api.model.storage.legal import Legal
from osdu_api.model.storage.record_ancestry import RecordAncestry
from osdu_api.model.search.query_request import QueryRequest
record_client = RecordClient()
search_client = SearchClient()
kind = 'opendes:osdu:dataset-registry:0.0.1'
viewers = ['data.default.viewers@opendes.testing.com']
owners = ['data.default.owners@opendes.testing.com']
acl = Acl(viewers, owners)
legal = Legal(['opendes-public-usa-dataset-1'], ['US'], "compliant")
data = {
"ResourceID": "srn:osdu:file:dc556e0e3a554105a80cfcb19372a62d:",
"ResourceTypeID": "srn:type:file/json:",
"ResourceSecurityClassification": "srn:reference-data/ResourceSecurityClassification:RESTRICTED:",
"ResourceSource": "Some Company App",
"ResourceName": "trajectories - 1000.json",
"ResourceDescription": "Trajectory For Wellbore xyz",
"DatasetProperties": {
"FileSourceInfo": {
"FileSource": "",
"PreLoadFilePath": "s3://default_bucket/r1/data/provided/trajectories/1000.json"
}
}
}
record = Record(kind, acl, legal, data)
query_request = QueryRequest(kind, "data.ResourceName = \"trajectories - 1000.json\"")
# Create the record, then stop if the storage service did not answer 201.
create_record_resp = record_client.create_update_records([record])
print(create_record_resp.status_code)
print(create_record_resp.content)
if create_record_resp.status_code != 201:
    print("Record failed to create")
    # A bare `exit` only names the builtin without calling it, so the script
    # used to carry on; raise SystemExit to actually stop with a failure code.
    raise SystemExit(1)

# give the system 10 seconds to index the record
time.sleep(10)

search_record_resp = search_client.query_records(query_request)
# query_records returns the raw HTTP response from make_request, which has no
# `results` attribute; print its status and body like the create call above.
print(search_record_resp.status_code)
print(search_record_resp.content)
\ No newline at end of file
import json
from osdu_api.data_workflow.data_workflow_client import DataWorkflowClient
from osdu_api.dataset_registry.dataset_registry_client import DatasetRegistryClient
from osdu_api.file_dms.file_dms_client import FileDMSClient
from osdu_api.model.file_dms.file import File
from osdu_api.model.file_dms.register_files import RegisterFiles
from osdu_api.model.storage.record import Record
from osdu_api.model.storage.acl import Acl
from osdu_api.model.storage.legal import Legal
from osdu_api.model.storage.record_ancestry import RecordAncestry
from osdu_api.model.dataset_registry.create_dataset_registries import CreateDatasetRegistries
data_workflow_client = DataWorkflowClient()
dataset_registry_client = DatasetRegistryClient()
file_dms_client = FileDMSClient()
# cd ../.. && pip3 uninstall osdu_api && python3 setup.py sdist bdist_wheel && python3 -m pip install ./dist/osdu_api-0.0.2-py3-none-any.whl && cd osdu_api/examples
def file_dms_example():
    '''
    Asks File DMS for an upload location, then registers a file named
    example.json under that location and prints both responses.
    '''
    upload_location_resp = file_dms_client.get_file_upload_location()
    print(upload_location_resp.content)

    location_body = json.loads(upload_location_resp.content)
    target_url = location_body['uploadLocation']['unsignedUrl'] + '/example.json'

    example_file = File(target_url, 'example.json', 'An example file')
    registration = RegisterFiles([example_file])
    registration_resp = file_dms_client.register_files(registration)

    print(registration_resp.status_code)
    print(registration_resp.content)
def dataset_registry_example():
    '''
    Builds one dataset-registry record and submits it through the dataset
    registry client, printing the status code and body of the response.
    '''
    registry_kind = 'opendes:osdu:dataset-registry:0.0.1'
    acl_viewers = ['data.default.viewers@opendes.testing.com']
    acl_owners = ['data.default.owners@opendes.testing.com']
    record_acl = Acl(acl_viewers, acl_owners)

    # the legal status is passed as the plain string "compliant"
    record_legal = Legal(['opendes-public-usa-dataset-1'], ['US'], "compliant")

    file_source_info = {
        "FileSource": "",
        "PreLoadFilePath": "s3://default_bucket/r1/data/provided/trajectories/1000.json"
    }
    record_data = {
        "ResourceID": "srn:osdu:file:dc556e0e3a554105a80cfcb19372a62d:",
        "ResourceTypeID": "srn:type:file/json:",
        "ResourceSecurityClassification": "srn:reference-data/ResourceSecurityClassification:RESTRICTED:",
        "ResourceSource": "Some Company App",
        "ResourceName": "trajectories - 1000.json",
        "ResourceDescription": "Trajectory For Wellbore xyz",
        "DatasetProperties": {
            "FileSourceInfo": file_source_info
        }
    }

    record_ancestry = RecordAncestry([])

    # positional order used here: id, version, kind, acl, legal, data, ancestry, meta
    # NOTE(review): presumably mirrors Record.__init__ -- confirm against the model class
    registry_record = Record(None, None, registry_kind, record_acl, record_legal, record_data, record_ancestry, None)

    registry_request = CreateDatasetRegistries(registry_record)
    registry_resp = dataset_registry_client.create_registries(registry_request)
    print(registry_resp.status_code)
    print(registry_resp.content)
dataset_registry_example()
# file_dms_example()
[environment]
data_partition_id=opendes
storage_url=$STORAGE_BASE_URL/api/storage/v2
search_url=$SEARCH_BASE_URL/api/search/v2
data_workflow_url=$DATA_WORKFLOW_URL/api/data-workflow/v1
file_dms_url=$FILE_DMS_URL/api/filedms/v2
dataset_registry_url=$DATASET_REGISTRY_URL/api/dataset-registry/v1
use_service_principal=True
[provider]
name=aws
service_principal_module_name=service_principal_util
token_url=$TOKEN_URL
aws_oauth_custom_scope=$CUSTOM_SCOPE
client_id_ssm_path=/osdu/$ENVIRONMENT/client-credentials-client-id
client_secret_name=/osdu/$ENVIRONMENT/client_credenials_secret
client_secret_dict_key=client_credentials_client_secret
region_name=us-east-1
\ No newline at end of file
storage_url: https://[STORAGE_ENDPOINT]/api/storage/v2/records
search_url: https://[SEARCH_ENDPOINT]/api/search/v2/query
workflow_url: https://[WORKFLOW_ENDPOINT]/updateStatusFunction
data_partition_id: opendes
# dynamically injects what provider-specific logic to use
provider: aws
entitlements_module_name: entitlements_client
\ No newline at end of file
import json
import uuid
import re
from typing import List
from osdu_api.base_client import BaseClient
from osdu_api.storage.record_client import RecordClient
from osdu_api.model.record import Record
from osdu_api.model.http_method import HttpMethod
'''
Holds the logic for ingesting a manifest as records
'''
class ManifestClient(BaseClient):
'''
Inheriting from BaseClient to get needed config values
'''
def __init__(self):
super(ManifestClient, self).__init__()
self.record_client = RecordClient()
'''
Parses a manifest into its distinct records and uses the record client
to ingest those records. It requires that the kind for the record be known in
addition to legal tags, acl_viewers, and acl_owners.
Example of manifest: {
"WorkProduct": {
"ResourceTypeID": "srn:type:work-product/WellboreTrajectory:",
"ResourceSecurityClassification": "srn:reference-data/ResourceSecurityClassification:RESTRICTED:",
"Data": {
"GroupTypeProperties": {
"Description": None,
"Schema": None,
"RequireKey": None,
"Components": []
},