Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Ingestion
osdu-airflow-lib
Commits
59e58330
Commit
59e58330
authored
Sep 09, 2021
by
Siarhei Khaletski (EPAM)
🚩
Browse files
Merge branch 'GONRG-3109_move_common_logic' into 'master'
GONRG-3109: move common logic from ingestion. See merge request
!2
parents
e210e2a5
3c3e4859
Pipeline
#64315
passed with stages
in 7 minutes and 35 seconds
Changes
27
Pipelines
1
Expand all
Hide whitespace changes
Inline
Side-by-side
.gitlab-ci.yml
View file @
59e58330
...
...
@@ -8,6 +8,7 @@ default:
stages
:
-
linters
-
unit_tests
-
deploy
pylint
:
...
...
@@ -26,6 +27,14 @@ isort:
-
isort -c -v osdu_airflow/*/*.py || EXIT_CODE=$?
-
exit ${EXIT_CODE}
unit_tests
:
stage
:
unit_tests
image
:
eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
script
:
-
chmod +x ./osdu_airflow/tests/unit_tests.sh
-
./osdu_airflow/tests/./unit_tests.sh || EXIT_CODE=$?
-
exit ${EXIT_CODE}
deploy
:
stage
:
deploy
script
:
...
...
osdu_airflow/operators/__init__.py
0 → 100644
View file @
59e58330
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
osdu_airflow/operators/deprecated/__init__.py
0 → 100644
View file @
59e58330
osdu_airflow/operators/deprecated/update_status.py
0 → 100644
View file @
59e58330
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Update Status operator."""
import
copy
import
enum
import
logging
from
airflow.models
import
BaseOperator
,
Variable
from
osdu_api.libs.context
import
Context
from
osdu_api.libs.exceptions
import
PipelineFailedError
from
osdu_api.libs.refresh_token
import
AirflowTokenRefresher
from
osdu_api.libs.update_status
import
UpdateStatus
# Module-level logger; getLogger() with no name returns the root logger.
logger = logging.getLogger()
class UpdateStatusOperator(BaseOperator):
    """Operator to update status.

    Determines the overall DAG-run outcome from the states of the preceding
    task instances, reports it to the Workflow service, and fails the DAG
    run if any upstream task failed.
    """

    ui_color = '#10ECAA'
    ui_fgcolor = '#000000'

    class prev_ti_state(enum.Enum):
        # Enum values are the status strings the Workflow service expects.
        NONE = "running"
        SUCCESS = "finished"
        FAILED = "failed"

    def get_previous_ti_statuses(self, context: dict) -> enum.Enum:
        """Get status of previous tasks' executions.
        Return corresponding enum value.

        :param context: Airflow context
        :type context: dict
        :return: Previous status
        :rtype: enum.Enum
        """
        dagrun = context['ti'].get_dagrun()
        failed_ti = dagrun.get_task_instances(state='failed')
        success_ti = dagrun.get_task_instances(state='success')
        if not failed_ti and not success_ti:
            # There is no prev task so it can't have been failed
            logger.info("There are no tasks before this one. So it has status RUNNING")
            return self.prev_ti_state.NONE
        if failed_ti:
            logger.info("There are failed tasks before this one. So it has status FAILED")
            return self.prev_ti_state.FAILED
        # Fixed log wording: previously said "successed ... SUCCESSED",
        # which matched neither English nor the SUCCESS enum member.
        logger.info("There are succeeded tasks before this one. So it has status SUCCESS")
        return self.prev_ti_state.SUCCESS

    def pre_execute(self, context: dict):
        """Cache the derived upstream status on the instance before execute()."""
        self.status = self.get_previous_ti_statuses(context)

    def execute(self, context: dict):
        """Execute update workflow status.
        If status assumed to be FINISHED then we check whether records
        are searchable or not.
        If they are then update status FINISHED else FAILED

        :param context: Airflow context
        :type context: dict
        :raises PipelineFailedError: If any of the status is failed
        """
        # Deep-copy so mutations here can never leak back into the DagRun conf.
        conf = copy.deepcopy(context["dag_run"].conf)
        logger.debug(f"Got conf {conf}.")
        if "Payload" in conf:
            payload_context = Context.populate(conf)
        else:
            payload_context = Context(data_partition_id=conf["data-partition-id"],
                                      app_key=conf.get("AppKey", ""))
        workflow_id = conf["WorkflowID"]
        status = self.status.value
        status_updater = UpdateStatus(
            workflow_name="",
            workflow_url=Variable.get("core__service__workflow__url"),
            workflow_id=workflow_id,
            run_id="",
            status=status,
            token_refresher=AirflowTokenRefresher(),
            context=payload_context
        )
        status_updater.update_workflow_status()
        # Propagate upstream failure as a DAG-level failure after reporting it.
        if self.status is self.prev_ti_state.FAILED:
            raise PipelineFailedError("Dag failed")
osdu_airflow/operators/ensure_manifest_integrity.py
0 → 100644
View file @
59e58330
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""R3 Validate reference Manifest operator."""
import
logging
from
airflow.models
import
BaseOperator
,
Variable
from
osdu_api.libs.context
import
Context
from
osdu_api.libs.refresh_token
import
AirflowTokenRefresher
from
osdu_api.libs.validation.validate_file_source
import
FileSourceValidator
from
osdu_api.libs.validation.validate_referential_integrity
import
ManifestIntegrity
from
osdu_airflow.backward_compatibility.airflow_utils
import
apply_defaults
from
osdu_airflow.operators.mixins.ReceivingContextMixin
import
ReceivingContextMixin
# Module-level logger; getLogger() with no name returns the root logger.
logger = logging.getLogger()
class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
    """Operator to validate ref inside manifest R3 and remove invalid entities."""

    ui_color = '#dad5ff'
    ui_fgcolor = '#000000'

    @apply_defaults
    def __init__(self, previous_task_id: str = None, *args, **kwargs):
        """Init base operator and obtain base urls from Airflow Variables.

        :param previous_task_id: id of the upstream task whose XCom return
            value carries the manifest; None means read it from the
            execution context instead (see ReceivingContextMixin).
        """
        super().__init__(*args, **kwargs)
        self.search_url = Variable.get('core__service__search__url')
        self.whitelist_ref_patterns = Variable.get('core__config__reference_patterns_whitelist',
                                                   default_var=None)
        self.previous_task_id = previous_task_id
        # NOTE(review): Variable.get returns a *string* when the Variable is
        # set, so any non-empty value (even "False") is truthy — confirm
        # intended semantics before relying on a "False" Variable value.
        self._show_skipped_ids = Variable.get('core__config__show_skipped_ids',
                                              default_var=False)

    def execute(self, context: dict):
        """Execute manifest validation then process it.

        :param context: Airflow context
        :type context: dict
        :return: dict with the validated manifest under key "manifest"
        """
        payload_context = Context.populate(context["dag_run"].conf["execution_context"])
        token_refresher = AirflowTokenRefresher()
        file_source_validator = FileSourceValidator()
        manifest_integrity = ManifestIntegrity(
            self.search_url,
            token_refresher,
            file_source_validator,
            payload_context,
            self.whitelist_ref_patterns,
        )
        execution_context = context["dag_run"].conf["execution_context"]
        manifest_data = self._get_manifest_data(context, execution_context)
        previously_skipped_entities = self._get_previously_skipped_entities(context)
        logger.debug(f"Manifest data: {manifest_data}")
        manifest, skipped_ids = manifest_integrity.ensure_integrity(
            manifest_data,
            previously_skipped_entities)
        # Fix: log the validated manifest returned by ensure_integrity();
        # the original logged the pre-validation manifest_data under the
        # misleading label "Valid manifest data".
        logger.debug(f"Valid manifest data: {manifest}")
        if self._show_skipped_ids:
            context["ti"].xcom_push(key="skipped_ids", value=skipped_ids)
        return {"manifest": manifest}
osdu_airflow/operators/mixins/ReceivingContextMixin.py
0 → 100644
View file @
59e58330
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
logging
from
osdu_api.libs.types
import
ManifestType
class ReceivingContextMixin:
    """Mixin for receiving manifest file from XCOMs in case if current operator not the first in the row"""

    def _get_manifest_data(self, context: dict, execution_context: dict) -> ManifestType:
        """
        Receive manifest file. If previous task id not None - get manifest file from XCOMs.
        Otherwise - get manifest file from execution context
        """
        # No upstream task configured: the manifest travels in the conf.
        if not self.previous_task_id:
            return execution_context["manifest"]
        upstream_result = context["ti"].xcom_pull(task_ids=self.previous_task_id,
                                                  key="return_value")
        if upstream_result:
            return upstream_result["manifest"]
        # Upstream pushed nothing usable — fall back to the execution context.
        return execution_context["manifest"]

    def _get_previously_skipped_entities(self, context: dict) -> list:
        """
        Receive skipped entities from previous tasks.
        """
        accumulated_ids = []
        dag_run = context['ti'].get_dagrun()
        # Collect every "skipped_ids" XCom pushed by any task in this run.
        for task_instance in dag_run.get_task_instances():
            pulled = context["ti"].xcom_pull(key="skipped_ids",
                                             task_ids=task_instance.task_id)
            if pulled:
                accumulated_ids.extend(pulled)
        return accumulated_ids
osdu_airflow/operators/mixins/__init__.py
0 → 100644
View file @
59e58330
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
osdu_airflow/operators/process_manifest_r2.py
0 → 100644
View file @
59e58330
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""R2 Process Manifest operator."""
import
configparser
import
enum
import
json
import
logging
import
re
import
sys
import
time
import
uuid
from
collections
import
Counter
from
typing
import
Tuple
from
urllib.error
import
HTTPError
import
requests
import
tenacity
from
airflow.models
import
BaseOperator
,
Variable
from
osdu_api.libs.auth.authorization
import
authorize
from
osdu_api.libs.refresh_token
import
AirflowTokenRefresher
# Dataload configuration file; its path is injected via an Airflow Variable
# so deployments can relocate it without code changes.
config = configparser.RawConfigParser()
config.read(Variable.get("core__config__dataload_config_path"))

# Fallbacks from the [DEFAULTS] section, used when the incoming payload
# omits the corresponding keys (see create_kind()).
DEFAULT_TENANT = config.get("DEFAULTS", "tenant")
DEFAULT_SOURCE = config.get("DEFAULTS", "authority")
DEFAULT_VERSION = config.get("DEFAULTS", "kind_version")

# TIMEOUT feeds tenacity.wait_fixed() below; RETRIES presumably bounds the
# retry attempts of the same decorated helpers — confirm in the full file.
RETRIES = 3
TIMEOUT = 1
# Set up base logger: a stdout stream handler with a timestamped,
# fixed-width format, attached to the "Dataload" logger at INFO level.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Some constants, used by script: HTTP status-code groups checked by the
# request helpers (usage is outside this chunk — confirm there).
SEARCH_OK_RESPONSE_CODES = [200]
DATA_LOAD_OK_RESPONSE_CODES = [201]
NOT_FOUND_RESPONSE_CODES = [404]
BAD_TOKEN_RESPONSE_CODES = [400, 401, 403, 500]
class FileType(enum.Enum):
    """Kind of input document detected by validate_file()."""
    MANIFEST = enum.auto()
    WORKPRODUCT = enum.auto()
def dataload(**kwargs):
    """Assemble the loaded configuration from the DagRun conf.

    :return: tuple of (loaded configuration dict, Payload section dict)
    """
    run_conf = kwargs["dag_run"].conf
    payload = run_conf["Payload"]
    assembled = {
        "acl": payload["acl"],
        "legal_tag": payload["legal"],
        "data_object": run_conf,
    }
    return assembled, payload
def create_headers(conf_payload):
    """Create header.

    :param conf_payload: config payload
    :return: headers
    """
    return {
        'Content-type': 'application/json',
        'data-partition-id': conf_payload["data-partition-id"],
        'AppKey': conf_payload["AppKey"],
    }
def generate_id(type_id):
    """Generate resource ID.

    :param type_id: resource type ID
    :return: resource ID
    """
    prefix = type_id.replace("type:", "")
    # Digits-only tail taken from a fresh UUID4.
    numeric_tail = re.sub(r"\D", "", str(uuid.uuid4()))
    return f"{prefix}{numeric_tail}:"
def determine_data_type(raw_resource_type_id):
    """Determine resource type ID.

    :param raw_resource_type_id: raw resource type ID from manifest file
    :return: short resource type ID, or None when the input is None
    """
    if raw_resource_type_id is None:
        return None
    # Last path segment, with any trailing/embedded colons stripped.
    last_segment = raw_resource_type_id.split("/")[-1]
    return last_segment.replace(":", "")
def process_file_items(loaded_conf, conf_payload) -> Tuple[list, list]:
    """Process files items.

    :param loaded_conf: loaded configuration
    :param conf_payload: configuration payload
    :return: list of file records and list of their ids
    """
    acl = loaded_conf.get("acl")
    legal_tag = loaded_conf.get("legal_tag")
    data_object = loaded_conf.get("data_object")
    file_records = []
    generated_ids = []
    for file_item in data_object["Files"]:
        # Assign a fresh ResourceID in place and remember it.
        resource_id = generate_id(file_item["ResourceTypeID"])
        file_item["ResourceID"] = resource_id
        generated_ids.append(resource_id)
        request_body = populate_request_body(file_item, acl, legal_tag, "file", conf_payload)
        file_records.append((request_body, "File"))
    return file_records, generated_ids
def process_wpc_items(loaded_conf, product_type, file_ids, conf_payload):
    """Process WorkProductComponents items.

    :param loaded_conf: loaded configuration
    :param product_type: product type
    :param file_ids: list of file ids
    :param conf_payload: configuration payload
    :return: list of workproductcomponents records and list of their ids
    """
    acl = loaded_conf.get("acl")
    legal_tag = loaded_conf.get("legal_tag")
    data_object = loaded_conf.get("data_object")
    wpc_label = product_type + "_wpc"
    wpc_records = []
    generated_ids = []
    for wpc_item in data_object["WorkProductComponents"]:
        # Assign a fresh ResourceID in place and link the file ids.
        resource_id = generate_id(wpc_item["ResourceTypeID"])
        wpc_item["ResourceID"] = resource_id
        generated_ids.append(resource_id)
        wpc_item["Data"]["GroupTypeProperties"]["Files"] = file_ids
        request_body = populate_request_body(wpc_item, acl, legal_tag, wpc_label, conf_payload)
        wpc_records.append((request_body, wpc_label))
    return wpc_records, generated_ids
def process_wp_item(loaded_conf, product_type, wpc_ids, conf_payload) -> list:
    """Process WorkProduct item.

    :param loaded_conf: loaded configuration
    :param product_type: product type
    :param wpc_ids: work product component ids
    :param conf_payload: configuration payload
    :return: work product record
    """
    acl = loaded_conf.get("acl")
    legal_tag = loaded_conf.get("legal_tag")
    work_product = loaded_conf.get("data_object")["WorkProduct"]
    # Assign a fresh ResourceID in place and link the component ids.
    work_product["ResourceID"] = generate_id(work_product["ResourceTypeID"])
    work_product["Data"]["GroupTypeProperties"]["Components"] = wpc_ids
    wp_label = product_type + "_wp"
    return [(populate_request_body(work_product, acl, legal_tag, wp_label, conf_payload),
             wp_label)]
def validate_file_type(file_type, data_object):
    """Validate file type.

    Exits the process with code 2 when no type could be determined.

    :param file_type: file type
    :param data_object: file record
    """
    if file_type:
        return
    logger.error(f"Error with file {data_object}. Type could not be specified.")
    sys.exit(2)
def validate_file(loaded_conf) -> Tuple[FileType, str]:
    """Validate file.

    Classify the input as a Manifest or a WorkProduct document and derive
    its product type; exits the process (code 2) on an empty or
    unrecognized data object.

    :param loaded_conf: loaded configuration
    :return: file type and product type
    """
    data_object = loaded_conf.get("data_object")
    if not data_object:
        logger.error(f"Error with file {data_object}. It is empty.")
        sys.exit(2)
    # NOTE(review): the membership test below assumes data_object["Manifest"]
    # is a dict (keys are checked with "in") — confirm against the payloads.
    elif "Manifest" in data_object and "ResourceTypeID" in data_object.get("Manifest"):
        product_type = determine_data_type(data_object["Manifest"].get("ResourceTypeID"))
        validate_file_type(product_type, data_object)
        return (FileType.MANIFEST, product_type)
    elif "WorkProduct" in data_object and "ResourceTypeID" in data_object.get("WorkProduct"):
        product_type = determine_data_type(data_object["WorkProduct"].get("ResourceTypeID"))
        validate_file_type(product_type, data_object)
        # A generic "workproduct" type is refined using the first
        # WorkProductComponent's ResourceTypeID when components exist.
        if product_type.lower() == "workproduct" and \
                data_object.get("WorkProductComponents") and \
                len(data_object["WorkProductComponents"]) >= 1:
            product_type = determine_data_type(
                data_object["WorkProductComponents"][0].get("ResourceTypeID"))
            validate_file_type(product_type, data_object)
        return (FileType.WORKPRODUCT, product_type)
    else:
        logger.error(
            f"Error with file {data_object}. It doesn't have either Manifest or WorkProduct or ResourceTypeID.")
        sys.exit(2)
def create_kind(data_kind, conf_payload):
    """Create kind.

    :param data_kind: data kind
    :param conf_payload: configuration payload
    :return: kind
    """
    tenant = conf_payload.get("data-partition-id", DEFAULT_TENANT)
    authority = conf_payload.get("authority", DEFAULT_SOURCE)
    kind_version = conf_payload.get("kind_version", DEFAULT_VERSION)
    # Base kind name comes from the [KINDS_INITIAL] section of the config.
    base_kind = config.get("KINDS_INITIAL", f"{data_kind.lower()}_kind")
    return f"{tenant}:{authority}:{base_kind}:{kind_version}"
def populate_request_body(data, acl, legal_tag, data_type, conf_payload):
    """Populate request body according API specification

    :param data: item data from manifest files
    :param data_type: resource type ID
    :return: populated request
    :rtype: dict
    """
    # Build the record in one literal instead of patching empty lists after.
    return {
        "kind": create_kind(data_type, conf_payload),
        "legal": {
            "legaltags": legal_tag["legaltags"],
            "otherRelevantDataCountries": ["US"],
            "status": "compliant",
        },
        "acl": {
            "viewers": acl["viewers"],
            "owners": acl["owners"],
        },
        "data": data,
    }
def separate_type_data(request_data):
    """Separate the list of tuples into Data Type Counter and data list

    :param request_data: tuple of data and types
    :type request_data: tuple(list, str)
    :return: counter with data types and data list
    :rtype: tuple(counter, list)
    """
    pairs = list(request_data)
    data = [pair[0] for pair in pairs]
    types = Counter(pair[1] for pair in pairs)
    logger.info(f"The count of records to be ingested: {str(dict(types))}")
    return types, data
def create_manifest_request_data(loaded_conf: dict, product_type: str):
    """Build the single-record request list for a Manifest-type input."""
    acl = loaded_conf.get("acl")
    legal_tag = loaded_conf.get("legal_tag")
    data_object = loaded_conf.get("data_object")
    # NOTE(review): populate_request_body() is defined with five parameters
    # (data, acl, legal_tag, data_type, conf_payload) but is called here with
    # only four — this path looks like it raises TypeError at call time.
    # Confirm against callers / upstream history before relying on it.
    data_objects_list = [(populate_request_body(data_object["Manifest"], acl, legal_tag,
                                                product_type), product_type)]
    return data_objects_list
def create_workproduct_request_data(loaded_conf: dict, product_type: str, wp, wpc_list,
                                    file_list):
    """Combine file, WPC and WP records into one ingestion-ready list."""
    combined = file_list + wpc_list + wp
    _, records = separate_type_data(combined)
    return records
@
tenacity
.
retry
(
wait
=
tenacity
.
wait_fixed
(
TIMEOUT
),