Commit 3c3e4859 authored by Aleksandr Spivakov (EPAM)'s avatar Aleksandr Spivakov (EPAM)
Browse files

GONRG-3109: move common logic from ingestion

parent af432a2a
Pipeline #63678 passed with stages
in 14 minutes and 23 seconds
......@@ -3,6 +3,7 @@ default:
stages:
- linters
- unit_tests
- deploy
pylint:
......@@ -21,6 +22,14 @@ isort:
- isort -c -v osdu_airflow/*/*.py || EXIT_CODE=$?
- exit ${EXIT_CODE}
unit_tests:
stage: unit_tests
image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
script:
- chmod +x ./osdu_airflow/tests/unit_tests.sh
- ./osdu_airflow/tests/./unit_tests.sh || EXIT_CODE=$?
- exit ${EXIT_CODE}
deploy:
stage: deploy
script:
......
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Update Status operator."""
import copy
import enum
import logging
from airflow.models import BaseOperator, Variable
from osdu_api.libs.context import Context
from osdu_api.libs.exceptions import PipelineFailedError
from osdu_api.libs.refresh_token import AirflowTokenRefresher
from osdu_api.libs.update_status import UpdateStatus
logger = logging.getLogger()
class UpdateStatusOperator(BaseOperator):
    """Operator to update the workflow status in the Workflow service.

    The status pushed depends on the aggregate state of the task instances
    that ran before this operator inside the same DAG run.
    """

    ui_color = '#10ECAA'
    ui_fgcolor = '#000000'

    class prev_ti_state(enum.Enum):
        # Maps the aggregate state of previous task instances to the status
        # string expected by the Workflow service.
        NONE = "running"
        SUCCESS = "finished"
        FAILED = "failed"

    def get_previous_ti_statuses(self, context: dict) -> enum.Enum:
        """Get status of previous tasks' executions.
        Return corresponding enum value.

        :param context: Airflow context
        :type context: dict
        :return: Previous status
        :rtype: enum.Enum
        """
        dagrun = context['ti'].get_dagrun()
        failed_ti = dagrun.get_task_instances(state='failed')
        success_ti = dagrun.get_task_instances(state='success')
        if not failed_ti and not success_ti:  # There is no prev task so it can't have been failed
            logger.info("There are no tasks before this one. So it has status RUNNING")
            return self.prev_ti_state.NONE
        if failed_ti:
            logger.info("There are failed tasks before this one. So it has status FAILED")
            return self.prev_ti_state.FAILED
        # FIX: corrected log-message typos "successed"/"SUCCESSED".
        logger.info("There are succeeded tasks before this one. So it has status SUCCESS")
        return self.prev_ti_state.SUCCESS

    def pre_execute(self, context: dict):
        # Resolve the status once before execute() so that any failure in
        # reading previous task states surfaces early.
        self.status = self.get_previous_ti_statuses(context)

    def execute(self, context: dict):
        """Execute update workflow status.
        If status assumed to be FINISHED then we check whether records
        are searchable or not.
        If they are then update status FINISHED else FAILED

        :param context: Airflow context
        :type context: dict
        :raises PipelineFailedError: If any of the status is failed
        """
        conf = copy.deepcopy(context["dag_run"].conf)
        logger.debug(f"Got conf {conf}.")
        if "Payload" in conf:
            payload_context = Context.populate(conf)
        else:
            payload_context = Context(data_partition_id=conf["data-partition-id"],
                                      app_key=conf.get("AppKey", ""))
        workflow_id = conf["WorkflowID"]
        status = self.status.value
        status_updater = UpdateStatus(
            workflow_name="",
            workflow_url=Variable.get("core__service__workflow__url"),
            workflow_id=workflow_id,
            run_id="",
            status=status,
            token_refresher=AirflowTokenRefresher(),
            context=payload_context
        )
        status_updater.update_workflow_status()
        # Propagate the failure so the DAG run itself is marked failed.
        if self.status is self.prev_ti_state.FAILED:
            raise PipelineFailedError("Dag failed")
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""R3 Validate reference Manifest operator."""
import logging
from airflow.models import BaseOperator, Variable
from osdu_api.libs.context import Context
from osdu_api.libs.refresh_token import AirflowTokenRefresher
from osdu_api.libs.validation.validate_file_source import FileSourceValidator
from osdu_api.libs.validation.validate_referential_integrity import ManifestIntegrity
from osdu_airflow.backward_compatibility.airflow_utils import apply_defaults
from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContextMixin
logger = logging.getLogger()
class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
    """Operator to validate ref inside manifest R3 and remove invalid entities."""

    ui_color = '#dad5ff'
    ui_fgcolor = '#000000'

    @apply_defaults
    def __init__(self,
                 previous_task_id: str = None,
                 *args, **kwargs):
        """Init base operator and obtain base urls from Airflow Variables.

        :param previous_task_id: id of the task whose XCom carries the manifest;
            when None the manifest is read from the execution context instead.
        """
        super().__init__(*args, **kwargs)
        self.search_url = Variable.get('core__service__search__url')
        self.whitelist_ref_patterns = Variable.get('core__config__reference_patterns_whitelist',
                                                   default_var=None)
        self.previous_task_id = previous_task_id
        # NOTE(review): Variable.get returns a string when the variable is set;
        # only the default is a real bool — confirm truthiness handling downstream.
        self._show_skipped_ids = Variable.get(
            'core__config__show_skipped_ids', default_var=False
        )

    def execute(self, context: dict):
        """Execute manifest validation then process it.

        :param context: Airflow context
        :type context: dict
        :return: dict with the validated manifest under the "manifest" key
        """
        payload_context = Context.populate(context["dag_run"].conf["execution_context"])
        token_refresher = AirflowTokenRefresher()
        file_source_validator = FileSourceValidator()
        manifest_integrity = ManifestIntegrity(
            self.search_url,
            token_refresher,
            file_source_validator,
            payload_context,
            self.whitelist_ref_patterns,
        )
        execution_context = context["dag_run"].conf["execution_context"]
        manifest_data = self._get_manifest_data(context, execution_context)
        previously_skipped_entities = self._get_previously_skipped_entities(context)
        logger.debug(f"Manifest data: {manifest_data}")
        manifest, skipped_ids = manifest_integrity.ensure_integrity(
            manifest_data,
            previously_skipped_entities
        )
        # FIX: log the validated manifest; previously the raw input was logged twice.
        logger.debug(f"Valid manifest data: {manifest}")
        if self._show_skipped_ids:
            context["ti"].xcom_push(key="skipped_ids", value=skipped_ids)
        return {"manifest": manifest}
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from osdu_api.libs.types import ManifestType
class ReceivingContextMixin:
    """Mixin for receiving manifest file from XCOMs in case if current operator not the first in the row"""

    def _get_manifest_data(self, context: dict, execution_context: dict) -> ManifestType:
        """
        Receive manifest file. If previous task id not None - get manifest file from XCOMs.
        Otherwise - get manifest file from execution context
        """
        if not self.previous_task_id:
            return execution_context["manifest"]
        xcom_value = context["ti"].xcom_pull(task_ids=self.previous_task_id,
                                             key="return_value")
        # Fall back to the execution context when the previous task pushed nothing.
        if xcom_value:
            return xcom_value["manifest"]
        return execution_context["manifest"]

    def _get_previously_skipped_entities(self, context: dict) -> list:
        """
        Receive skipped entities from previous tasks.
        """
        task_instance = context["ti"]
        skipped = []
        for prior_task in task_instance.get_dagrun().get_task_instances():
            ids = task_instance.xcom_pull(key="skipped_ids", task_ids=prior_task.task_id)
            if ids:
                skipped.extend(ids)
        return skipped
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""R2 Process Manifest operator."""
import configparser
import enum
import json
import logging
import re
import sys
import time
import uuid
from collections import Counter
from typing import Tuple
from urllib.error import HTTPError
import requests
import tenacity
from airflow.models import BaseOperator, Variable
from osdu_api.libs.auth.authorization import authorize
from osdu_api.libs.refresh_token import AirflowTokenRefresher
# Ingestion defaults, loaded once at import time from the dataload config
# file pointed to by the Airflow Variable below.
config = configparser.RawConfigParser()
config.read(Variable.get("core__config__dataload_config_path"))
DEFAULT_TENANT = config.get("DEFAULTS", "tenant")
DEFAULT_SOURCE = config.get("DEFAULTS", "authority")
DEFAULT_VERSION = config.get("DEFAULTS", "kind_version")
# Retry count and fixed wait (seconds) used by the tenacity-decorated helpers.
RETRIES = 3
TIMEOUT = 1
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Some constants, used by script
# HTTP status codes the request helpers compare responses against.
# NOTE(review): BAD_TOKEN codes presumably trigger a token refresh — confirm
# against the retry helpers below.
SEARCH_OK_RESPONSE_CODES = [200]
DATA_LOAD_OK_RESPONSE_CODES = [201]
NOT_FOUND_RESPONSE_CODES = [404]
BAD_TOKEN_RESPONSE_CODES = [400, 401, 403, 500]
class FileType(enum.Enum):
    """Kinds of input payload layouts the ingestion flow can process."""

    MANIFEST = enum.auto()
    WORKPRODUCT = enum.auto()
def dataload(**kwargs):
    """Extract the run configuration and its payload from Airflow kwargs.

    :return: tuple of (loaded configuration dict, payload dict)
    """
    run_conf = kwargs["dag_run"].conf
    payload = run_conf["Payload"]
    loaded = {
        "acl": payload["acl"],
        "legal_tag": payload["legal"],
        # The whole run conf doubles as the data object being ingested.
        "data_object": run_conf,
    }
    return loaded, payload
def create_headers(conf_payload):
    """Build the common HTTP headers for OSDU service calls.

    :param conf_payload: config payload
    :return: headers dict
    """
    return {
        'Content-type': 'application/json',
        'data-partition-id': conf_payload["data-partition-id"],
        'AppKey': conf_payload["AppKey"],
    }
def generate_id(type_id):
    """Generate a resource ID: the type prefix plus the digits of a random UUID.

    :param type_id: resource type ID
    :return: resource ID, terminated by a colon
    """
    prefix = type_id.replace("type:", "")
    numeric_part = re.sub(r"\D", "", str(uuid.uuid4()))
    return f"{prefix}{numeric_part}:"
def determine_data_type(raw_resource_type_id):
    """Determine the short resource type ID.

    :param raw_resource_type_id: raw resource type ID from manifest file
    :return: short resource type ID, or None when the input is None
    """
    if raw_resource_type_id is None:
        return None
    # Last path segment, with the trailing version colon(s) stripped.
    return raw_resource_type_id.split("/")[-1].replace(":", "")
# TODO: add comments to functions that implement actions in this function
def process_file_items(loaded_conf, conf_payload) -> Tuple[list, list]:
    """Process files items.

    Assigns a fresh ResourceID to every file entry (mutating the input) and
    builds the corresponding storage request records.

    :param loaded_conf: loaded configuration
    :param conf_payload: configuration payload
    :return: list of file records and list of their ids
    """
    acl = loaded_conf.get("acl")
    legal_tag = loaded_conf.get("legal_tag")
    file_ids = []
    file_list = []
    for file_item in loaded_conf.get("data_object")["Files"]:
        resource_id = generate_id(file_item["ResourceTypeID"])
        file_item["ResourceID"] = resource_id
        file_ids.append(resource_id)
        record = populate_request_body(file_item, acl, legal_tag, "file", conf_payload)
        file_list.append((record, "File"))
    return file_list, file_ids
def process_wpc_items(loaded_conf, product_type, file_ids, conf_payload):
    """Process WorkProductComponents items.

    Assigns a fresh ResourceID to every component (mutating the input), links
    the given file ids to it, and builds the storage request records.

    :param loaded_conf: loaded configuration
    :param product_type: product type
    :param file_ids: list of file ids
    :param conf_payload: configuration payload
    :return: list of workproductcomponents records and list of their ids
    """
    acl = loaded_conf.get("acl")
    legal_tag = loaded_conf.get("legal_tag")
    wpc_type = product_type + "_wpc"
    wpc_ids = []
    wpc_list = []
    for wpc in loaded_conf.get("data_object")["WorkProductComponents"]:
        resource_id = generate_id(wpc["ResourceTypeID"])
        wpc["ResourceID"] = resource_id
        wpc_ids.append(resource_id)
        # Attach the file records produced earlier to this component.
        wpc["Data"]["GroupTypeProperties"]["Files"] = file_ids
        record = populate_request_body(wpc, acl, legal_tag, wpc_type, conf_payload)
        wpc_list.append((record, wpc_type))
    return wpc_list, wpc_ids
def process_wp_item(loaded_conf, product_type, wpc_ids, conf_payload) -> list:
    """Process WorkProduct item.

    Assigns a fresh ResourceID to the work product (mutating the input), links
    the given component ids to it, and builds its storage request record.

    :param loaded_conf: loaded configuration
    :param product_type: product type
    :param wpc_ids: work product component ids
    :param conf_payload: configuration payload
    :return: single-element list with the work product record
    """
    acl = loaded_conf.get("acl")
    legal_tag = loaded_conf.get("legal_tag")
    wp_type = product_type + "_wp"
    work_product = loaded_conf.get("data_object")["WorkProduct"]
    work_product["ResourceID"] = generate_id(work_product["ResourceTypeID"])
    # Attach the component records produced earlier to this work product.
    work_product["Data"]["GroupTypeProperties"]["Components"] = wpc_ids
    record = populate_request_body(work_product, acl, legal_tag, wp_type, conf_payload)
    return [(record, wp_type)]
def validate_file_type(file_type, data_object):
    """Abort the run when no file type could be determined.

    :param file_type: resolved file type (falsy when undetermined)
    :param data_object: file record, included in the error message
    """
    if file_type:
        return
    logger.error(f"Error with file {data_object}. Type could not be specified.")
    sys.exit(2)
def validate_file(loaded_conf) -> Tuple[FileType, str]:
    """Validate file.

    :param loaded_conf: loaded configuration
    :return: file type and product type
    """
    data_object = loaded_conf.get("data_object")
    if not data_object:
        logger.error(f"Error with file {data_object}. It is empty.")
        sys.exit(2)

    if "Manifest" in data_object and "ResourceTypeID" in data_object.get("Manifest"):
        product_type = determine_data_type(data_object["Manifest"].get("ResourceTypeID"))
        validate_file_type(product_type, data_object)
        return (FileType.MANIFEST, product_type)

    if "WorkProduct" in data_object and "ResourceTypeID" in data_object.get("WorkProduct"):
        product_type = determine_data_type(data_object["WorkProduct"].get("ResourceTypeID"))
        validate_file_type(product_type, data_object)
        components = data_object.get("WorkProductComponents")
        # A generic "workproduct" is refined to the first component's type.
        if product_type.lower() == "workproduct" and components and len(components) >= 1:
            product_type = determine_data_type(components[0].get("ResourceTypeID"))
            validate_file_type(product_type, data_object)
        return (FileType.WORKPRODUCT, product_type)

    logger.error(
        f"Error with file {data_object}. It doesn't have either Manifest or WorkProduct or ResourceTypeID.")
    sys.exit(2)
def create_kind(data_kind, conf_payload):
    """Compose the record kind string.

    :param data_kind: data kind
    :param conf_payload: configuration payload
    :return: kind string "<partition>:<source>:<kind-initial>:<version>"
    """
    tenant = conf_payload.get("data-partition-id", DEFAULT_TENANT)
    authority = conf_payload.get("authority", DEFAULT_SOURCE)
    kind_version = conf_payload.get("kind_version", DEFAULT_VERSION)
    # Initial kind segment comes from the dataload config file.
    kind_initial = config.get("KINDS_INITIAL", f"{data_kind.lower()}_kind")
    return f"{tenant}:{authority}:{kind_initial}:{kind_version}"
def populate_request_body(data, acl, legal_tag, data_type, conf_payload):
    """Populate request body according API specification

    :param data: item data from manifest files
    :param acl: acl block with "viewers" and "owners"
    :param legal_tag: legal block with "legaltags"
    :param data_type: resource type ID
    :param conf_payload: configuration payload used to compose the kind
    :return: populated request
    :rtype: dict
    """
    return {
        "kind": create_kind(data_type, conf_payload),
        "legal": {
            "legaltags": legal_tag["legaltags"],
            "otherRelevantDataCountries": ["US"],
            "status": "compliant",
        },
        "acl": {
            "viewers": acl["viewers"],
            "owners": acl["owners"],
        },
        "data": data,
    }
def separate_type_data(request_data):
    """Separate the list of tuples into Data Type Counter and data list

    :param request_data: tuple of data and types
    :type request_data: tuple(list, str)
    :return: counter with data types and data list
    :rtype: tuple(counter, list)
    """
    types = Counter(elem[1] for elem in request_data)
    data = [elem[0] for elem in request_data]
    logger.info(f"The count of records to be ingested: {str(dict(types))}")
    return types, data
def create_manifest_request_data(loaded_conf: dict, product_type: str):
    """Build the storage request list for a Manifest payload.

    :param loaded_conf: loaded configuration
    :param product_type: short product type
    :return: single-element list of (record, type) tuples
    """
    record = populate_request_body(
        loaded_conf.get("data_object")["Manifest"],
        loaded_conf.get("acl"),
        loaded_conf.get("legal_tag"),
        product_type,
    )
    return [(record, product_type)]
def create_workproduct_request_data(loaded_conf: dict, product_type: str, wp, wpc_list, file_list):
    """Build the combined storage request list for a WorkProduct payload.

    :param loaded_conf: loaded configuration (unchanged; kept for signature compatibility)
    :param product_type: short product type (unchanged; kept for signature compatibility)
    :param wp: work product record list
    :param wpc_list: work product component record list
    :param file_list: file record list
    :return: flat list of record bodies; record-type counts are logged by
        separate_type_data as a side effect
    """
    # FIX: the type counter was bound to an unused local; discard it explicitly.
    _, data_objects_list = separate_type_data(file_list + wpc_list + wp)
    return data_objects_list
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),