Commit b94178eb authored by Siarhei Khaletski (EPAM)

GONRG-1011: Renamed Osdu_ingest DAG to Osdu_ingest_r2.

parent e0cab95b
1 merge request: !6 R3 Data Ingestion
Showing 115 additions and 38 deletions
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+# DAG for R2 ingestion
 from datetime import timedelta
 import airflow
@@ -24,11 +26,10 @@ default_args = {
     "retries": 0,
     "retry_delay": timedelta(minutes=50),
     "trigger_rule": "none_failed",
 }
 
 dag = DAG(
-    "Osdu_ingest",
+    "Osdu_ingest_r2",
     default_args=default_args,
     description="liveness monitoring dag",
     schedule_interval=None,
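For reference, the DAG definition after this change can be pieced together from the two hunks above. This is a sketch only: the "from airflow import DAG" line is assumed (the diff shows only "import airflow"), and any other default_args entries such as start_date are omitted because they do not appear in the hunks.

# Sketch of the renamed DAG, reassembled from the hunks above (not the full file).

# DAG for R2 ingestion
from datetime import timedelta

import airflow
from airflow import DAG  # assumed import; the diff itself only shows "import airflow"

default_args = {
    "retries": 0,
    "retry_delay": timedelta(minutes=50),
    "trigger_rule": "none_failed",
}

dag = DAG(
    "Osdu_ingest_r2",  # renamed from "Osdu_ingest"
    default_args=default_args,
    description="liveness monitoring dag",
    schedule_interval=None,
)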
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .process_manifest_op import ProcessManifestOperator
+from .process_manifest_r2_op import ProcessManifestOperator
 from .search_record_id_op import SearchRecordIdOperator
 from .update_status_op import UpdateStatusOperator
@@ -14,5 +14,5 @@
 # limitations under the License.
 
-json=$(cat `dirname $0`/mock-data/osdu-ingest-valid.json | tail -n +15)
+json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-invalid.json | tail -n +15)
 
-airflow trigger_dag -c "$json" Osdu_ingest
+airflow trigger_dag -c "$json" Osdu_ingest_r2
@@ -14,5 +14,5 @@
 # limitations under the License.
 
-json=$(cat `dirname $0`/mock-data/osdu-ingest-invalid.json | tail -n +15)
+json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-valid.json | tail -n +15)
 
-airflow trigger_dag -c "$json" Osdu_ingest
+airflow trigger_dag -c "$json" Osdu_ingest_r2
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .process_manifest_op import *
+from .process_manifest_r2_op import *
@@ -22,39 +22,39 @@ import pytest
 
 sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
 
-from data import process_manifest_op as test_data
-from operators import process_manifest_op
+from data import process_manifest_r2_op as test_data
+from operators import process_manifest_r2_op as p_m_op
 
 
 @pytest.mark.parametrize(
     "test_input, expected",
     [
         ("srn:type:work-product/WellLog:", "WellLog"),
         ("srn:type:file/las2:", "las2"),
     ]
 )
 def test_determine_data_type(test_input, expected):
-    data_type = process_manifest_op.determine_data_type(test_input)
+    data_type = p_m_op.determine_data_type(test_input)
     assert data_type == expected
 
 
 @pytest.mark.parametrize(
     "data_type, loaded_conf, conf_payload, expected_file_result",
     [
         ("las2",
          test_data.LOADED_CONF,
          test_data.CONF_PAYLOAD,
          test_data.PROCESS_FILE_ITEMS_RESULT)
     ]
 )
 def test_process_file_items(data_type, loaded_conf, conf_payload, expected_file_result):
     file_id_regex = re.compile(r"srn\:file/" + data_type + r"\:\d+\:")
     expected_file_list = expected_file_result[0]
-    file_list, file_ids = process_manifest_op.process_file_items(loaded_conf, conf_payload)
+    file_list, file_ids = p_m_op.process_file_items(loaded_conf, conf_payload)
     for i in file_ids:
         assert file_id_regex.match(i)
     for i in file_list:
         assert file_id_regex.match(i[0]["data"]["ResourceID"])
         i[0]["data"]["ResourceID"] = ""
     assert file_list == expected_file_list
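A note on what these assertions rely on: the test matches each generated file ID against an SRN regex and then blanks the ResourceID field before comparing against the expected list, which suggests the numeric part of the ID differs from run to run. A minimal, self-contained illustration of that regex check (the sample SRN values below are invented for demonstration):

import re

data_type = "las2"
file_id_regex = re.compile(r"srn\:file/" + data_type + r"\:\d+\:")

# Generated IDs look like "srn:file/<type>:<digits>:"; only the digits vary.
assert file_id_regex.match("srn:file/las2:2187409137:")
# A non-numeric ID would fail the assertion in test_process_file_items.
assert not file_id_regex.match("srn:file/las2:not-digits:")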
@@ -23,15 +23,8 @@ export SEARCH_CONN_ID="http://127.0.0.1:5000"
 export WORKFLOW_CONN_ID="http://127.0.0.1:5000"
 export DATALOAD_CONFIG_PATH="/usr/local/airflow/dags/configs/dataload.ini"
 
-mkdir -p /usr/local/airflow/dags/
-cp -rf src/* /usr/local/airflow/
-cp -r tests/end-to-end-tests/mock-external-apis /mock-server
-cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
-cp tests/end-to-end-tests/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
-cp tests/*.py /mock-server/
-
 airflow initdb > /dev/null 2>&1
 
 # exclude testing DAGS
 sed -i 's/load_examples = True/load_examples = False/' /usr/local/airflow/airflow.cfg
 # turn on all dags
@@ -54,8 +47,15 @@ airflow variables -s search_query_ep sr/qr
 airflow variables -s access_token test
 airflow variables -s "sa-file-osdu" "test"
 
 airflow connections -a --conn_id search --conn_uri $SEARCH_CONN_ID
 airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
 airflow connections -a --conn_id google_cloud_storage --conn_uri $WORKFLOW_CONN_ID
 
-chmod +x /mock-server/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh}
+mkdir -p /usr/local/airflow/dags/
+cp -rf src/* /usr/local/airflow/
+cp -r tests/end-to-end-tests/mock-external-apis /mock-server
+cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
+cp tests/end-to-end-tests/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
+cp tests/*.py /mock-server/
+
+chmod +x /mock-server/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh}
@@ -19,26 +19,27 @@ import time
 
 
 class DagStatus(enum.Enum):
-    RUNNING = "running"
-    FAILED = "failed"
-    FINISHED = "finished"
+    RUNNING = enum.auto()
+    FAILED = enum.auto()
+    FINISHED = enum.auto()
 
 
-OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-success.sh"
-OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-fail.sh"
+OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-r2-success.sh"
+OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-r2-fail.sh"
 DEFAULT_INGEST_SUCCESS_SH = "/mock-server/./test-default-ingest-success.sh"
 DEFAULT_INGEST_FAIL_SH = "/mock-server/./test-default-ingest-fail.sh"
 
+with open("/tmp/osdu_ingest_result", "w") as f:
+    f.close()
+
 subprocess.run(f"/bin/bash -c 'airflow scheduler > /dev/null 2>&1 &'", shell=True)
 
 
-def check_dag_status(dag_name: str) -> DagStatus:
+def check_dag_status(dag_name):
     time.sleep(5)
     output = subprocess.getoutput(f'airflow list_dag_runs {dag_name}')
     if "failed" in output:
         print(dag_name)
         print(output)
         return DagStatus.FAILED
     if "running" in output:
         return DagStatus.RUNNING
     print(dag_name)
@@ -46,17 +47,32 @@ def check_dag_status(dag_name: str) -> DagStatus:
     return DagStatus.FINISHED
 
 
-def test_dag_execution_result(dag_name: str, script: str, expected_status: DagStatus):
-    subprocess.run(f"{script}", shell=True)
-    print(f"Expecting {dag_name} to be {expected_status.value}")
-    while True:
-        dag_status = check_dag_status(dag_name)
-        if dag_status is not DagStatus.RUNNING:
-            break
-    assert dag_status is expected_status, f"Error {dag_name} supposed to be {expected_status.value}"
-
-
-test_dag_execution_result("Osdu_ingest", OSDU_INGEST_SUCCESS_SH, DagStatus.FINISHED)
-test_dag_execution_result("Osdu_ingest", OSDU_INGEST_FAIL_SH, DagStatus.FAILED)
-test_dag_execution_result("Default_ingest", DEFAULT_INGEST_SUCCESS_SH, DagStatus.FINISHED)
-test_dag_execution_result("Default_ingest", DEFAULT_INGEST_FAIL_SH, DagStatus.FAILED)
+def test_dag_success(dag_name, script):
+    print(f"Test {dag_name} success")
+    subprocess.run(f"{script}", shell=True)
+    while True:
+        dag_status = check_dag_status(dag_name)
+        if dag_status is DagStatus.RUNNING:
+            continue
+        elif dag_status is DagStatus.FINISHED:
+            return
+        else:
+            raise Exception(f"Error {dag_name} supposed to be finished")
+
+
+def test_dag_fail(dag_name, script):
+    subprocess.run(f"{script}", shell=True)
+    print(f"Expecting {dag_name} fail")
+    while True:
+        dag_status = check_dag_status(dag_name)
+        if dag_status is DagStatus.RUNNING:
+            continue
+        elif dag_status is DagStatus.FAILED:
+            return
+        else:
+            raise Exception(f"Error {dag_name} supposed to be failed")
+
+
+test_dag_success("Osdu_ingest_r2", OSDU_INGEST_SUCCESS_SH)
+test_dag_fail("Osdu_ingest_r2", OSDU_INGEST_FAIL_SH)
+test_dag_success("Default_ingest", DEFAULT_INGEST_SUCCESS_SH)
+test_dag_fail("Default_ingest", DEFAULT_INGEST_FAIL_SH)
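One behavioural note on the DagStatus change above: with enum.auto() the members carry opaque integer values rather than the previous "running"/"failed"/"finished" strings, which is why the rewritten helpers compare members by identity and no longer interpolate expected_status.value into their messages. A minimal standalone illustration:

import enum

class DagStatus(enum.Enum):
    RUNNING = enum.auto()
    FAILED = enum.auto()
    FINISHED = enum.auto()

assert DagStatus.RUNNING.value == 1                   # auto() assigns 1, 2, 3..., not the string "running"
assert DagStatus.FINISHED is DagStatus["FINISHED"]    # identity checks, as used in test_dag_success/_fail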