Commit 0c44de28 authored by Jeremie Hallal's avatar Jeremie Hallal
Browse files

Merge branch 'dask_read_optimization' into 'master'

improve offset limit filter

See merge request !237
parents 425b0614 2b619076
Pipeline #64253 passed with stages
in 16 minutes and 50 seconds
......@@ -12,6 +12,7 @@ from natsort import natsorted
from app.clients.storage_service_client import get_storage_record_service
from app.bulk_persistence import DataframeSerializerAsync
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from app.bulk_persistence.dask.utils import set_index
from app.bulk_persistence.mime_types import MimeTypes
from app.bulk_persistence import JSONOrient
from app.utils import get_ctx, OpenApiHandler, Context
......@@ -132,6 +133,20 @@ class DataFrameRender:
driver = await with_dask_blob_storage()
return await driver.client.submit(lambda: len(df.index))
async def select_range(df: dd.DataFrame, offset, limit):
if offset or limit:
driver = await with_dask_blob_storage()
df = driver.client.persist(df)
df = await driver.client.submit(set_index, df)
index = await driver.client.submit(lambda x: x.index.compute(), df)
if offset and offset > 0:
index = index[offset:]
if limit and limit > 0:
index = index[:limit]
return df.loc[df.index.isin(index)]
return df
re_array_selection = re.compile(r'^(?P<name>.+)\[(?P<start>[^:]+):?(?P<stop>.*)\]$')
......@@ -175,13 +190,7 @@ class DataFrameRender:
df = df[natsorted(df.columns)] # columns are ordered by natural sort
if params.offset:
head_index = df.head(params.offset, npartitions=-1, compute=False).index
index = await DataFrameRender.compute(head_index) # TODO could be slow!
df = df.loc[~df.index.isin(index)]
if params.limit and params.limit > 0:
df = df.head(params.limit, npartitions=-1, compute=False)
df = await DataFrameRender.select_range(df, params.offset, params.limit)
return df
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment