Skip to content
Snippets Groups Projects
Commit e443c3bc authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM) Committed by Siarhei Khaletski (EPAM)
Browse files

GONRG-1732: Fix end2end tests

parent 253a7ceb
No related branches found
No related tags found
1 merge request!10Feature/providers logic split
Pipeline #24679 passed
......@@ -22,7 +22,7 @@ stages:
- deploy
pylint:
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
stage: linters
allow_failure: true
script:
......@@ -32,7 +32,7 @@ pylint:
- exit ${EXIT_CODE}
isort:
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
allow_failure: true
stage: linters
script:
......@@ -42,7 +42,7 @@ isort:
test_dags:
stage: test_dags
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
script:
- chmod +x tests/test_dags.sh
- tests/./test_dags.sh || EXIT_CODE=$?
......@@ -55,7 +55,7 @@ test_dags:
unit_tests:
stage: unit_tests
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
script:
- chmod +x tests/unit_tests.sh
- tests/./unit_tests.sh || EXIT_CODE=$?
......
......@@ -17,9 +17,12 @@
import configparser
import logging
import os
from airflow.models import Variable
from libs.refresh_token import AirflowTokenRefresher, refresh_token
from libs.context import Context
from libs.refresh_token import AirflowTokenRefresher
from osdu_api.libs.auth.authorization import authorize
from osdu_api.model.acl import Acl
from osdu_api.model.legal.legal import Legal
from osdu_api.model.legal.legal_compliance import LegalCompliance
......@@ -40,7 +43,8 @@ DEFAULT_SOURCE = config.get("DEFAULTS", "authority")
DEFAULT_VERSION = config.get("DEFAULTS", "kind_version")
@refresh_token(AirflowTokenRefresher())
@authorize(AirflowTokenRefresher())
def create_update_record_request(headers, record_client, record):
"""Send request to create or update records via RecordClient.
......@@ -49,7 +53,7 @@ def create_update_record_request(headers, record_client, record):
:param record: The record to create or update
:return: Storage service response
"""
resp = record_client.create_update_records([record], headers.items())
resp = record_client.create_update_records([record], headers)
return resp
......@@ -91,7 +95,7 @@ def create_records(**kwargs):
"AppKey": data_conf.get("app-key", "")
}
record_client = RecordClient()
record_client = RecordClient(AirflowTokenRefresher(), Context("test", "test"))
record_client.data_partition_id = data_conf.get(
"partition-id", DEFAULT_SOURCE)
resp = create_update_record_request(headers, record_client, record)
......
......@@ -25,6 +25,7 @@ from typing import Any, Callable, Union
import requests
from libs.exceptions import TokenRefresherNotPresentError
from osdu_api.libs.auth.authorization import TokenRefresher as OSDUAPITokenRefresher
from providers import credentials
from providers.types import BaseCredentials
from tenacity import retry, stop_after_attempt
......@@ -59,7 +60,7 @@ class TokenRefresher(ABC):
@property
@abstractmethod
def authorization_header(self) -> dict:
"""Authorization header. Must return authorization header for
"""Authorization header. Must return authorization header for
updating headers dict.
E.g. return {"Authorization": "Bearer <access_token>"}
......@@ -69,7 +70,7 @@ class TokenRefresher(ABC):
pass
class AirflowTokenRefresher(TokenRefresher):
class AirflowTokenRefresher(TokenRefresher, OSDUAPITokenRefresher):
"""Simple wrapper for credentials to be used in refresh_token decorator within Airflow."""
def __init__(self, creds: BaseCredentials = None):
......@@ -124,7 +125,7 @@ def make_callable_request(obj: Union[object, None], request_function: Callable,
:type request_function: Callable
:param headers: The request headers
:type headers: dict
:return: A partial callable
:return: A partial callable
:rtype: Callable
"""
if obj: # if wrapped function is an object's method
......@@ -190,7 +191,7 @@ def send_request_with_auth_header(token_refresher: TokenRefresher, *args,
:param token_refresher: The token refresher instance
:type token_refresher: TokenRefresher
:raises e: Re-raises any requests.HTTPError
:return: The server response
:return: The server response
:rtype: requests.Response
"""
obj = kwargs.pop("obj", None)
......
......@@ -37,6 +37,7 @@ def index():
return 'index'
@app.route('/st', methods=['POST', 'GET', "PUT"])
@app.route('/st/api/storage/v2/records/', methods=['POST', 'GET', "PUT"])
def storage():
logger.info(request.json)
with open("/tmp/osdu_ingest_result", "w") as f:
......
airflow_vars: 1
......@@ -64,7 +64,9 @@ mkdir -p /usr/local/airflow/dags/
cp -rf src/* /usr/local/airflow/
cp -r tests/end-to-end-tests/mock-external-apis /mock-server
cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
cp tests/end-to-end-tests/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
cp tests/end-to-end-tests/osdu_api_config.yaml /mock-server/
cp tests/*.py /mock-server/
chmod +x /mock-server/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh}
......@@ -14,6 +14,7 @@
# limitations under the License.
import enum
import os
import subprocess
import time
......@@ -23,6 +24,7 @@ class DagStatus(enum.Enum):
FAILED = enum.auto()
FINISHED = enum.auto()
OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-r2-success.sh"
OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-r2-fail.sh"
DEFAULT_INGEST_SUCCESS_SH = "/mock-server/./test-default-ingest-success.sh"
......@@ -31,15 +33,17 @@ DEFAULT_INGEST_FAIL_SH = "/mock-server/./test-default-ingest-fail.sh"
with open("/tmp/osdu_ingest_result", "w") as f:
f.close()
os.environ["OSDU_API_CONFIG"] = "/mock-server/osdu_api_config.yaml"
subprocess.run(f"/bin/bash -c 'airflow scheduler > /dev/null 2>&1 &'", shell=True)
def check_dag_status(dag_name):
time.sleep(5)
output = subprocess.getoutput(f'airflow list_dag_runs {dag_name}')
if "failed" in output:
print(dag_name)
print(output)
return DagStatus.FAILED
return DagStatus.FAILED
if "running" in output:
return DagStatus.RUNNING
print(dag_name)
......@@ -59,6 +63,7 @@ def test_dag_success(dag_name, script):
else:
raise Exception(f"Error {dag_name} supposed to be finished")
def test_dag_fail(dag_name, script):
subprocess.run(f"{script}", shell=True)
print(f"Expecting {dag_name} fail")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment