Commit 860585dd authored by fabian serin's avatar fabian serin Committed by Luc Yriarte
Browse files

Slb code push and pipeline

parent e3721e35
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
venv/
.python-version
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
#Ipython Notebook
.ipynb_checkpoints
#virtualenvs
virtualenv/
.venv/
#build tools
fastapi_client/
#IDE
.idea/
.vscode/
default:
image: python:3.7-slim-buster
stages:
- test
- deploy
build:
stage: test
script:
- pip3 install -r requirements.txt
- pip3 install -r requirements_tests.txt
- pytest test --junitxml=report.xml
artifacts:
when: always
reports:
junit: report.xml
# This job only runs on master, and it creates the lib and push it to the feed
deploylib:
stage: deploy
script:
- pip3 install -r requirements.txt
- pip3 install twine
- python3 setup.py sdist bdist_wheel
- TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=${CI_REGISTRY_USER} python -m twine upload --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
rules:
- if: $CI_COMMIT_BRANCH == 'master'
# osdu-data-ecosystem-storage
This is a Python client library for the open data ecosystem storage service.
Storage service which handles the metadata ingestion in the Data Ecosystem
This Python package is automatically generated with fastapi_client
from the swagger api definition converted to openapi 3.0 specification using swagger2openapi
## Requirements
Python 3.7
## Installation & Usage
### pip install
```bash
pip install osdu-data-ecosystem-storage
```
## Getting Started
Please follow the [installation procedure](#installation--usage) and then look at the [example application](./example.py)
## Documentation for API Endpoints
[storageGroupsAdministrationApi](odes_storage/docs/storageGroupsAdministrationApi.md)
[storageMembersAdministrationApi](odes_storage/docs/storageMembersAdministrationApi.md)
## Documentation For Models
[Documentation](../odes_storage/docs/)
## Documentation For Authorization
TODO
## Contributing
### Installing build tools
```
npm install -g swagger2openapi
git clone https://github.com/ChristopheLallement/fastapi_client.git
```
### Build
Convert from swagger spec to openapi spec :
```
swagger2openapi curated-swagger-storage.json -y --outfile openapi-storage.yaml
```
Then generate :
```
rm -rf ./odes_storage && \
./fastapi_client/scripts/generate.sh \
--include-auth \
-i openapi-storage.yaml \
-p odes_storage \
-n odes_storage \
-o ./
```
## Author
clallement@slb.com
\ No newline at end of file
This diff is collapsed.
from odes_storage import AsyncApis as StorageApi, AuthApiClient as StorageClient
from odes_storage.models import (
CreateUpdateRecordsResponse,
DatastoreQueryResult,
Legal,
Record,
StorageAcl,
)
import asyncio
from pprint import pprint
# Base URL of the storage service instance this example talks to.
HOSTNAME = "https://os-storage-dot-opendes.appspot.com/api/storage"
# NOTE(review): hardcoded bearer token (a JWT, long since expired) committed
# to source. Real applications must obtain the token at runtime (environment
# variable / auth flow) — never embed credentials in code.
TOKEN = "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY5ZDk3YjRjYWU5MGJjZDc2YWViMjAwMjZmNmI3NzBjYWMyMjE3ODMiLCJ0eXAiOiJKV1QifQ.eyJpc3MiOiJodHRwczovL2FjY291bnRzLmdvb2dsZS5jb20iLCJhenAiOiI0ODk1NzIxMDc2OTUtb2FjajI0ZnRlNWExN3JtM2VsZ2lsaGJuNDljaWVnaG8uYXBwcy5nb29nbGV1c2VyY29udGVudC5jb20iLCJhdWQiOiI0ODk1NzIxMDc2OTUtb2FjajI0ZnRlNWExN3JtM2VsZ2lsaGJuNDljaWVnaG8uYXBwcy5nb29nbGV1c2VyY29udGVudC5jb20iLCJzdWIiOiIxMDM0MzY5NjA1NjQ3Njk2ODQzMzUiLCJoZCI6Im9wZW5kZXMucDRkLmNsb3VkLnNsYi1kcy5jb20iLCJlbWFpbCI6ImNsYWxsZW1lbnRAb3BlbmRlcy5wNGQuY2xvdWQuc2xiLWRzLmNvbSIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJhdF9oYXNoIjoiV1RBb1lVLTI5YTU3QmpOaV9IajNndyIsImlhdCI6MTU4NzU0MDI3NCwiZXhwIjoxNTg3NTQzODc0fQ.l3OljSD9N5FhNRbj8E8Gd3dyDj8Qi40r6W4e473tfTvGHzuE6QLpm5Yhp-l43HHHcOt_Gnbm-hEq6l9XXyx4yGAhna8Wp9vgz2HRlQDcXAB1WeYkBhzJ2PCu8QrT_cqahPzH8pqN8mNuTpEzJTVKZQJIm0Crs9DVz2XyIvCaUxX9lEqHKIzCVeRYjdoBKundRw4WKVXtrLd4ZX56fP9s06aUmHdrdN8BFmSn1JS2I8XNtPHarv_kKiItWbZIjWkjvM_OOCwx38oZUztTBq9W-uMh-nIT59PGj7e_RxtGUiEBlvkZroKvHg1wF1WcPOBQ5-EvW1oU19fjWkhyfbuQFQ"
# Data partition the example records are written to and read from.
DATA_PARTITION_ID = "opendes"
async def main():
    """Create a record in the storage service, read it back, and verify it.

    End-to-end example: builds a wellbore ``Record``, pushes it with
    ``create_or_update_records``, fetches the created record by its
    service-assigned id, and asserts the round-tripped fields match what
    was sent.
    """
    record = Record(
        kind="opendes:osdu:wellbore:2.0.0",
        acl=StorageAcl(
            viewers=["data.default.viewers@opendes.p4d.cloud.slb-ds.com"],
            owners=["data.default.owners@opendes.p4d.cloud.slb-ds.com"],
        ),
        legal=Legal(
            legaltags=["opendes-public-usa-dataset-1"],
            otherRelevantDataCountries=["US"],
        ),
        data={"msg": "hello world, from Data Ecosystem"},
    )
    # AuthApiClient carries the host and bearer token; AsyncApis exposes the
    # per-resource async API groups (records_api, query_api, ...).
    client = StorageApi(
        StorageClient(host=HOSTNAME, token=TOKEN)
    )
    # The API takes a list of records; skipdupes=False asks the service not
    # to skip duplicate detection shortcuts (see storage API docs).
    created_record_response = await client.records_api.create_or_update_records(
        data_partition_id=DATA_PARTITION_ID,
        skipdupes=False,
        record=[record],
    )
    assert isinstance(created_record_response, CreateUpdateRecordsResponse)
    assert created_record_response.record_ids[0]
    # Fetch the record back using the id assigned by the service.
    computed_record = await client.records_api.get_record(
        data_partition_id=DATA_PARTITION_ID,
        id=created_record_response.record_ids[0],
    )
    assert isinstance(computed_record, Record)
    # Round-trip check: what we stored is what we get back.
    assert record.kind == computed_record.kind
    assert record.acl == computed_record.acl
    assert record.legal == computed_record.legal
    assert record.data == computed_record.data
    pprint(computed_record)


if __name__ == "__main__":
    asyncio.run(main())
import inspect
from odes_storage import models
from odes_storage.api_client import ApiClient, AsyncApis, SyncApis # noqa F401
from odes_storage.auth_api_client import AuthApiClient
from odes_storage.exceptions import ResponseValidationError, UnexpectedResponse
# Resolve forward references on every model class declared in
# odes_storage.models, so string type annotations between models become
# usable at runtime. The __module__ check skips classes that are merely
# re-exported from other modules.
for model in inspect.getmembers(models, inspect.isclass):
    # getmembers yields (name, class) pairs; the class is element [1].
    if model[1].__module__ == "odes_storage.models":
        model_class = model[1]
        model_class.update_forward_refs()
# flake8: noqa E501
from asyncio import get_event_loop
from typing import TYPE_CHECKING, Awaitable
from fastapi.encoders import jsonable_encoder
from odes_storage import models as m
if TYPE_CHECKING:
from odes_storage.api_client import ApiClient
class _QueryApi:
    """Request builders for the /v2/query endpoints.

    Each ``_build_for_*`` method assembles headers, query parameters and
    body, then returns the awaitable produced by ``ApiClient.request``
    without awaiting it; the Async/Sync subclasses decide how to drive it.
    """

    def __init__(self, api_client: "ApiClient"):
        self.api_client = api_client

    def _build_for_fetch_records(
        self, data_partition_id: str, multi_record_ids: m.MultiRecordIds = None
    ) -> Awaitable[m.MultiRecordInfo]:
        """
        The API fetches multiple records at once. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        return self.api_client.request(
            type_=m.MultiRecordInfo,
            method="POST",
            url="/v2/query/records",
            headers={"data-partition-id": str(data_partition_id)},
            json=jsonable_encoder(multi_record_ids),
        )

    def _build_for_fetch_records_with_optional_conversion(
        self, data_partition_id: str, multi_record_request: m.MultiRecordRequest = None
    ) -> Awaitable[m.MultiRecordResponse]:
        """
        Fetch records and do corresponding conversion as user requested, no more than 20 records per request.
        """
        return self.api_client.request(
            type_=m.MultiRecordResponse,
            method="POST",
            url="/v2/query/records:batch",
            headers={"data-partition-id": str(data_partition_id)},
            json=jsonable_encoder(multi_record_request),
        )

    def _build_for_get_all_kinds(
        self, data_partition_id: str, cursor: str = None, limit: int = None
    ) -> Awaitable[m.DatastoreQueryResult]:
        """
        The API returns a list of all kinds in the specific {Data-Partition-Id}. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
        """
        # Only forward pagination parameters the caller actually supplied.
        query_params = {
            name: str(value)
            for name, value in (("cursor", cursor), ("limit", limit))
            if value is not None
        }
        return self.api_client.request(
            type_=m.DatastoreQueryResult,
            method="GET",
            url="/v2/query/kinds",
            params=query_params,
            headers={"data-partition-id": str(data_partition_id)},
        )

    def _build_for_get_all_record_from_kind(
        self, data_partition_id: str, cursor: str = None, limit: int = None, kind: str = None
    ) -> Awaitable[m.DatastoreQueryResult]:
        """
        The API returns a list of all record ids which belong to the specified kind. Required roles: 'users.datalake.ops'.
        """
        # Only forward parameters the caller actually supplied.
        query_params = {
            name: str(value)
            for name, value in (("cursor", cursor), ("limit", limit), ("kind", kind))
            if value is not None
        }
        return self.api_client.request(
            type_=m.DatastoreQueryResult,
            method="GET",
            url="/v2/query/records",
            params=query_params,
            headers={"data-partition-id": str(data_partition_id)},
        )
class AsyncQueryApi(_QueryApi):
    """Asynchronous facade over ``_QueryApi``: awaits each request builder."""

    async def fetch_records(
        self, data_partition_id: str, multi_record_ids: m.MultiRecordIds = None
    ) -> m.MultiRecordInfo:
        """
        The API fetches multiple records at once. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        pending = self._build_for_fetch_records(
            data_partition_id=data_partition_id,
            multi_record_ids=multi_record_ids,
        )
        return await pending

    async def fetch_records_with_optional_conversion(
        self, data_partition_id: str, multi_record_request: m.MultiRecordRequest = None
    ) -> m.MultiRecordResponse:
        """
        Fetch records and do corresponding conversion as user requested, no more than 20 records per request.
        """
        pending = self._build_for_fetch_records_with_optional_conversion(
            data_partition_id=data_partition_id,
            multi_record_request=multi_record_request,
        )
        return await pending

    async def get_all_kinds(
        self, data_partition_id: str, cursor: str = None, limit: int = None
    ) -> m.DatastoreQueryResult:
        """
        The API returns a list of all kinds in the specific {Data-Partition-Id}. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
        """
        pending = self._build_for_get_all_kinds(
            data_partition_id=data_partition_id,
            cursor=cursor,
            limit=limit,
        )
        return await pending

    async def get_all_record_from_kind(
        self, data_partition_id: str, cursor: str = None, limit: int = None, kind: str = None
    ) -> m.DatastoreQueryResult:
        """
        The API returns a list of all record ids which belong to the specified kind. Required roles: 'users.datalake.ops'.
        """
        pending = self._build_for_get_all_record_from_kind(
            data_partition_id=data_partition_id,
            cursor=cursor,
            limit=limit,
            kind=kind,
        )
        return await pending
class SyncQueryApi(_QueryApi):
    """Blocking facade over ``_QueryApi``.

    Each method drives the corresponding request builder to completion on
    the current event loop and returns its result.
    """

    def fetch_records(self, data_partition_id: str, multi_record_ids: m.MultiRecordIds = None) -> m.MultiRecordInfo:
        """
        The API fetches multiple records at once. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        return get_event_loop().run_until_complete(
            self._build_for_fetch_records(
                data_partition_id=data_partition_id, multi_record_ids=multi_record_ids
            )
        )

    def fetch_records_with_optional_conversion(
        self, data_partition_id: str, multi_record_request: m.MultiRecordRequest = None
    ) -> m.MultiRecordResponse:
        """
        Fetch records and do corresponding conversion as user requested, no more than 20 records per request.
        """
        return get_event_loop().run_until_complete(
            self._build_for_fetch_records_with_optional_conversion(
                data_partition_id=data_partition_id, multi_record_request=multi_record_request
            )
        )

    def get_all_kinds(self, data_partition_id: str, cursor: str = None, limit: int = None) -> m.DatastoreQueryResult:
        """
        The API returns a list of all kinds in the specific {Data-Partition-Id}. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
        """
        return get_event_loop().run_until_complete(
            self._build_for_get_all_kinds(
                data_partition_id=data_partition_id, cursor=cursor, limit=limit
            )
        )

    def get_all_record_from_kind(
        self, data_partition_id: str, cursor: str = None, limit: int = None, kind: str = None
    ) -> m.DatastoreQueryResult:
        """
        The API returns a list of all record ids which belong to the specified kind. Required roles: 'users.datalake.ops'.
        """
        return get_event_loop().run_until_complete(
            self._build_for_get_all_record_from_kind(
                data_partition_id=data_partition_id, cursor=cursor, limit=limit, kind=kind
            )
        )
# flake8: noqa E501
from asyncio import get_event_loop
from typing import TYPE_CHECKING, Any, Awaitable, List
from fastapi.encoders import jsonable_encoder
from odes_storage import models as m
if TYPE_CHECKING:
from odes_storage.api_client import ApiClient
class _RecordsApi:
    """Request builders for the /v2/records endpoints.

    Each ``_build_for_*`` method assembles path/query parameters, headers
    and body, then returns the awaitable produced by ``ApiClient.request``
    without awaiting it; the Async/Sync subclasses decide how to drive it.
    """

    def __init__(self, api_client: "ApiClient"):
        self.api_client = api_client

    def _build_for_create_or_update_records(
        self, data_partition_id: str, skipdupes: bool = None, record: List[m.Record] = None
    ) -> Awaitable[m.CreateUpdateRecordsResponse]:
        """
        The API represents the main injection mechanism into the Data Ecosystem. It allows records creation and/or update. When no record id is provided or when the provided id is not already present in the Data Ecosystem then a new record is created. If the id is related to an existing record in the Data Ecosystem then an update operation takes place and a new version of the record is created. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
        """
        query_params = {}
        # Only sent when the caller supplies it; str() turns the bool into
        # "True"/"False" for the query string.
        if skipdupes is not None:
            query_params["skipdupes"] = str(skipdupes)
        headers = {"data-partition-id": str(data_partition_id)}
        body = jsonable_encoder(record)
        return self.api_client.request(
            type_=m.CreateUpdateRecordsResponse,
            method="PUT",
            url="/v2/records",
            params=query_params,
            headers=headers,
            json=body,
        )

    def _build_for_delete_record(self, id: str, data_partition_id: str, body: Any = None) -> Awaitable[None]:
        """
        The API performs a logical deletion of the given record. This operation can be reverted later. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
        """
        # Logical delete uses a POST to the custom ":delete" action, not an
        # HTTP DELETE (compare _build_for_purge_record).
        path_params = {"id": str(id)}
        headers = {"data-partition-id": str(data_partition_id)}
        body = jsonable_encoder(body)
        return self.api_client.request(
            type_=None,  # no response model is parsed for this call
            method="POST",
            url="/v2/records/{id}:delete",
            path_params=path_params,
            headers=headers,
            json=body,
        )

    def _build_for_get_all_record_versions(self, id: str, data_partition_id: str) -> Awaitable[m.RecordVersions]:
        """
        The API returns a list containing all versions for the given record id. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        path_params = {"id": str(id)}
        headers = {"data-partition-id": str(data_partition_id)}
        return self.api_client.request(
            type_=m.RecordVersions,
            method="GET",
            url="/v2/records/versions/{id}",
            path_params=path_params,
            headers=headers,
        )

    def _build_for_get_record(
        self, id: str, data_partition_id: str, attribute: List[str] = None
    ) -> Awaitable[m.Record]:
        """
        This API returns the latest version of the given record. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        path_params = {"id": str(id)}
        query_params = {}
        # "attribute" is a repeated query parameter: one entry per item.
        if attribute is not None:
            query_params["attribute"] = [str(attribute_item) for attribute_item in attribute]
        headers = {"data-partition-id": str(data_partition_id)}
        return self.api_client.request(
            type_=m.Record,
            method="GET",
            url="/v2/records/{id}",
            path_params=path_params,
            params=query_params,
            headers=headers,
        )

    def _build_for_get_record_version(
        self, id: str, version: int, data_partition_id: str, attribute: List[str] = None
    ) -> Awaitable[m.Record]:
        """
        The API retrieves the specific version of the given record. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        path_params = {"id": str(id), "version": str(version)}
        query_params = {}
        # "attribute" is a repeated query parameter: one entry per item.
        if attribute is not None:
            query_params["attribute"] = [str(attribute_item) for attribute_item in attribute]
        headers = {"data-partition-id": str(data_partition_id)}
        return self.api_client.request(
            type_=m.Record,
            method="GET",
            url="/v2/records/{id}/{version}",
            path_params=path_params,
            params=query_params,
            headers=headers,
        )

    def _build_for_purge_record(self, id: str, data_partition_id: str) -> Awaitable[None]:
        """
        The API performs the physical deletion of the given record and all of its versions. This operation cannot be undone. Required roles: 'users.datalake.ops'.
        """
        path_params = {"id": str(id)}
        headers = {"data-partition-id": str(data_partition_id)}
        return self.api_client.request(
            type_=None, method="DELETE", url="/v2/records/{id}", path_params=path_params, headers=headers,
        )
class AsyncRecordsApi(_RecordsApi):
    """Asynchronous facade over ``_RecordsApi``: awaits each request builder."""

    async def create_or_update_records(
        self, data_partition_id: str, skipdupes: bool = None, record: List[m.Record] = None
    ) -> m.CreateUpdateRecordsResponse:
        """
        The API represents the main injection mechanism into the Data Ecosystem. It allows records creation and/or update. When no record id is provided or when the provided id is not already present in the Data Ecosystem then a new record is created. If the id is related to an existing record in the Data Ecosystem then an update operation takes place and a new version of the record is created. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
        """
        pending = self._build_for_create_or_update_records(
            data_partition_id=data_partition_id,
            skipdupes=skipdupes,
            record=record,
        )
        return await pending

    async def delete_record(self, id: str, data_partition_id: str, body: Any = None) -> None:
        """
        The API performs a logical deletion of the given record. This operation can be reverted later. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
        """
        pending = self._build_for_delete_record(
            id=id,
            data_partition_id=data_partition_id,
            body=body,
        )
        return await pending

    async def get_all_record_versions(self, id: str, data_partition_id: str) -> m.RecordVersions:
        """
        The API returns a list containing all versions for the given record id. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        pending = self._build_for_get_all_record_versions(
            id=id,
            data_partition_id=data_partition_id,
        )
        return await pending

    async def get_record(self, id: str, data_partition_id: str, attribute: List[str] = None) -> m.Record:
        """
        This API returns the latest version of the given record. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        pending = self._build_for_get_record(
            id=id,
            data_partition_id=data_partition_id,
            attribute=attribute,
        )
        return await pending

    async def get_record_version(
        self, id: str, version: int, data_partition_id: str, attribute: List[str] = None
    ) -> m.Record:
        """
        The API retrieves the specific version of the given record. Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.
        """
        pending = self._build_for_get_record_version(
            id=id,
            version=version,
            data_partition_id=data_partition_id,
            attribute=attribute,
        )
        return await pending

    async def purge_record(self, id: str, data_partition_id: str) -> None:
        """
        The API performs the physical deletion of the given record and all of its versions. This operation cannot be undone. Required roles: 'users.datalake.ops'.
        """
        pending = self._build_for_purge_record(
            id=id,
            data_partition_id=data_partition_id,
        )
        return await pending
class SyncRecordsApi(_RecordsApi):
def create_or_update_records(
self, data_partition_id: str, skipdupes: bool = None, record: List[m.Record] = None
) -> m.CreateUpdateRecordsResponse:
"""
The API represents the main injection mechanism into the Data Ecosystem. It allows records creation and/or update. When no record id is provided or when the provided id is not already present in the Data Ecosystem then a new record is created. If the id is related to an existing record in the Data Ecosystem then an update operation takes place and a new version of the record is created. Required roles: 'users.datalake.editors' or 'users.datalake.admins'.
"""
coroutine = self._build_for_create_or_update_records(
data_partition_id=data_partition_id, skipdupes=skipdupes, record=record
)
return get_event_loop().run_until_complete(coroutine)
def delete_record(self, id: str, data_partition_id: str, body: Any = None) -> None:
"""