diff --git a/src/dags/libs/processors/single_manifest_processor.py b/src/dags/libs/processors/single_manifest_processor.py
index 2a35cc1d65035a7df14847dd4b3fc62a5927cbff..ac9d04f73e04fc996a91eb9b1f403f4d74e0e568 100644
--- a/src/dags/libs/processors/single_manifest_processor.py
+++ b/src/dags/libs/processors/single_manifest_processor.py
@@ -72,7 +72,7 @@ class SingleManifestProcessor:
             )[FIRST_STORED_RECORD_INDEX]
             entity_node.system_srn = record_id
         except Exception as e:
-            raise ProcessRecordError(entity_node.entity_info.entity, f"{e}")
+            raise ProcessRecordError(entity_node.entity_info.entity, f"{e}"[:128])
         return record_id
 
     def _process_records(self, manifest_analyzer: ManifestAnalyzer) -> Tuple[List[str], List[dict]]:
diff --git a/src/dags/libs/utils.py b/src/dags/libs/utils.py
index 437787694509a3f4f7d5b29b9259a6dfeff7ed2e..22b8dba5afdc17f2ff2c143018c2b8f3a9980e25 100644
--- a/src/dags/libs/utils.py
+++ b/src/dags/libs/utils.py
@@ -63,7 +63,7 @@ def create_skipped_entity_info(entity: Any, reason: str) -> dict:
         skipped_entity_info = {
             "id": entity.get("id", ""),
             "kind": entity.get("kind", ""),
-            "reason": reason[:128]
+            "reason": reason
         }
     else:
         skipped_entity_info = {
diff --git a/src/dags/libs/validation/validate_file_source.py b/src/dags/libs/validation/validate_file_source.py
index 8eb5559b81788c34d05b0d05adefae5d8ef9fcbe..c6a06ea00c5b7bf63631a302ecd227d1d0ece45f 100644
--- a/src/dags/libs/validation/validate_file_source.py
+++ b/src/dags/libs/validation/validate_file_source.py
@@ -88,16 +88,17 @@ class FileSourceValidator:
                 logger.error(
                     f"Rejecting invalid dataset: {dataset.get('id')}, invalid structure. KeyError: {exc}"
                 )
-                raise DatasetValidationError(dataset, f"Invalid structure. KeyError: {exc}")
+                raise DatasetValidationError(dataset, f"Invalid structure. KeyError: {exc}"[:128])
 
         elif is_file_collection:
             try:
                 is_valid_dataset = self._validate_file_collection_record(dataset)
             except KeyError as exc:
                 logger.error(
-                    f"Rejecting invalid dataset: {dataset.get('id')}, invalid structure. KeyError: {exc}"
+                    f"Rejecting invalid dataset: {dataset.get('id')}, "
+                    f"invalid structure. KeyError: {exc}"[:128]
                 )
-                raise DatasetValidationError(dataset, f"Invalid structure. KeyError: {exc}")
+                raise DatasetValidationError(dataset, f"Invalid structure. KeyError: {exc}"[:128])
         else:
             logger.error(f"Unknown file type: {dataset.get('kind')}.")
 
diff --git a/src/dags/libs/validation/validate_referential_integrity.py b/src/dags/libs/validation/validate_referential_integrity.py
index ac7f9d471975b16a29772dff09448ffc9db69e60..3c0b0db29458ab9c2822b0f69ab9bfe5f403e5b3 100644
--- a/src/dags/libs/validation/validate_referential_integrity.py
+++ b/src/dags/libs/validation/validate_referential_integrity.py
@@ -240,7 +240,8 @@ class ManifestIntegrity:
                 f"the WPC's 'Datasets': {duplicated_ids}."
             )
             raise ValidationIntegrityError(wpc,
-                                           reason=f"It has duplicated Datasets: {duplicated_ids}.")
+                                           reason=f"It has duplicated "
+                                                  f"Datasets and Artefacts: {duplicated_ids}.")
 
     def _ensure_artefacts_integrity(self, work_product_components: list) -> Tuple[
         List[dict], List[dict]]:
diff --git a/src/plugins/operators/validate_manifest_schema.py b/src/plugins/operators/validate_manifest_schema.py
index 49973634f41f9ddb21148afa32cdffebdea4f20c..7d8268a201a8f3ee9dc9d869b03c25c8d09a9127 100644
--- a/src/plugins/operators/validate_manifest_schema.py
+++ b/src/plugins/operators/validate_manifest_schema.py
@@ -23,7 +23,7 @@ from airflow.utils import apply_defaults
 from airflow.models import BaseOperator, Variable
 from libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS
 from libs.context import Context
-from libs.exceptions import EmptyManifestError
+from libs.exceptions import EmptyManifestError, GenericManifestSchemaError
 from libs.refresh_token import AirflowTokenRefresher
 from libs.validation.validate_schema import SchemaValidator
 from operators.mixins.ReceivingContextMixin import ReceivingContextMixin
@@ -78,7 +78,13 @@ class ValidateManifestSchemaOperator(BaseOperator, ReceivingContextMixin):
                 f"Data {context['dag_run'].conf} doesn't contain 'manifest field'")
 
         _ = schema_validator.validate_common_schema(manifest_data)
-        valid_manifest_file, skipped_entities = schema_validator.ensure_manifest_validity(manifest_data)
+        try:
+            valid_manifest_file, skipped_entities = schema_validator.ensure_manifest_validity(
+                manifest_data
+            )
+        except GenericManifestSchemaError as err:
+            context["ti"].xcom_push(key="skipped_ids", value=str(err))
+            raise err
         if self._show_skipped_ids:
             context["ti"].xcom_push(key="skipped_ids", value=skipped_entities)
         return {"manifest": valid_manifest_file}
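
Note on the `[:128]` slices above: the new error paths in `validate_file_source.py` rely on Python concatenating adjacent f-string literals before the subscription is applied, so the slice caps the whole assembled message, not just the second literal. The sketch below is not part of the patch; it just demonstrates that behavior with made-up placeholder values for the dataset id and the `KeyError`.

```python
# Standalone sketch (not from the repository): shows that the [:128] slice in the
# updated error paths truncates the fully concatenated f-string, so the reason
# passed on to DatasetValidationError stays within 128 characters.
# The dataset id and KeyError value are placeholders, long enough to force truncation.
exc = KeyError("FileSource")
dataset_id = "osdu:wks:dataset--File.Generic:1.0.0:example" * 5

reason = (
    f"Rejecting invalid dataset: {dataset_id}, "
    f"invalid structure. KeyError: {exc}"[:128]
)

# Adjacent string literals form a single atom before the slice is applied,
# so the cap covers the whole message rather than only the second literal.
assert len(reason) == 128
print(reason)
```

This is presumably also why the slice is dropped from `create_skipped_entity_info` in `utils.py`: the reason string is already capped at the raise site before it reaches that helper.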