gcp_blob_storage_client.py 6.83 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#  Copyright 2021 Google LLC
#  Copyright 2021 EPAM Systems
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Blob storage GCP client module."""

import io
import logging
from typing import Tuple
from urllib.parse import urlparse

import google.auth
import tenacity
from google.cloud import storage
from providers.constants import GOOGLE_CLOUD_PROVIDER
26
27
28
from providers.exceptions import GCSObjectURIError
from providers.factory import ProvidersFactory
from providers.types import BlobStorageClient, FileLikeObject
29
30
31
32
33
34
35
36
37
38

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Retry policy knobs, named so the numbers below are self-explanatory.
_RETRY_MAX_ATTEMPTS = 3
_RETRY_WAIT_SECONDS = 10

# Keyword arguments unpacked into every ``tenacity.retry`` decorator in this
# module: stop after a fixed number of attempts, wait a fixed delay between
# attempts, and re-raise the last exception instead of tenacity's RetryError.
RETRY_SETTINGS = {
    "stop": tenacity.stop_after_attempt(_RETRY_MAX_ATTEMPTS),
    "wait": tenacity.wait_fixed(_RETRY_WAIT_SECONDS),
    "reraise": True,
}


39
40
@ProvidersFactory.register(GOOGLE_CLOUD_PROVIDER)
class GoogleCloudStorageClient(BlobStorageClient):
    """Implementation of blob storage client for the Google provider."""

    def __init__(self):
        """Initialize storage client."""
        self._storage_client = storage.Client()

    @staticmethod
    def _parse_gcs_uri(gcs_uri: str) -> Tuple[str, str]:
        """Parse gcs compliant uri and return bucket_name and blob_name.

        The object name is everything that follows ``gs://<bucket>/``, taken
        verbatim from the URI, so names containing ``?`` or ``#`` are preserved.

        :param gcs_uri: A GCS compliant URI.
        :type gcs_uri: str
        :raises GCSObjectURIError: When non GCS compliant URI is provided
        :return: A tuple (bucket_name, blob_name) obtained from the URI
        :rtype: Tuple[str, str]
        """
        parsed_uri = urlparse(gcs_uri)
        if parsed_uri.scheme == "gs" and parsed_uri.netloc:
            # urlparse() strips "?query" and "#fragment" from the path, but both
            # characters are legal in GCS object names. Read the object name from
            # the raw text instead of ``parsed_uri.path`` to avoid truncating it.
            _, _, after_scheme = gcs_uri.partition("://")
            bucket_name, _, source_blob_name = after_scheme.partition("/")

            # The bucket must match what urlparse() recognised as the authority;
            # this rejects URIs such as "gs://bucket?x/y" that have no real path.
            if bucket_name == parsed_uri.netloc and source_blob_name:
                return bucket_name, source_blob_name

        raise GCSObjectURIError(f"Wrong format path to GCS object. Object path is '{gcs_uri}'")

    def _get_existing_blob(self, bucket_name: str, source_blob_name: str) -> storage.Blob:
        """Fetch blob metadata and fail loudly when the object is missing.

        :param bucket_name: The name of the bucket that holds the file
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :raises FileNotFoundError: When the object does not exist in the bucket
        :return: The blob with its metadata loaded
        :rtype: storage.Blob
        """
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.get_blob(source_blob_name)
        if blob is None:
            # get_blob() signals a missing object with None; without this check
            # the caller would hit an opaque AttributeError on NoneType.
            raise FileNotFoundError(
                f"Object '{source_blob_name}' was not found in bucket '{bucket_name}'."
            )
        return blob

    @tenacity.retry(**RETRY_SETTINGS)
    def _get_file_from_bucket(self,
                              bucket_name: str,
                              source_blob_name: str,
                              file: FileLikeObject) -> Tuple[FileLikeObject, str]:
        """Get file from gcs bucket.

        :param bucket_name: The name of the bucket that holds the file
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :param file: The file where to download the blob content
        :type file: FileLikeObject
        :raises FileNotFoundError: When the object does not exist in the bucket
        :return: A tuple containing file and its content-type
        :rtype: Tuple[FileLikeObject, str]
        """
        blob = self._get_existing_blob(bucket_name, source_blob_name)

        blob.download_to_file(file)
        logger.debug("File %s got from bucket %s.", source_blob_name, bucket_name)

        return file, blob.content_type

    @tenacity.retry(**RETRY_SETTINGS)
    def _get_file_as_bytes_from_bucket(self,
                                       bucket_name: str,
                                       source_blob_name: str) -> Tuple[bytes, str]:
        """Get file as bytes from gcs bucket.

        :param bucket_name: The name of the bucket that holds the file
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :raises FileNotFoundError: When the object does not exist in the bucket
        :return: A tuple containing file and its content-type
        :rtype: Tuple[bytes, str]
        """
        blob = self._get_existing_blob(bucket_name, source_blob_name)

        # NOTE(review): newer google-cloud-storage releases document
        # download_as_string() as a deprecated alias of download_as_bytes();
        # switch once the minimum supported library version is confirmed.
        file_as_bytes = blob.download_as_string()
        logger.debug("File %s got from bucket %s.", source_blob_name, bucket_name)

        return file_as_bytes, blob.content_type

    @tenacity.retry(**RETRY_SETTINGS)
    def _does_file_exist_in_bucket(self, bucket_name: str, source_blob_name: str) -> bool:
        """Use gcs client and verify a file exists in given bucket.

        :param bucket_name: The name of the bucket that holds the file
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :return: A boolean indicating if the file exists
        :rtype: bool
        """
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        return blob.exists()

    def does_file_exist(self, uri: str) -> bool:
        """Verify if a file exists in the given URI.

        :param uri: The GCS URI of the file.
        :type uri: str
        :raises GCSObjectURIError: When non GCS compliant URI is provided
        :return: A boolean indicating if the file exists; False when no default
            credentials are available
        :rtype: bool
        """
        bucket_name, source_blob_name = self._parse_gcs_uri(uri)
        try:
            return self._does_file_exist_in_bucket(bucket_name, source_blob_name)
        except google.auth.exceptions.DefaultCredentialsError:
            # TODO(python-team) Figure out a way to mock google endpoints in integration tests.
            logger.error("No default credentials found in env, is this integration-env?")
            # Return an explicit bool so the result matches the annotation
            # instead of falling through to an implicit None.
            return False

    def download_to_file(self, uri: str, file: FileLikeObject) -> Tuple[FileLikeObject, str]:
        """Download file from the given URI.

        :param uri: The GCS URI of the file.
        :type uri: str
        :param file: The file where to download the blob content
        :type file: FileLikeObject
        :raises GCSObjectURIError: When non GCS compliant URI is provided
        :raises FileNotFoundError: When the object does not exist in the bucket
        :return: A tuple containing the file and its content-type
        :rtype: Tuple[FileLikeObject, str]
        """
        bucket_name, blob_name = self._parse_gcs_uri(uri)
        return self._get_file_from_bucket(bucket_name, blob_name, file)

    def download_file_as_bytes(self, uri: str) -> Tuple[bytes, str]:
        """Download file as bytes from the given URI.

        :param uri: The GCS URI of the file
        :type uri: str
        :raises GCSObjectURIError: When non GCS compliant URI is provided
        :raises FileNotFoundError: When the object does not exist in the bucket
        :return: The file as bytes and its content-type
        :rtype: Tuple[bytes, str]
        """
        bucket_name, blob_name = self._parse_gcs_uri(uri)
        return self._get_file_as_bytes_from_bucket(bucket_name, blob_name)

    def upload_file(self, uri: str, blob_file: FileLikeObject, content_type: str) -> None:
        """Upload a file to the given uri.

        :param uri: The GCS URI of the file
        :type uri: str
        :param blob_file: The file-like object whose content is uploaded
        :type blob_file: FileLikeObject
        :param content_type: The content type to store with the uploaded object
        :type content_type: str
        :raises GCSObjectURIError: When non GCS compliant URI is provided
        """
        bucket_name, blob_name = self._parse_gcs_uri(uri)
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_file(blob_file, content_type=content_type)
        logger.debug("Uploaded file to %s.", uri)