# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import List, Set, Optional
import re
from contextlib import suppress

from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import Response
import pandas as pd

from app.bulk_persistence import DataframeSerializerAsync, JSONOrient, get_dataframe
from app.bulk_persistence.bulk_id import BulkId
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from app.bulk_persistence.dask.errors import BulkError, BulkNotFound
from app.clients.storage_service_client import get_storage_record_service
from app.record_utils import fetch_record
from app.bulk_persistence.mime_types import MimeTypes
from app.model.model_chunking import GetDataParams
from app.model.log_bulk import LogBulkHelper
from app.utils import Context, OpenApiHandler, get_ctx
from app.persistence.sessions_storage import (Session, SessionException, SessionState, SessionUpdateMode)
from app.routers.common_parameters import (
    REQUEST_DATA_BODY_SCHEMA,
    REQUIRED_ROLES_READ,
    REQUIRED_ROLES_WRITE,
    json_orient_parameter)
from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSessionStateValue,
                                  WithSessionStorages, get_session_dependencies)


router_bulk = APIRouter()  # router dedicated to bulk APIs


BULK_URN_PREFIX_VERSION = "wdms-1"
BULK_URI_FIELD = "bulkURI"


def _check_df_columns_type(df: pd.DataFrame):
    """Ensure all dataframe column labels are strings; otherwise raise a 422."""
    if any(type(t) is not str for t in df.columns):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                            detail='All column names must be strings')
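# Illustrative: a dataframe built as pd.DataFrame([[1.0, 2.0]], columns=[0, 1]) would be rejected
# by the check above with a 422, since its column labels are integers rather than strings.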


async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
    """Extract a dataframe from the request body, based on its Content-Type (Parquet or JSON)."""

    ct = request.headers.get('Content-Type', '')
    if MimeTypes.PARQUET.match(ct):
        content = await request.body()  # request.stream()
        try:
            return await DataframeSerializerAsync().read_parquet(content)
        except OSError as err:
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                                detail=str(err))  # TODO

    if MimeTypes.JSON.match(ct):
        content = await request.body()  # request.stream()
        try:
            return await DataframeSerializerAsync().read_json(content, orient)
        except ValueError:
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                                detail='invalid body')  # TODO

    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f'Invalid content-type, "{ct}" is not supported')


async def with_dask_blob_storage() -> DaskBulkStorage:
    """Resolve the DaskBulkStorage driver from the application injector (also used as a FastAPI dependency)."""
    ctx = Context.current()
    return await ctx.app_injector.get(DaskBulkStorage)


class DataFrameRender:
    @staticmethod
    async def compute(df):
        """Materialize the dataframe as pandas, delegating to the Dask client when needed."""
        if isinstance(df, pd.DataFrame):
            return df
        driver = await with_dask_blob_storage()
        return await driver.client.compute(df)

    @staticmethod
    async def get_size(df):
        """Return the number of rows, computed on the Dask cluster for dask dataframes."""
        if isinstance(df, pd.DataFrame):
            return len(df.index)
        driver = await with_dask_blob_storage()
        return await driver.client.submit(lambda: len(df.index))

    re_array_selection = re.compile(r'^(?P<name>.+)\[(?P<start>[^:]+):?(?P<stop>.*)\]$')
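    # Illustrative matches for re_array_selection (hypothetical curve names):
    #   'GR[0]'   -> name='GR', start='0', stop=''   (single array index)
    #   'GR[0:2]' -> name='GR', start='0', stop='2'  (index range)
    #   'GR'      -> no match (no array suffix)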

    @staticmethod
    def _col_matching(sel, col):
        """Return True if column 'col' matches the selection expression 'sel'."""
        if sel == col:  # exact match
            return True
        m_col = DataFrameRender.re_array_selection.match(col)
        if not m_col:  # if the column doesn't have an array pattern (col[*])
            return False
        # compare selection with the curve name without its array suffix [*]
        if sel == m_col['name']:  # if selection is 'c', c[*] should match
            return True
        # range selection use cases: c[0:2] should match c[0], c[1] and c[2]
        m_sel = DataFrameRender.re_array_selection.match(sel)
        if m_sel and m_sel['stop']:
            with suppress(ValueError):  # suppress int conversion exceptions
                if int(m_sel['start']) <= int(m_col['start']) <= int(m_sel['stop']):
                    return True
        return False
    
    @staticmethod
    def get_matching_column(selection: List[str], cols: Set[str]) -> List[str]:
        """Return the columns from 'cols' that match at least one expression in 'selection'."""
        selected = set()
        for sel in selection:
            selected.update(filter(lambda col: DataFrameRender._col_matching(sel, col),
                                   cols.difference(selected)))
        return list(selected)
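    # Illustrative call (hypothetical curve names):
    #   get_matching_column(['c[0:2]', 'depth'], {'c[0]', 'c[1]', 'c[2]', 'c[3]', 'depth', 'GR'})
    #   -> ['c[0]', 'c[1]', 'c[2]', 'depth']  (order not guaranteed, a set is used internally)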

    @staticmethod
    async def process_params(df, params: GetDataParams):
        if params.curves:
            selection = list(map(str.strip, params.curves.split(',')))
            columns = DataFrameRender.get_matching_column(selection, set(df))
            df = df[sorted(columns)]

        if params.offset:
            head_index = df.head(params.offset, npartitions=-1, compute=False).index
            index = await DataFrameRender.compute(head_index)  # TODO could be slow!
            df = df.loc[~df.index.isin(index)]

        if params.limit and params.limit > 0:
            try:
                df = df.head(params.limit, npartitions=-1, compute=False)  # dask async
            except TypeError:
                # pandas DataFrame.head() does not accept the dask-specific keyword arguments
                df = df.head(params.limit)
        return df
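    # Illustrative behaviour of process_params (hypothetical values): with curves='GR,c[0:1]',
    # offset=10 and limit=5, the dataframe is first reduced to the matching columns, then the
    # first 10 rows are skipped and the next 5 rows are kept.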

    @staticmethod
    async def df_render(df, params: GetDataParams, accept: str = None):
        if params.describe:
            return {
                "numberOfRows": await DataFrameRender.get_size(df),
                "columns": [c for c in df.columns]
            }

        pdf = await DataFrameRender.compute(df)
        pdf.index.name = None  # TODO

        if not accept or MimeTypes.PARQUET.type in accept:
            return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)

        if MimeTypes.JSON.type in accept:
            return Response(pdf.to_json(index=True, date_format='iso'), media_type=MimeTypes.JSON.type)

        if MimeTypes.CSV.type in accept:
            return Response(pdf.to_csv(), media_type=MimeTypes.CSV.type)

        # any other Accept value: default to Parquet
        return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)


def get_bulk_uri_osdu(record):
    return record.data.get('ExtensionProperties', {}).get('wdms', {}).get(BULK_URI_FIELD, None)


def set_bulk_uri(record, bulk_urn):
    # note: dict.update() returns None; this helper is used for its side effect only
    record.data.update({'ExtensionProperties': {'wdms': {BULK_URI_FIELD: bulk_urn}}})
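# Illustrative shape of record.data handled by the two helpers above (values are hypothetical):
#   {
#       "ExtensionProperties": {
#           "wdms": {"bulkURI": "<urn produced by BulkId.bulk_urn_encode(bulk_id, BULK_URN_PREFIX_VERSION)>"}
#       },
#       ...
#   }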


async def set_bulk_field_and_send_record(ctx: Context, bulk_id, record):
    bulk_urn = BulkId.bulk_urn_encode(bulk_id, BULK_URN_PREFIX_VERSION)
    set_bulk_uri(record, bulk_urn)

    # push the new version to the storage service
    storage_client = await get_storage_record_service(ctx)
    return await storage_client.create_or_update_records(
        record=[record], data_partition_id=ctx.partition_id
    )


@OpenApiHandler.set(operation_id="post_data", request_body=REQUEST_DATA_BODY_SCHEMA)
@router_bulk.post(
    '/{record_id}/data',
    summary='Writes data as a whole bulk, creates a new version.',
    description="""
Writes data to the associated record. It creates a new version.
The payload is expected to contain the entire bulk, which will replace any previous bulk as the latest version.
Previous bulk versions remain accessible via the get bulk data version API.
Supports JSON and Parquet formats ('Content-Type' must be set accordingly).
In case of JSON the orient must be set accordingly. Supports HTTP chunked transfer encoding.
""" + REQUIRED_ROLES_WRITE,
    operation_id="write_record_data",
    responses={
            404: {},
            200: {}
        })
async def post_data(record_id: str,
                    request: Request,
                    orient: JSONOrient = Depends(json_orient_parameter),
                    ctx: Context = Depends(get_ctx),
                    dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
                    ):
    async def save_blob():
        df = await get_df_from_request(request, orient)
        _check_df_columns_type(df)
        return await dask_blob_storage.save_blob(df, record_id)

    record, bulk_id = await asyncio.gather(
        fetch_record(ctx, record_id),
        save_blob()
    )
    return await set_bulk_field_and_send_record(ctx=ctx, bulk_id=bulk_id, record=record)
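# Illustrative client call for the endpoint above (hypothetical base_url/record_id; httpx is an
# arbitrary HTTP client; 'application/x-parquet' is assumed to be MimeTypes.PARQUET.type, as
# suggested by the endpoint descriptions below):
#   body = pd.DataFrame({"depth": [0.0, 0.5], "GR": [80.1, 82.3]}).to_parquet(engine="pyarrow")
#   httpx.post(f"{base_url}/{record_id}/data", content=body,
#              headers={"Content-Type": "application/x-parquet"})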


@OpenApiHandler.set(operation_id="post_chunk_data", request_body=REQUEST_DATA_BODY_SCHEMA)
@router_bulk.post(
    "/{record_id}/sessions/{session_id}/data",
    summary="Send a data chunk. The session must be completed (committed) once all chunks are sent.",
    description="Send a data chunk. The session must be completed (committed) once all chunks are sent. "
                "This will create a single new version aggregating all chunks and the previous bulk. "
                "Supports JSON and Parquet formats ('Content-Type' must be set accordingly). "
                "In case of JSON the orient must be set accordingly. Supports HTTP chunked transfer encoding."
    + REQUIRED_ROLES_WRITE,
    operation_id="post_chunk_data",
    responses={400: {"error": "Record not found"}}
)
async def post_chunk_data(record_id: str,
                          session_id: str,
                          request: Request,
                          orient: JSONOrient = Depends(json_orient_parameter),
                          with_session: WithSessionStorages = Depends(get_session_dependencies),
                          dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
                          ):
    i_session = await with_session.get_session(record_id, session_id)
    if i_session.session.state != SessionState.Open:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Session cannot accept data, state={i_session.session.state}")

    df = await get_df_from_request(request, orient)
    _check_df_columns_type(df)
    await dask_blob_storage.session_add_chunk(i_session.session, df)
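# Illustrative chunked-upload flow (session creation lives in the sessions router, not in this
# module; paths and payloads are hypothetical):
#   1. create a session for the record via the sessions API               -> session_id
#   2. POST {base}/{record_id}/sessions/{session_id}/data once per chunk  (Parquet or JSON body)
#   3. PATCH {base}/{record_id}/sessions/{session_id} with a 'commit' UpdateSessionState
#      -> all chunks are merged with the previous bulk and a new record version is created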


@router_bulk.get(
    '/{record_id}/versions/{version}/data',
    summary='Returns data of the specified version.',
    description='Returns the data of a specific version according to the specified query parameters.'
    ' Multiple response media types are available ("application/json", "text/csv", "application/x-parquet").'
    ' The desired format can be specified in the "Accept" header. The default is Parquet.'
    ' When bulk statistics are requested using the "describe" parameter, the response is always provided in JSON.'
    + REQUIRED_ROLES_READ,
    # response_model=RecordData,
    responses={
        404: {},
        200: {"content": {
            MimeTypes.JSON.type: {},
            MimeTypes.PARQUET.type: {},
            MimeTypes.CSV.type: {},
        }}
    }
)
async def get_data_version(
    record_id: str, version: int,
    request: Request,
    ctrl_p: GetDataParams = Depends(),
    ctx: Context = Depends(get_ctx),
    dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
):
    record = await fetch_record(ctx, record_id, version)
    bulk_urn = get_bulk_uri_osdu(record)
    if bulk_urn is not None:
        bulk_id, prefix = BulkId.bulk_urn_decode(bulk_urn)
    else:  # fallback on ddms_v2 Persistence for wks:log schema
        bulk_id, prefix = LogBulkHelper.get_bulk_id(record, None)
    try:
        if bulk_id is None:
            raise BulkNotFound(record_id=record_id, bulk_id=None)
        if prefix == BULK_URN_PREFIX_VERSION:
            df = await dask_blob_storage.load_bulk(record_id, bulk_id)
        elif prefix is None:
            df = await get_dataframe(ctx, bulk_id)
        else:
            raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
        df = await DataFrameRender.process_params(df, ctrl_p)
        return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'))
    except BulkError as ex:
        ex.raise_as_http()


@router_bulk.get(
    "/{record_id}/data",
    summary='Returns the data according to the specified query parameters.',
    description='Returns the data according to the specified query parameters.'
    ' Multiple response media types are available ("application/json", "text/csv", "application/x-parquet").'
    ' The desired format can be specified in the "Accept" header. The default is Parquet.'
    ' When bulk statistics are requested using the "describe" parameter, the response is always provided in JSON.'
    + REQUIRED_ROLES_READ,
    # response_model=Union[RecordData, Dict],
    responses={
        404: {},
        200: {"content": {
            MimeTypes.JSON.type: {},
            MimeTypes.PARQUET.type: {},
            MimeTypes.CSV.type: {},
        }}
    }
)
async def get_data(
    record_id: str,
    request: Request,
    ctrl_p: GetDataParams = Depends(),
    ctx: Context = Depends(get_ctx),
    dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
):
    return await get_data_version(record_id, None, request, ctrl_p, ctx, dask_blob_storage)
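# Illustrative read of the latest version (hypothetical values; assumes the query parameter names
# match the GetDataParams fields used above):
#   GET {base}/{record_id}/data?curves=GR,c[0:2]&offset=100&limit=50
#   Accept: application/json
# With describe=true the response is always JSON: {"numberOfRows": ..., "columns": [...]}.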


@router_bulk.patch(
    "/{record_id}/sessions/{session_id}",
    summary='Update a session, either commit or abandon.',
    response_model=Session
)
async def complete_session(
    record_id: str,
    session_id: str,
    update_request: UpdateSessionState,
    with_session: WithSessionStorages = Depends(get_session_dependencies),
    dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
    ctx: Context = Depends(get_ctx),
) -> Session:
    tenant = with_session.tenant
    sessions_storage = with_session.sessions_storage

    try:
        # --------  SESSION COMMIT SEQUENCE -----------------------
        if update_request.state == UpdateSessionStateValue.Commit:
            async with sessions_storage.initiate_commit(tenant, record_id, session_id) as commit_guard:
                # get the session if some information is needed
                i_session = commit_guard.session
                internal = i_session.internal  # <= contains internal details, may be irrelevant or not needed

                record = await fetch_record(ctx, record_id, i_session.session.fromVersion)
                previous_bulk_uri = None
                bulk_urn = get_bulk_uri_osdu(record)
                if i_session.session.mode == SessionUpdateMode.Update and bulk_urn is not None:
                    previous_bulk_uri, _prefix = BulkId.bulk_urn_decode(bulk_urn)

                new_bulk_uri = await dask_blob_storage.session_commit(i_session.session, previous_bulk_uri)
                # ==============>
                # ==============> UPDATE WELLLOG META DATA HERE (baseDepth, ...) <==============
                # ==============>
                await set_bulk_field_and_send_record(ctx, new_bulk_uri, record)

            i_session = commit_guard.session
            i_session.session.meta = i_session.session.meta or {}
            i_session.session.meta.update({"some_detail_about_merge": "like the shape, number of rows ..."})
            return i_session.session

        # --------  SESSION ABANDON SEQUENCE ----------------------
        if update_request.state == UpdateSessionStateValue.Abandon:
            async with sessions_storage.initiate_abandon(tenant, record_id, session_id) as abandon_guard:
                # get the session if some information is needed
                i_session: SessionInternal = abandon_guard.session
                internal = i_session.internal  # <= contains internal details, may be irrelevant or not needed

                # ==============>
                # ==============> ADD ABANDON CODE HERE <==============
                # ==============>

            return abandon_guard.session.session

    except SessionException as ex:
        ex.raise_as_http()
    except BulkError as ex:
        ex.raise_as_http()