Skip to content
Snippets Groups Projects
Commit 2d1ee64d authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) :triangular_flag_on_post:
Browse files

Merge branch 'trusted-manifest-ingestion-fixes' into 'master'

Fix for FileSource; fix for pipelines with invalid manifest

See merge request !32
parents e52f30bf 64df438a
No related branches found
No related tags found
1 merge request!32Fix for FileSource; fix for pipelines with invalid manifest
Pipeline #32614 passed
...@@ -54,6 +54,7 @@ class FileSourceError(Exception): ...@@ -54,6 +54,7 @@ class FileSourceError(Exception):
class UploadFileError(Exception): class UploadFileError(Exception):
"""Raise when there is an error while uploading a file into OSDU.""" """Raise when there is an error while uploading a file into OSDU."""
class TokenRefresherNotPresentError(Exception): class TokenRefresherNotPresentError(Exception):
"""Raise when token refresher is not present in "refresh_token' decorator.""" """Raise when token refresher is not present in "refresh_token' decorator."""
pass pass
...@@ -63,6 +64,7 @@ class NoParentEntitySystemSRNError(Exception): ...@@ -63,6 +64,7 @@ class NoParentEntitySystemSRNError(Exception):
"""Raise when parent entity doesn't have system-generated SRN.""" """Raise when parent entity doesn't have system-generated SRN."""
pass pass
class NoParentEntitySystemSRNError(Exception): class NoParentEntitySystemSRNError(Exception):
""" """
Raise when parent entity doesn't have system-generated SRN. Raise when parent entity doesn't have system-generated SRN.
...@@ -72,3 +74,7 @@ class NoParentEntitySystemSRNError(Exception): ...@@ -72,3 +74,7 @@ class NoParentEntitySystemSRNError(Exception):
class InvalidFileRecordData(Exception): class InvalidFileRecordData(Exception):
"""Raise when file data does not contain mandatory fields.""" """Raise when file data does not contain mandatory fields."""
class GenericManifestSchemaError(Exception):
"""Raise when a generic manifest schema is invalid."""
...@@ -35,7 +35,7 @@ class FileSourceValidator: ...@@ -35,7 +35,7 @@ class FileSourceValidator:
:return: True if validation succeed or field was populated. :return: True if validation succeed or field was populated.
:rtype: bool :rtype: bool
""" """
if file_source_info["FileSource"]: if file_source_info["FileSource"].strip():
return True return True
else: else:
return False return False
......
...@@ -17,14 +17,14 @@ ...@@ -17,14 +17,14 @@
import copy import copy
import logging import logging
from typing import Any, List, Union, Tuple, Dict from typing import Any, List, Union, Tuple, Union
import jsonschema import jsonschema
import requests import requests
import tenacity import tenacity
from jsonschema import exceptions from jsonschema import exceptions
from libs.context import Context from libs.context import Context
from libs.exceptions import EmptyManifestError, NotOSDUSchemaFormatError from libs.exceptions import GenericManifestSchemaError, NotOSDUSchemaFormatError
from libs.traverse_manifest import ManifestEntity from libs.traverse_manifest import ManifestEntity
from libs.mixins import HeadersMixin from libs.mixins import HeadersMixin
from osdu_api.libs.auth.authorization import TokenRefresher, authorize from osdu_api.libs.auth.authorization import TokenRefresher, authorize
...@@ -144,7 +144,7 @@ class SchemaValidator(HeadersMixin): ...@@ -144,7 +144,7 @@ class SchemaValidator(HeadersMixin):
response["$id"] = uri response["$id"] = uri
return response return response
def get_schema(self, kind: str) -> dict: def get_schema(self, kind: str) -> Union[dict, None]:
"""Fetch schema from Schema service. """Fetch schema from Schema service.
:param kind: The kind of the scheema to fetch :param kind: The kind of the scheema to fetch
...@@ -156,9 +156,10 @@ class SchemaValidator(HeadersMixin): ...@@ -156,9 +156,10 @@ class SchemaValidator(HeadersMixin):
manifest_schema_uri = f"{self.schema_service}/{kind}" manifest_schema_uri = f"{self.schema_service}/{kind}"
try: try:
response = self.get_schema_request(manifest_schema_uri) response = self.get_schema_request(manifest_schema_uri)
except Exception as e: except requests.HTTPError as e:
logger.error(f"Error on getting schema of kind '{kind}'") logger.error(f"Error on getting schema of kind '{kind}'")
raise e logger.error(e)
return None
return response return response
@staticmethod @staticmethod
...@@ -240,19 +241,32 @@ class SchemaValidator(HeadersMixin): ...@@ -240,19 +241,32 @@ class SchemaValidator(HeadersMixin):
""" """
Add 'surrogate-key:.+' to patterns of specific fields. Add 'surrogate-key:.+' to patterns of specific fields.
:param schema: :param schema: Original schema.
:return: :return: Extended schema.
""" """
schema = self._extend_id_patterns_with_surrogate_keys(schema) schema = self._extend_id_patterns_with_surrogate_keys(schema)
schema = self._extend_referential_patterns_with_surrogate_keys(schema) schema = self._extend_referential_patterns_with_surrogate_keys(schema)
return schema return schema
def _validate_entity(self, entity: dict, schema: dict = None): def _validate_entity(self, entity: dict, schema: dict = None) -> bool:
""" """
Validate the 'data' field of any entity against a schema got by entity's kind. Validate an entity against a schema.
If the entity is valid, then return True, otherwise, False.
If the schema isn't passed, get it from Schema service.
:param entity: A manifest's entity.
:param schema: The schema to validate an entity against.
:return: Validation result.
""" """
if not entity.get("kind"):
return False
if not schema: if not schema:
schema = self.get_schema(entity["kind"]) schema = self.get_schema(entity["kind"])
if not schema:
logger.warning(f"{entity['kind']} is not present in Schema service.")
return False
try: try:
schema = self._add_surrogate_keys_to_patterns(schema) schema = self._add_surrogate_keys_to_patterns(schema)
self._validate_against_schema(schema, entity) self._validate_against_schema(schema, entity)
...@@ -266,9 +280,10 @@ class SchemaValidator(HeadersMixin): ...@@ -266,9 +280,10 @@ class SchemaValidator(HeadersMixin):
def _validate_against_schema(self, schema: dict, data: Any): def _validate_against_schema(self, schema: dict, data: Any):
""" """
Validate any data against schema. Validate any data against schema. If the data is not valid, raises ValidationError.
:param schema:
:param data: :param schema: The schema to validate an entity against.
:param data: Any data to validate against schema.
:return: :return:
""" """
resolver = OSDURefResolver( resolver = OSDURefResolver(
...@@ -288,7 +303,14 @@ class SchemaValidator(HeadersMixin): ...@@ -288,7 +303,14 @@ class SchemaValidator(HeadersMixin):
:param manifest: :param manifest:
:return: Manifest schema :return: Manifest schema
""" """
if not manifest.get("kind"):
raise GenericManifestSchemaError(f"There is no kind in the Manifest.")
schema = self.get_schema(manifest["kind"]) schema = self.get_schema(manifest["kind"])
if not schema:
raise GenericManifestSchemaError(
f"There is no schema for Manifest kind {manifest['kind']}")
schema_without_refs = copy.deepcopy(schema) schema_without_refs = copy.deepcopy(schema)
if schema_without_refs.get("properties"): if schema_without_refs.get("properties"):
self._clear_data_fields(schema_without_refs["properties"]) self._clear_data_fields(schema_without_refs["properties"])
...@@ -296,7 +318,12 @@ class SchemaValidator(HeadersMixin): ...@@ -296,7 +318,12 @@ class SchemaValidator(HeadersMixin):
self._clear_data_fields(schema_without_refs) self._clear_data_fields(schema_without_refs)
logger.debug("Schema without refs") logger.debug("Schema without refs")
logger.debug(f"{schema_without_refs}") logger.debug(f"{schema_without_refs}")
self._validate_against_schema(schema_without_refs, manifest)
try:
self._validate_against_schema(schema_without_refs, manifest)
except jsonschema.ValidationError as err:
raise GenericManifestSchemaError(f"Manifest failed generic schema validation.\n {err}")
return schema return schema
def validate_manifest(self, manifest_records: List[ManifestEntity]) -> List[ManifestEntity]: def validate_manifest(self, manifest_records: List[ManifestEntity]) -> List[ManifestEntity]:
......
...@@ -27,7 +27,7 @@ from airflow.models import BaseOperator, Variable ...@@ -27,7 +27,7 @@ from airflow.models import BaseOperator, Variable
from jsonschema import SchemaError from jsonschema import SchemaError
from libs.context import Context from libs.context import Context
from libs.exceptions import EmptyManifestError, NotOSDUSchemaFormatError, \ from libs.exceptions import EmptyManifestError, NotOSDUSchemaFormatError, \
UploadFileError, GetSchemaError UploadFileError, GetSchemaError, GenericManifestSchemaError
from libs.source_file_check import SourceFileChecker from libs.source_file_check import SourceFileChecker
from libs.handle_file import FileHandler from libs.handle_file import FileHandler
from libs.refresh_token import AirflowTokenRefresher from libs.refresh_token import AirflowTokenRefresher
...@@ -96,8 +96,10 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin): ...@@ -96,8 +96,10 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
for single_manifest in manifest[slice_start_index:slice_end_index]: for single_manifest in manifest[slice_start_index:slice_end_index]:
logger.debug(f"processing {single_manifest}") logger.debug(f"processing {single_manifest}")
try: try:
record_ids.extend(single_manifest_processor.process_manifest(single_manifest, True)) record_ids.extend(
except (UploadFileError, HTTPError, GetSchemaError, SchemaError) as e: single_manifest_processor.process_manifest(single_manifest, True))
except (UploadFileError, HTTPError, GetSchemaError, SchemaError,
GenericManifestSchemaError) as e:
logger.error(f"Can't process {single_manifest}") logger.error(f"Can't process {single_manifest}")
logger.error(e) logger.error(e)
continue continue
......
...@@ -52,6 +52,22 @@ class TestFileSourceValidator: ...@@ -52,6 +52,22 @@ class TestFileSourceValidator:
for i in range(len(expected_datasets)): for i in range(len(expected_datasets)):
assert expected_datasets[i] == filtered_datasets[i] assert expected_datasets[i] == filtered_datasets[i]
@pytest.mark.parametrize("input_datasets_path,wrong_file_source", [
pytest.param(FILES_SOURCE_VALID, " "),
pytest.param(FILES_SOURCE_VALID, " "),
pytest.param(FILES_SOURCE_VALID, "\t"),
])
def test_FileSource_space_chars(self, provide_datasets: list, input_datasets_path: str, wrong_file_source: str):
"""Test invalid inputs with spaces."""
file_source_validator = FileSourceValidator()
for ds in provide_datasets:
ds["data"]["DatasetProperties"]["FileSourceInfo"]["FileSource"] = wrong_file_source
filtered_datasets = file_source_validator.filter_valid_datasets(provide_datasets)
filtered_datasets = sorted(filtered_datasets, key=lambda k: k["id"])
assert not filtered_datasets
@pytest.mark.parametrize("input_datasets_path", [ @pytest.mark.parametrize("input_datasets_path", [
pytest.param(FILES_SOURCE_INVALID), pytest.param(FILES_SOURCE_INVALID),
pytest.param(FILE_COLLECTIONS_INVALID), pytest.param(FILE_COLLECTIONS_INVALID),
......
...@@ -194,8 +194,7 @@ class TestSchemaValidator: ...@@ -194,8 +194,7 @@ class TestSchemaValidator:
"get", "get",
lambda *args, **kwargs: MockSchemaResponse("{}", lambda *args, **kwargs: MockSchemaResponse("{}",
http.HTTPStatus.INTERNAL_SERVER_ERROR)) http.HTTPStatus.INTERNAL_SERVER_ERROR))
with pytest.raises(requests.HTTPError): assert not schema_validator.get_schema(kind)
schema_validator.get_schema(kind)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"manifest_file,schema_file", "manifest_file,schema_file",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment