Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Domain Data Mgmt Services
Wellbore
Lib
Wellbore-cloud
Wellbore-gcp-lib
Commits
ced20b90
Commit
ced20b90
authored
Mar 31, 2021
by
Yannick
Browse files
impl conditional update
parent
5ae16888
Pipeline
#34165
passed with stage
in 24 seconds
Changes
4
Pipelines
2
Hide whitespace changes
Inline
Side-by-side
osdu_gcp/storage/blob_storage_gcp.py
View file @
ced20b90
...
...
@@ -5,25 +5,28 @@ import io
import
mimetypes
import
os
from
typing
import
Any
,
Optional
,
Tuple
,
List
from
urllib.parse
import
quote
from
asyncio
import
sleep
from
.http_client_gcp
import
*
import
json
from
aiohttp
import
ClientResponseError
from
urllib.parse
import
quote
from
osdu.core.api.storage.blob_storage_base
import
BlobStorageBase
from
osdu.core.api.storage.blob
import
Blob
from
.auth_gcp_sa
import
GoogleAccountAuth
from
osdu.core.api.storage.tenant
import
Tenant
from
osdu.core.api.storage.exceptions
import
(
BlobStorageException
,
ResourceNotFoundException
,
ResourceExistsException
,
PreconditionFailedException
,
AuthenticationException
)
try
:
import
ujson
as
json
except
ImportError
:
import
json
# type: ignore
from
.http_client_gcp
import
*
API_ROOT
=
'https://www.googleapis.com/storage/v1/b'
API_ROOT_UPLOAD
=
'https://www.googleapis.com/upload/storage/v1/b'
MAX_CONTENT_LENGTH_SIMPLE_UPLOAD
=
5
*
1024
*
1024
# 5 MB
# log = logging.getLogger(__name__)
class
UploadType
(
enum
.
Enum
):
...
...
@@ -31,6 +34,29 @@ class UploadType(enum.Enum):
RESUMABLE
=
2
def with_blobstorage_exception(func):
    """Decorator translating aiohttp errors into blob-storage exceptions.

    Wraps an async storage operation so that any ``ClientResponseError``
    escaping it is re-raised as the matching osdu storage exception,
    letting callers catch a single exception family:

        401            -> AuthenticationException ("Authentication failure")
        403            -> AuthenticationException ("Forbidden")
        404            -> ResourceNotFoundException
        412, 409, 304  -> PreconditionFailedException
        anything else  -> BlobStorageException

    The original ``ClientResponseError`` is always attached as
    ``original_exception`` and chained as ``__cause__``.
    """
    import functools  # local import: keeps this fix self-contained

    @functools.wraps(func)  # preserve the wrapped coroutine's name/docstring
    async def async_inner(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except ClientResponseError as ex:
            if ex.status == 401:
                raise AuthenticationException(
                    "Authentication failure", original_exception=ex) from ex
            if ex.status == 403:
                raise AuthenticationException(
                    "Forbidden", original_exception=ex) from ex
            if ex.status == 404:
                raise ResourceNotFoundException(original_exception=ex) from ex
            # 304 per Google's documentation when the If-None-Match
            # precondition fails; treated as an error in that case.
            if ex.status in (412, 409, 304):
                raise PreconditionFailedException(original_exception=ex) from ex
            raise BlobStorageException(original_exception=ex) from ex

    return async_inner
class
GCloudAioStorage
(
BlobStorageBase
):
_scopes
=
[
'https://www.googleapis.com/auth/devstorage.read_write'
]
_access_token_dict
=
{}
...
...
@@ -62,6 +88,7 @@ class GCloudAioStorage(BlobStorageBase):
def build_URI(cls, bucket: str, object_name: str) -> str:
    """Return the canonical ``gs://<bucket>/<object_name>`` URI."""
    return 'gs://{}/{}'.format(bucket, object_name)
@
with_blobstorage_exception
async
def
list_objects
(
self
,
tenant
:
Tenant
,
*
args
,
auth
:
Optional
=
None
,
prefix
:
str
=
''
,
page_token
:
Optional
[
str
]
=
None
,
max_result
:
Optional
[
int
]
=
None
,
timeout
:
int
=
10
,
**
kwargs
)
->
List
[
str
]:
...
...
@@ -90,6 +117,7 @@ class GCloudAioStorage(BlobStorageBase):
return
[
x
[
'name'
]
for
x
in
data
.
get
(
'items'
,
list
())]
@
with_blobstorage_exception
async
def
delete
(
self
,
tenant
:
Tenant
,
object_name
:
str
,
*
args
,
auth
:
Optional
=
None
,
timeout
:
int
=
10
,
params
:
dict
=
None
,
**
kwargs
):
"""
...
...
@@ -113,6 +141,7 @@ class GCloudAioStorage(BlobStorageBase):
timeout
=
timeout
,
auth_token
=
token
)
@
with_blobstorage_exception
async
def
download
(
self
,
tenant
:
Tenant
,
object_name
:
str
,
*
args
,
auth
:
Optional
=
None
,
timeout
:
int
=
10
,
**
kwargs
)
->
bytes
:
"""
...
...
@@ -129,7 +158,6 @@ class GCloudAioStorage(BlobStorageBase):
bucket
=
tenant
.
bucket_name
return
await
self
.
_download
(
project
,
bucket
,
object_name
,
auth
=
auth
,
timeout
=
timeout
,
params
=
{
'alt'
:
'media'
})
def
metadict_to_blob
(
self
,
metadata
:
dict
)
->
Blob
:
# using 'generation' instead of ETag since 'If-Match' and 'If-None-Match' don't seem to work as documented
# here https://cloud.google.com/storage/docs/json_api/v1/parameters#ifmatch
...
...
@@ -145,6 +173,7 @@ class GCloudAioStorage(BlobStorageBase):
etag
=
str
(
metadata
.
get
(
'generation'
,
''
))
or
None
,
provider_specific
=
metadata
)
@
with_blobstorage_exception
async
def
download_metadata
(
self
,
tenant
:
Tenant
,
object_name
:
str
,
*
args
,
auth
:
Optional
=
None
,
timeout
:
int
=
10
,
**
kwargs
)
->
Blob
:
"""
...
...
@@ -164,6 +193,7 @@ class GCloudAioStorage(BlobStorageBase):
# TODO: if `metadata` is set, use multipart upload:
# https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload
@
with_blobstorage_exception
async
def
upload
(
self
,
tenant
:
Tenant
,
object_name
:
str
,
file_data
:
Any
,
*
,
overwrite
:
bool
=
True
,
if_match
=
None
,
...
...
@@ -220,18 +250,26 @@ class GCloudAioStorage(BlobStorageBase):
force_resumable_upload
:
bool
=
None
upload_type
=
self
.
_decide_upload_type
(
force_resumable_upload
,
content_length
)
# log.debug('using %r gcloud storage upload method', upload_type)
if
upload_type
==
UploadType
.
SIMPLE
:
# if metadata:
# log.warning('metadata will be ignored for upload_type=Simple')
upload_response
=
await
self
.
_upload_simple
(
project
,
bucket
,
url
,
object_name
,
stream
,
parameters
,
headers
,
auth
=
auth
,
timeout
=
timeout
)
return
self
.
metadict_to_blob
(
upload_response
)
if
upload_type
==
UploadType
.
RESUMABLE
:
upload_response
=
await
self
.
_upload_resumable
(
project
,
bucket
,
url
,
object_name
,
stream
,
parameters
,
headers
,
auth
=
auth
,
metadata
=
metadata
,
timeout
=
timeout
)
return
self
.
metadict_to_blob
(
upload_response
)
try
:
# log.debug('using %r gcloud storage upload method', upload_type)
if
upload_type
==
UploadType
.
SIMPLE
:
# if metadata:
# log.warning('metadata will be ignored for upload_type=Simple')
upload_response
=
await
self
.
_upload_simple
(
project
,
bucket
,
url
,
object_name
,
stream
,
parameters
,
headers
,
auth
=
auth
,
timeout
=
timeout
)
return
self
.
metadict_to_blob
(
upload_response
)
if
upload_type
==
UploadType
.
RESUMABLE
:
upload_response
=
await
self
.
_upload_resumable
(
project
,
bucket
,
url
,
object_name
,
stream
,
parameters
,
headers
,
auth
=
auth
,
metadata
=
metadata
,
timeout
=
timeout
)
return
self
.
metadict_to_blob
(
upload_response
)
except
ClientResponseError
as
ex
:
# specific case overwrite=False without if_match
if
ex
.
status
==
412
and
parameters
.
get
(
'ifGenerationMatch'
)
==
'0'
:
raise
ResourceExistsException
(
f
'
{
object_name
}
already exists'
,
original_exception
=
ex
)
else
:
raise
raise
TypeError
(
f
'upload type
{
upload_type
}
not supported'
)
...
...
osdu_gcp/storage/http_client_gcp.py
View file @
ced20b90
from
typing
import
Optional
,
List
,
Any
,
Union
,
Tuple
from
aiohttp
import
ClientResponseError
from
multidict
import
CIMultiDictProxy
import
enum
...
...
@@ -53,6 +55,14 @@ class GCloudAioHttpClient:
async
with
session_method
(
url
,
data
=
data
,
headers
=
headers
,
**
kwargs
)
as
resp
:
resp
.
raise_for_status
()
if
resp
.
status
==
304
:
# in our case, treat it as an error of the "resource already exists" kind
raise
ClientResponseError
(
resp
.
request_info
,
resp
.
history
,
status
=
resp
.
status
,
message
=
resp
.
reason
,
headers
=
resp
.
headers
,
)
if
response_type
==
ResponseType
.
JSON
:
return
await
resp
.
json
(),
resp
.
headers
if
response_type
==
ResponseType
.
TEXT
:
...
...
requirements_opengroup.txt
View file @
ced20b90
# osdu core lib main python
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
osdu-core-lib-python==1.0.0.dev2
70686
osdu-core-lib-python==1.0.0.dev2
86249
tests/storage/test_blob_storage_gcp.py
View file @
ced20b90
from
tests.conftest
import
*
import
tempfile
import
shutil
from
osdu_gcp.storage.blob_storage_gcp
import
GCloudAioStorage
import
uuid
import
pytest
import
aiohttp
import
uuid
from
tests.conftest
import
*
from
osdu_gcp.storage.blob_storage_gcp
import
GCloudAioStorage
from
osdu.core.api.storage.tenant
import
Tenant
from
osdu.core.api.storage.exceptions
import
*
class
_TESTING_CFG
:
...
...
@@ -131,7 +134,7 @@ async def test_upload_empty_input(storage_client, test_tenant):
@
pytest
.
mark
.
asyncio
async
def
test_upload_int_input
(
storage_client
,
test_tenant
):
with
pytest
.
raises
(
TypeError
):
with
pytest
.
raises
(
BlobStorageException
):
await
storage_client
.
upload
(
test_tenant
,
'int_input'
,
123456
)
...
...
@@ -140,7 +143,7 @@ async def test_overwrite_with_condition(storage_client, test_tenant):
blob_name
=
'testing_data/'
+
str
(
uuid
.
uuid4
())
await
storage_client
.
upload
(
test_tenant
,
blob_name
,
b
'1111'
)
with
pytest
.
raises
(
Exception
):
# StorageErrorException is internal
with
pytest
.
raises
(
ResourceExistsException
):
await
storage_client
.
upload
(
test_tenant
,
blob_name
,
b
'1111'
,
overwrite
=
False
)
# update no condition
...
...
@@ -153,7 +156,7 @@ async def test_overwrite_with_condition(storage_client, test_tenant):
assert
await
storage_client
.
download
(
test_tenant
,
blob_name
)
==
b
'1113'
# should fail update if_match not satisfied
with
pytest
.
raises
(
Exception
):
# StorageErrorException is internal
with
pytest
.
raises
(
PreconditionFailedException
):
await
storage_client
.
upload
(
test_tenant
,
blob_name
,
b
'1114'
,
if_match
=
etag_1112
)
# success update if_not_match
...
...
@@ -161,5 +164,5 @@ async def test_overwrite_with_condition(storage_client, test_tenant):
# should fail update if_not_match not satisfied
etag_1115
=
(
await
storage_client
.
download_metadata
(
test_tenant
,
blob_name
)).
etag
with
pytest
.
raises
(
Exception
):
# StorageErrorException is internal
await
storage_client
.
upload
(
test_tenant
,
blob_name
,
b
'1116'
,
if_not_match
=
etag_1115
)
with
pytest
.
raises
(
PreconditionFailedException
):
await
storage_client
.
upload
(
test_tenant
,
blob_name
,
b
'1116'
,
if_not_match
=
etag_1115
)
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment