Skip to content
Snippets Groups Projects
Commit 57d6671a authored by YanSushchynski's avatar YanSushchynski
Browse files

GONRG-2696: Manifest integrity batch search

parent 759140dd
No related branches found
No related tags found
1 merge request: !66 "GONRG-2696: Manifest integrity batch search"
Pipeline #57544 passed
......@@ -40,6 +40,7 @@ class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
"""Init base operator and obtain base urls from Airflow Variables."""
super().__init__(*args, **kwargs)
self.search_url = Variable.get('core__service__search__url')
self.whitelist_ref_patterns = Variable.get('core__config__reference_patterns_whitelist', default_var=None)
self.previous_task_id = previous_task_id
self._show_skipped_ids = Variable.get(
'core__config__show_skipped_ids', default_var=False
......@@ -60,13 +61,18 @@ class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
token_refresher,
file_source_validator,
payload_context,
self.whitelist_ref_patterns,
)
execution_context = context["dag_run"].conf["execution_context"]
manifest_data = self._get_manifest_data(context, execution_context)
previously_skipped_entities = self._get_previously_skipped_entities(context)
logger.debug(f"Manifest data: {manifest_data}")
manifest, skipped_ids = manifest_integrity.ensure_integrity(manifest_data)
manifest, skipped_ids = manifest_integrity.ensure_integrity(
manifest_data,
previously_skipped_entities
)
logger.debug(f"Valid manifest data: {manifest_data}")
if self._show_skipped_ids:
context["ti"].xcom_push(key="skipped_ids", value=skipped_ids)
......
......@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from osdu_api.libs.types import ManifestType
......@@ -34,3 +36,16 @@ class ReceivingContextMixin:
else:
manifest_data = execution_context["manifest"]
return manifest_data
def _get_previously_skipped_entities(self, context: dict) -> list:
"""
Receive skipped entities from previous tasks.
"""
previously_skipped_ids = []
dagrun = context['ti'].get_dagrun()
task_instances = dagrun.get_task_instances()
for task in task_instances:
task_skipped_ids = context["ti"].xcom_pull(key="skipped_ids", task_ids=task.task_id)
if task_skipped_ids:
previously_skipped_ids.extend(task_skipped_ids)
return previously_skipped_ids
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment