Commit da27e267 authored by Dmitriy Rudko

#16 - Added an example of unit tests. Refactored the integration-test run in the CI pipeline

parent b8233319
Pipeline #3130 passed with stages in 5 minutes and 43 seconds
@@ -15,66 +15,16 @@
image: google/cloud-sdk:alpine
stages:
- unit_tests
- test_dags
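# Two stages: unit_tests runs the pytest suite via tests/unit_tests.sh, and
# test_dags runs the DAG integration flow via tests/test_dags.sh.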
test_dags:
variables:
ACL: '{"viewers": ["foo"],"owners": ["foo"]}'
LEGAL: '{"legaltags": ["foo"], "otherRelevantDataCountries": ["FR", "US", "CA"],"status": "compliant"}'
WORKFLOW_URL: http://127.0.0.1:5000/wf
STORAGE_URL: http://127.0.0.1:5000/st
LOCALHOST: http://127.0.0.1:5000
SEARCH_CONN_ID: http://127.0.0.1:5000
WORKFLOW_CONN_ID: http://127.0.0.1:5000
DATALOAD_CONFIG_PATH: "dataload.ini"
stage: test
stage: test_dags
image: johnybear/osdu-airflow:python36
script:
- mkdir -p /usr/local/airflow/dags/
- cp -rf . /usr/local/airflow/
- cp -r tests/mock-external-apis /mock-server
- cp -r tests/mock-data /mock-server/mock-data
- cp tests/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
- cp tests/*.py /mock-server/
- airflow initdb > /dev/null 2>&1
# exclude testing DAGS
- sed -i 's/load_examples = True/load_examples = False/' /usr/local/airflow/airflow.cfg
# turn on all dags
- sed -i 's/dags_are_paused_at_creation = True/dags_are_paused_at_creation = False/' /usr/local/airflow/airflow.cfg
- airflow resetdb -y > /dev/null 2>&1
- airflow initdb > /dev/null 2>&1
- airflow variables -s storage_url $STORAGE_URL
- airflow variables -s id_token http://127.0.0.1:5000/storage
- echo $ACL
- airflow variables -s acl "$ACL" > /dev/null 2>&1
- airflow variables -s data_partition_id odes > /dev/null 2>&1
- airflow variables -s entitlements_module_name entitlements_client > /dev/null 2>&1
- airflow variables -s legal "$LEGAL" > /dev/null 2>&1
- airflow variables -s provider gcp > /dev/null 2>&1
- airflow variables -s record_kind "odes:osdu:file:0.2.0" > /dev/null 2>&1
- airflow variables -s schema_version "0.2.0" > /dev/null 2>&1
- airflow variables -s workflow_url $WORKFLOW_URL > /dev/null 2>&1
- airflow variables -s update_status_ep wf/us > /dev/null 2>&1
- airflow variables -s search_url $LOCALHOST > /dev/null 2>&1
- airflow variables -s dataload_config_path $DATALOAD_CONFIG_PATH > /dev/null 2>&1
- airflow variables -s search_query_ep sr/qr > /dev/null 2>&1
- airflow variables -s dataload_config_path /usr/local/airflow/dags/configs/dataload.ini > /dev/null 2>&1
- airflow connections -a --conn_id search --conn_uri $SEARCH_CONN_ID
- airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
- chmod +x /mock-server/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh}
- export FLASK_APP=/mock-server/app.py && flask run &
- sleep 5
# - cd /mock-server && ./test-osdu-ingest-success.sh &
- timeout 700 python /mock-server/test_dags.py || EXIT_CODE=$?
- cp -r /usr/local/airflow/logs logs/
- ls logs/
- chmod +x tests/test_dags.sh
- tests/test_dags.sh || EXIT_CODE=$?
- exit ${EXIT_CODE}
artifacts:
paths:
@@ -82,5 +32,10 @@ test_dags:
when: on_failure
expire_in: 1 week
stages:
- test
\ No newline at end of file
unit_tests:
stage: unit_tests
image: johnybear/osdu-airflow:python36
script:
- chmod +x tests/unit_tests.sh
- tests/unit_tests.sh || EXIT_CODE=$?
- exit ${EXIT_CODE}
\ No newline at end of file
from airflow.plugins_manager import AirflowPlugin
from operators import UpdateStatusOperator, ProcessManifestOperator
from .operators import UpdateStatusOperator, ProcessManifestOperator
# Defining the plugin class
class OSDUPlugin(AirflowPlugin):
......
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .http_hooks import *
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.hooks.http_hook import HttpHook
workflow_hook = HttpHook(http_conn_id='workflow', method="POST")
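# The 'workflow' connection is registered in CI (see tests/set_airflow_env.sh):
#   airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
# HttpHook then resolves the base URL from that connection at run time.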
......
from .update_status_tasks import UpdateStatusOperator
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .update_status_op import UpdateStatusOperator
from .process_manifest_op import ProcessManifestOperator
__all__ = [
......
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import enum
import json
import tenacity
from functools import partial
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.models import Variable
from hooks import workflow_hook, search_http_hook
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
class UpdateStatusOperator(BaseOperator):
ui_color = '#10ECAA'
ui_fgcolor = '#000000'
FINISHED_STATUS = "finished"
RUNNING_STATUS = "running"
FAILED_STATUS = "failed"
class prev_ti_state(enum.Enum):
NONE = enum.auto()
SUCCESS = enum.auto()
FAILED = enum.auto()
@apply_defaults
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.workflow_hook = workflow_hook
self.search_hook = search_http_hook
@staticmethod
def _file_searched(resp, expected_total_count):
"""Check if search service returns totalCount.
The method is used as a callback
"""
data = resp.json()
return data.get("totalCount") == expected_total_count
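# Example: for a response body {"totalCount": 2} and expected_total_count=2
# this returns True, which tells the retry loop in search_files to stop.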
def get_headers(self, **kwargs):
data_conf = kwargs['dag_run'].conf
# for /submitWithManifest, authorization and partition-id are inside the Payload field
if "Payload" in data_conf:
auth = data_conf["Payload"]["authorization"]
partition_id = data_conf["Payload"]["data-partition-id"]
else:
auth = data_conf["authorization"]
partition_id = data_conf["data-partition-id"]
headers = {
'Content-type': 'application/json',
'data-partition-id': partition_id,
'Authorization': auth,
}
return headers
def search_files(self, **kwargs):
def create_query(record_ids):
expected_total_count = len(record_ids)
record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids)
logger.info(f"Search query {record_ids}")
query = f"id:({record_ids})"
return query, expected_total_count
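# For example, with hypothetical record ids:
#   create_query(["srn:file/las2:1:", "srn:file/las2:2:"])
#   -> ('id:("srn:file/las2:1:" OR "srn:file/las2:2:")', 2)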
record_ids_default = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records')
record_ids_manifest = kwargs["ti"].xcom_pull(key="record_ids", task_ids='proccess_manifest_task')
if record_ids_default:
query, expected_total_count = create_query(record_ids_default)
elif record_ids_manifest:
query, expected_total_count = create_query(record_ids_manifest)
else:
logger.error("There are no record ids")
sys.exit(2)
headers = self.get_headers(**kwargs)
request_body = {
"kind": "*:*:*:*",
"query": query
}
retry_opts = {
"wait": tenacity.wait_exponential(multiplier=5),
"stop": tenacity.stop_after_attempt(5),
"retry": tenacity.retry_if_not_result(
partial(self._file_searched, expected_total_count=expected_total_count)
)
}
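# With these options the search request is retried with exponential backoff
# (multiplier 5), at most 5 attempts, for as long as _file_searched returns
# False, i.e. until all created records become searchable.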
self.search_hook.run_with_advanced_retry(
endpoint=Variable.get("search_query_ep"),
headers=headers,
data=json.dumps(request_body),
_retry_args=retry_opts
)
def previous_ti_statuses(self, context):
dagrun = context['ti'].get_dagrun()
failed_ti, success_ti = dagrun.get_task_instances(state='failed'), dagrun.get_task_instances(state='success')
if not failed_ti and not success_ti:  # there are no previous tasks, so none can have failed
logger.info("There are no tasks before this one, so the status is RUNNING")
return self.prev_ti_state.NONE
if failed_ti:
logger.info("There are failed tasks before this one, so the status is FAILED")
return self.prev_ti_state.FAILED
logger.info("All previous tasks succeeded, so the status is SUCCESS")
return self.prev_ti_state.SUCCESS
def pre_execute(self, context):
prev_tis = self.previous_ti_statuses(context)
if prev_tis is self.prev_ti_state.NONE:
self.status = self.RUNNING_STATUS
elif prev_tis is self.prev_ti_state.FAILED:
self.status = self.FAILED_STATUS
elif prev_tis is self.prev_ti_state.SUCCESS:
self.status = self.FINISHED_STATUS
def execute(self, context):
"""Execute update workflow status.
If the status is assumed to be FINISHED, check whether the processed files are searchable.
If they are, update the status to FINISHED; otherwise, to FAILED.
"""
if self.status in (self.RUNNING_STATUS, self.FAILED_STATUS):
self.update_status_rqst(self.status, **context)
elif self.status == self.FINISHED_STATUS:
try:
self.search_files(**context)
except Exception as e:
logger.error(str(e))
self.status = self.FAILED_STATUS
self.update_status_rqst(self.FAILED_STATUS, **context)
else:
self.update_status_rqst(self.FINISHED_STATUS, **context)
if self.status == self.FAILED_STATUS:
raise Exception("Dag failed")
def update_status_rqst(self, status, **kwargs):
data_conf = kwargs['dag_run'].conf
logger.info(f"Got dataconf {data_conf}")
workflow_id = data_conf["WorkflowID"]
headers = self.get_headers(**kwargs)
request_body = {
"WorkflowID": workflow_id,
"Status": status
}
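# e.g. {"WorkflowID": "foo", "Status": "finished"}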
logger.info(f" Sending request '{status}'")
self.workflow_hook.run(
endpoint=Variable.get("update_status_ep"),
data=json.dumps(request_body),
headers=headers
)
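# A minimal usage sketch inside a DAG definition (the task id is hypothetical;
# the operator reads everything else from the dag_run conf and Airflow Variables):
#   update_status = UpdateStatusOperator(task_id="update_status", dag=dag)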
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .process_manifest_op import *
\ No newline at end of file
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ACL_DICT = {'viewers': ['data.default.viewers@odes.osdu.test.net'], 'owners': ['data.default.owners@odes.osdu.test.net']}
LEGAL_DICT = {'legaltags': ['odes-demo-legaltag'], 'otherRelevantDataCountries': ['FR', 'US', 'CA'], 'status': 'compliant'}
CONF = {
"WorkProduct": {
"ResourceTypeID": "srn:type:work-product/WellLog:",
"ResourceSecurityClassification": "srn:reference-data/ResourceSecurityClassification:RESTRICTED:",
"Data": {
"GroupTypeProperties": {
"Components": []
},
"IndividualTypeProperties": {
"Name": "Test AKM LOG 111",
"Description": "Well Log"
},
"ExtensionProperties": {}
},
"ComponentsAssociativeIDs": [
"wpc-1"
]
},
"WorkProductComponents": [
{
"ResourceTypeID": "srn:type:work-product-component/WellLog:",
"ResourceSecurityClassification": "srn:reference-data/ResourceSecurityClassification:RESTRICTED:",
"Data": {
"GroupTypeProperties": {
"Files": [],
"Artefacts": []
},
"AssociativeID": "wpc-1",
"FileAssociativeIDs": [
"f-1"
]
}
}
],
"Payload": {
"authorization": "Bearer test",
"data-partition-id": "test",
"AppKey": "test",
"kind_version": "3.0.0"
},
"Files": [
{
"ResourceTypeID": "srn:type:file/las2:",
"ResourceSecurityClassification": "srn:reference-data/ResourceSecurityClassification:RESTRICTED:",
"Data": {
"GroupTypeProperties": {
"FileSource": "",
"PreLoadFilePath": "foo"
},
"IndividualTypeProperties": {},
"ExtensionProperties": {}
},
"AssociativeID": "f-1"
}
],
"WorkflowID": "foo"
}
PROCESS_FILE_ITEMS_RESULT = (
[
(
{
'kind': 'test:osdu:file:3.0.0',
'legal': {'legaltags': ['odes-demo-legaltag'], 'otherRelevantDataCountries': ['US'], 'status': 'compliant'},
'acl': {'viewers': ['data.default.viewers@odes.osdu.test.net'],
'owners': ['data.default.owners@odes.osdu.test.net']},
'data': {
'ResourceTypeID': 'srn:type:file/las2:',
'ResourceSecurityClassification': 'srn:reference-data/ResourceSecurityClassification:RESTRICTED:',
'Data': {'GroupTypeProperties': {'FileSource': '', 'PreLoadFilePath': 'foo'}, 'IndividualTypeProperties': {}, 'ExtensionProperties': {}},
'AssociativeID': 'f-1',
'ResourceID': ""
}
},
'File'
)
],
['srn:file/las2:434064998475386:']
)
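# PROCESS_FILE_ITEMS_RESULT mirrors the (file_list, file_ids) tuple returned by
# process_file_items; ResourceID is left blank here because the real value ends
# with a generated numeric part (the unit test blanks it before comparing).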
LOADED_CONF = {
"acl": ACL_DICT,
"legal_tag": LEGAL_DICT,
"data_object": CONF
}
CONF_PAYLOAD = CONF["Payload"]
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import pytest
import sys
sys.path.insert(0, "..")
sys.path.insert(0, "../../dags")
# we use try/except blocks here to avoid import errors when we start the DagBag,
# because Airflow imports plugins in its own way
try:
from plugins.operators import process_manifest_op as p_m_op
except ImportError:
pass
try:
from plugins.tests.data import process_manifest_op as test_data
except ModuleNotFoundError:
pass
@pytest.mark.parametrize(
"test_input, expected",
[
("srn:type:work-product/WellLog:", "WellLog"),
("srn:type:file/las2:", "las2"),
]
)
def test_determine_data_type(test_input, expected):
data_type = p_m_op.determine_data_type(test_input)
assert data_type == expected
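# Judging by the cases above, determine_data_type extracts the type name from
# the SRN, e.g. "srn:type:file/las2:" -> "las2".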
@pytest.mark.parametrize(
"data_type, loaded_conf, conf_payload, expected_file_result",
[
("las2", test_data.LOADED_CONF, test_data.CONF_PAYLOAD, test_data.PROCESS_FILE_ITEMS_RESULT)
]
)
def test_process_file_items(data_type, loaded_conf, conf_payload, expected_file_result):
file_id_regex = re.compile(r"srn:file/" + data_type + r":\d+:")
expected_file_list = expected_file_result[0]
file_list, file_ids = p_m_op.process_file_items(loaded_conf, conf_payload)
for i in file_ids:
assert file_id_regex.match(i)
for i in file_list:
assert file_id_regex.match(i[0]["data"]["ResourceID"])
i[0]["data"]["ResourceID"] = ""
assert file_list == expected_file_list
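# These tests are run in CI by tests/unit_tests.sh; to run them directly
# (assuming pytest is installed): cd /usr/local/airflow/plugins/tests && pytest -v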
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
export ACL='{"viewers": ["foo"],"owners": ["foo"]}'
export LEGAL='{"legaltags": ["foo"], "otherRelevantDataCountries": ["FR", "US", "CA"],"status": "compliant"}'
export WORKFLOW_URL="http://127.0.0.1:5000/wf"
export STORAGE_URL="http://127.0.0.1:5000/st"
export LOCALHOST="http://127.0.0.1:5000"
export SEARCH_CONN_ID="http://127.0.0.1:5000"
export WORKFLOW_CONN_ID="http://127.0.0.1:5000"
export DATALOAD_CONFIG_PATH="/usr/local/airflow/dags/configs/dataload.ini"
mkdir -p /usr/local/airflow/dags/
cp -rf . /usr/local/airflow/
cp -r tests/mock-external-apis /mock-server
cp -r tests/mock-data /mock-server/mock-data
cp tests/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
cp tests/*.py /mock-server/
airflow initdb > /dev/null 2>&1
# exclude testing DAGS
sed -i 's/load_examples = True/load_examples = False/' /usr/local/airflow/airflow.cfg
# turn on all dags
sed -i 's/dags_are_paused_at_creation = True/dags_are_paused_at_creation = False/' /usr/local/airflow/airflow.cfg
airflow variables -s storage_url $STORAGE_URL
airflow variables -s id_token http://127.0.0.1:5000/storage
airflow variables -s acl "$ACL"
airflow variables -s data_partition_id odes
airflow variables -s entitlements_module_name entitlements_client
airflow variables -s legal "$LEGAL"
airflow variables -s provider gcp
airflow variables -s record_kind "odes:osdu:file:0.2.0"
airflow variables -s schema_version "0.2.0"
airflow variables -s workflow_url $WORKFLOW_URL
airflow variables -s update_status_ep wf/us
airflow variables -s search_url $LOCALHOST
airflow variables -s dataload_config_path $DATALOAD_CONFIG_PATH
airflow variables -s search_query_ep sr/qr
airflow connections -a --conn_id search --conn_uri $SEARCH_CONN_ID
airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
chmod +x /mock-server/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh}
chmod +x tests/set_airflow_env.sh
tests/set_airflow_env.sh > /dev/null 2>&1
rm -r /usr/local/airflow/plugins/tests
export FLASK_APP=/mock-server/app.py && flask run &
sleep 5
timeout 700 python /mock-server/test_dags.py || EXIT_CODE=$?
cp -r /usr/local/airflow/logs logs/
exit $EXIT_CODE
pip install pytest
chmod +x tests/set_airflow_env.sh
tests/set_airflow_env.sh > /dev/null 2>&1
cd /usr/local/airflow/plugins/tests
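# The remaining step, presumably, is to invoke pytest from this directory, e.g.:
#   pytest -v || EXIT_CODE=$?
#   exit ${EXIT_CODE}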