Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import json
import logging
from abc import ABC, abstractmethod
from typing import Tuple, TypeVar
from urllib.parse import urlparse
import requests
import tenacity
from google.cloud import storage
from libs.context import Context
from libs.exceptions import GCSObjectURIError, FileSourceError
from libs.mixins import HeadersMixin
from libs.refresh_token import TokenRefresher, refresh_token
# Module logger; getLogger() with no name returns the root logger.
logger = logging.getLogger()

# Constrained type variable for the file-like buffers handed from the
# "download from preload path" step to the "upload via signed URL" step.
FileLikeObject = TypeVar("FileLikeObject", io.IOBase, io.RawIOBase, io.BytesIO)

# Keyword arguments shared by every ``tenacity.retry`` decorator in this module:
# give up after 3 attempts and wait a fixed 2 seconds between attempts.
# No ``reraise`` flag is set, so tenacity's default applies once attempts are
# exhausted (it raises tenacity.RetryError wrapping the last failure).
RETRY_SETTINGS = dict(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_fixed(2),
)
class FileUploader(HeadersMixin, ABC):
    """
    File uploader to copy file from PreLoadPath into FileSource on OSDU platform.

    Subclasses provide `get_file_from_preload_path` for a concrete storage
    backend. The upload itself goes through the File service in three steps:
    request a file id and signed URL ('/getLocation'), PUT the file content to
    the signed URL, then resolve the stored file location ('/getFileLocation').
    """

    def __init__(self, file_service: str, token_refresher: TokenRefresher, context: Context):
        """
        :param file_service: Base URL of the File service; endpoint paths such
            as "/getLocation" are appended to it.
        :param token_refresher: Stored on the instance as `token_refresher`
            (presumably read by the `refresh_token` decorator — confirm in
            libs.refresh_token).
        :param context: Forwarded to HeadersMixin.
        """
        super().__init__(context)
        self.file_service = file_service
        self.token_refresher = token_refresher

    @abstractmethod
    def get_file_from_preload_path(self, preload_path: str) -> FileLikeObject:
        """
        Return file-like object containing raw content of a file
        in preload path.
        """

    @tenacity.retry(**RETRY_SETTINGS)
    @refresh_token()
    def _send_post_request(self, headers: dict, url: str, request_body: str) -> requests.Response:
        """
        POST `request_body` to `url` with `headers` and return the raw response.

        The response is returned unchecked on purpose: it is handed to the
        `refresh_token()` wrapper, and the whole call is retried per
        RETRY_SETTINGS.
        """
        response = requests.post(url, request_body, headers=headers)
        return response

    @tenacity.retry(**RETRY_SETTINGS)
    def _get_signed_url_request(self, headers: dict) -> Tuple[str, str]:
        """
        Get fileID and SignedURL using File Service.

        :param headers: Request headers for the File service.
        :return: Tuple (FileID, SignedURL) taken from the '/getLocation' response.
        """
        logger.debug("Getting signed url.")
        request_body = json.dumps({})  # '/getLocation' method requires empty json.
        response = self._send_post_request(headers, f"{self.file_service}/getLocation",
                                           request_body).json()
        logger.debug("Signed url got.")
        # NOTE(review): this logs the whole response, signed URL included, at
        # DEBUG level; consider redacting it if DEBUG logs are retained.
        logger.debug(response)
        return response["FileID"], response["Location"]["SignedURL"]

    @tenacity.retry(**RETRY_SETTINGS)
    def _upload_file_request(self, headers: dict, signed_url: str, buffer: FileLikeObject):
        """
        Upload file via File service using signed_url.

        :param headers: Headers sent with the PUT request.
        :param signed_url: Signed URL obtained from '/getLocation'.
        :param buffer: File-like object; it is rewound and read in full.
        :raises requests.HTTPError: If the PUT answers with an error status.
            Because of the retry decorator, callers see this only after all
            attempts fail (wrapped by tenacity per RETRY_SETTINGS).
        """
        logger.debug("Uploading file.")
        # Rewind: the buffer may still be positioned at its end after being filled.
        buffer.seek(0)
        response = requests.put(signed_url, buffer.read(), headers=headers)
        # Without this check a rejected upload (e.g. an expired signed URL)
        # would be logged as a success and the retry above would never fire.
        response.raise_for_status()
        logger.debug("File uploaded.")

    @tenacity.retry(**RETRY_SETTINGS)
    def _get_file_location_request(self, headers: dict, file_id: str) -> str:
        """
        Get file location using File Service.

        :param headers: Request headers for the File service.
        :param file_id: FileID returned by '/getLocation'.
        :return: The "Location" field of the '/getFileLocation' response.
        """
        logger.debug("Getting file location.")
        request_body = json.dumps({"FileID": file_id})
        response = self._send_post_request(headers, f"{self.file_service}/getFileLocation",
                                           request_body).json()
        logger.debug("File location got.")
        return response["Location"]

    def upload_file(self, preload_file_path: str) -> str:
        """
        Copy file from Landing zone(preload_file_path) onto OSDU platform using File service.
        Return file_location.
        """
        buffer = self.get_file_from_preload_path(preload_file_path)
        file_id, signed_url = self._get_signed_url_request(self.request_headers)
        self._upload_file_request(self.request_headers, signed_url, buffer)
        file_location = self._get_file_location_request(self.request_headers, file_id)
        return file_location
class GCSFileUploader(FileUploader):
    """
    FileUploader whose preload path is a Google Cloud Storage object URI
    of the form "gs://<bucket>/<blob>".
    """

    def __init__(
        self,
        file_service: str,
        token_refresher: TokenRefresher,
        context: Context,
    ):
        """Same arguments as FileUploader.__init__; forwarded unchanged."""
        super().__init__(file_service, token_refresher, context)

    @staticmethod
    def _parse_object_uri(file_path: str) -> Tuple[str, str]:
        """
        Parse GCS Object uri.
        Return bucket and blob names.

        :raises GCSObjectURIError: If the scheme is not "gs" or either the
            bucket or the blob name is empty.
        """
        parsed_path = urlparse(file_path)
        if parsed_path.scheme == "gs":
            bucket_name = parsed_path.netloc
            source_blob_name = parsed_path.path[1:]  # delete the first slash
            if bucket_name and source_blob_name:
                return bucket_name, source_blob_name
        raise GCSObjectURIError

    @tenacity.retry(**RETRY_SETTINGS)
    def get_file_from_bucket(
        self,
        bucket_name: str,
        source_blob_name: str
    ) -> Tuple[io.BytesIO, str]:
        """
        Download a blob into memory.

        :param bucket_name: GCS bucket name.
        :param source_blob_name: Object name inside the bucket.
        :return: Tuple (buffer, content_type). The buffer is rewound to the
            start, and content_type is the blob's stored Content-Type.
        :raises FileSourceError: If the object does not exist. Because of the
            retry decorator, callers see this only after all attempts fail
            (wrapped by tenacity per RETRY_SETTINGS).
        """
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.get_blob(source_blob_name)
        # `Bucket.get_blob` returns None for a missing object. The previous
        # `blob.exists()` call on that None raised AttributeError instead of
        # FileSourceError, and a non-None result already implies existence.
        if blob is None:
            raise FileSourceError("File doesn't exist in preloadPath "
                                  f"'gs://{bucket_name}/{source_blob_name}'")
        buffer = io.BytesIO()
        blob.download_to_file(buffer)
        # Hand back a buffer positioned at the start so readers see the content.
        buffer.seek(0)
        logger.debug("File got from landing zone")
        return buffer, blob.content_type

    def get_file_from_preload_path(self, preload_file_path: str) -> Tuple[io.BytesIO, str]:
        """
        Fetch the object at a "gs://" preload path.

        Unlike the base-class annotation, this returns a (buffer, content_type)
        tuple; `upload_file` below is overridden to unpack it.
        """
        bucket_name, blob_name = self._parse_object_uri(preload_file_path)
        buffer, content_type = self.get_file_from_bucket(bucket_name, blob_name)
        return buffer, content_type

    def upload_file(self, preload_file_path: str) -> str:
        """
        Copy file from Landing zone(preload_file_path) onto OSDU platform using File service.
        Get Content-Type of this file, refresh Content-Type with this value in headers
        while this file is being uploaded onto OSDU platform.
        Return file_location.
        """
        buffer, content_type = self.get_file_from_preload_path(preload_file_path)
        file_id, signed_url = self._get_signed_url_request(self.request_headers)
        # Copy before overriding Content-Type. This way the blob's content type
        # cannot leak into later File service calls if `request_headers`
        # returns a shared dict.
        upload_headers = dict(self.request_headers)
        upload_headers["Content-Type"] = content_type
        self._upload_file_request(upload_headers, signed_url, buffer)
        file_location = self._get_file_location_request(self.request_headers, file_id)
        return file_location