# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import json
import os
import time
from contextlib import suppress
from operator import attrgetter
from typing import Dict, Generator, List
from distributed.worker import get_client

import pandas as pd
from app.bulk_persistence.dask.utils import share_items
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings

from .storage_path_builder import add_protocol, record_session_path


class SessionFileMeta:
    """The class extract information about chunks."""

    def __init__(self, fs, protocol: str, file_path: str, lazy: bool = True) -> None:
        """
        Args:
            fs: fsspec filesystem
            file_path (str): the parquet chunk file path
            lazy (bool, optional): prefetch the metadata file if False, else read at demand. Defaults to True.
        """
        self._fs = fs
        file_name = os.path.basename(file_path)
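        # chunk filenames follow '{start}_{end}_{time}.{shape}.parquet' (see generate_chunk_filename)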
        start, end, tail = file_name.split('_')
        self.start = float(start)  # also covers datetime indexes, serialized as integer nanoseconds
        self.end = float(end)
        self.time, self.shape, _ = tail.split('.')  # '{time}.{shape}.parquet'
        self._meta = None
        self.path = file_path
        self.protocol = protocol
        if not lazy:
            self._read_meta()

    def _read_meta(self):
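        # the metadata lives in a JSON '.meta' sidecar next to the '.parquet' chunk; cache it on first read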
        if not self._meta:
            path, _ = os.path.splitext(self.path)
            with self._fs.open(path + '.meta') as meta_file:
                self._meta = json.load(meta_file)
        return self._meta

    @property
    def columns(self) -> List[str]:
        """Returns the column names"""
        return self._read_meta()['columns']

    @property
    def dtypes(self) -> List[str]:
        """Returns the column dtypes"""
        return self._read_meta()['dtypes']

    @property
    def nb_rows(self) -> int:
        """Returns the number of rows of the chunk"""
        return self._read_meta()['nb_rows']

    @property
    def path_with_protocol(self) -> str:
        """Returns chunk path with protocol"""
        return add_protocol(self.path, self.protocol)

    @property
    def index_hash(self) -> str:
        """Returns the index hash"""
        return self._read_meta()['index_hash']

    def overlap(self, other: 'SessionFileMeta') -> bool:
        """Returns True if indexes overlap."""
        return self.end >= other.start and other.end >= self.start

    def has_common_columns(self, other: 'SessionFileMeta') -> bool:
        """Returns True if contains common columns with others."""
        return share_items(self.columns, other.columns)


def generate_chunk_filename(dataframe: pd.DataFrame) -> str:
    """Generate a chunk filename composed of information from the given dataframe
    {first_index}_{last_index}_{time}.{shape}
    The shape is a hash of columns names + columns dtypes
    If chunks have same shape, dask can read them together.

    Warnings:
        - This function is not idempotent: the embedded timestamp changes on every call!
        - Do not modify the format without updating the class SessionFileMeta,
          which parses this information back out of the chunk filename.
        - Filenames impact partition order in Dask, which orders them by 'natural key'.
          That is why the start index comes first.

    Raises:
        IndexError: if the dataframe is empty

    >>> generate_chunk_filename(pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10)))
    '0_9_1637223437910.526782c41fe12c3249046fedcc45563ef3662250'
    >>> generate_chunk_filename(pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10,20)))
    '10_19_1637223490719.526782c41fe12c3249046fedcc45563ef3662250'
    >>> generate_chunk_filename(pd.DataFrame({'A': [1], 'B': [1]}, index=[datetime.datetime.now()]))
    '1639672097644401000_1639672097644401000_1639668497645.526782c41fe12c3249046fedcc45563ef3662250'
    >>> generate_chunk_filename(pd.DataFrame({'A': []}, index=[]))
    Traceback (most recent call last):
        ...
    IndexError: index 0 is out of bounds for axis 0 with size 0
    """
    first_idx, last_idx = dataframe.index[0], dataframe.index[-1]
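    # a DatetimeIndex is stored as integer nanoseconds since epoch (pd.Timestamp.value)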
    if isinstance(dataframe.index, pd.DatetimeIndex):
        first_idx, last_idx = dataframe.index[0].value, dataframe.index[-1].value

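    # the 'shape' hashes column names and dtypes together, e.g. 'A:int64_B:float64'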
    shape_str = '_'.join(f'{cn}:{dt}' for cn, dt in dataframe.dtypes.items())
    shape = hashlib.sha1(shape_str.encode()).hexdigest()
    cur_time = round(time.time() * 1000)
    return f'{first_idx}_{last_idx}_{cur_time}.{shape}'


def build_chunk_metadata(dataframe: pd.DataFrame) -> dict:
    """Returns dataframe metadata
    Other metadata such as start_index or stop_index are saved into the chunk filename

    >>> build_chunk_metadata(pd.DataFrame({'A': [1,2,3], 'B': [4,5,6]}, index=[0,1,2]))
    {'columns': ['A', 'B'], 'dtypes': ['int64', 'int64'], 'nb_rows': 3, 'index_hash': 'ab2fa50ae23ce035bad2e77ec5e0be05c2f4b816'}
    """
    return {
        "columns": list(dataframe.columns),
        "dtypes": [str(dt) for dt in dataframe.dtypes],
        "nb_rows": len(dataframe.index),
        "index_hash": hashlib.sha1(dataframe.index.values).hexdigest()
    }


@capture_timings('get_chunks_metadata')
@with_trace('get_chunks_metadata')
async def get_chunks_metadata(filesystem, protocol: str, base_directory: str, session: Session) -> List[SessionFileMeta]:
    """Return metadata objects for a given session"""
    session_path = record_session_path(base_directory, session.id, session.recordId)
    with suppress(FileNotFoundError):
        parquet_files = [f for f in filesystem.ls(session_path) if f.endswith(".parquet")]
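        # parse every chunk eagerly (lazy=False) in parallel on the dask workers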
        futures = get_client().map(lambda f: SessionFileMeta(filesystem, protocol, f, lazy=False), parquet_files)
        return await get_client().gather(futures)
    return []


def get_next_chunk_files(
    chunks_info: List[SessionFileMeta]
) -> Generator[List[SessionFileMeta], None, None]:
    """Generator which groups session chunk files in lists of files that can be read directly with dask
    File can be grouped if they have the same schemas and no overlap between indexes
    """
    chunks_info.sort(key=attrgetter('time'))

    cache: Dict[str, List[SessionFileMeta]] = {}
    columns_in_cache = set()  # keep track of columns present in the cache
    for chunk in chunks_info:
        if chunk.shape in cache:  # other chunks with the same shape are already cached
            # look for an overlapping chunk
            for i, cached_chunk in enumerate(cache[chunk.shape]):
                if chunk.overlap(cached_chunk):
                    if chunk.index_hash == cached_chunk.index_hash:
                        # chunks identical in shape and index: keep only the most recent one
                        get_logger().info(f"Duplicated chunk skipped: '{chunk.path}'")
                        cache[chunk.shape].pop(i)
                    else:
                        yield cache[chunk.shape]
                        del cache[chunk.shape]
                    break
        elif not columns_in_cache.isdisjoint(chunk.columns):  # a cached chunk with a different shape shares columns
            conflicting_chunk = next(metas[0] for metas in cache.values()
                                     if chunk.has_common_columns(metas[0]))
            yield cache[conflicting_chunk.shape]
            columns_in_cache = columns_in_cache.difference(conflicting_chunk.columns)
            del cache[conflicting_chunk.shape]
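        # in every case, register the chunk under its shape and record its columns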
        cache.setdefault(chunk.shape, []).append(chunk)
        columns_in_cache.update(chunk.columns)

    yield from cache.values()
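

# Usage sketch (hypothetical caller, not part of this module): each group yielded
# by get_next_chunk_files shares one schema and has non-overlapping indexes, so it
# could be handed to dask as a single read, e.g.:
#
#     import dask.dataframe as dd
#     for group in get_next_chunk_files(chunks_info):
#         ddf = dd.read_parquet([meta.path_with_protocol for meta in group])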