Skip to content
Snippets Groups Projects
test_dags.py 2.53 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  Copyright 2020 Google LLC
    #  Copyright 2020 EPAM Systems
    #
    #  Licensed under the Apache License, Version 2.0 (the "License");
    #  you may not use this file except in compliance with the License.
    #  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    import enum
    import subprocess
    import time
    
    
    class DagStatus(enum.Enum):
    
        RUNNING = enum.auto()
        FAILED = enum.auto()
        FINISHED = enum.auto()
    
    OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-r2-success.sh"
    OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-r2-fail.sh"
    
    DEFAULT_INGEST_SUCCESS_SH = "/mock-server/./test-default-ingest-success.sh"
    DEFAULT_INGEST_FAIL_SH = "/mock-server/./test-default-ingest-fail.sh"
    
    
    with open("/tmp/osdu_ingest_result", "w") as f:
        f.close()
    
    subprocess.run(f"/bin/bash -c 'airflow scheduler > /dev/null 2>&1 &'", shell=True)
    
    def check_dag_status(dag_name):
    
        time.sleep(5)
        output = subprocess.getoutput(f'airflow list_dag_runs {dag_name}')
        if "failed" in output:
            print(dag_name)
            print(output)
    
        if "running" in output:
            return DagStatus.RUNNING
        print(dag_name)
        print(output)
        return DagStatus.FINISHED
    
    
    
    def test_dag_success(dag_name, script):
        print(f"Test {dag_name} success")
        subprocess.run(f"{script}", shell=True)
        while True:
            dag_status = check_dag_status(dag_name)
            if dag_status is DagStatus.RUNNING:
                continue
            elif dag_status is DagStatus.FINISHED:
                return
            else:
                raise Exception(f"Error {dag_name} supposed to be finished")
    
    def test_dag_fail(dag_name, script):
    
        subprocess.run(f"{script}", shell=True)
    
        print(f"Expecting {dag_name} fail")
    
        while True:
            dag_status = check_dag_status(dag_name)
    
            if dag_status is DagStatus.RUNNING:
                continue
            elif dag_status is DagStatus.FAILED:
                return
            else:
                raise Exception(f"Error {dag_name} supposed to be failed")
    
    test_dag_success("Osdu_ingest_r2", OSDU_INGEST_SUCCESS_SH)
    test_dag_fail("Osdu_ingest_r2", OSDU_INGEST_FAIL_SH)
    test_dag_success("Default_ingest", DEFAULT_INGEST_SUCCESS_SH)
    test_dag_fail("Default_ingest", DEFAULT_INGEST_FAIL_SH)