Commit 8600658b authored by Jeremie Hallal's avatar Jeremie Hallal
Browse files

Merge branch 'local_storage_versionning' into 'master'

local fs storage enhancements

See merge request !145
parents cddf0df6 491fd10b
Pipeline #49327 passed with stages
in 11 minutes and 31 seconds
......@@ -12,7 +12,6 @@ The following software have components provided under the terms of this license:
- boto3 (from https://github.com/boto/boto3)
- botocore (from https://github.com/boto/botocore)
- coverage (from https://coverage.readthedocs.io)
- cryptography (from https://github.com/pyca/cryptography)
- google-api-core (from https://github.com/GoogleCloudPlatform/google-cloud-python)
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
......@@ -56,14 +55,14 @@ BSD-2-Clause
========================================================================
The following software have components provided under the terms of this license:
- decorator (from https://github.com/micheles/decorator)
- grpcio (from http://www.grpc.io)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
- numpy (from http://www.numpy.org)
- ply (from http://www.dabeaz.com/ply/)
- packaging (from https://github.com/pypa/packaging)
- pyasn1 (from http://sourceforge.net/projects/pyasn1/)
- pyasn1-modules (from http://sourceforge.net/projects/pyasn1/)
- pycparser (from https://github.com/eliben/pycparser)
- tblib (from https://github.com/ionelmc/python-tblib)
========================================================================
......@@ -77,7 +76,6 @@ The following software have components provided under the terms of this license:
- click (from http://github.com/mitsuhiko/click)
- cloudpickle (from https://github.com/cloudpipe/cloudpickle)
- colorama (from https://github.com/tartley/colorama)
- cryptography (from https://github.com/pyca/cryptography)
- dask (from http://github.com/dask/dask/)
- decorator (from https://github.com/micheles/decorator)
- distributed (from https://distributed.readthedocs.io/en/latest/)
......@@ -89,26 +87,20 @@ The following software have components provided under the terms of this license:
- httpx (from https://github.com/encode/httpx)
- idna (from https://github.com/kjd/idna)
- isodate (from http://cheeseshop.python.org/pypi/isodate)
- locket (from http://github.com/mwilliamson/locket.py)
- mock (from https://github.com/testing-cabal/mock)
- numpy (from http://www.numpy.org)
- oauthlib (from https://github.com/idan/oauthlib)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- ply (from http://www.dabeaz.com/ply/)
- protobuf (from https://developers.google.com/protocol-buffers/)
- psutil (from https://github.com/giampaolo/psutil)
- pyarrow (from https://arrow.apache.org/)
- pyasn1 (from http://sourceforge.net/projects/pyasn1/)
- pyasn1-modules (from http://sourceforge.net/projects/pyasn1/)
- pycparser (from https://github.com/eliben/pycparser)
- pyparsing (from http://pyparsing.wikispaces.com/)
- pyrsistent (from http://github.com/tobgu/pyrsistent/)
- python-dateutil (from https://dateutil.readthedocs.org)
- python-rapidjson (from https://github.com/python-rapidjson/python-rapidjson)
- requests-oauthlib (from https://github.com/requests/requests-oauthlib)
- starlette (from https://github.com/encode/starlette)
- tblib (from https://github.com/ionelmc/python-tblib)
- toolz (from http://github.com/pytoolz/toolz/)
- uvicorn (from https://github.com/tomchristie/uvicorn)
- zict (from http://github.com/dask/zict/)
......@@ -119,35 +111,33 @@ CC-BY-4.0
The following software have components provided under the terms of this license:
- adlfs (from https://github.com/hayesgb/adlfs/)
- dask (from http://github.com/dask/dask/)
- distributed (from https://distributed.readthedocs.io/en/latest/)
- fsspec (from http://github.com/intake/filesystem_spec)
- gcsfs (from https://github.com/dask/gcsfs)
- pandas (from http://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
- toolz (from http://github.com/pytoolz/toolz/)
========================================================================
CC-BY-SA-3.0
CC0-1.0
========================================================================
The following software have components provided under the terms of this license:
- dask (from http://github.com/dask/dask/)
- distributed (from https://distributed.readthedocs.io/en/latest/)
- numpy (from http://www.numpy.org)
========================================================================
CNRI-Python
DOC
========================================================================
The following software have components provided under the terms of this license:
- isodate (from http://cheeseshop.python.org/pypi/isodate)
- ply (from http://www.dabeaz.com/ply/)
- dask (from http://github.com/dask/dask/)
- distributed (from https://distributed.readthedocs.io/en/latest/)
- numpy (from http://www.numpy.org)
========================================================================
GPL-2.0-only
========================================================================
The following software have components provided under the terms of this license:
- coverage (from https://coverage.readthedocs.io)
- grpcio (from http://www.grpc.io)
========================================================================
......@@ -156,22 +146,28 @@ GPL-2.0-or-later
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- pyparsing (from http://pyparsing.wikispaces.com/)
========================================================================
GPL-3.0-only
========================================================================
The following software have components provided under the terms of this license:
- coverage (from https://coverage.readthedocs.io)
- grpcio (from http://www.grpc.io)
========================================================================
GPL-3.0-or-later
========================================================================
The following software have components provided under the terms of this license:
- pyparsing (from http://pyparsing.wikispaces.com/)
- rfc3986 (from https://rfc3986.readthedocs.org)
========================================================================
ISC
========================================================================
The following software have components provided under the terms of this license:
- click (from http://github.com/mitsuhiko/click)
- grpcio (from http://www.grpc.io)
- requests-oauthlib (from https://github.com/requests/requests-oauthlib)
......@@ -196,21 +192,6 @@ The following software have components provided under the terms of this license:
- chardet (from https://github.com/chardet/chardet)
========================================================================
LGPL-2.1-or-later
========================================================================
The following software have components provided under the terms of this license:
- chardet (from https://github.com/chardet/chardet)
========================================================================
LGPL-3.0-only
========================================================================
The following software have components provided under the terms of this license:
- chardet (from https://github.com/chardet/chardet)
- pycparser (from https://github.com/eliben/pycparser)
========================================================================
MIT
========================================================================
......@@ -237,6 +218,7 @@ The following software have components provided under the terms of this license:
- cachetools (from https://github.com/tkem/cachetools)
- cffi (from http://cffi.readthedocs.org)
- coverage (from https://coverage.readthedocs.io)
- distributed (from https://distributed.readthedocs.io/en/latest/)
- fastapi (from https://github.com/tiangolo/fastapi)
- grpcio (from http://www.grpc.io)
- h11 (from https://github.com/python-hyper/h11)
......@@ -244,7 +226,6 @@ The following software have components provided under the terms of this license:
- jmespath (from https://github.com/jmespath/jmespath.py)
- jsonschema (from http://github.com/Julian/jsonschema)
- msal (from https://github.com/AzureAD/microsoft-authentication-library-for-python)
- msal-extensions (from https://pypi.org/project/msal-extensions/0.1.3/)
- msrest (from https://github.com/Azure/msrest-for-python)
- munch (from http://github.com/Infinidat/munch)
- numpy (from http://www.numpy.org)
......@@ -262,7 +243,6 @@ The following software have components provided under the terms of this license:
- python-rapidjson (from https://github.com/python-rapidjson/python-rapidjson)
- python-ulid (from https://github.com/mdomke/python-ulid)
- pytz (from http://pythonhosted.org/pytz)
- requests-oauthlib (from https://github.com/requests/requests-oauthlib)
- six (from http://pypi.python.org/pypi/six/)
- sniffio (from https://github.com/python-trio/sniffio)
- structlog (from http://www.structlog.org/)
......@@ -272,21 +252,21 @@ The following software have components provided under the terms of this license:
- zipp (from https://github.com/jaraco/zipp)
========================================================================
MPL-2.0
MIT-CMU
========================================================================
The following software have components provided under the terms of this license:
- certifi (from http://certifi.io/)
- pyparsing (from http://pyparsing.wikispaces.com/)
========================================================================
NCSA
MPL-2.0
========================================================================
The following software have components provided under the terms of this license:
- numpy (from http://www.numpy.org)
- certifi (from http://certifi.io/)
========================================================================
OPL-1.0
NCSA
========================================================================
The following software have components provided under the terms of this license:
......@@ -305,16 +285,9 @@ Python-2.0
The following software have components provided under the terms of this license:
- async-timeout (from https://github.com/aio-libs/async_timeout/)
- distributed (from https://distributed.readthedocs.io/en/latest/)
- google-auth (from https://github.com/GoogleCloudPlatform/google-auth-library-python)
- google-auth-oauthlib (from https://github.com/GoogleCloudPlatform/google-auth-library-python-oauthlib)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- portalocker (from https://github.com/WoLpH/portalocker)
- python-dateutil (from https://dateutil.readthedocs.org)
- pytz (from http://pythonhosted.org/pytz)
- rsa (from https://stuvel.eu/rsa)
- sniffio (from https://github.com/python-trio/sniffio)
- typing-extensions (from https://github.com/python/typing)
- urllib3 (from https://urllib3.readthedocs.io/)
......@@ -339,20 +312,12 @@ The following software have components provided under the terms of this license:
- jsonpath-ng (from https://github.com/h2non/jsonpath-ng)
========================================================================
ZPL-2.1
========================================================================
The following software have components provided under the terms of this license:
- pytz (from http://pythonhosted.org/pytz)
========================================================================
Zlib
========================================================================
The following software have components provided under the terms of this license:
- grpcio (from http://www.grpc.io)
- numpy (from http://www.numpy.org)
========================================================================
public-domain
......@@ -361,9 +326,7 @@ The following software have components provided under the terms of this license:
- botocore (from https://github.com/boto/botocore)
- grpcio (from http://www.grpc.io)
- numpy (from http://www.numpy.org)
- pandas (from http://pandas.pydata.org)
- py (from http://pylib.readthedocs.org/)
- pytz (from http://pythonhosted.org/pytz)
......@@ -13,7 +13,6 @@
# limitations under the License.
import asyncio
import base64
import hashlib
import json
import time
......@@ -73,6 +72,7 @@ class DefaultWorkerPlugin(WorkerPlugin):
exc = self.worker.exceptions[key]
getLogger().exception("Task '%s' has failed with exception: %s" % (key, str(exc)))
class DaskBulkStorage:
client = None
""" Dask client """
......@@ -124,23 +124,21 @@ class DaskBulkStorage:
await DaskBulkStorage.client.close() # or shutdown
DaskBulkStorage.client = None
@staticmethod
def _encode_record_id(record_id: str) -> str:
record_id_b64 = base64.b64encode(record_id.encode()).decode()
return record_id_b64.rstrip('=') # remove padding chars ('=')
def _encode_record_id(self, record_id: str) -> str:
return hashlib.sha1(record_id.encode()).hexdigest()
def _get_base_directory(self, protocol=True):
return f'{self.protocol}://{self.base_directory}' if protocol else self.base_directory
def _get_blob_path(self, record_id: str, bulk_id: str, with_protocol=True) -> str:
"""Return the bulk path from the bulk_id."""
record_id_b64 = self._encode_record_id(record_id)
return f'{self._get_base_directory(with_protocol)}/{record_id_b64}/bulk/{bulk_id}/data'
encoded_id = self._encode_record_id(record_id)
return f'{self._get_base_directory(with_protocol)}/{encoded_id}/bulk/{bulk_id}/data'
def _build_path_from_session(self, session: Session, with_protocol=True) -> str:
"""Return the session path."""
record_id_b64 = self._encode_record_id(session.recordId)
return f'{self._get_base_directory(with_protocol)}/{record_id_b64}/session/{session.id}/data'
encoded_id = self._encode_record_id(session.recordId)
return f'{self._get_base_directory(with_protocol)}/{encoded_id}/session/{session.id}/data'
def _load(self, path, **kwargs) -> dd.DataFrame:
"""Read a Parquet file into a Dask DataFrame
......@@ -230,7 +228,7 @@ class DaskBulkStorage:
if isinstance(pdf.index, pd.DatetimeIndex):
first_idx, last_idx = pdf.index[0].value, pdf.index[-1].value
idx_range = f'{first_idx}_{last_idx}'
shape = hashlib.sha256('_'.join(map(str, pdf)).encode()).hexdigest()
shape = hashlib.sha1('_'.join(map(str, pdf)).encode()).hexdigest()
t = round(time.time() * 1000)
filename = f'{idx_range}_{t}.{shape}'
......
......@@ -12,16 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from asyncio import iscoroutinefunction, gather
import uuid
from fastapi import FastAPI, HTTPException, status
from osdu.core.api.storage.tenant import Tenant
from asyncio import gather, iscoroutinefunction
from app.model import model_utils
from fastapi import FastAPI, HTTPException, status
from odes_storage.models import *
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.exceptions import ResourceNotFoundException
from app.model import model_utils
from osdu.core.api.storage.tenant import Tenant
from ulid import ULID
async def no_check_appkey_token(appkey, token):
......@@ -60,12 +60,33 @@ class StorageRecordServiceBlobStorage:
self._container: str = container
self._auth_check = auth_check_coro
def _build_record_path(self, id: str, data_partition: str):
return f'{data_partition or "global"}_r_{id.replace(":", "_")}'
@staticmethod
def _get_record_folder(id: str, data_partition: str):
encoded_id = hash(id)
folder = f'{data_partition or "global"}_r_{encoded_id}'
return folder
async def _get_all_version_object(self, id: str, data_partition: str):
folder = self._get_record_folder(id, data_partition)
tenant = Tenant(project_id=self._project, bucket_name=self._container, data_partition_id=data_partition)
return sorted(await self._storage.list_objects(tenant=tenant, prefix=folder))
async def _build_record_path(self, id: str, data_partition: str, version=None):
folder = self._get_record_folder(id, data_partition)
if version:
return f'{folder}/{version}'
objects = await self._get_all_version_object(id, data_partition)
return objects[-1] if objects else None
async def _check_auth(self, appkey=None, token=None):
await self._auth_check(appkey, token)
@staticmethod
def _get_new_id_for_record(record: Record):
kind = record.kind.split(':')
return ':'.join((kind[0], kind[2], uuid.uuid4().hex))
async def create_or_update_records(self,
record: List[Record] = None,
data_partition_id: str = None,
......@@ -77,12 +98,12 @@ class StorageRecordServiceBlobStorage:
# insert id if new record
for rec in record_list:
if rec.id is None:
rec.id = str(uuid.uuid4())
rec.id = self._get_new_id_for_record(rec)# str(uuid.uuid4())
rec.version = int(ULID()) # generate new version -> ulid is sorted that helps us to know the latest version
await gather(*[
self._storage.upload(
Tenant(project_id=self._project, bucket_name=self._container, data_partition_id=data_partition_id),
self._build_record_path(record.id, data_partition_id),
await self._build_record_path(record.id, data_partition_id, version=rec.version),
model_utils.record_to_json(record),
content_type='application/json')
for record in record_list
......@@ -93,14 +114,17 @@ class StorageRecordServiceBlobStorage:
recordIds=[record.id for record in record_list],
skipped_record_ids=[])
async def get_record(self,
async def get_record_version(self,
id: str,
version: int,
data_partition_id: str = None,
appkey: str = None,
token: str = None) -> Record:
await self._check_auth(appkey, token)
object_name = self._build_record_path(id, data_partition_id)
try:
object_name = await self._build_record_path(id, data_partition_id, version=version)
if object_name is None:
raise ResourceNotFoundException("Item not found")
bin_data = await self._storage.download(
Tenant(project_id=self._project, bucket_name=self._container, data_partition_id=data_partition_id),
object_name)
......@@ -114,17 +138,18 @@ class StorageRecordServiceBlobStorage:
appkey: str = None,
token: str = None) -> RecordVersions:
# only one version /latest is supported
return RecordVersions(recordId=id, versions=[0])
objects = await self._get_all_version_object(id, data_partition_id)
versions = [o.split('/')[-1] for o in objects]
return RecordVersions(recordId=id, versions=versions)
async def get_record_version(self,
id: str,
version: int,
data_partition_id: str = None,
attribute: List[str] = None,
appkey: str = None,
token: str = None) -> Record:
# always return the latest
return await self.get_record(id, data_partition_id, appkey, token)
async def get_record(self,
id: str,
data_partition_id: str = None,
attribute: List[str] = None,
appkey: str = None,
token: str = None) -> Record:
# return the latest
return await self.get_record_version(id, None, data_partition_id, appkey, token)
async def delete_record(self,
id: str,
......@@ -132,13 +157,13 @@ class StorageRecordServiceBlobStorage:
appkey: str = None,
token: str = None) -> None:
await self._check_auth(appkey, token)
object_name = self._build_record_path(id, data_partition_id)
try:
await self._storage.delete(
Tenant(project_id=self._project, bucket_name=self._container, data_partition_id=data_partition_id),
object_name)
except FileNotFoundError:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Item not found")
for object_name in await self._get_all_version_object(id, data_partition_id):
try:
await self._storage.delete(
Tenant(project_id=self._project, bucket_name=self._container, data_partition_id=data_partition_id),
object_name)
except FileNotFoundError:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Item not found")
async def get_schema(self, kind, data_partition_id=None, appkey=None, token=None, *args, **kwargs):
raise NotImplementedError('StorageServiceBlobStorage.get_schema')
......@@ -27,9 +27,8 @@ cloudpickle==1.6.0
colorama==0.4.4
coverage==5.5
cryptography==3.4.7
dask==2021.4.1
dask[distributed]==2021.6.2
decorator==5.0.9
distributed==2021.4.1
fastapi==0.65.1
fsspec==2021.6.0
gcsfs==2021.6.0
......
......@@ -7,6 +7,7 @@ httpx
numpy
pandas
pyarrow
python-ulid
# Note since 3.8 includes Mock 4.0+.
mock>=4.0
......
......@@ -414,7 +414,7 @@ def test_post_records_successful(client, base_url, record_obj):
with mock.patch.object(StorageRecordServiceClientMock, 'create_or_update_records', moc_create_or_update_records):
# when
response = client.post(base_url, data=json.dumps(record_dict_list))
response = client.post(base_url, data=json.dumps(record_dict_list), headers={'content-type': 'application/json'})
# then
assert response.status_code == status.HTTP_200_OK
......
......@@ -119,7 +119,7 @@ def client_with_log(client):
response = client.get(f"/ddms/v2/logs/{log_id}/versions", headers=headers)
assert response.status_code == 200, "GET log data failed"
version_id = response.json()["versions"][0]
version_id = response.json()["versions"][1]
yield client, log_id, version_id
......
......@@ -131,7 +131,7 @@ def test_post_records_successful(client):
test_Wellbores = json.load(f)
Wellbore.parse_obj(test_Wellbores[0])
# when
response = client.post(base_url, data=json.dumps(test_Wellbores))
response = client.post(base_url, data=json.dumps(test_Wellbores), headers={'content-type' : 'application/json'})
# then
assert response.status_code == status.HTTP_200_OK
......
......@@ -124,7 +124,7 @@ def client_with_log(client):
response = client.get(f"/ddms/v2/trajectories/{trajectory_id}/versions", headers=headers)
assert response.status_code == 200, "GET log data failed"
version_id = response.json()["versions"][0]
version_id = response.json()["versions"][1]
yield client, log_id, version_id
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment