diff --git a/src/dags/osdu-ingest.py b/src/dags/osdu-ingest-r2.py
similarity index 97%
rename from src/dags/osdu-ingest.py
rename to src/dags/osdu-ingest-r2.py
index f43fb22358f21742d45da7d21a5d0afb1addd017..c6f171c461bb8cb4b127ff29180c6464cf43546a 100644
--- a/src/dags/osdu-ingest.py
+++ b/src/dags/osdu-ingest-r2.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# DAG for R2 ingestion
+
 from datetime import timedelta
 
 import airflow
@@ -24,11 +26,10 @@ default_args = {
     "retries": 0,
     "retry_delay": timedelta(minutes=50),
     "trigger_rule": "none_failed",
-
 }
 
 dag = DAG(
-    "Osdu_ingest",
+    "Osdu_ingest_r2",
     default_args=default_args,
     description="liveness monitoring dag",
     schedule_interval=None,
diff --git a/src/plugins/operators/__init__.py b/src/plugins/operators/__init__.py
index c0da87d0584cba75f258461d4ed576e7a8ca0c3e..2eecd112c887b53a0aed045020e236ad5c374fb6 100644
--- a/src/plugins/operators/__init__.py
+++ b/src/plugins/operators/__init__.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .process_manifest_op import ProcessManifestOperator
+from .process_manifest_r2_op import ProcessManifestOperator
 from .search_record_id_op import SearchRecordIdOperator
 from .update_status_op import UpdateStatusOperator
 
diff --git a/src/plugins/operators/process_manifest_op.py b/src/plugins/operators/process_manifest_r2_op.py
similarity index 100%
rename from src/plugins/operators/process_manifest_op.py
rename to src/plugins/operators/process_manifest_r2_op.py
diff --git a/tests/end-to-end-tests/mock-data/osdu-ingest-invalid.json b/tests/end-to-end-tests/mock-data/osdu-ingest-r2-invalid.json
similarity index 100%
rename from tests/end-to-end-tests/mock-data/osdu-ingest-invalid.json
rename to tests/end-to-end-tests/mock-data/osdu-ingest-r2-invalid.json
diff --git a/tests/end-to-end-tests/mock-data/osdu-ingest-valid.json b/tests/end-to-end-tests/mock-data/osdu-ingest-r2-valid.json
similarity index 100%
rename from tests/end-to-end-tests/mock-data/osdu-ingest-valid.json
rename to tests/end-to-end-tests/mock-data/osdu-ingest-r2-valid.json
diff --git a/tests/end-to-end-tests/test-osdu-ingest-success.sh b/tests/end-to-end-tests/test-osdu-ingest-r2-fail.sh
similarity index 83%
rename from tests/end-to-end-tests/test-osdu-ingest-success.sh
rename to tests/end-to-end-tests/test-osdu-ingest-r2-fail.sh
index fef8a0f8c4e1fe21157da2d212a668d35c558869..8ba0d80f6b39ff14a239a8816c85052cac584a78 100755
--- a/tests/end-to-end-tests/test-osdu-ingest-success.sh
+++ b/tests/end-to-end-tests/test-osdu-ingest-r2-fail.sh
@@ -14,5 +14,5 @@
 # limitations under the License.
 
-json=$(cat `dirname $0`/mock-data/osdu-ingest-valid.json | tail -n +15)
-airflow trigger_dag -c "$json" Osdu_ingest
+json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-invalid.json | tail -n +15)
+airflow trigger_dag -c "$json" Osdu_ingest_r2
 
diff --git a/tests/end-to-end-tests/test-osdu-ingest-fail.sh b/tests/end-to-end-tests/test-osdu-ingest-r2-success.sh
similarity index 83%
rename from tests/end-to-end-tests/test-osdu-ingest-fail.sh
rename to tests/end-to-end-tests/test-osdu-ingest-r2-success.sh
index e8a24a7dfed017d26d7b24c9f4e0edbf8bfff589..eeb9080051ea6ec6b9c327cbd61d1fb84a778c90 100755
--- a/tests/end-to-end-tests/test-osdu-ingest-fail.sh
+++ b/tests/end-to-end-tests/test-osdu-ingest-r2-success.sh
@@ -14,5 +14,5 @@
 # limitations under the License.
 
-json=$(cat `dirname $0`/mock-data/osdu-ingest-invalid.json | tail -n +15)
-airflow trigger_dag -c "$json" Osdu_ingest
+json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-valid.json | tail -n +15)
+airflow trigger_dag -c "$json" Osdu_ingest_r2
 
diff --git a/tests/plugin-unit-tests/data/__init__.py b/tests/plugin-unit-tests/data/__init__.py
index e9d8ac876ee1c3ce060bf9b178727657bf6f2758..7be2c556ed3b00241e3a46b707419523a803dcbd 100644
--- a/tests/plugin-unit-tests/data/__init__.py
+++ b/tests/plugin-unit-tests/data/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .process_manifest_op import *
+from .process_manifest_r2_op import *
diff --git a/tests/plugin-unit-tests/data/process_manifest_op.py b/tests/plugin-unit-tests/data/process_manifest_r2_op.py
similarity index 100%
rename from tests/plugin-unit-tests/data/process_manifest_op.py
rename to tests/plugin-unit-tests/data/process_manifest_r2_op.py
diff --git a/tests/plugin-unit-tests/test_process_manifest_op.py b/tests/plugin-unit-tests/test_process_manifest_op.py
deleted file mode 100644
index 3c516f4a801ed3c3851cfff00676d854b8ba1a89..0000000000000000000000000000000000000000
--- a/tests/plugin-unit-tests/test_process_manifest_op.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright 2020 Google LLC
-# Copyright 2020 EPAM Systems
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import re
-import sys
-
-import pytest
-
-sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
-
-from data import process_manifest_op as test_data
-from operators import process_manifest_op
-
-
-@pytest.mark.parametrize(
-    "test_input, expected",
-    [
-        ("srn:type:work-product/WellLog:", "WellLog"),
-        ("srn:type:file/las2:", "las2"),
-    ]
-)
-def test_determine_data_type(test_input, expected):
-    data_type = process_manifest_op.determine_data_type(test_input)
-    assert data_type == expected
-
-
-@pytest.mark.parametrize(
-    "data_type, loaded_conf, conf_payload, expected_file_result",
-    [
-        ("las2",
-         test_data.LOADED_CONF,
-         test_data.CONF_PAYLOAD,
-         test_data.PROCESS_FILE_ITEMS_RESULT)
-    ]
-)
-def test_process_file_items(data_type, loaded_conf, conf_payload, expected_file_result):
-    file_id_regex = re.compile(r"srn\:file/" + data_type + r"\:\d+\:")
-    expected_file_list = expected_file_result[0]
-    file_list, file_ids = process_manifest_op.process_file_items(loaded_conf, conf_payload)
-    for i in file_ids:
-        assert file_id_regex.match(i)
-
-    for i in file_list:
-        assert file_id_regex.match(i[0]["data"]["ResourceID"])
-        i[0]["data"]["ResourceID"] = ""
-    assert file_list == expected_file_list
diff --git a/tests/plugin-unit-tests/test_process_manifest_r2_op.py b/tests/plugin-unit-tests/test_process_manifest_r2_op.py
new file mode 100644
index 0000000000000000000000000000000000000000..7e184ab640f311c174f998258cbfe4badaf21c25
--- /dev/null
+++ b/tests/plugin-unit-tests/test_process_manifest_r2_op.py
@@ -0,0 +1,60 @@
+# Copyright 2020 Google LLC
+# Copyright 2020 EPAM Systems
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import re
+import sys
+
+import pytest
+
+sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
+
+from data import process_manifest_r2_op as test_data
+from operators import process_manifest_r2_op as p_m_op
+
+
+@pytest.mark.parametrize(
+    "test_input, expected",
+    [
+        ("srn:type:work-product/WellLog:", "WellLog"),
+        ("srn:type:file/las2:", "las2"),
+    ]
+)
+def test_determine_data_type(test_input, expected):
+    data_type = p_m_op.determine_data_type(test_input)
+    assert data_type == expected
+
+
+@pytest.mark.parametrize(
+    "data_type, loaded_conf, conf_payload, expected_file_result",
+    [
+        ("las2",
+         test_data.LOADED_CONF,
+         test_data.CONF_PAYLOAD,
+         test_data.PROCESS_FILE_ITEMS_RESULT)
+    ]
+)
+def test_process_file_items(data_type, loaded_conf, conf_payload, expected_file_result):
+    file_id_regex = re.compile(r"srn\:file/" + data_type + r"\:\d+\:")
+    expected_file_list = expected_file_result[0]
+    file_list, file_ids = p_m_op.process_file_items(loaded_conf, conf_payload)
+    for i in file_ids:
+        assert file_id_regex.match(i)
+
+    for i in file_list:
+        assert file_id_regex.match(i[0]["data"]["ResourceID"])
+        i[0]["data"]["ResourceID"] = ""
+    assert file_list == expected_file_list
diff --git a/tests/set_airflow_env.sh b/tests/set_airflow_env.sh
index 21be656adb59f203d1351dc5b68c60b0d3047755..af68cf97b59eb0d4ffe17a6a3c2a62ffd1547229 100755
--- a/tests/set_airflow_env.sh
+++ b/tests/set_airflow_env.sh
@@ -23,15 +23,8 @@
 export SEARCH_CONN_ID="http://127.0.0.1:5000"
 export WORKFLOW_CONN_ID="http://127.0.0.1:5000"
 export DATALOAD_CONFIG_PATH="/usr/local/airflow/dags/configs/dataload.ini"
-
-mkdir -p /usr/local/airflow/dags/
-cp -rf src/* /usr/local/airflow/
-cp -r tests/end-to-end-tests/mock-external-apis /mock-server
-cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
-cp tests/end-to-end-tests/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
-cp tests/*.py /mock-server/
-
 airflow initdb > /dev/null 2>&1
+
 # exclude testing DAGS
 sed -i 's/load_examples = True/load_examples = False/' /usr/local/airflow/airflow.cfg
 # turn on all dags
@@ -54,8 +47,15 @@ airflow variables -s search_query_ep sr/qr
 airflow variables -s access_token test
 airflow variables -s "sa-file-osdu" "test"
 
-airflow connections -a --conn_id search --conn_uri $SEARCH_CONN_ID
-airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
-airflow connections -a --conn_id google_cloud_storage --conn_uri $WORKFLOW_CONN_ID
+airflow connections -a --conn_id search --conn_uri $SEARCH_CONN_ID
+airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
+airflow connections -a --conn_id google_cloud_storage --conn_uri $WORKFLOW_CONN_ID
+
+mkdir -p /usr/local/airflow/dags/
+cp -rf src/* /usr/local/airflow/
+cp -r tests/end-to-end-tests/mock-external-apis /mock-server
+cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
+cp tests/end-to-end-tests/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
+cp tests/*.py /mock-server/
 
-chmod +x /mock-server/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh}
+chmod +x /mock-server/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh}
diff --git a/tests/test_dags.py b/tests/test_dags.py
index ffd6a056f05130d54d1b9327ebc19b341e622d8c..43a99ac78e51caab1cc6e8a94f78630ca44c4ef0 100644
--- a/tests/test_dags.py
+++ b/tests/test_dags.py
@@ -19,26 +19,27 @@ import time
 
 
 class DagStatus(enum.Enum):
-    RUNNING = "running"
-    FAILED = "failed"
-    FINISHED = "finished"
+    RUNNING = enum.auto()
+    FAILED = enum.auto()
+    FINISHED = enum.auto()
 
-
-OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-success.sh"
-OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-fail.sh"
+OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-r2-success.sh"
+OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-r2-fail.sh"
 DEFAULT_INGEST_SUCCESS_SH = "/mock-server/./test-default-ingest-success.sh"
 DEFAULT_INGEST_FAIL_SH = "/mock-server/./test-default-ingest-fail.sh"
 
-subprocess.run(f"/bin/bash -c 'airflow scheduler > /dev/null 2>&1 &'", shell=True)
+with open("/tmp/osdu_ingest_result", "w") as f:
+    f.close()
+subprocess.run(f"/bin/bash -c 'airflow scheduler > /dev/null 2>&1 &'", shell=True)
 
 
-def check_dag_status(dag_name: str) -> DagStatus:
+def check_dag_status(dag_name):
     time.sleep(5)
     output = subprocess.getoutput(f'airflow list_dag_runs {dag_name}')
     if "failed" in output:
         print(dag_name)
         print(output)
-        return DagStatus.FAILED
+        return DagStatus.FAILED
     if "running" in output:
         return DagStatus.RUNNING
     print(dag_name)
@@ -46,17 +47,32 @@
     return DagStatus.FINISHED
 
 
-def test_dag_execution_result(dag_name: str, script: str, expected_status: DagStatus):
+def test_dag_success(dag_name, script):
+    print(f"Test {dag_name} success")
+    subprocess.run(f"{script}", shell=True)
+    while True:
+        dag_status = check_dag_status(dag_name)
+        if dag_status is DagStatus.RUNNING:
+            continue
+        elif dag_status is DagStatus.FINISHED:
+            return
+        else:
+            raise Exception(f"Error {dag_name} supposed to be finished")
+
+def test_dag_fail(dag_name, script):
     subprocess.run(f"{script}", shell=True)
-    print(f"Expecting {dag_name} to be {expected_status.value}")
+    print(f"Expecting {dag_name} fail")
     while True:
         dag_status = check_dag_status(dag_name)
-        if dag_status is not DagStatus.RUNNING:
-            break
-    assert dag_status is expected_status, f"Error {dag_name} supposed to be {expected_status.value}"
+        if dag_status is DagStatus.RUNNING:
+            continue
+        elif dag_status is DagStatus.FAILED:
+            return
+        else:
+            raise Exception(f"Error {dag_name} supposed to be failed")
 
 
-test_dag_execution_result("Osdu_ingest", OSDU_INGEST_SUCCESS_SH, DagStatus.FINISHED)
-test_dag_execution_result("Osdu_ingest", OSDU_INGEST_FAIL_SH, DagStatus.FAILED)
-test_dag_execution_result("Default_ingest", DEFAULT_INGEST_SUCCESS_SH, DagStatus.FINISHED)
-test_dag_execution_result("Default_ingest", DEFAULT_INGEST_FAIL_SH, DagStatus.FAILED)
+test_dag_success("Osdu_ingest_r2", OSDU_INGEST_SUCCESS_SH)
+test_dag_fail("Osdu_ingest_r2", OSDU_INGEST_FAIL_SH)
+test_dag_success("Default_ingest", DEFAULT_INGEST_SUCCESS_SH)
+test_dag_fail("Default_ingest", DEFAULT_INGEST_FAIL_SH)
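
Note on the refactored tests/test_dags.py above: both new helpers rely on the same trigger-then-poll pattern against the Airflow CLI. A minimal, self-contained sketch of that pattern follows; the wait_for_dag name and the poll_interval argument are illustrative only and are not part of this patch.

import enum
import subprocess
import time


class DagStatus(enum.Enum):
    RUNNING = enum.auto()
    FAILED = enum.auto()
    FINISHED = enum.auto()


def wait_for_dag(dag_name, poll_interval=5):
    # Illustrative helper (not from the patch): poll `airflow list_dag_runs <dag_name>`
    # until the latest run leaves the running state, mirroring check_dag_status() above.
    while True:
        time.sleep(poll_interval)
        output = subprocess.getoutput(f"airflow list_dag_runs {dag_name}")
        if "failed" in output:
            return DagStatus.FAILED
        if "running" in output:
            continue
        return DagStatus.FINISHED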
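
Design note: test_dag_success and test_dag_fail differ only in the terminal state they accept, so they could be folded back into a single parameterized helper. A hypothetical sketch, continuing the names from the sketch above (wait_for_dag, DagStatus) and reusing the script constants from the patch:

def run_dag_and_expect(dag_name, script, expected):
    # Hypothetical helper, not part of the patch; OSDU_INGEST_*_SH and the DAG ids come from it.
    subprocess.run(script, shell=True)
    print(f"Expecting {dag_name} to end up {expected.name}")
    status = wait_for_dag(dag_name)
    if status is not expected:
        raise Exception(f"Error: {dag_name} supposed to be {expected.name}, got {status.name}")

run_dag_and_expect("Osdu_ingest_r2", OSDU_INGEST_SUCCESS_SH, DagStatus.FINISHED)
run_dag_and_expect("Osdu_ingest_r2", OSDU_INGEST_FAIL_SH, DagStatus.FAILED)

This keeps the polling loop in one place instead of duplicating it per expected outcome.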