Skip to content
Snippets Groups Projects
Commit 454bd6a6 authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) :triangular_flag_on_post:
Browse files

Merge branch 'Add_Content_Type_to_uploaded_file' into 'integration-master'

Add content type to upload file. Fix typo

See merge request go3-nrg/platform/data-flow/ingestion/ingestion-dags!21
parents 2a99a125 3a2819f8
No related branches found
No related tags found
1 merge request: !6 "R3 Data Ingestion"
Pipeline #18929 failed
......@@ -136,10 +136,14 @@ class GCSFileUploader(FileUploader):
raise GCSObjectURIError
@tenacity.retry(**RETRY_SETTINGS)
def get_file_from_bucket(self, bucket_name: str, source_blob_name: str) -> io.BytesIO:
def get_file_from_bucket(
self,
bucket_name: str,
source_blob_name: str
) -> Tuple[io.BytesIO, str]:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
blob = bucket.get_blob(source_blob_name)
does_exist = blob.exists()
if not does_exist:
......@@ -149,9 +153,26 @@ class GCSFileUploader(FileUploader):
file = io.BytesIO()
blob.download_to_file(file)
logger.debug("File got from landing zone")
return file
return file, blob.content_type
def get_file_from_preload_path(self, preload_file_path: str) -> io.BytesIO:
def get_file_from_preload_path(self, preload_file_path: str) -> Tuple[io.BytesIO, str]:
bucket_name, blob_name = self._parse_object_uri(preload_file_path)
buffer = self.get_file_from_bucket(bucket_name, blob_name)
return buffer
buffer, content_type = self.get_file_from_bucket(bucket_name, blob_name)
return buffer, content_type
def upload_file(self, preload_file_path: str) -> str:
    """
    Copy file from Landing zone(preload_file_path) onto OSDU platform using File service.
    Get Content-Type of this file and send it as the Content-Type header
    while this file is being uploaded onto OSDU platform.

    The Content-Type override is applied to a copy of the request headers, so
    the headers used for the signed-URL and file-location requests are left
    untouched.

    :param preload_file_path: object URI of the file in the landing zone
    :return: file_location reported by the File service for the uploaded file
    """
    buffer, content_type = self.get_file_from_preload_path(preload_file_path)
    file_id, signed_url = self._get_signed_url_request(self.request_headers)

    # Copy before overriding: assigning into self.request_headers directly
    # would leak the file's Content-Type into every later request that
    # reuses the same mapping (including the file-location request below).
    upload_headers = dict(self.request_headers)
    upload_headers["Content-Type"] = content_type
    self._upload_file_request(upload_headers, signed_url, buffer)

    file_location = self._get_file_location_request(self.request_headers, file_id)
    return file_location
......@@ -111,6 +111,8 @@ class SchemaValidator(HeadersMixin):
"""
if not schema:
schema = self.get_schema(manifest["kind"])
if schema["properties"].get("id"):
schema["properties"]["id"].pop("pattern", None)
logger.debug(f"Validating kind {manifest['kind']}")
resolver = OSDURefResolver(schema_service=self.schema_service,
base_uri=schema.get("$id", ""), referrer=schema,
......
......@@ -48,7 +48,8 @@ class TestSourceFileChecker:
file_uploader: GCSFileUploader
):
file = io.RawIOBase()
monkeypatch.setattr(file_uploader, "get_file_from_bucket", lambda *args, **kwargs: file)
monkeypatch.setattr(file_uploader, "get_file_from_bucket",
lambda *args, **kwargs: (file, "test"))
file_uploader.upload_file("gs://test/test")
@pytest.mark.parametrize(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment