Commit 3bd0d28b authored by Yannick

Merge branch 'dask_full_write' into 'master'

Perform entire write bulk work inside Dask worker

See merge request !344
parents 5d8130d3 e36ec15b
Pipeline #86692 passed with stages in 7 minutes and 52 seconds
@@ -13,7 +13,7 @@
# limitations under the License.
from .bulk_uri import BulkURI
from .dataframe_persistence import create_and_store_dataframe, get_dataframe
from .dataframe_persistence import create_and_store_dataframe, get_dataframe, download_bulk
from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync
from .json_orient import JSONOrient
from .mime_types import MimeTypes
@@ -13,15 +13,13 @@
# limitations under the License.
import asyncio
import json
from typing import Awaitable, Callable, List, Optional, Union
from typing import Awaitable, Callable, List, Optional, Union, AsyncGenerator, Tuple
import uuid
import fsspec
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
from pyarrow.lib import ArrowException
import pyarrow.parquet as pa
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
@@ -30,19 +28,22 @@ from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings
from app.conf import Config
from .dask_worker_plugin import DaskWorkerPlugin
from .errors import BulkRecordNotFound, BulkNotProcessable, internal_bulk_exceptions
from .traces import map_with_trace, submit_with_trace
from .utils import (by_pairs, do_merge, worker_capture_timing_handlers,
get_num_rows, set_index, index_union)
from ..dataframe_validators import (assert_df_validate, validate_index, validate_number_of_columns,
columns_not_in_reserved_names, is_reserved_column_name)
from ..dataframe_validators import is_reserved_column_name, DataFrameValidationFunc
from .. import DataframeSerializerSync
from . import storage_path_builder as pathBuilder
from . import session_file_meta as session_meta
from ..bulk_id import new_bulk_id
from .bulk_catalog import BulkCatalog, ChunkGroup, load_bulk_catalog, save_bulk_catalog
from ..mime_types import MimeType
from .dask_data_ipc import DaskNativeDataIPC, DaskLocalFileDataIPC
from . import dask_worker_write_bulk as bulk_writer
def read_with_dask(path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
@@ -73,14 +74,6 @@ def _load_index_from_meta(meta, **kwargs):
**kwargs).index
def dask_to_parquet(ddf, path, storage_options):
""" Save dask dataframe to parquet """
return dd.to_parquet(ddf, path,
engine='pyarrow', schema="infer",
storage_options=storage_options,
compression='snappy')
def _index_union_tuple(t):
return index_union(*t)
@@ -94,6 +87,14 @@ class DaskBulkStorage:
self._parameters = None
self._fs = None
@property
def _data_ipc(self):
# could also be adapted depending on the size of the data
if Config.dask_data_ipc.value == DaskLocalFileDataIPC.ipc_type:
return DaskLocalFileDataIPC()
assert self.client is not None, 'Dask client not initialized'
return DaskNativeDataIPC(self.client)
@classmethod
async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
instance = cls()
@@ -135,6 +136,13 @@ class DaskBulkStorage:
def _relative_path(self, record_id: str, path: str) -> str:
return pathBuilder.record_relative_path(self.base_directory, record_id, path)
def _ensure_dir_tree_exists(self, path: str):
path_wo_protocol, protocol = pathBuilder.remove_protocol(path)
# needed on local storage only
if protocol == 'file':
self._fs.mkdirs(path_wo_protocol, exist_ok=True)
def _read_parquet(self, path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
"""Read a Parquet file into a Dask DataFrame
Args:
@@ -224,59 +232,9 @@ class DaskBulkStorage:
ddf: dd.DataFrame or Future<dd.DataFrame>
Returns a Future<None>
"""
return self._submit_with_trace(dask_to_parquet, dataframe, path,
storage_options=self._parameters.storage_options)
async def _save_with_pandas(self, path, dataframe: pd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
Returns a Future<None>
"""
f_pdf = await self.client.scatter(dataframe)
return await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, path,
storage_options=self._parameters.storage_options)
@capture_timings('save_bulk', handlers=worker_capture_timing_handlers)
@internal_bulk_exceptions
@with_trace('save_bulk')
async def save_bulk(self, ddf: pd.DataFrame, record_id: str, bulk_id: str = None):
"""Write the data frame to the blob storage."""
bulk_id = bulk_id or new_bulk_id()
assert_df_validate(dataframe=ddf, validation_funcs=[validate_number_of_columns,
validate_index,
columns_not_in_reserved_names])
ddf = dd.from_pandas(ddf, npartitions=1, name=f"from_pandas-{uuid.uuid4()}")
ddf = await self.client.scatter(ddf)
path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
try:
await self._save_with_dask(path, ddf)
except OSError as os_error:
raise BulkRecordNotFound(record_id, bulk_id) from os_error
return bulk_id
@capture_timings('session_add_chunk')
@internal_bulk_exceptions
@with_trace('session_add_chunk')
async def session_add_chunk(self, session: Session, pdf: pd.DataFrame):
"""add new chunk to the given session"""
assert_df_validate(dataframe=pdf, validation_funcs=[validate_number_of_columns,
validate_index,
columns_not_in_reserved_names])
# sort column by names
pdf = pdf[sorted(pdf.columns)]
filename = session_meta.generate_chunk_filename(pdf)
session_path = pathBuilder.record_session_path(
self.base_directory, session.id, session.recordId)
self._fs.mkdirs(session_path, exist_ok=True) # TODO only for local
with self._fs.open(f'{session_path}/{filename}.meta', 'w') as outfile:
json.dump(session_meta.build_chunk_metadata(pdf), outfile)
session_path = pathBuilder.add_protocol(session_path, self.protocol)
await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)
return self._submit_with_trace(dd.to_parquet, dataframe, path,
storage_options=self._parameters.storage_options,
engine='pyarrow', schema="infer", compression='snappy')
@capture_timings('get_bulk_catalog')
async def get_bulk_catalog(self, record_id: str, bulk_id: str) -> BulkCatalog:
@@ -431,9 +389,13 @@ class DaskBulkStorage:
@with_trace('_save_session_index')
async def _save_session_index(self, path: str, index: pd.Index) -> str:
index_folder = pathBuilder.join(path, '_wdms_index_')
self._fs.mkdirs(pathBuilder.remove_protocol(index_folder)[0]) # TODO for local storage
self._ensure_dir_tree_exists(index_folder)
index_path = pathBuilder.join(index_folder, 'index.parquet')
await self._save_with_pandas(index_path, pd.DataFrame(index=index))
f_pdf = await self.client.scatter(pd.DataFrame(index=index))
await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, index_path,
storage_options=self._parameters.storage_options)
return index_path
@capture_timings('session_commit')
@@ -478,9 +440,76 @@ class DaskBulkStorage:
save_bulk_catalog(self._fs, commit_path, catalog)
return bulk_id
@internal_bulk_exceptions
@capture_timings('post_data_without_session', handlers=worker_capture_timing_handlers)
@with_trace('post_data_without_session')
async def post_data_without_session(self,
data: Union[bytes, AsyncGenerator[bytes, None]],
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
record_id: str,
bulk_id: Optional[str] = None) -> Tuple[str, bulk_writer.DataframeBasicDescribe]:
"""
Process post data outside of a session, delegating the entire work to a Dask worker. It builds the bulk path
for the current context, ensures the directory tree exists, then hands the data over to the worker through the
data IPC mechanism.
:throw:
- BulkNotProcessable: in case of invalid input data
- BulkSaveException: if the store operation fails for some reason
"""
bulk_id = bulk_id or new_bulk_id()
bulk_base_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
# ensure directory exists for local storage, do nothing on remote storage
self._ensure_dir_tree_exists(bulk_base_path)
async with self._data_ipc.set(data) as (data_handle, data_getter):
data = None # unref data
df_describe = await submit_with_trace(self.client,
bulk_writer.write_bulk_without_session,
data_handle,
data_getter,
content_type,
df_validator_func,
bulk_base_path,
self._parameters.storage_options)
return bulk_id, df_describe
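# Illustration only, not part of this change: a minimal sketch of how a caller (e.g. an API
# route handler) might use post_data_without_session. `storage`, `record_id` and `payload` are
# hypothetical names; MimeTypes.JSON is assumed to be the JSON entry of ..mime_types and
# validate_index an example DataFrameValidationFunc from ..dataframe_validators.
#
#     bulk_id, describe = await storage.post_data_without_session(
#         data=payload,                       # bytes or an async generator of bytes
#         content_type=MimeTypes.JSON,
#         df_validator_func=validate_index,
#         record_id=record_id)
#     # `describe` reports row/column counts and index bounds of the written dataframe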
@internal_bulk_exceptions
@capture_timings('add_chunk_in_session', handlers=worker_capture_timing_handlers)
@with_trace('add_chunk_in_session')
async def add_chunk_in_session(self,
data: Union[bytes, AsyncGenerator[bytes, None]],
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
record_id: str,
session_id: str,
bulk_id: Optional[str] = None) -> Tuple[str, bulk_writer.DataframeBasicDescribe]:
"""
Add a data chunk inside a session, delegating the entire work to a Dask worker.
:throw:
- BulkNotProcessable: in case of invalid input data
- BulkSaveException: if the store operation fails for some reason
"""
bulk_id = bulk_id or new_bulk_id()
base_path = pathBuilder.record_session_path(self.base_directory, session_id, record_id, self.protocol)
# ensure directory exists for local storage, do nothing on remote storage
self._ensure_dir_tree_exists(base_path)
async with self._data_ipc.set(data) as (data_handle, data_getter):
data = None # unref data
df_describe = await submit_with_trace(self.client,
bulk_writer.add_chunk_in_session,
data_handle,
data_getter,
content_type,
df_validator_func,
base_path,
self._parameters.storage_options)
return bulk_id, df_describe
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
params = DaskStorageParameters(protocol='file',
base_directory=base_directory,
storage_options={'auto_mkdir': True})
return await DaskBulkStorage.create(params)
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from .dask_bulk_storage import DaskBulkStorage
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
params = DaskStorageParameters(protocol='file',
base_directory=base_directory,
storage_options={'auto_mkdir': True})
return await DaskBulkStorage.create(params)
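# Illustration only, not part of this change: a minimal sketch of creating a local, file-backed
# DaskBulkStorage for development or tests; the temporary directory prefix is hypothetical.
#
#     import asyncio, tempfile
#
#     async def demo() -> DaskBulkStorage:
#         return await make_local_dask_bulk_storage(tempfile.mkdtemp(prefix="wdms_bulk_"))
#
#     storage = asyncio.run(demo())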
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path, remove
import uuid
import asyncio
from io import BytesIO
from contextlib import asynccontextmanager, contextmanager, suppress
from typing import Union, AsyncGenerator, AsyncContextManager
from dask.utils import format_bytes
from app.utils import get_wdms_temp_dir
from app.helper.logger import get_logger
GB = 10 ** 9
MB = 10 ** 6
"""
Dask data IPC (inter process communication) implementations
=============
This module contains various mechanisms to pass data (bytes) from the main process to the Dask workers:
* `DaskNativeDataIPC` uses the native Dask mechanism, dask_client.scatter
* `DaskLocalFileDataIPC` uses temporary local files to transfer data. The main motivation is to reduce memory usage
while improving efficiency. Here's a note from Dask: `Note that it is often better to submit jobs to your workers to
have them load the data rather than loading data locally and then scattering it out to them.`
* `DaskNoneDataIPC` does nothing but forward what is put inside. It is only meant for the single-process case and as a
utility for testing and development.
Data is expected to flow one way for now, from main process to worker. On the main side, the producer sets the data
asynchronously using a context manager. The `set` method returns a handle and a getter function. Both are passed as
arguments to the worker, which fetches the data through the getter given the handle:
.. code-block:: python
async with ipc_data.set(data_to_pass_to_worker) as (ipc_data_handle, ipc_data_getter_func):
dask_client.submit(some_func, ipc_data_handle, ipc_data_getter_func, ...)
Inside the worker, the data is fetched synchronously as a file_like object:
.. code-block:: python
with ipc_data_getter_func(ipc_data_handle) as file_like_data:
actual_data: bytes = file_like_data.read()
"""
async def _real_all_from_async_gen(gen: AsyncGenerator[bytes, None]) -> bytes:
""" concat all data from an async generator and return the result"""
chunks = []
async for chunk in gen:
chunks.append(chunk)
return b"".join(chunks)
@contextmanager
def ipc_data_getter_from_bytes(ipc_ref):
""" get data as file_like given data handle as direct bytes. To use with 'with' statement. """
yield BytesIO(ipc_ref)
@contextmanager
def ipc_data_getter_from_file(ipc_ref):
""" get data as file_like given data handle from a file. To use with 'with' statement. """
with open(ipc_ref, 'rb') as f:
yield f
class DaskNativeDataIPC:
"""
Data IPC implementation based on the native Dask method (DaskClient.scatter), whose efficiency degrades above 2 MB
"""
ipc_type = 'dask_native'
def __init__(self, dask_client):
""" build a dask native ipc """
self._client = dask_client
@asynccontextmanager
async def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
if type(data) is not bytes: # basic type check
data = await _real_all_from_async_gen(data)
scattered_data = await self._client.scatter(data, direct=True)
yield scattered_data, ipc_data_getter_from_bytes
class DaskLocalFileDataIPC:
"""
Data IPC using local files to share data. The implementation focuses on releasing memory as soon as possible to reduce
the memory footprint. It also monitors the space used on disk.
"""
ipc_type = 'local_file'
total_files_count = 0 # only for monitoring purposes
total_size_in_file = 0 # only for monitoring purposes
log_usage_info_threshold = 1 * GB
log_usage_warning_threshold = 2 * GB
log_usage_error_threshold = 5 * GB
def __init__(self, base_folder=None, io_chunk_size=50*MB):
self._base_folder = base_folder or get_wdms_temp_dir()
self._io_chunk_size = io_chunk_size
class _AsyncSetterContextManager:
# implemented as a class rather than with @asynccontextmanager to release memory as soon as possible
def __init__(self, base_folder: str, data: Union[bytes, AsyncGenerator[bytes, None]], chunk_size: int):
self._base_folder = base_folder
self._data = data
self._file_path = None
self._file_size = 0
self._io_chunk_size = chunk_size
def _clean(self):
# delete file if any
if self._file_path:
get_logger().debug(f"IPC data via file, deletion of {self._file_path}")
with suppress(Exception):
remove(self._file_path)
# keep track of total file and size for monitoring purposes
DaskLocalFileDataIPC.total_files_count = max(0, DaskLocalFileDataIPC.total_files_count-1)
DaskLocalFileDataIPC.total_size_in_file = max(0, DaskLocalFileDataIPC.total_size_in_file-self._file_size)
self._file_path = None
async def _write_to_file(self, file, chunk_data: bytes) -> int:
""" write the into file by chunk """
if self._io_chunk_size == 0 or len(chunk_data) < self._io_chunk_size: # write it all at once
return file.write(chunk_data) or 0
# loop and release the event loop
dump_size = self._io_chunk_size
written_size = 0
for i in range(0, len(chunk_data), dump_size):
written_size += file.write(chunk_data[i:i + dump_size]) or 0
# since disk I/O cannot really be async, write one chunk at a time, then yield control back to the event loop
await asyncio.sleep(0)
return written_size
async def __aenter__(self):
filepath = path.join(self._base_folder, 'ipc_' + str(uuid.uuid4()))
try:
with open(filepath, 'wb') as f:
DaskLocalFileDataIPC.total_files_count += 1
self._file_path = filepath
if type(self._data) is bytes: # basic type check
# data are bytes
self._file_size = await self._write_to_file(f, self._data)
else:
# data is passed as an async generator: iterate on the chunks
async for data_chunk in self._data:
self._file_size += await self._write_to_file(f, data_chunk)
self._data = None # unref so it can be freed
get_logger().debug(f"IPC data via file, {self._file_size} bytes written into {self._file_path}")
DaskLocalFileDataIPC.total_size_in_file += self._file_size
return filepath, ipc_data_getter_from_file
except: # clean up file in any case on write failure
self._clean()
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
self._clean()
def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
self._log_files_stat()
return self._AsyncSetterContextManager(self._base_folder, data, self._io_chunk_size)
@classmethod
def _log_files_stat(cls):
""" internal log current number of file and total size """
if cls.total_size_in_file > cls.log_usage_error_threshold:
get_logger().error(f"unexpected IPC data high usage: {cls.total_size_in_file} files"
f" for {format_bytes(cls.total_size_in_file)}")
elif cls.total_size_in_file > cls.log_usage_warning_threshold:
get_logger().warning(f"IPC data high usage: {cls.total_size_in_file} files"
f" for {format_bytes(cls.total_size_in_file)}")
elif cls.total_size_in_file > cls.log_usage_info_threshold:
get_logger().info(f"IPC data usage: {cls.total_size_in_file} files"
f" for {format_bytes(cls.total_size_in_file)}")
class DaskNoneDataIPC:
""" Utility, when no multiprocess, do nothing just pass, get data as it """
ipc_type = 'none'
@asynccontextmanager
async def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
if type(data) is not bytes: # basic type check
data = await _real_all_from_async_gen(data)
yield data, ipc_data_getter_from_bytes
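# Illustration only, not part of this change: a minimal sketch of the set()/getter protocol
# described in the module docstring, using DaskLocalFileDataIPC. It assumes an asynchronous
# Dask client whose workers share the local filesystem; `process_bytes` and `payload` are
# hypothetical names.
#
#     def process_bytes(handle, getter):          # runs inside the Dask worker
#         with getter(handle) as file_like:
#             return len(file_like.read())
#
#     async def demo(dask_client, payload: bytes):
#         ipc = DaskLocalFileDataIPC()
#         async with ipc.set(payload) as (handle, getter):
#             return await dask_client.submit(process_bytes, handle, getter)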
from typing import List
import json
import fsspec
import pandas as pd
from app.model.model_chunking import DataframeBasicDescribe
# imports from bulk_persistence
from ..json_orient import JSONOrient
from ..mime_types import MimeType
from ..dataframe_serializer import DataframeSerializerSync
from ..dataframe_validators import (DataFrameValidationFunc, assert_df_validate, validate_index,
columns_not_in_reserved_names, validate_number_of_columns)
from .traces import trace_dataframe_attributes
from .errors import BulkNotProcessable, BulkSaveException
from . import storage_path_builder as path_builder
from . import session_file_meta as session_meta
"""
Contains functions related to writing bulk data that are meant to run inside a Dask worker
"""
def basic_describe(df: pd.DataFrame) -> DataframeBasicDescribe:
full_cols = df.columns.tolist()
if len(full_cols) > 20: # truncate if too many columns, show 10 first and 10 last
cols = [*full_cols[0:10], '...', *full_cols[-10:]]
else:
cols = full_cols
return DataframeBasicDescribe(rowCount=len(df.index),
columnCount=len(full_cols),
columns=cols,
indexStart=str(df.index[0]),
indexEnd=str(df.index[-1]),
indexType=str(df.index.dtype))
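# Illustration only, not part of this change: what basic_describe returns for a small frame,
# following directly from the implementation above.
#
#     df = pd.DataFrame({'A': [1.0, 2.0, 3.0]}, index=[10, 20, 30])
#     d = basic_describe(df)
#     # d.rowCount == 3, d.columnCount == 1, d.columns == ['A'],
#     # d.indexStart == '10', d.indexEnd == '30', d.indexType == 'int64'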
def write_bulk_without_session(data_handle,
data_getter,
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
bulk_base_path: str,
storage_options) -> DataframeBasicDescribe:
"""
process post data outside of a session - write data straight to blob storage
:param data_handle: IPC handle to the raw input bytes
:param data_getter: function returning the data as a file-like object given the handle
:param content_type: content type value as mime type (supports json and parquet)
:param df_validator_func: optional validation callable
:param bulk_base_path: base path of the final object on blob storage.
:param storage_options: storage options
:return: basic describe of the dataframe
:throw: BulkNotProcessable, BulkSaveException
"""
# 1- deserialize to pandas dataframe
try:
with data_getter(data_handle) as file_like_data:
df = DataframeSerializerSync.load(file_like_data, content_type, JSONOrient.split)
except Exception as e:
raise BulkNotProcessable(f'parsing error: {e}') from e
data_handle = None # unref
# 2- input dataframe validation
assert_df_validate(df, [df_validator_func, validate_number_of_columns, columns_not_in_reserved_names, validate_index])
trace_dataframe_attributes(df)
# 3- build blob filename and final full blob path
filename = session_meta.generate_chunk_filename(df)
full_file_path = path_builder.join(bulk_base_path, filename + '.parquet')
# 4- save/upload the dataframe
try:
DataframeSerializerSync.to_parquet(df, full_file_path, storage_options=storage_options)
except Exception as e:
raise BulkSaveException('Unexpected error while saving bulk') from e
# 5- return basic describe
return basic_describe(df)
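# Illustration only, not part of this change: a minimal sketch of exercising
# write_bulk_without_session without a Dask worker, using the pass-through DaskNoneDataIPC
# from .dask_data_ipc. MimeTypes.JSON (from ..mime_types) is assumed to be the JSON MimeType
# value; `bulk_base_path` is a local directory path and the payload uses the 'split' orient
# expected above.
#
#     payload = b'{"columns":["A"],"index":[0,1],"data":[[1.0],[2.0]]}'
#
#     async def demo(bulk_base_path: str) -> DataframeBasicDescribe:
#         async with DaskNoneDataIPC().set(payload) as (handle, getter):
#             return write_bulk_without_session(handle, getter, MimeTypes.JSON,
#                                               validate_index, bulk_base_path,
#                                               storage_options=None)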
def add_chunk_in_session(data_handle,
data_getter,
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
record_session_path: str,