Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Ingestion
osdu-airflow-lib
Commits
595c1c51
Commit
595c1c51
authored
Mar 29, 2022
by
Valentin Gauthier
Browse files
bugfix: the manifest content is now actually used, and the manifest's ACL and legal-tag content are applied
parent
34df8623
Pipeline
#101000
passed with stages
in 2 minutes
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
osdu_airflow/operators/mixins/ReceivingContextMixin.py
View file @
595c1c51
...
@@ -86,19 +86,34 @@ class ReceivingContextMixin:
...
@@ -86,19 +86,34 @@ class ReceivingContextMixin:
# record_id_list = execution_context["manifest_ref_ids"]
# record_id_list = execution_context["manifest_ref_ids"]
record_id_list
=
context
[
"ti"
].
xcom_pull
(
task_ids
=
self
.
previous_task_id
,
key
=
"manifest_ref_ids"
)
record_id_list
=
context
[
"ti"
].
xcom_pull
(
task_ids
=
self
.
previous_task_id
,
key
=
"manifest_ref_ids"
)
record_id
=
record_id_list
[
-
1
]
# the last one is the most recent one
record_id
=
record_id_list
[
-
1
]
# the last one is the most recent one
logger
.
debug
(
f
"#ReceivingContextMixin record id list :
{
record_id_list
}
"
)
elif
self
.
previous_task_id
:
elif
self
.
previous_task_id
:
record_id
=
context
[
"ti"
].
xcom_pull
(
task_ids
=
self
.
previous_task_id
,
key
=
"return_value"
)
record_id
=
context
[
"ti"
].
xcom_pull
(
task_ids
=
self
.
previous_task_id
,
key
=
"return_value"
)
else
:
else
:
logger
.
debug
(
f
"#ReceivingContextMixin _get_manifest_data_by_reference no data found (you should try to use records id history) "
)
logger
.
debug
(
f
"#ReceivingContextMixin _get_manifest_data_by_reference no data found (you should try to use records id history) "
)
clientDms
=
DatasetDmsClient
(
data_partition_id
=
data_partition_id
)
#
clientDms = DatasetDmsClient(data_partition_id=data_partition_id)
retrieval
=
clientDms
.
get_retrieval_instructions
(
record_id
=
record_id
).
json
()
#
retrieval = clientDms.get_retrieval_instructions(record_id=record_id).json()
#
logger.debug(f"#ReceivingContextMixin
retrieval : {retrieval}
")
logger
.
debug
(
f
"#ReceivingContextMixin
try to retrieve data from id :
{
record_id
}
.
"
)
client_reg
=
DatasetRegistryClient
()
#
client_reg = DatasetRegistryClient()
manifest_data
=
client_reg
.
get_dataset_registry
(
record_id
=
record_id
)
#
manifest_data = client_reg.get_dataset_registry(record_id=record_id)
# logger.debug(f"#ReceivingContextMixin manifest_data : {manifest_data.json()}")
# logger.debug(f"#ReceivingContextMixin manifest_data : {manifest_data.json()}")
return
manifest_data
.
json
()[
'datasetRegistries'
][
0
]
# return manifest_data.json()['datasetRegistries'][0]
clientDms
=
DatasetDmsClient
(
data_partition_id
=
data_partition_id
)
retrieval
=
clientDms
.
get_retrieval_instructions
(
record_id
=
record_id
)
retrievalContentURL
=
retrieval
.
json
()[
"delivery"
][
0
][
"retrievalProperties"
][
"signedUrl"
]
manifest_data
=
clientDms
.
make_request
(
method
=
HttpMethod
.
GET
,
url
=
retrievalContentURL
).
json
()
# logger.debug(f"#ReceivingContextMixin [NEW] manifest_data type : {type(manifest_data)}")
# logger.debug(f"#ReceivingContextMixin [NEW] manifest_data : {str(manifest_data)[:300]}")
if
isinstance
(
manifest_data
,
str
):
return
json
.
loads
(
manifest_data
)
else
:
return
manifest_data
def
_put_manifest_data_by_reference
(
self
,
context
:
dict
,
execution_context
:
dict
,
manifest_data
,
use_history
:
bool
=
False
)
->
int
:
def
_put_manifest_data_by_reference
(
self
,
context
:
dict
,
execution_context
:
dict
,
manifest_data
,
use_history
:
bool
=
False
)
->
int
:
...
@@ -114,11 +129,12 @@ class ReceivingContextMixin:
...
@@ -114,11 +129,12 @@ class ReceivingContextMixin:
dataset_registry_url
=
config_manager
.
get
(
'environment'
,
'dataset_registry_url'
)
dataset_registry_url
=
config_manager
.
get
(
'environment'
,
'dataset_registry_url'
)
match_domain
=
re
.
search
(
r
'https?://([\w\.-]+).*'
,
dataset_registry_url
)
match_domain
=
re
.
search
(
r
'https?://([\w\.-]+).*'
,
dataset_registry_url
)
dataset_registry_url_domain
=
match_domain
.
group
(
1
)
dataset_registry_url_domain
=
match_domain
.
group
(
1
)
#
logger.debug(f"##ReceivingContextMixin dataset_registry_url {dataset_registry_url} : \ndomain is {dataset_registry_url_domain}")
logger
.
debug
(
f
"##ReceivingContextMixin dataset_registry_url
{
dataset_registry_url
}
:
\n
domain is
{
dataset_registry_url_domain
}
\n\n
data_partition_id :
{
data_partition_id
}
"
)
client_dms
=
DatasetDmsClient
(
data_partition_id
=
data_partition_id
)
client_dms
=
DatasetDmsClient
(
data_partition_id
=
data_partition_id
)
storage_instruction
=
client_dms
.
get_storage_instructions
(
kind_sub_type
=
"dataset--File.Generic"
)
# TODO: change to fit for manifest
storage_instruction
=
client_dms
.
get_storage_instructions
(
kind_sub_type
=
"dataset--File.Generic"
)
# TODO: change to fit for manifest
# FileCollection.Generic
# logger.debug(f"##ReceivingContextMixin storage_instruction : {storage_instruction.json()}")
# logger.debug(f"##ReceivingContextMixin storage_instruction : {storage_instruction.json()}")
...
@@ -147,58 +163,95 @@ class ReceivingContextMixin:
...
@@ -147,58 +163,95 @@ class ReceivingContextMixin:
logger
.
debug
(
"No 'signedUploadFileName' parameter found for storage location"
)
logger
.
debug
(
"No 'signedUploadFileName' parameter found for storage location"
)
# Uploading data
#### Uploading data
# logger.debug(f"##ReceivingContextMixin signedUrl : {signedUrl}")
put_result
=
client_dms
.
make_request
(
method
=
HttpMethod
.
PUT
,
url
=
signedUrl
,
data
=
str
(
manifest_data
))
manifest_dict
=
manifest_data
## NOTE : should be put as python dict but class BaseClient "_send_request" function
# sends data in "data" parameter of "requests.put" function, but should send with "json" parameter
if
isinstance
(
manifest_data
,
dict
):
manifest_data
=
json
.
dumps
(
manifest_data
)
else
:
manifest_dict
=
json
.
loads
(
manifest_dict
)
# if isinstance(manifest_data, dict):
# manifest_data = json.dumps(manifest_data)
put_result
=
client_dms
.
make_request
(
method
=
HttpMethod
.
PUT
,
url
=
signedUrl
,
data
=
manifest_data
)
# logger.debug(f"##ReceivingContextMixin put_result : {put_result}")
# logger.debug(f"##ReceivingContextMixin put_result : {put_result}")
########## ACL
#### ACL
acl_data
=
None
if
isinstance
(
manifest_dict
,
dict
)
and
"acl"
in
manifest_dict
:
try
:
acl_data
=
Acl
(
viewers
=
manifest_dict
[
"acl"
][
"viewers"
],
owners
=
manifest_dict
[
"acl"
][
"owners"
])
except
Exception
as
e
:
logger
.
error
(
"Exception while searching ACL in manifest "
+
str
(
e
.
__dict__
))
ent_client
=
EntitlementsClient
()
ent_response
=
ent_client
.
get_groups_for_user
()
# logger.debug(f"ent_client : {ent_response.json()}")
if
acl_data
is
None
:
# if no possible to find acl in manifest
# logger.debug(f"ent_client : {ent_response.json()['groups']}")
logger
.
error
(
"Failed to find ACL in manifest, trying to put default values"
)
ent_client
=
EntitlementsClient
()
ent_response
=
ent_client
.
get_groups_for_user
()
acl_domain
=
data_partition_id
+
"."
+
dataset_registry_url_domain
# logger.debug(f"ent_client : {ent_response.json()}")
# logger.debug(f"ent_client : {ent_response.json()['groups']}")
acl_domain
=
data_partition_id
+
"."
+
dataset_registry_url_domain
data_default_viewers
=
"data.default.viewers@"
+
acl_domain
data_default_viewers
=
"data.default.viewers@"
+
acl_domain
data_default_owners
=
"data.default.owners@"
+
acl_domain
data_default_owners
=
"data.default.owners@"
+
acl_domain
viewers_found
=
False
viewers_found
=
False
owners_found
=
False
owners_found
=
False
# Should we use the ACL from the manifest ? or as follow, the default acl
# Should we use the ACL from the manifest ? or as follow, the default acl
# search ACL in the entitlement client response
# search ACL in the entitlement client response
# here we search a group name that contains "data" (and "viewers" / "owners")
# here we search a group name that contains "data" (and "viewers" / "owners")
for
ent_grp_elt
in
ent_response
.
json
()[
"groups"
]:
for
ent_grp_elt
in
ent_response
.
json
()[
"groups"
]:
# logger.debug(f"ent_client : {ent_grp_elt}")
# logger.debug(f"ent_client : {ent_grp_elt}")
if
"data"
in
ent_grp_elt
[
"name"
]
and
"viewers"
in
ent_grp_elt
[
"name"
]:
if
"data"
in
ent_grp_elt
[
"name"
]
and
"viewers"
in
ent_grp_elt
[
"name"
]:
data_default_viewers
=
ent_grp_elt
[
"email"
]
data_default_viewers
=
ent_grp_elt
[
"email"
]
viewers_found
=
True
viewers_found
=
True
if
"data"
in
ent_grp_elt
[
"name"
]
and
"owner"
in
ent_grp_elt
[
"name"
]:
if
"data"
in
ent_grp_elt
[
"name"
]
and
"owner"
in
ent_grp_elt
[
"name"
]:
data_default_owners
=
ent_grp_elt
[
"email"
]
data_default_owners
=
ent_grp_elt
[
"email"
]
owners_found
=
True
owners_found
=
True
if
viewers_found
and
owners_found
:
if
viewers_found
and
owners_found
:
break
break
acl_data
=
Acl
(
viewers
=
[
data_default_viewers
],
owners
=
[
data_default_owners
])
# logger.debug(f"ACL for the record are set to : {acl_data.__dict__}")
acl_data
=
Acl
(
viewers
=
[
data_default_viewers
],
#### END ACL
owners
=
[
data_default_owners
])
# logger.debug(f"ACL for the record are set to : {acl_data.__dict__}")
#### Legal Tags
# TODO: get the legalTag with a request to the specific service :
# TODO: get the legalTag with a request to the specific service :
# Legal tag may be generated with a request on : "/api/legal/v1/legaltags"
# Legal tag may be generated with a request on : "/api/legal/v1/legaltags"
legal_tags
=
None
if
isinstance
(
manifest_dict
,
dict
)
and
"legal"
in
manifest_dict
:
try
:
legal_tags
=
Legal
(
legaltags
=
manifest_dict
[
"legal"
][
"legaltags"
],
other_relevant_data_countries
=
manifest_dict
[
"legal"
][
"otherRelevantDataCountries"
],
status
=
manifest_dict
[
"legal"
][
"status"
])
except
Exception
as
e
:
logger
.
debug
(
"Exception while searching legaltags in manifest "
+
str
(
e
.
__dict__
))
if
legal_tags
is
None
:
# if no legalTags found in manifest
logger
.
error
(
"Failed to find legaltags in manifest, trying to put default values"
)
legal_tags
=
Legal
(
legaltags
=
[
data_partition_id
+
"-demo-legaltag"
],
other_relevant_data_countries
=
[
"US"
],
status
=
"compliant"
)
#### END Legal tags
recordList
=
[
recordList
=
[
Record
(
kind
=
"osdu
:wks:dataset--File.Generic:1.0.0"
,
Record
(
kind
=
data_partition_id
+
"
:wks:dataset--File.Generic:1.0.0"
,
acl
=
acl_data
,
acl
=
acl_data
,
legal
=
Legal
(
legaltags
=
[
data_partition_id
+
"-demo-legaltag"
],
legal
=
legal_tags
,
other_relevant_data_countries
=
[
"US"
],
status
=
"compliant"
),
data
=
{
data
=
{
"DatasetProperties"
:
{
"DatasetProperties"
:
{
"FileSourceInfo"
:
{
"FileSourceInfo"
:
{
...
@@ -206,17 +259,17 @@ class ReceivingContextMixin:
...
@@ -206,17 +259,17 @@ class ReceivingContextMixin:
"PreLoadFilePath"
:
unsignedUrl
+
signedUploadFileName
"PreLoadFilePath"
:
unsignedUrl
+
signedUploadFileName
}
}
},
},
"ResourceSecurityClassification"
:
"osdu
:reference-data--ResourceSecurityClassification:RESTRICTED:"
,
"ResourceSecurityClassification"
:
data_partition_id
+
"
:reference-data--ResourceSecurityClassification:RESTRICTED:"
,
"SchemaFormatTypeID"
:
"osdu
:reference-data--SchemaFormatType:TabSeparatedColumnarText:"
"SchemaFormatTypeID"
:
data_partition_id
+
"
:reference-data--SchemaFormatType:TabSeparatedColumnarText:"
},
},
# id: str = None,
# id: str = None,
version
=
1614105463059152
,
version
=
1614105463059152
,
# TODO : patch the version number
ancestry
=
RecordAncestry
(
parents
=
[]))
ancestry
=
RecordAncestry
(
parents
=
[]))
]
]
client_reg
=
DatasetRegistryClient
()
client_reg
=
DatasetRegistryClient
()
registered_dataset
=
client_reg
.
register_dataset
(
CreateDatasetRegistriesRequest
(
dataset_registries
=
recordList
))
registered_dataset
=
client_reg
.
register_dataset
(
CreateDatasetRegistriesRequest
(
dataset_registries
=
recordList
))
# logger.debug(f"##ReceivingContextMixin storage_instruction : {registered_dataset.
json()}"
)
# logger.debug(f"##ReceivingContextMixin storage_instruction : {registered_dataset.
__dict__}\nSent : " + str(recordList[0].__dict__)
)
record_id
=
registered_dataset
.
json
()[
'datasetRegistries'
][
0
][
"id"
]
record_id
=
registered_dataset
.
json
()[
'datasetRegistries'
][
0
][
"id"
]
...
...
osdu_airflow/operators/validate_manifest_schema_by_reference.py
View file @
595c1c51
...
@@ -72,8 +72,8 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi
...
@@ -72,8 +72,8 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi
)
)
manifest_data
=
self
.
_get_manifest_data
(
context
,
execution_context
)
manifest_data
=
self
.
_get_manifest_data
(
context
,
execution_context
)
# manifest_data = self._get_manifest_data_by_reference(context, execution_context
)
# manifest_data = self._get_manifest_data_by_reference(context, execution_context
, True) # use the history because "check_payload_type" doesn't return the id
#
logger.debug(f"Manifest data: {manifest_data}")
logger
.
debug
(
f
"Manifest data:
{
manifest_data
}
"
)
if
not
manifest_data
:
if
not
manifest_data
:
...
...
osdu_airflow/operators/validate_manifest_schema_by_reference_full.py
0 → 100644
View file @
595c1c51
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate Manifest against R3 schemas operator.
"""
import
logging
from
airflow.models
import
BaseOperator
,
Variable
from
osdu_ingestion.libs.constants
import
DATA_TYPES_WITH_SURROGATE_KEYS
,
SURROGATE_KEYS_PATHS
from
osdu_ingestion.libs.context
import
Context
from
osdu_ingestion.libs.exceptions
import
EmptyManifestError
,
GenericManifestSchemaError
from
osdu_ingestion.libs.refresh_token
import
AirflowTokenRefresher
from
osdu_ingestion.libs.validation.validate_schema
import
SchemaValidator
from
osdu_airflow.backward_compatibility.airflow_utils
import
apply_defaults
from
osdu_airflow.operators.mixins.ReceivingContextMixin
import
ReceivingContextMixin
# Module-level logger. NOTE(review): no name is passed to getLogger(), so this
# is the root logger rather than a module-named one — confirm this is intended.
logger = logging.getLogger()
class ValidateManifestSchemaOperatorByReferenceFull(BaseOperator, ReceivingContextMixin):
    """Operator to validate manifest against definition schemasR3.

    Unlike the plain validate operator, the manifest payload is exchanged
    *by reference*: it is fetched with
    ``ReceivingContextMixin._get_manifest_data_by_reference`` and the
    validated result is written back with
    ``ReceivingContextMixin._put_manifest_data_by_reference``.
    """

    # Colors used by the Airflow UI for this operator's task boxes.
    ui_color = '#dad5ff'
    ui_fgcolor = '#000000'

    @apply_defaults
    def __init__(self, previous_task_id: str = None, *args, **kwargs):
        """
        :param previous_task_id: id of the upstream task whose XCom holds the
            manifest record reference, defaults to None
        :type previous_task_id: str, optional
        """
        super().__init__(*args, **kwargs)
        self.previous_task_id = previous_task_id
        # Airflow Variable toggling whether skipped entity ids are pushed to
        # XCom after validation; defaults to False when the Variable is unset.
        self._show_skipped_ids = Variable.get('core__config__show_skipped_ids',
                                              default_var=False)

    def execute(self, context: dict):
        """Execute manifest validation then process it.

        Get a single manifest file or a list of them.
        If it is a list, calculate which range (slice) of manifest files must be processed and then
        process this range one by one.

        :param context: Airflow context
        :type context: dict
        :raises EmptyManifestError: if no manifest data could be retrieved
        :raises GenericManifestSchemaError: if the manifest fails schema validation
        """
        logger.debug("Starting Validating manifest")
        execution_context = context["dag_run"].conf["execution_context"]
        payload_context = Context.populate(execution_context)
        token_refresher = AirflowTokenRefresher()
        logger.debug(f"DATA_TYPES_WITH_SURROGATE_KEYS: {DATA_TYPES_WITH_SURROGATE_KEYS}")
        logger.debug(f"SURROGATE_KEYS_PATHS: {SURROGATE_KEYS_PATHS}")
        schema_validator = SchemaValidator(
            token_refresher,
            payload_context,
            surrogate_key_fields_paths=SURROGATE_KEYS_PATHS,
            data_types_with_surrogate_ids=DATA_TYPES_WITH_SURROGATE_KEYS)
        # manifest_data = self._get_manifest_data(context, execution_context)
        # use_history=True because "check_payload_type" doesn't return the id
        manifest_data = self._get_manifest_data_by_reference(context, execution_context, True)

        if not manifest_data:
            raise EmptyManifestError(
                f"Data {context['dag_run'].conf} doesn't contain 'manifest field'")

        # Validate the manifest envelope first, then every entity inside it.
        _ = schema_validator.validate_common_schema(manifest_data)
        try:
            valid_manifest_file, skipped_entities = schema_validator.ensure_manifest_validity(
                manifest_data)
        except GenericManifestSchemaError as err:
            # Surface the failure details to downstream tasks before re-raising.
            context["ti"].xcom_push(key="skipped_ids", value=str(err))
            raise err
        if self._show_skipped_ids:
            context["ti"].xcom_push(key="skipped_ids", value=skipped_entities)
        # Persist the validated manifest by reference; the returned record id is
        # what downstream tasks will pick up via XCom.
        return self._put_manifest_data_by_reference(context, execution_context,
                                                    valid_manifest_file)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment