Commit 34ab085b authored by Jorge Garcia-Rivera's avatar Jorge Garcia-Rivera
Browse files

Implement workflow client and update workflow status method

parent 544fb8d5
......@@ -39,6 +39,7 @@ class BaseClient:
self.search_url = self._parse_config_value(config, 'search_url', False)
self.provider = self._parse_config_value(config, 'provider', True)
self.entitlements_module_name = self._parse_config_value(config, 'entitlements_module_name', True)
self.workflow_url = self._parse_config_value(config, 'workflow_url', False)
'''
Used during parsing of the yaml config file. Will raise an exception if a required config
......
storage_url: http://localhost:8081/api/storage/v2/records
search_url: http://localhost:8085/api/search/v2/query
workflow_url: http://localhost:8090/updateStatusFunction
data_partition_id: stubbed_partition
# dynamically injects what provider-specific logic to use
provider: stubbed_provider
......
import json
from osdu_api.base_client import BaseClient
from osdu_api.model.http_method import HttpMethod
'''
Holds the logic for interfacing with the Workflow microservice's UpdateWorkflowStatus API endpoint.
'''
class WorkflowClient(BaseClient):
'''
Calls the service's UpdateWorkflowStatus API endpoint taking in a workflowId and workflowStatus and constructing
the JSON body of the request. Returns the response object for the call.
'''
def update_workflow_status(self, workflowId: str, workflowStatus: str):
# Create a dictionary containing the data for the request
workflow_data_dictionary = {'WorkflowID': workflowId, 'Status': workflowStatus}
# Convert the dictionary into JSON, submit request to API, and return response
return self.update_workflow_status_from_dict(workflow_data_dictionary)
'''
Calls Workflow API's endpoint UpdateWorkflowstatus taking individual attributes and constructing
the body of the request. Returns the response object for the call
Example of workflow_data:
{
"WorkflowID": "6893fab0-38eb-4aed-96e9-c667f1e771c8",
"Status": "finished"
}
'''
def update_workflow_status_from_dict(self, workflowDataDictionary: dict):
workflow_data = json.dumps(workflowDataDictionary)
response = self.make_request(method=HttpMethod.POST, url=self.workflow_url, data=workflow_data)
return response
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment