Commit e8a87871 authored by Dmitriy Rudko

Merge branch 'feature/change_structure_and_linters' into 'master'

Feature/change structure and linters (GONRG-568)

See merge request !3
parents bb02fa19 d1f8542b
Pipeline #6548 passed
Showing with 593 additions and 136 deletions
# https://editorconfig.org/
root = true
[*]
indent_style = space
indent_size = 4
insert_final_newline = true
trim_trailing_whitespace = true
end_of_line = lf
charset = utf-8
[*.py]
max_line_length = 100
-build
-dist
-osdu_api.egg-info
-__pycache__
+**/.idea
+**/.DS_Store
+
+# Byte-compiled / optimized / DLL files
**/__pycache__/
**/*.py[cod]
**/*$py.class
*.log
# C extensions
**/*.so
# Distribution / packaging
**/.Python
**/build/
**/develop-eggs/
**/dist/
**/downloads/
**/eggs/
**/.eggs/
**/lib/
**/lib64/
**/parts/
**/sdist/
**/var/
**/wheels/
**/share/python-wheels/
**/*.egg-info/
**/.installed.cfg
**/*.egg
**/MANIFEST
**/venv/**
**/.idea/**
\ No newline at end of file
@@ -16,21 +16,33 @@
 image: google/cloud-sdk:alpine
 
 stages:
-  - test
-  - verify
+  - linters
+  - unit_tests
+  - test_dags
   - deploy
 
-unit_tests:
-  stage: test
-  image: johnybear/osdu-airflow:python36
+pylint:
+  image: johnybear/osdu-airflow:python36-1
+  stage: linters
+  allow_failure: true
   script:
-    - chmod +x tests/unit_tests.sh
-    - tests/./unit_tests.sh || EXIT_CODE=$?
+    - python -m pip install setuptools pylint pylint_quotes pylint-exit
+    - tests/./set_airflow_env.sh
+    - pylint --rcfile=.pylintrc src/*/*.py || EXIT_CODE=$?
+    - exit ${EXIT_CODE}
+
+isort:
+  image: johnybear/osdu-airflow:python36-1
+  allow_failure: true
+  stage: linters
+  script:
+    - python -m pip install setuptools isort
+    - isort -c -v src/*/*.py || EXIT_CODE=$?
     - exit ${EXIT_CODE}
 
 test_dags:
-  stage: verify
-  image: johnybear/osdu-airflow:python36
+  stage: test_dags
+  image: johnybear/osdu-airflow:python36-1
   script:
     - chmod +x tests/test_dags.sh
     - tests/./test_dags.sh || EXIT_CODE=$?
@@ -41,11 +53,20 @@ test_dags:
     when: on_failure
     expire_in: 1 week
 
+unit_tests:
+  stage: unit_tests
+  image: johnybear/osdu-airflow:python36-1
+  script:
+    - chmod +x tests/unit_tests.sh
+    - tests/./unit_tests.sh || EXIT_CODE=$?
+    - exit ${EXIT_CODE}
+
 dags_rsync_community:
   stage: deploy
   script:
     - gcloud auth activate-service-account --key-file $OSDU_GCP_DEPLOY_FILE
+    - cd src
     - gsutil -m rsync -x "\.git.*|tests/.*|plugins/tests.*$" -r "$PWD" $OSDU_GCP_DEPL_TARGET
   only:
     variables:
       - $CI_COMMIT_REF_PROTECTED
\ No newline at end of file
.pylintrc 0 → 100644
[MASTER]
# Specify a configuration file.
#rcfile=.pylintrc
# Profiled execution.
profile=no
# Add <file or directory> to the black list. It should be a base name, not a
# path. You may set this option multiple times.
ignore=.git, .venv, .idea, CVS
# Pickle collected data for later comparisons.
persistent=yes
# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
suggestion-mode=yes
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=pylint_quotes
[MESSAGES CONTROL]
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time.
enable=c-extension-no-member
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifier separated by comma (,) or put this option
# multiple time.
disable=no-member, no-self-use
[REPORTS]
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables 'error', 'warning', 'refactor',
# 'convention', and 'statement', which respectively contain the number of
# messages in each category and the total number of statements analyzed. This
# is used by the global evaluation report (R0004).
evaluation=13.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html
output-format=text
# Include message's id in output
include-ids=yes
# Tells whether to display a full report or only the messages
reports=yes
# Put messages in a separate file for each module / package specified on the
# command line instead of printing them on stdout. Reports (if any) will be
# written in a file name "pylint_global.[txt|html]".
files-output=no
# Activate the evaluation score.
score=yes
[REFACTORING]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=yes
# A regular expression matching the name of dummy variables (i.e. expectedly
# not used).
dummy-variables-rgx=(_+[a-zA-Z0-9]*?$)|dummy
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,_cb
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,future.builtins
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,__new__,setUp
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=cls
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,_fields,_replace,_source,_make
[BASIC]
# Required attributes for module, separated by a comma
required-attributes=
# Regular expression which should only match correct module names
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Regular expression which should only match correct module level names
const-rgx=
# Regular expression which should only match correct class names
class-rgx=[A-Z_][a-zA-Z0-9]+$
# Regular expression which should only match correct function names
function-rgx=[a-z_][a-z0-9_]{2,60}$
# Regular expression which should only match correct method names
method-rgx=[a-z_][a-z0-9_]{2,60}$
# Regular expression which should only match correct instance attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct argument names
argument-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct variable names
variable-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct list comprehension /
# generator expression variable names
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
# Good variable names which should always be accepted, separated by a comma
good-names=e,f,i,j,k,ex,Run,_
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata
# Regular expression which should only match functions or classes name which do
# not require a docstring
no-docstring-rgx=__.*__
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=100
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually "    " (4 spaces) or "\t"
# (1 tab).
indent-string='    '
# Set the linting for string quotes
string-quote=double
triple-quote=double
docstring-quote=double
[LOGGING]
# Logging modules to check that the string format arguments are in logging
# function parameter format.
logging-modules=logging
# Format style used to check logging format string. `old` means using %
# formatting, `new` is for `{}` formatting, and `fstr` is for f-strings.
logging-format-style=new
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
[DESIGN]
# Maximum number of arguments for function / method
max-args=7
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
ignored-argument-names=_.*
# Maximum number of locals for function / method body
max-locals=15
# Maximum number of return / yield for function / method body
max-returns=6
# Maximum number of branch for function / method body
max-branches=12
# Maximum number of statements in function / method body
max-statements=50
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Minimum number of public methods for a class (see R0903).
min-public-methods=0
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
# Maximum number of boolean expressions in a if statement
max-bool-expr=5
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,TERMIOS,Bastion,rexec
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "BaseException, Exception".
overgeneral-exceptions=BaseException, Exception
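
For illustration (not part of the commit): a small module that satisfies the notable settings above — double quotes enforced by pylint_quotes, snake_case names within the configured length bounds, and docstrings on public functions.

"""Example module laid out to satisfy the configuration above."""
import logging

logger = logging.getLogger(__name__)


def build_search_query(record_ids):
    """Join record ids into a single search query string."""
    # string-quote=double: plain strings use double quotes; embedded quotes
    # are escaped, matching the style used in the operators in this commit.
    joined = " OR ".join(f"\"{record_id}\"" for record_id in record_ids)
    return f"id:({joined})"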
File moved
@@ -14,17 +14,15 @@
 # limitations under the License.
 
 import configparser
+import logging
+
+from airflow.models import Variable
 from osdu_api.model.acl import Acl
 from osdu_api.model.legal import Legal
 from osdu_api.model.legal_compliance import LegalCompliance
+from osdu_api.model.record import Record
 from osdu_api.model.record_ancestry import RecordAncestry
 from osdu_api.storage.record_client import RecordClient
-from osdu_api.model.record import Record
-import json
-from airflow.models import Variable
-import logging
 
 logger = logging.getLogger()
@@ -39,33 +37,45 @@ DEFAULT_TENANT = config.get("DEFAULTS", "tenant")
 DEFAULT_SOURCE = config.get("DEFAULTS", "authority")
 DEFAULT_VERSION = config.get("DEFAULTS", "kind_version")
 
 def create_records(**kwargs):
-    # the only way to pass in values through the experimental api is through the conf parameter
-    data_conf = kwargs['dag_run'].conf
+    # the only way to pass in values through the experimental api is through
+    # the conf parameter
+    data_conf = kwargs["dag_run"].conf
 
     logger.debug(kwargs)
     logger.debug(data_conf)
 
-    acl = Acl(ACL_DICT['viewers'], ACL_DICT['owners'])
-    legal = Legal(LEGAL_DICT['legaltags'], LEGAL_DICT['otherRelevantDataCountries'], LegalCompliance.compliant)
+    acl = Acl(ACL_DICT["viewers"], ACL_DICT["owners"])
+    legal = Legal(
+        LEGAL_DICT["legaltags"],
+        LEGAL_DICT["otherRelevantDataCountries"],
+        LegalCompliance.compliant)
 
     auth = data_conf["authorization"]
     ancestry = RecordAncestry([])
     record_id = None
-    kind = Variable.get('record_kind')
+
+    # TODO: find out how to get kind through request not using Variables
+    kind = Variable.get("record_kind")
+    tenant, authority, file_type, version = kind.split(":")
+    total_count = {file_type.capitalize(): 1}
+    logger.info(f"The count of records to be ingested: {total_count}")
+
     meta = [{}]
     version = 0
-    data = data_conf.get('data', {})
+    data = data_conf.get("data", {})
     record = Record(record_id, version, kind, acl, legal, data, ancestry, meta)
 
     headers = {
-        'content-type': 'application/json',
-        'slb-data-partition-id': data_conf.get('partition-id', DEFAULT_SOURCE),
-        'Authorization': f'{auth}',
-        'AppKey': data_conf.get('app-key', '')
+        "content-type": "application/json",
+        "slb-data-partition-id": data_conf.get("partition-id", DEFAULT_SOURCE),
+        "Authorization": f"{auth}",
+        "AppKey": data_conf.get("app-key", "")
     }
 
     record_client = RecordClient()
-    record_client.data_partition_id = data_conf.get('partition-id', DEFAULT_SOURCE)
+    record_client.data_partition_id = data_conf.get(
+        "partition-id", DEFAULT_SOURCE)
     resp = record_client.create_update_records([record], headers.items())
     logger.info(f"Response: {resp.text}")
     kwargs["ti"].xcom_push(key="record_ids", value=resp.json()["recordIds"])
...
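
The comment in create_records() notes that Airflow's experimental REST API only accepts input through the conf parameter. A hedged sketch of a trigger call (the host, token, and payload values are placeholders, not part of this commit):

import requests

AIRFLOW_URL = "http://localhost:8080"  # placeholder webserver address

# Keys below are the ones create_records() reads from kwargs["dag_run"].conf.
conf = {
    "authorization": "Bearer <token>",  # placeholder credentials
    "partition-id": "opendes",          # falls back to DEFAULT_SOURCE if omitted
    "app-key": "",
    "data": {},                         # record payload; empty dict is the default
}

resp = requests.post(
    f"{AIRFLOW_URL}/api/experimental/dags/Default_ingest/dag_runs",
    json={"conf": conf},
)
resp.raise_for_status()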
@@ -14,36 +14,43 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import airflow
-from airflow import DAG
-from airflow.operators.python_operator import PythonOperator
-from airflow.utils.dates import days_ago
 from datetime import timedelta
-from create_records import create_records
 
+import airflow.utils.dates
+from airflow import DAG
 from airflow.operators import UpdateStatusOperator
+from airflow.operators.python_operator import PythonOperator
+
+from create_records import create_records #isort:skip
 
 """
 A workflow creating a record
 """
 
 default_args = {
-    'owner': 'Airflow',
-    'depends_on_past': False,
-    'start_date': airflow.utils.dates.days_ago(0),
-    'email': ['airflow@example.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 0,
-    'retry_delay': timedelta(minutes=5),
-    'trigger_rule': 'none_failed',
+    "owner": "Airflow",
+    "depends_on_past": False,
+    "start_date": airflow.utils.dates.days_ago(0),
+    "email": ["airflow@example.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 0,
+    "retry_delay": timedelta(minutes=5),
+    "trigger_rule": "none_failed",
     # 'queue': 'bash_queue',
     # 'pool': 'backfill',
     # 'priority_weight': 10,
     # 'end_date': datetime(2016, 1, 1),
 }
 
-workflow_name = 'Default_ingest'
-dag = DAG(workflow_name, default_args=default_args, schedule_interval=timedelta(days=1))
+workflow_name = "Default_ingest"
+dag = DAG(
+    workflow_name,
+    default_args=default_args,
+    schedule_interval=timedelta(
+        days=1))
 
 update_status_running_op = UpdateStatusOperator(
     task_id="update_status_running_task",
@@ -56,11 +63,11 @@ update_status_finished_op = UpdateStatusOperator(
 )
 
 create_records_op = PythonOperator(
-    task_id='create_records',
+    task_id="create_records",
     python_callable=create_records,
     provide_context=True,
     dag=dag
 )
 
 update_status_running_op >> create_records_op >> update_status_finished_op
\ No newline at end of file
@@ -13,24 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import timedelta
+
 import airflow
 from airflow import DAG
-from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
-from datetime import timedelta
-from airflow.operators import UpdateStatusOperator, ProcessManifestOperator
+from airflow.operators import ProcessManifestOperator, SearchRecordIdOperator, UpdateStatusOperator
 
 default_args = {
-    'start_date': airflow.utils.dates.days_ago(0),
-    'retries': 0,
-    'retry_delay': timedelta(minutes=50),
-    'trigger_rule': 'none_failed',
+    "start_date": airflow.utils.dates.days_ago(0),
+    "retries": 0,
+    "retry_delay": timedelta(minutes=50),
+    "trigger_rule": "none_failed",
 }
 
 dag = DAG(
-    'Osdu_ingest',
+    "Osdu_ingest",
     default_args=default_args,
-    description='liveness monitoring dag',
+    description="liveness monitoring dag",
     schedule_interval=None,
     dagrun_timeout=timedelta(minutes=60)
 )
@@ -49,11 +49,16 @@ update_status_finished_op = UpdateStatusOperator(
 )
 
 process_manifest_op = ProcessManifestOperator(
-    task_id='proccess_manifest_task',
+    task_id="proccess_manifest_task",
     provide_context=True,
     dag=dag
 )
 
-update_status_running_op >> process_manifest_op >> update_status_finished_op
+search_record_ids_op = SearchRecordIdOperator(
+    task_id="search_record_ids_task",
+    provide_context=True,
+    dag=dag
+)
+
+update_status_running_op >> process_manifest_op >> search_record_ids_op >> update_status_finished_op
@@ -12,28 +12,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import timedelta
+
 import airflow
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
-from datetime import timedelta
 
 default_args = {
-    'start_date': airflow.utils.dates.days_ago(0),
-    'retries': 1,
-    'retry_delay': timedelta(minutes=50)
+    "start_date": airflow.utils.dates.days_ago(0),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=50)
 }
 
 dag = DAG(
-    'Other_log_ingest',
+    "Other_log_ingest",
     default_args=default_args,
-    description='liveness monitoring dag',
+    description="liveness monitoring dag",
     schedule_interval=None,
     dagrun_timeout=timedelta(minutes=60))
 
 t1 = BashOperator(
-    task_id='echo',
-    bash_command='echo test',
+    task_id="echo",
+    bash_command="echo test",
     dag=dag,
     depends_on_past=False,
-    priority_weight=2**31-1)
+    priority_weight=2**31 - 1)
@@ -12,27 +12,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import timedelta
+
 import airflow
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
-from datetime import timedelta
 
 default_args = {
-    'start_date': airflow.utils.dates.days_ago(0),
-    'retries': 1,
-    'retry_delay': timedelta(minutes=50)
+    "start_date": airflow.utils.dates.days_ago(0),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=50)
 }
 
 dag = DAG(
-    'Well_log_ingest',
+    "Well_log_ingest",
    default_args=default_args,
-    description='liveness monitoring dag',
+    description="liveness monitoring dag",
     schedule_interval=None,
     dagrun_timeout=timedelta(minutes=60))
 
 t1 = BashOperator(
-    task_id='echo',
-    bash_command='echo test',
+    task_id="echo",
+    bash_command="echo test",
     dag=dag,
     depends_on_past=False,
-    priority_weight=2**31-1)
+    priority_weight=2**31 - 1)
 from airflow.plugins_manager import AirflowPlugin
 
-from .operators import UpdateStatusOperator, ProcessManifestOperator
+from .operators import ProcessManifestOperator, SearchRecordIdOperator, UpdateStatusOperator
 
 # Defining the plugin class
 class OSDUPlugin(AirflowPlugin):
     name = "osdu_plugin"
     operators = [
         UpdateStatusOperator,
-        ProcessManifestOperator
+        ProcessManifestOperator,
+        SearchRecordIdOperator
     ]
     hooks = []
     # A list of class(es) derived from BaseExecutor
@@ -19,4 +22,4 @@ class OSDUPlugin(AirflowPlugin):
     # A list of Blueprint object created from flask.Blueprint
     flask_blueprints = []
     # A list of menu links (flask_admin.base.MenuLink)
     menu_links = []
\ No newline at end of file
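
Context for the DAG imports above (an illustration, not part of the diff): registering the plugin is what makes these operators importable from the airflow.operators namespace in Airflow 1.10.

# Once Airflow's plugins_manager has loaded OSDUPlugin, DAG code can import
# the operators exactly as the DAG files in this merge request do:
from airflow.operators import (
    ProcessManifestOperator,
    SearchRecordIdOperator,
    UpdateStatusOperator,
)

# Airflow 1.10 also exposes them under the plugin's name, so this variant
# should be equivalent:
# from airflow.operators.osdu_plugin import UpdateStatusOperator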
File moved
@@ -16,4 +16,4 @@
 from airflow.hooks.http_hook import HttpHook
 
 workflow_hook = HttpHook(http_conn_id='workflow', method="POST")
 search_http_hook = HttpHook(http_conn_id='search', method="POST")
\ No newline at end of file
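
How these hooks are used (a sketch under assumptions: the real endpoint comes from the Airflow Variable update_status_ep, and the 'workflow' connection must be configured): HttpHook.run posts to an endpoint relative to the named connection's base URL, so the operators only supply the path, body, and headers.

import json

from hooks import workflow_hook

# Hypothetical endpoint and body, mirroring UpdateStatusOperator.update_status_rqst:
response = workflow_hook.run(
    endpoint="/v1/workflow/updateStatus",  # real code reads Variable.get("update_status_ep")
    data=json.dumps({"WorkflowID": "some-workflow-id", "Status": "running"}),
    headers={"content-type": "application/json"},
)
print(response.status_code)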
@@ -13,10 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .update_status_op import UpdateStatusOperator
 from .process_manifest_op import ProcessManifestOperator
+from .search_record_id_op import SearchRecordIdOperator
+from .update_status_op import UpdateStatusOperator
 
 __all__ = [
     'UpdateStatusOperator',
-    'ProcessManifestOperator'
+    'ProcessManifestOperator',
+    'SearchRecordIdOperator',
 ]
@@ -13,21 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
+import configparser
+import enum
 import json
 import logging
-import configparser
+import re
 import sys
 import time
 import uuid
-import enum
-import re
-from airflow.models import BaseOperator
+from collections import Counter
+from datetime import datetime
 from typing import Tuple
 from urllib.error import HTTPError
-from collections import Counter
-from airflow.models import Variable
 
 import requests
+from airflow.models import BaseOperator, Variable
 
 ACL_DICT = eval(Variable.get("acl"))
 LEGAL_DICT = eval(Variable.get("legal"))
@@ -232,6 +232,7 @@ def separate_type_data(request_data):
     for elem in request_data:
         data.append(elem[0])
         types[elem[1]] += 1
+    logger.info(f"The count of records to be ingested: {str(dict(types))}")
     return types, data
...
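
The added log line reports a Counter tally of record types. A small self-contained illustration (the type names are made up) of what separate_type_data computes and logs:

from collections import Counter

# (record, type) pairs in the shape separate_type_data() receives.
request_data = [({"id": 1}, "Well"), ({"id": 2}, "Well"), ({"id": 3}, "WellLog")]

types = Counter()
data = []
for record, record_type in request_data:
    data.append(record)
    types[record_type] += 1

print(f"The count of records to be ingested: {dict(types)}")
# -> The count of records to be ingested: {'Well': 2, 'WellLog': 1}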
@@ -14,18 +14,18 @@
 # limitations under the License.
 
-import logging
-import sys
 import enum
 import json
-import tenacity
+import logging
+import sys
 from functools import partial
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.models import Variable
+from typing import Tuple
 
-from hooks import workflow_hook, search_http_hook
+import tenacity
+from airflow.models import BaseOperator, Variable
+from airflow.utils.decorators import apply_defaults
+
+from hooks import search_http_hook, workflow_hook
 
 # Set up base logger
 handler = logging.StreamHandler(sys.stdout)
@@ -36,8 +36,11 @@ logger.addHandler(handler)
 
-class UpdateStatusOperator(BaseOperator):
+class SearchRecordIdOperator(BaseOperator):
+    """
+    Operator to search files in SearchService by record ids.
+    Expects "record_ids" field in xcom.
+    """
 
     ui_color = '#10ECAA'
     ui_fgcolor = '#000000'
@@ -45,11 +48,6 @@ class UpdateStatusOperator(BaseOperator):
     RUNNING_STATUS = "running"
     FAILED_STATUS = "failed"
 
-    class prev_ti_state(enum.Enum):
-        NONE = enum.auto()
-        SUCCESS = enum.auto()
-        FAILED = enum.auto()
-
     @apply_defaults
     def __init__( self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
@@ -57,14 +55,14 @@ class UpdateStatusOperator(BaseOperator):
         self.search_hook = search_http_hook
 
     @staticmethod
-    def _file_searched(resp, expected_total_count):
-        """Check if search service returns totalCount.
+    def _file_searched(resp, expected_total_count) -> bool:
+        """Check if search service returns expected totalCount.
         The method is used as a callback
         """
         data = resp.json()
         return data.get("totalCount") == expected_total_count
 
-    def get_headers(self, **kwargs):
+    def get_headers(self, **kwargs) -> dict:
         data_conf = kwargs['dag_run'].conf
         # for /submitWithManifest authorization and partition-id are inside Payload field
         if "Payload" in data_conf:
@@ -80,20 +78,18 @@ class UpdateStatusOperator(BaseOperator):
         }
         return headers
 
-    def search_files(self, **kwargs):
-        def create_query(record_ids):
-            expected_total_count = len(record_ids)
-            record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids)
-            logger.info(f"Search query {record_ids}")
-            query = f"id:({record_ids})"
-            return query, expected_total_count
+    @staticmethod
+    def _create_search_query(record_ids) -> Tuple[str, int]:
+        expected_total_count = len(record_ids)
+        record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids)
+        logger.info(f"Search query {record_ids}")
+        query = f"id:({record_ids})"
+        return query, expected_total_count
 
-        record_ids_default = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records')
-        record_ids_manifest = kwargs["ti"].xcom_pull(key="record_ids", task_ids='proccess_manifest_task')
-        if record_ids_default:
-            query, expected_total_count = create_query(record_ids_default)
-        elif record_ids_manifest:
-            query, expected_total_count = create_query(record_ids_manifest)
+    def search_files(self, **kwargs):
+        record_ids = kwargs["ti"].xcom_pull(key="record_ids",)
+        if record_ids:
+            query, expected_total_count = self._create_search_query(record_ids)
         else:
             logger.error("There are no record ids")
             sys.exit(2)
@@ -116,58 +112,9 @@ class UpdateStatusOperator(BaseOperator):
             _retry_args=retry_opts
         )
 
-    def previous_ti_statuses(self, context):
-        dagrun = context['ti'].get_dagrun()
-        failed_ti, success_ti = dagrun.get_task_instances(state='failed'), dagrun.get_task_instances(state='success')
-        if not failed_ti and not success_ti:  # There is no prev task so it can't have been failed
-            logger.info("There are no tasks before this one. So it has status RUNNING")
-            return self.prev_ti_state.NONE
-        if failed_ti:
-            logger.info("There are failed tasks before this one. So it has status FAILED")
-            return self.prev_ti_state.FAILED
-        logger.info("There are successed tasks before this one. So it has status SUCCESSED")
-        return self.prev_ti_state.SUCCESS
-
-    def pre_execute(self, context):
-        prev_tis = self.previous_ti_statuses(context)
-        if prev_tis is self.prev_ti_state.NONE:
-            self.status = self.RUNNING_STATUS
-        elif prev_tis is self.prev_ti_state.FAILED:
-            self.status = self.FAILED_STATUS
-        elif prev_tis is self.prev_ti_state.SUCCESS:
-            self.status = self.FINISHED_STATUS
-
     def execute(self, context):
         """Execute update workflow status.
         If status assumed to be FINISHED then we check whether proceed files are searchable or not.
         If they are then update status FINISHED else FAILED
         """
-        if self.status in (self.RUNNING_STATUS, self.FAILED_STATUS):
-            self.update_status_rqst(self.status, **context)
-        elif self.status == self.FINISHED_STATUS:
-            try:
-                self.search_files(**context)
-            except Exception as e:
-                logger.error(str(e))
-                self.status = self.FAILED_STATUS
-                self.update_status_rqst(self.FAILED_STATUS, **context)
-            else:
-                self.update_status_rqst(self.FINISHED_STATUS, **context)
-        if self.status == self.FAILED_STATUS:
-            raise Exception("Dag failed")
-
-    def update_status_rqst(self, status, **kwargs):
-        data_conf = kwargs['dag_run'].conf
-        logger.info(f"Got dataconf {data_conf}")
-        workflow_id = data_conf["WorkflowID"]
-        headers = self.get_headers(**kwargs)
-        request_body = {
-            "WorkflowID": workflow_id,
-            "Status": status
-        }
-        logger.info(f" Sending request '{status}'")
-        self.workflow_hook.run(
-            endpoint=Variable.get("update_status_ep"),
-            data=json.dumps(request_body),
-            headers=headers
-        )
+        self.search_files(**context)
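
To make the new operator's behavior concrete (illustrative values): _create_search_query joins the record ids pulled from XCom into a single id query, and the totalCount returned by the search service is polled until it matches the expected count.

record_ids = ["opendes:doc:123", "opendes:doc:456"]  # hypothetical ids from XCom

expected_total_count = len(record_ids)
joined = " OR ".join(f"\"{id_}\"" for id_ in record_ids)
query = f"id:({joined})"

print(query)                 # id:("opendes:doc:123" OR "opendes:doc:456")
print(expected_total_count)  # 2 -- the totalCount the search response must report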
@@ -14,18 +14,17 @@
 # limitations under the License.
 
-import logging
-import sys
 import enum
 import json
-import tenacity
+import logging
+import sys
 from functools import partial
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.models import Variable
 
-from hooks import workflow_hook, search_http_hook
+import tenacity
+from airflow.models import BaseOperator, Variable
+from airflow.utils.decorators import apply_defaults
+
+from hooks import search_http_hook, workflow_hook
 
 # Set up base logger
 handler = logging.StreamHandler(sys.stdout)
@@ -88,12 +87,9 @@ class UpdateStatusOperator(BaseOperator):
             query = f"id:({record_ids})"
             return query, expected_total_count
 
-        record_ids_default = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records')
-        record_ids_manifest = kwargs["ti"].xcom_pull(key="record_ids", task_ids='proccess_manifest_task')
-        if record_ids_default:
-            query, expected_total_count = create_query(record_ids_default)
-        elif record_ids_manifest:
-            query, expected_total_count = create_query(record_ids_manifest)
+        record_ids = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records')
+        if record_ids:
+            query, expected_total_count = create_query(record_ids)
         else:
             logger.error("There are no record ids")
             sys.exit(2)
@@ -142,17 +138,7 @@ class UpdateStatusOperator(BaseOperator):
         If status assumed to be FINISHED then we check whether proceed files are searchable or not.
         If they are then update status FINISHED else FAILED
         """
-        if self.status in (self.RUNNING_STATUS, self.FAILED_STATUS):
-            self.update_status_rqst(self.status, **context)
-        elif self.status == self.FINISHED_STATUS:
-            try:
-                self.search_files(**context)
-            except Exception as e:
-                logger.error(str(e))
-                self.status = self.FAILED_STATUS
-                self.update_status_rqst(self.FAILED_STATUS, **context)
-            else:
-                self.update_status_rqst(self.FINISHED_STATUS, **context)
+        self.update_status_rqst(self.status, **context)
         if self.status == self.FAILED_STATUS:
             raise Exception("Dag failed")
...
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
OSDU_INGEST_SUCCES_FIFO = "/tmp/osdu_ingest_success"
OSDU_INGEST_FAILED_FIFO = "/tmp/osdu_ingest_failed"
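
Nothing in this commit shows who reads or writes these FIFOs; presumably the DAG-test harness uses them to signal run results. A speculative sketch of that pattern (the reader/writer roles are assumptions, not shown here):

import os

OSDU_INGEST_SUCCES_FIFO = "/tmp/osdu_ingest_success"

# Create the named pipe once (assumption: the test harness owns setup).
if not os.path.exists(OSDU_INGEST_SUCCES_FIFO):
    os.mkfifo(OSDU_INGEST_SUCCES_FIFO)

# Writer side (e.g. a task marking success) would do:
#     with open(OSDU_INGEST_SUCCES_FIFO, "w") as fifo:
#         fifo.write("done\n")

# Reader side blocks until the writer signals:
with open(OSDU_INGEST_SUCCES_FIFO) as fifo:
    print(f"ingest result: {fifo.read().strip()}")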