Commit 1b615708 authored by Ethiraj Krishnamanaidu

Merge branch 'trusted-update_workflow' into 'master'

GONRG-1709: update workflow service interaction (#32)

See merge request !13
parents 0af46d4b efc0ff21
Pipeline #25356 passed
Showing 625 additions and 500 deletions
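Reviewer note: this change swaps the legacy status update for the Workflow service's v1 run-status endpoint. A minimal sketch of the new interaction, assuming a placeholder base URL and only the content-type header (the real code attaches auth headers via HeadersMixin):

import json

import requests

# Placeholder values; the real code takes these from the DAG run conf.
workflow_url = "https://workflow.example.com/api/workflow"
workflow_name, run_id = "osdu_ingest", "foo"

# Mirrors update_status_request() in libs/update_status.py below:
# PUT {workflow_url}/v1/workflow/{workflow_name}/workflowRun/{run_id}
response = requests.put(
    f"{workflow_url}/v1/workflow/{workflow_name}/workflowRun/{run_id}",
    json.dumps({"status": "running"}),
    headers={"Content-Type": "application/json"},
)
response.raise_for_status()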
@@ -21,7 +21,7 @@ from datetime import timedelta
 import airflow.utils.dates
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
-from operators.update_status import UpdateStatusOperator
+from operators.deprecated.update_status import UpdateStatusOperator
 from libs.create_records import create_records  # isort:skip
@@ -19,6 +19,7 @@ import json
 import logging
 import requests

 from libs.context import Context
+from libs.mixins import HeadersMixin
 from libs.refresh_token import TokenRefresher, refresh_token
@@ -31,7 +32,9 @@ class UpdateStatus(HeadersMixin):
     def __init__(
         self,
+        workflow_name: str,
         workflow_id: str,
+        run_id: str,
         workflow_url: str,
         status: str,
         token_refresher: TokenRefresher,
@@ -39,8 +42,12 @@
     ) -> None:
         """Init the status update processor.
+        :param workflow_name: The name of the workflow
+        :type workflow_name: str
         :param workflow_id: The id of the workflow
         :type workflow_id: str
+        :param run_id: The id of the workflow run instance
+        :type run_id: str
         :param workflow_url: The base url of the Workflow service
         :type workflow_url: str
         :param status: The status
@@ -51,8 +58,10 @@
         :type context: Context
         """
         super().__init__(context)
+        self.workflow_name = workflow_name
         self.workflow_url = workflow_url
         self.workflow_id = workflow_id
+        self.run_id = run_id
         self.context = context
         self.status = status
         self.token_refresher = token_refresher
@@ -61,6 +70,25 @@
+    def update_status_request(self, headers: dict) -> requests.Response:
+        """Send request to update status.
+        :param headers: The request headers
+        :type headers: dict
+        :return: The Workflow server response
+        :rtype: requests.Response
+        """
+        request_body = {
+            "status": self.status
+        }
+        request_body = json.dumps(request_body)
+        logger.debug(f"Sending request '{request_body}'")
+        update_status_url = f"{self.workflow_url}/v1/workflow/{self.workflow_name}/workflowRun/{self.run_id}"
+        logger.debug(f"Workflow URL: {update_status_url}")
+        response = requests.put(update_status_url, request_body, headers=headers)
+        return response
+
     @refresh_token()
+    def update_status_request_old(self, headers: dict) -> requests.Response:
         """Send request to update status.
         :param headers: The request headers
         :type headers: dict
         :return: The Workflow server response
@@ -78,4 +106,7 @@
     def update_workflow_status(self):
         """Updates workflow status."""
         headers = self.request_headers
-        self.update_status_request(headers)
+        if self.workflow_name:
+            self.update_status_request(headers)
+        else:
+            self.update_status_request_old(headers)
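For context, a sketch of how a caller drives the updated class; the parameter values are the placeholders used elsewhere in this diff, and the workflow_url Variable name follows the operator change further down:

from airflow.models import Variable

from libs.context import Context
from libs.refresh_token import AirflowTokenRefresher
from libs.update_status import UpdateStatus

# Placeholder context; real runs build this from dag_run.conf.
payload_context = Context(data_partition_id="opendes", app_key="")

status_updater = UpdateStatus(
    workflow_name="osdu_ingest",  # a non-empty name selects the new v1 endpoint
    workflow_id="",               # legacy field, unused on the new path
    run_id="foo",
    workflow_url=Variable.get("workflow_url"),
    status="running",
    token_refresher=AirflowTokenRefresher(),
    context=payload_context,
)

# Dispatches to update_status_request() because workflow_name is non-empty;
# an empty name falls back to update_status_request_old().
status_updater.update_workflow_status()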
@@ -19,9 +19,9 @@ from datetime import timedelta
 import airflow
 from airflow import DAG
+from operators.deprecated.update_status import UpdateStatusOperator
 from operators.process_manifest_r2 import ProcessManifestOperatorR2
 from operators.search_record_id import SearchRecordIdOperator
-from operators.update_status import UpdateStatusOperator
 default_args = {
     "start_date": airflow.utils.dates.days_ago(0),

@@ -30,8 +30,9 @@ default_args = {
     "trigger_rule": "none_failed",
 }

+workflow_name = "Osdu_ingest_r2"
 dag = DAG(
-    "Osdu_ingest_r2",
+    workflow_name,
     default_args=default_args,
     description="liveness monitoring dag",
     schedule_interval=None,
@@ -46,7 +47,7 @@
 update_status_finished_op = UpdateStatusOperator(
     task_id="update_status_finished_task",
     dag=dag,
-    trigger_rule="all_done",
+    trigger_rule="all_done"
 )

 process_manifest_op = ProcessManifestOperatorR2(
@@ -29,8 +29,9 @@ default_args = {
     "trigger_rule": "none_failed",
 }

+workflow_name = "Osdu_ingest"
 dag = DAG(
-    "Osdu_ingest",
+    workflow_name,
     default_args=default_args,
     description="liveness monitoring dag",
     schedule_interval=None,
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Update Status operator."""
import copy
import enum
import logging
from airflow.models import BaseOperator, Variable
from libs.context import Context
from libs.refresh_token import AirflowTokenRefresher
from libs.exceptions import PipelineFailedError
from libs.update_status import UpdateStatus
logger = logging.getLogger()
class UpdateStatusOperator(BaseOperator):
    """Operator to update status."""

    ui_color = '#10ECAA'
    ui_fgcolor = '#000000'

    class prev_ti_state(enum.Enum):
        NONE = "running"
        SUCCESS = "finished"
        FAILED = "failed"

    def get_previous_ti_statuses(self, context: dict) -> enum.Enum:
        """Get the status of previous task executions.

        Return the corresponding enum value.

        :param context: Airflow context
        :type context: dict
        :return: Previous status
        :rtype: enum.Enum
        """
        dagrun = context['ti'].get_dagrun()
        failed_ti = dagrun.get_task_instances(state='failed')
        success_ti = dagrun.get_task_instances(state='success')
        if not failed_ti and not success_ti:  # no previous tasks, so none can have failed
            logger.info("There are no tasks before this one, so it has status RUNNING")
            return self.prev_ti_state.NONE
        if failed_ti:
            logger.info("There are failed tasks before this one, so it has status FAILED")
            return self.prev_ti_state.FAILED
        logger.info("There are succeeded tasks before this one, so it has status FINISHED")
        return self.prev_ti_state.SUCCESS
    def pre_execute(self, context: dict):
        self.status = self.get_previous_ti_statuses(context)

    def execute(self, context: dict):
        """Execute the workflow status update.

        If the status is assumed to be FINISHED, check whether the records
        are searchable; if they are, set the status to FINISHED, otherwise
        to FAILED.

        :param context: Airflow context
        :type context: dict
        :raises PipelineFailedError: If any previous task failed
        """
        conf = copy.deepcopy(context["dag_run"].conf)
        logger.debug(f"Got conf {conf}.")
        if "Payload" in conf:
            payload_context = Context.populate(conf)
        else:
            payload_context = Context(data_partition_id=conf["data-partition-id"],
                                      app_key=conf.get("AppKey", ""))
        workflow_id = conf["WorkflowID"]
        status = self.status.value
        status_updater = UpdateStatus(
            workflow_name="",
            workflow_url=Variable.get("update_status_url"),
            workflow_id=workflow_id,
            run_id="",
            status=status,
            token_refresher=AirflowTokenRefresher(),
            context=payload_context
        )
        status_updater.update_workflow_status()

        if self.status is self.prev_ti_state.FAILED:
            raise PipelineFailedError("Dag failed")
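For contrast with the new-style fixtures further down, the legacy conf shape this deprecated operator still consumes (field names taken from its execute() above; values are the usual fixture placeholders):

legacy_conf = {
    "Payload": {
        "authorization": "Bearer test",
        "data-partition-id": "opendes",
        "AppKey": "",
    },
    "WorkflowID": "foo",
}
# With "Payload" present, execute() uses Context.populate(conf); otherwise it
# falls back to the data-partition-id / AppKey branch shown above.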
@@ -56,7 +56,9 @@ class ProcessManifestOperatorR3(BaseOperator):
         :param context: Airflow context
         :type context: dict
         """
-        payload_context = Context.populate(context["dag_run"].conf)
+        execution_context = context["dag_run"].conf["execution_context"]
+        payload_context = Context.populate(execution_context)
         token_refresher = AirflowTokenRefresher()
         file_handler = FileHandler(self.file_service_url, token_refresher, payload_context)
         source_file_checker = SourceFileChecker()

@@ -66,8 +68,8 @@
             token_refresher,
             payload_context
         )
-        manifest_schema = validator.validate_common_schema(context["dag_run"].conf)
-        traversal = ManifestTraversal(context["dag_run"].conf, manifest_schema)
+        manifest_schema = validator.validate_common_schema(execution_context)
+        traversal = ManifestTraversal(execution_context, manifest_schema)
         manifest_entities = traversal.traverse_manifest()
         logger.debug(f"entities count: {len(manifest_entities)}")
         valid_manifest_entities = validator.validate_manifest(manifest_entities)
@@ -45,7 +45,7 @@ class UpdateStatusOperator(BaseOperator):
         :param context: Airflow context
         :type context: dict
         :return: Previous status
         :rtype: enum.Enum
         """
         dagrun = context['ti'].get_dagrun()
@@ -75,16 +75,20 @@
         """
         conf = copy.deepcopy(context["dag_run"].conf)
         logger.debug(f"Got conf {conf}.")
-        if "Payload" in conf:
-            payload_context = Context.populate(conf)
+        execution_context = conf["execution_context"]
+        if "Payload" in execution_context:
+            payload_context = Context.populate(execution_context)
         else:
-            payload_context = Context(data_partition_id=conf["data-partition-id"],
-                                      app_key=conf.get("AppKey", ""))
-        workflow_id = conf["WorkflowID"]
+            payload_context = Context(data_partition_id=execution_context["data-partition-id"],
+                                      app_key=execution_context.get("AppKey", ""))
+        workflow_name = conf["workflow_name"]
+        run_id = conf["run_id"]
         status = self.status.value
         status_updater = UpdateStatus(
-            workflow_url=Variable.get("update_status_url"),
-            workflow_id=workflow_id,
+            workflow_name=workflow_name,
+            workflow_url=Variable.get("workflow_url"),
+            workflow_id="",
+            run_id=run_id,
             status=status,
             token_refresher=AirflowTokenRefresher(),
             context=payload_context
 {
-    "Payload": {
-        "authorization": "Bearer test",
-        "data-partition-id": "opendes",
-        "AppKey": "",
-        "kind_version": "3.0.0"
-    },
-    "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
-    "$filename": "load_Wellbore.1.0.0_350112350400.json",
-    "WorkflowID": "foo"
+    "execution_context": {
+        "Payload": {
+            "authorization": "Bearer test",
+            "data-partition-id": "opendes",
+            "AppKey": "",
+            "kind_version": "3.0.0"
+        },
+        "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
+        "$filename": "load_Wellbore.1.0.0_350112350400.json"
+    },
+    "workflow_name": "osdu_ingest",
+    "run_id": "foo"
 }
 {
-    "Payload": {
-        "authorization": "Bearer test",
-        "data-partition-id": "opendes",
-        "AppKey": "",
-        "kind_version": "3.0.0"
-    },
-    "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
-    "$filename": "load_Wellbore.1.0.0_350112350400.json",
-    "manifest": {"test": "test"},
-    "WorkflowID": "foo"
+    "execution_context": {
+        "Payload": {
+            "authorization": "Bearer test",
+            "data-partition-id": "opendes",
+            "AppKey": "",
+            "kind_version": "3.0.0"
+        },
+        "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
+        "$filename": "load_Wellbore.1.0.0_350112350400.json",
+        "manifest": {
+            "test": "test"
+        }
+    },
+    "workflow_name": "osdu_ingest",
+    "run_id": "foo"
 }
 {
-    "Payload": {
-        "authorization": "Bearer test",
-        "data-partition-id": "opendes",
-        "AppKey": "",
-        "kind_version": "3.0.0"
-    },
-    "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
-    "$filename": "load_Wellbore.1.0.0_350112350400.json",
-    "manifest": {
+    "execution_context": {
+        "Payload": {
+            "authorization": "Bearer test",
+            "data-partition-id": "opendes",
+            "AppKey": "",
+            "kind_version": "3.0.0"
+        },
+        "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
+        "$filename": "load_Wellbore.1.0.0_350112350400.json",
+        "manifest": {
            "kind": "test:test:Manifest:1.0.0",
            "ReferenceData": [],
            "MasterData": [
                {
                    "id": "opendes:master-data/Wellbore:350112350400",
                    "kind": "opendes:osdu:TestMaster:0.3.0",
                    "groupType": "master-data",
                    "version": 1,
                    "acl": {
                        "owners": [
                            "data.default.viewers@opendes.osdu-gcp.go3-nrg.projects.epam.com"
                        ],
                        "viewers": [
                            "data.default.owners@opendes.osdu-gcp.go3-nrg.projects.epam.com"
                        ]
                    },
                    "legal": {
                        "legaltags": [
                            "opendes-demo-legaltag"
                        ],
                        "otherRelevantDataCountries": [
                            "srn:opendes:master-data/GeoPoliticalEntity:USA:"
                        ],
                        "status": "srn:opendes:reference-data/LegalStatus:public:1111"
                    },
                    "resourceHostRegionIDs": [
                        "srn:opendes:reference-data/OSDURegion:US-EAST:"
                    ],
                    "resourceObjectCreationDateTime": "2020-10-16T11:14:45-05:00",
                    "resourceVersionCreationDateTime": "2020-10-16T11:14:45-05:00",
                    "resourceSecurityClassification": "srn:opendes:reference-data/ResourceSecurityClassification:public:",
                    "source": "srn:opendes:master-data/Organisation:Oklahoma Corporation Commission:",
                    "existenceKind": "srn:opendes:reference-data/ExistenceKind:Active:",
                    "licenseState": "srn:opendes:reference-data/LicenseState:Unlicensed:",
                    "data": {
                        "SequenceNumber": 1
                    },
                    "schema": "test:test:GenericMasterData:1.0.0"
                }
            ],
            "Data": {}
-    },
-    "WorkflowID": "foo"
+        }
+    },
+    "workflow_name": "osdu_ingest",
+    "run_id": "foo"
 }
@@ -84,7 +84,7 @@ class TestManifestProcessor:
         with open(traversal_manifest_file) as f:
             manifest_file = json.load(f)
         manifest_records = manifest_file
-        context = Context.populate(conf)
+        context = Context.populate(conf["execution_context"])
         token_refresher = AirflowTokenRefresher()
         file_handler = FileHandler("test", token_refresher, context)
         source_file_checker = SourceFileChecker()
@@ -36,7 +36,7 @@ class TestManifestTraversal:
         with open(manifest_schema_file) as f:
             manifest_schema = json.load(f)
         traversal = ManifestTraversal(
-            conf_manifest_file,
+            conf_manifest_file["execution_context"],
             manifest_schema
         )
         return traversal
@@ -84,37 +84,10 @@ class TestOperators(object):
         task.pre_execute(context)
         task.execute(context)

-    @pytest.mark.parametrize(
-        "record_ids,search_response_path",
-        [
-            pytest.param(
-                ["test"],
-                SEARCH_VALID_RESPONSE_PATH
-            ),
-            pytest.param(
-                ["test", "test", "test"],
-                SEARCH_VALID_RESPONSE_PATH
-            )
-        ]
-    )
-    def test_search_id_operator(self, monkeypatch, record_ids: list, search_response_path: str):
-        # make validator pass
-        monkeypatch.setattr(SchemaValidator, "validate_manifest", lambda obj: None)
-        task, context = self._create_task(SearchRecordIdOperator)
-        monkeypatch.setattr(context["ti"], "xcom_pull", lambda **kwargs: record_ids)
-        mock_search_response = MockSearchResponse(body_path=search_response_path,
-                                                  status_code=http.HTTPStatus.OK,
-                                                  total_count=len(record_ids))
-        monkeypatch.setattr(requests, "post", lambda *args, **kwargs: mock_search_response)
-        task.pre_execute(context)
-        task.execute(context)

     def _test_update_status_operator(self, monkeypatch, status: UpdateStatusOperator.prev_ti_state):
         monkeypatch.setattr(UpdateStatusOperator, "get_previous_ti_statuses",
                             lambda obj, context: status)
-        monkeypatch.setattr(requests, "post", lambda *args, **kwargs: MockWorkflowResponse(
+        monkeypatch.setattr(requests, "put", lambda *args, **kwargs: MockWorkflowResponse(
             status_code=http.HTTPStatus.OK, json="test"))
         task, context = self._create_task(UpdateStatusOperator)
@@ -40,11 +40,14 @@ class TestUpdateStatus:
     def status_updater(self, status: str, conf_path: str):
         with open(conf_path) as f:
             conf = json.load(f)
-        context = Context.populate(conf)
-        workflow_id = conf["WorkflowID"]
+        context = Context.populate(conf["execution_context"])
+        workflow_name = conf["workflow_name"]
+        run_id = conf["run_id"]
         status_updater = UpdateStatus(
-            workflow_url = "http://test",
-            workflow_id=workflow_id,
+            workflow_name=workflow_name,
+            workflow_url="http://test",
+            workflow_id="",
+            run_id=run_id,
             token_refresher=AirflowTokenRefresher(get_test_credentials()),
             context=context,
             status=status

@@ -63,4 +66,5 @@
     def test_update_workflow_status(self, monkeypatch, status_updater: UpdateStatus, conf_path: str,
                                     status: str):
         monkeypatch.setattr(requests, "post", lambda *args, **kwargs: MockWorkflowResponse())
+        monkeypatch.setattr(requests, "put", lambda *args, **kwargs: MockWorkflowResponse())
         status_updater.update_workflow_status()