Compare revisions

Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (1205)
Showing with 1457 additions and 519 deletions
......@@ -27,6 +27,8 @@ htmlcov
coverage.xml
unit_tests_report.xml
# Ignore files cached by Hypothesis...
.hypothesis/*
# Environments
.env
.venv*
......@@ -41,4 +43,4 @@ secrets/
**/.DS_Store
.vscode
\ No newline at end of file
.vscode
......@@ -22,6 +22,7 @@ variables:
AZURE_TEST_TYPE: python
AWS_SERVICE: wellbore-ddms
AWS_SERVICE_NAMESPACE: osdu-wellbore-ddms
AWS_ENVIRONMENT: dev
AWS_BUILD_SUBDIR: provider/os-wellbore-ddms-aws/build-aws
AWS_TEST_SUBDIR: tests/aws-test/build-aws
......@@ -32,8 +33,9 @@ variables:
OSDU_GCP_SERVICE: wellbore
OSDU_GCP_VENDOR: gcp
OSDU_GCP_HELM_PACKAGE_CHARTS: "devops/gcp/deploy devops/gcp/configmap"
OSDU_GCP_HELM_CONFIG_SERVICE_VARS: "--set data.os_wellbore_ddms_data_project_id=$OSDU_GCP_PROJECT --set data.service_host_search=$OSDU_GCP_SERVICE_HOST_SEARCH --set data.sa_key=$OSDU_GCP_INTEGRATION_TESTER"
OSDU_GCP_HELM_DEPLOYMENT_SERVICE_VARS: "--set data.image=$CI_REGISTRY_IMAGE/osdu-gcp:$CI_COMMIT_SHORT_SHA"
OSDU_GCP_HELM_CONFIG_SERVICE_VARS: "--set data.os_wellbore_ddms_data_project_id=$OSDU_GCP_PROJECT"
OSDU_GCP_HELM_CONFIG_SERVICE_VARS_DEV2: "--set data.os_wellbore_ddms_data_project_id=$OSDU_GCP_PROJECT"
OSDU_GCP_HELM_DEPLOYMENT_SERVICE_VARS: "--set data.image=$CI_REGISTRY_IMAGE/osdu-gcp:$CI_COMMIT_SHORT_SHA --set data.serviceAccountName=$OSDU_GCP_SERVICE-k8s"
OSDU_GCP_HELM_CONFIG_SERVICE: wellbore-config
OSDU_GCP_HELM_DEPLOYMENT_SERVICE: wellbore-deploy
OSDU_GCP_INT_TEST_TYPE: python
......@@ -55,7 +57,7 @@ include:
file: "cloud-providers/osdu-gcp-gke.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "cloud-providers/ibm-wellbore.yml"
file: "cloud-providers/ibm-wellbore-git.yml"
- local: "/devops/azure/azure-wellbore.yml"
......@@ -65,6 +67,53 @@ include:
- project: "osdu/platform/ci-cd-pipelines"
file: "cloud-providers/aws-python.yml"
# Override Python image defined in python.yml
.python:
  image: python:3.8-bullseye

verify_existing_requirements:
  extends:
    - .python
    - .skipForTriggeringMergeRequests
  stage: build
  before_script:
    - pip3 install --upgrade pip-tools
  script:
    - pip-compile requirements.in
    - pip-compile requirements_dev.in
    - git diff
    # check the difference between the existing requirements and the newly generated ones,
    # using the number of modified lines as the exit status code
    - exit $(git status -s | grep -e "^ M " | wc -l)
  artifacts:
    when: always
    paths:
      - "requirements*.txt"
    expire_in: 2 days

compile-and-unit-test:
  artifacts:
    when: always
    paths:
      - all-requirements.txt
      - spec/generated/openapi.json

verify_models:
  extends:
    - .python
    - .skipForTriggeringMergeRequests
  stage: build
  script:
    - python -m doctest docs/model/model_curated.md

containerize:
  extends: .skipForTriggeringMergeRequests
  stage: containerize
......@@ -99,9 +148,6 @@ osdu-gcp-containerize-gitlab:
  image: docker:19.03
  cache: {}
  tags: ["osdu-medium"]
  only:
    variables:
      - $OSDU_GCP == 'true'
  variables:
    IMAGE_TAG: $CI_REGISTRY_IMAGE/osdu-gcp:$CI_COMMIT_SHORT_SHA
    IMAGE_TAG_LATEST: $CI_REGISTRY_IMAGE/osdu-gcp:latest
......@@ -161,13 +207,14 @@ osdu-gcp-containerize-for-release:
osdu-gcp-test-python:
  extends: .osdu-gcp-variables
  stage: integration
  image: gcr.io/google.com/cloudsdktool/cloud-sdk
  image: python:3.8
  needs: ["osdu-gcp-deploy-deployment"]
  only:
    variables:
      - $OSDU_GCP == 'true' && $OSDU_GCP_INT_TEST_TYPE == 'python'
  script:
    - apt-get install -y python3-venv
    - bash
    - curl https://sdk.cloud.google.com > install.sh
    - bash install.sh --disable-prompts
    - source /root/google-cloud-sdk/completion.bash.inc
    - source /root/google-cloud-sdk/path.bash.inc
    - python3 -m venv env
    - source env/bin/activate
    - pip install --upgrade pip
......@@ -188,19 +235,50 @@ osdu-gcp-test-python:
      --data_partition $OSDU_GCP_TENANT
      --acl_domain $DOMAIN
      --legal_tag $LEGAL_TAG
    - pytest ./functional --environment="./generated/postman_environment.json" --filter-tag=basic
    - pytest ./functional --environment="./generated/postman_environment.json" --filter-tag=!search
osdu-gcp-dev2-test-python:
  extends: .osdu-gcp-dev2-variables
  stage: integration
  image: python:3.8
  needs: ["osdu-gcp-dev2-deploy-deployment"]
  script:
    - bash
    - curl https://sdk.cloud.google.com > install.sh
    - bash install.sh --disable-prompts
    - source /root/google-cloud-sdk/completion.bash.inc
    - source /root/google-cloud-sdk/path.bash.inc
    - python3 -m venv env
    - source env/bin/activate
    - pip install --upgrade pip
    - pip install wheel pytest pytest-cov
    - >
      for REQ in $PIP_REQUIREMENTS ; do
        pip install -r $REQ
      done
    - cd tests/integration
    - echo $OSDU_GCP_INTEGRATION_TESTER | base64 -d > file.json
    - gcloud auth activate-service-account --key-file file.json
    - gcloud config set project $OSDU_GCP_PROJECT
    - >
      python gen_postman_env.py
      --token $(gcloud auth print-access-token)
      --base_url $OSDU_GCP_WELLBORE_BASE_URL
      --cloud_provider $OSDU_GCP_VENDOR
      --data_partition $OSDU_GCP_TENANT
      --acl_domain $DOMAIN
      --legal_tag $LEGAL_TAG
    - pytest ./functional --environment="./generated/postman_environment.json" --filter-tag=!search
# Disable maven job in gcp common gke pipeline
osdu-gcp-test:
  extends:
    - .osdu-gcp-variables

# Allow failure on deployments
ibm-deploy:
  allow_failure: true

# Allow failure on integration tests
osdu-gcp-dev2-test:
  extends:
    - .osdu-gcp-dev2-variables

ibm-test:

# Allow failure on private development deployments
ibm-deploy-devpri:
  allow_failure: true
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
\ No newline at end of file
# 3rd-Party Software License Notice
Generated by fossa-cli (https://github.com/fossas/fossa-cli).
Formatted by fossa-with-cache (https://community.opengroup.org/divido/fossa-with-cache).
This software includes the following software and licenses:
========================================================================
......@@ -7,42 +8,42 @@ Apache-2.0
========================================================================
The following software have components provided under the terms of this license:
- aiohttp (from https://github.com/aio-libs/aiohttp/)
- aiobotocore (from https://github.com/aio-libs/aiobotocore)
- aiohttp (from https://github.com/aio-libs/aiohttp, https://github.com/aio-libs/aiohttp/)
- async-timeout (from https://github.com/aio-libs/async_timeout/)
- boto3 (from https://github.com/boto/boto3)
- botocore (from https://github.com/boto/botocore)
- coverage (from https://coverage.readthedocs.io)
- coverage (from https://github.com/nedbat/coveragepy)
- cryptography (from https://github.com/pyca/cryptography)
- google-api-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-auth (from https://github.com/googleapis/google-auth-library-python)
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python, https://github.com/googleapis/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
- google-cloud-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-cloud-core (from https://github.com/GoogleCloudPlatform/google-cloud-python, https://github.com/googleapis/python-cloud-core)
- google-cloud-monitoring (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-cloud-trace (from https://github.com/googleapis/googleapis)
- googleapis-common-protos (from https://github.com/googleapis/googleapis)
- grpcio (from https://grpc.io)
- importlib-metadata (from http://importlib-metadata.readthedocs.io/)
- jsonpath-ng (from https://github.com/h2non/jsonpath-ng)
- msgpack (from http://msgpack.org/)
- multidict (from https://github.com/aio-libs/multidict/)
- numpy (from http://www.numpy.org)
- numpy
- openapi-spec-validator (from https://github.com/p1c2u/openapi-spec-validator)
- opencensus (from https://github.com/census-instrumentation/opencensus-python)
- opencensus-context (from https://github.com/census-instrumentation/opencensus-python/tree/master/context/opencensus-context)
- opencensus-ext-azure (from )
- opencensus-ext-logging (from )
- opencensus-ext-azure (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-azure)
- opencensus-ext-logging (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-logging)
- opencensus-ext-ocagent (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-ocagent)
- opencensus-ext-stackdriver (from https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-stackdriver)
- opencensus-proto (from https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-python)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pyarrow (from https://arrow.apache.org/)
- pytest-asyncio (from https://github.com/pytest-dev/pytest-asyncio)
- pytest-dependency (from https://github.com/RKrahl/pytest-dependency)
- python-dateutil (from https://dateutil.readthedocs.org)
- python-multipart (from http://github.com/andrew-d/python-multipart)
- requests (from https://requests.readthedocs.io)
- requests (from http://python-requests.org, https://requests.readthedocs.io)
- rfc3986 (from https://rfc3986.readthedocs.org)
- rsa (from https://stuvel.eu/rsa)
- s3transfer (from https://github.com/boto/s3transfer)
......@@ -50,6 +51,7 @@ The following software have components provided under the terms of this license:
- sortedcontainers (from http://www.grantjenks.com/docs/sortedcontainers/)
- structlog (from http://www.structlog.org/)
- tblib (from https://github.com/ionelmc/python-tblib)
- toposort (from https://bitbucket.org/ericvsmith/toposort)
- tornado (from http://www.tornadoweb.org/)
- yarl (from https://github.com/aio-libs/yarl/)
......@@ -58,17 +60,18 @@ BSD-2-Clause
========================================================================
The following software have components provided under the terms of this license:
- colorama (from https://github.com/tartley/colorama)
- grpcio (from https://grpc.io)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
- numpy (from http://www.numpy.org)
- numpy
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- ply (from http://www.dabeaz.com/ply/)
- pyasn1 (from http://sourceforge.net/projects/pyasn1/)
- pyasn1-modules (from http://sourceforge.net/projects/pyasn1/)
- pycparser (from https://github.com/eliben/pycparser)
- tblib (from https://github.com/ionelmc/python-tblib)
- wrapt (from https://github.com/GrahamDumpleton/wrapt)
========================================================================
BSD-3-Clause
......@@ -76,11 +79,12 @@ BSD-3-Clause
The following software have components provided under the terms of this license:
- HeapDict (from http://stutzbachenterprises.com/)
- Jinja2 (from http://jinja.pocoo.org/, https://palletsprojects.com/p/jinja/)
- MarkupSafe (from https://palletsprojects.com/p/markupsafe/)
- adlfs (from https://github.com/hayesgb/adlfs/)
- asgiref (from http://github.com/django/asgiref/)
- click (from http://github.com/mitsuhiko/click)
- cloudpickle (from https://github.com/cloudpipe/cloudpickle)
- colorama (from https://github.com/tartley/colorama)
- cryptography (from https://github.com/pyca/cryptography)
- dask (from http://github.com/dask/dask/)
- decorator (from https://github.com/micheles/decorator)
......@@ -95,13 +99,13 @@ The following software have components provided under the terms of this license:
- isodate (from http://cheeseshop.python.org/pypi/isodate)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
- numpy (from http://www.numpy.org)
- numpy
- oauthlib (from https://github.com/idan/oauthlib)
- openapi-schema-validator (from https://github.com/p1c2u/openapi-schema-validator)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- pip-tools (from http://pypi.python.org/pypi/pip-tools/1.8.1rc3)
- pip-tools (from https://github.com/jazzband/pip-tools/)
- ply (from http://www.dabeaz.com/ply/)
- protobuf (from https://developers.google.com/protocol-buffers/)
- psutil (from https://github.com/giampaolo/psutil)
......@@ -109,14 +113,17 @@ The following software have components provided under the terms of this license:
- pyasn1 (from http://sourceforge.net/projects/pyasn1/)
- pyasn1-modules (from http://sourceforge.net/projects/pyasn1/)
- pycparser (from https://github.com/eliben/pycparser)
- pyparsing (from http://pyparsing.wikispaces.com/)
- pyrsistent (from http://github.com/tobgu/pyrsistent/)
- python-dateutil (from https://dateutil.readthedocs.org)
- python-rapidjson (from https://github.com/python-rapidjson/python-rapidjson)
- requests-oauthlib (from https://github.com/requests/requests-oauthlib)
- s3fs (from http://github.com/dask/s3fs/)
- starlette (from https://github.com/encode/starlette)
- tblib (from https://github.com/ionelmc/python-tblib)
- toolz (from http://github.com/pytoolz/toolz/)
- uvicorn (from https://github.com/tomchristie/uvicorn)
- uvicorn (from https://github.com/tomchristie/uvicorn, https://www.uvicorn.org/)
- wrapt (from https://github.com/GrahamDumpleton/wrapt)
- zict (from http://github.com/dask/zict/)
========================================================================
......@@ -129,9 +136,10 @@ The following software have components provided under the terms of this license:
- distributed (from https://distributed.readthedocs.io/en/latest/)
- fsspec (from http://github.com/intake/filesystem_spec)
- gcsfs (from https://github.com/dask/gcsfs)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- numpy
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- s3fs (from http://github.com/dask/s3fs/)
- toolz (from http://github.com/pytoolz/toolz/)
========================================================================
......@@ -139,14 +147,21 @@ CC-BY-SA-3.0
========================================================================
The following software have components provided under the terms of this license:
- numpy (from http://www.numpy.org)
- numpy
========================================================================
CC0-1.0
========================================================================
The following software have components provided under the terms of this license:
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
========================================================================
GPL-2.0-only
========================================================================
The following software have components provided under the terms of this license:
- coverage (from https://coverage.readthedocs.io)
- coverage (from https://github.com/nedbat/coveragepy)
- grpcio (from https://grpc.io)
========================================================================
......@@ -161,7 +176,7 @@ GPL-3.0-only
========================================================================
The following software have components provided under the terms of this license:
- coverage (from https://coverage.readthedocs.io)
- coverage (from https://github.com/nedbat/coveragepy)
- grpcio (from https://grpc.io)
- pyparsing (from http://pyparsing.wikispaces.com/)
- rfc3986 (from https://rfc3986.readthedocs.org)
......@@ -184,8 +199,6 @@ The following software have components provided under the terms of this license:
========================================================================
JSON
========================================================================
The following software have components provided under the terms of this license:
- python-rapidjson (from https://github.com/python-rapidjson/python-rapidjson)
========================================================================
......@@ -219,14 +232,14 @@ The following software have components provided under the terms of this license:
- PyJWT (from http://github.com/jpadilla/pyjwt)
- PyYAML (from http://pyyaml.org/wiki/PyYAML)
- adal (from https://github.com/AzureAD/azure-activedirectory-library-for-python)
- aiohttp (from https://github.com/aio-libs/aiohttp/)
- aiohttp (from https://github.com/aio-libs/aiohttp, https://github.com/aio-libs/aiohttp/)
- aioitertools (from https://github.com/jreese/aioitertools)
- aioredis (from https://github.com/aio-libs/aioredis)
- anyio (from https://pypi.org/project/anyio/3.3.0/)
- anyio (from https://pypi.org/project/anyio/3.3.0/, https://pypi.org/project/anyio/3.4.0/)
- asgiref (from http://github.com/django/asgiref/)
- atomicwrites (from https://github.com/untitaker/python-atomicwrites)
- attrs (from https://attrs.readthedocs.io/)
- attrs (from https://attrs.readthedocs.io/, https://www.attrs.org/)
- azure-common (from https://github.com/Azure/azure-sdk-for-python)
- azure-core (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/core/azure-core)
- azure-core (from https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/core/azure-core)
- azure-datalake-store (from https://github.com/Azure/azure-data-lake-store-python)
- azure-identity (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/identity/azure-identity)
- azure-keyvault (from https://github.com/Azure/azure-sdk-for-python)
......@@ -234,33 +247,36 @@ The following software have components provided under the terms of this license:
- azure-keyvault-keys (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-keys)
- azure-keyvault-secrets (from https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-secrets)
- azure-storage-blob (from https://github.com/Azure/azure-storage-python)
- backoff (from https://github.com/litl/backoff)
- backoff
- botocore (from https://github.com/boto/botocore)
- cachetools (from https://github.com/tkem/cachetools)
- cffi (from http://cffi.readthedocs.org)
- cffi
- charset-normalizer (from https://github.com/ousret/charset_normalizer)
- coverage (from https://coverage.readthedocs.io)
- coverage (from https://github.com/nedbat/coveragepy)
- deepdiff (from https://github.com/seperman/deepdiff)
- fastapi (from https://github.com/tiangolo/fastapi)
- grpcio (from https://grpc.io)
- h11 (from https://github.com/python-hyper/h11)
- h11
- iniconfig (from http://github.com/RonnyPfannschmidt/iniconfig)
- jmespath (from https://github.com/jmespath/jmespath.py)
- jsonschema (from http://github.com/Julian/jsonschema)
- jsonschema
- mockito (from https://github.com/kaste/mockito-python)
- msal (from https://github.com/AzureAD/microsoft-authentication-library-for-python)
- msal-extensions (from https://pypi.org/project/msal-extensions/0.1.3/)
- msrest (from https://github.com/Azure/msrest-for-python)
- munch (from http://github.com/Infinidat/munch)
- natsort (from https://github.com/SethMMorton/natsort)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- numpy
- ordered-set (from http://github.com/LuminosoInsight/ordered-set)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- pep517 (from https://github.com/takluyver/pep517)
- pluggy (from https://github.com/pytest-dev/pluggy)
- pluggy
- py (from http://pylib.readthedocs.org/)
- pyarrow (from https://arrow.apache.org/)
- pydantic (from https://github.com/samuelcolvin/pydantic)
- pyparsing (from http://pyparsing.wikispaces.com/)
- pyrsistent (from http://github.com/tobgu/pyrsistent/)
- pytest (from http://pytest.org)
- pytest (from http://pytest.org, https://docs.pytest.org/en/latest/)
- pytest-cov (from https://github.com/pytest-dev/pytest-cov)
- pytest-httpx (from https://colin-b.github.io/pytest_httpx/)
- pytest-mock (from https://github.com/pytest-dev/pytest-mock/)
......@@ -272,10 +288,16 @@ The following software have components provided under the terms of this license:
- sniffio (from https://github.com/python-trio/sniffio)
- structlog (from http://www.structlog.org/)
- toml (from https://github.com/uiri/toml)
- tomli (from https://pypi.org/project/tomli/1.1.0/)
- tomli (from https://pypi.org/project/tomli/1.2.2/, https://pypi.org/project/tomli/2.0.0/)
- urllib3 (from https://urllib3.readthedocs.io/)
- xmltodict (from https://github.com/martinblech/xmltodict)
- zipp (from https://github.com/jaraco/zipp)
========================================================================
MIT-CMU
========================================================================
The following software have components provided under the terms of this license:
- pyparsing (from http://pyparsing.wikispaces.com/)
========================================================================
MPL-2.0
......@@ -284,20 +306,26 @@ The following software have components provided under the terms of this license:
- certifi (from http://certifi.io/)
- charset-normalizer (from https://github.com/ousret/charset_normalizer)
- hypothesis (from https://hypothesis.works)
========================================================================
MPL-2.0-no-copyleft-exception
========================================================================
- hypothesis (from https://hypothesis.works)
========================================================================
NCSA
========================================================================
The following software have components provided under the terms of this license:
- numpy (from http://www.numpy.org)
- numpy
========================================================================
OPL-1.0
========================================================================
The following software have components provided under the terms of this license:
- numpy (from http://www.numpy.org)
- numpy
========================================================================
OpenSSL
......@@ -312,12 +340,12 @@ Python-2.0
The following software have components provided under the terms of this license:
- async-timeout (from https://github.com/aio-libs/async_timeout/)
- coverage (from https://coverage.readthedocs.io)
- coverage (from https://github.com/nedbat/coveragepy)
- distributed (from https://distributed.readthedocs.io/en/latest/)
- google-auth (from https://github.com/googleapis/google-auth-library-python)
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python, https://github.com/googleapis/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- numpy
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- ply (from http://www.dabeaz.com/ply/)
- portalocker (from https://github.com/WoLpH/portalocker)
- python-dateutil (from https://dateutil.readthedocs.org)
......@@ -332,28 +360,26 @@ SunPro
========================================================================
The following software have components provided under the terms of this license:
- numpy (from http://www.numpy.org)
- numpy
========================================================================
Unlicense
TCL
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from https://grpc.io)
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
========================================================================
WTFPL
Unlicense
========================================================================
The following software have components provided under the terms of this license:
- jsonpath-ng (from https://github.com/h2non/jsonpath-ng)
- grpcio (from https://grpc.io)
========================================================================
X11
WTFPL
========================================================================
The following software have components provided under the terms of this license:
- numpy (from http://www.numpy.org)
- jsonpath-ng (from https://github.com/h2non/jsonpath-ng)
========================================================================
ZPL-2.1
......@@ -368,7 +394,7 @@ Zlib
The following software have components provided under the terms of this license:
- grpcio (from https://grpc.io)
- numpy (from http://www.numpy.org)
- numpy
========================================================================
public-domain
......@@ -377,9 +403,8 @@ The following software have components provided under the terms of this license:
- botocore (from https://github.com/boto/botocore)
- grpcio (from https://grpc.io)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- hypothesis (from https://hypothesis.works)
- numpy
- pandas (from http://pandas.pydata.org, https://pandas.pydata.org)
- py (from http://pylib.readthedocs.org/)
- pytz (from http://pythonhosted.org/pytz)
......@@ -7,7 +7,7 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
## Install Software and Packages
1. Clone the os-wellbore-ddms [repository](https://community.opengroup.org/osdu/platform/domain-data-mgmt-services/wellbore/wellbore-domain-services.git)
2. Download [Python](https://www.python.org/downloads/) >=3.7
2. Download [Python](https://www.python.org/downloads/) >=3.8
3. Ensure pip, a pre-installed package manager and installer for Python, is installed and is upgraded to the latest version.
```bash
......@@ -55,6 +55,9 @@ Wellbore Domain Data Management Services (Wellbore-DDMS) Open Subsurface Data Un
- Implementation of blob storage and partition service on Azure
- osdu-core-lib-python-azure
- Implementation of blob storage and partition service on AWS
- osdu-core-lib-python-aws
- Client libraries for OSDU data ecosystem services
- osdu-data-ecosystem-search
- osdu-data-ecosystem-storage
......@@ -187,6 +190,20 @@ python main.py -e SERVICE_HOST_STORAGE https://api.example.com/storage -e SERVIC
-e USE_PARTITION_SERVICE disabled
```
- The following environment variables are required when the cloud provider is set to AWS:
- SERVICE_HOST_SEARCH: The Search Service host
- SERVICE_HOST_STORAGE: The Storage Service host
- SERVICE_HOST_PARTITION: The Partition Service host
```bash
python main.py -e CLOUD_PROVIDER aws \
-e SERVICE_HOST_SEARCH search_host \
-e SERVICE_HOST_STORAGE storage_host \
-e SERVICE_HOST_PARTITION partition_host
```
Note: If you're running locally, you may need to provide environment variables in your IDE. Here is a sample `.env` file.
By default, all Core Services endpoint values are set to `None` in `app/conf.py`; you can update the `.env` file with the core services endpoints that match your cloud provider.
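A minimal illustrative `.env`, assuming local development against placeholder endpoints (the hosts below are examples only, not real services):

```bash
# Hypothetical values - replace with the endpoints of your deployment
SERVICE_HOST_SEARCH=https://api.example.com/search
SERVICE_HOST_STORAGE=https://api.example.com/storage
SERVICE_HOST_PARTITION=https://api.example.com/partition
```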
......@@ -349,8 +366,7 @@ This example runs basic tests using the local filesystem for blob storage and st
First, create the temp storage folders and run the service.
```bash
mkdir -p tmpstorage
mkdir -p tmpblob
mkdir -p tmpstorage tmpblob
python main.py -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH $(pwd)/tmpstorage -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH $(pwd)/tmpblob -e CLOUD_PROVIDER local
```
......@@ -378,6 +394,11 @@ If you want to work with other requirements file, you can specify them
pip-sync requirements.txt requirements_dev.txt
```
**Note:** On a Windows workstation, platform-specific modules such as `pywin32` are also needed. In this case, don't use `pip-sync`; use `pip install` instead.
```bash
pip install -r requirements.txt -r requirements_dev.txt
```
If you want to update `requirements.txt` to retrieve the most recent version, respecting bounds set in `requirements.in`, you can use:
```bash
......@@ -390,6 +411,8 @@ If you want to update the version of only one dependency, for instance fastapi:
pip-compile --upgrade-package fastapi
```
**Note:** On a Windows workstation, **don't** commit `pywin32` back to the `requirements.txt` file; that will cause the CI/CD pipeline to fail.
For more information: https://github.com/jazzband/pip-tools/
### Debugging:
......
......@@ -18,7 +18,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette.authentication import AuthCredentials
from app.utils import Context
from app.context import Context
security = HTTPBearer()
......
......@@ -12,10 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .bulk_id import BulkId
from .dataframe_persistence import create_and_store_dataframe, get_dataframe
from .bulk_uri import BulkURI
from .dataframe_persistence import create_and_store_dataframe, get_dataframe, download_bulk
from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync
from .json_orient import JSONOrient
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from .exceptions import UnknownChannelsException, InvalidBulkException, NoBulkException, NoDataException, RecordNotFoundException
from .consistency_checks import ConsistencyException, DataConsistencyChecks
from .dask.client import DaskClient
from .dask.localcluster import DaskException
from .capture_timings import capture_timings
# TMP: this should probably not be exposed outside of the bulk_persistence package
from .temp_dir import get_temp_dir
......@@ -36,7 +36,9 @@ import pyarrow as pa
import pyarrow.feather as feather
import pyarrow.parquet as pq
from app.utils import get_pool_executor, get_wdms_temp_dir
from app.pool_executor import get_pool_executor
from .temp_dir import get_temp_dir
from .dataframe_serializer import DataframeSerializerAsync
from .blob_bulk import BlobBulk
from .mime_types import MimeType, MimeTypes
......@@ -60,7 +62,7 @@ def export_to_parquet(
    pq.write_table(
        pa.Table.from_pandas(dataframe, preserve_index=True),
        path_like,
        version="2.0",
        version="2.6",
        compression="snappy",
    )
    return path_like, {"content_type": MimeTypes.PARQUET.type}
......@@ -199,7 +201,7 @@ async def create_and_write_blob(
    # Build the output filename which will be used as bulk id
    blob_id = blob_id or str(uuid.uuid4())
    out_filename = blob_id + file_exporter.mime_type.extension
    out_path = path.join(out_dir or get_wdms_temp_dir(), out_filename)
    out_path = path.join(out_dir or get_temp_dir(), out_filename)
    # Dump/Export the dataframe into a file format
    export_to_file_function = custom_export_to_file_fn or file_exporter.writer_fn
......@@ -231,7 +233,4 @@ async def create_and_write_blob(
@with_trace('read_blob')
async def read_blob(blob: BlobBulk):
    importer = BlobFileImporters.from_string(blob.content_type)
    # TODO: run in executor?
    dataframe = importer.reader_fn(blob.data)
    return dataframe
    return await DataframeSerializerAsync().read_parquet(blob.data)
......@@ -13,27 +13,7 @@
# limitations under the License.
import uuid
from typing import Tuple, Optional
class BulkId:
    @staticmethod
    def new_bulk_id() -> str:
        return str(uuid.uuid4())

    @classmethod
    def bulk_urn_encode(cls, bulk_id: str, prefix: str = None) -> str:
        if prefix:
            return f'urn:{prefix}:uuid:{uuid.UUID(bulk_id)}'
        return uuid.UUID(bulk_id).urn

    # Returns a tuple (<uuid> : str, <prefix> : str)
    @classmethod
    def bulk_urn_decode(cls, urn: str) -> Tuple[str, Optional[str]]:
        if urn is None:
            raise ValueError('attempted to decode empty urn')
        parts = urn.split(":")
        if len(parts) < 4:
            return str(uuid.UUID(urn)), None
        return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])


def new_bulk_id() -> str:
    return str(uuid.uuid4())
from typing import Optional, NamedTuple


class BulkStorageVersion(NamedTuple):
    """ This is the version of the bulk storage engine """

    version: str
    """ unique version identifier """

    uri_prefix: Optional[str]
    """ associated uri prefix """


BulkStorageVersion_V0 = BulkStorageVersion(version='0', uri_prefix=None)
""" first bulk management implementation with direct management to blob storage with a single blob """

BulkStorageVersion_V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1")
""" version 1, using Dask to handle bulk manipulation and storage """

BulkStorageVersion_Invalid = BulkStorageVersion(version='', uri_prefix=None)
""" represent an invalid/undefined storage version """
from typing import Optional, Tuple
import uuid

from .bulk_storage_version import (
    BulkStorageVersion, BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid)


class BulkURI:
    """
    Bulk URI, contains the bulk identifier (bulk_id) and the storage engine version which identifies how
    the bulk is stored.
    Usage:
        - ctor from URI string value:
            `bulk_uri = BulkURI.decode(uri_str)`
        - ctor explicit given a bulk_id and a storage version:
            `bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersion_V1)`
        - ctor explicit using class method:
            `bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id_value)`
        - encode to URI string value:
            `uri_str: str = bulk_uri.encode()`
        - check which storage engine version it is:
            `bulk_uri.storage_version == BulkStorageVersion_V0`
            `bulk_uri.is_bulk_storage_V0()`
    """

    def __init__(self, bulk_id: str, version: BulkStorageVersion):
        """
        make a new instance, or an invalid one
        Either pass uri alone or bulk_id, version
        :param bulk_id: expected as a valid uuid
        :param version: storage version
        :throw: ValueError
        """
        if not bulk_id or not version or version == BulkStorageVersion_Invalid:
            bulk_id = ''
            version = BulkStorageVersion_Invalid
        else:
            # ensure valid uuid
            uuid.UUID(bulk_id)

        self._bulk_id = bulk_id
        self._storage_version = version

    @classmethod
    def invalid(cls):
        """ make an invalid instance """
        return cls('', BulkStorageVersion_Invalid)

    @classmethod
    def decode(cls, uri: str) -> 'BulkURI':
        """
        construct a BulkURI from an encoded URI
        :throw: ValueError
        """
        if not uri:
            return BulkURI.invalid()

        bulk_id, prefix = cls._decode_uri(uri)
        if not prefix:
            version = BulkStorageVersion_V0
        elif prefix == BulkStorageVersion_V1.uri_prefix:
            version = BulkStorageVersion_V1
        else:
            raise ValueError('Unsupported prefix in bulk URI: ' + prefix)

        return cls(bulk_id=bulk_id, version=version)

    def is_bulk_storage_V0(self) -> bool:
        """ convenient check that returns True if version == BulkStorageVersions.V0 """
        return self._storage_version.version == BulkStorageVersion_V0.version

    @classmethod
    def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI':
        """ construct a BulkURI for storage engine V0 given a bulk id """
        return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0)

    @classmethod
    def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI':
        """ construct a BulkURI for storage engine V1 given a bulk id """
        return cls(bulk_id=bulk_id, version=BulkStorageVersion_V1)

    @property
    def bulk_id(self) -> str:
        return self._bulk_id

    @property
    def storage_version(self) -> BulkStorageVersion:
        return self._storage_version

    def encode(self) -> str:
        """
        encode to uri as string
        If the prefix is not empty, uri format = `urn:$prefix:uuid:$bulk_id`
        If the prefix is empty or None, uri format = `urn:uuid:$bulk_id`
        :throw: ValueError
        """
        if self._storage_version.uri_prefix:
            return f'urn:{self._storage_version.uri_prefix}:uuid:{self._bulk_id}'
        return uuid.UUID(self._bulk_id).urn

    @classmethod
    def _decode_uri(cls, uri: str) -> Tuple[str, Optional[str]]:
        """
        Decode urn into uuid and optional prefix. Returns tuple [uuid, prefix].
        If urn is `urn:$prefix:uuid:$bulk_id`, will return [$bulk_id, $prefix]
        If urn is `urn:uuid:$bulk_id`, will return [$bulk_id, None]
        :throw: ValueError if urn empty or invalid UUID
        """
        if uri is None:
            raise ValueError('attempted to decode empty urn')
        parts = uri.split(":")
        if len(parts) < 4:
            return str(uuid.UUID(uri)), None
        return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])

    def is_valid(self) -> bool:
        """ check whether this URI is valid """
        if self._bulk_id and self._storage_version.version:
            return True
        return False
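A short usage sketch of the round-trip described in the class docstring; the UUID is generated on the fly and the `wdms-1` prefix comes from `BulkStorageVersion_V1` defined above:

```python
# illustrative only: encode/decode round-trip for both storage engine versions
bulk_id = str(uuid.uuid4())

v1_uri = BulkURI.from_bulk_storage_V1(bulk_id)
assert v1_uri.encode() == f'urn:wdms-1:uuid:{bulk_id}'
assert BulkURI.decode(v1_uri.encode()).storage_version == BulkStorageVersion_V1

legacy_uri = BulkURI.decode(f'urn:uuid:{bulk_id}')   # no prefix -> storage engine V0
assert legacy_uri.is_bulk_storage_V0()
```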
from logging import Logger, INFO
from functools import wraps
import asyncio
from time import perf_counter, process_time

from ..helper.logger import get_logger


def make_log_captured_timing_handler(level=INFO):
    def log_captured_timing(tag, wall, cpu):
        get_logger().log(level, f"Timing of {tag}, wall={wall:.5f}s, cpu={cpu:.5f}s")
    return log_captured_timing


default_capture_timing_handlers = [make_log_captured_timing_handler(INFO)]


def capture_timings(tag, handlers=default_capture_timing_handlers):
    """ basic timing decorator, get both wall and cpu """
    def decorate(target):
        if asyncio.iscoroutinefunction(target):
            @wraps(target)
            async def async_inner(*args, **kwargs):
                start_perf = perf_counter()
                start_process = process_time()
                try:
                    return await target(*args, **kwargs)
                finally:
                    perf_elapsed = perf_counter() - start_perf
                    process_elapsed = process_time() - start_process
                    for handler in handlers:
                        handler(tag=tag, wall=perf_elapsed, cpu=process_elapsed)
            return async_inner

        @wraps(target)
        def sync_inner(*args, **kwargs):
            start_perf = perf_counter()
            start_process = process_time()
            try:
                return target(*args, **kwargs)
            finally:
                perf_elapsed = perf_counter() - start_perf
                process_elapsed = process_time() - start_process
                for handler in handlers:
                    handler(tag=tag, wall=perf_elapsed, cpu=process_elapsed)
        return sync_inner
    return decorate
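A hedged usage sketch of the decorator above; the function names and bodies are illustrative only and not part of the service:

```python
@capture_timings("load_chunk")
def load_chunk(path: str) -> bytes:
    # wall and cpu time are logged through the default handlers when this returns
    with open(path, "rb") as f:
        return f.read()


@capture_timings("fetch_record")
async def fetch_record(record_id: str) -> dict:
    # coroutines are supported too: the async branch of the decorator is used
    ...
```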
from abc import ABC, abstractmethod
from typing import Iterable, Set
import re

import pandas as pd
from fastapi import status

from .dask.errors import BulkError


class ConsistencyException(BulkError):
    http_status = status.HTTP_400_BAD_REQUEST


class DataConsistencyChecks(ABC):
    # regular expression pattern for extracting column name from bulk data column label
    _col_label_pattern = re.compile(r"^(?P<name>.+)\[(?P<start>[^:]+):?(?P<stop>.*)\]$")

    @classmethod
    @abstractmethod
    async def check_bulk_consistency_on_commit_session(cls, record: "Record", new_bulk_id):
        pass

    @classmethod
    @abstractmethod
    def check_bulk_consistency_on_post_bulk(cls, record: "Record", df: pd.DataFrame):
        pass

    @staticmethod
    def _get_data_columns_name(col_labels: Iterable[str]) -> Set[str]:
        """
        Get column names from bulk data column labels
        """
        def _get_col_name_from_col_label(col_label: str):
            match = DataConsistencyChecks._col_label_pattern.match(col_label)
            if not match:
                return col_label
            return match["name"]

        res = (_get_col_name_from_col_label(col) for col in col_labels if col)
        return {r for r in res if r != ""}
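For illustration, a small sketch of what `_get_data_columns_name` does with range-suffixed labels; the curve names are made-up examples:

```python
labels = ["GR[0:100]", "GR[100:200]", "RHOB", ""]
# the "[start:stop]" suffix is stripped, empty labels are ignored
assert DataConsistencyChecks._get_data_columns_name(labels) == {"GR", "RHOB"}
```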
from . import dask_config
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module groups functions related to the bulk catalog.
A catalog contains the metadata of the chunks.
"""
import asyncio
import functools
import json
from contextlib import suppress
from dataclasses import dataclass
from typing import Dict, Iterable, List, NamedTuple, Optional, Set
from ...helper.traces import with_trace
from ..capture_timings import capture_timings
from .storage_path_builder import join, remove_protocol
from .utils import worker_capture_timing_handlers
@dataclass
class ChunkGroup:
    """A chunk group represents a list of chunks having exactly the same schema
    (column labels and dtypes)"""
    labels: Set[str]
    paths: List[str]
    dtypes: List[str]


ColumnLabel = str
ColumnDType = str
class BulkCatalog:
    """Represents a bulk catalog
    Example:
        {
            "recordId": "7507fb30-9cfa-4506-9cd8-6cbacbcda740",
            "nbRows": 1000,
            "indexPath": "folder/wdms_index/index.parquet",
            "columns": [
                {
                    "labels": ["A", "B"],
                    "paths": ["folder/file1.parquet", "folder/file2.parquet"],
                    "dtypes": ["Int64", "Float32"]
                },
                {
                    "labels": ["C"],
                    "paths": ["folder/file3.parquet"],
                    "dtypes": ["Float32"]
                }
            ],
        }
    """

    def __init__(self, record_id: str) -> None:
        self.record_id: str = record_id  # TODO remove
        self.nb_rows: int = 0
        self.index_path: Optional[str] = None
        self.columns: List[ChunkGroup] = []

    @property
    def all_columns_count(self) -> int:
        """
        Return the number of columns contained in the bulk data
        """
        return len(self.all_columns_dtypes)

    @property
    def all_columns_dtypes(self) -> Dict[ColumnLabel, ColumnDType]:
        """Returns all columns with their dtype
        Returns:
            Dict[str, str]: a dict { column label : column dtype }
        """
        res = {}
        for col_group in self.columns:
            res.update({cn: dt for cn, dt in zip(col_group.labels, col_group.dtypes)})
        return res

    def add_chunk(self, chunk_group: ChunkGroup) -> None:
        """Add a ChunkGroup to the catalog."""
        if len(chunk_group.labels) == 0:
            return

        keys = frozenset(chunk_group.labels)
        chunk_group_with_same_schema = next((x for x in self.columns if len(
            keys) == len(x.labels) and all(l in keys for l in x.labels)), None)
        if chunk_group_with_same_schema:
            chunk_group_with_same_schema.paths.extend(chunk_group.paths)
        else:
            self.columns.append(chunk_group)

    def remove_columns_info(self, labels: Iterable[str]) -> None:
        """Removes columns information
        Args:
            labels (Iterable[str]): columns labels to remove
        """
        clean_needed = False
        labels_set = frozenset(labels)
        for col_group in self.columns:
            remaining_columns = {col: dt for col, dt in zip(
                col_group.labels, col_group.dtypes) if col not in labels_set}
            if len(remaining_columns) != len(col_group.labels):
                col_group.labels = set(remaining_columns.keys())
                col_group.dtypes = list(remaining_columns.values())
                clean_needed = clean_needed or len(col_group.labels) == 0
        if clean_needed:
            self.columns = [c for c in self.columns if c.labels]

    def change_columns_info(self, chunk_group: ChunkGroup) -> None:
        """Replace column information with the given one
        Args:
            chunk_group (ChunkGroup): new column information
        """
        self.remove_columns_info(chunk_group.labels)
        self.add_chunk(chunk_group)

    class ColumnsPaths(NamedTuple):
        labels: Set[str]
        paths: List[str]

    def get_paths_for_columns(self, labels: Iterable[str], base_path: str) -> List[ColumnsPaths]:
        """Returns the paths to load data of the requested columns grouped by paths
        Args:
            labels (Iterable[str]): List of desired columns. If None or empty select all columns.
            base_path (str): Base path as prefix to chunks path
        Returns:
            List[ColumnsPaths]: The requested columns grouped by paths
        """
        grouped_files = []
        for col_group in self.columns:
            matching_columns = col_group.labels.intersection(labels) if labels else col_group.labels
            if matching_columns:
                grouped_files.append(self.ColumnsPaths(
                    labels=matching_columns,
                    paths=[join(base_path, f) for f in col_group.paths])
                )
        return grouped_files

    def as_dict(self) -> dict:
        """Returns the dict representation of the catalog"""
        return {
            "recordId": self.record_id,
            "nbRows": self.nb_rows,
            "indexPath": self.index_path,
            'columns': [{
                'labels': list(c.labels),
                'paths': c.paths,
                'dtypes': c.dtypes
            } for c in self.columns],
        }

    @classmethod
    def from_dict(cls, catalog_as_dict: dict) -> "BulkCatalog":
        """construct a Catalog from a dict"""
        catalog = cls(record_id=catalog_as_dict["recordId"])
        catalog.nb_rows = catalog_as_dict["nbRows"]
        catalog.index_path = catalog_as_dict["indexPath"]
        catalog.columns = [
            ChunkGroup(set(c["labels"]), c["paths"], c["dtypes"])
            for c in catalog_as_dict["columns"]
        ]
        return catalog
CATALOG_FILE_NAME = 'bulk_catalog.json'


@capture_timings('save_bulk_catalog', handlers=worker_capture_timing_handlers)
@with_trace('save_bulk_catalog')
async def save_bulk_catalog(filesystem, folder_path: str, catalog: BulkCatalog) -> None:
    """save a bulk catalog to a json file in the given folder path"""
    folder_path, _ = remove_protocol(folder_path)
    meta_path = join(folder_path, CATALOG_FILE_NAME)
    with filesystem.open(meta_path, 'w') as outfile:
        _func = functools.partial(json.dumps, catalog.as_dict(), indent=0)
        data = await asyncio.get_running_loop().run_in_executor(None, _func)
        outfile.write(data)


@capture_timings('load_bulk_catalog', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk_catalog')
async def load_bulk_catalog(filesystem, folder_path: str) -> Optional[BulkCatalog]:
    """load a bulk catalog from a json file in the given folder path"""
    folder_path, _ = remove_protocol(folder_path)
    meta_path = join(folder_path, CATALOG_FILE_NAME)
    with suppress(FileNotFoundError):
        with filesystem.open(meta_path) as json_file:
            data = await asyncio.get_running_loop().run_in_executor(None, json.load, json_file)
            return BulkCatalog.from_dict(data)
    return None
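A minimal sketch of the dict round-trip that `save_bulk_catalog`/`load_bulk_catalog` serialize to JSON; the record id, labels and paths reuse the example values from the class docstring:

```python
catalog = BulkCatalog(record_id="7507fb30-9cfa-4506-9cd8-6cbacbcda740")
catalog.nb_rows = 1000
catalog.add_chunk(ChunkGroup(labels={"A", "B"},
                             paths=["folder/file1.parquet"],
                             dtypes=["Int64", "Float32"]))

# as_dict()/from_dict() are the same representations written to bulk_catalog.json
restored = BulkCatalog.from_dict(catalog.as_dict())
assert restored.nb_rows == 1000
assert set(restored.all_columns_dtypes) == {"A", "B"}
```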
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import dask
from dask.utils import format_bytes
from dask.distributed import Client as DaskDistributedClient
from distributed import LocalCluster
from app.conf import Config
from .localcluster import get_dask_configuration
HOUR = 3600 # in seconds
class DaskClient:
# singleton of DaskDistributedClient class
client: DaskDistributedClient = None
# Ensure access to critical section is done for only one coroutine
lock_client: asyncio.Lock = None
@staticmethod
async def create() -> DaskDistributedClient:
if not DaskClient.lock_client:
DaskClient.lock_client = asyncio.Lock()
if not DaskClient.client:
async with DaskClient.lock_client:
if not DaskClient.client:
from app.helper.logger import get_logger
logger = get_logger()
logger.info(f"Dask client initialization started...")
n_workers, threads_per_worker, worker_memory_limit = get_dask_configuration(config=Config, logger=logger)
logger.info(f"Dask client worker configuration: {n_workers} workers running with "
f"{format_bytes(worker_memory_limit)} of RAM and {threads_per_worker} threads each")
# Ensure memory used by workers is freed regularly despite memory leak
dask.config.set({'distributed.worker.lifetime.duration': HOUR * 24})
dask.config.set({'distributed.worker.lifetime.stagger': HOUR * 1})
dask.config.set({'distributed.worker.lifetime.restart': True})
logger.info(f"Dask cluster configuration - "
f"worker lifetime: {dask.config.get('distributed.worker.lifetime.duration')}s. "
f"stagger: {dask.config.get('distributed.worker.lifetime.stagger')}s.")
cluster = await LocalCluster(
asynchronous=True,
processes=True,
threads_per_worker=threads_per_worker,
n_workers=n_workers,
memory_limit=worker_memory_limit,
dashboard_address=None
)
# A worker could be killed when executing a task if lifetime duration elapsed,
# "cluster.adapt(min=N, max=N)" ensure the respawn of workers if it happens
cluster.adapt(minimum=n_workers, maximum=n_workers)
DaskClient.client = await DaskDistributedClient(cluster, asynchronous=True)
get_logger().info(f"Dask client initialized : {DaskClient.client}")
return DaskClient.client
@staticmethod
async def close():
if not DaskClient.lock_client:
return
async with DaskClient.lock_client:
if DaskClient.client:
# closing the cluster (started independently from the client)
cluster = await DaskClient.client.cluster
await cluster.close()
await DaskClient.client.close() # or shutdown
DaskClient.client = None
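# Minimal lifecycle sketch for the DaskClient singleton above, e.g. from application
# startup/shutdown hooks (the surrounding function is hypothetical):
async def _example_dask_client_lifecycle():
    client = await DaskClient.create()        # first call spawns the LocalCluster
    same_client = await DaskClient.create()   # later calls return the same client instance
    assert client is same_client
    await DaskClient.close()                  # closes the client and the cluster it owns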
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from .dask_bulk_storage import DaskBulkStorage
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
params = DaskStorageParameters(protocol='file',
base_directory=base_directory,
storage_options={'auto_mkdir': True})
return await DaskBulkStorage.create(params)
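# Minimal usage sketch, assuming a writable local directory; the path below is a
# hypothetical example value:
async def _example_make_local_storage():
    # the returned DaskBulkStorage reads and writes bulk data under the given directory
    # through the 'file' protocol
    return await make_local_dask_bulk_storage(base_directory='/tmp/wdms_bulk')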
import dask
from ..temp_dir import get_temp_dir
dask.config.set({'temporary_directory': get_temp_dir()})
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path, remove
import uuid
import asyncio
from io import BytesIO
from contextlib import asynccontextmanager, contextmanager, suppress
from typing import Union, AsyncGenerator, AsyncContextManager
from dask.utils import format_bytes
from ..temp_dir import get_temp_dir
from app.helper.logger import get_logger
GiB = 2 ** 30
MiB = 2 ** 20
"""
Dask data IPC (inter process communication) implementations
=============
This module contains various mechanisms to pass data (bytes) from the main process to the Dask workers:
* `DaskNativeDataIPC` uses the native Dask mechanism, dask_client.scatter
* `DaskLocalFileDataIPC` uses temporary local files to transfer data. The main motivation is to reduce memory usage while
improving efficiency. Here's a note from Dask: `Note that it is often better to submit jobs to your workers to have them
load the data rather than loading data locally and then scattering it out to them.`
* `DaskNoneDataIPC` does nothing but forward what is put into it. It is only meant for the single-process case and as a
utility for testing and development.
Data is expected to flow one way for now, from the main process to the workers. In the main process, the producer sets the
data asynchronously using a context manager. The `set` method returns a handle and a getter function. Both can then be
passed as arguments to the worker, which fetches the data through them:
.. code-block:: python
async with ipc_data.set(data_to_pass_to_worker) as (ipc_data_handle, ipc_data_getter_func):
dask.client.submit(some_func, ipc_data_handle, ipc_data_getter_func, ...)
Inside the worker, the data is fetched synchronously as a file_like object:
.. code-block:: python
with ipc_data_getter_func(ipc_data_handle) as file_like_data:
actual_data: bytes = file_like_data.read()
"""
async def _real_all_from_async_gen(gen: AsyncGenerator[bytes, None]) -> bytes:
""" concat all data from an async generator and return the result"""
chunks = []
async for chunk in gen:
chunks.append(chunk)
return b"".join(chunks)
@contextmanager
def ipc_data_getter_from_bytes(ipc_ref):
""" get data as file_like given data handle as direct bytes. To use with 'with' statement. """
yield BytesIO(ipc_ref)
@contextmanager
def ipc_data_getter_from_file(ipc_ref):
""" get data as file_like given data handle from a file. To use with 'with' statement. """
with open(ipc_ref, 'rb') as f:
yield f
class DaskNativeDataIPC:
"""
Data IPC implementation based on the native Dask method (dask_client.scatter), whose efficiency degrades beyond 2MB
"""
ipc_type = 'dask_native'
def __init__(self, dask_client):
""" build a dask native ipc """
self._client = dask_client
@asynccontextmanager
async def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
if type(data) is not bytes: # basic type check
data = await _real_all_from_async_gen(data)
scattered_data = await self._client.scatter(data, direct=True)
yield scattered_data, ipc_data_getter_from_bytes
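# Minimal producer-side sketch for DaskNativeDataIPC, assuming an asynchronous Dask client;
# `_example_parse_bulk` is a hypothetical worker function, not part of this module:
def _example_parse_bulk(ipc_ref, ipc_getter):
    # runs inside a Dask worker: fetch the payload through the getter and use it
    with ipc_getter(ipc_ref) as file_like:
        return len(file_like.read())

async def _example_native_ipc(dask_client):
    ipc = DaskNativeDataIPC(dask_client)
    async with ipc.set(b'some bulk payload') as (handle, getter):
        future = dask_client.submit(_example_parse_bulk, handle, getter)
        return await future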
class DaskLocalFileDataIPC:
"""
Data IPC using local files to share data. The implementation focuses on releasing memory as soon as possible, reducing
the memory footprint.
It also 'monitors' the space used on disk.
"""
ipc_type = 'local_file'
total_files_count = 0 # only for monitoring purposes
total_size_in_file = 0 # only for monitoring purposes
log_usage_info_threshold = 1 * GiB
log_usage_warning_threshold = 2 * GiB
log_usage_error_threshold = 5 * GiB
def __init__(self, base_folder=None, io_chunk_size=50*MiB):
self._base_folder = base_folder or get_temp_dir()
self._io_chunk_size = io_chunk_size
class _AsyncSetterContextManager:
# implemented this way rather than with @asynccontextmanager to release memory as soon as possible
def __init__(self, base_folder: str, data: Union[bytes, AsyncGenerator[bytes, None]], chunk_size: int):
self._base_folder = base_folder
self._data = data
self._file_path = None
self._file_size = 0
self._io_chunk_size = chunk_size
def _clean(self):
# delete file if any
if self._file_path:
get_logger().debug(f"IPC data via file, deletion of {self._file_path}")
with suppress(Exception):
remove(self._file_path)
# keep track of total file count and size for monitoring purposes
DaskLocalFileDataIPC.total_files_count = max(0, DaskLocalFileDataIPC.total_files_count-1)
DaskLocalFileDataIPC.total_size_in_file = max(0, DaskLocalFileDataIPC.total_size_in_file-self._file_size)
self._file_path = None
async def _write_to_file(self, file, chunk_data: bytes) -> int:
""" write the into file by chunk """
if self._io_chunk_size == 0 or len(chunk_data) < self._io_chunk_size: # write it all at once
return file.write(chunk_data) or 0
# loop and release the event loop
dump_size = self._io_chunk_size
written_size = 0
for i in range(0, len(chunk_data), dump_size):
written_size += file.write(chunk_data[i:i + dump_size]) or 0
# as disk I/O cannot really be async, write one chunk at a time, then release the event loop
await asyncio.sleep(0)
return written_size
async def __aenter__(self):
filepath = path.join(self._base_folder, 'ipc_' + str(uuid.uuid4()))
try:
with open(filepath, 'wb') as f:
DaskLocalFileDataIPC.total_files_count += 1
self._file_path = filepath
if type(self._data) is bytes: # basic type check
# data are bytes
self._file_size = await self._write_to_file(f, self._data)
else:
# data is passed as an async generator: iterate on the chunks
async for data_chunk in self._data:
self._file_size += await self._write_to_file(f, data_chunk)
self._data = None # unref so it can be freed
get_logger().debug(f"IPC data via file, {self._file_size} bytes written into {self._file_path}")
DaskLocalFileDataIPC.total_size_in_file += self._file_size
return filepath, ipc_data_getter_from_file
except: # clean up file in any case on write failure
self._clean()
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
self._clean()
def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
self._log_files_stat()
return self._AsyncSetterContextManager(self._base_folder, data, self._io_chunk_size)
@classmethod
def _log_files_stat(cls):
""" internal log current number of file and total size """
if cls.total_size_in_file > cls.log_usage_error_threshold:
get_logger().error(f"unexpected IPC data high usage: {cls.total_files_count} files"
f" for {format_bytes(cls.total_size_in_file)}")
elif cls.total_size_in_file > cls.log_usage_warning_threshold:
get_logger().warning(f"IPC data high usage: {cls.total_files_count} files"
f" for {format_bytes(cls.total_size_in_file)}")
elif cls.total_size_in_file > cls.log_usage_info_threshold:
get_logger().info(f"IPC data usage: {cls.total_files_count} files"
f" for {format_bytes(cls.total_size_in_file)}")
class DaskNoneDataIPC:
""" Utility, when no multiprocess, do nothing just pass, get data as it """
ipc_type = 'none'
@asynccontextmanager
async def set(self, data: Union[bytes, AsyncGenerator[bytes, None]]) -> AsyncContextManager:
if type(data) is not bytes: # basic type check
data = await _real_all_from_async_gen(data)
yield data, ipc_data_getter_from_bytes
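# Minimal sketch showing that `set` also accepts an async generator of bytes, here with
# DaskNoneDataIPC (the pass-through used when no worker process is involved):
async def _example_none_ipc_with_async_generator():
    async def chunks():
        yield b'chunk-1'
        yield b'chunk-2'
    ipc = DaskNoneDataIPC()
    async with ipc.set(chunks()) as (handle, getter):
        with getter(handle) as file_like:
            assert file_like.read() == b'chunk-1chunk-2'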