Newer
Older
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import subprocess
import time
class DagStatus(enum.Enum):
    """Outcome reported by ``check_dag_status`` for a DAG's runs."""

    # The run listing contains "running" (and does not contain "failed").
    RUNNING = enum.auto()
    # The run listing contains "failed".
    FAILED = enum.auto()
    # The run listing contains neither "failed" nor "running".
    FINISHED = enum.auto()
# Shell scripts that the test helpers in this file execute before polling a
# DAG's status. Each pair is (success scenario, failure scenario).

# Scripts used with the "Osdu_ingest_r2" DAG.
OSDU_INGEST_SUCCESS_SH, OSDU_INGEST_FAIL_SH = (
    "/mock-server/./test-osdu-ingest-r2-success.sh",
    "/mock-server/./test-osdu-ingest-r2-fail.sh",
)

# Scripts used with the "Default_ingest" DAG.
DEFAULT_INGEST_SUCCESS_SH, DEFAULT_INGEST_FAIL_SH = (
    "/mock-server/./test-default-ingest-success.sh",
    "/mock-server/./test-default-ingest-fail.sh",
)
# Create the result file, or truncate it to empty if it already exists,
# before any DAG is triggered.
# NOTE(review): presumably the ingest DAGs or the mock-server scripts write
# to this file — confirm against their sources.
with open("/tmp/osdu_ingest_result", "w"):
    pass  # the context manager closes the file; no explicit close() needed

# Start the Airflow scheduler as a background job. The trailing `&` makes
# bash return immediately, and all scheduler output is discarded. Passing an
# argv list avoids wrapping the bash invocation in a second shell.
subprocess.run(["/bin/bash", "-c", "airflow scheduler > /dev/null 2>&1 &"])
def check_dag_status(dag_name):
    """Classify the current run listing of *dag_name*.

    Sleeps five seconds, then inspects the combined stdout/stderr of
    ``airflow list_dag_runs <dag_name>``.

    Returns:
        ``DagStatus.FAILED`` if the listing contains "failed",
        ``DagStatus.RUNNING`` if it contains "running" (and not "failed"),
        ``DagStatus.FINISHED`` otherwise.

    The DAG name and the listing are printed for the FAILED and FINISHED
    outcomes; nothing is printed while the DAG is still running.
    """
    # Fixed delay so that callers polling in a loop do not spin.
    time.sleep(5)
    listing = subprocess.getoutput(f'airflow list_dag_runs {dag_name}')

    has_failed = "failed" in listing
    # "failed" takes precedence over "running" when both appear.
    if not has_failed and "running" in listing:
        return DagStatus.RUNNING

    # Terminal outcome: show what was observed before reporting it.
    print(dag_name)
    print(listing)
    return DagStatus.FAILED if has_failed else DagStatus.FINISHED
def test_dag_success(dag_name, script):
    """Run *script* and require that *dag_name* ends up finished.

    Announces the scenario, executes *script* through the shell, then polls
    ``check_dag_status`` until it stops reporting ``DagStatus.RUNNING``.

    Raises:
        Exception: if the final status is anything other than
            ``DagStatus.FINISHED``.
    """
    print(f"Test {dag_name} success")
    subprocess.run(f"{script}", shell=True)

    # Poll until the DAG leaves the RUNNING state; there is no upper bound
    # on the number of polls.
    while True:
        final_status = check_dag_status(dag_name)
        if final_status is not DagStatus.RUNNING:
            break

    if final_status is not DagStatus.FINISHED:
        raise Exception(f"Error {dag_name} supposed to be finished")
def test_dag_fail(dag_name, script):
    """Run *script* and require that *dag_name* ends up failed.

    Executes *script* through the shell, announces the expectation, then
    polls ``check_dag_status`` until it stops reporting
    ``DagStatus.RUNNING``.

    Raises:
        Exception: if the final status is anything other than
            ``DagStatus.FAILED``.
    """
    subprocess.run(f"{script}", shell=True)
    print(f"Expecting {dag_name} fail")

    # Poll until the DAG leaves the RUNNING state; there is no upper bound
    # on the number of polls.
    while True:
        final_status = check_dag_status(dag_name)
        if final_status is not DagStatus.RUNNING:
            break

    if final_status is not DagStatus.FAILED:
        raise Exception(f"Error {dag_name} supposed to be failed")
# Scenario driver: for each DAG, run the success scenario first and the
# failure scenario second. Any unexpected outcome raises and stops the script.

# "Osdu_ingest_r2" DAG
test_dag_success(dag_name="Osdu_ingest_r2", script=OSDU_INGEST_SUCCESS_SH)
test_dag_fail(dag_name="Osdu_ingest_r2", script=OSDU_INGEST_FAIL_SH)

# "Default_ingest" DAG
test_dag_success(dag_name="Default_ingest", script=DEFAULT_INGEST_SUCCESS_SH)
test_dag_fail(dag_name="Default_ingest", script=DEFAULT_INGEST_FAIL_SH)