diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000000000000000000000000000000000000..0f2d239c561d27a2b6d3f0db383c3c86010b6f7a --- /dev/null +++ b/.editorconfig @@ -0,0 +1,14 @@ +# https://editorconfig.org/ + +root = true + +[*] +indent_style = space +indent_size = 4 +insert_final_newline = true +trim_trailing_whitespace = true +end_of_line = lf +charset = utf-8 + +[*.py] +max_line_length = 100 diff --git a/.gitignore b/.gitignore index 47f184d59c0bc74955d3f92e3f6becc7c3b8f895..07f42eea1e2ebbb204b07951462c6e06024219f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,33 @@ -build -dist -osdu_api.egg-info -__pycache__ +**/.idea +**/.DS_Store + +# Byte-compiled / optimized / DLL files +**/__pycache__/ +**/*.py[cod] +**/*$py.class + +*.log + +# C extensions +**/*.so + +# Distribution / packaging +**/.Python +**/build/ +**/develop-eggs/ +**/dist/ +**/downloads/ +**/eggs/ +**/.eggs/ +**/lib/ +**/lib64/ +**/parts/ +**/sdist/ +**/var/ +**/wheels/ +**/share/python-wheels/ +**/*.egg-info/ +**/.installed.cfg +**/*.egg +**/MANIFEST -**/venv/** -**/.idea/** \ No newline at end of file diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 5071d516be456af0dd4556c1b9ce04bed79279d6..421acb135e0adb326f7ca4a7ee69d3bd2a3f950b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -16,21 +16,33 @@ image: google/cloud-sdk:alpine stages: - - test - - verify + - linters + - unit_tests + - test_dags - deploy -unit_tests: - stage: test - image: johnybear/osdu-airflow:python36 +pylint: + image: johnybear/osdu-airflow:python36-1 + stage: linters + allow_failure: true script: - - chmod +x tests/unit_tests.sh - - tests/./unit_tests.sh || EXIT_CODE=$? + - python -m pip install setuptools pylint pylint_quotes pylint-exit + - tests/./set_airflow_env.sh + - pylint --rcfile=.pylintrc src/*/*.py || EXIT_CODE=$? + - exit ${EXIT_CODE} + +isort: + image: johnybear/osdu-airflow:python36-1 + allow_failure: true + stage: linters + script: + - python -m pip install setuptools isort + - isort -c -v src/*/*.py || EXIT_CODE=$? - exit ${EXIT_CODE} test_dags: - stage: verify - image: johnybear/osdu-airflow:python36 + stage: test_dags + image: johnybear/osdu-airflow:python36-1 script: - chmod +x tests/test_dags.sh - tests/./test_dags.sh || EXIT_CODE=$? @@ -41,11 +53,20 @@ test_dags: when: on_failure expire_in: 1 week +unit_tests: + stage: unit_tests + image: johnybear/osdu-airflow:python36-1 + script: + - chmod +x tests/unit_tests.sh + - tests/./unit_tests.sh || EXIT_CODE=$? + - exit ${EXIT_CODE} + dags_rsync_community: stage: deploy script: - gcloud auth activate-service-account --key-file $OSDU_GCP_DEPLOY_FILE + - cd src - gsutil -m rsync -x "\.git.*|tests/.*|plugins/tests.*$" -r "$PWD" $OSDU_GCP_DEPL_TARGET only: variables: - - $CI_COMMIT_REF_PROTECTED \ No newline at end of file + - $CI_COMMIT_REF_PROTECTED diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000000000000000000000000000000000000..8bf223eb6b0df51c0e0f4063c99dcde58d332ad0 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,279 @@ +[MASTER] + +# Specify a configuration file. +#rcfile=.pylintrc + +# Profiled execution. +profile=no + +# Add <file or directory> to the black list. It should be a base name, not a +# path. You may set this option multiple times. +ignore=.git, .venv, .idea, CVS + +# Pickle collected data for later comparisons. +persistent=yes + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. 
+suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# List of plugins (as comma separated values of python modules names) to load, +# usually to register additional checkers. +load-plugins=pylint_quotes + + +[MESSAGES CONTROL] + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time. +enable=c-extension-no-member + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifier separated by comma (,) or put this option +# multiple time. +disable=no-member, no-self-use + + +[REPORTS] + +# Python expression which should return a note less than 10 (10 is the highest +# note). You have access to the variables errors warning, statement which +# respectively contain the number of errors / warnings messages and the total +# number of statements analyzed. This is used by the global evaluation report +# (R0004). +evaluation=13.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) + +# Set the output format. Available formats are text, parseable, colorized, msvs +# (visual studio) and html +output-format=text + +# Include message's id in output +include-ids=yes + +# Tells whether to display a full report or only the messages +reports=yes + +# Put messages in a separate file for each module / package specified on the +# command line instead of printing them on stdout. Reports (if any) will be +# written in a file name "pylint_global.[txt|html]". +files-output=no + +# Activate the evaluation score. +score=yes + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + + +[VARIABLES] + +# Tells whether we should check for unused import in __init__ files. +init-import=yes + +# A regular expression matching the name of dummy variables (i.e. expectedly +# not used). +dummy-variables-rgx=(_+[a-zA-Z0-9]*?$)|dummy + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid to define new builtins when possible. +additional-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_,_cb + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,future.builtins + + +[CLASSES] + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__,__new__,setUp + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=cls + +# List of member names, which should be excluded from the protected access +# warning. 
+exclude-protected=_asdict,_fields,_replace,_source,_make + + +[BASIC] + +# Required attributes for module, separated by a comma +required-attributes= + +# Regular expression which should only match correct module names +module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ + +# Regular expression which should only match correct module level names +const-rgx= + +# Regular expression which should only match correct class names +class-rgx=[A-Z_][a-zA-Z0-9]+$ + +# Regular expression which should only match correct function names +function-rgx=[a-z_][a-z0-9_]{2,60}$ + +# Regular expression which should only match correct method names +method-rgx=[a-z_][a-z0-9_]{2,60}$ + +# Regular expression which should only match correct instance attribute names +attr-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Regular expression which should only match correct argument names +argument-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Regular expression which should only match correct variable names +variable-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Regular expression which should only match correct list comprehension / +# generator expression variable names +inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ + +# Good variable names which should always be accepted, separated by a comma +good-names=e,f,i,j,k,ex,Run,_ + +# Bad variable names which should always be refused, separated by a comma +bad-names=foo,bar,baz,toto,tutu,tata + +# Regular expression which should only match functions or classes name which do +# not require a docstring +no-docstring-rgx=__.*__ + + +[FORMAT] + +# Maximum number of characters on a single line. +max-line-length=100 + +# Maximum number of lines in a module +max-module-lines=1000 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Set the linting for string quotes +string-quote=double +triple-quote=double +docstring-quote=double + + +[LOGGING] + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + +# Format style used to check logging format string. `old` means using % +# formatting, `new` is for `{}` formatting,and `fstr` is for f-strings. +logging-format-style=new + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME,XXX,TODO + + +[SIMILARITIES] + +# Minimum lines number of a similarity. +min-similarity-lines=4 + +# Ignore comments when computing similarities. +ignore-comments=yes + +# Ignore docstrings when computing similarities. +ignore-docstrings=yes + +# Ignore imports when computing similarities. +ignore-imports=no + + +[DESIGN] + +# Maximum number of arguments for function / method +max-args=7 + +# Argument names that match this expression will be ignored. Default to name +# with leading underscore +ignored-argument-names=_.* + +# Maximum number of locals for function / method body +max-locals=15 + +# Maximum number of return / yield for function / method body +max-returns=6 + +# Maximum number of branch for function / method body +max-branches=12 + +# Maximum number of statements in function / method body +max-statements=50 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# Maximum number of attributes for a class (see R0902). +max-attributes=7 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=0 + +# Maximum number of public methods for a class (see R0904). 
+max-public-methods=20 + +# Maximum number of boolean expressions in a if statement +max-bool-expr=5 + + +[IMPORTS] + +# Deprecated modules which should not be used, separated by a comma +deprecated-modules=regsub,TERMIOS,Bastion,rexec + +# Create a graph of every (i.e. internal and external) dependencies in the +# given file (report RP0402 must not be disabled) +import-graph= + +# Create a graph of external dependencies in the given file (report RP0402 must +# not be disabled) +ext-import-graph= + +# Create a graph of internal dependencies in the given file (report RP0402 must +# not be disabled) +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when being caught. Defaults to +# "BaseException, Exception". +overgeneral-exceptions=BaseException, Exception diff --git a/plugins/operators/update_status_tasks.py b/plugins/operators/update_status_tasks.py deleted file mode 100644 index 49dcf07a94c9444bd7fc0b94407653cff2f990c8..0000000000000000000000000000000000000000 --- a/plugins/operators/update_status_tasks.py +++ /dev/null @@ -1,173 +0,0 @@ -# Copyright 2020 Google LLC -# Copyright 2020 EPAM Systems -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -import sys -import enum -import json -import tenacity -from functools import partial -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.models import Variable - -from hooks import workflow_hook, search_http_hook - - -# Set up base logger -handler = logging.StreamHandler(sys.stdout) -handler.setFormatter(logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s")) -logger = logging.getLogger("Dataload") -logger.setLevel(logging.INFO) -logger.addHandler(handler) - - - -class UpdateStatusOperator(BaseOperator): - - ui_color = '#10ECAA' - ui_fgcolor = '#000000' - - FINISHED_STATUS = "finished" - RUNNING_STATUS = "running" - FAILED_STATUS = "failed" - - class prev_ti_state(enum.Enum): - NONE = enum.auto() - SUCCESS = enum.auto() - FAILED = enum.auto() - - @apply_defaults - def __init__( self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self.workflow_hook = workflow_hook - self.search_hook = search_http_hook - - @staticmethod - def _file_searched(resp, expected_total_count): - """Check if search service returns totalCount. 
- The method is used as a callback - """ - data = resp.json() - return data.get("totalCount") == expected_total_count - - def get_headers(self, **kwargs): - data_conf = kwargs['dag_run'].conf - # for /submitWithManifest authorization and partition-id are inside Payload field - if "Payload" in data_conf: - auth = data_conf["Payload"]["authorization"] - partition_id = data_conf["Payload"]["data-partition-id"] - else: - auth = data_conf["authorization"] - partition_id = data_conf["data-partition-id"] - headers = { - 'Content-type': 'application/json', - 'data-partition-id': partition_id, - 'Authorization': auth, - } - return headers - - def search_files(self, **kwargs): - def create_query(record_ids): - expected_total_count = len(record_ids) - record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids) - logger.info(f"Search query {record_ids}") - query = f"id:({record_ids})" - return query, expected_total_count - - record_ids_default = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records') - record_ids_manifest = kwargs["ti"].xcom_pull(key="record_ids", task_ids='proccess_manifest_task') - if record_ids_default: - query, expected_total_count = create_query(record_ids_default) - elif record_ids_manifest: - query, expected_total_count = create_query(record_ids_manifest) - else: - logger.error("There are no record ids") - sys.exit(2) - headers = self.get_headers(**kwargs) - request_body = { - "kind": "*:*:*:*", - "query": query - } - retry_opts = { - "wait": tenacity.wait_exponential(multiplier=5), - "stop": tenacity.stop_after_attempt(5), - "retry": tenacity.retry_if_not_result( - partial(self._file_searched, expected_total_count=expected_total_count) - ) - } - self.search_hook.run_with_advanced_retry( - endpoint=Variable.get("search_query_ep"), - headers=headers, - data=json.dumps(request_body), - _retry_args=retry_opts - ) - - def previous_ti_statuses(self, context): - dagrun = context['ti'].get_dagrun() - failed_ti, success_ti = dagrun.get_task_instances(state='failed'), dagrun.get_task_instances(state='success') - if not failed_ti and not success_ti: # There is no prev task so it can't have been failed - logger.info("There are no tasks before this one. So it has status RUNNING") - return self.prev_ti_state.NONE - if failed_ti: - logger.info("There are failed tasks before this one. So it has status FAILED") - return self.prev_ti_state.FAILED - logger.info("There are successed tasks before this one. So it has status SUCCESSED") - return self.prev_ti_state.SUCCESS - - def pre_execute(self, context): - prev_tis = self.previous_ti_statuses(context) - if prev_tis is self.prev_ti_state.NONE: - self.status = self.RUNNING_STATUS - elif prev_tis is self.prev_ti_state.FAILED: - self.status = self.FAILED_STATUS - elif prev_tis is self.prev_ti_state.SUCCESS: - self.status = self.FINISHED_STATUS - - def execute(self, context): - """Execute update workflow status. - If status assumed to be FINISHED then we check whether proceed files are searchable or not. 
- If they are then update status FINISHED else FAILED - """ - if self.status in (self.RUNNING_STATUS, self.FAILED_STATUS): - self.update_status_rqst(self.status, **context) - elif self.status == self.FINISHED_STATUS: - try: - self.search_files(**context) - except Exception as e: - logger.error(str(e)) - self.status = self.FAILED_STATUS - self.update_status_rqst(self.FAILED_STATUS, **context) - else: - self.update_status_rqst(self.FINISHED_STATUS, **context) - if self.status == self.FAILED_STATUS: - raise Exception("Dag failed") - - def update_status_rqst(self, status, **kwargs): - data_conf = kwargs['dag_run'].conf - logger.info(f"Got dataconf {data_conf}") - workflow_id = data_conf["WorkflowID"] - headers = self.get_headers(**kwargs) - request_body = { - "WorkflowID": workflow_id, - "Status": status - } - logger.info(f" Sending request '{status}'") - self.workflow_hook.run( - endpoint=Variable.get("update_status_ep"), - data=json.dumps(request_body), - headers=headers - ) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/dags/configs/dataload.ini b/src/dags/configs/dataload.ini similarity index 100% rename from dags/configs/dataload.ini rename to src/dags/configs/dataload.ini diff --git a/dags/create_records.py b/src/dags/create_records.py similarity index 69% rename from dags/create_records.py rename to src/dags/create_records.py index 1573eaeb08136ba25a1e81441709a9210de1cf3d..90f3a8fb570d1e0122df8ff9db348c9bb4e92001 100644 --- a/dags/create_records.py +++ b/src/dags/create_records.py @@ -14,17 +14,15 @@ # limitations under the License. import configparser +import logging +from airflow.models import Variable from osdu_api.model.acl import Acl from osdu_api.model.legal import Legal from osdu_api.model.legal_compliance import LegalCompliance +from osdu_api.model.record import Record from osdu_api.model.record_ancestry import RecordAncestry from osdu_api.storage.record_client import RecordClient -from osdu_api.model.record import Record -import json -from airflow.models import Variable - -import logging logger = logging.getLogger() @@ -39,33 +37,45 @@ DEFAULT_TENANT = config.get("DEFAULTS", "tenant") DEFAULT_SOURCE = config.get("DEFAULTS", "authority") DEFAULT_VERSION = config.get("DEFAULTS", "kind_version") + def create_records(**kwargs): - # the only way to pass in values through the experimental api is through the conf parameter - data_conf = kwargs['dag_run'].conf + # the only way to pass in values through the experimental api is through + # the conf parameter + data_conf = kwargs["dag_run"].conf logger.debug(kwargs) logger.debug(data_conf) - acl = Acl(ACL_DICT['viewers'], ACL_DICT['owners']) - legal = Legal(LEGAL_DICT['legaltags'], LEGAL_DICT['otherRelevantDataCountries'], LegalCompliance.compliant) + acl = Acl(ACL_DICT["viewers"], ACL_DICT["owners"]) + legal = Legal( + LEGAL_DICT["legaltags"], + LEGAL_DICT["otherRelevantDataCountries"], + LegalCompliance.compliant) auth = data_conf["authorization"] ancestry = RecordAncestry([]) record_id = None - kind = Variable.get('record_kind') + + # TODO: find out how to get kind through request not using Variables + kind = Variable.get("record_kind") + tenant, authority, file_type, version = kind.split(":") + total_count = {file_type.capitalize(): 1} + logger.info(f"The count of records to be ingested: {total_count}") + meta = [{}] version = 0 - data = data_conf.get('data', {}) + data = data_conf.get("data", {}) record = 
Record(record_id, version, kind, acl, legal, data, ancestry, meta) headers = { - 'content-type': 'application/json', - 'slb-data-partition-id': data_conf.get('partition-id', DEFAULT_SOURCE), - 'Authorization': f'{auth}', - 'AppKey': data_conf.get('app-key', '') + "content-type": "application/json", + "slb-data-partition-id": data_conf.get("partition-id", DEFAULT_SOURCE), + "Authorization": f"{auth}", + "AppKey": data_conf.get("app-key", "") } record_client = RecordClient() - record_client.data_partition_id = data_conf.get('partition-id', DEFAULT_SOURCE) + record_client.data_partition_id = data_conf.get( + "partition-id", DEFAULT_SOURCE) resp = record_client.create_update_records([record], headers.items()) logger.info(f"Response: {resp.text}") kwargs["ti"].xcom_push(key="record_ids", value=resp.json()["recordIds"]) diff --git a/dags/default-ingest.py b/src/dags/default-ingest.py similarity index 70% rename from dags/default-ingest.py rename to src/dags/default-ingest.py index 6d9b381450da11f5fc45c4bb00ec2dc8e4ef3deb..4ea1a84ac411ace3754f995a904bad852e8c11ef 100644 --- a/dags/default-ingest.py +++ b/src/dags/default-ingest.py @@ -14,36 +14,43 @@ # See the License for the specific language governing permissions and # limitations under the License. -import airflow -from airflow import DAG -from airflow.operators.python_operator import PythonOperator -from airflow.utils.dates import days_ago + + from datetime import timedelta -from create_records import create_records + +import airflow.utils.dates +from airflow import DAG from airflow.operators import UpdateStatusOperator +from airflow.operators.python_operator import PythonOperator + +from create_records import create_records #isort:skip """ A workflow creating a record """ default_args = { - 'owner': 'Airflow', - 'depends_on_past': False, - 'start_date': airflow.utils.dates.days_ago(0), - 'email': ['airflow@example.com'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 0, - 'retry_delay': timedelta(minutes=5), - 'trigger_rule': 'none_failed', + "owner": "Airflow", + "depends_on_past": False, + "start_date": airflow.utils.dates.days_ago(0), + "email": ["airflow@example.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 0, + "retry_delay": timedelta(minutes=5), + "trigger_rule": "none_failed", # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } -workflow_name = 'Default_ingest' -dag = DAG(workflow_name, default_args=default_args, schedule_interval=timedelta(days=1)) +workflow_name = "Default_ingest" +dag = DAG( + workflow_name, + default_args=default_args, + schedule_interval=timedelta( + days=1)) update_status_running_op = UpdateStatusOperator( task_id="update_status_running_task", @@ -56,11 +63,11 @@ update_status_finished_op = UpdateStatusOperator( ) create_records_op = PythonOperator( - task_id='create_records', + task_id="create_records", python_callable=create_records, provide_context=True, dag=dag ) -update_status_running_op >> create_records_op >> update_status_finished_op \ No newline at end of file +update_status_running_op >> create_records_op >> update_status_finished_op diff --git a/dags/osdu-ingest.py b/src/dags/osdu-ingest.py similarity index 68% rename from dags/osdu-ingest.py rename to src/dags/osdu-ingest.py index 6e917678b8576511eba5b93afb0cbafc09ebd103..9fdfc5311c7357e87c42a929a41c299ff7e84358 100644 --- a/dags/osdu-ingest.py +++ b/src/dags/osdu-ingest.py @@ -13,24 +13,24 @@ # See the License for the specific language 
governing permissions and # limitations under the License. +from datetime import timedelta + import airflow from airflow import DAG -from airflow.operators.python_operator import PythonOperator, BranchPythonOperator -from datetime import timedelta -from airflow.operators import UpdateStatusOperator, ProcessManifestOperator +from airflow.operators import ProcessManifestOperator, SearchRecordIdOperator, UpdateStatusOperator default_args = { - 'start_date': airflow.utils.dates.days_ago(0), - 'retries': 0, - 'retry_delay': timedelta(minutes=50), - 'trigger_rule': 'none_failed', + "start_date": airflow.utils.dates.days_ago(0), + "retries": 0, + "retry_delay": timedelta(minutes=50), + "trigger_rule": "none_failed", } dag = DAG( - 'Osdu_ingest', + "Osdu_ingest", default_args=default_args, - description='liveness monitoring dag', + description="liveness monitoring dag", schedule_interval=None, dagrun_timeout=timedelta(minutes=60) ) @@ -49,11 +49,16 @@ update_status_finished_op = UpdateStatusOperator( ) process_manifest_op = ProcessManifestOperator( - task_id='proccess_manifest_task', + task_id="proccess_manifest_task", provide_context=True, dag=dag ) -update_status_running_op >> process_manifest_op >> update_status_finished_op +search_record_ids_op = SearchRecordIdOperator( + task_id="search_record_ids_task", + provide_context=True, + dag=dag +) +update_status_running_op >> process_manifest_op >> search_record_ids_op >> update_status_finished_op diff --git a/dags/other-log-ingest.py b/src/dags/other-log-ingest.py similarity index 77% rename from dags/other-log-ingest.py rename to src/dags/other-log-ingest.py index 2799a074530b152fd278c99a84c82a43ff213215..3be207a1a708ce32a4fc5e29f4469709060b7f88 100644 --- a/dags/other-log-ingest.py +++ b/src/dags/other-log-ingest.py @@ -12,28 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import timedelta + import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator -from datetime import timedelta default_args = { - 'start_date': airflow.utils.dates.days_ago(0), - 'retries': 1, - 'retry_delay': timedelta(minutes=50) + "start_date": airflow.utils.dates.days_ago(0), + "retries": 1, + "retry_delay": timedelta(minutes=50) } dag = DAG( - 'Other_log_ingest', + "Other_log_ingest", default_args=default_args, - description='liveness monitoring dag', + description="liveness monitoring dag", schedule_interval=None, dagrun_timeout=timedelta(minutes=60)) t1 = BashOperator( - task_id='echo', - bash_command='echo test', + task_id="echo", + bash_command="echo test", dag=dag, depends_on_past=False, - priority_weight=2**31-1) + priority_weight=2**31 - 1) diff --git a/dags/well-log-ingest.py b/src/dags/well-log-ingest.py similarity index 77% rename from dags/well-log-ingest.py rename to src/dags/well-log-ingest.py index 8363fe6b7be26807bca262e26d5fa4d838cfc034..24d949b6054d0c449a6d7431f103d7d79a9970db 100644 --- a/dags/well-log-ingest.py +++ b/src/dags/well-log-ingest.py @@ -12,27 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from datetime import timedelta + import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator -from datetime import timedelta default_args = { - 'start_date': airflow.utils.dates.days_ago(0), - 'retries': 1, - 'retry_delay': timedelta(minutes=50) + "start_date": airflow.utils.dates.days_ago(0), + "retries": 1, + "retry_delay": timedelta(minutes=50) } dag = DAG( - 'Well_log_ingest', + "Well_log_ingest", default_args=default_args, - description='liveness monitoring dag', + description="liveness monitoring dag", schedule_interval=None, dagrun_timeout=timedelta(minutes=60)) t1 = BashOperator( - task_id='echo', - bash_command='echo test', + task_id="echo", + bash_command="echo test", dag=dag, depends_on_past=False, - priority_weight=2**31-1) + priority_weight=2**31 - 1) diff --git a/plugins/__init__.py b/src/plugins/__init__.py similarity index 77% rename from plugins/__init__.py rename to src/plugins/__init__.py index 1d43e866713007dc734e49ffc054148fe9e3edd0..59ea0498909ef9cd595895d7046ef06b37d9d2ab 100644 --- a/plugins/__init__.py +++ b/src/plugins/__init__.py @@ -1,12 +1,15 @@ from airflow.plugins_manager import AirflowPlugin -from .operators import UpdateStatusOperator, ProcessManifestOperator + +from .operators import ProcessManifestOperator, SearchRecordIdOperator, UpdateStatusOperator + # Defining the plugin class class OSDUPlugin(AirflowPlugin): name = "osdu_plugin" operators = [ UpdateStatusOperator, - ProcessManifestOperator + ProcessManifestOperator, + SearchRecordIdOperator ] hooks = [] # A list of class(es) derived from BaseExecutor @@ -19,4 +22,4 @@ class OSDUPlugin(AirflowPlugin): # A list of Blueprint object created from flask.Blueprint flask_blueprints = [] # A list of menu links (flask_admin.base.MenuLink) - menu_links = [] \ No newline at end of file + menu_links = [] diff --git a/plugins/hooks/__init__.py b/src/plugins/hooks/__init__.py similarity index 100% rename from plugins/hooks/__init__.py rename to src/plugins/hooks/__init__.py diff --git a/plugins/hooks/http_hooks.py b/src/plugins/hooks/http_hooks.py similarity index 99% rename from plugins/hooks/http_hooks.py rename to src/plugins/hooks/http_hooks.py index 3d8365b20d24ddd9195de48dd69b9f5e4c790686..34fe9b793e359c3aefde73f380bebc7c57a9140a 100644 --- a/plugins/hooks/http_hooks.py +++ b/src/plugins/hooks/http_hooks.py @@ -16,4 +16,4 @@ from airflow.hooks.http_hook import HttpHook workflow_hook = HttpHook(http_conn_id='workflow', method="POST") -search_http_hook = HttpHook(http_conn_id='search', method="POST") \ No newline at end of file +search_http_hook = HttpHook(http_conn_id='search', method="POST") diff --git a/plugins/operators/__init__.py b/src/plugins/operators/__init__.py similarity index 86% rename from plugins/operators/__init__.py rename to src/plugins/operators/__init__.py index 1d5bf32208b35b593099352a162834463c16f562..c0da87d0584cba75f258461d4ed576e7a8ca0c3e 100644 --- a/plugins/operators/__init__.py +++ b/src/plugins/operators/__init__.py @@ -13,10 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from .update_status_op import UpdateStatusOperator from .process_manifest_op import ProcessManifestOperator +from .search_record_id_op import SearchRecordIdOperator +from .update_status_op import UpdateStatusOperator __all__ = [ 'UpdateStatusOperator', - 'ProcessManifestOperator' -] \ No newline at end of file + 'ProcessManifestOperator', + 'SearchRecordIdOperator', +] diff --git a/plugins/operators/process_manifest_op.py b/src/plugins/operators/process_manifest_op.py similarity index 98% rename from plugins/operators/process_manifest_op.py rename to src/plugins/operators/process_manifest_op.py index b93116fc55f46f0e29aea49607efdb8a4bef1f4f..2900281ff3115ea558ac427d70c06d0a840fb837 100644 --- a/plugins/operators/process_manifest_op.py +++ b/src/plugins/operators/process_manifest_op.py @@ -13,21 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime +import configparser +import enum import json import logging -import configparser +import re import sys import time import uuid -import enum -import re -from airflow.models import BaseOperator +from collections import Counter +from datetime import datetime from typing import Tuple from urllib.error import HTTPError -from collections import Counter -from airflow.models import Variable + import requests +from airflow.models import BaseOperator, Variable ACL_DICT = eval(Variable.get("acl")) LEGAL_DICT = eval(Variable.get("legal")) @@ -232,6 +232,7 @@ def separate_type_data(request_data): for elem in request_data: data.append(elem[0]) types[elem[1]] += 1 + logger.info(f"The count of records to be ingested: {str(dict(types))}") return types, data diff --git a/src/plugins/operators/search_record_id_op.py b/src/plugins/operators/search_record_id_op.py new file mode 100644 index 0000000000000000000000000000000000000000..fefe1809cbe1ee6a41480ad508ee90c21cd88826 --- /dev/null +++ b/src/plugins/operators/search_record_id_op.py @@ -0,0 +1,120 @@ +# Copyright 2020 Google LLC +# Copyright 2020 EPAM Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import enum +import json +import logging +import sys +from functools import partial +from typing import Tuple + +import tenacity +from airflow.models import BaseOperator, Variable +from airflow.utils.decorators import apply_defaults + +from hooks import search_http_hook, workflow_hook + +# Set up base logger +handler = logging.StreamHandler(sys.stdout) +handler.setFormatter(logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s")) +logger = logging.getLogger("Dataload") +logger.setLevel(logging.INFO) +logger.addHandler(handler) + + + +class SearchRecordIdOperator(BaseOperator): + """ + Operator to search files in SearchService by record ids. + Expects "record_ids" field in xcom. 
+ """ + ui_color = '#10ECAA' + ui_fgcolor = '#000000' + + FINISHED_STATUS = "finished" + RUNNING_STATUS = "running" + FAILED_STATUS = "failed" + + @apply_defaults + def __init__( self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.workflow_hook = workflow_hook + self.search_hook = search_http_hook + + @staticmethod + def _file_searched(resp, expected_total_count) -> bool: + """Check if search service returns expected totalCount. + The method is used as a callback + """ + data = resp.json() + return data.get("totalCount") == expected_total_count + + def get_headers(self, **kwargs) -> dict: + data_conf = kwargs['dag_run'].conf + # for /submitWithManifest authorization and partition-id are inside Payload field + if "Payload" in data_conf: + auth = data_conf["Payload"]["authorization"] + partition_id = data_conf["Payload"]["data-partition-id"] + else: + auth = data_conf["authorization"] + partition_id = data_conf["data-partition-id"] + headers = { + 'Content-type': 'application/json', + 'data-partition-id': partition_id, + 'Authorization': auth, + } + return headers + + @staticmethod + def _create_search_query(record_ids) -> Tuple[str, int]: + expected_total_count = len(record_ids) + record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids) + logger.info(f"Search query {record_ids}") + query = f"id:({record_ids})" + return query, expected_total_count + + def search_files(self, **kwargs): + record_ids = kwargs["ti"].xcom_pull(key="record_ids",) + if record_ids: + query, expected_total_count = self._create_search_query(record_ids) + else: + logger.error("There are no record ids") + sys.exit(2) + headers = self.get_headers(**kwargs) + request_body = { + "kind": "*:*:*:*", + "query": query + } + retry_opts = { + "wait": tenacity.wait_exponential(multiplier=5), + "stop": tenacity.stop_after_attempt(5), + "retry": tenacity.retry_if_not_result( + partial(self._file_searched, expected_total_count=expected_total_count) + ) + } + self.search_hook.run_with_advanced_retry( + endpoint=Variable.get("search_query_ep"), + headers=headers, + data=json.dumps(request_body), + _retry_args=retry_opts + ) + + def execute(self, context): + """Execute update workflow status. + If status assumed to be FINISHED then we check whether proceed files are searchable or not. + If they are then update status FINISHED else FAILED + """ + self.search_files(**context) diff --git a/plugins/operators/update_status_op.py b/src/plugins/operators/update_status_op.py similarity index 84% rename from plugins/operators/update_status_op.py rename to src/plugins/operators/update_status_op.py index 49dcf07a94c9444bd7fc0b94407653cff2f990c8..5ae50498ad42a1936aab5a045b61186a691086f9 100644 --- a/plugins/operators/update_status_op.py +++ b/src/plugins/operators/update_status_op.py @@ -14,18 +14,17 @@ # limitations under the License. 
-import logging -import sys import enum import json -import tenacity +import logging +import sys from functools import partial -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.models import Variable -from hooks import workflow_hook, search_http_hook +import tenacity +from airflow.models import BaseOperator, Variable +from airflow.utils.decorators import apply_defaults +from hooks import search_http_hook, workflow_hook # Set up base logger handler = logging.StreamHandler(sys.stdout) @@ -88,12 +87,9 @@ class UpdateStatusOperator(BaseOperator): query = f"id:({record_ids})" return query, expected_total_count - record_ids_default = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records') - record_ids_manifest = kwargs["ti"].xcom_pull(key="record_ids", task_ids='proccess_manifest_task') - if record_ids_default: - query, expected_total_count = create_query(record_ids_default) - elif record_ids_manifest: - query, expected_total_count = create_query(record_ids_manifest) + record_ids = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records') + if record_ids: + query, expected_total_count = create_query(record_ids) else: logger.error("There are no record ids") sys.exit(2) @@ -142,17 +138,7 @@ class UpdateStatusOperator(BaseOperator): If status assumed to be FINISHED then we check whether proceed files are searchable or not. If they are then update status FINISHED else FAILED """ - if self.status in (self.RUNNING_STATUS, self.FAILED_STATUS): - self.update_status_rqst(self.status, **context) - elif self.status == self.FINISHED_STATUS: - try: - self.search_files(**context) - except Exception as e: - logger.error(str(e)) - self.status = self.FAILED_STATUS - self.update_status_rqst(self.FAILED_STATUS, **context) - else: - self.update_status_rqst(self.FINISHED_STATUS, **context) + self.update_status_rqst(self.status, **context) if self.status == self.FAILED_STATUS: raise Exception("Dag failed") diff --git a/tests/constants.py b/tests/constants.py deleted file mode 100644 index c2769d4ebcacc4cf77b12f865ebaf60cf157c745..0000000000000000000000000000000000000000 --- a/tests/constants.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 2020 Google LLC -# Copyright 2020 EPAM Systems -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -OSDU_INGEST_SUCCES_FIFO = "/tmp/osdu_ingest_success" -OSDU_INGEST_FAILED_FIFO = "/tmp/osdu_ingest_failed" - diff --git a/tests/mock-data/default-ingest-invalid.json b/tests/end-to-end-tests/mock-data/default-ingest-invalid.json similarity index 100% rename from tests/mock-data/default-ingest-invalid.json rename to tests/end-to-end-tests/mock-data/default-ingest-invalid.json diff --git a/tests/mock-data/default-ingest-valid.json b/tests/end-to-end-tests/mock-data/default-ingest-valid.json similarity index 100% rename from tests/mock-data/default-ingest-valid.json rename to tests/end-to-end-tests/mock-data/default-ingest-valid.json diff --git a/tests/mock-data/osdu-ingest-invalid.json b/tests/end-to-end-tests/mock-data/osdu-ingest-invalid.json similarity index 100% rename from tests/mock-data/osdu-ingest-invalid.json rename to tests/end-to-end-tests/mock-data/osdu-ingest-invalid.json diff --git a/tests/mock-data/osdu-ingest-valid.json b/tests/end-to-end-tests/mock-data/osdu-ingest-valid.json similarity index 100% rename from tests/mock-data/osdu-ingest-valid.json rename to tests/end-to-end-tests/mock-data/osdu-ingest-valid.json diff --git a/tests/mock-external-apis/app.py b/tests/end-to-end-tests/mock-external-apis/app.py similarity index 100% rename from tests/mock-external-apis/app.py rename to tests/end-to-end-tests/mock-external-apis/app.py diff --git a/tests/test-default-ingest-fail.sh b/tests/end-to-end-tests/test-default-ingest-fail.sh similarity index 100% rename from tests/test-default-ingest-fail.sh rename to tests/end-to-end-tests/test-default-ingest-fail.sh diff --git a/tests/test-default-ingest-success.sh b/tests/end-to-end-tests/test-default-ingest-success.sh similarity index 100% rename from tests/test-default-ingest-success.sh rename to tests/end-to-end-tests/test-default-ingest-success.sh diff --git a/tests/test-osdu-ingest-fail.sh b/tests/end-to-end-tests/test-osdu-ingest-fail.sh similarity index 100% rename from tests/test-osdu-ingest-fail.sh rename to tests/end-to-end-tests/test-osdu-ingest-fail.sh diff --git a/tests/test-osdu-ingest-success.sh b/tests/end-to-end-tests/test-osdu-ingest-success.sh similarity index 100% rename from tests/test-osdu-ingest-success.sh rename to tests/end-to-end-tests/test-osdu-ingest-success.sh diff --git a/tests/plugin-unit-tests/README.md b/tests/plugin-unit-tests/README.md new file mode 100644 index 0000000000000000000000000000000000000000..dc225cb727844e11cc3bb880026335f6edd6520e --- /dev/null +++ b/tests/plugin-unit-tests/README.md @@ -0,0 +1,5 @@ +``` +pip install pytest +export AIRFLOW_SRC_DIR=/path/to/airflow-folder +pytest +``` diff --git a/plugins/tests/__init__.py b/tests/plugin-unit-tests/__init__.py similarity index 100% rename from plugins/tests/__init__.py rename to tests/plugin-unit-tests/__init__.py diff --git a/plugins/tests/data/__init__.py b/tests/plugin-unit-tests/data/__init__.py similarity index 100% rename from plugins/tests/data/__init__.py rename to tests/plugin-unit-tests/data/__init__.py diff --git a/plugins/tests/data/process_manifest_op.py b/tests/plugin-unit-tests/data/process_manifest_op.py similarity index 100% rename from plugins/tests/data/process_manifest_op.py rename to tests/plugin-unit-tests/data/process_manifest_op.py diff --git a/plugins/tests/test_process_manifest_op.py b/tests/plugin-unit-tests/test_process_manifest_op.py similarity index 76% rename from plugins/tests/test_process_manifest_op.py rename to tests/plugin-unit-tests/test_process_manifest_op.py index 
4a1eff774f96c5d9f05c77a39eaedb6965cd138e..56717e2ff5c3a96d1e18fd9605d6f99600d0c4ef 100644 --- a/plugins/tests/test_process_manifest_op.py +++ b/tests/plugin-unit-tests/test_process_manifest_op.py @@ -15,24 +15,14 @@ import re +import os import pytest import sys -sys.path.insert(0, "..") -sys.path.insert(0, "../../dags") +sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins") -# we use tries here to omit import errors when we start DagBag -# because Airflow imports plugins in its own way -try: - from plugins.operators import process_manifest_op as p_m_op -except: - pass -try: - from plugins.tests.data import process_manifest_op as test_data -except ModuleNotFoundError: - pass - -print(sys.path) +from operators import process_manifest_op as p_m_op +from data import process_manifest_op as test_data @pytest.mark.parametrize( "test_input, expected", @@ -49,7 +39,10 @@ def test_determine_data_type(test_input, expected): @pytest.mark.parametrize( "data_type, loaded_conf, conf_payload, expected_file_result", [ - ("las2", test_data.LOADED_CONF, test_data.CONF_PAYLOAD, test_data.PROCESS_FILE_ITEMS_RESULT) + ("las2", + test_data.LOADED_CONF, + test_data.CONF_PAYLOAD, + test_data.PROCESS_FILE_ITEMS_RESULT) ] ) def test_process_file_items(data_type, loaded_conf, conf_payload, expected_file_result): diff --git a/tests/set_airflow_env.sh b/tests/set_airflow_env.sh index 7323295d4acd72ff31256c71528bee57914852f1..ad293724fc6ff1718f84138d70eb8af0c938f7aa 100755 --- a/tests/set_airflow_env.sh +++ b/tests/set_airflow_env.sh @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +pip install --upgrade google-api-python-client export ACL='{"viewers": ["foo"],"owners": ["foo"]}' export LEGAL='{"legaltags": ["foo"], "otherRelevantDataCountries": ["FR", "US", "CA"],"status": "compliant"}' export WORKFLOW_URL="http://127.0.0.1:5000/wf" export STORAGE_URL="http://127.0.0.1:5000/st" -export export LOCALHOST="http://127.0.0.1:5000" export SEARCH_CONN_ID="http://127.0.0.1:5000" export WORKFLOW_CONN_ID="http://127.0.0.1:5000" @@ -25,11 +25,12 @@ export DATALOAD_CONFIG_PATH="/usr/local/airflow/dags/configs/dataload.ini" mkdir -p /usr/local/airflow/dags/ -cp -rf . 
/usr/local/airflow/ -cp -r tests/mock-external-apis /mock-server -cp -r tests/mock-data /mock-server/mock-data -cp tests/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/ +cp -rf src/* /usr/local/airflow/ +cp -r tests/end-to-end-tests/mock-external-apis /mock-server +cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data +cp tests/end-to-end-tests/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/ cp tests/*.py /mock-server/ + airflow initdb > /dev/null 2>&1 # exclude testing DAGS sed -i 's/load_examples = True/load_examples = False/' /usr/local/airflow/airflow.cfg @@ -52,5 +53,6 @@ airflow variables -s dataload_config_path $DATALOAD_CONFIG_PATH airflow variables -s search_query_ep sr/qr airflow connections -a --conn_id search --conn_uri $SEARCH_CONN_ID airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID +airflow connections -a --conn_id google_cloud_storage --conn_uri $WORKFLOW_CONN_ID chmod +x /mock-server/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} diff --git a/tests/test_dags.sh b/tests/test_dags.sh index 3a12aa5ee9f6eb7cc8966993162067682df66706..50d9d3bbadbdb06d0f72ed30c492cf866a9c4d47 100644 --- a/tests/test_dags.sh +++ b/tests/test_dags.sh @@ -2,7 +2,7 @@ chmod +x tests/set_airflow_env.sh tests/./set_airflow_env.sh > /dev/null 2>&1 rm -r /usr/local/airflow/plugins/tests export FLASK_APP=/mock-server/app.py && flask run & -sleep 5 +sleep 15 timeout 700 python /mock-server/test_dags.py || EXIT_CODE=$? cp -r /usr/local/airflow/logs logs/ diff --git a/tests/unit_tests.sh b/tests/unit_tests.sh index c968c8ad5e03cb4550e3754979077c10967b7b7c..dc6e07c88fc480bc627a4fea63206d900f294ea2 100644 --- a/tests/unit_tests.sh +++ b/tests/unit_tests.sh @@ -1,6 +1,8 @@ pip install pytest +pip install --upgrade google-api-python-client chmod +x tests/set_airflow_env.sh +export AIRFLOW_SRC_DIR="/usr/local/airflow/" tests/./set_airflow_env.sh > /dev/null 2>&1 -cd /usr/local/airflow/plugins/tests +cd tests/plugin-unit-tests pytest || EXIT_CODE=$? -exit $EXIT_CODE \ No newline at end of file +exit $EXIT_CODE
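
The relocated plugin unit tests can be exercised locally in the same way the new `tests/unit_tests.sh` and `tests/plugin-unit-tests/README.md` in this patch do it; the sketch below is an assumption-level example, and the `AIRFLOW_SRC_DIR` value is a placeholder that should point at wherever the `src/` tree has been copied (CI copies it to `/usr/local/airflow/`).

```sh
# Minimal local run of the relocated unit tests (mirrors tests/unit_tests.sh).
# AIRFLOW_SRC_DIR is a placeholder: set it to the directory that contains the
# deployed dags/ and plugins/ folders from src/ (CI uses /usr/local/airflow/).
pip install pytest
export AIRFLOW_SRC_DIR=/usr/local/airflow/
cd tests/plugin-unit-tests
pytest
```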