Commit 984f82f6 authored by Spencer Sutton's avatar Spencer Sutton
Browse files

Adding ingestion workflow endpoints

parent 6ce08f52
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys, os
import importlib
from configparser import SafeConfigParser
import requests
from osdu_api.model.http_method import HttpMethod
'''
Base client that is meant to be extended by service specific clients
'''
class BaseClient:
'''
Base client gets initialized with configuration values and a bearer token
based on provider-specific logic
'''
def __init__(self):
self._parse_config()
self.unauth_retries = 0
if self.use_service_principal == 'True' or self.use_service_principal == 'true':
self._refresh_service_principal_token()
'''
Example config file:
[environment]
data_partition_id=opendes
storage_url=https://[STORAGE_ENDPOINT]/api/storage/v2
search_url=https://[SEARCH_ENDPOINT]/api/search/v2
data_workflow_url=https://[WORKFLOW_ENDPOINT]/api/data-workflow/v1
file_dms_url=https://[FILE_DMS_ENDPOINT]/api/filedms/v2
dataset_registry_url=https://[DATASET_REGISTRY_URL]/api/dataset-registry/v1
[provider]
name=aws
entitlements_module_name=entitlements_client
'''
def _parse_config(self):
config_parser = SafeConfigParser(os.environ)
config_file_name = 'osdu_api.ini'
found_names = config_parser.read(config_file_name)
if config_file_name not in found_names:
raise Exception('Could not find osdu_api.ini config file')
self.data_partition_id = config_parser.get('environment', 'data_partition_id')
self.storage_url = config_parser.get('environment', 'storage_url')
self.search_url = config_parser.get('environment', 'search_url')
self.data_workflow_url = config_parser.get('environment', 'data_workflow_url')
self.file_dms_url = config_parser.get('environment', 'file_dms_url')
self.legal_url = config_parser.get('environment', 'legal_url')
self.entitlements_url = config_parser.get('environment', 'entitlements_url')
self.dataset_url = config_parser.get('environment', 'dataset_url')
self.use_service_principal = config_parser.get('environment', 'use_service_principal')
self.provider = config_parser.get('provider', 'name')
self.service_principal_module_name = config_parser.get('provider', 'service_principal_module_name')
'''
The path to the logic to get a valid bearer token is dynamically injected based on
what provider and entitlements module name is provided in the configuration yaml
'''
def _refresh_service_principal_token(self):
entitlements_client = importlib.import_module('osdu_api.providers.%s.%s' % (self.provider, self.service_principal_module_name))
self.service_principal_token = entitlements_client.get_service_principal_token()
'''
Makes a request using python's built in requests library. Takes additional headers if
necessary
'''
def make_request(self, method: HttpMethod, url: str, data = '', add_headers = {}, params = {}, bearer_token = None):
if bearer_token is not None and 'Bearer ' not in bearer_token:
bearer_token = 'Bearer ' + bearer_token
headers = {
'content-type': 'application/json',
'data-partition-id': self.data_partition_id,
'Authorization': bearer_token if bearer_token is not None else self.service_principal_token
}
if (len(add_headers) > 0):
for key, value in add_headers:
headers[key] = value
response = object
if (method == HttpMethod.GET):
response = requests.get(url=url, params=params, headers=headers, verify=False)
elif (method == HttpMethod.DELETE):
response = requests.delete(url=url, params=params, headers=headers, verify=False)
elif (method == HttpMethod.POST):
response = requests.post(url=url, params=params, data=data, headers=headers, verify=False)
elif (method == HttpMethod.PUT):
response = requests.put(url=url, params=params, data=data, headers=headers, verify=False)
if (response.status_code == 401 or response.status_code == 403) and self.unauth_retries < 1:
if self.use_service_principal == 'True' or self.use_service_principal == 'true':
self.unauth_retries += 1
self._refresh_service_principal_token()
self.make_request(method, url, data, add_headers, params, None)
self.unauth_retries = 0
return response
......@@ -64,6 +64,7 @@ class BaseClient:
self.dataset_url = config_parser.get('environment', 'dataset_url')
self.use_service_principal = config_parser.get('environment', 'use_service_principal')
self.schema_url = config_parser.get('environment', 'schema_url')
self.ingestion_workflow_url = config_parser.get('environment', 'ingestion_workflow_url')
self.provider = config_parser.get('provider', 'name')
self.service_principal_module_name = config_parser.get('provider', 'service_principal_module_name')
......
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.ingestion_workflow.create_workflow_request import CreateWorkflowRequest
from osdu_api.model.ingestion_workflow.trigger_workflow_request import TriggerWorkflowRequest
from osdu_api.model.ingestion_workflow.update_workflow_run_request import UpdateWorkflowRunRequest
from osdu_api.model.http_method import HttpMethod
class IngestionWorkflowClient(BaseClient):
"""
Holds the logic for interfacing with Ingestion Workflow's api
"""
def get_workflow(self, workflow_name: str, bearer_token=None):
params = {'workflowName': workflow_name}
return self.make_request(method=HttpMethod.GET, url='{}{}'.format(self.ingestion_workflow_url, '/workflow'),
params=params, bearer_token=bearer_token)
def create_workflow(self, create_workflow_request: CreateWorkflowRequest, bearer_token=None):
return self.make_request(method=HttpMethod.POST, url='{}{}'.format(self.ingestion_workflow_url, '/workflow'),
data=create_workflow_request.to_JSON(), bearer_token=bearer_token)
def get_all_workflows_in_partition(self, bearer_token=None):
return self.make_request(method=HttpMethod.GET, url='{}{}'.format(self.ingestion_workflow_url, '/workflow'),
bearer_token=bearer_token)
def delete_workflow(self, workflow_name: str, bearer_token=None):
params = {'workflowName': workflow_name}
return self.make_request(method=HttpMethod.DELETE, url='{}{}'.format(self.ingestion_workflow_url, '/workflow'),
params=params, bearer_token=bearer_token)
def trigger_workflow(self, trigger_workflow_request: TriggerWorkflowRequest, workflow_name: str, bearer_token=None):
return self.make_request(method=HttpMethod.POST, url='{}{}{}{}'.format(self.ingestion_workflow_url, '/workflow/', workflow_name, '/workflowRun'),
data=trigger_workflow_request.to_JSON(), bearer_token=bearer_token)
def get_workflow_runs(self, workflow_name: str, bearer_token=None):
return self.make_request(method=HttpMethod.GET, url='{}{}{}{}'.format(self.ingestion_workflow_url, '/workflow/', workflow_name, '/workflowRun'),
bearer_token=bearer_token)
def get_workflow_run_by_id(self, workflow_name: str, run_id: str, bearer_token=None):
return self.make_request(method=HttpMethod.GET, url='{}{}{}{}{}'.format(self.ingestion_workflow_url, '/workflow/', workflow_name, '/workflowRun/', run_id),
bearer_token=bearer_token)
def update_workflow_run(self, update_workflow_run_request: UpdateWorkflowRunRequest, workflow_name: str, run_id: str, bearer_token=None):
return self.make_request(method=HttpMethod.PUT, url='{}{}{}{}{}'.format(self.ingestion_workflow_url, '/workflow/', workflow_name, '/workflowRun/', run_id),
data=update_workflow_run_request.to_JSON(), bearer_token=bearer_token)
\ No newline at end of file
from osdu_api.model.ingestion_workflow.create_workflow_request import CreateWorkflowRequest
from osdu_api.model.ingestion_workflow.trigger_workflow_request import TriggerWorkflowRequest
from osdu_api.model.ingestion_workflow.update_workflow_run_request import UpdateWorkflowRunRequest
from osdu_api.clients.ingestion_workflow.ingestion_workflow_client import IngestionWorkflowClient
ingestion_client = IngestionWorkflowClient()
create_workflow_request = CreateWorkflowRequest("test description", {}, "my_second_dag")
response = ingestion_client.create_workflow(create_workflow_request)
print(">>>>>>>")
print(response.status_code)
print(response.content)
if response.status_code == 200 or response.status_code == 409:
response = ingestion_client.get_workflow("my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
response = ingestion_client.get_all_workflows_in_partition()
print(">>>>>>>")
print(response.status_code)
print(response.content)
response = ingestion_client.delete_workflow("my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
create_workflow_request = CreateWorkflowRequest("test description", {}, "my_second_dag")
response = ingestion_client.create_workflow(create_workflow_request)
if response.status_code == 200 or response.status_code == 409:
trigger_workflow_request = TriggerWorkflowRequest({})
response = ingestion_client.trigger_workflow(trigger_workflow_request, "my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
response = ingestion_client.get_workflow_runs("my_second_dag")
print(">>>>>>>")
print(response.status_code)
print(response.content)
......@@ -21,6 +21,7 @@
dataset_url=%(BASE_URL)s/api/dataset-registry/v1
entitlements_url=%(BASE_URL)s/api/entitlements/v1
schema_url=%(BASE_URL)s/api/schema-service/v1
ingestion_workflow_url=%(BASE_URL)s/api/workflow/v1
use_service_principal=True
[provider]
......
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_api.model.base import Base
class CreateWorkflowRequest(Base):
"""
Request body to ingestion workflow's create workflow endpoint
"""
def __init__(self, description: str, registration_instructions: dict, workflow_name: str):
self.description = description
self.registrationInstructions = registration_instructions
self.workflowName = workflow_name
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_api.model.base import Base
class TriggerWorkflowRequest(Base):
"""
Request body to ingestion workflow's trigger workflow endpoint
"""
def __init__(self, execution_context: dict):
self.executionContext = execution_context
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_api.model.base import Base
class UpdateWorkflowRunRequest(Base):
"""
Request body to ingestion workflow's update workflow run endpoint
"""
def __init__(self, status: str):
self.status = status
......@@ -91,5 +91,3 @@ def get_service_principal_token():
response = requests.post(url=token_url, headers=headers)
return 'Bearer {}'.format(json.loads(response.content.decode())['access_token'])
# print(json.loads(response.content.decode())['access_token'])
# return json.loads(response.content.decode())['access_token']
......@@ -21,6 +21,7 @@ entitlements_url=%(ENTITLEMENTS_BASE_URL)s/api/entitlements/v1
file_dms_url=%(FILE_DMS_BASE_URL)s/api/filedms/v2
dataset_url=%(DATASET_REGISTRY_BASE_URL)s/api/dataset-registry/v1
schema_url=%(SCHEMA_BASE_URL)s/api/schema-service/v1
ingestion_workflow_url=stub
use_service_principal=True
[provider]
......
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
from osdu_api.clients.data_workflow.ingestion_workflow_client import IngestionWorkflowClient
from osdu_api.clients.base_client import BaseClient
from osdu_api.model.http_method import HttpMethod
from osdu_api.model.ingestion_workflow.create_workflow_request import CreateWorkflowRequest
class TestIngestionWorkflowClient(unittest.TestCase):
@mock.patch.object(BaseClient, 'make_request', return_value="response")
@mock.patch.object(BaseClient, '_refresh_service_principal_token', return_value="stubbed")
def test_make_request(self, get_bearer_token_mock, make_request_mock):
# Arrange
client = IngestionWorkflowClient()
client.service_principal_token = 'stubbed'
client.ingestion_workflow_url = 'stubbed url'
client.headers = {}
create_workflow_request = CreateWorkflowRequest("test description", {}, "my_second_dag")
# Act
response = ingestion_client.create_workflow(create_workflow_request)
# Assert
assert response == make_request_mock.return_value
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment