Commit 113e1a63 authored by Spencer Sutton's avatar Spencer Sutton
Browse files

Merge branch 'aws-update' into 'master'

Data partition decoupling, Boto client factory, ingestion workflow client

See merge request !33
parents e908cdaa 05c2a99c
Pipeline #69206 passed with stages
in 2 minutes and 12 seconds
......@@ -31,6 +31,7 @@ env:
CUSTOM_SCOPE: stub
ENVIRONMENT: stub
AWS_REGION: stub
CI_COMMIT_TAG: v0.0.8
phases:
install:
......@@ -48,13 +49,16 @@ phases:
# run unit tests
- cp osdu_api/test/osdu_api.ini osdu_api.ini
- rm -r osdu_api/test/providers-unit-tests
- rm -r osdu_api/test/libs-unit-tests
- python -m unittest discover osdu_api/test
- rm osdu_api.ini
# publish new artifact to code artifact
- aws codeartifact login --tool twine --domain osdu-dev --domain-owner 888733619319 --repository osdu-python
- export AWS_ACCOUNT_ID=`aws sts get-caller-identity | grep Account | cut -d':' -f 2 | cut -d'"' -f 2`
- aws codeartifact login --tool twine --domain osdu-dev --domain-owner ${AWS_ACCOUNT_ID} --repository osdu-python
- python setup.py sdist bdist_wheel
- twine upload --skip-existing --repository codeartifact dist/osdu_api-0.0.5.tar.gz
- twine upload --skip-existing --repository codeartifact dist/osdu_api-0.0.8.tar.gz
artifacts:
......
......@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import os
from configparser import SafeConfigParser
import os
import requests
from osdu_api.configuration.base_config_manager import BaseConfigManager
......@@ -26,17 +28,17 @@ class BaseClient:
Base client that is meant to be extended by service specific clients
"""
def __init__(self, config_manager: BaseConfigManager = None):
def __init__(self, config_manager: BaseConfigManager = None, data_partition_id = None):
"""
Base client gets initialized with configuration values and a bearer token
based on provider-specific logic
"""
self._parse_config(config_manager)
self._parse_config(config_manager, data_partition_id)
self.unauth_retries = 0
if self.use_service_principal:
self._refresh_service_principal_token()
def _parse_config(self, config_manager: BaseConfigManager = None):
def _parse_config(self, config_manager: BaseConfigManager = None, data_partition_id = None):
"""
Parse config.
......@@ -45,7 +47,6 @@ class BaseClient:
"""
config_manager = config_manager or DefaultConfigManager()
self.data_partition_id = config_manager.get('environment', 'data_partition_id')
self.provider = config_manager.get('provider', 'name')
self.data_workflow_url = config_manager.get('environment', 'data_workflow_url')
......@@ -56,11 +57,18 @@ class BaseClient:
self.schema_url = config_manager.get('environment', 'schema_url')
self.search_url = config_manager.get('environment', 'search_url')
self.storage_url = config_manager.get('environment', 'storage_url')
self.ingestion_workflow_url = config_manager.get('environment', 'ingestion_workflow_url')
self.provider = config_manager.get('provider', 'name')
self.use_service_principal = config_manager.getbool('environment', 'use_service_principal', False)
if self.use_service_principal:
self.service_principal_module_name = config_manager.get('provider', 'service_principal_module_name')
if data_partition_id is None:
self.data_partition_id = config_manager.get('environment', 'data_partition_id')
else:
self.data_partition_id = data_partition_id
def _refresh_service_principal_token(self):
"""
The path to the logic to get a valid bearer token is dynamically injected based on
......@@ -74,13 +82,16 @@ class BaseClient:
Makes a request using python's built in requests library. Takes additional headers if
necessary
"""
if bearer_token is None:
bearer_token = self.service_principal_token
if bearer_token is not None and 'Bearer ' not in bearer_token:
bearer_token = 'Bearer ' + bearer_token
headers = {
'content-type': 'application/json',
'data-partition-id': self.data_partition_id,
'Authorization': bearer_token if bearer_token is not None else self.service_principal_token
'Authorization': bearer_token
}
if (len(add_headers) > 0):
......
......@@ -23,7 +23,6 @@ class DatasetDmsClient(BaseClient):
"""
Holds the logic for interfacing with Data Registry Service's DMS api
"""
def get_storage_instructions(self, kind_sub_type: str, bearer_token=None):
params = {'kindSubType': kind_sub_type}
return self.make_request(method=HttpMethod.GET, url='{}{}'.format(self.dataset_url, '/getStorageInstructions'),
......
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.ingestion_workflow.create_workflow_request import CreateWorkflowRequest
from osdu_api.model.ingestion_workflow.trigger_workflow_request import TriggerWorkflowRequest
from osdu_api.model.ingestion_workflow.update_workflow_run_request import UpdateWorkflowRunRequest
from osdu_api.model.http_method import HttpMethod
class IngestionWorkflowClient(BaseClient):
    """
    Client for the Ingestion Workflow service's api.

    Every method builds a URL underneath ``self.ingestion_workflow_url`` and
    delegates to ``make_request``; the value returned by ``make_request`` is
    passed back to the caller unchanged. ``bearer_token`` is optional on every
    method and is forwarded to ``make_request`` as-is.
    """

    def get_workflow(self, workflow_name: str, bearer_token=None):
        """GET ``/workflow`` with ``workflowName`` sent as a query parameter."""
        endpoint = f'{self.ingestion_workflow_url}/workflow'
        return self.make_request(method=HttpMethod.GET, url=endpoint,
                                 params={'workflowName': workflow_name}, bearer_token=bearer_token)

    def create_workflow(self, create_workflow_request: CreateWorkflowRequest, bearer_token=None):
        """POST ``/workflow`` with the request's ``to_JSON()`` output as the body."""
        endpoint = f'{self.ingestion_workflow_url}/workflow'
        body = create_workflow_request.to_JSON()
        return self.make_request(method=HttpMethod.POST, url=endpoint,
                                 data=body, bearer_token=bearer_token)

    def get_all_workflows_in_partition(self, bearer_token=None):
        """GET ``/workflow`` without any query parameters."""
        endpoint = f'{self.ingestion_workflow_url}/workflow'
        return self.make_request(method=HttpMethod.GET, url=endpoint,
                                 bearer_token=bearer_token)

    def delete_workflow(self, workflow_name: str, bearer_token=None):
        """DELETE ``/workflow`` with ``workflowName`` sent as a query parameter."""
        endpoint = f'{self.ingestion_workflow_url}/workflow'
        return self.make_request(method=HttpMethod.DELETE, url=endpoint,
                                 params={'workflowName': workflow_name}, bearer_token=bearer_token)

    def trigger_workflow(self, trigger_workflow_request: TriggerWorkflowRequest, workflow_name: str, bearer_token=None):
        """POST ``/workflow/<workflow_name>/workflowRun`` with the request's ``to_JSON()`` output as the body."""
        endpoint = f'{self.ingestion_workflow_url}/workflow/{workflow_name}/workflowRun'
        body = trigger_workflow_request.to_JSON()
        return self.make_request(method=HttpMethod.POST, url=endpoint,
                                 data=body, bearer_token=bearer_token)

    def get_workflow_runs(self, workflow_name: str, bearer_token=None):
        """GET ``/workflow/<workflow_name>/workflowRun``."""
        endpoint = f'{self.ingestion_workflow_url}/workflow/{workflow_name}/workflowRun'
        return self.make_request(method=HttpMethod.GET, url=endpoint,
                                 bearer_token=bearer_token)

    def get_workflow_run_by_id(self, workflow_name: str, run_id: str, bearer_token=None):
        """GET ``/workflow/<workflow_name>/workflowRun/<run_id>``."""
        endpoint = f'{self.ingestion_workflow_url}/workflow/{workflow_name}/workflowRun/{run_id}'
        return self.make_request(method=HttpMethod.GET, url=endpoint,
                                 bearer_token=bearer_token)

    def update_workflow_run(self, update_workflow_run_request: UpdateWorkflowRunRequest, workflow_name: str, run_id: str, bearer_token=None):
        """PUT ``/workflow/<workflow_name>/workflowRun/<run_id>`` with the request's ``to_JSON()`` output as the body."""
        endpoint = f'{self.ingestion_workflow_url}/workflow/{workflow_name}/workflowRun/{run_id}'
        body = update_workflow_run_request.to_JSON()
        return self.make_request(method=HttpMethod.PUT, url=endpoint,
                                 data=body, bearer_token=bearer_token)
\ No newline at end of file
......@@ -79,3 +79,17 @@ class RecordClient(BaseClient):
def query_records(self, query_records_request: QueryRecordsRequest, bearer_token = None):
return self.make_request(method=HttpMethod.POST, url='{}{}'.format(self.storage_url, '/query/records'),
data=query_records_request.to_JSON(), bearer_token=bearer_token)
# Ingest bulk records supplied as a JSON payload -- Start
def ingest_records(self, records, bearer_token = None):
    """
    POSTs ``records`` as the request body to ``<data_workflow_url>/workflowRun``
    and returns whatever ``make_request`` returns for that call.

    ``records`` is handed to ``make_request`` untouched; no serialization or
    record construction happens here (presumably the caller supplies an
    already-serialized JSON payload -- verify against callers).

    ``bearer_token`` is optional and is forwarded to ``make_request`` as-is.

    NOTE(review): this docstring previously described storage's
    createOrUpdateRecords endpoint taking a list of record objects, but the
    code targets the data workflow URL, not ``storage_url`` -- confirm which
    of the two is intended.
    """
    return self.make_request(method=HttpMethod.POST, url='{}{}'.format(self.data_workflow_url, '/workflowRun'), data=records, bearer_token=bearer_token)
def query_record(self, recordId: str, bearer_token = None):
    """
    GETs ``<storage_url>/records/<recordId>`` and returns whatever
    ``make_request`` returns for that call.

    ``bearer_token`` is optional and is forwarded to ``make_request`` as-is.
    """
    record_url = f'{self.storage_url}/records/{recordId}'
    return self.make_request(method=HttpMethod.GET, url=record_url, bearer_token=bearer_token)
......@@ -68,7 +68,7 @@ class DefaultConfigManager(BaseConfigManager):
:rtype: configparser.ConfigParser
"""
config_file_path = config_file_path or os.environ.get("OSDU_API_CONFIG_INI") or "osdu_api.ini"
parser = configparser.ConfigParser()
parser = configparser.ConfigParser(os.environ)
config_read_results = parser.read(config_file_path)
if not config_read_results:
raise configparser.Error(f"Could not find the config file in '{config_file_path}'.")
......
......@@ -26,8 +26,8 @@ from osdu_api.model.storage.legal import Legal
from osdu_api.model.storage.record import Record
from osdu_api.model.storage.record_ancestry import RecordAncestry
record_client = RecordClient()
search_client = SearchClient()
record_client = RecordClient("opendes")
search_client = SearchClient("opendes")
kind = 'opendes:osdu:dataset-registry:0.0.1'
viewers = ['data.default.viewers@opendes.testing.com']
......
......@@ -28,10 +28,10 @@ from osdu_api.model.storage.legal import Legal
from osdu_api.model.storage.record import Record
from osdu_api.model.storage.record_ancestry import RecordAncestry
data_workflow_client = DataWorkflowClient()
data_workflow_scheduling_client = DataWorkflowSchedulingClient()
dataset_registry_client = DatasetRegistryClient()
file_dms_client = FileDMSClient()
data_workflow_client = DataWorkflowClient("opendes")
data_workflow_scheduling_client = DataWorkflowSchedulingClient("opendes")
dataset_registry_client = DatasetRegistryClient("opendes")
file_dms_client = FileDMSClient("opendes")
# cd ../.. && pip3 uninstall osdu_api && python3 setup.py sdist bdist_wheel && python3 -m pip install ./dist/osdu_api-0.0.2-py3-none-any.whl && cd osdu_api/examples
......
from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient
# Example: fetch retrieval instructions for one dataset record and print the
# raw response body and HTTP status.
#
# The partition id is passed by keyword on purpose: the first positional
# constructor argument of the base client is the config manager, so a bare
# DatasetDmsClient("opendes") would hand the string to the config lookup
# instead of selecting the data partition.
dataset_dms_client = DatasetDmsClient(data_partition_id="opendes")

retrieval_instructions_response = dataset_dms_client.get_retrieval_instructions(record_id="opendes:dataset--File.Generic:6358621f99b64dc9bab5aeb82b0ed3ab")
print(retrieval_instructions_response.content)
print(retrieval_instructions_response.status_code)
\ No newline at end of file
from osdu_api.model.ingestion_workflow.create_workflow_request import CreateWorkflowRequest
from osdu_api.model.ingestion_workflow.trigger_workflow_request import TriggerWorkflowRequest
from osdu_api.model.ingestion_workflow.update_workflow_run_request import UpdateWorkflowRunRequest
from osdu_api.clients.ingestion_workflow.ingestion_workflow_client import IngestionWorkflowClient
ingestion_client = IngestionWorkflowClient()
create_workflow_request = CreateWorkflowRequest("test description", {}, "my_second_dag")
response = ingestion_client.create_workflow(create_workflow_request)
print(">>>>>>>")
print(response.status_code)
print(response.content)
if response.status_code == 200 or response.status_code == 409:
response = ingestion_client.get_workflow("my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
response = ingestion_client.get_all_workflows_in_partition()
print(">>>>>>>")
print(response.status_code)
print(response.content)
response = ingestion_client.delete_workflow("my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
create_workflow_request = CreateWorkflowRequest("test description", {}, "my_second_dag")
response = ingestion_client.create_workflow(create_workflow_request)
if response.status_code == 200 or response.status_code == 409:
trigger_workflow_request = TriggerWorkflowRequest({})
response = ingestion_client.trigger_workflow(trigger_workflow_request, "my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
response = ingestion_client.get_workflow_runs("my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
......@@ -14,8 +14,11 @@
from osdu_api.clients.legal.legal_client import LegalClient
from osdu_api.model.legal.legal_tag import LegalTag
from osdu_api.model.legal.legal_tag_properties import LegalTagProperties
from osdu_api.configuration.config_manager import DefaultConfigManager
import os
legal_client = LegalClient()
os.environ['BASE_URL'] = 'https://suttonsp.dev.osdu.aws'
legal_client = LegalClient(DefaultConfigManager(), "opendes")
legal_tag_properties = LegalTagProperties(['US'], 'A1234', 2222222222222, 'default', 'Public Domain Data', 'Public', 'No Personal Data', 'EAR99')
......
......@@ -12,15 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
[environment]
data_partition_id=opendes
storage_url=%(BASE_URL)s/api/storage/v2
search_url=%(BASE_URL)s/api/search/v2
legal_url=%(BASE_URL)s/api/legal/v1
data_workflow_url=%(BASE_URL)s/api/data-workflow/v1
data_workflow_url=deprecated
file_dms_url=%(BASE_URL)s/api/filedms/v2
dataset_url=%(BASE_URL)s/api/dataset-registry/v1
dataset_url=%(BASE_URL)s/api/dataset/v1
entitlements_url=%(BASE_URL)s/api/entitlements/v1
schema_url=%(BASE_URL)s/api/schema-service/v1
ingestion_workflow_url=%(BASE_URL)s/api/workflow/v1
use_service_principal=True
[provider]
......
from osdu_api.clients.search.search_client import SearchClient
from osdu_api.model.search.query_request import QueryRequest
# Example: run a cursor query against the search service and print the HTTP
# status followed by the raw response body.
#
# The partition id is passed by keyword on purpose: the first positional
# constructor argument of the base client is the config manager, so a bare
# SearchClient("osdu") would hand the string to the config lookup instead of
# selecting the data partition.
search_client = SearchClient(data_partition_id="osdu")

query_request = QueryRequest(kind='osdu:wks:*:*', query="*", cursor=None, limit=1000)
# bearer_token=None: no caller-supplied token is passed for this request.
search_response = search_client.query_with_cursor(query_request, bearer_token=None)
print(search_response.status_code)
print("________________")
print(search_response.content)
\ No newline at end of file
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_api.model.base import Base
class CreateWorkflowRequest(Base):
    """
    Body of the request sent to ingestion workflow's create workflow endpoint.

    The constructor arguments are stored on camelCase attributes:
    ``description``, ``registrationInstructions`` and ``workflowName``.
    """

    def __init__(self, description: str, registration_instructions: dict, workflow_name: str):
        """
        :param description: stored as ``description``
        :param registration_instructions: stored as ``registrationInstructions``
        :param workflow_name: stored as ``workflowName``
        """
        self.description = description
        self.registrationInstructions = registration_instructions
        self.workflowName = workflow_name
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_api.model.base import Base
class TriggerWorkflowRequest(Base):
    """
    Body of the request sent to ingestion workflow's trigger workflow endpoint.

    The single constructor argument is stored on the camelCase attribute
    ``executionContext``.
    """

    def __init__(self, execution_context: dict):
        """
        :param execution_context: stored as ``executionContext``
        """
        self.executionContext = execution_context
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_api.model.base import Base
class UpdateWorkflowRunRequest(Base):
    """
    Body of the request sent to ingestion workflow's update workflow run
    endpoint.

    The single constructor argument is stored on the ``status`` attribute.
    """

    def __init__(self, status: str):
        """
        :param status: stored as ``status``
        """
        self.status = status
import os
import boto3
class BotoClientFactory:
    """
    Builds boto3 service clients, optionally under an assumed IAM role.

    When the ``USE_AIRFLOW`` environment variable is set to ``true`` (any
    letter case), the client is created from temporary credentials obtained
    by assuming the role named in ``AWS_ROLE_ARN``. Otherwise the client is
    created from a plain boto3 session.
    """

    def get_boto_client(self, client_type: str, region_name: str):
        """
        Return a boto3 client for ``client_type`` in ``region_name``.

        :param client_type: boto3 service name, e.g. ``'ssm'`` or ``'secretsmanager'``
        :param region_name: AWS region the client is bound to
        :raises Exception: if ``USE_AIRFLOW`` is enabled but ``AWS_ROLE_ARN`` is not set
        """
        # Environment values are always strings, so a comparison against the
        # boolean True can never match; compare the lower-cased text instead.
        use_airflow = os.environ.get('USE_AIRFLOW', '').lower() == 'true'

        if not use_airflow:
            session = boto3.session.Session(region_name=region_name)
            return session.client(
                service_name=client_type,
                region_name=region_name
            )

        # Validate configuration before creating any AWS client so that a
        # misconfigured environment fails immediately.
        role_arn = os.environ.get('AWS_ROLE_ARN')
        if role_arn is None:
            raise Exception('Must have AWS_ROLE_ARN set')

        sts_client = boto3.client('sts')
        assumed_role_object = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName="airflow_session"
        )
        credentials = assumed_role_object['Credentials']

        # Build the service client from the temporary role credentials only;
        # no default-credential client is created on this path.
        session = boto3.Session(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
        return session.client(
            service_name=client_type,
            region_name=region_name
        )
......@@ -18,18 +18,17 @@ import requests
import json
from botocore.exceptions import ClientError
from configparser import ConfigParser
from osdu_api.providers.aws.boto_client_factory import BotoClientFactory
def _get_ssm_parameter(session, region_name, ssm_path):
ssm_client = session.client('ssm', region_name=region_name)
ssm_response = ssm_client.get_parameter(Name=ssm_path)
return ssm_response['Parameter']['Value']
def _get_ssm_parameter(region_name, ssm_path):
    """
    Look up the SSM parameter named ``ssm_path`` and return its value.

    The SSM client is obtained from ``BotoClientFactory`` for ``region_name``;
    the value is read from ``['Parameter']['Value']`` of the ``get_parameter``
    response.
    """
    ssm_client = BotoClientFactory().get_boto_client('ssm', region_name)
    parameter = ssm_client.get_parameter(Name=ssm_path)['Parameter']
    return parameter['Value']
def _get_secret(session, region_name, secret_name, secret_dict_key):
# Create a Secrets Manager client
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
def _get_secret(region_name, secret_name, secret_dict_key):
boto_client_factory = BotoClientFactory()
client = boto_client_factory.get_boto_client('secretsmanager', region_name)
# In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
# See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
......@@ -74,11 +73,10 @@ def get_service_principal_token():
region_name = config_parser.get('provider', 'region_name')
token_url_ssm_path = config_parser.get('provider', 'token_url_ssm_path')
session = boto3.session.Session()
client_id = _get_ssm_parameter(session, region_name, client_id_ssm_path)
client_secret = _get_secret(session, region_name, client_secret_name, client_secret_dict_key)
token_url = _get_ssm_parameter(session, region_name, token_url_ssm_path)
aws_oauth_custom_scope = _get_ssm_parameter(session, region_name, aws_oauth_custom_scope_ssm_path)
client_id = _get_ssm_parameter(region_name, client_id_ssm_path)
client_secret = _get_secret(region_name, client_secret_name, client_secret_dict_key)
token_url = _get_ssm_parameter(region_name, token_url_ssm_path)
aws_oauth_custom_scope = _get_ssm_parameter(region_name, aws_oauth_custom_scope_ssm_path)
auth = '{}:{}'.format(client_id, client_secret)
encoded_auth = base64.b64encode(str.encode(auth))
......@@ -90,6 +88,4 @@ def get_service_principal_token():
token_url = '{}?grant_type=client_credentials&client_id={}&scope={}'.format(token_url,client_id, aws_oauth_custom_scope)
response = requests.post(url=token_url, headers=headers)
# return 'Bearer {}'.format(json.loads(response.content.decode())['access_token'])
print(json.loads(response.content.decode())['access_token'])
return json.loads(response.content.decode())['access_token']
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment