
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (911). Showing 1956 additions and 543 deletions.
@@ -41,4 +41,4 @@ secrets/
 **/.DS_Store
 .vscode
\ No newline at end of file
@@ -22,6 +22,7 @@ variables:
   AZURE_TEST_TYPE: python
   AWS_SERVICE: wellbore-ddms
+  AWS_SERVICE_NAMESPACE: osdu-wellbore-ddms
   AWS_ENVIRONMENT: dev
   AWS_BUILD_SUBDIR: provider/os-wellbore-ddms-aws/build-aws
   AWS_TEST_SUBDIR: tests/aws-test/build-aws
@@ -32,8 +33,9 @@ variables:
   OSDU_GCP_SERVICE: wellbore
   OSDU_GCP_VENDOR: gcp
   OSDU_GCP_HELM_PACKAGE_CHARTS: "devops/gcp/deploy devops/gcp/configmap"
-  OSDU_GCP_HELM_CONFIG_SERVICE_VARS: "--set data.os_wellbore_ddms_data_project_id=$OSDU_GCP_PROJECT --set data.service_host_search=$OSDU_GCP_SERVICE_HOST_SEARCH --set data.sa_key=$OSDU_GCP_INTEGRATION_TESTER"
-  OSDU_GCP_HELM_DEPLOYMENT_SERVICE_VARS: "--set data.image=$CI_REGISTRY_IMAGE/osdu-gcp:$CI_COMMIT_SHORT_SHA"
+  OSDU_GCP_HELM_CONFIG_SERVICE_VARS: "--set data.os_wellbore_ddms_data_project_id=$OSDU_GCP_PROJECT"
+  OSDU_GCP_HELM_CONFIG_SERVICE_VARS_DEV2: "--set data.os_wellbore_ddms_data_project_id=$OSDU_GCP_PROJECT"
+  OSDU_GCP_HELM_DEPLOYMENT_SERVICE_VARS: "--set data.image=$CI_REGISTRY_IMAGE/osdu-gcp:$CI_COMMIT_SHORT_SHA --set data.serviceAccountName=$OSDU_GCP_SERVICE-k8s"
   OSDU_GCP_HELM_CONFIG_SERVICE: wellbore-config
   OSDU_GCP_HELM_DEPLOYMENT_SERVICE: wellbore-deploy
   OSDU_GCP_INT_TEST_TYPE: python
@@ -41,29 +43,68 @@ variables:
 include:
   - project: "osdu/platform/ci-cd-pipelines"
     file: "standard-setup.yml"
+    ref: v0.13.2
   - project: "osdu/platform/ci-cd-pipelines"
     file: "build/python.yml"
+    ref: v0.13.2
   - project: "osdu/platform/ci-cd-pipelines"
     file: "scanners/fossa-python.yml"
+    ref: v0.13.2
   - project: "osdu/platform/ci-cd-pipelines"
     file: "scanners/gitlab-ultimate.yml"
+    ref: v0.13.2
   - project: "osdu/platform/ci-cd-pipelines"
     file: "cloud-providers/osdu-gcp-gke.yml"
+    ref: v0.13.2
   - project: "osdu/platform/ci-cd-pipelines"
-    file: "cloud-providers/ibm-wellbore.yml"
+    file: "cloud-providers/ibm-wellbore-git.yml"
+    ref: v0.13.2
   - local: "/devops/azure/azure-wellbore.yml"
   - project: "osdu/platform/ci-cd-pipelines"
     file: "cloud-providers/aws-global.yml"
+    ref: v0.13.2
   - project: "osdu/platform/ci-cd-pipelines"
     file: "cloud-providers/aws-python.yml"
+    ref: v0.13.2

+verify_existing_requirements:
+  extends:
+    - .python
+    - .skipForTriggeringMergeRequests
+  stage: build
+  before_script:
+    - pip3 install --upgrade pip-tools
+  script:
+    - pip-compile requirements.in
+    - pip-compile requirements_dev.in
+    - git diff
+    # checking difference between existing requirements and the newly generated one
+    # and using the number of lines as exit status code
+    - exit $(git status -s | grep -e "^ M " | wc -l)
+  artifacts:
+    when: always
+    paths:
+      - "requirements*.txt"
+    expire_in: 2 days

 compile-and-unit-test:
+  artifacts:
+    when: always
+    paths:
+      - all-requirements.txt
+      - spec/generated/openapi.json

 containerize:
   extends: .skipForTriggeringMergeRequests
@@ -99,9 +140,6 @@ osdu-gcp-containerize-gitlab:
   image: docker:19.03
   cache: {}
   tags: ["osdu-medium"]
-  only:
-    variables:
-      - $OSDU_GCP == 'true'
   variables:
     IMAGE_TAG: $CI_REGISTRY_IMAGE/osdu-gcp:$CI_COMMIT_SHORT_SHA
     IMAGE_TAG_LATEST: $CI_REGISTRY_IMAGE/osdu-gcp:latest
@@ -163,9 +201,6 @@ osdu-gcp-test-python:
   stage: integration
   image: gcr.io/google.com/cloudsdktool/cloud-sdk
   needs: ["osdu-gcp-deploy-deployment"]
-  only:
-    variables:
-      - $OSDU_GCP == 'true' && $OSDU_GCP_INT_TEST_TYPE == 'python'
   script:
     - apt-get install -y python3-venv
     - python3 -m venv env
@@ -188,19 +223,46 @@ osdu-gcp-test-python:
       --data_partition $OSDU_GCP_TENANT
       --acl_domain $DOMAIN
      --legal_tag $LEGAL_TAG
-    - pytest ./functional --environment="./generated/postman_environment.json" --filter-tag=basic
+    - pytest ./functional --environment="./generated/postman_environment.json" --filter-tag=!search

+osdu-gcp-dev2-test-python:
+  extends: .osdu-gcp-dev2-variables
+  stage: integration
+  image: gcr.io/google.com/cloudsdktool/cloud-sdk
+  needs: ["osdu-gcp-dev2-deploy-deployment"]
+  script:
+    - apt-get install -y python3-venv
+    - python3 -m venv env
+    - source env/bin/activate
+    - pip install --upgrade pip
+    - pip install wheel pytest pytest-cov
+    - >
+      for REQ in $PIP_REQUIREMENTS ; do
+        pip install -r $REQ
+      done
+    - cd tests/integration
+    - echo $OSDU_GCP_INTEGRATION_TESTER | base64 -d > file.json
+    - gcloud auth activate-service-account --key-file file.json
+    - gcloud config set project $OSDU_GCP_PROJECT
+    - >
+      python gen_postman_env.py
+      --token $(gcloud auth print-access-token)
+      --base_url $OSDU_GCP_WELLBORE_BASE_URL
+      --cloud_provider $OSDU_GCP_VENDOR
+      --data_partition $OSDU_GCP_TENANT
+      --acl_domain $DOMAIN
+      --legal_tag $LEGAL_TAG
+    - pytest ./functional --environment="./generated/postman_environment.json" --filter-tag=!search

 # Disable maven job in gcp common gke pipeline
 osdu-gcp-test:
   extends:
     - .osdu-gcp-variables

-# Allow failure on deployments
-ibm-deploy:
-  allow_failure: true
-
-# Allow failure on integration tests
-ibm-test:
+osdu-gcp-dev2-test:
+  extends:
+    - .osdu-gcp-dev2-variables
+
+# Allow failure on private development deployments
+ibm-deploy-devpri:
   allow_failure: true
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
\ No newline at end of file
 # 3rd-Party Software License Notice
 Generated by fossa-cli (https://github.com/fossas/fossa-cli).
+Formatted by fossa-with-cache (https://community.opengroup.org/divido/fossa-with-cache).
 This software includes the following software and licenses:

 ========================================================================
@@ -7,42 +8,45 @@ Apache-2.0
 ========================================================================
 The following software have components provided under the terms of this license:

+- Pillow (from https://python-pillow.org)
+- aiobotocore (from https://github.com/aio-libs/aiobotocore)
-- aiohttp (from https://github.com/aio-libs/aiohttp/)
+- aiohttp (from https://github.com/aio-libs/aiohttp, https://github.com/aio-libs/aiohttp/)
 - async-timeout (from https://github.com/aio-libs/async_timeout/)
+- bokeh (from https://github.com/bokeh/bokeh)
 - boto3 (from https://github.com/boto/boto3)
 - botocore (from https://github.com/boto/botocore)
-- coverage (from https://coverage.readthedocs.io)
+- coverage (from https://github.com/nedbat/coveragepy)
 - cryptography (from https://github.com/pyca/cryptography)
 - google-api-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
-- google-auth (from https://github.com/googleapis/google-auth-library-python)
+- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python, https://github.com/googleapis/google-auth-library-python)
 - google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
 - google-cloud-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
 - google-cloud-monitoring (from https://github.com/GoogleCloudPlatform/google-cloud-python)
 - google-cloud-trace (from https://github.com/googleapis/googleapis)
 - googleapis-common-protos (from https://github.com/googleapis/googleapis)
 - grpcio (from https://grpc.io)
-- importlib-metadata (from http://importlib-metadata.readthedocs.io/)
+- importlib-metadata
 - jsonpath-ng (from https://github.com/h2non/jsonpath-ng)
 - msgpack (from http://msgpack.org/)
 - multidict (from https://github.com/aio-libs/multidict/)
-- numpy (from http://www.numpy.org)
+- numpy
 - openapi-spec-validator (from https://github.com/p1c2u/openapi-spec-validator)
 - opencensus (from https://github.com/census-instrumentation/opencensus-python)
 - opencensus-context (from https://github.com/census-instrumentation/opencensus-python/tree/master/context/opencensus-context)
-- opencensus-ext-azure (from )
+- opencensus-ext-azure (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-azure)
-- opencensus-ext-logging (from )
+- opencensus-ext-logging (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-logging)
 - opencensus-ext-ocagent (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-ocagent)
 - opencensus-ext-stackdriver (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-stackdriver)
 - opencensus-proto (from https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-python)
 - packaging (from https://github.com/pypa/packaging)
-- pandas (from http://pandas.pydata.org)
+- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
 - pep517 (from https://github.com/takluyver/pep517)
 - pyarrow (from https://arrow.apache.org/)
 - pytest-asyncio (from https://github.com/pytest-dev/pytest-asyncio)
 - pytest-dependency (from https://github.com/RKrahl/pytest-dependency)
 - python-dateutil (from https://dateutil.readthedocs.org)
 - python-multipart (from http://github.com/andrew-d/python-multipart)
-- requests (from https://requests.readthedocs.io)
+- requests (from http://python-requests.org, https://requests.readthedocs.io)
 - rfc3986 (from https://rfc3986.readthedocs.org)
 - rsa (from https://stuvel.eu/rsa)
 - s3transfer (from https://github.com/boto/s3transfer)
@@ -50,6 +54,7 @@ The following software have components provided under the terms of this license:
 - sortedcontainers (from http://www.grantjenks.com/docs/sortedcontainers/)
 - structlog (from http://www.structlog.org/)
 - tblib (from https://github.com/ionelmc/python-tblib)
+- toposort (from https://bitbucket.org/ericvsmith/toposort)
 - tornado (from http://www.tornadoweb.org/)
 - yarl (from https://github.com/aio-libs/yarl/)

@@ -58,17 +63,17 @@ BSD-2-Clause
 ========================================================================
 The following software have components provided under the terms of this license:

+- colorama (from https://github.com/tartley/colorama)
 - grpcio (from https://grpc.io)
 - locket (from http://github.com/mwilliamson/locket.py)
 - mock (from https://github.com/testing-cabal/mock)
-- numpy (from http://www.numpy.org)
+- numpy
 - packaging (from https://github.com/pypa/packaging)
 - ply (from http://www.dabeaz.com/ply/)
 - pyasn1 (from http://sourceforge.net/projects/pyasn1/)
 - pyasn1-modules (from http://sourceforge.net/projects/pyasn1/)
 - pycparser (from https://github.com/eliben/pycparser)
 - tblib (from https://github.com/ionelmc/python-tblib)
-- wrapt (from https://github.com/GrahamDumpleton/wrapt)

 ========================================================================
 BSD-3-Clause
@@ -76,11 +81,14 @@ BSD-3-Clause
 The following software have components provided under the terms of this license:

 - HeapDict (from http://stutzbachenterprises.com/)
+- Jinja2 (from http://jinja.pocoo.org/, https://palletsprojects.com/p/jinja/)
+- MarkupSafe (from https://palletsprojects.com/p/markupsafe/)
+- Pillow (from https://python-pillow.org)
 - adlfs (from https://github.com/hayesgb/adlfs/)
 - asgiref (from http://github.com/django/asgiref/)
+- bokeh (from https://github.com/bokeh/bokeh)
 - click (from http://github.com/mitsuhiko/click)
 - cloudpickle (from https://github.com/cloudpipe/cloudpickle)
-- colorama (from https://github.com/tartley/colorama)
 - cryptography (from https://github.com/pyca/cryptography)
 - dask (from http://github.com/dask/dask/)
 - decorator (from https://github.com/micheles/decorator)
@@ -95,13 +103,13 @@ The following software have components provided under the terms of this license:
 - isodate (from http://cheeseshop.python.org/pypi/isodate)
 - locket (from http://github.com/mwilliamson/locket.py)
 - mock (from https://github.com/testing-cabal/mock)
-- numpy (from http://www.numpy.org)
+- numpy
 - oauthlib (from https://github.com/idan/oauthlib)
 - openapi-schema-validator (from https://github.com/p1c2u/openapi-schema-validator)
 - packaging (from https://github.com/pypa/packaging)
-- pandas (from http://pandas.pydata.org)
+- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
 - partd (from http://github.com/dask/partd/)
-- pip-tools (from http://pypi.python.org/pypi/pip-tools/1.8.1rc3)
+- pip-tools (from https://github.com/jazzband/pip-tools/)
 - ply (from http://www.dabeaz.com/ply/)
 - protobuf (from https://developers.google.com/protocol-buffers/)
 - psutil (from https://github.com/giampaolo/psutil)
@@ -113,25 +121,50 @@ The following software have components provided under the terms of this license:
 - python-dateutil (from https://dateutil.readthedocs.org)
 - python-rapidjson (from https://github.com/python-rapidjson/python-rapidjson)
 - requests-oauthlib (from https://github.com/requests/requests-oauthlib)
+- s3fs (from http://github.com/dask/s3fs/)
 - starlette (from https://github.com/encode/starlette)
 - tblib (from https://github.com/ionelmc/python-tblib)
 - toolz (from http://github.com/pytoolz/toolz/)
-- uvicorn (from https://github.com/tomchristie/uvicorn)
+- uvicorn (from https://github.com/tomchristie/uvicorn, https://www.uvicorn.org/)
+- wrapt (from https://github.com/GrahamDumpleton/wrapt)
 - zict (from http://github.com/dask/zict/)

+========================================================================
+BitstreamVera
+========================================================================
+The following software have components provided under the terms of this license:
+
+- Pillow (from https://python-pillow.org)
+
+========================================================================
+CC-BY-2.0
+========================================================================
+The following software have components provided under the terms of this license:
+
+- bokeh (from https://github.com/bokeh/bokeh)
+
+========================================================================
+CC-BY-3.0
+========================================================================
+The following software have components provided under the terms of this license:
+
+- bokeh (from https://github.com/bokeh/bokeh)
+
 ========================================================================
 CC-BY-4.0
 ========================================================================
 The following software have components provided under the terms of this license:

 - adlfs (from https://github.com/hayesgb/adlfs/)
+- bokeh (from https://github.com/bokeh/bokeh)
 - dask (from http://github.com/dask/dask/)
 - distributed (from https://distributed.readthedocs.io/en/latest/)
 - fsspec (from http://github.com/intake/filesystem_spec)
 - gcsfs (from https://github.com/dask/gcsfs)
-- numpy (from http://www.numpy.org)
+- numpy
-- pandas (from http://pandas.pydata.org)
+- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
 - partd (from http://github.com/dask/partd/)
+- s3fs (from http://github.com/dask/s3fs/)
 - toolz (from http://github.com/pytoolz/toolz/)

 ========================================================================
@@ -139,14 +172,14 @@ CC-BY-SA-3.0
 ========================================================================
 The following software have components provided under the terms of this license:

-- numpy (from http://www.numpy.org)
+- numpy

 ========================================================================
 GPL-2.0-only
 ========================================================================
 The following software have components provided under the terms of this license:

-- coverage (from https://coverage.readthedocs.io)
+- coverage (from https://github.com/nedbat/coveragepy)
 - grpcio (from https://grpc.io)

 ========================================================================
@@ -161,11 +194,19 @@ GPL-3.0-only
 ========================================================================
 The following software have components provided under the terms of this license:

-- coverage (from https://coverage.readthedocs.io)
+- Pillow (from https://python-pillow.org)
+- coverage (from https://github.com/nedbat/coveragepy)
 - grpcio (from https://grpc.io)
 - pyparsing (from http://pyparsing.wikispaces.com/)
 - rfc3986 (from https://rfc3986.readthedocs.org)

+========================================================================
+GPL-3.0-or-later
+========================================================================
+The following software have components provided under the terms of this license:
+
+- Pillow (from https://python-pillow.org)
+
 ========================================================================
 ISC
@@ -193,6 +234,7 @@ LGPL-2.1-only
 ========================================================================
 The following software have components provided under the terms of this license:

+- Pillow (from https://python-pillow.org)
 - chardet (from https://github.com/chardet/chardet)
 - charset-normalizer (from https://github.com/ousret/charset_normalizer)
@@ -208,6 +250,7 @@ LGPL-3.0-only
 ========================================================================
 The following software have components provided under the terms of this license:

+- Pillow (from https://python-pillow.org)
 - chardet (from https://github.com/chardet/chardet)
 - pycparser (from https://github.com/eliben/pycparser)
@@ -216,17 +259,18 @@ MIT
 ========================================================================
 The following software have components provided under the terms of this license:

+- Pillow (from https://python-pillow.org)
 - PyJWT (from http://github.com/jpadilla/pyjwt)
 - PyYAML (from http://pyyaml.org/wiki/PyYAML)
 - adal (from https://github.com/AzureAD/azure-activedirectory-library-for-python)
-- aiohttp (from https://github.com/aio-libs/aiohttp/)
+- aiohttp (from https://github.com/aio-libs/aiohttp, https://github.com/aio-libs/aiohttp/)
+- aioitertools (from https://github.com/jreese/aioitertools)
 - aioredis (from https://github.com/aio-libs/aioredis)
-- anyio (from https://pypi.org/project/anyio/3.3.0/)
+- anyio (from https://pypi.org/project/anyio/3.3.0/, https://pypi.org/project/anyio/3.4.0/)
 - asgiref (from http://github.com/django/asgiref/)
-- atomicwrites (from https://github.com/untitaker/python-atomicwrites)
-- attrs (from https://attrs.readthedocs.io/)
+- attrs (from https://attrs.readthedocs.io/, https://www.attrs.org/)
 - azure-common (from https://github.com/Azure/azure-sdk-for-python)
-- azure-core (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/core/azure-core)
+- azure-core (from https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/core/azure-core)
 - azure-datalake-store (from https://github.com/Azure/azure-data-lake-store-python)
 - azure-identity (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/identity/azure-identity)
 - azure-keyvault (from https://github.com/Azure/azure-sdk-for-python)
@@ -234,27 +278,31 @@ The following software have components provided under the terms of this license:
 - azure-keyvault-keys (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-keys)
 - azure-keyvault-secrets (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-secrets)
 - azure-storage-blob (from https://github.com/Azure/azure-storage-python)
-- backoff (from https://github.com/litl/backoff)
+- backoff
+- bokeh (from https://github.com/bokeh/bokeh)
 - botocore (from https://github.com/boto/botocore)
 - cachetools (from https://github.com/tkem/cachetools)
-- cffi (from http://cffi.readthedocs.org)
+- cffi
 - charset-normalizer (from https://github.com/ousret/charset_normalizer)
-- coverage (from https://coverage.readthedocs.io)
+- coverage (from https://github.com/nedbat/coveragepy)
+- deepdiff (from https://github.com/seperman/deepdiff)
 - fastapi (from https://github.com/tiangolo/fastapi)
 - grpcio (from https://grpc.io)
-- h11 (from https://github.com/python-hyper/h11)
+- h11
 - iniconfig (from http://github.com/RonnyPfannschmidt/iniconfig)
 - jmespath (from https://github.com/jmespath/jmespath.py)
-- jsonschema (from http://github.com/Julian/jsonschema)
+- jsonschema
+- mockito (from https://github.com/kaste/mockito-python)
 - msal (from https://github.com/AzureAD/microsoft-authentication-library-for-python)
 - msal-extensions (from https://pypi.org/project/msal-extensions/0.1.3/)
 - msrest (from https://github.com/Azure/msrest-for-python)
 - munch (from http://github.com/Infinidat/munch)
 - natsort (from https://github.com/SethMMorton/natsort)
-- numpy (from http://www.numpy.org)
+- numpy
-- pandas (from http://pandas.pydata.org)
+- ordered-set (from http://github.com/LuminosoInsight/ordered-set)
+- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
 - pep517 (from https://github.com/takluyver/pep517)
-- pluggy (from https://github.com/pytest-dev/pluggy)
+- pluggy
 - py (from http://pylib.readthedocs.org/)
 - pyarrow (from https://arrow.apache.org/)
 - pydantic (from https://github.com/samuelcolvin/pydantic)
@@ -272,7 +320,7 @@ The following software have components provided under the terms of this license:
 - sniffio (from https://github.com/python-trio/sniffio)
 - structlog (from http://www.structlog.org/)
 - toml (from https://github.com/uiri/toml)
-- tomli (from https://pypi.org/project/tomli/1.1.0/)
+- tomli (from https://pypi.org/project/tomli/1.2.2/, https://pypi.org/project/tomli/2.0.0/)
 - urllib3 (from https://urllib3.readthedocs.io/)
 - xmltodict (from https://github.com/martinblech/xmltodict)
 - zipp (from https://github.com/jaraco/zipp)
@@ -290,14 +338,21 @@ NCSA
 ========================================================================
 The following software have components provided under the terms of this license:

-- numpy (from http://www.numpy.org)
+- numpy

+========================================================================
+OFL-1.1
+========================================================================
+The following software have components provided under the terms of this license:
+
+- Pillow (from https://python-pillow.org)
+
 ========================================================================
 OPL-1.0
 ========================================================================
 The following software have components provided under the terms of this license:

-- numpy (from http://www.numpy.org)
+- numpy

 ========================================================================
 OpenSSL
@@ -312,12 +367,12 @@ Python-2.0
 The following software have components provided under the terms of this license:

 - async-timeout (from https://github.com/aio-libs/async_timeout/)
-- coverage (from https://coverage.readthedocs.io)
+- coverage (from https://github.com/nedbat/coveragepy)
 - distributed (from https://distributed.readthedocs.io/en/latest/)
-- google-auth (from https://github.com/googleapis/google-auth-library-python)
+- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python, https://github.com/googleapis/google-auth-library-python)
 - google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
-- numpy (from http://www.numpy.org)
+- numpy
-- pandas (from http://pandas.pydata.org)
+- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
 - ply (from http://www.dabeaz.com/ply/)
 - portalocker (from https://github.com/WoLpH/portalocker)
 - python-dateutil (from https://dateutil.readthedocs.org)
@@ -332,7 +387,7 @@ SunPro
 ========================================================================
 The following software have components provided under the terms of this license:

-- numpy (from http://www.numpy.org)
+- numpy

 ========================================================================
 Unlicense
@@ -348,13 +403,6 @@ The following software have components provided under the terms of this license:
 - jsonpath-ng (from https://github.com/h2non/jsonpath-ng)

-========================================================================
-X11
-========================================================================
-The following software have components provided under the terms of this license:
-
-- numpy (from http://www.numpy.org)
-
 ========================================================================
 ZPL-2.1
 ========================================================================
@@ -368,18 +416,25 @@ Zlib
 The following software have components provided under the terms of this license:

 - grpcio (from https://grpc.io)
-- numpy (from http://www.numpy.org)
+- numpy

 ========================================================================
 public-domain
 ========================================================================
 The following software have components provided under the terms of this license:

+- Pillow (from https://python-pillow.org)
+- bokeh (from https://github.com/bokeh/bokeh)
 - botocore (from https://github.com/boto/botocore)
 - grpcio (from https://grpc.io)
-- numpy (from http://www.numpy.org)
+- numpy
-- pandas (from http://pandas.pydata.org)
+- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
 - py (from http://pylib.readthedocs.org/)
 - pytz (from http://pythonhosted.org/pytz)
+
+========================================================================
+unknown
+========================================================================
+The following software have components provided under the terms of this license:
+
+- Pillow (from https://python-pillow.org)
@@ -55,6 +55,9 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
   - Implementation of blob storage and partition service on Azure
     - osdu-core-lib-python-azure
+  - Implementation of blob storage and partition service on AWS
+    - osdu-core-lib-python-aws
   - Client libraries for OSDU data ecosystem services
     - osdu-data-ecosystem-search
     - osdu-data-ecosystem-storage
@@ -187,6 +190,20 @@ python main.py -e SERVICE_HOST_STORAGE https://api.example.com/storage -e SERVIC
 -e USE_PARTITION_SERVICE disabled
 ```
+
+- The following environment variables are required when the cloud provider is set to AWS:
+  - SERVICE_HOST_SEARCH: The Search Service host
+  - SERVICE_HOST_STORAGE: The Storage Service host
+  - SERVICE_HOST_PARTITION: The Partition Service host
+
+```bash
+python main.py -e CLOUD_PROVIDER aws \
+  -e SERVICE_HOST_SEARCH search_host \
+  -e SERVICE_HOST_STORAGE storage_host \
+  -e SERVICE_HOST_PARTITION partition_host
+```

 Note: If you're running locally, you may need to provide environmental variables in your IDE. Here is a sample for providing a `.env` file.
 As default, all Core Services endpoint values are set to `None` in `app/conf.py`, you can update `.env` file for core services endpoints based on your cloud provider.
@@ -349,8 +366,7 @@ This example runs basic tests using the local filesystem for blob storage and st
 First, create the temp storage folders and run the service.
 ```bash
-mkdir -p tmpstorage
-mkdir -p tmpblob
+mkdir -p tmpstorage tmpblob
 python main.py -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH $(pwd)/tmpstorage -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH $(pwd)/tmpblob -e CLOUD_PROVIDER local
 ```
@@ -378,6 +394,11 @@ If you want to work with other requirements file, you can specify them
 pip-sync requirements.txt requirements_dev.txt
 ```
+
+**Note:** On a Windows workstation, platform-specific modules such as `pywin32` are also needed. In this case don't use `pip-sync` but `pip install` instead.
+```bash
+pip install -r requirements.txt -r requirements_dev.txt
+```

 If you want to update `requirements.txt` to retrieve the most recent version, respecting bounds set in `requirements.in`, you can use:
 ```bash
@@ -390,6 +411,8 @@ If you want to update the version of only one dependency, for instance fastapi:
 pip-compile --upgrade-package fastapi
 ```
+
+**Note:** On a Windows workstation, **don't** commit the `pywin32` back to the `requirements.txt` file, that will cause CICD to fail.

 For more information: https://github.com/jazzband/pip-tools/

 ### Debugging:
...
@@ -12,8 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from .bulk_id import BulkId
-from .dataframe_persistence import create_and_store_dataframe, get_dataframe
+from .bulk_uri import BulkURI
+from .dataframe_persistence import create_and_store_dataframe, get_dataframe, download_bulk
 from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync
 from .json_orient import JSONOrient
 from .mime_types import MimeTypes
...
@@ -13,27 +13,7 @@
 # limitations under the License.

 import uuid
-from typing import Tuple, Optional


-class BulkId:
-    @staticmethod
-    def new_bulk_id() -> str:
-        return str(uuid.uuid4())
+def new_bulk_id() -> str:
+    return str(uuid.uuid4())
-
-    @classmethod
-    def bulk_urn_encode(cls, bulk_id: str, prefix: str = None) -> str:
-        if prefix:
-            return f'urn:{prefix}:uuid:{uuid.UUID(bulk_id)}'
-        return uuid.UUID(bulk_id).urn
-
-    # Returns a tuple (<uuid> : str, <prefix> : str)
-    @classmethod
-    def bulk_urn_decode(cls, urn: str) -> Tuple[str, Optional[str]]:
-        if urn is None:
-            raise ValueError('attempted to decode empty urn')
-        parts = urn.split(":")
-        if len(parts) < 4:
-            return str(uuid.UUID(urn)), None
-        return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
from typing import Optional, NamedTuple


class BulkStorageVersion(NamedTuple):
    """ This is the version of the bulk storage engine """
    version: str
    """ unique version identifier """
    uri_prefix: Optional[str]
    """ associated uri prefix """


BulkStorageVersion_V0 = BulkStorageVersion(version='0', uri_prefix=None)
""" first bulk management implementation with direct management to blob storage with a single blob """

BulkStorageVersion_V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1")
""" version 1, using Dask to handle bulk manipulation and storage """

BulkStorageVersion_Invalid = BulkStorageVersion(version='', uri_prefix=None)
""" represent an invalid/undefined storage version """
from typing import Optional, Tuple
import uuid
from .bulk_storage_version import (
BulkStorageVersion, BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid)
class BulkURI:
"""
Bulk URI, contains the bulk identifier (bulk_id) and Storage engine version which identifies how
the bulk is stored.
Usage:
- ctor from URI string value:
`bulk_uri = BulkURI.decode(uri_str)`
- ctor explicit given a bulk_id and a storage version:
`bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersion_V1)`
- ctor explict using class method:
`bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id_value)`
- encode to URI string value:
`uri_str: str = bulk_uri.encode()`
- check which storage engine version is:
`bulk_uri.storage_version == BulkStorageVersion_V0`
`bulk_uri.is_bulk_storage_V0()`
"""
def __init__(self, bulk_id: str, version: BulkStorageVersion):
"""
make an new one or invalid
Either pass uri alone or bulk_id, version
:param bulk_id: expected as a valid uuid
:param version: storage version
:throw: ValueError
"""
if not bulk_id or not version or version == BulkStorageVersion_Invalid:
bulk_id = ''
version = BulkStorageVersion_Invalid
else:
# ensure valid uuid
uuid.UUID(bulk_id)
self._bulk_id = bulk_id
self._storage_version = version
@classmethod
def invalid(cls):
""" make an invalid instance """
return cls('', BulkStorageVersion_Invalid)
@classmethod
def decode(cls, uri: str) -> 'BulkURI':
"""
construct a BulkURI from an encoded URI
:throw: ValueError
"""
if not uri:
return BulkURI.invalid()
bulk_id, prefix = cls._decode_uri(uri)
if not prefix:
version = BulkStorageVersion_V0
elif prefix == BulkStorageVersion_V1.uri_prefix:
version = BulkStorageVersion_V1
else:
raise ValueError('Unsupported prefix in bulk URI: ' + prefix)
return cls(bulk_id=bulk_id, version=version)
def is_bulk_storage_V0(self) -> bool:
""" convenient check that returns True is version == BulkStorageVersions.V0 """
return self._storage_version.version == BulkStorageVersion_V0.version
@classmethod
def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI':
""" construct a BulkURI for storage engine V0 given a bulk id """
return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0)
@classmethod
def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI':
""" construct a BulkURI for storage engine V1 given a bulk id """
return cls(bulk_id=bulk_id, version=BulkStorageVersion_V1)
@property
def bulk_id(self) -> str:
return self._bulk_id
@property
def storage_version(self) -> BulkStorageVersion:
return self._storage_version
def encode(self) -> str:
"""
encode to uri as string
If the prefix is not empty, returns uri format = `urn:$prefix:uuid:$bulk_id`
If the prefix is empty or None, uri format = `urn:uuid:$bulk_id`
:Throw: ValueError
"""
if self._storage_version.uri_prefix:
return f'urn:{self._storage_version.uri_prefix}:uuid:{self._bulk_id}'
return uuid.UUID(self._bulk_id).urn
@classmethod
def _decode_uri(cls, uri: str) -> Tuple[str, Optional[str]]:
"""
Decode urn into uuid and optional prefix. Returns tuple [uuid, prefix].
If urn is `urn:$prefix:uuid:$bulk_id`, will return [$bulk_id, $prefix]
If urn is `urn:uuid:$bulk_id`, will return [$bulk_id, None]
:throw: ValueError if urn empty or invalid UUID
"""
if uri is None:
raise ValueError('attempted to decode empty urn')
parts = uri.split(":")
if len(parts) < 4:
return str(uuid.UUID(uri)), None
return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
def is_valid(self) -> bool:
""" check invalid """
if self._bulk_id and self._storage_version.version:
return True
return False
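# Illustrative usage sketch (not part of the change set): round-trip a bulk URI through its string
# form. It relies only on the BulkURI class and the storage version constants defined above.
def _example_bulk_uri_round_trip():
    original = BulkURI.from_bulk_storage_V1(str(uuid.uuid4()))
    uri_str = original.encode()                # e.g. 'urn:wdms-1:uuid:<bulk_id>'
    decoded = BulkURI.decode(uri_str)
    assert decoded.storage_version == BulkStorageVersion_V1
    assert decoded.bulk_id == original.bulk_id
    assert not decoded.is_bulk_storage_V0()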
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module groups functions related to the bulk catalog.
A catalog contains the metadata of the chunks.
"""
import json
from contextlib import suppress
from dataclasses import dataclass
from typing import Dict, Iterable, List, NamedTuple, Optional, Set
from dask.distributed import get_client
from app.bulk_persistence.dask.traces import submit_with_trace, trace_attributes_root_span
from app.helper.traces import with_trace
from app.utils import capture_timings
from .storage_path_builder import join, remove_protocol
from .utils import worker_capture_timing_handlers
@dataclass
class ChunkGroup:
"""A chunk group represent a chunk list having exactly the same schemas
(columns labels and dtypes)"""
labels: Set[str]
paths: List[str]
dtypes: List[str]
ColumnLabel = str
ColumnDType = str
class BulkCatalog:
"""Represent a bulk catalog
Example:
{
"recordId": "7507fb30-9cfa-4506-9cd8-6cbacbcda740",
"nbRows": 1000,
"indexPath": "folder/wdms_index/index.parquet,
"columns" : [
{
"labels": ["A", "B"],
"paths": ["folder/file1.parquet", "folder/file2.parquet"],
"dtypes": ["Int64, "Float32"]
},
{
"labels": ["C"],
"paths": ["folder/file3.parquet"],
"dtypes": ["Float32"]
}
],
}
"""
def __init__(self, record_id: str) -> None:
self.record_id: str = record_id # TODO remove
self.nb_rows: int = 0
self.index_path: Optional[str] = None
self.columns: List[ChunkGroup] = []
@property
def all_columns_count(self) -> int:
"""
Return number of columns contained in bulk data
"""
return len(self.all_columns_dtypes)
@property
def all_columns_dtypes(self) -> Dict[ColumnLabel, ColumnDType]:
"""Returns all columns with their dtype
Returns:
Dict[str, str]: a dict { column label : column dtype }
"""
res = {}
for col_group in self.columns:
res.update({cn: dt for cn, dt in zip(col_group.labels, col_group.dtypes)})
return res
def add_chunk(self, chunk_group: ChunkGroup) -> None:
"""Add ChunkGroup to the catalog."""
if len(chunk_group.labels) == 0:
return
keys = frozenset(chunk_group.labels)
chunk_group_with_same_schema = next((x for x in self.columns if len(
keys) == len(x.labels) and all(l in keys for l in x.labels)), None)
if chunk_group_with_same_schema:
chunk_group_with_same_schema.paths.extend(chunk_group.paths)
else:
self.columns.append(chunk_group)
def remove_columns_info(self, labels: Iterable[str]) -> None:
"""Removes columns information
Args:
labels (Iterable[str]): columns labels to remove
"""
clean_needed = False
labels_set = frozenset(labels)
for col_group in self.columns:
remaining_columns = {col: dt for col, dt in zip(
col_group.labels, col_group.dtypes) if col not in labels_set}
if len(remaining_columns) != len(col_group.labels):
col_group.labels = set(remaining_columns.keys())
col_group.dtypes = list(remaining_columns.values())
clean_needed = clean_needed or len(col_group.labels) == 0
if clean_needed:
self.columns = [c for c in self.columns if c.labels]
def change_columns_info(self, chunk_group: ChunkGroup) -> None:
"""Replace column information with the given one
Args:
chunk_group (ChunkGroup): new column information
"""
self.remove_columns_info(chunk_group.labels)
self.add_chunk(chunk_group)
class ColumnsPaths(NamedTuple):
labels: Set[str]
paths: List[str]
def get_paths_for_columns(self, labels: Iterable[str], base_path: str) -> List[ColumnsPaths]:
"""Returns the paths to load data of the requested columns grouped by paths
Args:
labels (Iterable[str]): List of desired columns. If None or empty select all columns.
base_path (str): Base path as prefix to chunks path
Returns:
List[ColumnsPaths]: The requested columns grouped by paths
"""
grouped_files = []
for col_group in self.columns:
matching_columns = col_group.labels.intersection(labels) if labels else col_group.labels
if matching_columns:
grouped_files.append(self.ColumnsPaths(
labels=matching_columns,
paths=[join(base_path, f) for f in col_group.paths])
)
return grouped_files
def as_dict(self) -> dict:
"""Returns the dict representation of the catalog"""
return {
"recordId": self.record_id,
"nbRows": self.nb_rows,
"indexPath": self.index_path,
'columns': [{
'labels': list(c.labels),
'paths': c.paths,
'dtypes': c.dtypes
} for c in self.columns],
}
@classmethod
def from_dict(cls, catalog_as_dict: dict) -> "BulkCatalog":
"""construct a Catalog from a dict"""
catalog = cls(record_id=catalog_as_dict["recordId"])
catalog.nb_rows = catalog_as_dict["nbRows"]
catalog.index_path = catalog_as_dict["indexPath"]
catalog.columns = [
ChunkGroup(set(c["labels"]), c["paths"], c["dtypes"])
for c in catalog_as_dict["columns"]
]
return catalog
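# Illustrative usage sketch (not part of the change set): build a small catalog in memory and
# resolve which chunk files are needed to read column "A". The record id, chunk paths and base
# path below are made-up values.
def _example_build_catalog():
    catalog = BulkCatalog(record_id="7507fb30-9cfa-4506-9cd8-6cbacbcda740")
    catalog.nb_rows = 1000
    catalog.add_chunk(ChunkGroup(labels={"A", "B"}, paths=["folder/file1.parquet"], dtypes=["Int64", "Float64"]))
    catalog.add_chunk(ChunkGroup(labels={"C"}, paths=["folder/file2.parquet"], dtypes=["Float64"]))
    assert catalog.all_columns_count == 3
    # only the chunk group containing "A" is returned, with paths prefixed by the base path
    return catalog.get_paths_for_columns(["A"], base_path="container/records")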
CATALOG_FILE_NAME = 'bulk_catalog.json'
@capture_timings('save_bulk_catalog', handlers=worker_capture_timing_handlers)
@with_trace('save_bulk_catalog')
def save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
"""save a bulk catalog to a json file in the given folder path"""
folder_path, _ = remove_protocol(folder_path)
meta_path = join(folder_path, CATALOG_FILE_NAME)
with filesystem.open(meta_path, 'w') as outfile:
data = json.dumps(catalog.as_dict(), indent=0)
outfile.write(data)
# json.dump(catalog.as_dict(), outfile) # don't know why json.dump is slower (local windows)
@capture_timings('load_bulk_catalog', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk_catalog')
def load_bulk_catalog(filesystem, folder_path: str) -> Optional[BulkCatalog]:
"""load a bulk catalog from a json file in the given folder path"""
folder_path, _ = remove_protocol(folder_path)
meta_path = join(folder_path, CATALOG_FILE_NAME)
with suppress(FileNotFoundError):
with filesystem.open(meta_path) as json_file:
data = json.load(json_file)
return BulkCatalog.from_dict(data)
return None
async def async_load_bulk_catalog(filesystem, folder_path: str) -> BulkCatalog:
return await submit_with_trace(get_client(), load_bulk_catalog, filesystem, folder_path)
async def async_save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
return await submit_with_trace(get_client(), save_bulk_catalog, filesystem, folder_path, catalog)
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from .dask_bulk_storage import DaskBulkStorage
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
params = DaskStorageParameters(protocol='file',
base_directory=base_directory,
storage_options={'auto_mkdir': True})
return await DaskBulkStorage.create(params)
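# Illustrative usage sketch (not part of the change set): create a file-backed DaskBulkStorage for
# local development or tests. The temporary directory is a made-up location and a running event
# loop is assumed (e.g. at service start-up).
async def _example_local_bulk_storage() -> DaskBulkStorage:
    import tempfile
    return await make_local_dask_bulk_storage(base_directory=tempfile.mkdtemp())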
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path, remove
import uuid
import asyncio
from io import BytesIO
from contextlib import asynccontextmanager, contextmanager, suppress
from typing import Union, AsyncGenerator, AsyncContextManager
from dask.utils import format_bytes
from app.utils import get_wdms_temp_dir
from app.helper.logger import get_logger
GiB = 10 ** 9
MiB = 10 ** 6
"""
Dask data IPC (inter process communication) implementations
=============
This module contains various mechanisms to pass data (bytes) from the main process to the Dask workers:
* `DaskNativeDataIPC` uses the native Dask mechanism using dask_client.scatter
* `DaskLocalFileDataIPC` uses temporary local files to transfer data. The main motivation is to reduce the memory while
improving efficiency. Here's a note from Dask: `Note that it is often better to submit jobs to your workers to have them
load the data rather than loading data locally and then scattering it out to them.`
* `DaskNoneDataIPC` does nothing but forward what is put inside. It is only for the mono-process case and as a utility for
testing and development.
Data is expected to flow one way for now, from main to worker. On the main side, the producer sets the data asynchronously
using a context manager. The `set` method returns a handle and a getter function. The handle and getter are then passed
as arguments to the worker:
.. code-block:: python
async with ipc_data.set(data_to_pass_to_worker) as (ipc_data_handle, ipc_data_getter_func):
dask.client.submit(some_func, ipc_data_handle, ipc_data_getter_func, ...)
Inside the worker, the data is fetched synchronously as a file_like object:
.. code-block:: python
with ipc_data_getter_func(ipc_data_handle) as file_like_data:
actual_data: bytes = file_like_data.read()
"""
async def _real_all_from_async_gen(gen: AsyncGenerator[bytes, None]) -> bytes:
""" concat all data from an async generator and return the result"""
chunks = []
async for chunk in gen:
chunks.append(chunk)
return b"".join(chunks)
@contextmanager
def ipc_data_getter_from_bytes(ipc_ref):
""" get data as file_like given data handle as direct bytes. To use with 'with' statement. """
yield BytesIO(ipc_ref)
@contextmanager
def ipc_data_getter_from_file(ipc_ref):
""" get data as file_like given data handle from a file. To use with 'with' statement. """
with open(ipc_ref, 'rb') as f:
yield f
class DaskNativeDataIPC:
"""
Data IPC implementation based on the native Dask method (DaskClient.scatter), whose efficiency degrades above 2MB
"""
ipc_type = 'dask_native'
def __init__(self, dask_client):
""" build a dask native ipc """
self._client = dask_client
@asynccontextmanager
async def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
if type(data) is not bytes: # basic type check
data = await _real_all_from_async_gen(data)
scattered_data = await self._client.scatter(data, direct=True)
yield scattered_data, ipc_data_getter_from_bytes
class DaskLocalFileDataIPC:
"""
Data IPC using local files to share data. The implementation focuses on releasing memory as soon as possible to reduce
the memory footprint.
It also 'monitors' the space used on disk.
"""
ipc_type = 'local_file'
total_files_count = 0 # only for monitoring purposes
total_size_in_file = 0 # only for monitoring purposes
log_usage_info_threshold = 1 * GiB
log_usage_warning_threshold = 2 * GiB
log_usage_error_threshold = 5 * GiB
def __init__(self, base_folder=None, io_chunk_size=50*MiB):
self._base_folder = base_folder or get_wdms_temp_dir()
self._io_chunk_size = io_chunk_size
class _AsyncSetterContextManager:
# done this way rather than @asynccontextmanager to release memory as soon as it's possible
def __init__(self, base_folder: str, data: Union[bytes, AsyncGenerator[bytes, None]], chunk_size: int):
self._base_folder = base_folder
self._data = data
self._file_path = None
self._file_size = 0
self._io_chunk_size = chunk_size
def _clean(self):
# delete file if any
if self._file_path:
get_logger().debug(f"IPC data via file, deletion of {self._file_path}")
with suppress(Exception):
remove(self._file_path)
# keep track of total file and size for monitoring purposes
DaskLocalFileDataIPC.total_files_count = max(0, DaskLocalFileDataIPC.total_files_count-1)
DaskLocalFileDataIPC.total_size_in_file = max(0, DaskLocalFileDataIPC.total_size_in_file-self._file_size)
self._file_path = None
async def _write_to_file(self, file, chunk_data: bytes) -> int:
""" write the into file by chunk """
if self._io_chunk_size == 0 or len(chunk_data) < self._io_chunk_size: # write it all at once
return file.write(chunk_data) or 0
# loop and release the event loop
dump_size = self._io_chunk_size
written_size = 0
for i in range(0, len(chunk_data), dump_size):
written_size += file.write(chunk_data[i:i + dump_size]) or 0
# as Disk I/O cannot really be async, read/write one chunk at a time then release the event loop
await asyncio.sleep(0)
return written_size
async def __aenter__(self):
filepath = path.join(self._base_folder, 'ipc_' + str(uuid.uuid4()))
try:
with open(filepath, 'wb') as f:
DaskLocalFileDataIPC.total_files_count += 1
self._file_path = filepath
if type(self._data) is bytes: # basic type check
# data are bytes
self._file_size = await self._write_to_file(f, self._data)
else:
# data is passed as an async generator
async for data_chunk in self._data:
# async generator provided: iterate on chunks
self._file_size += await self._write_to_file(f, data_chunk)
self._data = None # unref so it can be freed
get_logger().debug(f"IPC data via file, {self._file_size} bytes written into {self._file_path}")
DaskLocalFileDataIPC.total_size_in_file += self._file_size
return filepath, ipc_data_getter_from_file
except: # clean up file in any case on write failure
self._clean()
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
self._clean()
def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
self._log_files_stat()
return self._AsyncSetterContextManager(self._base_folder, data, self._io_chunk_size)
@classmethod
def _log_files_stat(cls):
""" internal log current number of file and total size """
if cls.total_size_in_file > cls.log_usage_error_threshold:
get_logger().error(f"unexpected IPC data high usage: {cls.total_size_in_file} files"
f" for {format_bytes(cls.total_size_in_file)}")
elif cls.total_size_in_file > cls.log_usage_warning_threshold:
get_logger().warning(f"IPC data high usage: {cls.total_size_in_file} files"
f" for {format_bytes(cls.total_size_in_file)}")
elif cls.total_size_in_file > cls.log_usage_info_threshold:
get_logger().info(f"IPC data usage: {cls.total_size_in_file} files"
f" for {format_bytes(cls.total_size_in_file)}")
class DaskNoneDataIPC:
""" Utility, when no multiprocess, do nothing just pass, get data as it """
ipc_type = 'none'
@asynccontextmanager
async def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
if type(data) is not bytes: # basic type check
data = await _real_all_from_async_gen(data)
yield data, ipc_data_getter_from_bytes
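# Illustrative usage sketch (not part of the change set): exercise the producer/consumer contract
# described in the module docstring with the in-process DaskNoneDataIPC. The same pattern applies
# to DaskNativeDataIPC and DaskLocalFileDataIPC, with the getter call running inside the worker.
async def _example_data_ipc_round_trip():
    ipc = DaskNoneDataIPC()
    payload = b"bytes to hand over to a worker"
    async with ipc.set(payload) as (handle, getter):
        with getter(handle) as file_like:
            assert file_like.read() == payload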
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dask.distributed import WorkerPlugin
from app.helper.logger import get_logger
class DaskWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
self.worker = None
global _LOGGER
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
super().__init__()
get_logger().debug("WorkerPlugin initialised")
def setup(self, worker):
self.worker = worker
if self._register_fsspec_implementation:
self._register_fsspec_implementation()
def transition(self, key, start, finish, *args, **kwargs):
if finish == 'error':
# exc = self.worker.exceptions[key]
get_logger().exception(f"Task '{key}' has failed with exception")
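# Illustrative usage sketch (not part of the change set): register the plugin on a local Dask
# client so every worker gets the fsspec registration hook. The local-cluster arguments are
# assumptions for a development setup.
def _example_register_worker_plugin():
    from dask.distributed import Client
    client = Client(processes=False)
    client.register_worker_plugin(DaskWorkerPlugin(logger=get_logger()))
    return client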
from typing import List
import json
import fsspec
import pandas as pd
from app.bulk_persistence.dask.utils import WDMS_INDEX_NAME
from app.model.model_chunking import DataframeBasicDescribe
# imports from bulk_persistence
from ..json_orient import JSONOrient
from ..mime_types import MimeType
from ..dataframe_serializer import DataframeSerializerSync
from ..dataframe_validators import (DataFrameValidationFunc, assert_df_validate, validate_index,
columns_not_in_reserved_names, validate_number_of_columns)
from .errors import BulkNotProcessable, BulkSaveException
from . import storage_path_builder as path_builder
from . import session_file_meta as session_meta
"""
Contains functions related to writing bulk that are meant to be run inside a worker
"""
def basic_describe(df: pd.DataFrame) -> DataframeBasicDescribe:
full_cols = df.columns.tolist()
if len(full_cols) > 20: # truncate if too many columns, show 10 first and 10 last
cols = [*full_cols[0:10], '...', *full_cols[-10:]]
else:
cols = full_cols
index_exists = len(df.index)
return DataframeBasicDescribe(rowCount=len(df.index),
columnCount=len(full_cols),
columns=cols,
indexStart=str(df.index[0]) if index_exists else "0",
indexEnd=str(df.index[-1]) if index_exists else "0",
indexType=str(df.index.dtype) if index_exists else "")
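# Illustrative usage sketch (not part of the change set): describe a tiny dataframe; the resulting
# DataframeBasicDescribe would report rowCount=2, columnCount=2, indexStart='10', indexEnd='20'.
def _example_basic_describe() -> DataframeBasicDescribe:
    df = pd.DataFrame({'A': [1.0, 2.0], 'B': [3.0, 4.0]}, index=[10, 20])
    return basic_describe(df)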
def write_bulk_without_session(data_handle,
data_getter,
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
bulk_base_path: str,
storage_options) -> DataframeBasicDescribe:
"""
process post data outside of a session - write data straight to blob storage
:param data_handle: dataframe as input ipc raw bytes wrapped (file-like obj)
:param data_getter: function to get data from the handle
:param content_type: content type value as mime type (supports json and parquet)
:param df_validator_func: optional validation callable function.
:param bulk_base_path: base path of the final object on blob storage.
:param storage_options: storage options
:return: basic describe of the dataframe
:throw: BulkNotProcessable, BulkSaveException
"""
# 1- deserialize to pandas dataframe
try:
with data_getter(data_handle) as file_like_data:
df = DataframeSerializerSync.load(file_like_data, content_type, JSONOrient.split)
except Exception as e:
raise BulkNotProcessable(message=f'parsing error: {e}') from e
data_handle = None # unref
# 2- input dataframe validation
assert_df_validate(df, [df_validator_func, validate_number_of_columns, columns_not_in_reserved_names, validate_index])
# set the name of the index column
df.index.name = WDMS_INDEX_NAME
# 3- build blob filename and final full blob path
filename = session_meta.generate_chunk_filename(df)
full_file_path = path_builder.join(bulk_base_path, filename + '.parquet')
# 4- save/upload the dataframe
try:
DataframeSerializerSync.to_parquet(df, full_file_path, storage_options=storage_options)
except Exception as e:
raise BulkSaveException('Unexpected error while saving bulk') from e
# 5- return basic describe
return basic_describe(df)
def add_chunk_in_session(data_handle,
data_getter,
content_type: MimeType,
df_validator_func: DataFrameValidationFunc,
record_session_path: str,
storage_options) -> DataframeBasicDescribe:
"""
process add chunk data inside of a session
:param data_handle: input ipc raw bytes wrapped (file-like obj)
:param data_getter: function to get data from the handle
:param content_type: content type as mime type (supports json and parquet)
:param df_validator_func: optional validation callable function.
:param record_session_path: base path to the session associated to the record.
:param storage_options: storage options
:return: basic describe of the dataframe
:throw: BulkNotProcessable, BulkSaveException
"""
# 1- deserialize
try:
with data_getter(data_handle) as file_like_data:
df = DataframeSerializerSync.load(file_like_data, content_type, JSONOrient.split)
except Exception as e:
raise BulkNotProcessable(message=f'parsing error: {e}') from e
data_handle = None # unref
# 2- perform some checks
assert_df_validate(df, [df_validator_func, validate_number_of_columns, columns_not_in_reserved_names, validate_index])
# sort columns by name and set the index column name # TODO could it be avoided? then we could keep the input untouched and save a serialization step?
df = df[sorted(df.columns)]
df.index.name = WDMS_INDEX_NAME
# 3- build blob filename and final full blob path
filename = session_meta.generate_chunk_filename(df)
# 4- build and push chunk meta file
meta_file_path, protocol = path_builder.remove_protocol(f'{record_session_path}/{filename}.meta')
fs = fsspec.filesystem(protocol, **(storage_options if storage_options else {}))
with fs.open(meta_file_path, 'w') as outfile:
json.dump(session_meta.build_chunk_metadata(df), outfile)
# 5- save/upload the dataframe
parquet_file_path = f'{record_session_path}/{filename}.parquet'
try:
DataframeSerializerSync.to_parquet(df, parquet_file_path, storage_options=storage_options)
except Exception as e:
raise BulkSaveException('Unexpected error while saving bulk') from e
# 6- return basic describe
return basic_describe(df)
@@ -13,24 +13,113 @@
# limitations under the License.
from fastapi import status, HTTPException
from dask.distributed import scheduler
from pyarrow.lib import ArrowException, ArrowInvalid
from functools import wraps
from app.conf import Config
from app.helper.logger import get_logger
class BulkError(Exception):
http_status: int
def raise_as_http(self):
raise HTTPException(status_code=self.http_status, detail=str(self)) from self
class BulkRecordNotFound(BulkError):
http_status = status.HTTP_404_NOT_FOUND
def __init__(self, record_id=None, bulk_id=None, message=None):
ex_message = 'bulk '
if bulk_id:
ex_message += f'{bulk_id} '
if record_id:
ex_message += f'for record {record_id} '
ex_message += 'not found'
if message:
ex_message += ': ' + message
super().__init__(ex_message)
class BulkCurvesNotFound(BulkError):
http_status = status.HTTP_404_NOT_FOUND
def __init__(self, curves=None, message=None):
ex_message = 'bulk '
if curves:
ex_message += f'for curves: {curves} not found'
if message:
ex_message += ': ' + message
super().__init__(ex_message)
class BulkNotProcessable(BulkError):
http_status = status.HTTP_422_UNPROCESSABLE_ENTITY
def __init__(self, bulk_id=None, message=None):
ex_message = 'bulk '
if bulk_id:
ex_message += f'{bulk_id} '
ex_message += 'not processable'
if message:
ex_message += ': ' + message
super().__init__(ex_message)
class BulkSaveException(BulkError):
http_status = status.HTTP_500_INTERNAL_SERVER_ERROR
class InternalBulkError(BulkError):
http_status = status.HTTP_500_INTERNAL_SERVER_ERROR
def __init__(self, message=None):
ex_message = 'Internal bulk error'
if message:
ex_message += ': ' + message
super().__init__(ex_message)
class FilterError(BulkError):
http_status = status.HTTP_400_BAD_REQUEST
def __init__(self, reason):
ex_message = f'filter error: {reason}'
super().__init__(ex_message)
class TooManyColumnsRequested(BulkError):
http_status = status.HTTP_400_BAD_REQUEST
def __init__(self, nb_requested_cols):
ex_message = (
f"Too many columns: requested '{nb_requested_cols}',"
f" maximum allowed '{Config.max_columns_return.value}'")
super().__init__(ex_message)
def internal_bulk_exceptions(target):
"""
Decorator to handle exceptions that should not be exposed to the outside world, e.g. Pyarrow or Dask exceptions
"""
@wraps(target)
async def async_inner(*args, **kwargs):
try:
return await target(*args, **kwargs)
except ArrowInvalid as e:
get_logger().exception(f"Pyarrow ArrowInvalid when running {target.__name__}")
raise BulkNotProcessable(f"Unable to process bulk - {str(e)}")
except ArrowException:
get_logger().exception(f"Pyarrow exception raised when running {target.__name__}")
raise BulkNotProcessable("Unable to process bulk - Arrow")
except scheduler.KilledWorker:
get_logger().exception(f"Dask worker has been killed when running '{target.__name__}'")
raise InternalBulkError("Out of memory")
except Exception:
get_logger().exception(f"Unexpected exception raised when running '{target.__name__}'")
raise
return async_inner
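# Illustrative usage sketch (not part of the change set): guard a coroutine that touches pyarrow so
# low-level failures surface as WDMS bulk errors. The function below is a hypothetical placeholder.
@internal_bulk_exceptions
async def _example_guarded_bulk_read(record_id: str):
    # a pyarrow failure here would be logged and re-raised as BulkNotProcessable (HTTP 422)
    raise ArrowInvalid("corrupted parquet footer")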
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
import os
import time
from contextlib import suppress
from operator import attrgetter
from typing import Dict, Generator, List
from distributed.worker import get_client
import pandas as pd
from app.bulk_persistence.dask.utils import share_items
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings
from .storage_path_builder import add_protocol, record_session_path
class SessionFileMeta:
"""The class extract information about chunks."""
def __init__(self, fs, protocol: str, file_path: str, lazy: bool = True) -> None:
"""
Args:
fs: fsspec filesystem
file_path (str): the parquet chunk file path
lazy (bool, optional): prefetch the metadata file if False, else read on demand. Defaults to True.
"""
self._fs = fs
file_name = os.path.basename(file_path)
start, end, tail = file_name.split('_')
self.start = float(start) # data time support ?
self.end = float(end)
self.time, self.shape, tail = tail.split('.')
self._meta = None
self.path = file_path
self.protocol = protocol
if not lazy:
self._read_meta()
def _read_meta(self):
if not self._meta:
path, _ = os.path.splitext(self.path)
with self._fs.open(path + '.meta') as meta_file:
self._meta = json.load(meta_file)
return self._meta
@property
def columns(self) -> List[str]:
"""Returns the column names"""
return self._read_meta()['columns']
@property
def dtypes(self) -> List[str]:
"""Returns the column dtypes"""
return self._read_meta()['dtypes']
@property
def nb_rows(self) -> int:
"""Returns the number of rows of the chunk"""
return self._read_meta()['nb_rows']
@property
def path_with_protocol(self) -> str:
"""Returns chunk path with protocol"""
return add_protocol(self.path, self.protocol)
@property
def index_hash(self) -> str:
"""Returns the index hash"""
return self._read_meta()['index_hash']
def overlap(self, other: 'SessionFileMeta') -> bool:
"""Returns True if indexes overlap."""
return self.end >= other.start and other.end >= self.start
def has_common_columns(self, other: 'SessionFileMeta') -> bool:
"""Returns True if contains common columns with others."""
return share_items(self.columns, other.columns)
def generate_chunk_filename(dataframe: pd.DataFrame) -> str:
"""Generate a chunk filename composed of information from the given dataframe
{first_index}_{last_index}_{time}.{shape}
The shape is a hash of the column names + column dtypes.
If chunks have the same shape, dask can read them together.
Warnings:
- This function is not idempotent!
- Do not modify the name without updating the class SessionFileMeta!
Indeed, SessionFileMeta parses information from the chunk filename.
- Filenames impact partition order in Dask, as it orders them by 'natural key'.
That's why the start index is in the first position.
Raises:
IndexError - if empty dataframe
>>> generate_chunk_filename(pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10)))
'0_9_1637223437910.526782c41fe12c3249046fedcc45563ef3662250'
>>> generate_chunk_filename(pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10,20)))
'10_19_1637223490719.526782c41fe12c3249046fedcc45563ef3662250'
>>> generate_chunk_filename(pd.DataFrame({'A': [1], 'B': [1]}, index=[datetime.datetime.now()]))
'1639672097644401000_1639672097644401000_1639668497645.526782c41fe12c3249046fedcc45563ef3662250'
>>> generate_chunk_filename(pd.DataFrame({'A': []}, index=[]))
IndexError: index 0 is out of bounds for axis 0 with size 0
"""
first_idx, last_idx = dataframe.index[0], dataframe.index[-1]
if isinstance(dataframe.index, pd.DatetimeIndex):
first_idx, last_idx = dataframe.index[0].value, dataframe.index[-1].value
shape_str = '_'.join(f'{cn}:{dt}' for cn, dt in dataframe.dtypes.items())
shape = hashlib.sha1(shape_str.encode()).hexdigest()
cur_time = round(time.time() * 1000)
return f'{first_idx}_{last_idx}_{cur_time}.{shape}'
def build_chunk_metadata(dataframe: pd.DataFrame) -> dict:
"""Returns dataframe metadata
Other metadata such as start_index or stop_index are saved into the chunk filename
>>> build_chunk_metadata(pd.DataFrame({'A': [1,2,3], 'B': [4,5,6]}, index=[0,1,2]))
{'columns': ['A', 'B'], 'dtypes': ['int64', 'int64'], 'nb_rows': 3, 'index_hash': 'ab2fa50ae23ce035bad2e77ec5e0be05c2f4b816'}
"""
return {
"columns": list(dataframe.columns),
"dtypes": [str(dt) for dt in dataframe.dtypes],
"nb_rows": len(dataframe.index),
"index_hash": hashlib.sha1(dataframe.index.values).hexdigest()
}
@capture_timings('get_chunks_metadata')
@with_trace('get_chunks_metadata')
async def get_chunks_metadata(filesystem, protocol: str, base_directory: str, session: Session) -> List[SessionFileMeta]:
"""Return metadata objects for a given session"""
session_path = record_session_path(base_directory, session.id, session.recordId)
with suppress(FileNotFoundError):
parquet_files = [f for f in filesystem.ls(session_path) if f.endswith(".parquet")]
futures = get_client().map(lambda f: SessionFileMeta(filesystem, protocol, f, lazy=False) , parquet_files)
return await get_client().gather(futures)
return []
def get_next_chunk_files(
chunks_info
) -> Generator[List[SessionFileMeta], None, None]:
"""Generator which groups session chunk files in lists of files that can be read directly with dask
File can be grouped if they have the same schemas and no overlap between indexes
"""
chunks_info.sort(key=attrgetter('time'))
cache: Dict[str, List[SessionFileMeta]] = {}
columns_in_cache = set() # keep track of columns present in the cache
for chunk in chunks_info:
if chunk.shape in cache: # if other chunks with same shape
# looking for overlapping chunks
for i, cached_chunk in enumerate(cache[chunk.shape]):
if chunk.overlap(cached_chunk):
if chunk.index_hash == cached_chunk.index_hash:
# if chunks are identical in shape and index just keep the last one
get_logger().info(f"Duplicated chunk skipped : '{chunk.path}'")
cache[chunk.shape].pop(i)
else:
yield cache[chunk.shape]
del cache[chunk.shape]
break
elif not columns_in_cache.isdisjoint(chunk.columns): # else if columns conflicts
conflicting_chunk = next(metas[0] for metas in cache.values()
if chunk.has_common_columns(metas[0]))
yield cache[conflicting_chunk.shape]
columns_in_cache = columns_in_cache.difference(conflicting_chunk.columns)
del cache[conflicting_chunk.shape]
cache.setdefault(chunk.shape, []).append(chunk)
columns_in_cache.update(chunk.columns)
yield from cache.values()
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions that gather methods to build paths for bulk storage
"""
import hashlib
from os.path import join, relpath
from typing import Optional, Tuple
def hash_record_id(record_id: str) -> str:
"""encode the record_id to be a valid path name"""
return hashlib.sha1(record_id.encode()).hexdigest()
def build_base_path(base_directory: str, protocol: Optional[str] = None) -> str:
"""return the base directory, add the protocol if requested"""
return f'{protocol}://{base_directory}' if protocol else base_directory
def add_protocol(path: str, protocol: str) -> str:
"""add protocole to the path"""
prefix = protocol + '://'
if not path.startswith(prefix):
return prefix + path
return path
def remove_protocol(path: str) -> Tuple[str, str]:
"""remove protocol for path if any, return tuple[path, protocol].
If no protocol in path then protocol=''
>>> remove_protocol('s3://path/to/my/file')
('path/to/my/file', 's3')
>>> remove_protocol('path/to/my/file')
('path/to/my/file', '')
"""
if '://' not in path:
return path, ''
sep_idx = path.index('://')
return path[sep_idx + 3:], path[:sep_idx]
def record_path(
base_directory: str, record_id, protocol: Optional[str] = None
) -> str:
"""Return the entity path.
(path where all data relateed to an entity are saved"""
encoded_id = hash_record_id(record_id)
base_path = build_base_path(base_directory, protocol)
return join(base_path, encoded_id)
def record_bulk_path(
base_directory: str, record_id: str, bulk_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified bulk."""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'bulk', bulk_id, 'data')
def record_session_path(
base_directory: str, session_id: str, record_id: str, protocol: Optional[str] = None
) -> str:
"""Return the path corresponding to the specified session."""
entity_path = record_path(base_directory, record_id, protocol)
return join(entity_path, 'session', session_id, 'data')
def record_relative_path(base_directory: str, record_id: str, path: str) -> str:
"""Returns the path relative to the specified record."""
base_path = record_path(base_directory, record_id)
path, _proto = remove_protocol(path)
return relpath(path, base_path)
def full_path(
base_directory: str, record_id: str, rel_path: str, protocol: Optional[str] = None
) -> str:
"""Returns the full path of a record from a relative path"""
return join(record_path(base_directory, record_id, protocol), rel_path)
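# Illustrative usage sketch (not part of the change set): compose the blob path of a bulk and turn
# it back into a record-relative path. The base directory, record id and bulk id are made-up values.
def _example_paths():
    base, record_id, bulk_id = 'wdms-bulk-container', 'opendes:wellbore:123', 'my-bulk-id'
    full = record_bulk_path(base, record_id, bulk_id, protocol='gs')
    # -> 'gs://wdms-bulk-container/<sha1(record_id)>/bulk/my-bulk-id/data'
    rel = record_relative_path(base, record_id, full)
    # -> 'bulk/my-bulk-id/data'
    return full, rel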
from typing import Callable, Union
from enum import Enum
from dask.distributed import Client
import pandas as pd
from dask.utils import funcname
from dask.base import tokenize
from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
from app.conf import Config
from app.helper import traces
from app.utils import get_ctx
from opencensus.trace import execution_context
from . import dask_worker_write_bulk as bulk_writer

_EXPORTER = None

@@ -11,18 +22,113 @@ _EXPORTER = None

def wrap_trace_process(*args, **kwargs):
global _EXPORTER
tracing_headers = kwargs.pop('tracing_headers')
target_func = kwargs.pop('target_func')
if not tracing_headers or not target_func:
raise AttributeError("Keyword arguments should contain 'target_func' and 'tracing_headers'")
if _EXPORTER is None:
_EXPORTER = traces.create_exporter(service_name=Config.service_name.value)
span_context = traces.get_trace_propagator().from_headers(tracing_headers)
tracer = open_tracer.Tracer(span_context=span_context,
sampler=AlwaysOnSampler(),
exporter=_EXPORTER)
with tracer.span(name=f"Dask Worker - {funcname(target_func)}") as span:
span.span_kind = SpanKind.CLIENT
return target_func(*args, **kwargs)
def _create_func_key(func, *args, **kwargs):
"""
Inspired by Dask code, it returns a hashed key based on function name and given arguments
"""
return funcname(func) + "-" + tokenize(func, kwargs, *args)
def submit_with_trace(dask_client: Client, target_func: Callable, *args, **kwargs):
"""Submit given target_func to Distributed Dask workers and add tracing required stuff
Note: 'dask_task_key' is manually created to easy reading of Dask's running tasks: it will display
the effective targeted function instead of 'wrap_trace_process' used to enable tracing into Dask workers.
"""
tracing_headers = traces.get_trace_propagator().to_headers(get_ctx().tracer.span_context)
kwargs['tracing_headers'] = tracing_headers
kwargs['target_func'] = target_func
dask_task_key = _create_func_key(target_func, *args, **kwargs)
return dask_client.submit(wrap_trace_process, *args, key=dask_task_key, **kwargs)
def map_with_trace(dask_client: Client, target_func: Callable, *args, **kwargs):
"""Submit given target_func to Distributed Dask workers and add tracing required stuff
Note: 'dask_task_key' is manually created to easy reading of Dask's running tasks: it will display
the effective targeted function instead of 'wrap_trace_process' used to enable tracing into Dask workers.
"""
tracing_headers = traces.get_trace_propagator().to_headers(get_ctx().tracer.span_context)
kwargs['tracing_headers'] = tracing_headers
kwargs['target_func'] = target_func
dask_task_key = _create_func_key(target_func, *args, **kwargs)
return dask_client.map(wrap_trace_process, *args, key=dask_task_key, **kwargs)
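# Illustrative usage sketch (not part of the change set): run a picklable function on a Dask worker
# while propagating the current request's trace context. It assumes it is called from a request
# handler where get_ctx() exposes an opencensus tracer, as in the calls elsewhere in this module.
async def _example_submit_with_trace(dask_client: Client):
    future = submit_with_trace(dask_client, sum, [1, 2, 3])
    return await future   # -> 6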
class TracingMode(Enum):
""" Allow to determine which mode of adding attributes on tracing span is needed. """
CURRENT_SPAN = 1
ROOT_SPAN = 2
def _add_trace_attributes(attributes: dict, tracing_mode: TracingMode):
"""
If a tracer exists, add custom key:value pairs as attributes on the root or current span according to the value of 'tracing_mode'.
NOTE: if called by a Dask worker, the parent span is the one created by the `wrap_trace_process` function above.
"""
opencensus_tracer = execution_context.get_opencensus_tracer()
if opencensus_tracer is None or not hasattr(opencensus_tracer, 'tracer'):
return
span = None
if tracing_mode == TracingMode.CURRENT_SPAN:
span = opencensus_tracer.tracer.current_span()
elif tracing_mode == TracingMode.ROOT_SPAN:
existing_spans = opencensus_tracer.tracer.list_collected_spans()
span = existing_spans[0] if existing_spans else None
if not span:
return
for k, v in attributes.items():
span.add_attribute(attribute_key=k,
attribute_value=v)
def trace_attributes_root_span(attributes):
""" Add attributes to root tracing span """
_add_trace_attributes(attributes, TracingMode.ROOT_SPAN)
def trace_attributes_current_span(attributes):
""" Add attributes to current tracing span """
_add_trace_attributes(attributes, TracingMode.CURRENT_SPAN)
def trace_dataframe_attributes(df: Union[pd.DataFrame, bulk_writer.DataframeBasicDescribe]):
"""
Add dataframe shape into current tracing span if tracer exists
"""
if type(df) is pd.DataFrame:
df = bulk_writer.basic_describe(df)
trace_attributes_current_span({
"df rows count": df.row_count,
"df columns count": df.column_count,
"df index start": df.index_start,
"df index end": df.index_end,
"df index type": df.index_type,
})
@@ -12,29 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import zip_longest
from logging import INFO
from typing import List, Optional

import dask.dataframe as dd
import pandas as pd
import pyarrow.parquet as pa

from app.helper.logger import get_logger
from app.utils import capture_timings

WDMS_INDEX_NAME = '_wdms_index_'

def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
def log_captured_timing(tag, wall, cpu):
logger = get_logger()
if logger:
logger.log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
return log_captured_timing

worker_capture_timing_handlers = [worker_make_log_captured_timing_handler(INFO)]

def share_items(seq1, seq2):
"""Returns True if seq1 contains common items with seq2."""
@@ -47,31 +51,6 @@ def by_pairs(iterable):
return zip_longest(*[iter(iterable)] * 2, fillvalue=None)
class SessionFileMeta:
def __init__(self, fs, file_path: str) -> None:
self._fs = fs
file_name = os.path.basename(file_path)
start, end, tail = file_name.split('_')
self.start = float(start) # data time support ?
self.end = float(end)
self.time, self.shape, tail = tail.split('.')
self.columns = self._get_columns(file_path) # TODO lazy load
self.path = file_path
def _get_columns(self, file_path):
path, _ = os.path.splitext(file_path)
with self._fs.open(path + '.meta') as meta_file:
return json.load(meta_file)['columns']
def overlap(self, other: 'SessionFileMeta'):
"""Returns True if indexes overlap."""
return self.end >= other.start and other.end >= self.start
def has_common_columns(self, other):
"""Returns True if contains common columns with others."""
return share_items(self.columns, other.columns)
@capture_timings("set_index", handlers=worker_capture_timing_handlers) @capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf: dd.DataFrame): def set_index(ddf: dd.DataFrame):
"""Set index of the dask dataFrame only if needed.""" """Set index of the dask dataFrame only if needed."""
...@@ -80,18 +59,46 @@ def set_index(ddf: dd.DataFrame): ...@@ -80,18 +59,46 @@ def set_index(ddf: dd.DataFrame):
return ddf return ddf
@capture_timings("join_dataframes", handlers=worker_capture_timing_handlers)
def join_dataframes(dfs: List[dd.DataFrame]):
if len(dfs) > 1:
return dfs[0].join(dfs[1:], how='outer')
return dfs[0] if dfs else None
def rename_index(dataframe: pd.DataFrame, name):
"""Rename the dataframe index"""
dataframe.index.name = name
return dataframe
@capture_timings("do_merge", handlers=worker_capture_timing_handlers) @capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1: dd.DataFrame, df2: dd.DataFrame): def do_merge(df1: dd.DataFrame, df2: Optional[dd.DataFrame]):
"""Combine the 2 dask dataframe. Updates df1 with df2 values if overlap.""" """Combine the 2 dask dataframe. Updates df1 with df2 values if overlap."""
if df2 is None: if df2 is None:
return df1 return df1
df1 = set_index(df1) df1 = set_index(df1)
df2 = set_index(df2) df2 = set_index(df2)
df1 = df1.map_partitions(rename_index, WDMS_INDEX_NAME)
df2 = df2.map_partitions(rename_index, WDMS_INDEX_NAME)
if share_items(df1.columns, df2.columns): if share_items(df1.columns, df2.columns):
ddf = df2.combine_first(df1) return df2.combine_first(df1)
else: return df1.join(df2, how='outer') # join seems faster when there no columns in common
ddf = df1.join(df2, how='outer') # join seems faster when there no columns in common
return ddf
@capture_timings("get_num_rows", handlers=worker_capture_timing_handlers)
def get_num_rows(dataset: pa.ParquetDataset) -> int:
"""Returns the number of rows from a pyarrow ParquetDataset"""
metadata = dataset.common_metadata
if metadata and metadata.num_rows > 0:
return metadata.num_rows
return sum((piece.get_metadata().num_rows for piece in dataset.pieces))
@capture_timings("index_union", handlers=worker_capture_timing_handlers)
def index_union(idx1: pd.Index, idx2: Optional[pd.Index]):
"""Union of two Index object (check pd.Index.union doc string for more details)"""
return idx1.union(idx2) if idx2 is not None else idx1
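# Illustrative usage sketch (not part of the change set): union of an incoming chunk index with a
# (possibly absent) previously stored index; when no stored index exists, the incoming one is kept.
def _example_index_union() -> pd.Index:
    stored = pd.Index([0, 1, 2])
    incoming = pd.Index([2, 3])
    assert index_union(incoming, None).equals(incoming)
    return index_union(incoming, stored)   # -> Index([0, 1, 2, 3])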