Wellbore Domain Services
Commit a2c7fe2c
Authored Sep 10, 2021 by Cyril Monmouton

Merge branch 'feature/asynchronize-df-processing' into 'master'

Feature/asynchronize df processing

See merge request !234

Parents: 0c44de28, 5f3cda95
Pipeline #64832 passed with stages in 9 minutes and 10 seconds
Changes: 3 files
app/bulk_persistence/dataframe_serializer.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 import asyncio
+from functools import partial
 from io import BytesIO
 from typing import Union, AnyStr, IO, Optional, List, Dict
@@ -61,19 +62,19 @@ class DataframeSerializerSync:
     def to_json(cls,
                 df: DataframeClass,
                 orient: Union[str, JSONOrient] = JSONOrient.split,
-                path_or_buf: Optional[Union[str, Path, IO[AnyStr]]] = None) -> Optional[str]:
+                **kwargs) -> Optional[str]:
         """
         :param df: dataframe to dump
         :param orient: format for Json, default is split
-        :param path_or_buf: File path or object. If not specified, the result is returned as a string.
+        :param kwargs: keyword arguments will be forwarded to pandas.to_json()
         :return: None, or the JSON string if path_or_buf is None
         """
         orient = JSONOrient.get(orient)

-        return df.fillna("NaN").to_json(path_or_buf=path_or_buf, orient=orient.value)
+        return df.fillna("NaN").to_json(orient=orient.value, **kwargs)

-    def read_parquet(cls, data) -> 'DataframeSerializerAsync.DataframeClass':
+    def read_parquet(cls, data) -> DataframeClass:
         """
         :param data: bytes, path object or file-like object
         :return: dataframe
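Within DataframeSerializerSync, to_json now forwards arbitrary keyword arguments straight to pandas.DataFrame.to_json instead of exposing only path_or_buf. A minimal sketch of what the widened signature permits (the frame and keyword values below are illustrative, not taken from the repository):

import pandas as pd

df = pd.DataFrame({"MD": [0.0, 1.5], "X": [10.2, None]})

# Any pandas.DataFrame.to_json keyword now passes through unchanged,
# e.g. double_precision or date_format alongside orient.
json_str = DataframeSerializerSync.to_json(df, orient="split", double_precision=10)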
@@ -85,7 +86,7 @@ class DataframeSerializerSync:
         return pd.read_parquet(data)

     @classmethod
-    def read_json(cls, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> 'DataframeSerializerAsync.DataframeClass':
+    def read_json(cls, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> DataframeClass:
         """
         :param data: bytes str content (valid JSON str), path object or file-like object
         :param orient:
@@ -100,14 +101,26 @@ class DataframeSerializerAsync:
     def __init__(self, pool_executor=get_pool_executor()):
         self.executor = pool_executor

+    @with_trace("Parquet bulk serialization")
+    async def to_parquet(self, df: DataframeClass, *args, **kwargs) -> DataframeClass:
+        func = partial(df.to_parquet, *args, **kwargs)
+        return await asyncio.get_event_loop().run_in_executor(self.executor, func)

     @with_trace("JSON bulk serialization")
     async def to_json(self, df: DataframeClass,
                       orient: Union[str, JSONOrient] = JSONOrient.split,
-                      path_or_buf: Optional[Union[str, Path, IO[AnyStr]]] = None) -> Optional[str]:
-        return await asyncio.get_event_loop().run_in_executor(self.executor, DataframeSerializerSync.to_json, df, orient, path_or_buf)
+                      *args, **kwargs) -> Optional[str]:
+        func = partial(DataframeSerializerSync.to_json, df, orient, *args, **kwargs)
+        return await asyncio.get_event_loop().run_in_executor(self.executor, func)

+    @with_trace("CSV bulk serialization")
+    async def to_csv(self, df: DataframeClass, *args, **kwargs) -> Optional[str]:
+        df = df.fillna("NaN")
+        func = partial(df.to_csv, *args, **kwargs)
+        return await asyncio.get_event_loop().run_in_executor(self.executor, func)

     @with_trace("Parquet bulk deserialization")
     async def read_parquet(self, data) -> DataframeClass:
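The pattern repeated in each async method is the point of this file's change: loop.run_in_executor forwards only positional arguments to its callable, so functools.partial binds the keyword arguments first, and the blocking pandas call then runs on the pool without stalling the event loop. A standalone sketch of the same technique, with illustrative names (the executor here stands in for the service's get_pool_executor()):

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import pandas as pd

_executor = ThreadPoolExecutor(max_workers=4)  # stand-in for the service's pool

async def to_csv_async(df: pd.DataFrame, **kwargs) -> str:
    # run_in_executor(executor, func, *args) takes no keyword arguments,
    # so bind them with partial before submitting the blocking call.
    func = partial(df.to_csv, **kwargs)
    return await asyncio.get_event_loop().run_in_executor(_executor, func)

async def main():
    df = pd.DataFrame({"MD": [0.0, 1.5, 3.0]})
    print(await to_csv_async(df, index=False))

asyncio.run(main())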
app/routers/bulk/utils.py
@@ -176,7 +176,6 @@ class DataFrameRender:
             selected.extend(natsorted(matching_columns))
         return selected

     @staticmethod
-    @with_trace('process_params')
     async def process_params(df, params: GetDataParams):
@@ -194,7 +193,6 @@ class DataFrameRender:
         return df

     @staticmethod
-    @with_trace('df_render')
     async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None):
@@ -208,18 +206,19 @@ class DataFrameRender:
             pdf.index.name = None  # TODO

         if not accept or MimeTypes.PARQUET.type in accept:
-            return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
+            content = await DataframeSerializerAsync().to_parquet(pdf, engine="pyarrow")
+            return Response(content, media_type=MimeTypes.PARQUET.type)

         if MimeTypes.JSON.type in accept:
-            return Response(pdf.to_json(index=True, date_format='iso', orient=orient.value), media_type=MimeTypes.JSON.type)
+            content = await DataframeSerializerAsync().to_json(pdf, index=True, date_format='iso', orient=orient.value)
+            return Response(content, media_type=MimeTypes.JSON.type)

         if MimeTypes.CSV.type in accept:
-            return Response(pdf.to_csv(), media_type=MimeTypes.CSV.type)
+            content = await DataframeSerializerAsync().to_csv(pdf)
+            return Response(content, media_type=MimeTypes.CSV.type)

         # in any other case => Parquet anyway?
-        return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
+        content = await DataframeSerializerAsync().to_parquet(pdf, engine="pyarrow")
+        return Response(content, media_type=MimeTypes.PARQUET.type)


 async def set_bulk_field_and_send_record(ctx: Context, bulk_id, record, bulk_uri_access: BulkIdAccess):
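With these edits, df_render awaits the serializer instead of calling pandas inline, so encoding a large response body no longer blocks the event loop while other requests wait. A hedged sketch of the same idea in a FastAPI-style endpoint (the route path and sample frame are assumptions for illustration, not part of the service):

import pandas as pd
from fastapi import FastAPI, Response

from app.bulk_persistence.dataframe_serializer import DataframeSerializerAsync

app = FastAPI()

@app.get("/example-data")  # hypothetical route, for illustration only
async def get_example_data():
    pdf = pd.DataFrame({"MD": [0.0, 1.5], "X": [10.2, 11.4]})
    # Offload the CPU-bound parquet encoding to the serializer's pool.
    content = await DataframeSerializerAsync().to_parquet(pdf, engine="pyarrow")
    return Response(content, media_type="application/x-parquet")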
tests/unit/routers/chunking_test.py
@@ -77,7 +77,9 @@ def _create_df_from_response(response):
     elif content_type == 'text/csv; charset=utf-8':
         return pd.read_csv(f, index_col=0)
     elif content_type == 'application/json':
-        return pd.read_json(f, dtype=True, orient='split', convert_axes=False)
+        return pd.read_json(f, dtype=True, orient='split', convert_axes=False).replace("NaN", np.NaN)
+    elif content_type == 'application/csv':
+        return pd.read_csv(f, dtype=True).replace("NaN", np.NaN)
     else:
         raise ValueError(f"Unknown content-type: '{content_type}'")
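The test helper mirrors the serializer's NaN handling: to_json and to_csv write missing values as the literal string "NaN" (via fillna("NaN")), so a reader has to map that sentinel back to a real missing value with replace("NaN", np.nan). A small self-contained round trip (not from the test file) showing why the replace step matters for JSON payloads:

import io

import numpy as np
import pandas as pd

df = pd.DataFrame({"X": [1.0, np.nan]})

# The serializer writes the missing value as the string "NaN"...
json_text = df.fillna("NaN").to_json(orient="split")

# ...so the string sentinel must be turned back into a real NaN on read.
restored = pd.read_json(io.StringIO(json_text), orient="split",
                        convert_axes=False).replace("NaN", np.nan)
assert restored["X"].isna().iloc[1]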
@@ -222,6 +224,7 @@ def test_post_data_merge_extension_properties(setup_client):
 @pytest.mark.parametrize("accept_content", ['application/x-parquet',
                                             'application/json',
+                                            'text/csv; charset=utf-8',
                                             ])
 @pytest.mark.parametrize("columns", [['MD', 'X'],
@@ -254,7 +257,7 @@ def test_send_all_data_once(setup_client,
     assert get_response.status_code == 200
     result_df = _create_df_from_response(get_response)

-    if content_type_header.endswith('parquet') and accept_content.endswith('json'):
+    if content_type_header.endswith('parquet') and not accept_content.endswith('parquet'):
         result_df = _cast_datetime_to_datetime64_ns(result_df)

     if content_type_header.endswith('json'):
@@ -328,7 +331,7 @@ def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
 ])
 @pytest.mark.parametrize("accept_content", ['application/x-parquet',
-                                            # 'text/csv; charset=utf-8',
+                                            'text/csv; charset=utf-8',
                                             'application/json',
                                             ])
 @pytest.mark.parametrize("columns", [
@@ -789,6 +792,7 @@ def test_nat_sort_columns(setup_client, data_format, accept_content, columns_nam
     response_df = _create_df_from_response(data_response)
     assert list(response_df.columns) == columns_name


+@pytest.mark.parametrize("entity_type", ['WellLog', 'Log'])
 def test_session_update_previous_version(setup_client, entity_type):
     """ create a session update on a previous version """
@@ -797,7 +801,7 @@ def test_session_update_previous_version(setup_client, entity_type):
     record_id = _create_record(client, entity_type)
     chunking_url = Definitions[entity_type]['chunking_url']
     base_url = Definitions[entity_type]['base_url']
-    headers = headers = {'Content-Type': 'application/x-parquet'}
+    headers = {'Content-Type': 'application/x-parquet'}
     nb_rows = 5

     version_data = [generate_df(['MD', 'X', 'Y'], range(nb_rows)),
@@ -812,7 +816,6 @@ def test_session_update_previous_version(setup_client, entity_type):
                                    headers=headers)
     assert write_response.status_code == 200

     versions_response = client.get(f'{base_url}/{record_id}/versions')
     assert versions_response.status_code == 200
     versions = versions_response.json()['versions']