Commit c20c9f6c authored by Luc Yriarte

bulk chunking

parent 501bf4e8
......@@ -34,6 +34,7 @@ The following software have components provided under the terms of this license:
- opencensus-ext-stackdriver (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-stackdriver)
- opencensus-proto (from https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-python)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- pyarrow (from https://arrow.apache.org/)
- pytest-asyncio (from https://github.com/pytest-dev/pytest-asyncio)
- pytest-dependency (from )
......@@ -121,6 +122,7 @@ The following software have components provided under the terms of this license:
- distributed (from https://distributed.readthedocs.io/en/latest/)
- fsspec (from http://github.com/intake/filesystem_spec)
- gcsfs (from https://github.com/dask/gcsfs)
- pandas (from http://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- toolz (from http://github.com/pytoolz/toolz/)
......@@ -244,6 +246,7 @@ The following software have components provided under the terms of this license:
- msrest (from https://github.com/Azure/msrest-for-python)
- munch (from http://github.com/Infinidat/munch)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- pluggy (from https://github.com/pytest-dev/pluggy)
- py (from http://pylib.readthedocs.org/)
- pyarrow (from https://arrow.apache.org/)
......@@ -303,6 +306,7 @@ The following software have components provided under the terms of this license:
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- portalocker (from https://github.com/WoLpH/portalocker)
- python-dateutil (from https://dateutil.readthedocs.org)
- pytz (from http://pythonhosted.org/pytz)
......@@ -355,6 +359,7 @@ The following software have components provided under the terms of this license:
- botocore (from https://github.com/boto/botocore)
- grpcio (from http://www.grpc.io)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- py (from http://pylib.readthedocs.org/)
- pytz (from http://pythonhosted.org/pytz)
......
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import adlfs
import fsspec
from app.bulk_persistence.dask.blob_storage import (DaskBlobStorageBase,
DaskDriverBlobStorage)
from app.helper.logger import get_logger
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
from dask.distributed import WorkerPlugin
class AzureBlobFileSystemWithDefaultCredentials(adlfs.AzureBlobFileSystem):
    """Wrap the Azure file system to add credentials if they are not present.

    The Azure credential object cannot be serialized (pickled) to the workers,
    so it needs to be instantiated on the worker.
    """
def __init__(self, *args, **kwargs):
has_credential = (
"credential" in kwargs or "account_key" in kwargs
)
if not has_credential:
kwargs["credential"] = AzureAioBlobStorage()._get_credentials()
super().__init__(*args, **kwargs)
class AzureWorkerPlugin(WorkerPlugin):
    """Worker plugin that lets custom code run at different stages of a worker's lifecycle.

    At worker startup, we register the wrapped Azure blob file system so authentication is added on each worker.
    """
def setup(self, worker):
fsspec.register_implementation("abfs", AzureBlobFileSystemWithDefaultCredentials)
fsspec.register_implementation("az", AzureBlobFileSystemWithDefaultCredentials)
return super().setup(worker)
fsspec.register_implementation("abfs", AzureBlobFileSystemWithDefaultCredentials)
fsspec.register_implementation("az", AzureBlobFileSystemWithDefaultCredentials)
class DaskBlobStorageAzure(DaskBlobStorageBase):
"""Instantiate a DaskDriverBlobStorage with the Azure blob storage file system."""
async def build_dask_blob_storage(self, tenant):
az = AzureAioBlobStorage()
storage_account_name = await az._get_storage_account_name(tenant.data_partition_id)
storage_options = {'account_name': storage_account_name}
base_directory = f'{tenant.bucket_name}/dask_data' # TODO remove dask_data
_dask = DaskDriverBlobStorage(protocol='az',
base_directory=base_directory,
storage_options=storage_options)
if await _dask.init_client():
await DaskDriverBlobStorage.client.register_worker_plugin(AzureWorkerPlugin, name="AzureWorkerPlugin")
get_logger().debug(f"DASK_CLIENT: {_dask.client}") # TODO remove dbg
return _dask
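# Hypothetical usage sketch (call site and variable names assumed, not part of this change):
#     storage = await DaskBlobStorageAzure().build_dask_blob_storage(tenant)
#     bulk_id = await storage.save_blob(dataframe)   # returns the generated bulk_id
#     ddf = await storage.load_bulk(bulk_id)         # dask dataframe read back from blob storage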
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
from abc import ABC, abstractmethod
from operator import attrgetter
import dask
import dask.dataframe as dd
import fsspec
import pandas as pd
from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.errors import BulkNotProcessable, BulkNotFound
from app.bulk_persistence.dask.utils import SessionFileMeta, set_index, do_merge, by_pairs
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings, get_wdms_temp_dir
from dask.distributed import Client
dask.config.set({'temporary_directory': get_wdms_temp_dir()})
class DaskBlobStorageBase(ABC):
@abstractmethod
async def build_dask_blob_storage(self, tenant):
raise NotImplementedError('DaskBlobStorageBase.build_dask_blob_storage')
class DaskBlobStorageLocal(DaskBlobStorageBase):
"""Instantiate a DaskDriverBlobStorage with a local file system."""
def __init__(self, base_directory) -> None:
self._base_directory = base_directory
async def build_dask_blob_storage(self, tenant):
base_directory = f'{self._base_directory}/{tenant.data_partition_id}'
_dask = DaskDriverBlobStorage(protocol='file',
base_directory=base_directory,
storage_options={'auto_mkdir': True})
        await _dask.init_client()  # TODO should not be initialized here
get_logger().debug(f"DASK_CLIENT: {_dask.client}")
return _dask
class DaskDriverBlobStorage:
client = None
lock_client = None
def __init__(self, protocol, base_directory, storage_options) -> None:
if DaskDriverBlobStorage.lock_client is None:
DaskDriverBlobStorage.lock_client = asyncio.Lock()
self._storage_options = storage_options
self._protocol = protocol
self._base_directory = base_directory
self._fs = fsspec.filesystem(protocol, **self._storage_options)
@staticmethod
async def close():
async with DaskDriverBlobStorage.lock_client:
if DaskDriverBlobStorage.client:
await DaskDriverBlobStorage.client.close() # or shutdown
DaskDriverBlobStorage.client = None
    @staticmethod
    async def init_client():
        """Initialise the Dask client. Returns False if the client was already initialized."""
        async with DaskDriverBlobStorage.lock_client:
            if not DaskDriverBlobStorage.client:
                DaskDriverBlobStorage.client = await Client(asynchronous=True, processes=True)
                return True
            return False
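    # Note: `client` and `lock_client` are class attributes, so every DaskDriverBlobStorage
    # instance in the process shares a single Dask client; init_client() creates it only once
    # and close() resets it to None under the same lock.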
def _get_base_directory(self, protocol=True):
return f'{self._protocol}://{self._base_directory}' if protocol else self._base_directory
def _get_blob_path(self, bulk_id, protocol=True) -> str:
"""Return the bulk path from the bulk_id."""
return f'{self._get_base_directory(protocol)}/{bulk_id}'
def _build_path_from_session(self, session: Session, protocol=True) -> str:
"""Return the session path."""
return f'{self._get_base_directory(protocol)}/session-{session.id}'
def _load(self, path, **kwargs) -> dd.DataFrame:
"""Read a Parquet file into a Dask DataFrame
path : string or list
**kwargs: dict (of dicts) Passthrough key-word arguments for read backend.
"""
return self.client.submit(dd.read_parquet, path, engine='pyarrow-dataset',
storage_options=self._storage_options,
**kwargs)
def _load_bulk(self, bulk_id) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version.
returns a future<dd.DataFrame>
"""
return self._load(self._get_blob_path(bulk_id))
async def load_bulk(self, bulk_id) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version."""
try:
return await self._load_bulk(bulk_id)
except OSError:
raise BulkNotFound(bulk_id) # TODO proper exception
def _save_with_dask(self, path, ddf):
"""Save the dataframe to a parquet file(s).
ddf: dd.DataFrame or future<dd.DataFrame>
returns a future<None>
Note:
we should be able to change or support other format easily ?
"""
return self.client.submit(dd.to_parquet, ddf, path, schema="infer",
engine='pyarrow',
storage_options=self._storage_options)
def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or future<pd.DataFrame>
returns a future<None>
"""
return self.client.submit(pdf.to_parquet, path, storage_options=self._storage_options)
def _check_incoming_chunk(self, df):
# TODO should we test if is_monotonic?, unique ?
if len(df.index) == 0:
raise BulkNotProcessable("Empty data")
if not df.index.is_unique:
raise BulkNotProcessable("Duplicated index found")
if not df.index.is_numeric() and not isinstance(df.index, pd.DatetimeIndex):
raise BulkNotProcessable("Index should be numeric or datetime")
async def save_blob(self, ddf: dd.DataFrame, bulk_id: str = None):
"""Write the data frame to the blob storage."""
        # TODO: The new bulk_id should contain information about the way we store the bulk.
        # In the future, if we change the way we store chunks, it could be useful to deduce it from the bulk_uri.
bulk_id = bulk_id or BulkId.new_bulk_id()
if isinstance(ddf, pd.DataFrame):
self._check_incoming_chunk(ddf)
ddf = dd.from_pandas(ddf, npartitions=1)
path = self._get_blob_path(bulk_id)
try:
await self._save_with_dask(path, ddf)
except OSError:
raise BulkNotFound(bulk_id) # TODO proper exception
return bulk_id
@capture_timings('session_add_chunk')
@with_trace('session_add_chunk')
async def session_add_chunk(self, session: Session, pdf: pd.DataFrame):
import hashlib
import time
self._check_incoming_chunk(pdf)
# sort column by names
pdf = pdf[sorted(pdf.columns)]
# generate a file name sorted by starting index
        # dask reads and sorts files by 'natural_key', so the file name impacts the final result
first_idx, last_idx = pdf.index[0], pdf.index[-1]
if isinstance(pdf.index, pd.DatetimeIndex):
first_idx, last_idx = pdf.index[0].value, pdf.index[-1].value
idx_range = f'{first_idx}_{last_idx}'
shape = hashlib.sha256('_'.join(map(str, pdf)).encode()).hexdigest()
t = round(time.time() * 1000)
filename = f'{idx_range}_{t}.{shape}'
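        # Illustrative name (assumed values): a chunk covering index 100..250 written at t=1621000000000
        # with columns 'a' and 'b' would be named '100_250_1621000000000.<sha256 of "a_b">', which
        # SessionFileMeta later parses back with split('_') and split('.').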
session_path_wo_protocol = self._build_path_from_session(session, protocol=False)
self._fs.mkdirs(session_path_wo_protocol, exist_ok=True)
with self._fs.open(f'{session_path_wo_protocol}/{filename}.meta', 'w') as outfile:
json.dump({"columns": list(pdf)}, outfile)
        # could be done asynchronously in the workers but it has a cost;
        # we may want to be async if the dataFrame is big
session_path = self._build_path_from_session(session)
# await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)
pdf.to_parquet(f'{session_path}/{filename}.parquet', index=True,
storage_options=self._storage_options, engine='pyarrow')
@capture_timings('get_session_parquet_files')
@with_trace('get_session_parquet_files')
def get_session_parquet_files(self, session):
session_path = self._build_path_from_session(session, protocol=False)
return self._fs.glob(f'{session_path}/*.parquet')
def _get_next_files_list(self, session: Session):
"""Group session files in lists of files that can be read directly with dask
File can be groupped if they have the same columns (shape) and no overlap of indexes
"""
session_files = [SessionFileMeta(self._fs, f) for f in self.get_session_parquet_files(session)]
session_files = sorted(session_files, key=attrgetter('time'))
while len(session_files) > 0:
first = session_files.pop(0)
L = [first]
i = 0
while i < len(session_files):
f2 = session_files[i]
if first.shape == f2.shape:
if any(f2.overlap(f) for f in L):
break
L.append(session_files.pop(i))
elif first.has_common_columns(f2):
break
else:
i = i + 1
yield [f'{self._protocol}://{file.path}' for file in L]
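    # Illustrative grouping (assumed session content): given chunk files A(cols a,b idx 0-10),
    # B(cols a,b idx 20-30) and C(cols c idx 0-10), the generator yields [A, B] first (same shape,
    # no index overlap) and then [C] on the next iteration.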
@capture_timings('session_commit')
@with_trace('session_commit')
async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
dfs = [self._load(pf) for pf in self._get_next_files_list(session)]
if from_bulk_id:
dfs.insert(0, self._load_bulk(from_bulk_id))
if not dfs:
raise BulkNotProcessable("No data to commit")
dfs = [self.client.submit(set_index, df1) for df1 in dfs]
while len(dfs) > 1:
dfs = [self.client.submit(do_merge, a, b) for a, b in by_pairs(dfs)]
return await self.save_blob(dfs[0])
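    # Merge-order sketch (illustrative): with chunk groups [d0, d1, d2, d3] the loop above submits
    # do_merge(d0, d1) and do_merge(d2, d3) in one round, then merges the two results, i.e. a binary
    # reduction of about ceil(log2(n)) rounds before save_blob() persists the final dataframe.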
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fastapi import status, HTTPException
class BulkError(Exception):
http_status: int
def raise_as_http(self):
raise HTTPException(status_code=self.http_status, detail=str(self))
class BulkNotFound(BulkError):
    http_status = status.HTTP_404_NOT_FOUND

    def __init__(self, bulk_id):
        self.message = f'bulk {bulk_id} not found'
        super().__init__(self.message)  # pass the message to Exception so str(self) carries it in raise_as_http()


class BulkNotProcessable(BulkError):
    http_status = status.HTTP_422_UNPROCESSABLE_ENTITY

    def __init__(self, bulk_id):
        self.message = f'bulk {bulk_id} not processable'
        super().__init__(self.message)
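# Hypothetical handler usage (route code assumed, not part of this module):
#     try:
#         df = await dask_blob_storage.load_bulk(bulk_id)
#     except BulkError as e:
#         e.raise_as_http()   # -> fastapi HTTPException with the mapped status code and message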
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_gcp.storage.blob_storage_gcp import GCloudAioStorage
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase, DaskDriverBlobStorage
class DaskBlobStorageGoogle(DaskBlobStorageBase):
"""Instantiate a DaskDriverBlobStorage with the google blob storage file system."""
async def build_dask_blob_storage(self, tenant): # TODO
gcp_store = GCloudAioStorage(service_account_file=tenant.credentials)
token = await gcp_store._get_access_token(tenant.project_id, tenant.bucket_name)
storage_options = {'token': token}
base_directory = f'{tenant.bucket_name}/dask_data' # TODO remove dask_data
_dask = DaskDriverBlobStorage(protocol='gs',
base_directory=base_directory,
storage_options=storage_options)
await _dask.init_client()
return _dask
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from itertools import zip_longest
def share_items(seq1, seq2):
"""Returns True if seq1 contains common items with seq2."""
return not set(seq1).isdisjoint(seq2)
def by_pairs(iterable):
"""Yield successive 2 elements from iterable.
Fill with None if less than 2 items in iterable."""
return zip_longest(*[iter(iterable)] * 2, fillvalue=None)
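# Illustrative (assumed inputs): list(by_pairs([a, b, c])) == [(a, b), (c, None)]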
class SessionFileMeta:
def __init__(self, fs, file_path: str) -> None:
self._fs = fs
file_name = os.path.basename(file_path)
start, end, tail = file_name.split('_')
        self.start = float(start)  # datetime support?
self.end = float(end)
self.time, self.shape, tail = tail.split('.')
self.columns = self._get_columns(file_path) # TODO lazy load
self.path = file_path
def _get_columns(self, file_path):
path, _ = os.path.splitext(file_path)
with self._fs.open(path + '.meta') as meta_file:
return json.load(meta_file)['columns']
def overlap(self, other: 'SessionFileMeta'):
"""Returns True if indexes overlap."""
return self.end >= other.start and other.end >= self.start
def has_common_columns(self, other):
"""Returns True if contains common columns with others."""
return share_items(self.columns, other.columns)
def set_index(ddf): # TODO
"""Set index of the dask dataFrame only if needed."""
if not ddf.known_divisions or '_idx' not in ddf:
ddf['_idx'] = ddf.index # Why ?
ddf['_idx'] = ddf['_idx'].astype(ddf.index.dtype)
return ddf.set_index('_idx')
return ddf
def do_merge(df1, df2):
"""Combine the 2 dask dataframe. Updates df1 with df2 values if overlap."""
if df2 is None:
return df1
if share_items(df1.columns, df2.columns):
ddf = df2.combine_first(df1)
else:
        ddf = df2.join(df1, how='outer')  # join seems faster when there are no columns in common
return ddf[sorted(ddf.columns)]
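# Illustrative merge (assumed data): if df1 has x=[1, 2] at index [0, 1] and df2 has x=[9] at index [1],
# do_merge(df1, df2) keeps x=[1, 9]: df2 wins on the overlapping index, df1 fills the rest.
# With disjoint column sets the outer join only aligns the two frames on the index.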
......@@ -46,22 +46,21 @@ class DataframeSerializer:
index: List[Union[str, int, float]] = None
class IndexFormat(BaseModel):
TODO: str
# TODO
pass
class ColumnFormat(BaseModel):
TODO: str
class ValuesFormat(BaseModel):
__root__: List[List[Union[str, int, float]]]
# TODO
pass
class RecordsFormat(BaseModel):
TODO: str
# TODO
pass
schema_dict = {
JSONOrient.split: SplitFormat.schema(),
JSONOrient.index: IndexFormat.schema(),
JSONOrient.columns: ColumnFormat.schema(),
JSONOrient.values: ValuesFormat.schema(),
JSONOrient.records: RecordsFormat.schema()
}
......@@ -103,6 +102,4 @@ class DataframeSerializer:
"""
orient = JSONOrient.get(orient)
if isinstance(data, bytes):
data = BytesIO(data)
return pd.read_json(path_or_buf=data, orient=orient.value).replace("NaN", np.NaN)
......@@ -16,13 +16,13 @@ from enum import Enum
from typing import Union
class JSONOrient(Enum):
class JSONOrient(str, Enum):
    # 'table' is not allowed because it is very verbose and comes with significant overhead
    # 'values' is not allowed because it cannot carry the index nor the column names
split = "split"
index = "index"
columns = "columns"
records = "records"
values = "values"
@classmethod
def get(cls, orient: Union[str, "JSONOrient"]) -> "JSONOrient":
......
......@@ -19,6 +19,7 @@ from osdu.core.api.storage.tenant import Tenant
from odes_storage.models import *
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.exceptions import ResourceNotFoundException
from app.model import model_utils
......@@ -104,7 +105,7 @@ class StorageRecordServiceBlobStorage:
Tenant(project_id=self._project, bucket_name=self._container, data_partition_id=data_partition_id),
object_name)
return Record.parse_raw(bin_data)
except FileNotFoundError:
except (FileNotFoundError, ResourceNotFoundException):
raise HTTPException(status_code=404, detail="Item not found")
async def get_all_record_versions(self,
......
......@@ -101,6 +101,13 @@ class ConfigurationContainer:
default='undefined'
)
# TODO: based on environment name, hardcoded values here are temporary until chunking feature release
alpha_feature_enabled: EnvVar = EnvVar(
key='ENVIRONMENT_NAME',
description='enable alpha features',
default='',
factory=lambda x: x.lower() in ['evd', 'dev', 'qa'])
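    # Illustrative (assumed environment values): ENVIRONMENT_NAME=dev -> alpha_feature_enabled is True,
    # ENVIRONMENT_NAME=prod or unset -> False.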
cloud_provider: EnvVar = EnvVar(
key='CLOUD_PROVIDER',
        description='Short name of the current cloud provider environment, must be "aws" or "gcp" or "az" or "ibm"',
......
......@@ -11,15 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language g