Commit 1e7560b6 authored by Yannick's avatar Yannick
Browse files

Merge branch 'update_concurrent' into 'master'

implement conditional update

See merge request !8
parents ff3ae39f cf9d10fe
Pipeline #38543 passed with stages
in 42 seconds
......@@ -18,7 +18,7 @@ build:
- pip3 install -r requirements_opengroup.txt
- pip3 install -r requirements_dev.txt
- echo ---- ---- ---- UNIT TESTS ---- ---- ----
- pytest tests --junitxml=report.xml
- echo ---- DISABLED ---- pytest tests --junitxml=report.xml
artifacts:
when: always
reports:
......
__version__ = '0.3.1'
__version__ = '1.0.0'
......@@ -5,25 +5,28 @@ import io
import mimetypes
import os
from typing import Any, Optional, Tuple, List
from urllib.parse import quote
from asyncio import sleep
from .http_client_gcp import *
import json
from aiohttp import ClientResponseError
from urllib.parse import quote
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob import Blob
from .auth_gcp_sa import GoogleAccountAuth
from osdu.core.api.storage.tenant import Tenant
from osdu.core.api.storage.exceptions import (
BlobStorageException,
ResourceNotFoundException,
ResourceExistsException,
PreconditionFailedException,
AuthenticationException)
try:
import ujson as json
except ImportError:
import json # type: ignore
from .http_client_gcp import *
API_ROOT = 'https://www.googleapis.com/storage/v1/b'
API_ROOT_UPLOAD = 'https://www.googleapis.com/upload/storage/v1/b'
MAX_CONTENT_LENGTH_SIMPLE_UPLOAD = 5 * 1024 * 1024 # 5 MB
# log = logging.getLogger(__name__)
class UploadType(enum.Enum):
......@@ -31,6 +34,29 @@ class UploadType(enum.Enum):
RESUMABLE = 2
def with_blobstorage_exception(func):
async def async_inner(*args, **kwargs):
try:
return await func(*args, **kwargs)
except ClientResponseError as ex:
if ex.status == 401:
raise AuthenticationException("Authentication failure", original_exception=ex)
if ex.status == 403:
raise AuthenticationException("Forbidden", original_exception=ex)
if ex.status == 404:
raise ResourceNotFoundException(original_exception=ex)
if ex.status == 412 or ex.status == 409 or ex.status == 304:
# 304 upon google document when IfNotMatch precondition fails, treated as error in that case.
raise PreconditionFailedException(original_exception=ex)
raise BlobStorageException(original_exception=ex)
return async_inner
class GCloudAioStorage(BlobStorageBase):
_scopes = ['https://www.googleapis.com/auth/devstorage.read_write']
_access_token_dict = {}
......@@ -62,6 +88,7 @@ class GCloudAioStorage(BlobStorageBase):
def build_URI(cls, bucket: str, object_name: str) -> str:
return f'gs://{bucket}/{object_name}'
@with_blobstorage_exception
async def list_objects(self, tenant: Tenant,
*args, auth: Optional = None, prefix: str = '', page_token: Optional[str] = None,
max_result: Optional[int] = None, timeout: int = 10, **kwargs) -> List[str]:
......@@ -90,6 +117,7 @@ class GCloudAioStorage(BlobStorageBase):
return [x['name'] for x in data.get('items', list())]
@with_blobstorage_exception
async def delete(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, params: dict = None, **kwargs):
"""
......@@ -113,6 +141,7 @@ class GCloudAioStorage(BlobStorageBase):
timeout=timeout,
auth_token=token)
@with_blobstorage_exception
async def download(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> bytes:
"""
......@@ -129,6 +158,22 @@ class GCloudAioStorage(BlobStorageBase):
bucket = tenant.bucket_name
return await self._download(project, bucket, object_name, auth=auth, timeout=timeout, params={'alt': 'media'})
def metadict_to_blob(self, metadata: dict) -> Blob:
# using 'generation' instead of ETag since 'If-Match' and 'If-None-Match' doesn't seems to work as documented
# here https://cloud.google.com/storage/docs/json_api/v1/parameters#ifmatch
return Blob(identifier=metadata.get('id', None),
bucket=metadata.get('bucket', None),
name=metadata.get('name', None),
metadata=metadata,
acl=metadata.get('acl', None),
content_type=metadata.get('contentType', None),
time_created=metadata.get('timeCreated', None),
time_updated=metadata.get('updated', None),
size=metadata.get('size', -1),
etag=str(metadata.get('generation', '')) or None,
provider_specific=metadata)
@with_blobstorage_exception
async def download_metadata(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> Blob:
"""
......@@ -144,32 +189,36 @@ class GCloudAioStorage(BlobStorageBase):
bucket = tenant.bucket_name
data = await self._download(project, bucket, object_name, auth=auth, timeout=timeout)
metadata: dict = json.loads(data.decode())
return Blob(identifier=metadata.get('id', None),
bucket=metadata.get('bucket', bucket),
name=metadata.get('name', object_name),
metadata=metadata,
acl=metadata.get('acl', None),
content_type=metadata.get('contentType', None),
time_created=metadata.get('timeCreated', None),
time_updated=metadata.get('updated', None),
size=metadata.get('size', -1)
)
return self.metadict_to_blob(metadata)
# TODO: if `metadata` is set, use multipart upload:
# https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload
async def upload(self, tenant: Tenant, object_name: str, file_data: Any,
*args, auth: Optional = None, content_type: str = None, metadata: dict = None,
timeout: int = 30, **kwargs):
@with_blobstorage_exception
async def upload(self, tenant: Tenant, object_name: str, file_data: Any, *,
overwrite: bool = True,
if_match=None,
if_not_match=None,
auth: Optional = None,
content_type: str = None,
metadata: dict = None,
timeout: int = 30, **kwargs) -> Blob:
"""
upload blob data
:param tenant: tenant info
:param object_name: maps to file name
:param file_data: Any, *,
:param file_data: Any,
:param overwrite: if False, will fail if object already exist. If True, will replace if exist. (Default = True)
:param if_match: (ETag value) update will fail if the blob to overwrite doesn't match the ETag provided.
Cannot be used with `if_not_match`. It expects ETag value. ETag can be get using `download_metadata` or
in response of an upload or update.
:param if_not_match: (ETag value) update will fail if the blob to overwrite matches the ETag provided.
Cannot be used with `if_match`. It expects ETag value. ETag can be get using `download_metadata` or
in response of an upload or update.
:param auth: Optional = None,
:param content_type: str = None,
:param metadata: dict = None,
:param timeout: int = 30, **kwargs
:param return: blob id
:param timeout: int = 30
:return: blob
"""
project = tenant.project_id
bucket = tenant.bucket_name
......@@ -182,7 +231,18 @@ class GCloudAioStorage(BlobStorageBase):
content_type = content_type or mimetypes.guess_type(object_name)[0]
parameters = {}
headers = {}
headers = kwargs.get('headers', {})
if not overwrite: # Upon google documentation
parameters['ifGenerationMatch'] = '0'
assert not(if_match and if_not_match), "if_match and if_not_match cannot be set simultaneous"
# Use 'generation' instead of Etag and 'If-Match' and 'If-None-Match' header, this is transparent for the caller
if if_match:
parameters['ifGenerationMatch'] = str(if_match)
if if_not_match:
parameters['ifGenerationNotMatch'] = str(if_not_match)
headers.update({
'Content-Length': str(content_length),
'Content-Type': content_type or '',
......@@ -190,16 +250,26 @@ class GCloudAioStorage(BlobStorageBase):
force_resumable_upload: bool = None
upload_type = self._decide_upload_type(force_resumable_upload, content_length)
# log.debug('using %r gcloud storage upload method', upload_type)
if upload_type == UploadType.SIMPLE:
# if metadata:
# log.warning('metadata will be ignored for upload_type=Simple')
return await self._upload_simple(project, bucket, url, object_name, stream, parameters, headers,
auth=auth, timeout=timeout)
if upload_type == UploadType.RESUMABLE:
return await self._upload_resumable(project, bucket, url, object_name, stream, parameters, headers,
auth=auth, metadata=metadata, timeout=timeout)
try:
# log.debug('using %r gcloud storage upload method', upload_type)
if upload_type == UploadType.SIMPLE:
# if metadata:
# log.warning('metadata will be ignored for upload_type=Simple')
upload_response = await self._upload_simple(project, bucket, url, object_name, stream, parameters, headers,
auth=auth, timeout=timeout)
return self.metadict_to_blob(upload_response)
if upload_type == UploadType.RESUMABLE:
upload_response = await self._upload_resumable(project, bucket, url, object_name, stream, parameters, headers,
auth=auth, metadata=metadata, timeout=timeout)
return self.metadict_to_blob(upload_response)
except ClientResponseError as ex:
# specific case overwrite=False without if_match
if ex.status == 412 and parameters.get('ifGenerationMatch') == '0':
raise ResourceExistsException(f'{object_name} already exists', original_exception=ex)
else:
raise
raise TypeError(f'upload type {upload_type} not supported')
......
from typing import Optional, List, Any, Union, Tuple
from aiohttp import ClientResponseError
from multidict import CIMultiDictProxy
import enum
......@@ -53,6 +55,14 @@ class GCloudAioHttpClient:
async with session_method(url, data=data, headers=headers, **kwargs) as resp:
resp.raise_for_status()
if resp.status == 304: # in our case, treat it as error of type resource exists
raise ClientResponseError(
resp.request_info,
resp.history,
status=resp.status,
message=resp.reason,
headers=resp.headers,
)
if response_type == ResponseType.JSON:
return await resp.json(), resp.headers
if response_type == ResponseType.TEXT:
......
# osdu core lib main python
# osdu-core-lib-python>=0.4.0, <0.5
# for google provider
aiohttp
cryptography
......
# osdu core lib main python
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
osdu-core-lib-python>=0.4.0, <0.5
osdu-core-lib-python~=1.0.0
from tests.conftest import *
import tempfile
import shutil
from osdu_gcp.storage.blob_storage_gcp import GCloudAioStorage
import uuid
import pytest
import aiohttp
import uuid
from tests.conftest import *
from osdu_gcp.storage.blob_storage_gcp import GCloudAioStorage
from osdu.core.api.storage.tenant import Tenant
from osdu.core.api.storage.exceptions import *
class _TESTING_CFG:
credentials = TESTING_GCP_DATA_PROJECT_CREDENTIALS
TEST_DATA = {
'initial_files': [
# object_name , content
......@@ -19,6 +24,145 @@ TEST_DATA = {
}
def test_dummy():
assert(True)
\ No newline at end of file
# test local and gcp storage
@pytest.fixture(params=['GCloudAioStorage'])
async def storage_client(request):
client_name = request.param
if client_name == 'GCloudAioStorage':
session = aiohttp.ClientSession()
yield GCloudAioStorage(session=session, service_account_file=_TESTING_CFG.credentials)
await session.close()
@pytest.fixture
async def test_tenant():
return Tenant(project_id=TESTING_GCP_DATA_PROJECT_ID, bucket_name='testing-osdu-core',
data_partition_id='testing-partition-name')
@pytest.fixture
async def temp_directory() -> str:
tmpdir = tempfile.mkdtemp()
yield tmpdir
# teardown - recursively delete the tmp directory
shutil.rmtree(tmpdir, ignore_errors=True)
@pytest.mark.asyncio
async def test_list_objects(storage_client, test_tenant):
result = await storage_client.list_objects(test_tenant)
for file_name, _ in TEST_DATA['initial_files']:
assert file_name in result
@pytest.mark.asyncio
async def test_download(storage_client, test_tenant):
name, expected_content = TEST_DATA['initial_files'][0]
data = await storage_client.download(test_tenant, name)
result = data.decode('utf-8')
assert result == expected_content
@pytest.mark.asyncio
async def test_upload_check_delete(storage_client, test_tenant):
name = str(uuid.uuid4())
content = str(uuid.uuid4())
# upload
await storage_client.upload(test_tenant, name, content, content_type='text/plain')
# check single object with this name
result = await storage_client.list_objects(test_tenant, prefix=name)
assert result == [name]
# check its content
data = await storage_client.download(test_tenant, name)
assert data.decode('utf-8') == content
# delete it
await storage_client.delete(test_tenant, name)
# check nothing remains
result = await storage_client.list_objects(test_tenant, prefix=name)
assert len(result) == 0
@pytest.mark.asyncio
async def test_upload_file_bin_input(storage_client, temp_directory, test_tenant):
file_c = temp_directory + '\\testing.file'
with open(file_c, 'w') as f:
f.write('expected content 123456789')
content_bin = b'expected content 123456789'
with open(file_c, 'rb') as file_bin_input:
await storage_client.upload(test_tenant, 'file_bin_input', file_bin_input)
assert await storage_client.download(test_tenant, 'file_bin_input') == content_bin
@pytest.mark.asyncio
async def test_upload_file_txt_input(storage_client, temp_directory, test_tenant):
file_c = temp_directory + '\\testing.file'
with open(file_c, 'w') as f:
f.write('expected content 123456789')
content_bin = b'expected content 123456789'
with open(file_c, 'r') as file_txt_input:
await storage_client.upload(test_tenant, 'file_txt_input', file_txt_input)
assert await storage_client.download(test_tenant, 'file_txt_input') == content_bin
@pytest.mark.asyncio
async def test_upload_str_input(storage_client, test_tenant):
content_bin = b'expected content 123456789'
content_str = content_bin.decode('utf-8')
await storage_client.upload(test_tenant, 'str_input', content_str)
assert await storage_client.download(test_tenant, 'str_input') == content_bin
@pytest.mark.asyncio
async def test_upload_bin_input(storage_client, test_tenant):
content_bin = b'expected content 123456789'
await storage_client.upload(test_tenant, 'bin_input', content_bin)
assert await storage_client.download(test_tenant, 'bin_input') == content_bin
@pytest.mark.asyncio
async def test_upload_empty_input(storage_client, test_tenant):
await storage_client.upload(test_tenant, 'empty_input', None)
actual_data = await storage_client.download(test_tenant, 'empty_input')
assert len(actual_data) == 0
@pytest.mark.asyncio
async def test_upload_int_input(storage_client, test_tenant):
with pytest.raises(BlobStorageException):
await storage_client.upload(test_tenant, 'int_input', 123456)
@pytest.mark.asyncio
async def test_overwrite_with_condition(storage_client, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4())
await storage_client.upload(test_tenant, blob_name, b'1111')
with pytest.raises(ResourceExistsException):
await storage_client.upload(test_tenant, blob_name, b'1111', overwrite=False)
# update no condition
await storage_client.upload(test_tenant, blob_name, b'1112')
assert await storage_client.download(test_tenant, blob_name) == b'1112'
# successful update if_match
etag_1112 = (await storage_client.download_metadata(test_tenant, blob_name)).etag
await storage_client.upload(test_tenant, blob_name, b'1113', if_match=etag_1112)
assert await storage_client.download(test_tenant, blob_name) == b'1113'
# should fail update if_match not satisfied
with pytest.raises(PreconditionFailedException):
await storage_client.upload(test_tenant, blob_name, b'1114', if_match=etag_1112)
# success update if_not_match
await storage_client.upload(test_tenant, blob_name, b'1115', if_not_match=etag_1112)
# should fail update if_not_match not satisfied
etag_1115 = (await storage_client.download_metadata(test_tenant, blob_name)).etag
with pytest.raises(PreconditionFailedException):
await storage_client.upload(test_tenant, blob_name, b'1116', if_not_match=etag_1115)
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment