dask_bulk_storage.py
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import Awaitable, Callable, List, Optional, Union, AsyncGenerator, Tuple
import uuid

import fsspec
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
import pyarrow.parquet as pa

from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters

from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings
from app.conf import Config

from .dask_worker_plugin import DaskWorkerPlugin
from .errors import BulkRecordNotFound, BulkNotProcessable, internal_bulk_exceptions
from .traces import map_with_trace, submit_with_trace
from .utils import (WDMS_INDEX_NAME, by_pairs, do_merge, worker_capture_timing_handlers,
                    get_num_rows, set_index, index_union)
from ..dataframe_validators import is_reserved_column_name, DataFrameValidationFunc
from .. import DataframeSerializerSync
from . import storage_path_builder as pathBuilder
from . import session_file_meta as session_meta
from ..bulk_id import new_bulk_id
from .bulk_catalog import (BulkCatalog, ChunkGroup,
                           async_load_bulk_catalog,
                           async_save_bulk_catalog)
from ..mime_types import MimeType
from .dask_data_ipc import DaskNativeDataIPC, DaskLocalFileDataIPC
from . import dask_worker_write_bulk as bulk_writer


def read_with_dask(path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
    """call dask.dataframe.read_parquet with default parameters
    Dask read_parquet parameters:
        chunksize='25M': if chunks are too small, they are aggregated until they reach chunksize
        aggregate_files=True: aggregate_files is needed when files are in different folders
    Args:
        path (Union[str, List[str]]): a file, a folder or a list of files
    Returns:
        [dd.DataFrame]: the dask dataframe that was read
    """
    arguments = {
        'engine': 'pyarrow-dataset',
        'chunksize': '25M',
        'aggregate_files': True,
    }
    arguments.update(kwargs)

    return dd.read_parquet(path, **arguments)
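
# Usage sketch (paths and storage options are hypothetical): the defaults above can be
# overridden through kwargs, e.g. to restrict the columns that are read.
#
#   ddf = read_with_dask("path/to/bulk_folder", columns=["MD", "GR"],
#                        storage_options={"token": "..."})
#   ddf = read_with_dask(["chunk_0.parquet", "chunk_1.parquet"])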


def _load_index_from_meta(meta, **kwargs):
    return pd.read_parquet(meta.path_with_protocol,
                           engine='pyarrow',
                           columns=[meta.columns[0]],
                           **kwargs).index


def _index_union_tuple(indexes: Tuple[pd.Index, Optional[pd.Index]]):
    return index_union(*indexes)


class DaskBulkStorage:
    client: DaskDistributedClient = None
    """ Dask client """

    def __init__(self) -> None:
        """ use `create` to create instance """
        self._parameters = None
        self._fs = None

    @property
    def _data_ipc(self):
        # may also be adapted depending on the size of the data
        if Config.dask_data_ipc.value == DaskLocalFileDataIPC.ipc_type:
            return DaskLocalFileDataIPC()
        assert self.client is not None, 'Dask client not initialized'
        return DaskNativeDataIPC(self.client)

    @classmethod
    async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
        instance = cls()
        instance._parameters = parameters

        # Initialise the dask client.
        dask_client = dask_client or await DaskClient.create()
        if DaskBulkStorage.client is not dask_client:  # executed only once per dask client
            DaskBulkStorage.client = dask_client

            if parameters.register_fsspec_implementation:
                parameters.register_fsspec_implementation()

            await DaskBulkStorage.client.register_worker_plugin(
                DaskWorkerPlugin,
                name="LoggerWorkerPlugin",
                logger=get_logger(),
                register_fsspec_implementation=parameters.register_fsspec_implementation)

            get_logger().info(f"Distributed Dask client initialized: {DaskBulkStorage.client}")

        instance._fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
        return instance
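
    # Typical instantiation sketch. The DaskStorageParameters field names below
    # (protocol, base_directory, storage_options) are taken from how they are used in this
    # class; constructing the object with these keyword arguments is an assumption.
    #
    #   params = DaskStorageParameters(protocol="file",
    #                                  base_directory="/tmp/wdms_bulk",
    #                                  storage_options={})
    #   storage = await DaskBulkStorage.create(params)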

    @property
    def protocol(self) -> str:
        return self._parameters.protocol

    @property
    def base_directory(self) -> str:
        return self._parameters.base_directory

    def _submit_with_trace(self, target_func: Callable, *args, **kwargs):
        return submit_with_trace(self.client, target_func, *args, **kwargs)

    def _map_with_trace(self, target_func: Callable, *args, **kwargs):
        return map_with_trace(self.client, target_func, *args, **kwargs)

    def _relative_path(self, record_id: str, path: str) -> str:
        return pathBuilder.record_relative_path(self.base_directory, record_id, path)

    def _ensure_dir_tree_exists(self, path: str):
        path_wo_protocol, protocol = pathBuilder.remove_protocol(path)

        # on local storage only
        if protocol == 'file':
            self._fs.mkdirs(path_wo_protocol, exist_ok=True)

    def _read_parquet(self, path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
        """Read a Parquet file into a Dask DataFrame
        Args:
            path (Union[str, List[str]]): a file, a folder or a list of files
            **kwargs dict (of dicts): Passthrough keyword arguments for the read backend.
        Returns:
            Future<dd.DataFrame>: dask dataframe
        Dask read_parquet parameters:
            chunksize='25M': if chunks are too small, they are aggregated until they reach chunksize
            aggregate_files=True: because we pass a list of paths when committing a session,
                                  aggregate_files is needed when paths are different
        """
        return self._submit_with_trace(read_with_dask, path,
                                       storage_options=self._parameters.storage_options,
                                       **kwargs)

    def _load_bulk_from_catalog(self, catalog: BulkCatalog, columns: List[str] = None) -> dd.DataFrame:
        """Load data from information contained in the catalog
            - if the user requests columns that do not exist, we ignore them
            - if columns is None, we load all columns
        Returns: Future<dd.dataframe>
        """
        record_path = pathBuilder.record_path(self.base_directory, catalog.record_id, self.protocol)
        files_to_load = catalog.get_paths_for_columns(columns, record_path)
        # read all chunks for the requested columns
        def read_parquet_files(f):
            return read_with_dask(f.paths, columns=f.labels, storage_options=self._parameters.storage_options)
        dfs = self._map_with_trace(read_parquet_files, files_to_load)

        index_df = self._read_index_from_catalog_index_path(catalog)
        if index_df:
            dfs.append(index_df)

        if not dfs:
            raise RuntimeError("cannot find requested columns")

        if len(dfs) == 1:
            return dfs[0]

        # if multiple dataframes, concat them together
        dfs = self._map_with_trace(set_index, dfs)
        return self._submit_with_trace(dd.concat, dfs, axis=1, join='outer')
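
    # The dd.concat(axis=1, join='outer') above aligns the per-chunk dataframes on their
    # shared index, keeping the union of rows. Pandas-level sketch of the same semantics:
    #
    #   a = pd.DataFrame({"A": [1, 2]}, index=[0, 1])
    #   b = pd.DataFrame({"B": [3, 4]}, index=[1, 2])
    #   pd.concat([a, b], axis=1, join='outer')  # index {0, 1, 2}, NaN where a column has no value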

    async def _load_bulk(self, record_id: str, bulk_id: str, columns: List[str] = None) -> dd.DataFrame:
        """Load columns from parquet files in the bulk_path.
        Returns: Future<dd.DataFrame>
        """
        catalog = await self.get_bulk_catalog(record_id, bulk_id, generate_if_not_exists=False)
        if catalog is not None:
            return self._load_bulk_from_catalog(catalog, columns)
        # No catalog means that we can read the folder as a parquet dataset. (legacy behavior)
        bulk_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
        return self._read_parquet(bulk_path, columns=columns)

    @with_trace('read_stat')
    async def read_stat(self, record_id: str, bulk_id: str):
        """Returns some meta data about the bulk.
        Raises:
            BulkRecordNotFound: If the bulk folder doesn't exist
        """
        catalog = await self.get_bulk_catalog(record_id, bulk_id)
        schema_dict = catalog.all_columns_dtypes
        return {
            "num_rows": catalog.nb_rows,
            "schema": schema_dict
        }

    @capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
    @internal_bulk_exceptions
    @with_trace('load_bulk')
    async def load_bulk(self, record_id: str, bulk_id: str, columns: List[str] = None) -> dd.DataFrame:
        """Returns a dask Dataframe of a record at the specified version.
        Args:
            record_id (str): the record id to which the bulk belongs.
            bulk_id (str): the bulk id to load.
            columns (List[str], optional): columns to load. If None, all available columns. Defaults to None.
        Raises:
            BulkRecordNotFound: If bulk data cannot be found.
        Returns:
            dd.DataFrame: a lazy loaded dask dataframe representing the bulk data.
        """
        try:
            future_df = await self._load_bulk(record_id, bulk_id, columns=columns)
            dataframe = await future_df
            if columns and set(dataframe.columns) != set(columns):
                raise BulkRecordNotFound(record_id, bulk_id)
            return dataframe
        except (OSError, RuntimeError) as exp:
            raise BulkRecordNotFound(record_id, bulk_id) from exp
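
    # Usage sketch (identifiers are hypothetical):
    #
    #   ddf = await storage.load_bulk("my-record-id", "my-bulk-id", columns=["GR"])
    #   pdf = ddf.compute()  # materialize the lazy dask dataframe into pandas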

    def _save_with_dask(self, path, dataframe):
        """Save the dataframe to a parquet file(s).
        dataframe: dd.DataFrame or Future<dd.DataFrame>
        Returns a Future<None>
        """
        return self._submit_with_trace(dd.to_parquet, dataframe, path,
                                       storage_options=self._parameters.storage_options,
                                       engine='pyarrow', schema="infer", compression='snappy')

    @capture_timings('get_bulk_catalog')
    async def get_bulk_catalog(self, record_id: str, bulk_id: str, generate_if_not_exists=True) -> BulkCatalog:
        bulk_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id)
        catalog = await async_load_bulk_catalog(self._fs, bulk_path)
        if catalog:
            return catalog

        if generate_if_not_exists:
            # For legacy bulk, construct a catalog on the fly
            try:
                return await self._build_catalog_from_path(bulk_path, record_id)
            except FileNotFoundError as error:
                raise BulkRecordNotFound(record_id, bulk_id) from error

    @capture_timings('_build_catalog_from_path')
    async def _build_catalog_from_path(self, path: str, record_id: str) -> BulkCatalog:
        """Build a catalog on the fly for folders that don't have one (legacy bulk folder).
        The method lists all parquet files from the specified folder and builds the catalog
        from file metadata. There is an optimization if we detect a folder created by dask.
        Args:
            path (str): Folder that contains parquet files
            record_id (str): record id to which the catalog belongs.
        Raises:
            FileNotFoundError: If path does not exist
        Returns:
            BulkCatalog: the built catalog
        """
        path, _ = pathBuilder.remove_protocol(path)
        files = self._fs.ls(path)  # raises if path doesn't exist
        is_dask_folder = any((f.endswith('_common_metadata') for f in files))
        parquet_files = (f for f in files if f.endswith('.parquet'))
        files = [path] if is_dask_folder else list(parquet_files)

        futures_datasets = self._map_with_trace(pa.ParquetDataset, files, filesystem=self._fs)
        datasets = await self.client.gather(futures_datasets)

        schemas = (d.read_pandas().schema for d in datasets)

        catalog = BulkCatalog(record_id)
        catalog.nb_rows = max(get_num_rows(d) for d in datasets)

        for file, schema in zip(files, schemas):
            index_columns = schema.pandas_metadata.get('index_columns', [])
            columns = {name: str(dtype) for name, dtype in zip(schema.names, schema.types)
                       if name not in index_columns and not is_reserved_column_name(name)}
            chunk_group = ChunkGroup(set(columns.keys()), [self._relative_path(record_id, file)], list(columns.values()))
            catalog.add_chunk(chunk_group)

        return catalog

    def _read_index_from_catalog_index_path(self, catalog: BulkCatalog) -> Optional[dd.DataFrame]:
        """Returns a Future dask dataframe or None if index path is not in the catalog"""
        if catalog.index_path:
            index_path = pathBuilder.full_path(self.base_directory, catalog.record_id,
                                               catalog.index_path, self.protocol)
            return self._read_parquet(index_path)
        return None

    @capture_timings('_future_load_index')
    async def _future_load_index(self, record_id: str, bulk_id: str) -> Awaitable[pd.Index]:
        """Loads the dataframe index of the specified record
        The index should be saved in a specific folder, but for bulk data prior to catalog creation
        we read one column and retrieve the index associated with it.
        """
        catalog = await self.get_bulk_catalog(record_id, bulk_id)
        future_df = self._read_index_from_catalog_index_path(catalog)
        if future_df is None:
            # read one column to get the index. (It doesn't seem possible to get the index directly)
            first_column = next(iter(catalog.all_columns_dtypes))
            future_df = await self._load_bulk(record_id, bulk_id, [first_column])
        return self._submit_with_trace(lambda df: df.index.compute(), future_df)

    @capture_timings('load_index')
    async def load_index(self, record_id: str, bulk_id: str) -> pd.Index:
        """load the dataframe index of the specified record"""
        future_index = await self._future_load_index(record_id, bulk_id)
        return await future_index

    @capture_timings('_build_session_index')
    @with_trace('_build_session_index')
    async def _build_session_index(
        self, chunk_metas: List[session_meta.SessionFileMeta], record_id: str, from_bulk_id: str
    ) -> pd.Index:
        """
            Combine all chunk indexes + the previous version's index.
            Keep only one file per distinct index_hash.
            Read chunk indexes from parquet.
        """
        chunks_meta_with_different_indexes = {meta.index_hash: meta
                                              for meta in chunk_metas}.values()

        indexes = self.client.map(_load_index_from_meta, chunks_meta_with_different_indexes,
                                  storage_options=self._parameters.storage_options)
        if from_bulk_id:
            # read the index of previous version
            indexes.append(await self._future_load_index(record_id, from_bulk_id))

        # merge all indexes
        while len(indexes) > 1:
            indexes = self._map_with_trace(_index_union_tuple, list(by_pairs(indexes)))
        return await indexes[0]
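
    # The pairwise reduction above repeatedly applies an index union; with plain pandas the
    # same merge looks like this (purely illustrative):
    #
    #   idx1, idx2, idx3 = pd.Index([0, 1]), pd.Index([1, 2]), pd.Index([3])
    #   merged = idx1.union(idx2).union(idx3)  # Index([0, 1, 2, 3])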

    @capture_timings('_fill_catalog_columns_info')
    @with_trace('_fill_catalog_columns_info')
    async def _fill_catalog_columns_info(
        self, catalog: BulkCatalog, session_metas, bulk_id: str
    ) -> Optional[BulkCatalog]:
        """Build the catalog from the session."""
        catalog_columns = set(catalog.all_columns_dtypes)

        for chunks_metas in session_meta.get_next_chunk_files(session_metas):
            files = [m.path_with_protocol for m in chunks_metas]
            relative_paths = [self._relative_path(catalog.record_id, f) for f in files]
            # chunks share the same schemas (columns + dtypes) so we get them from the first one
            labels = set(chunks_metas[0].columns)
            dtypes = chunks_metas[0].dtypes
            conflicting_col = catalog_columns.intersection(chunks_metas[0].columns)

            # if some columns already exist in the catalog, merge is needed
            if len(conflicting_col) > 0:
                # filter out the conflicting columns
                # pair dtypes with the original ordered column list (not the set) so labels and dtypes stay aligned
                labels_dtypes = {label: dtype for label, dtype in zip(chunks_metas[0].columns, dtypes) if label not in conflicting_col}
                labels = set(labels_dtypes.keys())
                dtypes = list(labels_dtypes.values())
                # problem here: conflicts cannot be resolved in parallel, so we must wait
                await self._resolve_conflict_catalog(catalog, bulk_id, files, conflicting_col)

            catalog.add_chunk(ChunkGroup(labels, relative_paths, dtypes))
            catalog_columns.update(chunks_metas[0].columns)

        return catalog

    @capture_timings('_resolve_conflict_catalog')
    @with_trace('_resolve_conflict_catalog')
    async def _resolve_conflict_catalog(
        self, catalog: BulkCatalog, bulk_id: str, files: List[str], cols_to_merge: List[str]
    ) -> None:
        """Merge columns between chunks found in the catalog and chunks files.
        The merged result is saved in a new parquet dataset (dask folder) and the catalog is updated with the new path.
        Args:
            catalog (BulkCatalog): catalog to update and from which input columns are read
            bulk_id (str): record bulk id used to retrieve the commit path
            files (List[str]): chunk files to merge with chunks from the catalog
            cols_to_merge (List[str]): the columns to merge
        """
        commit_path = pathBuilder.record_bulk_path(self.base_directory, catalog.record_id, bulk_id, self.protocol)

        df1 = self._load_bulk_from_catalog(catalog, columns=cols_to_merge)
        df2 = self._read_parquet(files, columns=cols_to_merge)
        merged_df = self._submit_with_trace(do_merge, df1, df2)

        merged_df_path = pathBuilder.join(commit_path, f'{uuid.uuid4()}.parquet')
        await self._save_with_dask(merged_df_path, merged_df)

        merged_df = await merged_df
        dtypes = [str(dt) for dt in merged_df.dtypes]
        labels = set(merged_df.columns)
        relative_paths = [self._relative_path(catalog.record_id, merged_df_path)]
        chunk_group = ChunkGroup(labels=labels, paths=relative_paths, dtypes=dtypes)
        catalog.change_columns_info(chunk_group)

    @capture_timings('_save_session_index')
    @with_trace('_save_session_index')
    async def _save_session_index(self, path: str, index: pd.Index) -> str:
        index_folder = pathBuilder.join(path, '_wdms_index_')
        self._ensure_dir_tree_exists(index_folder)
        index_path = pathBuilder.join(index_folder, 'index.parquet')

        dataframe = pd.DataFrame(index=index)
        dataframe.index.name = WDMS_INDEX_NAME

        f_pdf = await self.client.scatter(dataframe)
        await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, index_path,
                                      storage_options=self._parameters.storage_options)
        return index_path

    @capture_timings('session_commit')
    @internal_bulk_exceptions
    @with_trace('session_commit')
    async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
        """Commit the session
        Args:
            session (Session): The session to commit
            from_bulk_id (str, optional): Bulk id of a previous record version.
                                          Used in session update mode. Defaults to None.
        Raises:
            BulkNotProcessable: If the session is empty
        Returns:
            str: identifier of the new bulk
        """
        bulk_id = new_bulk_id()

        chunk_metas = await session_meta.get_chunks_metadata(self._fs, self.protocol, self.base_directory, session)
        if len(chunk_metas) == 0:  # there are no files in this session
            raise BulkNotProcessable(message="No data to commit")

        if from_bulk_id:
            # update session: we start from the previous catalog
            catalog = await self.get_bulk_catalog(session.recordId, from_bulk_id)
        else:
            catalog = BulkCatalog(session.recordId)

        commit_path = pathBuilder.record_bulk_path(self.base_directory, session.recordId, bulk_id, self.protocol)

        async def build_and_save_index():
            index = await self._build_session_index(chunk_metas, session.recordId, from_bulk_id)
            index_path = await self._save_session_index(commit_path, index)
            catalog.nb_rows = len(index)
            catalog.index_path = self._relative_path(session.recordId, index_path)

        await asyncio.gather(
            build_and_save_index(),
            self._fill_catalog_columns_info(catalog, chunk_metas, bulk_id)
        )

        fcatalog = await self.client.scatter(catalog)
        await async_save_bulk_catalog(self._fs, commit_path, fcatalog)
        return bulk_id
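
    # Usage sketch (the session object comes from the sessions storage; from_bulk_id is only
    # given when updating an existing record version):
    #
    #   committed_bulk_id = await storage.session_commit(session, from_bulk_id=previous_bulk_id)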

    @internal_bulk_exceptions
    @capture_timings('post_data_without_session', handlers=worker_capture_timing_handlers)
    @with_trace('post_data_without_session')
    async def post_data_without_session(self,
                                        data: Union[bytes, AsyncGenerator[bytes, None]],
                                        content_type: MimeType,
                                        df_validator_func: DataFrameValidationFunc,
                                        record_id: str,
                                        bulk_id: Optional[str] = None) -> Tuple[str, bulk_writer.DataframeBasicDescribe]:
        """
        Process posted data outside of a session, delegating the entire work to a Dask worker. It constructs the
        path for the bulk in the current context, then prepares and writes the data.
        :throw:
            - BulkNotProcessable: in case of invalid input data
            - BulkSaveException: if the store operation fails for some reason
        """

        bulk_id = bulk_id or new_bulk_id()
        bulk_base_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)

        # ensure directory exists for local storage, do nothing on remote storage
        self._ensure_dir_tree_exists(bulk_base_path)

        async with self._data_ipc.set(data) as (data_handle, data_getter):
            data = None  # unref data

            df_describe = await submit_with_trace(self.client,
                                                  bulk_writer.write_bulk_without_session,
                                                  data_handle,
                                                  data_getter,
                                                  content_type,
                                                  df_validator_func,
                                                  bulk_base_path,
                                                  self._parameters.storage_options)

        return bulk_id, df_describe
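
    # Usage sketch: the MimeType member and the validator below are assumptions, adapt them
    # to the caller's context.
    #
    #   bulk_id, describe = await storage.post_data_without_session(
    #       parquet_bytes,        # bytes or an AsyncGenerator[bytes, None]
    #       MimeType.PARQUET,     # assumed MimeType member name
    #       my_validator_func,    # a DataFrameValidationFunc (hypothetical)
    #       "my-record-id")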

    @internal_bulk_exceptions
    @capture_timings('add_chunk_in_session', handlers=worker_capture_timing_handlers)
    @with_trace('add_chunk_in_session')
    async def add_chunk_in_session(self,
                                   data: Union[bytes, AsyncGenerator[bytes, None]],
                                   content_type: MimeType,
                                   df_validator_func: DataFrameValidationFunc,
                                   record_id: str,
                                   session_id: str,
                                   bulk_id: Optional[str] = None) -> Tuple[str, bulk_writer.DataframeBasicDescribe]:
        """
        Add chunk data inside a session, delegating the entire work to a Dask worker.
        :throw:
            - BulkNotProcessable: in case of invalid input data
            - BulkSaveException: if the store operation fails for some reason
        """

        bulk_id = bulk_id or new_bulk_id()
        base_path = pathBuilder.record_session_path(self.base_directory, session_id, record_id, self.protocol)

        # ensure directory exists for local storage, do nothing on remote storage
        self._ensure_dir_tree_exists(base_path)

        async with self._data_ipc.set(data) as (data_handle, data_getter):
            data = None  # unref data

            df_describe = await submit_with_trace(self.client,
                                                  bulk_writer.add_chunk_in_session,
                                                  data_handle,
                                                  data_getter,
                                                  content_type,
                                                  df_validator_func,
                                                  base_path,
                                                  self._parameters.storage_options)

        return bulk_id, df_describe
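
    # Usage sketch (identifiers are hypothetical): chunks posted against the same session are
    # combined later by session_commit.
    #
    #   bulk_id, describe = await storage.add_chunk_in_session(
    #       chunk_bytes, MimeType.PARQUET, my_validator_func,
    #       "my-record-id", "my-session-id")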