# validate_manifest_schema_by_reference.py
#  Copyright 2021 Google LLC
#  Copyright 2021 EPAM Systems
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""
Validate Manifest against R3 schemas operator.
"""

import logging

from airflow.models import BaseOperator, Variable
from osdu_ingestion.libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.exceptions import EmptyManifestError, GenericManifestSchemaError
from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher
from osdu_ingestion.libs.validation.validate_schema import SchemaValidator

from osdu_airflow.backward_compatibility.airflow_utils import apply_defaults
from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContextMixin

# Module-level logger used by the operator below.
# NOTE(review): getLogger() called without a name returns the root logger, not
# a per-module logger — confirm this is intentional.
logger = logging.getLogger()


class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMixin):
    """Operator that validates a manifest against the R3 definition schemas.

    The manifest is obtained through ``self._get_manifest_data`` and the
    validated result is handed to ``self._put_manifest_data_by_reference``;
    neither helper is defined in this class (presumably both come from
    ``ReceivingContextMixin`` -- confirm against the mixin).
    """

    ui_color = '#dad5ff'
    ui_fgcolor = '#000000'

    @apply_defaults
    def __init__(self, previous_task_id: str = None, *args, **kwargs):
        """Create the operator.

        :param previous_task_id: id of the upstream task; stored on the
            instance as ``previous_task_id`` and not used further in this class
        :type previous_task_id: str
        """
        super().__init__(*args, **kwargs)
        self.previous_task_id = previous_task_id
        # Airflow Variables hold strings, so a stored "False"/"0" would be
        # truthy if used as-is. Coerce to a real boolean once, here.
        self._show_skipped_ids = self._to_bool(
            Variable.get('core__config__show_skipped_ids', default_var=False)
        )

    @staticmethod
    def _to_bool(value) -> bool:
        """Interpret a raw Variable value as a boolean flag.

        Strings are compared case-insensitively against a small set of
        false-like spellings; any other non-empty string counts as True so
        that values which enabled the flag before keep enabling it.
        Non-string values fall back to ordinary truthiness.
        """
        if isinstance(value, str):
            return value.strip().lower() not in ("", "0", "false", "no", "off", "none")
        return bool(value)

    def execute(self, context: dict):
        """Validate the manifest of the current DAG run and store the result.

        Steps:

        1. Read ``execution_context`` from the DAG run configuration.
        2. Build a ``SchemaValidator`` configured with the surrogate-key constants.
        3. Fetch the manifest via ``self._get_manifest_data``.
        4. Run ``validate_common_schema`` and ``ensure_manifest_validity`` on it.
        5. Optionally push the skipped entities to XCom under ``skipped_ids``.
        6. Pass the valid manifest to ``self._put_manifest_data_by_reference``.

        :param context: Airflow context
        :type context: dict
        :return: whatever ``self._put_manifest_data_by_reference`` returns
        :raises EmptyManifestError: if no manifest data could be obtained
        :raises GenericManifestSchemaError: re-raised from
            ``ensure_manifest_validity`` after its message has been pushed to
            XCom under ``skipped_ids``
        """
        logger.debug("Starting Validating manifest")
        dag_run_conf = context["dag_run"].conf
        execution_context = dag_run_conf["execution_context"]
        payload_context = Context.populate(execution_context)
        token_refresher = AirflowTokenRefresher()

        logger.debug("DATA_TYPES_WITH_SURROGATE_KEYS: %s", DATA_TYPES_WITH_SURROGATE_KEYS)
        logger.debug("SURROGATE_KEYS_PATHS: %s", SURROGATE_KEYS_PATHS)

        schema_validator = SchemaValidator(
            token_refresher,
            payload_context,
            surrogate_key_fields_paths=SURROGATE_KEYS_PATHS,
            data_types_with_surrogate_ids=DATA_TYPES_WITH_SURROGATE_KEYS
        )

        manifest_data = self._get_manifest_data(context, execution_context)
        # NOTE(review): a by-reference read,
        # ``self._get_manifest_data_by_reference(context, execution_context, True)``,
        # was left disabled here. The original note says the history is used
        # because "check_payload_type" doesn't return the id -- confirm which
        # read path is intended for this operator.
        logger.debug("Manifest data: %s", manifest_data)

        if not manifest_data:
            raise EmptyManifestError(
                f"Data {dag_run_conf} doesn't contain 'manifest field'")

        # Called for validation only; the return value is intentionally unused.
        schema_validator.validate_common_schema(manifest_data)
        try:
            valid_manifest_file, skipped_entities = schema_validator.ensure_manifest_validity(
                manifest_data
            )
        except GenericManifestSchemaError as err:
            # Publish the failure reason via XCom before failing the task.
            context["ti"].xcom_push(key="skipped_ids", value=str(err))
            raise

        if self._show_skipped_ids:
            context["ti"].xcom_push(key="skipped_ids", value=skipped_entities)

        return self._put_manifest_data_by_reference(context, execution_context, valid_manifest_file)