test_blob_storage_az.py 6.48 KB
Newer Older
1
from azure.core.exceptions import AzureError
2
3
4
5
6
7
8
9
from osdu.core.api.storage.tenant import Tenant
from tests.conftest import Config
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
import pytest
from io import BytesIO
import uuid
from mock import patch
from azure.core import exceptions
10
import asyncio
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34


# Patch '_get_credentials', '_build_url' and '_get_storage_account_name' for tests
@pytest.fixture
async def az_client() -> AzureAioBlobStorage:
    with patch.object(AzureAioBlobStorage, '_get_credentials', return_value=Config.credentials):
        with patch.object(AzureAioBlobStorage, '_get_storage_account_name', return_value=Config.storage_account_name):
            with patch.object(AzureAioBlobStorage, '_build_url', return_value=Config.storage_account):
                yield AzureAioBlobStorage()


@pytest.fixture
async def test_tenant():
    return Tenant(project_id=Config.storage_account, bucket_name=Config.container, data_partition_id='local')


@pytest.mark.asyncio
@pytest.mark.parametrize('input_data, expected', [
    (b'expected content 123456789'.decode('utf-8'), b'expected content 123456789'),
    (BytesIO(b'expected content 123456789'), b'expected content 123456789')
])
async def test_downloading_successfully_uploaded_blob(az_client: AzureAioBlobStorage, test_tenant, input_data, expected):
    blob_name = 'testing_data/' + str(uuid.uuid4())
    await az_client.upload(test_tenant, blob_name, input_data)
Yannick's avatar
Yannick committed
35

36
37
38
    assert await az_client.download(test_tenant, blob_name) == expected


Yannick's avatar
Yannick committed
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@pytest.mark.asyncio
async def test_download_metadata(az_client: AzureAioBlobStorage, test_tenant):
    blob_name = 'testing_data/' + str(uuid.uuid4())
    input_data = b'expected content 123456789'
    await az_client.upload(test_tenant,
                           blob_name,
                           input_data,
                           metadata={'customMetaKey': 'customMetaValue'},
                           content_type='application/x-parquet')

    blob_prop = await az_client.download_metadata(test_tenant, blob_name)
    assert blob_prop.identifier == blob_name
    assert blob_prop.name == blob_name
    assert blob_prop.content_type == 'application/x-parquet'
    assert blob_prop.metadata['customMetaKey'] == 'customMetaValue'


56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
@pytest.mark.asyncio
async def test_update_with_condition(az_client: AzureAioBlobStorage, test_tenant):
    blob_name = 'testing_data/' + str(uuid.uuid4())
    await az_client.upload(test_tenant, blob_name, b'1111')

    # update no condition
    await az_client.update(test_tenant, blob_name, b'1112')
    assert await az_client.download(test_tenant, blob_name) == b'1112'

    # successful update if_match
    etag_1112 = (await az_client.download_metadata(test_tenant, blob_name)).etag
    await az_client.update(test_tenant, blob_name, b'1113', if_match=etag_1112)
    assert await az_client.download(test_tenant, blob_name) == b'1113'

    # should fail update if_match not satisfied
    with pytest.raises(AzureError):  # StorageErrorException is internal
        await az_client.update(test_tenant, blob_name, b'1114', if_match=etag_1112)

    # success update if_not_match
    await az_client.update(test_tenant, blob_name, b'1115', if_not_match=etag_1112)

    # should fail update if_not_match not satisfied
    etag_1115 = (await az_client.download_metadata(test_tenant, blob_name)).etag
    with pytest.raises(AzureError):  # StorageErrorException is internal
        await az_client.update(test_tenant, blob_name, b'1116', if_not_match=etag_1115)


@pytest.mark.asyncio
async def test_concurrent_update_only_one_should_succeed(az_client: AzureAioBlobStorage, test_tenant):
    # not really sure this can really prove it
    blob_name = 'testing_data/' + str(uuid.uuid4())
    await az_client.upload(test_tenant, blob_name, b'1111')

    nb_repetition = 10
    concurrency = 10

    for _ in range(nb_repetition):
        etag = (await az_client.download_metadata(test_tenant, blob_name)).etag
        calls = [az_client.update(test_tenant, blob_name, str(c), if_match=etag) for c in range(concurrency)]

        # perform several concurrent call
        result = await asyncio.gather(*calls, return_exceptions=True)

        # ensure only one succeed and updated it
        success = [i for (i, r) in enumerate(result) if not isinstance(r, Exception)]
        assert len(success) == 1  # only should succeed

        # check the content, should match the one which succeed
        content = await az_client.download(test_tenant, blob_name)
        assert content.decode('utf-8') == str(success[0])


108
109
110
111
112
113
114
115
116
117
118
@pytest.mark.asyncio
async def test_download_not_existing_blob_should_throw(az_client: AzureAioBlobStorage, test_tenant):
    # here we just ensure it does not silently fail and throw something for now (to be updated when proper exceptions
    # will be defined in the core lib)
    with pytest.raises(exceptions.ResourceNotFoundError) as ex_info:
        # given a not existing blob
        blob_name = 'testing_data/' + str(uuid.uuid4())

        # when try to download it should fail
        await az_client.download(test_tenant, blob_name)

119
    # print(ex_info.value)
120
121
122
123
124
125
126
127


@pytest.mark.asyncio
async def test_invalid_storage_container(az_client: AzureAioBlobStorage):
    with pytest.raises(exceptions.ResourceNotFoundError) as ex_info:
        tenant = Tenant(project_id=Config.storage_account, bucket_name='not_existing_container', data_partition_id='local')
        await az_client.upload(tenant, 'blob_name', 'input_data')

128
    # print(ex_info.value)
129
130


131
132
133
134
135
136
@pytest.mark.asyncio
async def test_list_objects(az_client: AzureAioBlobStorage, test_tenant):
    # given
    nb_blob = 3
    prefix = 'testing_data/list' + str(uuid.uuid4())
    all_names = set()
137

138
139
140
141
    for _ in range(nb_blob):
        blob_name = prefix + str(uuid.uuid4())
        all_names.add(blob_name)
        await az_client.upload(test_tenant, blob_name, b'na')
142

143
144
    blob_names = await az_client.list_objects(test_tenant, prefix=prefix)
    assert all_names.issubset(set(blob_names))
145

146
147
    blob_names = await az_client.list_objects(test_tenant, prefix=prefix, max_result=2)
    assert len(set(blob_names)) == 2
148
149
150


@pytest.mark.asyncio
151
async def test_delete(az_client: AzureAioBlobStorage, test_tenant):
Yannick's avatar
Yannick committed
152
153
154
155
156
157
158
159
    blob_name = 'testing_data/' + str(uuid.uuid4())
    input_data = b'expected content 123456789'
    await az_client.upload(test_tenant, blob_name, input_data)
    await az_client.download(test_tenant, blob_name)

    await az_client.delete(test_tenant, blob_name)
    with pytest.raises(exceptions.ResourceNotFoundError):
        await az_client.download(test_tenant, blob_name)
160