Commit f2a67962 authored by Siarhei Khaletski (EPAM)

Merge branch 'feature/GONRG-952_r3_data_ingestion' into 'integration-master'

R3 Reference and Master data ingestion

See merge request go3-nrg/platform/data-flow/ingestion/ingestion-dags!14
parents f0cd1baf 1551cb6a
1 merge request: !6 R3 Data Ingestion
Showing 931 additions and 36 deletions
@@ -31,3 +31,6 @@
**/*.egg
**/MANIFEST
# will remove it later
**/schema_registration/
[MASTER]
# Add operators and dags to sys-path
init-hook='import sys; sys.path.append("src/plugins"); sys.path.append("src/dags")'
# Specify a configuration file.
#rcfile=.pylintrc
......
@@ -14,16 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG for opaque ingestion"""
from datetime import timedelta
import airflow.utils.dates
from airflow import DAG
from airflow.operators import UpdateStatusOperator
from airflow.operators.python_operator import PythonOperator
from operators.update_status_op import UpdateStatusOperator
from libs.create_records import create_records #isort:skip
from libs.create_records import create_records # isort:skip
"""
A workflow creating a record
@@ -69,5 +69,4 @@ create_records_op = PythonOperator(
dag=dag
)
update_status_running_op >> create_records_op >> update_status_finished_op
@@ -33,3 +33,16 @@ class PipelineFailedError(Exception):
Raise when pipeline failed.
"""
pass
class EmptyManifestError(Exception):
"""
Raise when manifest field is empty.
"""
pass
class GetSchemaError(Exception):
"""
Raise when a schema can't be found.
"""
pass
@@ -15,6 +15,8 @@
import logging
import sys
import time
from functools import partial
from http import HTTPStatus
import requests
@@ -83,6 +85,18 @@ def _check_token():
set_access_token(SA_FILE_PATH, ACCESS_SCOPES)
def make_callable_request(obj, request_function, headers, *args, **kwargs):
"""
Create send_request_with_auth function.
"""
headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
if obj: # if wrapped function is an object's method
callable_request = lambda: request_function(obj, headers, *args, **kwargs)
else:
callable_request = lambda: request_function(headers, *args, **kwargs)
return callable_request
def _wrapper(*args, **kwargs):
"""
Generic decorator wrapper for checking token and refreshing it.
@@ -94,22 +108,22 @@ def _wrapper(*args, **kwargs):
if not isinstance(headers, dict):
logger.error("Got headers %s" % headers)
raise TypeError
headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
if obj: # if wrapped function is an object's method
send_request_with_auth = lambda: request_function(obj, headers, *args, **kwargs)
else:
send_request_with_auth = lambda: request_function(headers, *args, **kwargs)
send_request_with_auth = make_callable_request(obj, request_function, headers,
*args, **kwargs)
response = send_request_with_auth()
if not isinstance(response, requests.Response):
logger.error("Function %s must return values of type requests.Response. "
"Got %s instead" % (kwargs["rqst_func"], type(response)))
"Got %s instead" % (request_function, type(response)))
raise TypeError
if not response.ok:
if response.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
set_access_token(SA_FILE_PATH, ACCESS_SCOPES)
send_request_with_auth = make_callable_request(obj,
request_function,
headers,
*args, **kwargs)
response = send_request_with_auth()
else:
response.raise_for_status()
response.raise_for_status()
return response
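# Illustration (not part of this diff): make_callable_request builds a zero-argument
# closure that performs the request with a Bearer token injected into the headers,
# and _wrapper replays it with a refreshed token on 401/403. Hypothetical sketch
# (the URL is a placeholder):
def _example_get(headers, url):
    return requests.get(url, headers=headers)

# send = make_callable_request(None, _example_get, {"Content-type": "application/json"},
#                              "https://storage-host/api/storage/v2/records/<record-id>")
# response = send()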
......
@@ -13,22 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG for R2 ingestion"""
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators import ProcessManifestOperator, SearchRecordIdOperator, UpdateStatusOperator
from operators.process_manifest_r2_op import ProcessManifestOperatorR2
from operators.search_record_id_op import SearchRecordIdOperator
from operators.update_status_op import UpdateStatusOperator
default_args = {
"start_date": airflow.utils.dates.days_ago(0),
"retries": 0,
"retry_delay": timedelta(minutes=50),
"trigger_rule": "none_failed",
}
dag = DAG(
"Osdu_ingest",
"Osdu_ingest_r2",
default_args=default_args,
description="liveness monitoring dag",
schedule_interval=None,
@@ -48,13 +51,12 @@ update_status_finished_op = UpdateStatusOperator(
trigger_rule="all_done",
)
process_manifest_op = ProcessManifestOperator(
process_manifest_op = ProcessManifestOperatorR2(
task_id="proccess_manifest_task",
provide_context=True,
dag=dag
)
search_record_ids_op = SearchRecordIdOperator(
task_id="search_record_ids_task",
provide_context=True,
......
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
# Copyright 2020 Amazon
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,32 +13,54 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG for R3 ingestion"""
"""isort:skip_file"""
from airflow.plugins_manager import AirflowPlugin
from operators.search_record_id_op import SearchRecordIdOperator # pylint: disable=import-error
from .operators import ProcessManifestOperator, UpdateStatusOperator
# Defining the plugin class
class OSDUPlugin(AirflowPlugin):
name = "osdu_plugin"
operators = [
UpdateStatusOperator,
ProcessManifestOperator,
SearchRecordIdOperator
]
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of objects created from a class derived
# from flask_admin.BaseView
admin_views = []
# A list of Blueprint object created from flask.Blueprint
flask_blueprints = []
# A list of menu links (flask_admin.base.MenuLink)
menu_links = []
from datetime import timedelta
import airflow
from airflow import DAG
from operators.process_manifest_r3 import ProcessManifestOperatorR3
from operators.search_record_id_op import SearchRecordIdOperator
from operators.update_status_op import UpdateStatusOperator
default_args = {
"start_date": airflow.utils.dates.days_ago(0),
"retries": 0,
"retry_delay": timedelta(seconds=30),
"trigger_rule": "none_failed",
}
dag = DAG(
"Osdu_ingest",
default_args=default_args,
description="liveness monitoring dag",
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60)
)
update_status_running_op = UpdateStatusOperator(
task_id="update_status_running_task",
dag=dag
)
update_status_finished_op = UpdateStatusOperator(
task_id="update_status_finished_task",
dag=dag,
trigger_rule="all_done",
)
process_manifest_op = ProcessManifestOperatorR3(
task_id="proccess_manifest_task",
provide_context=True,
dag=dag
)
search_record_ids_op = SearchRecordIdOperator(
task_id="search_record_ids_task",
provide_context=True,
dag=dag,
retries=4
)
update_status_running_op >> process_manifest_op >> \
search_record_ids_op >> update_status_finished_op # pylint: disable=pointless-statement
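# Illustration (not part of this diff): with schedule_interval=None the DAG is meant to
# be triggered with a run configuration (see the trigger_dag test scripts further down).
# A hypothetical conf sketch; the field names follow Context.populate and
# ManifestProcessor in process_manifest_r3, the values are placeholders:
example_conf = {
    "Payload": {"AppKey": "test-app", "data-partition-id": "odes"},
    "manifest": [{
        "kind": "odes:osdu:Wellbore:0.3.0",
        "legal": {"legaltags": ["odes-demo-legaltag"],
                  "otherRelevantDataCountries": ["US"]},
        "acl": {"viewers": ["data.default.viewers@odes.example.com"],
                "owners": ["data.default.owners@odes.example.com"]},
        "FacilityName": "Well-1",
    }],
}
# triggered e.g. with: airflow trigger_dag -c "<json-encoded example_conf>" Osdu_ingest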
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dummy DAG"""
from datetime import timedelta
import airflow
@@ -31,7 +33,6 @@ dag = DAG(
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60))
t1 = BashOperator(
task_id="echo",
bash_command="echo test",
......
@@ -12,13 +12,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .process_manifest_op import ProcessManifestOperator
from .search_record_id_op import SearchRecordIdOperator
from .update_status_op import UpdateStatusOperator
__all__ = [
'UpdateStatusOperator',
'ProcessManifestOperator',
'SearchRecordIdOperator',
]
@@ -338,7 +338,7 @@ def process_manifest(**kwargs):
kwargs["ti"].xcom_push(key="record_ids", value=record_ids)
class ProcessManifestOperator(BaseOperator):
class ProcessManifestOperatorR2(BaseOperator):
ui_color = '#dad5ff'
ui_fgcolor = '#000000'
......
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import dataclasses
import json
import logging
import sys
import uuid
from datetime import datetime
import jsonschema
import requests
import tenacity
from airflow.models import BaseOperator, Variable
from libs.exceptions import EmptyManifestError, GetSchemaError
from libs.refresh_token import refresh_token
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
timestamp = datetime.now().isoformat()
# Set up file logger
handler = logging.FileHandler(f"execution_{timestamp}.log")
handler.setFormatter(logging.Formatter("%(message)s"))
file_logger = logging.getLogger("Execution")
file_logger.setLevel(logging.INFO)
file_logger.addHandler(handler)
RETRIES = 3
TIMEOUT = 1
@dataclasses.dataclass
class Context(object):
data_partition_id: str
app_key: str
@classmethod
def populate(cls, ctx: dict) -> 'Context':
ctx_payload = ctx.pop('Payload')
app_key = ctx_payload['AppKey']
data_partition_id = ctx_payload['data-partition-id']
ctx_obj = cls(app_key=app_key, data_partition_id=data_partition_id)
return ctx_obj
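# Illustration (not part of this diff): a minimal, hypothetical call; Context.populate
# reads the "Payload" block of the dag_run conf (values are placeholders).
_example_ctx = Context.populate(
    {"Payload": {"AppKey": "test-app", "data-partition-id": "odes"}, "manifest": []})
# _example_ctx.app_key == "test-app"; _example_ctx.data_partition_id == "odes"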
class OSDURefResolver(jsonschema.RefResolver):
def __init__(self, schema_service, *args, **kwargs):
super(OSDURefResolver, self).__init__(*args, **kwargs)
self.schema_service = schema_service
def resolve_fragment(self, document, fragment):
"""
Extend the base resolve_fragment method. If a nested schema has a 'definitions' field
and there is a schema under this 'definitions', jsonschema attempts to use that
double-nested schema's id field as the URI for fetching the schema later. So it makes
sense to replace this id with a correct one.
"""
document = super().resolve_fragment(document, fragment)
fragment_parts = fragment.split("/") # /definitions/<OsduID> -> [..., <OsduID>]
if len(fragment_parts) > 1:
osdu_id = fragment_parts[-1]
url = f"{self.schema_service}/{osdu_id}"
document["$id"] = url
return document
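# Illustration (not part of this diff): resolving a "/definitions/<OsduID>" fragment
# rewrites the nested schema's "$id" to point at the schema service. Hypothetical
# sketch (the service URI is a placeholder):
_example_resolver = OSDURefResolver(
    schema_service="https://schema-host/api/schema-service/v1/schema",
    base_uri="", referrer={})
_example_doc = {"definitions": {"odes:osdu:Well:0.3.0": {"type": "object"}}}
_resolved = _example_resolver.resolve_fragment(_example_doc,
                                               "/definitions/odes:osdu:Well:0.3.0")
# _resolved["$id"] == "https://schema-host/api/schema-service/v1/schema/odes:osdu:Well:0.3.0"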
class SchemaValidator(object):
"""Class to validate schema of Manifests."""
def __init__(self, schema_service, dagrun_conf, context):
self.schema_service = schema_service
self.data_object = copy.deepcopy(dagrun_conf)
self.context = context
self.resolver_handlers = {"osdu": self.get_schema_request,
"https": self.get_schema_request,
self.context.data_partition_id: self.get_schema_request}
self.create_request_headers()
def create_request_headers(self):
self.request_headers = {
'Content-type': 'application/json',
'data-partition-id': self.context.data_partition_id,
'AppKey': self.context.app_key,
}
@refresh_token
def _get_schema_request(self, headers, uri):
response = requests.get(uri, headers=headers, timeout=60)
return response
def get_schema_request(self, uri):
if uri.startswith("osdu") or uri.startswith(self.context.data_partition_id):
uri = f"{self.schema_service}/{uri}"
response = self._get_schema_request(self.request_headers, uri).json()
response["$id"] = uri
return response
def get_schema(self, kind):
manifest_schema_uri = f"{self.schema_service}/{kind}"
response = self.get_schema_request(manifest_schema_uri)
return response
def _validate_schema(self, schema, manifest):
resolver = OSDURefResolver(schema_service=self.schema_service,
base_uri=schema.get("$id", ""), referrer=schema,
handlers=self.resolver_handlers, cache_remote=True)
validator = jsonschema.Draft7Validator(schema=schema, resolver=resolver)
validator.validate(manifest)
def validate_manifest(self):
for m in self.data_object["manifest"]:
manifest_schema = self.get_schema(m["kind"])
self._validate_schema(manifest_schema, m)
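# Illustration (not part of this diff): a hypothetical use of the validator with the
# example_conf sketched in the R3 DAG above. SCHEMA_SERVICE_URL is a placeholder and
# get_schema() performs real HTTP calls, so the calls are only sketched here:
#
#   payload_context = Context.populate(copy.deepcopy(example_conf))
#   validator = SchemaValidator(SCHEMA_SERVICE_URL, example_conf, payload_context)
#   validator.validate_manifest()  # raises jsonschema.exceptions.ValidationError on mismatch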
class ManifestProcessor(object):
"""Class to process WP, Master and Reference data"""
RECORD_TEMPLATE: dict = {
"legal": {},
"acl": {},
"kind": "",
"id": "",
"data": {
}
}
def __init__(self, storage_url, dagrun_conf, context):
self.storage_url = storage_url
self.data_object = copy.deepcopy(dagrun_conf)
self.context = context
def generate_id(self):
return f"{self.context.data_partition_id}:doc:{str(uuid.uuid4())}"
@property
def request_headers(self):
headers = {
'Content-type': 'application/json',
'data-partition-id': self.context.data_partition_id,
'AppKey': self.context.app_key
}
return headers
def populate_manifest_storage_record(self, manifest):
"""
Create a record from Master-manifest to store it in Storage service
"""
record = copy.deepcopy(self.RECORD_TEMPLATE)
record["id"] = self.generate_id()
record["kind"] = manifest.pop("kind")
record["legal"] = manifest.pop("legal")
record["acl"] = manifest.pop("acl")
record["data"] = manifest
return record
@tenacity.retry(wait=tenacity.wait_fixed(TIMEOUT), stop=tenacity.stop_after_attempt(RETRIES))
@refresh_token
def save_record(self, headers, request_data):
"""
Send request to record storage API
"""
response = requests.put(self.storage_url, json.dumps(request_data), headers=headers)
if response.ok:
file_logger.info(",".join(map(str, response.json()["recordIds"])))
else:
reason = response.text[:250]
logger.error(f"Request error.")
logger.error(f"Response status: {response.status_code}. "
f"Response content: {reason}.")
return response
def create_manifest_records(self):
manifest_records = []
for manifest in self.data_object["manifest"]:
record = self.populate_manifest_storage_record(manifest)
manifest_records.append(record)
return manifest_records
def process_manifest(self):
if "manifest" in self.data_object:
manifest_records = self.create_manifest_records()
else:
raise EmptyManifestError
response = self.save_record(self.request_headers,
request_data=manifest_records)
record_ids = response.json()["recordIds"]
return record_ids
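# Illustration (not part of this diff): populate_manifest_storage_record lifts kind,
# legal and acl into the storage record envelope and keeps the remaining manifest
# fields under record["data"]. Hypothetical sketch (URL and values are placeholders):
_example_processor = ManifestProcessor(
    "https://storage-host/api/storage/v2/records",
    {"manifest": []},
    Context(data_partition_id="odes", app_key="test-app"))
_example_record = _example_processor.populate_manifest_storage_record(
    {"kind": "odes:osdu:Well:0.3.0",
     "legal": {"legaltags": ["odes-demo-legaltag"]},
     "acl": {"viewers": [], "owners": []},
     "FacilityName": "Well-1"})
# _example_record["id"] == "odes:doc:<uuid4>"; _example_record["data"] == {"FacilityName": "Well-1"}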
class ProcessManifestOperatorR3(BaseOperator):
ui_color = '#dad5ff'
ui_fgcolor = '#000000'
def pre_execute(self, context):
self.schema_service_url = Variable.get('schema_service_url')
self.storage_url = Variable.get('storage_url')
def execute(self, context):
payload_context = Context.populate(context["dag_run"].conf)
validator = SchemaValidator(
self.schema_service_url,
context["dag_run"].conf,
payload_context
)
manifest_processor = ManifestProcessor(
self.storage_url,
context["dag_run"].conf,
payload_context
)
validator.validate_manifest()
record_ids = manifest_processor.process_manifest()
context["ti"].xcom_push(key="record_ids", value=record_ids)
@@ -95,10 +95,10 @@ class SearchRecordIdOperator(BaseOperator):
"""
Check if search service returns expected totalCount of records.
"""
logger.info(resp.text)
data = resp.json()
return data.get("totalCount") == self.expected_total_count
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=5), stop=tenacity.stop_after_attempt(5))
@refresh_token
def search_files(self, headers, **kwargs):
if self.request_body:
......
@@ -23,7 +23,7 @@ from functools import partial
import tenacity
from airflow.models import BaseOperator, Variable
from airflow.utils.decorators import apply_defaults
from hooks import search_http_hook, workflow_hook
from hooks.http_hooks import search_http_hook, workflow_hook
from libs.exceptions import PipelineFailedError
from libs.refresh_token import refresh_token
......
@@ -14,5 +14,5 @@
# limitations under the License.
json=$(cat `dirname $0`/mock-data/osdu-ingest-invalid.json | tail -n +15)
airflow trigger_dag -c "$json" Osdu_ingest
json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-invalid.json | tail -n +15)
airflow trigger_dag -c "$json" Osdu_ingest_r2
@@ -14,5 +14,5 @@
# limitations under the License.
json=$(cat `dirname $0`/mock-data/osdu-ingest-valid.json | tail -n +15)
airflow trigger_dag -c "$json" Osdu_ingest
json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-valid.json | tail -n +15)
airflow trigger_dag -c "$json" Osdu_ingest_r2
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .process_manifest_op import *
from .process_manifest_r2_op import *
This diff is collapsed.