Commit e21cad5a authored by Jeremie Hallal

Merge branch 'add_array_support' into 'master'

Add support of data with large number of columns

See merge request !243
parents 049a8f34 89d32d91
Pipeline #83245 failed with stages in 46 minutes and 16 seconds
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module groups functions related to the bulk catalog.
A catalog contains the metadata of the chunks.
"""
import json
from contextlib import suppress
from dataclasses import dataclass
from typing import Dict, Iterable, List, NamedTuple, Optional, Set
from dask.distributed import get_client
from app.bulk_persistence.dask.traces import submit_with_trace
from app.helper.traces import with_trace
from app.utils import capture_timings
from .storage_path_builder import join, remove_protocol
from .utils import worker_capture_timing_handlers
@dataclass
class ChunkGroup:
    """A chunk group represents a list of chunks sharing exactly the same schema
    (column labels and dtypes)."""
labels: Set[str]
paths: List[str]
dtypes: List[str]
class BulkCatalog:
    """Represents a bulk catalog.
    Example:
        {
            "recordId": "7507fb30-9cfa-4506-9cd8-6cbacbcda740",
            "nbRows": 1000,
            "indexPath": "folder/wdms_index/index.parquet",
            "columns": [
                {
                    "labels": ["A", "B"],
                    "paths": ["folder/file1.parquet", "folder/file2.parquet"],
                    "dtypes": ["Int64", "Float32"]
                },
                {
                    "labels": ["C"],
                    "paths": ["folder/file3.parquet"],
                    "dtypes": ["Float32"]
                }
            ],
        }
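    Typical in-memory usage (illustrative sketch; record id, paths and dtypes are arbitrary):
        >>> catalog = BulkCatalog("7507fb30-9cfa-4506-9cd8-6cbacbcda740")
        >>> catalog.add_chunk(ChunkGroup({"A"}, ["folder/file1.parquet"], ["Int64"]))
        >>> catalog.all_columns_dtypes
        {'A': 'Int64'}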
"""
def __init__(self, record_id: str) -> None:
self.record_id: str = record_id # TODO remove
self.nb_rows: int = 0
self.index_path: Optional[str] = None
self.columns: List[ChunkGroup] = []
@property
def all_columns_dtypes(self) -> Dict[str, str]:
"""Returns all columns with their dtype
Returns:
Dict[str, str]: a dict { column label : column dtype }
"""
res = {}
for col_group in self.columns:
res.update({cn: dt for cn, dt in zip(col_group.labels, col_group.dtypes)})
return res
def add_chunk(self, chunk_group: ChunkGroup) -> None:
"""Add ChunkGroup to the catalog."""
if len(chunk_group.labels) == 0:
return
keys = frozenset(chunk_group.labels)
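        # Chunk groups sharing exactly the same set of column labels are merged:
        # the new paths are appended to the existing group, otherwise a new group is added.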
        chunk_group_with_same_schema = next(
            (x for x in self.columns if frozenset(x.labels) == keys), None)
if chunk_group_with_same_schema:
chunk_group_with_same_schema.paths.extend(chunk_group.paths)
else:
self.columns.append(chunk_group)
    def remove_columns_info(self, labels: Iterable[str]) -> None:
        """Removes column information.
        Args:
            labels (Iterable[str]): column labels to remove
"""
clean_needed = False
labels_set = frozenset(labels)
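        # Rebuild each group's label/dtype pairs without the removed columns;
        # groups left empty are dropped in a final pass.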
for col_group in self.columns:
remaining_columns = {col: dt for col, dt in zip(
col_group.labels, col_group.dtypes) if col not in labels_set}
if len(remaining_columns) != len(col_group.labels):
col_group.labels = set(remaining_columns.keys())
col_group.dtypes = list(remaining_columns.values())
clean_needed = clean_needed or len(col_group.labels) == 0
if clean_needed:
self.columns = [c for c in self.columns if c.labels]
def change_columns_info(self, chunk_group: ChunkGroup) -> None:
"""Replace column information with the given one
Args:
chunk_group (ChunkGroup): new column information
"""
self.remove_columns_info(chunk_group.labels)
self.add_chunk(chunk_group)
class ColumnsPaths(NamedTuple):
labels: Set[str]
paths: List[str]
    def get_paths_for_columns(self, labels: Iterable[str], base_path: str) -> List[ColumnsPaths]:
        """Returns the chunk paths to load for the requested columns, grouped by chunk group."""
grouped_files = []
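        # For each chunk group, keep only the requested labels it actually contains
        # and resolve its chunk paths against base_path.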
for col_group in self.columns:
matching_columns = col_group.labels.intersection(labels)
if matching_columns:
grouped_files.append(self.ColumnsPaths(
labels=matching_columns,
paths=[join(base_path, f) for f in col_group.paths])
)
return grouped_files
def as_dict(self) -> dict:
"""Returns the dict representation of the catalog"""
return {
"recordId": self.record_id,
"nbRows": self.nb_rows,
"indexPath": self.index_path,
'columns': [{
'labels': list(c.labels),
'paths': c.paths,
'dtypes': c.dtypes
} for c in self.columns],
}
@classmethod
def from_dict(cls, catalog_as_dict: dict) -> "BulkCatalog":
"""construct a Catalog from a dict"""
catalog = cls(record_id=catalog_as_dict["recordId"])
catalog.nb_rows = catalog_as_dict["nbRows"]
catalog.index_path = catalog_as_dict["indexPath"]
catalog.columns = [
ChunkGroup(set(c["labels"]), c["paths"], c["dtypes"])
for c in catalog_as_dict["columns"]
]
return catalog
CATALOG_FILE_NAME = 'bulk_catalog.json'
@capture_timings('save_bulk_catalog', handlers=worker_capture_timing_handlers)
@with_trace('save_bulk_catalog')
def save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
    """Save a bulk catalog to a JSON file in the given folder path."""
folder_path, _ = remove_protocol(folder_path)
meta_path = join(folder_path, CATALOG_FILE_NAME)
with filesystem.open(meta_path, 'w') as outfile:
data = json.dumps(catalog.as_dict(), indent=0)
outfile.write(data)
# json.dump(catalog.as_dict(), outfile) # don't know why json.dump is slower (local windows)
@capture_timings('load_bulk_catalog', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk_catalog')
def load_bulk_catalog(filesystem, folder_path: str) -> Optional[BulkCatalog]:
    """Load a bulk catalog from a JSON file in the given folder path, or return None if absent."""
folder_path, _ = remove_protocol(folder_path)
meta_path = join(folder_path, CATALOG_FILE_NAME)
with suppress(FileNotFoundError):
with filesystem.open(meta_path) as json_file:
data = json.load(json_file)
return BulkCatalog.from_dict(data)
return None
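# Illustrative usage (a sketch, assuming an fsspec-compatible `filesystem` instance):
#     catalog = load_bulk_catalog(filesystem, "path/to/record_folder")
#     if catalog is not None:
#         save_bulk_catalog(filesystem, "path/to/record_folder", catalog)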
async def async_load_bulk_catalog(filesystem, folder_path: str) -> BulkCatalog:
return await submit_with_trace(get_client(), load_bulk_catalog, filesystem, folder_path)
async def async_save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
return await submit_with_trace(get_client(), save_bulk_catalog, filesystem, folder_path, catalog)
@@ -12,17 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
import os
import time
from contextlib import suppress
from operator import attrgetter
from typing import Dict, Generator, List
from distributed.worker import get_client
import pandas as pd
from app.bulk_persistence.dask.utils import share_items
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings
from .storage_path_builder import add_protocol, record_session_path
class SessionFileMeta:
    """Extracts information about session chunks."""
    def __init__(self, fs, file_path: str, lazy: bool = True) -> None:
"""
Args:
fs: fsspec filesystem
file_path (str): the parquet chunk file path
            lazy (bool, optional): if False, prefetch the metadata file; otherwise read it on demand. Defaults to True.
"""
self._fs = fs
file_name = os.path.basename(file_path)
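        # Chunk filenames follow '{first_index}_{last_index}_{time}.{shape}' plus the
        # '.parquet' extension (see generate_chunk_filename), so index bounds and the
        # schema hash can be recovered without opening the file.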
start, end, tail = file_name.split('_')
@@ -31,6 +47,8 @@ class SessionFileMeta:
self.time, self.shape, tail = tail.split('.')
self._meta = None
self.path = file_path
if not lazy:
self._read_meta()
def _read_meta(self):
if not self._meta:
@@ -41,9 +59,29 @@
@property
    def columns(self) -> List[str]:
        """Returns the column names"""
return self._read_meta()['columns']
@property
def dtypes(self) -> List[str]:
"""Returns the column dtypes"""
return self._read_meta()['dtypes']
@property
def nb_rows(self) -> int:
"""Returns the number of rows of the chunk"""
return self._read_meta()['nb_rows']
@property
def path_with_protocol(self) -> str:
"""Returns chunk path with protocol"""
return add_protocol(self.path, self._fs.protocol)
@property
def index_hash(self) -> str:
"""Returns the index hash"""
return self._read_meta()['index_hash']
def overlap(self, other: 'SessionFileMeta') -> bool:
"""Returns True if indexes overlap."""
return self.end >= other.start and other.end >= self.start
@@ -51,3 +89,92 @@
    def has_common_columns(self, other: 'SessionFileMeta') -> bool:
        """Returns True if it shares at least one column with the other chunk."""
return share_items(self.columns, other.columns)
def generate_chunk_filename(dataframe: pd.DataFrame) -> str:
    """Generate a chunk filename composed of information from the given dataframe:
        {first_index}_{last_index}_{time}.{shape}
    The shape is a hash of the column names and dtypes.
    Chunks with the same shape can be read together by dask.
    Warnings:
        - This function is not idempotent!
        - Do not modify the filename format without updating the class SessionFileMeta,
          which parses this information back from the chunk filename.
        - Filenames impact partition order in Dask, which orders them by 'natural key'.
          That is why the start index comes first.
    Raises:
        IndexError: if the dataframe is empty
>>> generate_chunk_filename(pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10)))
'0_9_1637223437910.526782c41fe12c3249046fedcc45563ef3662250'
>>> generate_chunk_filename(pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10,20)))
'10_19_1637223490719.526782c41fe12c3249046fedcc45563ef3662250'
>>> generate_chunk_filename(pd.DataFrame({'A': [1], 'B': [1]}, index=[datetime.datetime.now()]))
'1639672097644401000_1639672097644401000_1639668497645.526782c41fe12c3249046fedcc45563ef3662250'
>>> generate_chunk_filename(pd.DataFrame({'A': []}, index=[]))
IndexError: index 0 is out of bounds for axis 0 with size 0
"""
first_idx, last_idx = dataframe.index[0], dataframe.index[-1]
if isinstance(dataframe.index, pd.DatetimeIndex):
first_idx, last_idx = dataframe.index[0].value, dataframe.index[-1].value
shape_str = '_'.join(f'{cn}:{dt}' for cn, dt in dataframe.dtypes.items())
shape = hashlib.sha1(shape_str.encode()).hexdigest()
cur_time = round(time.time() * 1000)
return f'{first_idx}_{last_idx}_{cur_time}.{shape}'
def build_chunk_metadata(dataframe: pd.DataFrame) -> dict:
"""Returns dataframe metadata
Other metadata such as start_index or stop_index are saved into the chunk filename
>>> build_chunk_metadata(pd.DataFrame({'A': [1,2,3], 'B': [4,5,6]}, index=[0,1,2]))
{'columns': ['A', 'B'], 'dtypes': ['int64', 'int64'], 'nb_rows': 3, 'index_hash': 'ab2fa50ae23ce035bad2e77ec5e0be05c2f4b816'}
"""
return {
"columns": list(dataframe.columns),
"dtypes": [str(dt) for dt in dataframe.dtypes],
"nb_rows": len(dataframe.index),
"index_hash": hashlib.sha1(dataframe.index.values).hexdigest()
}
@capture_timings('get_chunks_metadata')
@with_trace('get_chunks_metadata')
async def get_chunks_metadata(filesystem, base_directory, session: Session) -> List[SessionFileMeta]:
"""Return metadata objects for a given session"""
session_path = record_session_path(base_directory, session.id, session.recordId)
with suppress(FileNotFoundError):
parquet_files = [f for f in filesystem.ls(session_path) if f.endswith(".parquet")]
        futures = get_client().map(lambda f: SessionFileMeta(filesystem, f, lazy=False), parquet_files)
return await get_client().gather(futures)
return []
def get_next_chunk_files(
    chunks_info: List[SessionFileMeta]
) -> Generator[List[SessionFileMeta], None, None]:
    """Generator that groups session chunk files into lists that can be read directly with dask.
    Files can be grouped if they have the same schema and their indexes do not overlap.
"""
chunks_info.sort(key=attrgetter('time'))
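    # Chunks are visited in upload order (their time token). A cached group is flushed
    # as soon as an incoming chunk would overlap its rows (same shape) or reuse one of
    # its columns under a different shape, so each yielded list can be read as one
    # dask dataframe.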
    cache: Dict[str, List[SessionFileMeta]] = {}
    columns_in_cache = set()  # keep track of columns present in the cache
for chunk in chunks_info:
        if chunk.shape in cache:  # other chunks with the same shape are already cached
            if any(chunk.overlap(c) for c in cache[chunk.shape]):  # rows overlap
yield cache[chunk.shape]
del cache[chunk.shape]
        elif not columns_in_cache.isdisjoint(chunk.columns):  # else if columns conflict
conflicting_chunk = next(metas[0] for metas in cache.values()
if chunk.has_common_columns(metas[0]))
yield cache[conflicting_chunk.shape]
columns_in_cache = columns_in_cache.difference(conflicting_chunk.columns)
del cache[conflicting_chunk.shape]
cache.setdefault(chunk.shape, []).append(chunk)
columns_in_cache.update(chunk.columns)
yield from cache.values()
@@ -17,12 +17,9 @@ Utility functions that gather methods to build paths for bulk storage
"""
import hashlib
from os.path import join, relpath
from time import time
from typing import Optional, Tuple
import pandas as pd
def hash_record_id(record_id: str) -> str:
"""encode the record_id to be a valid path name"""
@@ -44,14 +41,21 @@ def add_protocol(path: str, protocol: str) -> str:
def remove_protocol(path: str) -> Tuple[str, str]:
    """Remove the protocol from the path, if any; return tuple[path, protocol].
    If there is no protocol in the path then protocol=''
>>> remove_protocol('s3://path/to/my/file')
('path/to/my/file', 's3')
>>> remove_protocol('path/to/my/file')
('path/to/my/file', '')
"""
if '://' not in path:
return path, ''
sep_idx = path.index('://')
return path[sep_idx + 3:], path[:sep_idx]
def record_path(
    base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
"""Return the entity path.
    (the path where all data related to an entity is saved)."""
encoded_id = hash_record_id(record_id)
@@ -59,49 +63,31 @@ def record_path(base_directory: str, record_id, protocol: Optional[str] = None)
return join(base_path, encoded_id)
def record_bulks_root_path(
base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
    """Return the path where blobs are stored for the specified entity"""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'bulk')
def record_sessions_root_path(
base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
"""return the path where sessions are stored for the specified entity"""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'session')
def record_bulk_path(
base_directory: str, record_id: str, bulk_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified bulk."""
    entity_path = record_path(base_directory, record_id, protocol)
    return join(entity_path, 'bulk', bulk_id, 'data')
def record_session_path(
base_directory: str, session_id: str, record_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified session."""
    entity_path = record_path(base_directory, record_id, protocol)
    return join(entity_path, 'session', session_id, 'data')
def record_relative_path(base_directory: str, record_id: str, path: str) -> str:
"""Returns the path relative to the specified record."""
base_path = record_path(base_directory, record_id)
path, _proto = remove_protocol(path)
return relpath(path, base_path)
def full_path(
base_directory: str, record_id: str, rel_path: str, protocol: Optional[str] = None
) -> str:
"""Returns the full path of a record from a relative path"""
return join(record_path(base_directory, record_id, protocol), rel_path)
from typing import Callable
from dask.distributed import Client
import pandas as pd
from dask.utils import funcname
from dask.base import tokenize
@@ -6,9 +9,9 @@ from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
from app.conf import Config
from app.helper.traces import create_exporter
from app.utils import get_ctx, get_or_create_ctx
_EXPORTER = None
@@ -40,6 +43,30 @@ def _create_func_key(func, *args, **kwargs):
return funcname(func) + "-" + tokenize(func, kwargs, *args)
def submit_with_trace(dask_client: Client, target_func: Callable, *args, **kwargs):
    """Submit the given target_func to distributed Dask workers and add the required tracing context.
    Note: 'dask_task_key' is created manually to ease reading of Dask's running tasks: it displays
    the effective target function instead of 'wrap_trace_process', which is used to enable tracing
    inside Dask workers.
"""
dask_task_key = _create_func_key(target_func, *args, **kwargs)
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return dask_client.submit(wrap_trace_process, *args, key=dask_task_key, **kwargs)
def map_with_trace(dask_client: Client, target_func: Callable, *args, **kwargs):
    """Map the given target_func over distributed Dask workers and add the required tracing context.
    Note: 'dask_task_key' is created manually to ease reading of Dask's running tasks: it displays
    the effective target function instead of 'wrap_trace_process', which is used to enable tracing
    inside Dask workers.
"""
dask_task_key = _create_func_key(target_func, *args, **kwargs)
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
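    # Client.map fans wrap_trace_process out over the iterables in *args,
    # producing one traced task per element.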
return dask_client.map(wrap_trace_process, *args, key=dask_task_key, **kwargs)
def trace_dataframe_attributes(df: pd.DataFrame):
"""
Add dataframe shape into current tracing span if tracer exists
@@ -56,6 +83,3 @@ def trace_dataframe_attributes(df: pd.DataFrame):
tracer.add_attribute_to_current_span(
attribute_key="df columns count",
attribute_value=cols_count)
@@ -12,16 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from itertools import zip_longest
from logging import INFO
from typing import List, Optional
import dask.dataframe as dd
import pandas as pd
import pyarrow.parquet as pa
from app.helper.logger import get_logger
from app.utils import capture_timings
def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
@@ -56,8 +57,15 @@ def set_index(ddf: dd.DataFrame):
return ddf
@capture_timings("join_dataframes", handlers=worker_capture_timing_handlers)
def join_dataframes(dfs: List[dd.DataFrame]):
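    """Outer-join the given dataframes on their index; returns None if the list is empty."""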
if len(dfs) > 1:
return dfs[0].join(dfs[1:], how='outer')
return dfs[0] if dfs else None
@capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1: dd.DataFrame, df2: Optional[dd.DataFrame]):
    """Combine the two dask dataframes. Updates df1 with df2 values where they overlap."""
if df2 is None:
return df1
@@ -65,8 +73,20 @@ def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
df1 = set_index(df1)
df2 = set_index(df2)
    if share_items(df1.columns, df2.columns):
        return df2.combine_first(df1)
    return df1.join(df2, how='outer')  # join seems faster when there are no columns in common
@capture_timings("get_num_rows", handlers=worker_capture_timing_handlers)
def get_num_rows(dataset: pa.ParquetDataset) -> int:
"""Returns the number of rows from a pyarrow ParquetDataset"""
metadata = dataset.common_metadata
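    # Use the dataset-level metadata when it is populated; otherwise fall back to
    # summing the row counts from each piece's parquet footer.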
if metadata and metadata.num_rows > 0:
return metadata.num_rows
return sum((piece.get_metadata().num_rows for piece in dataset.pieces))