Commit 4b6fb10f authored by Jeremie Hallal

Merge branch 'mk_module_for_path_manipulation' into 'master'

[bulk storage refactoring] Add module for blob path manipulation

See merge request !306
parents 6e365983 c1293521
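The refactoring replaces DaskBulkStorage's private path helpers with free functions from the new storage_path_builder module. A minimal sketch of the new call pattern used in the diff below (bucket name and identifiers are illustrative; the real code imports the module relatively, as shown in the second hunk):

    import storage_path_builder as pathBuilder

    base_directory = 'my-bucket/bulk-store'       # illustrative
    record_id, bulk_id = 'rec-123', 'bulk-456'    # illustrative

    # With a protocol, for the dask/pandas readers and writers:
    pathBuilder.record_bulk_path(base_directory, record_id, bulk_id, 'gs')
    # -> 'gs://my-bucket/bulk-store/<sha1(record_id)>/bulk/bulk-456/data'

    # Without a protocol, for direct filesystem access (fs.ls, pa.ParquetDataset):
    pathBuilder.record_bulk_path(base_directory, record_id, bulk_id)
    # -> 'my-bucket/bulk-store/<sha1(record_id)>/bulk/bulk-456/data'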
@@ -12,9 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import hashlib
 import json
-import time
 from contextlib import suppress
 from operator import attrgetter
 from typing import List
@@ -39,6 +37,7 @@
 from .traces import wrap_trace_process
 from .utils import by_pairs, do_merge, worker_capture_timing_handlers
 from .dask_worker_plugin import DaskWorkerPlugin
 from .session_file_meta import SessionFileMeta
+from . import storage_path_builder as pathBuilder
 from ..bulk_id import new_bulk_id
@@ -87,22 +86,6 @@ class DaskBulkStorage:
     def base_directory(self) -> str:
         return self._parameters.base_directory

-    def _encode_record_id(self, record_id: str) -> str:
-        return hashlib.sha1(record_id.encode()).hexdigest()
-
-    def _get_base_directory(self, protocol=True):
-        return f'{self.protocol}://{self.base_directory}' if protocol else self.base_directory
-
-    def _get_blob_path(self, record_id: str, bulk_id: str, with_protocol=True) -> str:
-        """Return the bulk path from the bulk_id."""
-        encoded_id = self._encode_record_id(record_id)
-        return f'{self._get_base_directory(with_protocol)}/{encoded_id}/bulk/{bulk_id}/data'
-
-    def _build_path_from_session(self, session: Session, with_protocol=True) -> str:
-        """Return the session path."""
-        encoded_id = self._encode_record_id(session.recordId)
-        return f'{self._get_base_directory(with_protocol)}/{encoded_id}/session/{session.id}/data'
-
     def _load(self, path, **kwargs) -> dd.DataFrame:
         """Read a Parquet file into a Dask DataFrame
         path : string or list
@@ -124,12 +107,14 @@ class DaskBulkStorage:
         """Return a dask Dataframe of a record at the specified version.
         returns a Future<dd.DataFrame>
         """
-        return self._load(self._get_blob_path(record_id, bulk_id), columns=columns)
+        blob_path = pathBuilder.record_bulk_path(
+            self.base_directory, record_id, bulk_id, self.protocol)
+        return self._load(blob_path, columns=columns)

     @with_trace('read_stat')
     def read_stat(self, record_id: str, bulk_id: str):
         """Return some meta data about the bulk."""
-        file_path = self._get_blob_path(record_id, bulk_id, with_protocol=False)
+        file_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id)
         dataset = pa.ParquetDataset(file_path, filesystem=self._fs)
         schema = dataset.read_pandas().schema
         schema_dict = {x: str(y) for (x, y) in zip(schema.names, schema.types)}
@@ -232,7 +217,7 @@ class DaskBulkStorage:
         ddf = dd.from_pandas(ddf, npartitions=1)
         ddf = await self.client.scatter(ddf)
-        path = self._get_blob_path(record_id, bulk_id)
+        path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
         try:
             await self._save_with_dask(path, ddf)
         except OSError:
@@ -247,29 +232,22 @@ class DaskBulkStorage:
         # sort column by names
         pdf = pdf[sorted(pdf.columns)]
-        # generate a file name sorted by starting index
-        # dask reads and sort files by 'natural_key' So the file name impact the final result
-        first_idx, last_idx = pdf.index[0], pdf.index[-1]
-        if isinstance(pdf.index, pd.DatetimeIndex):
-            first_idx, last_idx = pdf.index[0].value, pdf.index[-1].value
-        idx_range = f'{first_idx}_{last_idx}'
-        shape = hashlib.sha1('_'.join(map(str, pdf)).encode()).hexdigest()
-        t = round(time.time() * 1000)
-        filename = f'{idx_range}_{t}.{shape}'
-        session_path_wo_protocol = self._build_path_from_session(session, with_protocol=False)
-        self._fs.mkdirs(session_path_wo_protocol, exist_ok=True)
-        with self._fs.open(f'{session_path_wo_protocol}/{filename}.meta', 'w') as outfile:
-            json.dump({"columns": list(pdf)}, outfile)
+        filename = pathBuilder.build_chunk_filename(pdf)
+        session_path = pathBuilder.record_session_path(
+            self.base_directory, session.id, session.recordId)
-        session_path = self._build_path_from_session(session)
+        self._fs.mkdirs(session_path, exist_ok=True)
+        with self._fs.open(f'{session_path}/{filename}.meta', 'w') as outfile:
+            json.dump({"columns": list(pdf.columns)}, outfile)
+        session_path = pathBuilder.add_protocol(session_path, self.protocol)
         await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)

     @capture_timings('get_session_parquet_files')
     @with_trace('get_session_parquet_files')
     def get_session_parquet_files(self, session):
-        session_path = self._build_path_from_session(session, with_protocol=False)
+        session_path = pathBuilder.record_session_path(
+            self.base_directory, session.id, session.recordId)
         with suppress(FileNotFoundError):
             session_files = [f for f in self._fs.ls(session_path) if f.endswith(".parquet")]
             return session_files
new file: storage_path_builder.py
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions that gathers method to build path for bulk storage
"""
import hashlib
from os.path import join
from time import time
from typing import Optional, Tuple
import pandas as pd
def hash_record_id(record_id: str) -> str:
"""encode the record_id to be a valid path name"""
return hashlib.sha1(record_id.encode()).hexdigest()
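# Example: the digest is deterministic and filesystem-safe, e.g.
#   hash_record_id('abc') -> 'a9993e364706816aba3e25717850c26c9cd0d89d'
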
def build_base_path(base_directory: str, protocol: Optional[str] = None) -> str:
    """Return the base directory, prefixed with the protocol if given."""
    return f'{protocol}://{base_directory}' if protocol else base_directory

def add_protocol(path: str, protocol: str) -> str:
    """Add the protocol prefix to the path if it is not already present."""
    prefix = protocol + '://'
    if not path.startswith(prefix):
        return prefix + path
    return path

def remove_protocol(path: str) -> Tuple[str, str]:
    """Remove the protocol from the path, if any; return Tuple[path, protocol].
    If the path has no protocol, protocol is ''."""
    if '://' not in path:
        return path, ''
    sep_idx = path.index('://')
    return path[sep_idx + 3:], path[:sep_idx]

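# Example round-trip with illustrative values:
#   add_protocol('bucket/store', 'gs')      -> 'gs://bucket/store'
#   add_protocol('gs://bucket/store', 'gs') -> 'gs://bucket/store'  (idempotent)
#   remove_protocol('gs://bucket/store')    -> ('bucket/store', 'gs')
#   remove_protocol('bucket/store')         -> ('bucket/store', '')
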
def record_path(base_directory: str, record_id, protocol: Optional[str] = None) -> str:
    """Return the entity path
    (the path under which all data related to an entity is saved)."""
    encoded_id = hash_record_id(record_id)
    base_path = build_base_path(base_directory, protocol)
    return join(base_path, encoded_id)

def record_bulks_root_path(
    base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
    """Return the path where bulk blobs are stored for the specified entity."""
    entity_path = record_path(base_directory, record_id, protocol)
    return join(entity_path, 'bulk')

def record_sessions_root_path(
    base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
    """Return the path where sessions are stored for the specified entity."""
    entity_path = record_path(base_directory, record_id, protocol)
    return join(entity_path, 'session')

def record_bulk_path(
    base_directory: str, record_id: str, bulk_id: str, protocol: Optional[str] = None
) -> str:
    """Return the path corresponding to the specified bulk."""
    entity_blob_path = record_bulks_root_path(base_directory, record_id, protocol)
    return join(entity_blob_path, bulk_id, 'data')

def record_session_path(
    base_directory: str, session_id: str, record_id: str, protocol: Optional[str] = None
) -> str:
    """Return the path corresponding to the specified session."""
    entity_session_path = record_sessions_root_path(base_directory, record_id, protocol)
    return join(entity_session_path, session_id, 'data')

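# Resulting storage layout for a record (protocol prefix omitted):
#   <base_directory>/<sha1(record_id)>/bulk/<bulk_id>/data          <- record_bulk_path
#   <base_directory>/<sha1(record_id)>/session/<session_id>/data    <- record_session_path
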
def build_chunk_filename(dataframe: pd.DataFrame) -> str:
    """Return a chunk file name that sorts by starting index.
    Note 1: do not change the naming scheme without updating SessionFileMeta.
    Note 2: dask reads and sorts files by 'natural_key', so the filename
    impacts the final result.
    """
    first_idx, last_idx = dataframe.index[0], dataframe.index[-1]
    if isinstance(dataframe.index, pd.DatetimeIndex):
        first_idx, last_idx = dataframe.index[0].value, dataframe.index[-1].value
    shape_str = '_'.join(str(cn) for cn in dataframe.columns)
    shape = hashlib.sha1(shape_str.encode()).hexdigest()
    cur_time = round(time() * 1000)
    return f'{first_idx}_{last_idx}_{cur_time}.{shape}'
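
# Example with illustrative values: a chunk indexed 0..99 with columns 'A' and 'B',
# written at epoch-millisecond 1630000000000, is named
#   '0_99_1630000000000.<sha1("A_B")>'
# so dask's 'natural_key' ordering follows the chunk's starting index.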