Skip to content
Snippets Groups Projects
Commit 03cd8986 authored by Siarhei Khaletski (EPAM) :triangular_flag_on_post:
Browse files

Merge branch 'feature/remove_obsolete_dags' into 'integration-master'

GONRG-1567: Removed obsolete dags

See merge request go3-nrg/platform/data-flow/ingestion/ingestion-dags!64
parents 26694965 2dcf5b8d
No related branches found
No related tags found
No related merge requests found
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
# Copyright 2020 Amazon
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG for opaque ingestion."""
from datetime import timedelta
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from operators.deprecated.update_status import UpdateStatusOperator
from libs.create_records import create_records # isort:skip
# Arguments applied to every task of the DAG unless a task overrides them.
default_args = dict(
    owner="Airflow",
    depends_on_past=False,
    start_date=airflow.utils.dates.days_ago(0),
    email=["airflow@example.com"],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=5),
    trigger_rule="none_failed",
)

workflow_name = "Default_ingest"

# Operators instantiated inside the ``with`` block are attached to ``dag``.
with DAG(
    workflow_name,
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )
    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
    )
    create_records_op = PythonOperator(
        task_id="create_records",
        python_callable=create_records,
        provide_context=True,
    )

# Linear pipeline: status "running" -> create records -> status "finished".
update_status_running_op.set_downstream(create_records_op)
create_records_op.set_downstream(update_status_finished_op)
# Copyright 2020 Google LLC
# Copyright 2020 Amazon
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provide R2 record processor."""
import configparser
import logging
import os
from airflow.models import Variable
from libs.context import Context
from libs.refresh_token import AirflowTokenRefresher
from osdu_api.libs.auth.authorization import authorize
from osdu_api.model.acl import Acl
from osdu_api.model.legal.legal import Legal
from osdu_api.model.legal.legal_compliance import LegalCompliance
from osdu_api.model.record import Record
from osdu_api.model.record_ancestry import RecordAncestry
from osdu_api.storage.record_client import RecordClient
# getLogger() without a name returns the root logger.
logger = logging.getLogger()
# NOTE(review): eval() executes whatever expression is stored in the "acl" and
# "legal" Airflow Variables. If the stored values are plain literals,
# ast.literal_eval or json.loads would be safer — confirm the stored format
# before changing this.
ACL_DICT = eval(Variable.get("acl"))
LEGAL_DICT = eval(Variable.get("legal"))
# Defaults are loaded at import time from the file whose path is stored in the
# "dataload_config_path" Variable. config.read() skips files it cannot open,
# in which case the config.get() calls below raise for the missing section.
config = configparser.RawConfigParser()
config.read(Variable.get("dataload_config_path"))
DEFAULT_TENANT = config.get("DEFAULTS", "tenant")
DEFAULT_SOURCE = config.get("DEFAULTS", "authority")
DEFAULT_VERSION = config.get("DEFAULTS", "kind_version")
@authorize(AirflowTokenRefresher())
def create_update_record_request(headers, record_client, record):
    """Create or update a single record through the given RecordClient.

    The record is wrapped in a one-element list before it is handed to
    ``record_client.create_update_records``.

    :param headers: The request headers
    :param record_client: RecordClient used to send the request
    :param record: The record to create or update
    :return: Storage service response
    """
    records = [record]
    return record_client.create_update_records(records, headers)
def create_records(**kwargs):
    """Build one record from the DAG run configuration and send it to storage.

    The only way to pass in values through the experimental api is through
    the conf parameter, so every input is read from ``kwargs["dag_run"].conf``.
    The record ids returned by the service are pushed to XCom under the
    ``record_ids`` key.

    :param kwargs: Airflow task context; ``dag_run`` and ``ti`` are used
    :return: response status
    :raises KeyError: if the run conf has no "authorization" entry
    :raises ValueError: if the "record_kind" Variable does not consist of
        exactly four ":"-separated parts
    """
    data_conf = kwargs["dag_run"].conf
    # NOTE(review): both debug lines dump the run conf, which includes the
    # "authorization" value — consider redacting it.
    logger.debug(kwargs)
    logger.debug(data_conf)

    acl = Acl(ACL_DICT["viewers"], ACL_DICT["owners"])
    legal = Legal(
        LEGAL_DICT["legaltags"],
        LEGAL_DICT["otherRelevantDataCountries"],
        LegalCompliance.compliant,
    )
    # The value itself is not used here; the lookup is kept so that a conf
    # without "authorization" still fails with KeyError as before.
    _ = data_conf["authorization"]
    ancestry = RecordAncestry([])
    record_id = None

    # TODO: find out how to get kind through request not using Variables
    kind = Variable.get("record_kind")
    # Unpacking doubles as a format check (exactly four parts); only the
    # third part is needed.
    _tenant, _authority, file_type, _kind_version = kind.split(":")
    total_count = {file_type.capitalize(): 1}
    logger.info("The count of records to be ingested: %s", total_count)

    meta = [{}]
    record_version = 0
    data = data_conf.get("data", {})
    record = Record(
        record_id, record_version, kind, acl, legal, data, ancestry, meta)

    # NOTE(review): the mock request payloads use "data-partition-id" and
    # "AppKey", while this reads "partition-id" and "app-key" — confirm
    # which keys callers actually send.
    partition_id = data_conf.get("partition-id", DEFAULT_SOURCE)
    headers = {
        "content-type": "application/json",
        "slb-data-partition-id": partition_id,
        "AppKey": data_conf.get("app-key", ""),
    }
    record_client = RecordClient(
        AirflowTokenRefresher(), Context("test", "test"))
    record_client.data_partition_id = partition_id

    resp = create_update_record_request(headers, record_client, record)
    logger.info("Response: %s", resp.text)
    kwargs["ti"].xcom_push(key="record_ids", value=resp.json()["recordIds"])
    return {"response_status": resp.status_code}
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dummy DAG"""
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Defaults shared by every task in this DAG.
default_args = dict(
    start_date=airflow.utils.dates.days_ago(0),
    retries=1,
    retry_delay=timedelta(minutes=50),
)

# No schedule is set (schedule_interval=None); dagrun_timeout is 60 minutes.
with DAG(
    "Other_log_ingest",
    default_args=default_args,
    description="liveness monitoring dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    # Single task that echoes "test"; its priority weight is the largest
    # signed 32-bit integer.
    t1 = BashOperator(
        task_id="echo",
        bash_command="echo test",
        depends_on_past=False,
        priority_weight=2**31 - 1,
    )
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""WellLog DAG."""
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Defaults shared by every task in this DAG.
default_args = dict(
    start_date=airflow.utils.dates.days_ago(0),
    retries=1,
    retry_delay=timedelta(minutes=50),
)

# No schedule is set (schedule_interval=None); dagrun_timeout is 60 minutes.
with DAG(
    "Well_log_ingest",
    default_args=default_args,
    description="liveness monitoring dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    # Single task that echoes "test"; its priority weight is the largest
    # signed 32-bit integer.
    t1 = BashOperator(
        task_id="echo",
        bash_command="echo test",
        depends_on_past=False,
        priority_weight=2**31 - 1,
    )
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{
"authorization": "Bearer secret",
"legal-tags": "\"legaltags\": [\"foo\"],\"otherRelevantDataCountries\": [\"FR\",\"US\",\"CA\"]\"",
"AppKey": "test",
"acl": "\"viewers\": [\"foo\"], \"owners\": [\"foo\"]",
"data-partition-id": "odes"
}
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{
"authorization": "Bearer foo",
"legal-tags": "\"legaltags\": [\"foo\"],\"otherRelevantDataCountries\": [\"FR\",\"US\",\"CA\"]\", \"data\": {\"FileID\": \"83d0ec02be114cf580da81f96b2a831e\"}",
"AppKey": "test",
"acl": "\"viewers\": [\"foo\"], \"owners\": [\"foo\"]",
"data-partition-id": "odes",
"WorkflowID": "foo"
}
\ No newline at end of file
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Trigger the Default_ingest DAG with the payload stored in
# mock-data/default-ingest-invalid.json next to this script.
# `tail -n +15` starts output at line 15, i.e. it drops the first 14 lines of
# the file (presumably its license comment header — confirm if the header
# length changes) so that only the JSON body is passed as the run conf.
# Paths are quoted so the script also works from a directory with spaces.
script_dir=$(dirname "$0")
json=$(tail -n +15 "$script_dir/mock-data/default-ingest-invalid.json")
airflow trigger_dag -c "$json" Default_ingest
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Trigger the Default_ingest DAG with the payload stored in
# mock-data/default-ingest-valid.json next to this script.
# `tail -n +15` starts output at line 15, i.e. it drops the first 14 lines of
# the file (presumably its license comment header — confirm if the header
# length changes) so that only the JSON body is passed as the run conf.
# Paths are quoted so the script also works from a directory with spaces.
script_dir=$(dirname "$0")
json=$(tail -n +15 "$script_dir/mock-data/default-ingest-valid.json")
airflow trigger_dag -c "$json" Default_ingest
......@@ -70,8 +70,8 @@ cp -rf src/* /usr/local/airflow/
cp -r tests/end-to-end-tests/mock-external-apis /mock-server
cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
cp tests/end-to-end-tests/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
cp tests/end-to-end-tests/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh} /mock-server/
cp tests/end-to-end-tests/osdu_api_config.yaml /mock-server/
cp tests/*.py /mock-server/
chmod +x /mock-server/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh}
chmod +x /mock-server/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh}
......@@ -27,8 +27,6 @@ class DagStatus(enum.Enum):
OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-r2-success.sh"
OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-r2-fail.sh"
DEFAULT_INGEST_SUCCESS_SH = "/mock-server/./test-default-ingest-success.sh"
DEFAULT_INGEST_FAIL_SH = "/mock-server/./test-default-ingest-fail.sh"
with open("/tmp/osdu_ingest_result", "w") as f:
f.close()
......@@ -79,5 +77,3 @@ def test_dag_fail(dag_name, script):
test_dag_success("Osdu_ingest_r2", OSDU_INGEST_SUCCESS_SH)
test_dag_fail("Osdu_ingest_r2", OSDU_INGEST_FAIL_SH)
test_dag_success("Default_ingest", DEFAULT_INGEST_SUCCESS_SH)
test_dag_fail("Default_ingest", DEFAULT_INGEST_FAIL_SH)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment