Skip to content
Snippets Groups Projects
validate_schema.py 10.2 KiB
Newer Older
  • Learn to ignore specific revisions
    #  Copyright 2020 Google LLC
    #  Copyright 2020 EPAM Systems
    #
    #  Licensed under the Apache License, Version 2.0 (the "License");
    #  you may not use this file except in compliance with the License.
    #  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    
    import copy
    import logging
    from typing import Union, Any, List

    import jsonschema
    import requests
    import tenacity
    from jsonschema import exceptions

    from libs.context import Context
    from libs.exceptions import EmptyManifestError, NotOSDUSchemaFormatError
    from libs.mixins import HeadersMixin
    from libs.refresh_token import TokenRefresher, refresh_token
    
    
    # Root logger: no name is passed to getLogger.
    logger = logging.getLogger(name=None)

    # Retry policy for Schema service requests (used by the tenacity.retry
    # decorator): at most RETRIES attempts, TIMEOUT seconds between attempts.
    RETRIES, TIMEOUT = 3, 1
    
    
    class OSDURefResolver(jsonschema.RefResolver):
        """Extends base jsonschema resolver for OSDU."""

        def __init__(self, schema_service: str, *args, **kwargs):
            """Initialize the resolver.

            :param schema_service: The base url for schema service
            :type schema_service: str
            :param args: Positional arguments passed on to ``jsonschema.RefResolver``
            :param kwargs: Keyword arguments passed on to ``jsonschema.RefResolver``
            """
            super().__init__(*args, **kwargs)
            self.schema_service = schema_service

        def resolve_fragment(self, document: dict, fragment: str) -> dict:
            """Resolve ``fragment`` and point the result's "$id" at the Schema service.

            If a nested schema has a 'definitions' field and there is a schema
            under this 'definitions', jsonschema attempts to use the id field of
            this double-nested schema as the URI to get this schema later, so
            that id is replaced with ``<schema_service>/<last fragment segment>``.

            The resolved sub-schema is updated in place. It is left untouched
            when the fragment contains no "/", when its last segment is empty,
            or when the resolved sub-schema is not an object (Draft 7 allows
            boolean schemas, which cannot carry an "$id").

            :param document: The schema document
            :type document: dict
            :param fragment: schema fragment, e.g. "/definitions/<OsduID>"
            :type fragment: str
            :return: The resolved (possibly updated) sub-schema
            :rtype: dict
            """
            resolved = super().resolve_fragment(document, fragment)
            # "/definitions/<OsduID>" -> ["", "definitions", "<OsduID>"]
            fragment_parts = fragment.split("/")
            osdu_id = fragment_parts[-1]
            # Guard against boolean sub-schemas (item assignment would raise
            # TypeError) and against building an id from an empty segment.
            if len(fragment_parts) > 1 and osdu_id and isinstance(resolved, dict):
                resolved["$id"] = f"{self.schema_service}/{osdu_id}"
            return resolved
    
    
    class SchemaValidator(HeadersMixin):
        """Class to validate schema of Manifests."""
    
    
        def __init__(
            self,
            schema_service: str,
            token_refresher: TokenRefresher,
            context: Context
        ):
            """Set up a validator bound to one Schema service and tenant.

            :param schema_service: The base OSDU Schema service url
            :type schema_service: str
            :param token_refresher: An instance of token refresher
            :type token_refresher: TokenRefresher
            :param context: The tenant context
            :type context: Context
            """
            super().__init__(context)
            self.schema_service = schema_service
            self.context = context
            self.token_refresher = token_refresher

            # Every URI scheme/prefix the resolver may meet ("osdu", "https" and
            # the tenant's data partition id) is fetched the same way.
            fetch = self.get_schema_request
            self.resolver_handlers = dict.fromkeys(
                ("osdu", "https", self.context.data_partition_id), fetch
            )
    
        @tenacity.retry(
            wait=tenacity.wait_fixed(TIMEOUT),
            stop=tenacity.stop_after_attempt(RETRIES),
            reraise=True
        )
        @refresh_token()
        def _get_schema_from_schema_service(self, headers: dict, uri: str) -> requests.Response:
            """Issue a GET for ``uri`` against the Schema service.

            The outer ``tenacity.retry`` makes at most ``RETRIES`` attempts,
            waiting ``TIMEOUT`` seconds between them, and re-raises the last
            exception when every attempt fails. ``refresh_token`` wraps the raw
            request (presumably handling auth-token refresh; see libs.refresh_token).

            NOTE(review): this function does not call ``raise_for_status()``; whether
            non-2xx responses raise (and so get retried) depends on ``refresh_token``.
            TODO confirm.

            :param headers: HTTP headers to send with the request
            :param uri: Full URL of the schema to fetch
            :return: The raw HTTP response (request timeout: 60 seconds)
            """
            return requests.get(uri, headers=headers, timeout=60)
    
    
        def __delete_refs(self, schema_part: Union[dict, list]):
            """Recursively empty every object in ``schema_part`` that contains "$ref".

            A dict holding a "$ref" key is cleared in place (all of its keys are
            removed, not only "$ref"); other dicts and lists are walked
            recursively; any other value is ignored. Used by the generic manifest
            validation to make it less strict.

            :param schema_part: A schema fragment (dict or list)
            """
            if isinstance(schema_part, list):
                for item in schema_part:
                    self.__delete_refs(item)
                return
            if not isinstance(schema_part, dict):
                return
            if "$ref" in schema_part:
                schema_part.clear()
                return
            for child in schema_part.values():
                self.__delete_refs(child)
    
    
        def get_schema_request(self, uri: str) -> dict:
            """Fetch a schema and stamp its "$id" with the URL it was fetched from.

            A bare id (starting with "osdu" or with the tenant's data partition id)
            is turned into a URL under the Schema service; any other ``uri`` is
            used as-is.

            :param uri: A full URL or a bare schema id
            :type uri: str
            :return: The decoded JSON schema, with "$id" set to the URL used
            :rtype: dict
            """
            bare_id_prefixes = ("osdu", self.context.data_partition_id)
            full_uri = (
                f"{self.schema_service}/{uri}"
                if uri.startswith(bare_id_prefixes)
                else uri
            )
            schema = self._get_schema_from_schema_service(self.request_headers, full_uri).json()
            schema["$id"] = full_uri
            return schema
    
        def get_schema(self, kind: str) -> dict:
            """Fetch the schema of ``kind`` from the Schema service.

            :param kind: The kind of the schema to fetch
            :type kind: str
            :raises Exception: Whatever fetching the schema raises; an error
                naming ``kind`` is logged first and the exception propagates.
            :return: Schema server response
            :rtype: dict
            """
            schema_url = f"{self.schema_service}/{kind}"
            try:
                return self.get_schema_request(schema_url)
            except Exception:
                logger.error(f"Error on getting schema of kind '{kind}'")
                raise
    
    
        def _validate_entity(self, entity: dict, schema: dict = None) -> bool:
            """Validate an entity (record) against a schema.

            The whole entity is validated, the same way ``_validate_whole_manifest``
            validates a whole manifest against the schema of its kind.

            :param entity: The record to validate
            :type entity: dict
            :param schema: Schema to validate against; when not given (or empty)
                it is fetched from the Schema service by ``entity["kind"]``
            :type schema: dict
            :return: True if the entity is valid, False otherwise (errors are logged)
            :rtype: bool
            """
            if not schema:
                schema = self.get_schema(entity["kind"])
            try:
                self._validate_against_schema(schema, entity)
            except exceptions.ValidationError as exc:
                logger.error("Schema validation error. Data field.")
                # .get: with an explicit schema, "kind" may be absent and a
                # KeyError here would hide the validation error.
                logger.error(f"Manifest kind: {entity.get('kind')}")
                logger.error(f"Error: {exc}")
                return False
            logger.debug("Record successfully validated")
            return True
    
        def _validate_work_product(self, work_product: dict) -> bool:
            """Validate every entity of a work-product manifest section.

            The "WorkProduct" key holds a single entity; every other key is
            expected to hold a list of entities (e.g. components). Every entity is
            validated, even after a failure, so that all errors get logged.
            Validation failures are logged by ``_validate_entity``, not raised.

            :param work_product: The work-product section of a manifest
            :type work_product: dict
            :return: True if every entity is valid, False otherwise
            :rtype: bool
            """
            all_valid = True
            for key, value in work_product.items():
                entities = [value] if key == "WorkProduct" else value
                for entity in entities:
                    # Validate first so an earlier failure does not skip this entity.
                    all_valid = self._validate_entity(entity) and all_valid
            return all_valid
    
        def _validate_against_schema(self, schema: dict, data: Any):
            """Validate ``data`` against ``schema`` with a Draft 7 validator.

            "$ref"s are resolved by ``OSDURefResolver``, based at the schema's own
            "$id" (empty string if absent). Remote schemas are fetched through
            ``self.resolver_handlers`` and cached.

            :param schema: The JSON schema
            :param data: The instance to validate
            :raises jsonschema.exceptions.ValidationError: If ``data`` is invalid
            """
            jsonschema.Draft7Validator(
                schema=schema,
                resolver=OSDURefResolver(
                    schema_service=self.schema_service,
                    base_uri=schema.get("$id", ""),
                    referrer=schema,
                    handlers=self.resolver_handlers,
                    cache_remote=True,
                ),
            ).validate(data)
    
        def _validate_data_group(self, entities: list):
            """Validate each entity of a list; a non-list argument is ignored.

            The boolean results of ``_validate_entity`` are discarded.

            :param entities: Entities to validate
            """
            if not isinstance(entities, list):
                return
            for entity in entities:
                self._validate_entity(entity)
        def _validate_whole_manifest(self, manifest: dict):
            """Validate a manifest as a whole against the schema of its kind.

            Because the Manifest schema references the generic MasterData,
            ReferenceData, WorkProduct, WorkProductComponents and Files schemas,
            this step also checks those entities against their generic schemas.

            :param manifest: The manifest; must contain "kind"
            :raises jsonschema.exceptions.ValidationError: If the manifest is invalid
            """
            kind = manifest["kind"]
            manifest_schema = self.get_schema(kind)
            logger.debug(f"Validating kind {kind}")
            self._validate_against_schema(manifest_schema, manifest)
    
        def validate_common_schema(self, manifest: dict) -> dict:
            """Preliminary validation of a manifest against the top-level OSDU schema.

            The schema of ``manifest["manifest"]["kind"]`` is fetched. Validation
            uses a deep copy of it in which every "$ref"-carrying object is
            emptied, so concrete entities are not verified at this step (and
            their schemas are not fetched).

            :param manifest: A dict holding the manifest under the "manifest" key
            :type manifest: dict
            :raises EmptyManifestError: If there is no "manifest" key
            :raises jsonschema.exceptions.ValidationError: If validation fails
            :return: The fetched Manifest schema, unmodified (refs included)
            :rtype: dict
            """
            if "manifest" not in manifest:
                raise EmptyManifestError
            schema = self.get_schema(manifest["manifest"]["kind"])
            # Strip refs on a copy: the caller gets the full schema back.
            schema_without_refs = copy.deepcopy(schema)
            if schema_without_refs.get("properties"):
                self.__delete_refs(schema_without_refs["properties"])
            else:
                self.__delete_refs(schema_without_refs)
            logger.debug("Schema without refs")
            logger.debug(f"{schema_without_refs}")
            self._validate_against_schema(schema_without_refs, manifest)
            return schema
    
        def _validate_against_generic_schema(self, schema: dict, entity: Any) -> bool:
            """Validate an entity against a generic (not kind-specific) schema.

            :param schema: The generic JSON schema
            :type schema: dict
            :param entity: The entity to validate; its "kind" (if any) is logged
                on failure
            :return: True if the entity is valid, False otherwise (errors are logged)
            :rtype: bool
            """
            try:
                self._validate_against_schema(schema, entity)
            except exceptions.ValidationError as exc:
                # Never let the logging itself fail (non-dict entity or missing
                # "kind") and mask the validation result.
                kind = entity.get("kind") if isinstance(entity, dict) else None
                logger.error("Schema validation error.")
                logger.error(f"Manifest kind: {kind}")
                logger.error(f"Manifest: {entity}")
                logger.error(f"Error: {exc}")
                return False
            logger.debug("Record successfully validated against generic schema.")
            return True
    
        def validate_manifest(self, manifest_records: List[dict]) -> List[dict]:
            """
            Validate manifest. Raise error if manifest is not valid.
    
            validated_records = []
            if not manifest_records:
    
                raise EmptyManifestError
    
            for manifest_record in manifest_records:
                manifest = manifest_record.get("entity")
    
                if isinstance(manifest, dict) and manifest.get("kind"):
    
                    generic_schema = self.get_schema(manifest_record.get("schema"))
                    validation_result = self._validate_against_generic_schema(generic_schema, manifest) \
                                        and self._validate_entity(manifest)
                    if validation_result:
                        validated_records.append(manifest_record)
    
                    raise NotOSDUSchemaFormatError(f"Not valid schema {manifest}")