Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Ingestion
Ingestion DAGs
Commits
a6803d4e
Commit
a6803d4e
authored
Apr 19, 2021
by
Yan Sushchynski (EPAM)
Browse files
GONRG-1934: Add aggregated report
parent
e8ca3539
Pipeline
#36461
failed with stages
in 5 minutes and 36 seconds
Changes
1
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
src/plugins/operators/update_status.py
View file @
a6803d4e
...
...
@@ -18,6 +18,7 @@
import
copy
import
enum
import
logging
from
typing
import
Tuple
from
airflow.models
import
BaseOperator
,
Variable
from
airflow.utils
import
apply_defaults
...
...
@@ -70,7 +71,7 @@ class UpdateStatusOperator(BaseOperator):
def
pre_execute
(
self
,
context
:
dict
):
self
.
status
=
self
.
get_previous_ti_statuses
(
context
)
def
_create_skipped_report
(
self
,
context
:
dict
)
->
dict
:
def
_create_skipped_report
(
self
,
context
:
dict
)
->
Tuple
[
dict
,
dict
]
:
"""
Return aggregated report of skipped ids grouoped by tasks
...
...
@@ -78,13 +79,19 @@ class UpdateStatusOperator(BaseOperator):
:return: Aggregated report grouped by tasks
"""
skipped_ids_report
=
{}
saved_record_ids
=
{}
dagrun
=
context
[
'ti'
].
get_dagrun
()
task_instances
=
dagrun
.
get_task_instances
()
for
task
in
task_instances
:
task_skipped_ids
=
context
[
"ti"
].
xcom_pull
(
key
=
"skipped_ids"
,
task_ids
=
task
.
task_id
)
if
task_skipped_ids
:
skipped_ids_report
[
task
.
task_id
]
=
task_skipped_ids
return
skipped_ids_report
for
task
in
task_instances
:
task_saved_ids
=
context
[
"ti"
].
xcom_pull
(
key
=
"record_ids"
,
task_ids
=
task
.
task_id
)
if
task_saved_ids
:
saved_record_ids
[
task
.
task_id
]
=
task_saved_ids
return
skipped_ids_report
,
saved_record_ids
def
execute
(
self
,
context
:
dict
):
"""Execute update workflow status.
...
...
@@ -119,8 +126,9 @@ class UpdateStatusOperator(BaseOperator):
status_updater
.
update_workflow_status
()
if
self
.
_show_skipped_ids
:
skipped_ids
=
self
.
_create_skipped_report
(
context
)
skipped_ids
,
saved_record_ids
=
self
.
_create_skipped_report
(
context
)
context
[
"ti"
].
xcom_push
(
key
=
"skipped_ids"
,
value
=
skipped_ids
)
context
[
"ti"
].
xcom_push
(
key
=
"saved_record_ids"
,
value
=
saved_record_ids
)
if
self
.
status
is
self
.
prev_ti_state
.
FAILED
:
raise
PipelineFailedError
(
"Dag failed"
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment