Commit f14875ad authored by Yan Sushchynski (EPAM)

Add operator to process XML file

parent 7ccf4654
Pipeline #5890 failed with stages in 38 seconds
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow import DAG
from datetime import timedelta
from airflow.operators import SearchRecordIdOperator, ProcessEnergisticsOperator
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=50),
'trigger_rule': 'none_failed',
}
dag = DAG(
'Energistics_xml_ingest',
default_args=default_args,
description='Energistics XML ingestion DAG',
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60)
)
process_energistics_task = ProcessEnergisticsOperator(
name = "process_energistics",
task_id='process_energistics_task',
provide_context=True,
dag=dag
)
search_record_id_task = SearchRecordIdOperator(
task_id='search_record_id_task',
provide_context=True,
dag=dag
)
process_energistics_task >> search_record_id_task
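# A minimal example of triggering this DAG, assuming the conf shape the
# operators read (a "Payload" dict carrying the auth headers plus
# "data.FileID" and "data.Context.SignedUrl"); the real contract comes from
# the calling workflow service, so all values below are placeholders:
#
#   airflow trigger_dag Energistics_xml_ingest --conf '{
#       "Payload": {
#           "authorization": "Bearer <token>",
#           "data-partition-id": "opendes",
#           "AppKey": "<app-key>",
#           "data": {"FileID": "<file-id>",
#                    "Context": {"SignedUrl": "<signed-url>"}}
#       }
#   }'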
from airflow.plugins_manager import AirflowPlugin
from .operators import UpdateStatusOperator, ProcessManifestOperator, SearchRecordIdOperator, ProcessEnergisticsOperator
# Defining the plugin class
class OSDUPlugin(AirflowPlugin):
name = "osdu_plugin"
operators = [
UpdateStatusOperator,
    ProcessManifestOperator,
SearchRecordIdOperator,
ProcessEnergisticsOperator,
]
hooks = []
# A list of class(es) derived from BaseExecutor
@@ -15,8 +15,12 @@
from .update_status_op import UpdateStatusOperator
from .process_manifest_op import ProcessManifestOperator
from .search_record_id_op import SearchRecordIdOperator
from .processEnergistics import ProcessEnergisticsOperator
__all__ = [
'UpdateStatusOperator',
    'ProcessManifestOperator',
'SearchRecordIdOperator',
'ProcessEnergisticsOperator'
]
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
# Copyright 2020 Energistics
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import json
import logging
import configparser
import sys
import time
import enum
from airflow.models import BaseOperator
from urllib.error import HTTPError
from airflow.models import Variable
from airflow.utils.decorators import apply_defaults
import requests
from parsers import parse_witsml_file, FileInfo
# Default ACL and legal tags come from Airflow Variables holding dict literals
ACL_DICT = eval(Variable.get("acl"))
LEGAL_DICT = eval(Variable.get("legal"))
config = configparser.RawConfigParser()
config.read(Variable.get("dataload_config_path"))
DEFAULT_TENANT = config.get("DEFAULTS", "tenant", fallback="opendes")
DEFAULT_SOURCE = config.get("DEFAULTS", "authority", fallback="osdu")
DEFAULT_VERSION = config.get("DEFAULTS", "kind_version", fallback="1.0.0")
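# These defaults combine into an OSDU kind string of the form
# "<tenant>:<authority>:<entity-type>:<kind_version>", e.g.
# "opendes:osdu:wellbore:1.0.0" (the entity type presumably comes from the parser).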
RETRIES = 3
TIMEOUT = 1
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
timestamp = datetime.now().isoformat().replace(":","-")
# Set up file logger
handler = logging.FileHandler(f"execution_{timestamp}.log")
handler.setFormatter(logging.Formatter("%(message)s"))
file_logger = logging.getLogger("Execution")
file_logger.setLevel(logging.INFO)
file_logger.addHandler(handler)
# Some constants, used by script
SEARCH_OK_RESPONSE_CODES = [200]
DATA_LOAD_OK_RESPONSE_CODES = [201]
DELIVERY_OK_RESPONSE_CODES = [200]
DOWNLOAD_OK_RESPONSE_CODES = [200]
NOT_FOUND_RESPONSE_CODES = [404]
BAD_TOKEN_RESPONSE_CODES = [400, 401, 403, 500]
class FileType(enum.Enum):
EnergisticsXML = enum.auto()
EnergisticsEPC = enum.auto()
def dataload(**kwargs):
    """Extract the run configuration and the workflow Payload from dag_run.conf."""
    data_conf = kwargs['dag_run'].conf
loaded_conf = {
"acl": ACL_DICT,
"legal_tag": LEGAL_DICT,
"data_object": data_conf
}
data = kwargs["dag_run"].conf["Payload"]
return loaded_conf, data
# Process errors and sleep between retries
def on_error(response, retry: int, retries: int):
reason = response.text[:250]
logger.error(f"Request error.")
logger.error(f"Response status: {response.status_code}. "
f"Response content: {reason}.")
if retry + 1 < retries:
if response.status_code in BAD_TOKEN_RESPONSE_CODES:
logger.error("Invalid or expired token.")
else:
time_to_sleep = TIMEOUT
logger.info(f"Retrying in {time_to_sleep} seconds...")
time.sleep(time_to_sleep)
return reason
def get_file_content(signed_url: str):
"""
Get file content from file storage
"""
    # retry loop for the download request
    retries = RETRIES
    for retry in range(retries):
        try:
            # download the file content from the signed URL
            response = requests.get(str(signed_url))
if response.status_code in DOWNLOAD_OK_RESPONSE_CODES:
break
reason = on_error(response, retry, retries)
except (requests.RequestException, HTTPError) as exc:
logger.error(f"Unexpected request error. Reason: {exc}")
sys.exit(2)
    # End script if we ran out of retries and the file could not be downloaded.
else:
logger.error(f"Request could not be completed.\n"
f"Reason: {reason}")
sys.exit(2)
return response.text
def get_file_signed_url(conf_payload):
"""
Get signed URL from file id
"""
    # It is not clear how to get the signed URL from the file ID unless the file ID is the SRN.
    # If it is, the commented-out code below should replace the next line.
return conf_payload["data"]["Context"].get("SignedUrl")
# auth = conf_payload["authorization"]
# partition_id = conf_payload["data-partition-id"]
# app_key = conf_payload["AppKey"]
# headers = {
# 'Content-type': 'application/json',
# 'data-partition-id': partition_id,
# 'Authorization': auth,
# 'AppKey': app_key
# }
# logger.info(f"Header {str(headers)}")
# # loop for implementing retries send process
# retries = RETRIES
# delivery_api_url = Variable.get('file_api_url')
# srn = f"srn:file/xml:{str(request_data.FileID)}:"
# params = {"srns": [ srn ]}
# for retry in range(retries):
# try:
# # send batch request for creating records
# response = requests.get(f"{str(delivery_api_url)}/GetFileSignedUrl",params=params)
# if response.status_code in DELIVERY_OK_RESPONSE_CODES:
# file_logger.info(",".join(map(str, response.json()["SignedURL"])))
# break
# reason = on_error(response, retry, retries)
# except (requests.RequestException, HTTPError) as exc:
# logger.error(f"Unexpected request error. Reason: {exc}")
# sys.exit(2)
# # End script if ran out of retries and data could not be uploaded.
# else:
# logger.error(f"Request could not be completed.\n"
# f"Reason: {reason}")
# sys.exit(2)
# return response.json()["processed"][srn]["signedUrl"]
def send_request(request_data, conf_payload):
"""
Send request to records storage API
"""
auth = conf_payload["authorization"]
partition_id = conf_payload["data-partition-id"]
app_key = conf_payload["AppKey"]
headers = {
'Content-type': 'application/json',
'data-partition-id': partition_id,
'Authorization': auth,
'AppKey': app_key
}
logger.info(f"Header {str(headers)}")
    # retry loop for the storage request
retries = RETRIES
for retry in range(retries):
try:
# send batch request for creating records
response = requests.put(Variable.get('storage_url'), json.dumps(request_data),
headers=headers)
if response.status_code in DATA_LOAD_OK_RESPONSE_CODES:
file_logger.info(",".join(map(str, response.json()["recordIds"])))
break
reason = on_error(response, retry, retries)
except (requests.RequestException, HTTPError) as exc:
logger.error(f"Unexpected request error. Reason: {exc}")
sys.exit(2)
# End script if ran out of retries and data could not be uploaded.
else:
logger.error(f"Request could not be completed.\n"
f"Reason: {reason}")
sys.exit(2)
return response.json()["recordIds"]
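# The body sent above is a list of storage records produced by
# parse_witsml_file. A representative record, assuming the standard OSDU
# Storage API schema (field names here are illustrative), looks like:
#
#   {
#       "kind": "opendes:osdu:wellbore:1.0.0",
#       "acl": {"viewers": ["..."], "owners": ["..."]},
#       "legal": {"legaltags": ["..."],
#                 "otherRelevantDataCountries": ["US"],
#                 "status": "compliant"},
#       "data": {...}
#   }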
def process_file(**kwargs):
loaded_conf, payload = dataload(**kwargs)
data = payload['data']
context = data['Context']
    # fall back to the Variables-based defaults when the Context does not carry acl/legal
    acl = context.get('acl') or loaded_conf.get("acl")
    legal_tags = context.get('legal_tag') or loaded_conf.get("legal_tag")
    # Only XML is handled for now; EPC detection is not implemented yet.
    file_type = FileType.EnergisticsXML
if file_type is FileType.EnergisticsXML:
file_id = data["FileID"]
signed_url = get_file_signed_url(payload)
file_content = get_file_content(signed_url)
        info = FileInfo(
            file_id=file_id,
            file_content=file_content,
            owners=acl.get("Owners"),
            viewers=acl.get("Viewers"),
            legal_tags=legal_tags.get("LegalTags"),
            other_relevant_data_countries=legal_tags.get("OtherRelevantDataCountries"),
            status=legal_tags.get("Status"),
            authority=DEFAULT_SOURCE,
            kind_version=DEFAULT_VERSION,
            namespace=DEFAULT_TENANT
        )
files, wp, wpc_list = parse_witsml_file(info)
        file_ids = []
        for f in files:
            # collect the id of each parsed file record
            file_ids.append(f["ID"])
kwargs["ti"].xcom_push(key="file_ids", value=file_ids)
all_dictionaries = files
all_dictionaries += wpc_list
all_dictionaries += [wp]
        record_ids = send_request(all_dictionaries, payload)
kwargs["ti"].xcom_push(key="record_ids", value=record_ids)
elif file_type is FileType.EnergisticsEPC:
logger.error(f"Request could not be completed.\n"
f"Reason: EPC file type not yet supported")
sys.exit(2)
else:
logger.error(f"Request could not be completed.\n"
f"Reason: Unknown file type")
sys.exit(2)
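# process_file() pushes two XCom keys: "file_ids" (collected from the parsed
# file records) and "record_ids" (returned by the Storage API); the
# downstream SearchRecordIdOperator pulls "record_ids" to verify that the
# new records are searchable.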
class ProcessEnergisticsOperator(BaseOperator):
ui_color = '#dad5ff'
ui_fgcolor = '#000000'
@apply_defaults
def __init__(
self,
name: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
def execute(self, context):
process_file(**context)
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import sys
from functools import partial
from typing import Tuple
import tenacity
from airflow.models import BaseOperator, Variable
from airflow.utils.decorators import apply_defaults
from hooks import search_http_hook, workflow_hook
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
class SearchRecordIdOperator(BaseOperator):
"""
Operator to search files in SearchService by record ids.
    Expects a "record_ids" key in XCom.
"""
ui_color = '#10ECAA'
ui_fgcolor = '#000000'
FINISHED_STATUS = "finished"
RUNNING_STATUS = "running"
FAILED_STATUS = "failed"
@apply_defaults
    def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.workflow_hook = workflow_hook
self.search_hook = search_http_hook
@staticmethod
def _file_searched(resp, expected_total_count) -> bool:
"""Check if search service returns expected totalCount.
The method is used as a callback
"""
data = resp.json()
return data.get("totalCount") == expected_total_count
def get_headers(self, **kwargs) -> dict:
data_conf = kwargs['dag_run'].conf
# for /submitWithManifest authorization and partition-id are inside Payload field
if "Payload" in data_conf:
auth = data_conf["Payload"]["authorization"]
partition_id = data_conf["Payload"]["data-partition-id"]
else:
auth = data_conf["authorization"]
partition_id = data_conf["data-partition-id"]
headers = {
'Content-type': 'application/json',
'data-partition-id': partition_id,
'Authorization': auth,
}
return headers
@staticmethod
def _create_search_query(record_ids) -> Tuple[str, int]:
expected_total_count = len(record_ids)
record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids)
logger.info(f"Search query {record_ids}")
query = f"id:({record_ids})"
return query, expected_total_count
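    # For example, given record_ids ["opendes:doc:1", "opendes:doc:2"] this
    # builds the query 'id:("opendes:doc:1" OR "opendes:doc:2")' and
    # expects totalCount == 2 from the Search service.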
def search_files(self, **kwargs):
        record_ids = kwargs["ti"].xcom_pull(key="record_ids")
if record_ids:
query, expected_total_count = self._create_search_query(record_ids)
else:
logger.error("There are no record ids")
sys.exit(2)
headers = self.get_headers(**kwargs)
request_body = {
"kind": "*:*:*:*",
"query": query
}
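        # Retry until the search reports the expected totalCount:
        # wait_exponential(multiplier=5) sleeps roughly 5s, 10s, 20s, ...
        # between the stop_after_attempt(5) attempts before giving up.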
retry_opts = {
"wait": tenacity.wait_exponential(multiplier=5),
"stop": tenacity.stop_after_attempt(5),
"retry": tenacity.retry_if_not_result(
partial(self._file_searched, expected_total_count=expected_total_count)
)
}
self.search_hook.run_with_advanced_retry(
endpoint=Variable.get("search_query_ep"),
headers=headers,
data=json.dumps(request_body),
_retry_args=retry_opts
)
def execute(self, context):
"""Execute update workflow status.
If status assumed to be FINISHED then we check whether proceed files are searchable or not.
If they are then update status FINISHED else FAILED
"""
self.search_files(**context)
alembic==0.8.10
apache-airflow==1.10.2
apispec==1.3.3
argcomplete==1.11.1
astroid==2.3.3
attrs==19.3.0
autopep8==1.5.4
Babel==2.8.0
bandit==1.6.2
bleach==2.1.4
cached-property==1.5.1
cachetools==4.1.1
cattrs==0.9.2
certifi==2020.6.20
chardet==3.0.4
click==7.1.1
colorama==0.4.3
colorlog==4.0.2
configparser==3.5.3
croniter==0.3.31
dataclasses==0.7
defusedxml==0.6.0
dill==0.2.9
dnspython==1.16.0
docutils==0.16
email-validator==1.0.5
enum34==1.1.10
Flask==0.12.5
Flask-Admin==1.5.2
Flask-AppBuilder==1.12.1
Flask-Babel==0.12.2
Flask-Caching==1.3.3
Flask-JWT-Extended==3.24.1
Flask-Login==0.4.1
Flask-OpenID==1.2.5
Flask-SQLAlchemy==2.4.1
flask-swagger==0.2.13
Flask-WTF==0.14.3
funcsigs==1.0.0
future==0.16.0
gitdb==4.0.5
GitPython==3.1.2
google-api-core==1.22.0
google-api-python-client==1.10.0
google-auth==1.19.2
google-auth-httplib2==0.0.4
googleapis-common-protos==1.52.0
graphviz==0.14
gunicorn==19.10.0
h5py==2.10.0
html5lib==1.0.1
httplib2==0.18.1
idna==2.10
importlib-metadata==1.6.0
importlib-resources==3.0.0
iso8601==0.1.12
isort==4.3.21
itsdangerous==1.1.0
jaraco.classes==3.1.0
jaraco.collections==3.0.0
jaraco.functools==3.0.1
jaraco.text==3.2.0
Jinja2==2.10
json-merge-patch==0.2
jsonschema==3.2.0
lazy-object-proxy==1.4.3
lockfile==0.12.2
lxml==4.5.1
Mako==1.1.2
Markdown==2.6.11
MarkupSafe==1.1.1
marshmallow==2.21.0
marshmallow-enum==1.5.1
marshmallow-sqlalchemy==0.23.0
mccabe==0.6.1
monotonic==1.5
more-itertools==8.3.0
numpy==1.18.3
openpack==2.1.1
ordereddict==1.1
osdu-api==0.0.1
osdu-energistics-parsers==0.0.1
packaging==20.4
pandas==0.25.3
pbr==5.4.5
pendulum==1.4.4
pkg-resources==0.0.0
pluggy==0.13.1
postgres==3.0.0
prison==0.1.3
protobuf==3.12.2
psutil==5.7.0
psycopg2-binary==2.8.5
psycopg2-pool==1.1
py==1.8.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycodestyle==2.6.0
Pygments==2.6.1
PyJWT==1.7.1
pylint==2.4.4
pylint-quotes==0.2.1
pyparsing==2.4.7
pyrsistent==0.16.0
pytest==5.4.2