Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Domain Data Mgmt Services
Wellbore
Wellbore Domain Services
Commits
238ba023
Commit
238ba023
authored
Dec 17, 2021
by
Cyril Monmouton
Browse files
Restore optimization to remove select distinct index to be loaded
parent
e21cad5a
Pipeline
#83339
failed with stages
in 12 minutes and 24 seconds
Changes
1
Pipelines
4
Hide whitespace changes
Inline
Side-by-side
app/bulk_persistence/dask/dask_bulk_storage.py
View file @
238ba023
...
@@ -345,10 +345,14 @@ class DaskBulkStorage:
...
@@ -345,10 +345,14 @@ class DaskBulkStorage:
async
def
_build_session_index
(
async
def
_build_session_index
(
self
,
chunk_metas
:
List
[
session_meta
.
SessionFileMeta
],
record_id
:
str
,
from_bulk_id
:
str
self
,
chunk_metas
:
List
[
session_meta
.
SessionFileMeta
],
record_id
:
str
,
from_bulk_id
:
str
)
->
pd
.
Index
:
)
->
pd
.
Index
:
"""Combine all chunks indexes + previous version index"""
"""
# list one file per different index_hash.
Combine all chunks indexes + previous version index
# read chunks indexes from paquet
List one file per different index_hash.
indexes
=
self
.
_map_with_trace
(
_load_index_from_meta
,
chunk_metas
,
Read chunks indexes from parquet
"""
chunks_meta_with_different_indexes
=
{
hash_index
:
meta
for
hash_index
,
meta
in
chunk_metas
}.
values
()
indexes
=
self
.
_map_with_trace
(
_load_index_from_meta
,
chunks_meta_with_different_indexes
,
storage_options
=
self
.
_parameters
.
storage_options
)
storage_options
=
self
.
_parameters
.
storage_options
)
if
from_bulk_id
:
if
from_bulk_id
:
# read the index of previous version
# read the index of previous version
...
@@ -419,6 +423,7 @@ class DaskBulkStorage:
...
@@ -419,6 +423,7 @@ class DaskBulkStorage:
catalog
.
change_columns_info
(
chunk_group
)
catalog
.
change_columns_info
(
chunk_group
)
@
capture_timings
(
'_save_session_index'
)
@
capture_timings
(
'_save_session_index'
)
@
with_trace
(
'_save_session_index'
)
async
def
_save_session_index
(
self
,
path
:
str
,
index
:
pd
.
Index
)
->
str
:
async
def
_save_session_index
(
self
,
path
:
str
,
index
:
pd
.
Index
)
->
str
:
index_folder
=
pathBuilder
.
join
(
path
,
'_wdms_index_'
)
index_folder
=
pathBuilder
.
join
(
path
,
'_wdms_index_'
)
self
.
_fs
.
mkdirs
(
pathBuilder
.
remove_protocol
(
index_folder
)[
0
])
# TODO for local storage
self
.
_fs
.
mkdirs
(
pathBuilder
.
remove_protocol
(
index_folder
)[
0
])
# TODO for local storage
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment