Skip to content
Snippets Groups Projects

Cherry-pick 'Enhancement of Put File api Call in Dataset Service for Azure: Addition of New Header Parameter' into release/0.19

Merged David Diederich requested to merge cherry-pick-for-48 into release/0.19
1 file
+ 28
26
Compare changes
  • Side-by-side
  • Inline
@@ -30,7 +30,7 @@ from osdu_api.model.storage.legal import Legal
from osdu_api.model.storage.record import Record
from osdu_api.model.storage.record_ancestry import RecordAncestry
from osdu_ingestion.libs.types import ManifestType
import os
class ReceivingContextMixin:
"""
@@ -68,12 +68,12 @@ class ReceivingContextMixin:
previously_skipped_ids.extend(task_skipped_ids)
return previously_skipped_ids
def _get_manifest_data_by_reference(self, context: dict, execution_context: dict,
dataset_dms_client: DatasetDmsClient, use_history:bool=False,
def _get_manifest_data_by_reference(self, context: dict, execution_context: dict,
dataset_dms_client: DatasetDmsClient, use_history:bool=False,
logger = None) -> ManifestType:
"""
"""
[Geosiris Development]
Get manifest from a datasetService.
Get manifest from a datasetService.
If use_history is set to True, the data is taken from the record_id history instead of using last task return value
"""
if logger is None:
@@ -110,13 +110,13 @@ class ReceivingContextMixin:
original_manifest,
data_partition_id: str,
dataset_dms_client: DatasetDmsClient,
dataset_reg_client: DatasetRegistryClient,
entitlements_client: EntitlementsClient=None,
use_history:bool=False,
dataset_reg_client: DatasetRegistryClient,
entitlements_client: EntitlementsClient=None,
use_history:bool=False,
logger = None) -> str:
"""
"""
[Geosiris Development]
Put manifest into a datasetService and get back an access to the content.
Put manifest into a datasetService and get back an access to the content.
If use_history is set to True, the manifest record id is still returned but also saved into xcom at key "manifest_ref_ids".
The history may also contain all record_ids from previous tasks (and operators) if they also used the history.
"""
@@ -140,7 +140,7 @@ class ReceivingContextMixin:
if acl_data is None:
logger.debug(f"Getting default value for acl, because not found in manifest. {type(original_manifest_dict)}, \n{original_manifest_dict}")
acl_data = self._get_default_acl(execution_context=execution_context,
acl_data = self._get_default_acl(execution_context=execution_context,
entitlements_client=entitlements_client,
data_partition_id=data_partition_id,
logger=logger)
@@ -169,25 +169,25 @@ class ReceivingContextMixin:
dataset_dms_client=dataset_dms_client,
data_partition_id=data_partition_id,
logger=logger)
if use_history:
manifest_ref_ids = context["ti"].xcom_pull(task_ids=self.previous_task_id, key="manifest_ref_ids") # type: ignore
if manifest_ref_ids is None:
manifest_ref_ids = []
manifest_ref_ids.append(record_id)
context["ti"].xcom_push(key="manifest_ref_ids", value=manifest_ref_ids) # type: ignore
return record_id
def _put_file_on_dataset_service(self, file_content,
def _put_file_on_dataset_service(self, file_content,
acl_data: Acl,
legal_tags: Legal,
dataset_reg_client: DatasetRegistryClient,
dataset_dms_client: DatasetDmsClient,
data_partition_id: str,
data_partition_id: str,
logger = None) -> str:
"""
"""
[Geosiris Development]
Store a file on the dataset-service
"""
@@ -204,13 +204,13 @@ class ReceivingContextMixin:
except KeyError as e:
logger.debug("No 'signed' parameter url found for storage location")
unsigned_url = ""
unsigned_url = ""
try:
unsigned_url = storage_location['unsignedUrl']
except KeyError as e:
logger.debug("No 'unsigned' parameter url found for storage location")
file_source = ""
file_source = ""
try:
file_source = storage_location['fileSource']
except KeyError as e:
@@ -223,8 +223,10 @@ class ReceivingContextMixin:
logger.debug("No 'signedUploadFileName' parameter found for storage location")
#### Uploading data
put_result = dataset_dms_client.make_request(method=HttpMethod.PUT, url=signed_url, data=file_content, no_auth=True)
headers = {}
if os.getenv("CLOUD_PROVIDER") == "azure":
headers["x-ms-blob-type"] = "BlockBlob"
put_result = dataset_dms_client.make_request(method=HttpMethod.PUT,add_headers=headers, url=signed_url, data=file_content, no_auth=True)
authority = Variable.get("kind_authority", default_var="osdu")
record_list = [
Record( kind = f"{authority}:wks:dataset--File.Generic:1.0.0",
@@ -244,15 +246,15 @@ class ReceivingContextMixin:
ancestry=RecordAncestry(parents=[]))
]
registered_dataset = dataset_reg_client.register_dataset(CreateDatasetRegistriesRequest(dataset_registries=record_list))
return registered_dataset.json()['datasetRegistries'][0]["id"]
def _get_default_acl(self, execution_context: dict,
def _get_default_acl(self, execution_context: dict,
entitlements_client: EntitlementsClient,
data_partition_id = None,
data_partition_id = None,
logger = None):
if "acl" in execution_context: # try to take the value from the context
@@ -269,7 +271,7 @@ class ReceivingContextMixin:
)
dataset_registry_url_domain = match_domain.group(1) # type: str
ent_response = entitlements_client.get_groups_for_user()
acl_domain = data_partition_id + "." + dataset_registry_url_domain
DATA_DEFAULT_VIEWERS_NAME = "data.default.viewers"
@@ -294,11 +296,11 @@ class ReceivingContextMixin:
if viewers_found and owners_found:
break
return Acl( viewers=[data_default_viewers],
return Acl( viewers=[data_default_viewers],
owners=[data_default_owners])
def _get_default_legaltags(self, execution_context: dict,
def _get_default_legaltags(self, execution_context: dict,
data_partition_id: str):
if "legal" in execution_context: # try to take the value from the context
return execution_context['legal']
Loading