Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Domain Data Management Services
Wellbore
Lib
Wellbore-cloud
Wellbore-azure-lib
Commits
204b4752
Commit
204b4752
authored
Mar 09, 2021
by
Luc Yriarte
Browse files
Add back disabled tests from slb ADO
parent
87fa4980
Pipeline
#30964
failed with stage
in 7 minutes and 23 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
tests/storage/__init__.py
0 → 100644
View file @
204b4752
tests/storage/test_blob_storage_az.py
0 → 100644
View file @
204b4752
from
osdu.core.api.storage.tenant
import
Tenant
from
tests.conftest
import
Config
from
osdu_az.storage.blob_storage_az
import
AzureAioBlobStorage
import
pytest
from
io
import
BytesIO
import
uuid
from
mock
import
patch
from
azure.core
import
exceptions
# Patch '_get_credentials', '_build_url' and '_get_storage_account_name' for tests
@
pytest
.
fixture
async
def
az_client
()
->
AzureAioBlobStorage
:
with
patch
.
object
(
AzureAioBlobStorage
,
'_get_credentials'
,
return_value
=
Config
.
credentials
):
with
patch
.
object
(
AzureAioBlobStorage
,
'_get_storage_account_name'
,
return_value
=
Config
.
storage_account_name
):
with
patch
.
object
(
AzureAioBlobStorage
,
'_build_url'
,
return_value
=
Config
.
storage_account
):
yield
AzureAioBlobStorage
()
@
pytest
.
fixture
async
def
test_tenant
():
return
Tenant
(
project_id
=
Config
.
storage_account
,
bucket_name
=
Config
.
container
,
data_partition_id
=
'local'
)
@
pytest
.
mark
.
asyncio
@
pytest
.
mark
.
parametrize
(
'input_data, expected'
,
[
(
b
'expected content 123456789'
.
decode
(
'utf-8'
),
b
'expected content 123456789'
),
(
BytesIO
(
b
'expected content 123456789'
),
b
'expected content 123456789'
)
])
async
def
test_downloading_successfully_uploaded_blob
(
az_client
:
AzureAioBlobStorage
,
test_tenant
,
input_data
,
expected
):
blob_name
=
'testing_data/'
+
str
(
uuid
.
uuid4
())
await
az_client
.
upload
(
test_tenant
,
blob_name
,
input_data
)
assert
await
az_client
.
download
(
test_tenant
,
blob_name
)
==
expected
@
pytest
.
mark
.
asyncio
async
def
test_download_not_existing_blob_should_throw
(
az_client
:
AzureAioBlobStorage
,
test_tenant
):
# here we just ensure it does not silently fail and throw something for now (to be updated when proper exceptions
# will be defined in the core lib)
with
pytest
.
raises
(
exceptions
.
ResourceNotFoundError
)
as
ex_info
:
# given a not existing blob
blob_name
=
'testing_data/'
+
str
(
uuid
.
uuid4
())
# when try to download it should fail
await
az_client
.
download
(
test_tenant
,
blob_name
)
print
(
ex_info
.
value
)
@
pytest
.
mark
.
asyncio
async
def
test_invalid_storage_container
(
az_client
:
AzureAioBlobStorage
):
with
pytest
.
raises
(
exceptions
.
ResourceNotFoundError
)
as
ex_info
:
tenant
=
Tenant
(
project_id
=
Config
.
storage_account
,
bucket_name
=
'not_existing_container'
,
data_partition_id
=
'local'
)
await
az_client
.
upload
(
tenant
,
'blob_name'
,
'input_data'
)
print
(
ex_info
.
value
)
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------- NOT IMPLEMENTED METHODS ----------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
async
def
assert_not_implemented
(
coro
):
with
pytest
.
raises
(
NotImplementedError
)
as
ex_info
:
await
coro
ex
=
ex_info
.
value
assert
'azure'
in
str
(
ex
).
lower
()
@
pytest
.
mark
.
asyncio
async
def
test_list_objects_should_throw_not_implemented
(
az_client
:
AzureAioBlobStorage
,
test_tenant
):
await
assert_not_implemented
(
az_client
.
list_objects
(
test_tenant
)
)
@
pytest
.
mark
.
asyncio
async
def
test_delete_should_throw_not_implemented
(
az_client
:
AzureAioBlobStorage
,
test_tenant
):
await
assert_not_implemented
(
az_client
.
delete
(
test_tenant
,
'id'
)
)
@
pytest
.
mark
.
asyncio
async
def
test_download_metadata_should_throw_not_implemented
(
az_client
:
AzureAioBlobStorage
,
test_tenant
):
await
assert_not_implemented
(
az_client
.
download_metadata
(
test_tenant
,
'id'
)
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment