import asyncio
import io
from tempfile import TemporaryDirectory

from fastapi import Header
from fastapi.testclient import TestClient
import pytest
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa

from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from osdu.core.api.storage.blob_storage_base import BlobStorageBase

from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage, make_local_dask_bulk_storage

from app.clients import StorageRecordServiceClient
from app.persistence.sessions_storage import SessionsStorage, SessionState
from app.clients.storage_service_blob_storage import StorageRecordServiceBlobStorage
from app.auth.auth import require_opendes_authorized_user
from app.middleware import require_data_partition_id
from app.helper import traces
from app.utils import Context, DaskClient
from app import conf

from tests.unit.persistence.dask_blob_storage_test import generate_df
from tests.unit.test_utils import nope_logger_fixture


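# Per-entity-type test configuration: API version, REST endpoints, and a minimal
# valid record payload used by _create_record for each entity kind under test.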
Definitions = {
    'WellLog': {
        'api_version': 'v3',
        'base_url': '/ddms/v3/welllogs',
        'chunking_url': '/ddms/v3/welllogs',
        'kind': 'osdu:wks:work-product-component--WellLog:1.1.0',
        'record_data': {
            "WellboreID": "namespace:master-data--Wellbore:SomeUniqueWellboreID:",
            "Curves": [{"CurveID": "MD"}, {"CurveID": "X"}],
            "ExtensionProperties": {"my_test_extension": 42},
        }
    },

    'WellboreTrajectory': {
        'api_version': 'v3',
        'base_url': '/ddms/v3/wellboretrajectories',
        'chunking_url': '/ddms/v3/wellboretrajectories',  # TODO: update when no longer alpha
        'kind': 'osdu:wks:work-product-component--WellboreTrajectory:1.0.0',
        'record_data': {
            "WellboreID": "namespace:master-data--Wellbore:SomeUniqueWellboreID:",
            "TopDepthMeasuredDepth": 12345.6,
            "BaseDepthMeasuredDepth": 12345.6,
            "VerticalMeasurement": {"VerticalMeasurement": 12345.6}
        }
    },
    'Log': {
        'api_version': 'v2',
        'base_url': '/alpha/ddms/v2/logs',
        'chunking_url': '/ddms/v2/logs',
        'kind': 'osdu:wks:log:1.0.5',
        'record_data': {
            "name": "myLog_name"
        }
    }
}

EntityTypeParams = ['WellLog', 'WellboreTrajectory', 'Log']


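# Decode a bulk-data response into a DataFrame, based on the response content-type.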
def _create_df_from_response(response):
    f = io.BytesIO(response.content)
    f.seek(0)

    content_type = response.headers.get('content-type')
    if content_type == 'application/x-parquet':
        return pd.read_parquet(f)
    elif content_type == 'text/csv; charset=utf-8':
        return pd.read_csv(f, index_col=0)
    elif content_type == 'application/json':
        return pd.read_json(f, dtype=True, orient='split', convert_axes=False)
    else:
        raise ValueError(f"Unknown content-type: '{content_type}'")


def _df_to_format(df, data_format):
    if data_format == 'parquet':
        return df.to_parquet(engine="pyarrow")
    elif data_format == 'json':
        return df.to_json(orient='split', date_format='iso')
    else:
        raise ValueError(f"Unknown data format: '{data_format}'")


def _create_record(client, entity_type):
    """ Create a record of the given entity type and return the new record id """
    entity_def = Definitions[entity_type]
    create_url = entity_def['base_url']
    kind = entity_def['kind']
    record_data = entity_def['record_data']

    record = {
        "kind": kind,
        "acl": {
            "viewers": ["data.default.viewers@opendes.enterprisedata.cloud.slb-ds.com"],
            "owners": ["data.default.owners@opendes.enterprisedata.cloud.slb-ds.com"]
        },
        "legal": {
            "legaltags": ["opendes-storage-1602183747123"],
            "otherRelevantDataCountries": ["US"],
        },
        'version': 0,
        "data": record_data
    }
    response = client.post(create_url, json=[record])
    assert response.status_code == 200
    record_id = response.json()["recordIds"][0]
    return record_id


def _cast_datetime_to_datetime64_ns(result_df):
    """ Cast 'date*' columns to datetime64[ns] so that date values compare reliably """
    for name in result_df.columns:
        if name.startswith('date'):
            result_df[name] = result_df[name].astype('datetime64[ns]')

    return result_df


@pytest.fixture
def init_fixtures(nope_logger_fixture, monkeypatch):
    # point blob storage to a local filesystem directory and reload the configuration
    with TemporaryDirectory() as tmp_dir:
        monkeypatch.setenv(name='USE_LOCALFS_BLOB_STORAGE_WITH_PATH', value=tmp_dir)
        conf.Config = conf.ConfigurationContainer.with_load_all()
        yield


# module-scoped loop: the shared Dask client is closed only once, at module teardown
@pytest.fixture(scope="module")
def event_loop():  # all tests will share the same loop
    loop = asyncio.get_event_loop()
    yield loop
    # teardown
    loop.run_until_complete(DaskClient.close())
    loop.close()


# Build the app under test: every external dependency (record storage, blob storage,
# sessions, Dask bulk storage, auth, data partition) is replaced by a local test
# double backed by a temporary directory.
@pytest.fixture
def dasked_test_app(init_fixtures):
    from app.wdms_app import wdms_app, enable_alpha_feature
    from app.wdms_app import app_injector

    enable_alpha_feature()

    with TemporaryDirectory() as tmp_dir:
        local_blob_storage = LocalFSBlobStorage(directory=tmp_dir)

        async def storage_service_builder(*args, **kwargs):
            return StorageRecordServiceBlobStorage(local_blob_storage, 'myProject', 'myContainer')

        async def set_default_partition(data_partition_id: str = Header('opendes')):
            Context.set_current_with_value(partition_id=data_partition_id)

        async def blob_storage_builder(*args, **kwargs):
            return local_blob_storage

        async def sessions_storage_builder(*args, **kwargs):
            return SessionsStorage(local_blob_storage)

        async def dask_blob_storage_builder() -> DaskBulkStorage:
            return await make_local_dask_bulk_storage(base_directory=tmp_dir)

        app_injector.register(DaskBulkStorage, dask_blob_storage_builder)
        app_injector.register(BlobStorageBase, blob_storage_builder)
        app_injector.register(SessionsStorage, sessions_storage_builder)
        app_injector.register(StorageRecordServiceClient, storage_service_builder)

        async def do_nothing():
            # empty method
            pass

        wdms_app.dependency_overrides[require_opendes_authorized_user] = do_nothing
        wdms_app.dependency_overrides[require_data_partition_id] = set_default_partition
        # Initialize traces exporter in app, like it is in app's startup decorator
        wdms_app.trace_exporter = traces.CombinedExporter(service_name='tested-ddms')

        yield wdms_app

        wdms_app.dependency_overrides = {}  # clean up


@pytest.fixture
def setup_client(dasked_test_app):
    yield TestClient(dasked_test_app)


def test_post_data_merge_extension_properties(setup_client):
    client = setup_client
    record_id = _create_record(client, "WellLog")
    chunking_url = Definitions["WellLog"]['chunking_url']

    df = generate_df(['MD'], range(10))
    data_to_send = df.to_json(orient='split', date_format='iso')
    headers = {'content-type': 'application/json'}

    write_response = client.post(f'{chunking_url}/{record_id}/data', data=data_to_send, headers=headers)
    assert write_response.status_code == 200

    get_response = client.get(f'{chunking_url}/{record_id}')
    assert get_response.status_code == 200

    expected = Definitions["WellLog"]["record_data"]["ExtensionProperties"].copy()

    expected["wdms"] = get_response.json()["data"]["ExtensionProperties"]["wdms"]

    assert get_response.json()["data"]["ExtensionProperties"] == expected


@pytest.mark.parametrize("entity_type", EntityTypeParams)
218
@pytest.mark.parametrize("content_type_header,create_func", [
Yannick's avatar
Yannick committed
219
220
    ('application/x-parquet', lambda df: df.to_parquet(engine="pyarrow")),
    ('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
Luc Yriarte's avatar
Luc Yriarte committed
221
222
223
224
225
226
227
228
229
230
])
@pytest.mark.parametrize("accept_content", [
    'application/x-parquet',
    'application/json',
])
@pytest.mark.parametrize("columns", [
    ['MD', 'X'],
    ['float_MD', 'float_X'],
    ['str_MD', 'str_X'],
    ['date_MD', 'date_X'],
Jeremie Hallal's avatar
Jeremie Hallal committed
231
    ['MD', 'date_X', 'float_X', 'str_X']
Luc Yriarte's avatar
Luc Yriarte committed
232
])
233
234
235
236
237
238
def test_send_all_data_once(setup_client,
                            entity_type,
                            columns,
                            content_type_header,
                            create_func,
                            accept_content):
239
    client = setup_client
240
241
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']
Luc Yriarte's avatar
Luc Yriarte committed
242
243
244

    initial_data_df = generate_df(columns, range(5, 13))
    data_to_send = create_func(initial_data_df)
245
    headers = {'content-type': content_type_header}
246
247
248

    get_response_no_data = client.get(f'{chunking_url}/{record_id}/data', headers=headers)
    assert get_response_no_data.status_code == 404
Luc Yriarte's avatar
Luc Yriarte committed
249

250
    write_response = client.post(f'{chunking_url}/{record_id}/data', data=data_to_send, headers=headers)
Luc Yriarte's avatar
Luc Yriarte committed
251
252
    assert write_response.status_code == 200

253
    get_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
Luc Yriarte's avatar
Luc Yriarte committed
254
255
256
    assert get_response.status_code == 200
    result_df = _create_df_from_response(get_response)

257
258
    if content_type_header.endswith('parquet') and accept_content.endswith('json'):
        result_df = _cast_datetime_to_datetime64_ns(result_df)
Luc Yriarte's avatar
Luc Yriarte committed
259

260
    if content_type_header.endswith('json'):
Luc Yriarte's avatar
Luc Yriarte committed
261
        initial_data_df = pd.read_json(data_to_send, orient='split')
262
263
264
265
266
267
268
269
270
271

    assert initial_data_df.index.dtype == result_df.index.dtype
    assert initial_data_df.shape == result_df.shape
    pd.testing.assert_frame_equal(initial_data_df, result_df,
                                  check_dtype=False,
                                  check_column_type=False,
                                  check_datetimelike_compat=True,
                                  )


272
273
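# The test below writes bulk data through the legacy v2 endpoint and reads it back
# through the v3 chunking endpoint, checking cross-version compatibility.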
@pytest.mark.parametrize("entity_type",
                         [entity for entity in EntityTypeParams if Definitions[entity]['api_version'] == "v2"])
274
275
276
277
278
279
280
281
282
283
284
@pytest.mark.parametrize("content_type_header,create_func", [
    ('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
])
@pytest.mark.parametrize("accept_content", [
    'application/json',
])
@pytest.mark.parametrize("columns", [
    ['MD', 'X'],
    ['float_MD', 'float_X'],
    ['str_MD', 'str_X'],
    ['date_MD', 'date_X'],
Jeremie Hallal's avatar
Jeremie Hallal committed
285
    ['MD', 'date_X', 'float_X', 'str_X']
286
287
])
def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
288
289
290
291
292
                                                     entity_type,
                                                     columns,
                                                     content_type_header,
                                                     create_func,
                                                     accept_content):
293
    client = setup_client
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']
    base_url = Definitions[entity_type]['base_url']

    initial_data_df = generate_df(columns, range(5, 13))
    data_to_send = create_func(initial_data_df)
    headers = {'content-type': content_type_header}

    get_response_no_data = client.get(f'{chunking_url}/{record_id}/data', headers=headers)
    assert get_response_no_data.status_code == 404

    write_response = client.post(f'{base_url}/{record_id}/data', data=data_to_send, headers=headers)
    assert write_response.status_code == 200

    get_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
    assert get_response.status_code == 200
    result_df = _create_df_from_response(get_response)

    if content_type_header.endswith('json'):
        initial_data_df = pd.read_json(data_to_send, orient='split')
Luc Yriarte's avatar
Luc Yriarte committed
314
315
316
317
318
319
320
321
322
323

    assert initial_data_df.index.dtype == result_df.index.dtype
    assert initial_data_df.shape == result_df.shape
    pd.testing.assert_frame_equal(initial_data_df, result_df,
                                  check_dtype=False,
                                  check_column_type=False,
                                  check_datetimelike_compat=True,
                                  )


@pytest.mark.parametrize("entity_type", EntityTypeParams)
@pytest.mark.parametrize("content_type_header, create_func", [
    ('application/x-parquet', lambda df: df.to_parquet(engine="pyarrow")),
    ('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
])
@pytest.mark.parametrize("accept_content", [
    'application/x-parquet',
    # 'text/csv; charset=utf-8',
    'application/json',
])
@pytest.mark.parametrize("columns", [
    ['float_MD', 'float_X'],
    ['str_MD', 'str_X'],
    ['date_MD', 'date_X'],
    ['TVD', 'float_X', 'str_X', 'date_X'],
    ['MD', 'X'],
    ['MD', 'float_X'],
    ['MD', 'str_MD'],

    # the test cases BELOW FAIL in UPDATE mode:
    # => adding a new Date/String column that does not start at the first index
    #    while also overriding an existing column
    # ['MD', 'date_X'],
    # ['MD', 'float_X', 'str_X', 'date_X'],
])
@pytest.mark.parametrize("session_mode", [
    'overwrite',
    'update',
])
def test_overwrite_data_by_chunk_append(setup_client, entity_type, columns, content_type_header, create_func,
                                        accept_content, session_mode):
    """ Write initial bulk data, then append chunks with consecutive indexes in a session
    (overwrite or update mode) and validate the merged result """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    initial_df = generate_df(['MD', 'X'], range(5))
    write_response = client.post(f'{chunking_url}/{record_id}/data',
                                 data=initial_df.to_json(orient='split', date_format='iso'),
                                 headers={'Content-Type': 'application/json'})

    assert write_response.status_code == 200
    get_response = client.get(f'{chunking_url}/{record_id}/data')
    assert get_response.status_code == 200
    initial_bulk_data = _create_df_from_response(get_response)
    assert initial_bulk_data.shape == initial_df.shape, "existing bulk data should not be empty"

    data_format = 'json' if content_type_header.endswith('json') else 'parquet'
    chunk_dfs = _create_chunks(client, entity_type, record_id=record_id, session_mode=session_mode,
                               data_format=data_format,
                               cols_ranges=[(columns, range(5, 10)),
                                            (columns, range(10, 15))])

    get_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
    assert get_response.status_code == 200
    df = _create_df_from_response(get_response)

    # in 'update' mode the initial bulk remains part of the record; in 'overwrite' it is replaced
    if session_mode == 'update':
        chunk_dfs.insert(0, initial_df)

    expected = pd.concat(chunk_dfs, axis=0)
    df = _cast_datetime_to_datetime64_ns(df)

    sorted_columns = sorted(columns)
    df = df[sorted_columns]
    expected = expected[sorted_columns]
    pd.testing.assert_frame_equal(df, expected,
                                  check_dtype=False,
                                  check_column_type=False,
                                  check_datetimelike_compat=True,
                                  )
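

# Session lifecycle used throughout these tests: POST .../sessions creates a session,
# POST .../sessions/{id}/data pushes chunks into it, and PATCH .../sessions/{id} with
# {'state': 'commit'} or {'state': 'abandon'} closes it.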
def _create_chunks(client, entity_type, cols_ranges, record_id, session_mode='update', data_format='json'):
    """ Create a session, push one chunk per (columns, index range) pair, then commit the session """
    chunking_url = Definitions[entity_type]["chunking_url"]
    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': session_mode})
    assert session_response.status_code == 200
    session_data = session_response.json()
    assert 'id' in session_data
    session_id = session_data['id']

    created_dfs = []

    for columns, ranges in cols_ranges:
        chunk_df = generate_df(columns, ranges)
        created_dfs.append(chunk_df)

        if data_format == 'json':
            headers = {'Content-Type': 'application/json'}
        elif data_format == 'parquet':
            headers = {'content-type': 'application/x-parquet'}
        else:
            raise ValueError(f"Unknown data format: '{data_format}'")

        chunk_response = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                                     data=_df_to_format(chunk_df, data_format),
                                     headers=headers)
        assert chunk_response.status_code == 200  # todo: should it be 204?

    commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
    assert commit_response.status_code == 200
    assert commit_response.json()['state'] == SessionState.Committed

    return created_dfs


@pytest.mark.parametrize("entity_type", EntityTypeParams)
@pytest.mark.parametrize("data_format", ['parquet', 'json'])
@pytest.mark.parametrize("accept_content", [
    'application/x-parquet',
    'text/csv; charset=utf-8',
    'application/json',
])
def test_add_curve_by_chunk_different_cols(setup_client, entity_type, data_format, accept_content):
    """ Push chunks that each add a different column over the same index range, then check the merged result """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    _create_chunks(client, entity_type,
                   record_id=record_id,
                   data_format=data_format,
                   cols_ranges=[(['MD', 'X'], range(5, 20)),
                                (['Y'], range(5, 20)),
                                (['Z'], range(5, 20))])

    data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
    assert data_response.status_code == 200
    with_new_col = _create_df_from_response(data_response)
    assert list(with_new_col.columns) == ['MD', 'X', 'Y', 'Z']
    assert with_new_col.shape == (15, 4)


@pytest.mark.parametrize("entity_type", EntityTypeParams)
@pytest.mark.parametrize("data_format", ['parquet', 'json'])
@pytest.mark.parametrize("accept_content", [
    'application/x-parquet',
    'text/csv; charset=utf-8',
    'application/json',
])
def test_add_curve_by_chunk_same_cols(setup_client, entity_type, data_format, accept_content):
    """ Append chunks with the same column over consecutive index ranges, validate the session """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    _create_chunks(client, entity_type, record_id=record_id, data_format=data_format,
                   cols_ranges=[(['X'], range(10)),
                                (['X'], range(10, 20)),
                                (['X'], range(20, 30))])

    data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
    assert data_response.status_code == 200
    with_new_col = _create_df_from_response(data_response)
    if accept_content == 'application/json':
        with_new_col.sort_index(inplace=True)  # TODO: needs investigation: why is the index mixed up?

    assert list(with_new_col.columns) == ['X']
    assert with_new_col.shape == (30, 1)
    assert with_new_col.index[0] == 0
    assert with_new_col.index[-1] == 29


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_add_curve_by_chunk_same_cols_overlapped_index(setup_client, entity_type):
    """ Append chunks with the same columns and overlapping index ranges; later chunks win on the overlap """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    chunk_dfs = _create_chunks(client, entity_type, record_id=record_id,
                               cols_ranges=[(['MD', 'X'], range(20)),
                                            (['MD', 'X'], range(10, 30)),
                                            (['MD', 'X'], range(25, 40))])

    get_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': 'application/x-parquet'})
    assert get_response.status_code == 200
    result_df = _create_df_from_response(get_response)

    chunk_1, chunk_2, chunk_3 = chunk_dfs
    assert result_df.loc[0:9].compare(chunk_1.loc[0:9]).empty
    assert result_df.loc[10:24].compare(chunk_2.loc[10:24]).empty
    assert result_df.loc[25:40].compare(chunk_3.loc[25:40]).empty


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_add_curve_by_chunk_overlap_different_cols(setup_client, entity_type):
    """ Append chunks adding new columns whose index ranges overlap the existing data in various ways """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    _create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['MD', 'A'], range(5, 10)),
                                                                          (['B'], range(8)),  # overlaps left side
                                                                          (['C'], range(8, 15)),  # overlaps right side
                                                                          (['D'], range(6, 8)),  # within
                                                                          (['E'], range(15)),  # overlaps both sides
                                                                          ])

    data_response = client.get(f'{chunking_url}/{record_id}/data?orient=columns', headers={'Accept': 'application/json'})
    assert data_response.status_code == 200
    with_new_col = pd.DataFrame.from_dict(data_response.json())
    assert list(with_new_col.columns) == ['A', 'B', 'C', 'D', 'E', 'MD']
    assert with_new_col.shape == (15, 6)


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_abandon_session_with_data_push_data_again(setup_client, entity_type):
    """ Abandon a session that already holds data, then verify further pushes to it are rejected """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    chunk_1 = generate_df(['MD', 'X'], range(5, 10))
    client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                data=chunk_1.to_json(orient='split'),
                headers={'Content-Type': 'application/json'})

    abort_session_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}',
                                          json={'state': 'abandon'})
    assert abort_session_response.status_code == 200
    assert abort_session_response.json()['state'] == SessionState.Abandoned

    chunk_2 = generate_df(['MD', 'X'], range(11, 20))
    chunk2_response = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                                  data=chunk_2.to_json(orient='split'),
                                  headers={'Content-Type': 'application/json'})
    assert chunk2_response.status_code == 400


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_abandon_no_data_session(setup_client, entity_type):
    """ Patching an unknown session returns 404; abandoning an empty session succeeds """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    commit_unknown_session_response = client.patch(f'{chunking_url}/{record_id}/sessions/123456',
                                                   json={'state': 'commit'})
    assert commit_unknown_session_response.status_code == 404

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'abandon'})
    assert commit_response.status_code == 200


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_session_commit_no_data(setup_client, entity_type):
    """ Committing a session to which no data was pushed is rejected """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
    assert commit_response.status_code == 422  # todo: expected behavior?


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_session_double_abandon(setup_client, entity_type):
    """ Abandoning an already abandoned session returns a conflict """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    abort_session_response_try1 = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}',
                                               json={'state': 'abandon'})
    assert abort_session_response_try1.status_code == 200

    abort_session_response_try2 = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}',
                                               json={'state': 'abandon'})
    assert abort_session_response_try2.status_code == 409


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_valid_session_double_commit(setup_client, entity_type):
    """ Committing an already committed session returns a conflict """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    chunk_1 = generate_df(['MD', 'X'], range(5, 10))
    client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                data=chunk_1.to_json(orient='split'),
                headers={'Content-Type': 'application/json'})

    commit_session_response_try1 = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}',
                                                json={'state': 'commit'})
    assert commit_session_response_try1.status_code == 200

    commit_session_response_try2 = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}',
                                                json={'state': 'commit'})
    assert commit_session_response_try2.status_code == 409


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_session_unknown_record(setup_client, entity_type):
    """ Creating a session on a record that does not exist returns 404 """
    client = setup_client
    chunking_url = Definitions[entity_type]['chunking_url']

    session_response = client.post(f'{chunking_url}/123456/sessions', json={'mode': 'update'})
    assert session_response.status_code == 404


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_creates_two_sessions_one_record_with_chunks_different_format(setup_client, entity_type):
    """ Run two sessions on the same record, one with JSON chunks and one with parquet chunks """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    _create_chunks(client, entity_type, record_id=record_id, data_format='json', cols_ranges=[(['X'], range(5, 20))])
    _create_chunks(client, entity_type, record_id=record_id, data_format='parquet', cols_ranges=[(['Y'], range(5, 20)),
                                                                                                 (['Z'], range(5, 20))])
    data_response = client.get(f'{chunking_url}/{record_id}/data')
    assert data_response.status_code == 200
    df = _create_df_from_response(data_response)
    assert df.shape == (15, 3)


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_creates_two_sessions_two_record_with_chunks(setup_client, entity_type):
    """ Run one session per record and check that the two records' data stay independent """
    client = setup_client
    record_id = _create_record(client, entity_type)
    another_record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    _create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['X'], range(5, 20))])
    _create_chunks(client, entity_type, record_id=another_record_id, cols_ranges=[(['Y'], range(0, 10)),
                                                                                  (['Z'], range(5, 10))])
    data_response = client.get(f'{chunking_url}/{record_id}/data')
    assert data_response.status_code == 200
    df = _create_df_from_response(data_response)
    assert list(df.columns) == ['X']
    assert df.shape == (15, 1)

    another_data_response = client.get(f'{chunking_url}/{another_record_id}/data')
    assert another_data_response.status_code == 200
    other_df = _create_df_from_response(another_data_response)
    assert list(other_df.columns) == ['Y', 'Z']
    assert other_df.shape == (10, 2)


@pytest.mark.parametrize("entity_type", EntityTypeParams)
def test_session_sent_same_col_different_types(setup_client, entity_type):
    """ Pushing chunks where the same column comes with different dtypes makes the commit fail """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    chunk_1 = generate_df(['MD', 'X'], range(10))
    chunk_response_1 = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                                   data=chunk_1.to_json(orient='split'),
                                   headers={'Content-Type': 'application/json'})
    assert chunk_response_1.status_code == 200

    # same column names as chunk_1, but with different dtypes
    chunk_2 = generate_df(['float_MD', 'str_X'], range(10, 20))
    chunk_2.rename(columns={'float_MD': 'MD', 'str_X': 'X'}, inplace=True)
    chunk_response_2 = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                                   data=chunk_2.to_json(orient='split'),
                                   headers={'Content-Type': 'application/json'})
    assert chunk_response_2.status_code == 200

    commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
    assert commit_response.status_code == 422


# note: pandas' DataFrame.to_parquet() rejects non-string column names, hence this
# pyarrow-based helper for the test below
def _df_to_pyarrow_parquet(df_data: pd.DataFrame):
    """ Return a buffer containing a parquet format file built from the given dataframe """
    table = pa.Table.from_pandas(df=df_data)
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    return buf.getvalue().to_pybytes()


@pytest.mark.parametrize("entity_type", EntityTypeParams)
@pytest.mark.parametrize("columns_type", [
    [int(42), float(-42)],
    [int(42), float(-42), str('forty two')]
])
@pytest.mark.parametrize("content_type_header,create_func", [
    ('application/x-parquet', lambda df: _df_to_pyarrow_parquet(df)),
    ('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
])
def test_session_chunk_int(setup_client, entity_type, content_type_header, create_func, columns_type):
    """ Non-string column names are rejected, unless a format or legacy quirk casts them to str """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    json_data = {t: np.random.rand(10) for t in columns_type}
    df_data = pd.DataFrame(json_data)
    data_to_send = create_func(df_data)

    headers = {'content-type': content_type_header}
    expected_code = 422

    # side effect of the parquet format: if at least one column name is a str,
    # all column names are cast to str
    if content_type_header.endswith('parquet') and any((type(c) is str for c in columns_type)):
        expected_code = 200

    # for the legacy Log entity, int column names are automatically cast to string
    # to ensure backward compatibility
    if content_type_header.endswith('json') and entity_type == 'Log':
        expected_code = 200

    write_response = client.post(f'{chunking_url}/{record_id}/data', data=data_to_send, headers=headers)
    assert write_response.status_code == expected_code

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    chunk_response_1 = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                                   data=data_to_send,
                                   headers=headers)
    assert chunk_response_1.status_code == expected_code


@pytest.mark.parametrize("data_format", ['parquet', 'json'])
@pytest.mark.parametrize("accept_content", ['application/x-parquet', 'application/json'])
@pytest.mark.parametrize("columns_name", [
    list(map(str, range(100))),
    list(map(lambda x: f'test_{x}', range(100))),
    list(map(lambda x: f'{x}_test_{x%10}', range(100)))
])
def test_nat_sort_columns(setup_client, data_format, accept_content, columns_name):
    """ Columns come back in natural sort order ('2' before '10'), not lexicographic order """
    entity_type = 'WellLog'
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']

    _create_chunks(client, entity_type, record_id=record_id, data_format=data_format,
                   cols_ranges=[(columns_name, range(20))])

    data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
    assert data_response.status_code == 200
    response_df = _create_df_from_response(data_response)
    assert list(response_df.columns) == columns_name


# a session created with 'fromVersion' applies its update on top of that specific
# record version instead of the latest one
@pytest.mark.parametrize("entity_type", ['WellLog', 'Log'])
def test_session_update_previous_version(setup_client, entity_type):
    """ Create a session in 'update' mode on each previous version of a record """
    client = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']
    base_url = Definitions[entity_type]['base_url']
    headers = {'Content-Type': 'application/x-parquet'}
    nb_rows = 5
    version_data = [
        generate_df(['MD', 'X', 'Y'], range(nb_rows)),
        generate_df(['MD', 'X', 'Z'], range(nb_rows)),
        generate_df(['MD', 'A', 'B'], range(nb_rows))
    ]

    # create the different versions of the data
    for data in version_data:
        write_response = client.post(f'{chunking_url}/{record_id}/data',
                                     data=data.to_parquet(engine="pyarrow"),
                                     headers=headers)
        assert write_response.status_code == 200

    versions_response = client.get(f'{base_url}/{record_id}/versions')
    assert versions_response.status_code == 200
    versions = versions_response.json()['versions']
    # the first version comes from the record creation; each bulk write adds one more
    versions_with_data = zip(versions[1:], version_data)
    assert len(versions) == len(version_data) + 1

    # update each specific version through a session
    for from_version, data in versions_with_data:
        session_response = client.post(f'{chunking_url}/{record_id}/sessions',
                                       json={"fromVersion": from_version, 'mode': 'update'})
        assert session_response.status_code == 200
        session_id = session_response.json()['id']

        new_curve = generate_df(['New'], range(nb_rows))
        chunk_response = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                                     data=new_curve.to_parquet(engine="pyarrow"),
                                     headers=headers)
        assert chunk_response.status_code == 200

        commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
        assert commit_response.status_code == 200

        # check result: the latest data must be that version's data plus the new curve
        get_response = client.get(f'{chunking_url}/{record_id}/data')
        assert get_response.status_code == 200
        res_df = _create_df_from_response(get_response)
        expected_df = data
        expected_df['New'] = new_curve['New']
        expected_df = expected_df[sorted(expected_df.columns)]
        pd.testing.assert_frame_equal(expected_df, res_df)


# todo:
#  - concurrent sessions using fromVersion in integration tests
#  - index: check if the dataframe has an index
#  - test timeout?
#  - how to choose the index?