Commit 9db1520f authored by Jeremie Hallal

Merge branch 'master' into refactoring_task

parents 70bec927 3bd0d28b
@@ -46,9 +46,6 @@ include:
- project: "osdu/platform/ci-cd-pipelines"
file: "build/python.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "scanners/fossa-python.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "scanners/gitlab-ultimate.yml"
@@ -230,3 +227,133 @@ osdu-gcp-test:
# Allow failure on private development deployments
ibm-deploy-devpri:
allow_failure: true
# --------------------------------------------------------------------------------
# Experimental FOSSA jobs. These will be promoted to the standard ci-cd-pipelines after
# they've had some testing in a real project
fossa-analyze:
image: $CI_REGISTRY/divido/fossa-with-cache/incremental:latest
stage: scan
needs: ['compile-and-unit-test']
rules:
- if: $FOSSA_API_KEY
variables:
FOSSA_OUTPUT_DIR: fossa-output
artifacts:
paths:
- fossa-output
when: always
expire_in: 2 days
script:
# fossa-with-cache needs a CI_COMMIT_BRANCH defined to know how to parse the FOSSA API results
# When building tags, this isn't defined by GitLab. In that case, we use the tag name instead. If that's not defined
# then things will fail and we'll have to make this smarter
- test -z "$CI_COMMIT_BRANCH" && export CI_COMMIT_BRANCH="$CI_COMMIT_TAG"
- |
if [ ! -e all-requirements.txt ]; then
echo "I was expecting a file named 'all-requirements.txt' to have been generated by compile-and-unit-test"
echo "However, that file doesn't seem to exist"
echo "----------------------------------------"
echo "That file should have been the output of a 'pip freeze', so that I knew what the full list of deep"
echo "dependencies were. I can't reasonably generate that in this job, because I don't know what python image"
echo "is appropriate. If this structure has been changed in the build/python.yml, you may need to update this"
echo "logic as well (in scanners/fossa-python.yml)"
exit 1
fi
# This variable is used by the python build environment to refer to the set of requirements that need to
# be compiled down into the single 'all-requirements.txt'. Here, we override it to supply fossa-with-cache
# with a direct answer.
- PIP_REQUIREMENTS=all-requirements.txt fossa-with-cache
fossa-check-notice:
image: $CI_REGISTRY/divido/fossa-with-cache/incremental:latest
stage: scan
needs: ['fossa-analyze']
tags: ['osdu-small']
rules:
- if: $FOSSA_API_KEY
artifacts:
when: on_failure
paths:
- fossa-output/cached-NOTICE
- fossa-output/generated-clean-NOTICE
expire_in: 2 days
script:
# Check to see if a newer commit exists for the pipeline's branch, and if it does, use that NOTICE instead of this one's
- |
if [ "$CI_COMMIT_BRANCH" != "" ]; then
colorCmd="\e[32;1m"
colorReset="\e[0m"
function echoCmd() {
echo -e "${colorCmd}>" "$@" "${colorReset}"
}
echoCmd git fetch
git fetch
echoCmd git diff --name-only HEAD origin/$CI_COMMIT_BRANCH
branchDiffs="$(git diff --name-only HEAD origin/$CI_COMMIT_BRANCH)"
echo $branchDiffs
echo "--------------------"
if [ "$branchDiffs" == "NOTICE" ]; then
echo "The branch associated with this pipeline ($CI_COMMIT_BRANCH) has been changed, but the only changes are the NOTICE file"
echo "I will use the NOTICE file from origin/$CI_COMMIT_BRANCH ($(git rev-parse --short origin/$CI_COMMIT_BRANCH)) as the basis for comparison"
echoCmd git checkout origin/$CI_COMMIT_BRANCH -- NOTICE
git checkout origin/$CI_COMMIT_BRANCH -- NOTICE
elif [ "$branchDiffs" == "" ]; then
echo "The branch associated with this pipeline ($CI_COMMIT_BRANCH) has not been changed since the commit that spawned this pipeline"
echo "I will use the NOTICE file from the pipeline's commit ($CI_COMMIT_SHORT_SHA) as the basis for comparison"
else
echo "The branch associated with this pipeline ($CI_COMMIT_BRANCH) has been changed, but the changes include more than just the NOTICE file"
echo "I will use the NOTICE file from the pipeline's commit ($CI_COMMIT_SHORT_SHA) as the basis for comparison"
fi
fi
# Use a cached NOTICE if available, otherwise use a generated one
- |
if [ -e fossa-output/cached-NOTICE ]; then
fossaGeneratedNotice=fossa-output/cached-NOTICE;
elif [ -e fossa-output/generated-clean-NOTICE ]; then
fossaGeneratedNotice=fossa-output/generated-clean-NOTICE
else
echo "Couldn't find either a cached-NOTICE or generated-clean-NOTICE in the fossa-output/ directory"
echo
echo "At least one of these should have been generated by a previous job stage (fossa-analyze) and stored"
echo "as an artifact. Something must be wrong in the CI setup"
exit 1
fi
echo "Comparing with $fossaGeneratedNotice"
# If the comparison finds differences, let the user know what to do next
- |
if ! fossa-compare-notices NOTICE $fossaGeneratedNotice; then
echo --------------------------------------------------------------------------------
echo "There are differences in the NOTICE file"
echo "Please review these differences, and if they look appropriate based on your"
echo "changes, update the committed NOTICE file"
echo "--------------------"
echo "If you make changes to the NOTICE file (and only the NOTICE file), you can"
echo "re-run this single stage of the pipeline alone rather than the whole pipeline"
echo "One way to achieve this:"
echo "$ wget -O NOTICE '${CI_PROJECT_URL}/-/jobs/${CI_JOB_ID}/artifacts/raw/${fossaGeneratedNotice}?inline=false'"
echo "$ git add NOTICE"
echo "$ git commit -m 'Updating NOTICE'"
echo "$ git push -o ci.skip"
echo "Then retry this job"
exit 1
fi
# 3rd-Party Software License Notice
Generated by fossa-cli (https://github.com/fossas/fossa-cli).
Formatted by fossa-with-cache (https://community.opengroup.org/divido/fossa-with-cache).
This software includes the following software and licenses:
========================================================================
@@ -15,18 +16,18 @@ The following software have components provided under the terms of this license:
- coverage (from https://github.com/nedbat/coveragepy)
- cryptography (from https://github.com/pyca/cryptography)
- google-api-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-auth (from https://github.com/googleapis/google-auth-library-python)
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python, https://github.com/googleapis/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
- google-cloud-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-cloud-monitoring (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-cloud-trace (from https://github.com/googleapis/googleapis)
- googleapis-common-protos (from https://github.com/googleapis/googleapis)
- grpcio (from https://grpc.io)
- importlib-metadata (from )
- importlib-metadata
- jsonpath-ng (from https://github.com/h2non/jsonpath-ng)
- msgpack (from http://msgpack.org/)
- multidict (from https://github.com/aio-libs/multidict/)
- numpy (from )
- numpy
- openapi-spec-validator (from https://github.com/p1c2u/openapi-spec-validator)
- opencensus (from https://github.com/census-instrumentation/opencensus-python)
- opencensus-context (from https://github.com/census-instrumentation/opencensus-python/tree/master/context/opencensus-context)
@@ -36,14 +37,14 @@ The following software have components provided under the terms of this license:
- opencensus-ext-stackdriver (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-stackdriver)
- opencensus-proto (from https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-python)
- packaging (from https://github.com/pypa/packaging)
- pandas (from https://pandas.pydata.org)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pyarrow (from https://arrow.apache.org/)
- pytest-asyncio (from https://github.com/pytest-dev/pytest-asyncio)
- pytest-dependency (from https://github.com/RKrahl/pytest-dependency)
- python-dateutil (from https://dateutil.readthedocs.org)
- python-multipart (from http://github.com/andrew-d/python-multipart)
- requests (from https://requests.readthedocs.io)
- requests (from http://python-requests.org, https://requests.readthedocs.io)
- rfc3986 (from https://rfc3986.readthedocs.org)
- rsa (from https://stuvel.eu/rsa)
- s3transfer (from https://github.com/boto/s3transfer)
@@ -64,7 +65,7 @@ The following software have components provided under the terms of this license:
- grpcio (from https://grpc.io)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
- numpy (from )
- numpy
- packaging (from https://github.com/pypa/packaging)
- ply (from http://www.dabeaz.com/ply/)
- pyasn1 (from http://sourceforge.net/projects/pyasn1/)
@@ -79,7 +80,7 @@ BSD-3-Clause
The following software have components provided under the terms of this license:
- HeapDict (from http://stutzbachenterprises.com/)
- Jinja2 (from https://palletsprojects.com/p/jinja/)
- Jinja2 (from http://jinja.pocoo.org/, https://palletsprojects.com/p/jinja/)
- MarkupSafe (from https://palletsprojects.com/p/markupsafe/)
- adlfs (from https://github.com/hayesgb/adlfs/)
- asgiref (from http://github.com/django/asgiref/)
@@ -100,11 +101,11 @@ The following software have components provided under the terms of this license:
- isodate (from http://cheeseshop.python.org/pypi/isodate)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
- numpy (from )
- numpy
- oauthlib (from https://github.com/idan/oauthlib)
- openapi-schema-validator (from https://github.com/p1c2u/openapi-schema-validator)
- packaging (from https://github.com/pypa/packaging)
- pandas (from https://pandas.pydata.org)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- pip-tools (from https://github.com/jazzband/pip-tools/)
- ply (from http://www.dabeaz.com/ply/)
@@ -136,8 +137,8 @@ The following software have components provided under the terms of this license:
- distributed (from https://distributed.readthedocs.io/en/latest/)
- fsspec (from http://github.com/intake/filesystem_spec)
- gcsfs (from https://github.com/dask/gcsfs)
- numpy (from )
- pandas (from https://pandas.pydata.org)
- numpy
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- s3fs (from http://github.com/dask/s3fs/)
- toolz (from http://github.com/pytoolz/toolz/)
@@ -147,7 +148,7 @@ CC-BY-SA-3.0
========================================================================
The following software have components provided under the terms of this license:
- numpy (from )
- numpy
========================================================================
GPL-2.0-only
@@ -225,14 +226,14 @@ MIT
The following software have components provided under the terms of this license:
- PyJWT (from http://github.com/jpadilla/pyjwt)
- PyYAML (from )
- PyYAML
- adal (from https://github.com/AzureAD/azure-activedirectory-library-for-python)
- aiohttp (from https://github.com/aio-libs/aiohttp/)
- aioitertools (from https://github.com/jreese/aioitertools)
- aioredis (from https://github.com/aio-libs/aioredis)
- anyio (from https://pypi.org/project/anyio/3.4.0/)
- anyio (from https://pypi.org/project/anyio/3.3.0/, https://pypi.org/project/anyio/3.4.0/)
- asgiref (from http://github.com/django/asgiref/)
- attrs (from https://www.attrs.org/)
- attrs (from https://attrs.readthedocs.io/, https://www.attrs.org/)
- azure-common (from https://github.com/Azure/azure-sdk-for-python)
- azure-core (from https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/core/azure-core)
- azure-datalake-store (from https://github.com/Azure/azure-data-lake-store-python)
@@ -242,30 +243,30 @@ The following software have components provided under the terms of this license:
- azure-keyvault-keys (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-keys)
- azure-keyvault-secrets (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-secrets)
- azure-storage-blob (from https://github.com/Azure/azure-storage-python)
- backoff (from )
- backoff
- botocore (from https://github.com/boto/botocore)
- cachetools (from https://github.com/tkem/cachetools)
- cffi (from )
- cffi
- charset-normalizer (from https://github.com/ousret/charset_normalizer)
- coverage (from https://github.com/nedbat/coveragepy)
- deepdiff (from https://github.com/seperman/deepdiff)
- fastapi (from https://github.com/tiangolo/fastapi)
- grpcio (from https://grpc.io)
- h11 (from )
- h11
- iniconfig (from http://github.com/RonnyPfannschmidt/iniconfig)
- jmespath (from https://github.com/jmespath/jmespath.py)
- jsonschema (from )
- jsonschema
- mockito (from https://github.com/kaste/mockito-python)
- msal (from https://github.com/AzureAD/microsoft-authentication-library-for-python)
- msal-extensions (from https://pypi.org/project/msal-extensions/0.1.3/)
- msrest (from https://github.com/Azure/msrest-for-python)
- munch (from http://github.com/Infinidat/munch)
- natsort (from https://github.com/SethMMorton/natsort)
- numpy (from )
- numpy
- ordered-set (from http://github.com/LuminosoInsight/ordered-set)
- pandas (from https://pandas.pydata.org)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pluggy (from )
- pluggy
- py (from http://pylib.readthedocs.org/)
- pyarrow (from https://arrow.apache.org/)
- pydantic (from https://github.com/samuelcolvin/pydantic)
@@ -283,7 +284,7 @@ The following software have components provided under the terms of this license:
- sniffio (from https://github.com/python-trio/sniffio)
- structlog (from http://www.structlog.org/)
- toml (from https://github.com/uiri/toml)
- tomli (from https://pypi.org/project/tomli/2.0.0/)
- tomli (from https://pypi.org/project/tomli/1.2.2/, https://pypi.org/project/tomli/2.0.0/)
- urllib3 (from https://urllib3.readthedocs.io/)
- xmltodict (from https://github.com/martinblech/xmltodict)
- zipp (from https://github.com/jaraco/zipp)
@@ -301,14 +302,14 @@ NCSA
========================================================================
The following software have components provided under the terms of this license:
- numpy (from )
- numpy
========================================================================
OPL-1.0
========================================================================
The following software have components provided under the terms of this license:
- numpy (from )
- numpy
========================================================================
OpenSSL
@@ -325,10 +326,10 @@ The following software have components provided under the terms of this license:
- async-timeout (from https://github.com/aio-libs/async_timeout/)
- coverage (from https://github.com/nedbat/coveragepy)
- distributed (from https://distributed.readthedocs.io/en/latest/)
- google-auth (from https://github.com/googleapis/google-auth-library-python)
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python, https://github.com/googleapis/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
- numpy (from )
- pandas (from https://pandas.pydata.org)
- numpy
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- ply (from http://www.dabeaz.com/ply/)
- portalocker (from https://github.com/WoLpH/portalocker)
- python-dateutil (from https://dateutil.readthedocs.org)
@@ -343,7 +344,7 @@ SunPro
========================================================================
The following software have components provided under the terms of this license:
- numpy (from )
- numpy
========================================================================
Unlicense
@@ -372,7 +373,7 @@ Zlib
The following software have components provided under the terms of this license:
- grpcio (from https://grpc.io)
- numpy (from )
- numpy
========================================================================
public-domain
@@ -381,9 +382,7 @@ The following software have components provided under the terms of this license:
- botocore (from https://github.com/boto/botocore)
- grpcio (from https://grpc.io)
- numpy (from )
- pandas (from https://pandas.pydata.org)
- numpy
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- py (from http://pylib.readthedocs.org/)
- pytz (from http://pythonhosted.org/pytz)
@@ -13,7 +13,7 @@
# limitations under the License.
from .bulk_uri import BulkURI
from .dataframe_persistence import create_and_store_dataframe, get_dataframe
from .dataframe_persistence import create_and_store_dataframe, get_dataframe, download_bulk
from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync
from .json_orient import JSONOrient
from .mime_types import MimeTypes
@@ -13,8 +13,7 @@
# limitations under the License.
import asyncio
import json
from typing import Awaitable, Callable, List, Optional, Union
from typing import Awaitable, Callable, List, Optional, Union, AsyncGenerator, Tuple
import uuid
import fsspec
@@ -29,14 +28,14 @@ from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings
from app.conf import Config
from .dask_worker_plugin import DaskWorkerPlugin
from .errors import BulkRecordNotFound, BulkNotProcessable, internal_bulk_exceptions
from .traces import map_with_trace, submit_with_trace
from .utils import (WDMS_INDEX_NAME, by_pairs, do_merge, worker_capture_timing_handlers,
get_num_rows, set_index, index_union)
from ..dataframe_validators import (assert_df_validate, validate_index, validate_number_of_columns,
columns_not_in_reserved_names, is_reserved_column_name)
from ..dataframe_validators import is_reserved_column_name, DataFrameValidationFunc
from .. import DataframeSerializerSync
from . import storage_path_builder as pathBuilder
from . import session_file_meta as session_meta
@@ -44,6 +43,9 @@ from ..bulk_id import new_bulk_id
from .bulk_catalog import (BulkCatalog, ChunkGroup,
async_load_bulk_catalog,
async_save_bulk_catalog)
from ..mime_types import MimeType
from .dask_data_ipc import DaskNativeDataIPC, DaskLocalFileDataIPC
from . import dask_worker_write_bulk as bulk_writer
def read_with_dask(path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
@@ -73,16 +75,8 @@ def _load_index_from_meta(meta, **kwargs):
**kwargs).index
def dask_to_parquet(ddf, path, storage_options):
""" Save dask dataframe to parquet """
return dd.to_parquet(ddf, path,
engine='pyarrow', schema="infer",
storage_options=storage_options,
compression='snappy')
def _index_union_tuple(t):
return index_union(*t)
def _index_union_tuple(indexes: Tuple[pd.Index, Optional[pd.Index]]):
return index_union(*indexes)
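For intuition, index_union presumably behaves like a set-style union of chunk indexes, i.e. what pandas' Index.union provides; a tiny illustration only, not the project's helper (values are made up):

import pandas as pd

# Union of two overlapping chunk indexes (illustrative values only).
left = pd.Index([1000.0, 1000.5, 1001.0])
right = pd.Index([1001.0, 1001.5])
assert list(left.union(right)) == [1000.0, 1000.5, 1001.0, 1001.5]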
class DaskBulkStorage:
@@ -94,6 +88,14 @@ class DaskBulkStorage:
self._parameters = None
self._fs = None
@property
def _data_ipc(self):
# may also be adapted depending on the size of the data
if Config.dask_data_ipc.value == DaskLocalFileDataIPC.ipc_type:
return DaskLocalFileDataIPC()
assert self.client is not None, 'Dask client not initialized'
return DaskNativeDataIPC(self.client)
@classmethod
async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
instance = cls()
@@ -135,6 +137,13 @@ class DaskBulkStorage:
def _relative_path(self, record_id: str, path: str) -> str:
return pathBuilder.record_relative_path(self.base_directory, record_id, path)
def _ensure_dir_tree_exists(self, path: str):
path_wo_protocol, protocol = pathBuilder.remove_protocol(path)
# on local storage only
if protocol == 'file':
self._fs.mkdirs(path_wo_protocol, exist_ok=True)
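A standalone sketch of the same guard using fsspec directly, under the assumption that only the local 'file' protocol needs explicit directory creation (object stores create key prefixes implicitly); the function name is illustrative, not the project's helper:

import fsspec

def ensure_dir_tree_exists(path: str, protocol: str) -> None:
    # Remote object stores (s3, gs, az, ...) have no real directories; only local paths need mkdirs.
    if protocol == "file":
        fsspec.filesystem("file").mkdirs(path, exist_ok=True)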
def _read_parquet(self, path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
"""Read a Parquet file into a Dask DataFrame
Args:
@@ -163,7 +172,7 @@ class DaskBulkStorage:
def read_parquet_files(f):
return read_with_dask(f.paths, columns=f.labels, storage_options=self._parameters.storage_options)
dfs = self._map_with_trace(read_parquet_files, files_to_load)
index_df = self._read_index_from_catalog_index_path(catalog)
if index_df:
dfs.append(index_df)
@@ -230,61 +239,9 @@ class DaskBulkStorage:
ddf: dd.DataFrame or Future<dd.DataFrame>
Returns a Future<None>
"""
return self._submit_with_trace(dask_to_parquet, dataframe, path,
storage_options=self._parameters.storage_options)
async def _save_with_pandas(self, path, dataframe: pd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
Returns a Future<None>
"""
f_pdf = await self.client.scatter(dataframe)
return await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, path,
storage_options=self._parameters.storage_options)
@capture_timings('save_bulk', handlers=worker_capture_timing_handlers)
@internal_bulk_exceptions
@with_trace('save_bulk')
async def save_bulk(self, ddf: pd.DataFrame, record_id: str, bulk_id: str = None):
"""Write the data frame to the blob storage."""
bulk_id = bulk_id or new_bulk_id()
assert_df_validate(dataframe=ddf, validation_funcs=[validate_number_of_columns,
validate_index,
columns_not_in_reserved_names])
ddf.index.name = WDMS_INDEX_NAME
ddf = dd.from_pandas(ddf, npartitions=1, name=f"from_pandas-{uuid.uuid4()}")
ddf = await self.client.scatter(ddf)
path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
try:
await self._save_with_dask(path, ddf)
except OSError as os_error:
raise BulkRecordNotFound(record_id, bulk_id) from os_error
return bulk_id
@capture_timings('session_add_chunk')
@internal_bulk_exceptions
@with_trace('session_add_chunk')
async def session_add_chunk(self, session: Session, pdf: pd.DataFrame):
"""add new chunk to the given session"""
assert_df_validate(dataframe=pdf, validation_funcs=[validate_number_of_columns,
validate_index,
columns_not_in_reserved_names])
# sort column by names
pdf.index.name = WDMS_INDEX_NAME
pdf = pdf[sorted(pdf.columns)]
filename = session_meta.generate_chunk_filename(pdf)
session_path = pathBuilder.record_session_path(
self.base_directory, session.id, session.recordId)
self._fs.mkdirs(session_path, exist_ok=True) # TODO only for local
with self._fs.open(f'{session_path}/{filename}.meta', 'w') as outfile:
json.dump(session_meta.build_chunk_metadata(pdf), outfile)
session_path = pathBuilder.add_protocol(session_path, self.protocol)
await self._save_with_pandas(f'{session_path}/{filename}.parquet', pdf)
return self._submit_with_trace(dd.to_parquet, dataframe, path,
storage_options=self._parameters.storage_options,
engine='pyarrow', schema="infer", compression='snappy')
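For reference, a minimal local run of the same dd.to_parquet call submitted above; in the service the path is a remote URL and storage_options carries the store credentials (column names here are hypothetical):

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"MD": [0.0, 0.5, 1.0], "GR": [85.2, 90.1, 88.7]}),
                     npartitions=1)
# Same engine/schema/compression options as the submitted call, written to local disk.
dd.to_parquet(ddf, "/tmp/example_bulk",
              engine="pyarrow", schema="infer", compression="snappy",
              storage_options=None)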
@capture_timings('get_bulk_catalog')
async def get_bulk_catalog(self, record_id: str, bulk_id: str, generate_if_not_exists=True) -> BulkCatalog:
@@ -347,8 +304,8 @@ class DaskBulkStorage:
@capture_timings('_future_load_index')
async def _future_load_index(self, record_id: str, bulk_id: str) -> Awaitable[pd.Index]:
"""Loads the dataframe index of the specified record
The index should be saved in a dedicated folder, but for bulk written prior to catalog creation
we read one column and retrieve the index associated with it.
"""
catalog = await self.get_bulk_catalog(record_id, bulk_id)
future_df = self._read_index_from_catalog_index_path(catalog)
@@ -451,11 +408,15 @@ class DaskBulkStorage:
@with_trace('_save_session_index')
async def _save_session_index(self, path: str, index: pd.Index) -> str:
index_folder = pathBuilder.join(path, '_wdms_index_')
self._fs.mkdirs(pathBuilder.remove_protocol(index_folder)[0]) # TODO for local storage
self._ensure_dir_tree_exists(index_folder)
index_path = pathBuilder.join(index_folder, 'index.parquet')
dataframe = pd.DataFrame(index=index)
dataframe.index.name = WDMS_INDEX_NAME
await self._save_with_pandas(index_path, dataframe)
f_pdf = await self.client.scatter(dataframe)
await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, index_path,
storage_options=self._parameters.storage_options)
return index_path
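A self-contained sketch of the index-only parquet round trip performed above; the index name is a placeholder standing in for WDMS_INDEX_NAME, whose actual value is defined elsewhere:

import pandas as pd

index = pd.Index([1000.0, 1000.5, 1001.0], name="wdms_index")  # placeholder name
pd.DataFrame(index=index).to_parquet("/tmp/index.parquet")     # column-less frame keeps only the index
assert pd.read_parquet("/tmp/index.parquet").index.equals(index)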
@capture_timings('session_commit')
@@ -501,9 +462,76 @@ class DaskBulkStorage:
await async_save_bulk_catalog(self._fs, commit_path, fcatalog)
return bulk_id
@internal_bulk_exceptions
@capture_timings('post_data_without_session', handlers=worker_capture_timing_handlers)
@with_trace('post_data_without_session')
async def post_data_without_session(self,
data: Union[bytes, AsyncGenerator[bytes, None]],
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
record_id: str,
bulk_id: Optional[str] = None) -> Tuple[str, bulk_writer.DataframeBasicDescribe]:
"""
Process posted data outside of a session, delegating the entire work to a Dask worker. It builds the
bulk path for the current context, then prepares and writes the data.
:throw:
- BulkNotProcessable: in case of invalid input data
- BulkSaveException: if the store operation fails for some reason
"""
bulk_id = bulk_id or new_bulk_id()
bulk_base_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
# ensure directory exists for local storage, do nothing on remote storage
self._ensure_dir_tree_exists(bulk_base_path)
async with self._data_ipc.set(data) as (data_handle, data_getter):
data = None # unref data
df_describe = await submit_with_trace(self.client,
bulk_writer.write_bulk_without_session,
data_handle,
data_getter,
content_type,
df_validator_func,
bulk_base_path,
self._parameters.storage_options)
return bulk_id, df_describe
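The async with self._data_ipc.set(data) block above yields a (data_handle, data_getter) pair that the worker-side writer uses to rehydrate the payload. A toy IPC with the same shape, purely illustrative (the real DaskNativeDataIPC and DaskLocalFileDataIPC differ):

import contextlib

class InMemoryDataIPC:
    """Toy IPC: hands the payload over directly; real implementations scatter it or spill it to a file."""
    ipc_type = "in_memory"

    @contextlib.asynccontextmanager
    async def set(self, data: bytes):
        def getter(handle: bytes) -> bytes:   # called on the worker to recover the bytes
            return handle
        yield data, getter                    # (data_handle, data_getter), as consumed above

A worker task would then call data_getter(data_handle) to obtain the raw bytes before parsing and validating them.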
@internal_bulk_exceptions
@capture_timings('add_chunk_in_session', handlers=worker_capture_timing_handlers)
@with_trace('add_chunk_in_session')
async def add_chunk_in_session(self,
data: Union[bytes, AsyncGenerator[bytes, None]],
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
record_id: str,
session_id: str,
bulk_id: Optional[str] = None) -> Tuple[str, bulk_writer.DataframeBasicDescribe]:
"""
Add chunk data inside a session, delegating the entire work to a Dask worker.
:throw:
- BulkNotProcessable: in case of invalid input data
- BulkSaveException: if the store operation fails for some reason
"""
bulk_id = bulk_id or new_bulk_id()
base_path = pathBuilder.record_session_path(self.base_directory, session_id, record_id, self.protocol)
# ensure directory exists for local storage, do nothing on remote storage
self._ensure_dir_tree_exists(base_path)
async with self._data_ipc.set(data) as (data_handle, data_getter):
data = None # unref data
df_describe = await submit_with_trace(self