Skip to content
Snippets Groups Projects
upload_file.py 6.46 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  Copyright 2020 Google LLC
    #  Copyright 2020 EPAM Systems
    #
    #  Licensed under the Apache License, Version 2.0 (the "License");
    #  you may not use this file except in compliance with the License.
    #  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    
    import io
    import json
    import logging
    from abc import ABC, abstractmethod
    from typing import Tuple, TypeVar
    from urllib.parse import urlparse
    
    import requests
    import tenacity
    from google.cloud import storage
    from libs.context import Context
    from libs.exceptions import GCSObjectURIError, FileSourceError
    from libs.mixins import HeadersMixin
    from libs.refresh_token import TokenRefresher, refresh_token
    
    # Module-level logger; named after the module so its records can be filtered
    # separately while still propagating to the root logger's handlers.
    logger = logging.getLogger(__name__)

    # Annotation for upload buffers: any file-like object supporting seek()/read().
    FileLikeObject = TypeVar("FileLikeObject", io.IOBase, io.RawIOBase, io.BytesIO)

    # Shared keyword arguments for tenacity.retry(): at most 3 attempts, 2 seconds apart.
    # `reraise` is not set, so once attempts are exhausted tenacity raises
    # tenacity.RetryError (wrapping the last exception) rather than the original error.
    RETRY_SETTINGS = {
        "stop": tenacity.stop_after_attempt(3),
        "wait": tenacity.wait_fixed(2),
    }
    
    
    class FileUploader(HeadersMixin, ABC):
        """
        File uploader to copy file from PreLoadPath into FileSource on OSDU platform.

        Subclasses implement ``get_file_from_preload_path`` for a concrete storage
        backend. ``upload_file`` then drives the File service workflow:
        ``/getLocation`` (obtain FileID + signed URL) -> PUT content to the signed URL
        -> ``/getFileLocation`` (obtain the final location).
        """

        def __init__(self, file_service: str, token_refresher: TokenRefresher, context: Context):
            """
            :param file_service: Base URL of the File service; endpoint paths such as
                "/getLocation" are appended to it.
            :param token_refresher: Stored for the ``refresh_token`` decorator used on
                ``_send_post_request`` (presumably to supply/refresh the auth token —
                confirm in libs.refresh_token).
            :param context: Forwarded to ``HeadersMixin.__init__``.
            """
            super().__init__(context)
            self.file_service = file_service
            self.token_refresher = token_refresher

        @abstractmethod
        def get_file_from_preload_path(self, preload_path: str) -> FileLikeObject:
            """
            Return file-like object containing raw content of a file
            in preload path.
            """

        @tenacity.retry(**RETRY_SETTINGS)
        @refresh_token()
        def _send_post_request(self, headers: dict, url: str, request_body: str) -> requests.Response:
            """
            POST ``request_body`` to ``url`` with ``headers`` and return the raw response.

            Wrapped by ``refresh_token()`` and retried according to RETRY_SETTINGS.
            """
            response = requests.post(url, request_body, headers=headers)
            return response

        @tenacity.retry(**RETRY_SETTINGS)
        def _get_signed_url_request(self, headers: dict) -> Tuple[str, str]:
            """
            Get fileID and SignedURL using File Service.

            :param headers: Request headers passed to ``_send_post_request``.
            :return: (FileID, SignedURL) taken from the '/getLocation' response.
            :raises tenacity.RetryError: when every attempt fails (HTTP error status,
                non-JSON body or missing keys).
            """
            logger.debug("Getting signed url.")
            request_body = json.dumps({})  # '/getLocation' method requires empty json.
            response = self._send_post_request(headers, f"{self.file_service}/getLocation",
                                               request_body)
            # Fail with a descriptive HTTPError instead of a KeyError/JSONDecodeError
            # on an error payload; tenacity retries it like any other exception.
            response.raise_for_status()
            response_json = response.json()
            file_id = response_json["FileID"]
            signed_url = response_json["Location"]["SignedURL"]
            logger.debug("Signed url got.")
            # The signed URL grants write access to storage, so it is not logged.
            logger.debug("FileID: %s", file_id)
            return file_id, signed_url

        @tenacity.retry(**RETRY_SETTINGS)
        def _upload_file_request(self, headers: dict, signed_url: str, buffer: FileLikeObject):
            """
            Upload file via File service using signed_url.

            The buffer is rewound before reading so each retry attempt sends the full content.

            :param headers: Headers sent with the PUT request.
            :param signed_url: URL obtained from ``_get_signed_url_request``.
            :param buffer: File-like object holding the content to upload.
            :raises tenacity.RetryError: when every attempt gets a non-2xx response or
                fails at the transport level.
            """
            logger.debug("Uploading file.")
            buffer.seek(0)
            response = requests.put(signed_url, buffer.read(), headers=headers)
            # Without this check a rejected upload would be treated as success and the
            # caller would go on to fetch a location for a file that was never stored.
            response.raise_for_status()
            logger.debug("File uploaded.")

        @tenacity.retry(**RETRY_SETTINGS)
        def _get_file_location_request(self, headers: dict, file_id: str) -> str:
            """
            Get file location using File Service.

            :param headers: Request headers passed to ``_send_post_request``.
            :param file_id: FileID returned by '/getLocation'.
            :return: The "Location" value from the '/getFileLocation' response.
            :raises tenacity.RetryError: when every attempt fails.
            """
            logger.debug("Getting file location.")
            request_body = json.dumps({"FileID": file_id})
            response = self._send_post_request(headers, f"{self.file_service}/getFileLocation",
                                               request_body)
            response.raise_for_status()
            response_json = response.json()
            logger.debug("File location got.")
            return response_json["Location"]

        def upload_file(self, preload_file_path: str) -> str:
            """
            Copy file from Landing zone(preload_file_path) onto OSDU platform using File service.
            Return file_location.
            """
            buffer = self.get_file_from_preload_path(preload_file_path)
            file_id, signed_url = self._get_signed_url_request(self.request_headers)
            self._upload_file_request(self.request_headers, signed_url, buffer)
            file_location = self._get_file_location_request(self.request_headers, file_id)
            return file_location
    
    
    class GCSFileUploader(FileUploader):
        """
        FileUploader that reads the source file from Google Cloud Storage ("gs://" URIs).

        Note: unlike the base-class contract, ``get_file_from_preload_path`` here returns
        a ``(buffer, content_type)`` tuple, and ``upload_file`` is overridden to consume it
        and send the blob's Content-Type with the upload.
        """

        @staticmethod
        def _parse_object_uri(file_path: str) -> Tuple[str, str]:
            """
            Parse GCS Object uri.
            Return bucket and blob names.

            :raises GCSObjectURIError: if the scheme is not "gs" or the bucket or
                blob name is empty.
            """
            parsed_path = urlparse(file_path)
            if parsed_path.scheme == "gs":
                bucket_name = parsed_path.netloc
                source_blob_name = parsed_path.path[1:]  # delete the first slash

                if bucket_name and source_blob_name:
                    return bucket_name, source_blob_name

            raise GCSObjectURIError

        @tenacity.retry(**RETRY_SETTINGS)
        def get_file_from_bucket(
            self,
            bucket_name: str,
            source_blob_name: str
        ) -> Tuple[io.BytesIO, str]:
            """
            Download gs://<bucket_name>/<source_blob_name> into an in-memory buffer.

            :return: (buffer rewound to position 0, the blob's content type).
            :raises tenacity.RetryError: when all attempts fail, e.g. wrapping
                FileSourceError if the object does not exist.
            """
            storage_client = storage.Client()
            bucket = storage_client.bucket(bucket_name)

            # get_blob() returns None for a missing object. Calling .exists() on that
            # result raised AttributeError, so the FileSourceError was never reached.
            blob = bucket.get_blob(source_blob_name)
            if blob is None:
                raise FileSourceError("File doesn't exist in preloadPath "
                                      f"'gs://{bucket_name}/{source_blob_name}'")

            buffer = io.BytesIO()
            blob.download_to_file(buffer)
            buffer.seek(0)  # hand callers a buffer ready to be read from the start
            logger.debug("File got from landing zone")

            return buffer, blob.content_type

        def get_file_from_preload_path(self, preload_file_path: str) -> Tuple[io.BytesIO, str]:
            """
            Fetch the GCS object referenced by ``preload_file_path``.

            :return: (in-memory buffer with the object's content, its content type).
            :raises GCSObjectURIError: if ``preload_file_path`` is not a valid gs:// URI.
            """
            bucket_name, blob_name = self._parse_object_uri(preload_file_path)

            buffer, content_type = self.get_file_from_bucket(bucket_name, blob_name)
            return buffer, content_type

        def upload_file(self, preload_file_path: str) -> str:
            """
            Copy file from Landing zone(preload_file_path) onto OSDU platform using File service.
            Get Content-Type of this file, refresh Content-Type with this value in headers
            while this file is being uploaded onto OSDU platform.
            Return file_location.
            """
            buffer, content_type = self.get_file_from_preload_path(preload_file_path)
            file_id, signed_url = self._get_signed_url_request(self.request_headers)

            # Work on a copy so overriding Content-Type never modifies the mapping
            # returned by request_headers, whether or not it is shared between calls.
            upload_headers = dict(self.request_headers)
            upload_headers["Content-Type"] = content_type
            self._upload_file_request(upload_headers, signed_url, buffer)

            file_location = self._get_file_location_request(self.request_headers, file_id)
            return file_location