diff --git a/src/dags/libs/processors/single_manifest_processor.py b/src/dags/libs/processors/single_manifest_processor.py index 703ceeebc7af6f56fac477436e6502bb038fa96b..a66a20b268f53027b058189110d9ab54f5d6e9f6 100644 --- a/src/dags/libs/processors/single_manifest_processor.py +++ b/src/dags/libs/processors/single_manifest_processor.py @@ -91,13 +91,19 @@ class SingleManifestProcessor(object): 7) create an ingestion queue according to child-parent relationships between entities 8) process valid manifest entities one-by-one + :param with_validation: Flag for validation steps :return: List of record ids. """ record_ids = [] - self.data_integrity_validator.validate_manifest_data_integrity(manifest) - manifest_schema = self.schema_validator.validate_common_schema(manifest) + if with_validation: + self.data_integrity_validator.validate_manifest_data_integrity(manifest) + manifest_schema = self.schema_validator.validate_common_schema(manifest) + else: + manifest_schema = self.schema_validator.get_schema(self.schema_validator.get_manifest_kind(manifest)) + traversal = ManifestTraversal(manifest, manifest_schema) manifest_entities = traversal.traverse_manifest() + # This set is used inside manifest_analyzer to know what entities must be before schema validation and integrity check. # Used to detect orphaned entities in manifest_analyzer original_manifest_entities_ids = set( @@ -107,15 +113,19 @@ class SingleManifestProcessor(object): if with_validation: manifest_entities = self.schema_validator.validate_manifest(manifest_entities) logger.debug(f"valid schema entities count: {len(manifest_entities)}") - if with_validation: + self.referential_integrity_validator.ensure_integrity(manifest) logger.debug(f"valid referential integrity entities count: {len(manifest_entities)}") + manifest_analyzer = ManifestAnalyzer( manifest_entities, self.token_refresher, original_manifest_entities_ids ) + for record_id in self._process_records(manifest_analyzer): record_ids.append(record_id) + logger.info(f"Processed ids {record_ids}") + return record_ids diff --git a/src/dags/libs/validation/validate_schema.py b/src/dags/libs/validation/validate_schema.py index 25944d998d65600342ca63269fbe3b13ca65ee3a..95d969a8197a62d804bdaa08ffee5ef8fc60da80 100644 --- a/src/dags/libs/validation/validate_schema.py +++ b/src/dags/libs/validation/validate_schema.py @@ -295,6 +295,20 @@ class SchemaValidator(HeadersMixin): ) jsonschema.validate(schema=schema, instance=data, resolver=resolver) + @staticmethod + def get_manifest_kind(manifest: dict) -> str: + """ + Utility method to extract kind value + Raises GenericManifestSchemaError exception in case of absent `kind` property + + :param manifest: Manifest data + :return: Manifest's kind + """ + try: + return manifest["kind"] + except KeyError: + raise GenericManifestSchemaError("There is no kind in the Manifest.") + def validate_common_schema(self, manifest: dict) -> dict: """ This is a preliminary validation of a manifest that verifies that a manifest corresponds @@ -303,13 +317,11 @@ class SchemaValidator(HeadersMixin): :param manifest: :return: Manifest schema """ - if not manifest.get("kind"): - raise GenericManifestSchemaError(f"There is no kind in the Manifest.") - - schema = self.get_schema(manifest["kind"]) + kind = self.get_manifest_kind(manifest) + schema = self.get_schema(kind) if not schema: raise GenericManifestSchemaError( - f"There is no schema for Manifest kind {manifest['kind']}") + f"There is no schema for Manifest kind {kind}") schema_without_refs = copy.deepcopy(schema) if schema_without_refs.get("properties"):