Commit e7095eea authored by Yunhua Koglin's avatar Yunhua Koglin

Add Dask storage support library for Wellbore DDMS

parent 93875ddc
Pipeline #52680 passed in 28 seconds
@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = '0.0.1'
+__version__ = '0.0.2'
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from osdu.core.api.storage.tenant import Tenant
from .storage_aws import AwsStorage

async def get_dask_storage_parameters(tenant: Tenant, directory: Optional[str] = None) -> DaskStorageParameters:
    storage = AwsStorage(session=None, service_account_file=directory)
    await storage.getCredentials(tenant)
    base_directory = f"{storage._bucket_name}/{tenant.bucket_name}"
    storage_options = {'anon': False, 'use_ssl': True,
                       'key': storage._access_key,
                       'secret': storage._secret_key,
                       'token': storage._session_token}
    return DaskStorageParameters(protocol='s3',
                                 base_directory=base_directory,
                                 storage_options=storage_options)
\ No newline at end of file
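For context, a minimal sketch of how these parameters could be consumed, assuming an fsspec-compatible S3 backend such as s3fs is installed (the helper below is illustrative, not part of this commit):

import fsspec

async def list_tenant_files(tenant: Tenant):
    # Build an fsspec filesystem from the freshly issued STS credentials
    # and list the tenant's folder inside the shared bucket.
    params = await get_dask_storage_parameters(tenant)
    fs = fsspec.filesystem(params.protocol, **params.storage_options)
    return fs.ls(params.base_directory)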
@@ -20,7 +20,8 @@ from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob import Blob
from osdu.core.api.storage.tenant import Tenant
import boto3
+from datetime import datetime, timedelta, timezone
from botocore.exceptions import ClientError, WaiterError
try:
    import ujson as json
except ImportError:
@@ -33,7 +34,6 @@ def set_default(obj):
        return list(obj)
    raise TypeError

class AwsStorage(BlobStorageBase):
    def __init__(self, session=None, service_account_file: Optional[str] = None):
        self._session = session
@@ -47,6 +47,7 @@ class AwsStorage(BlobStorageBase):
        self._secret_key = ""
        self._session_token = ""
        self._bucket_name = ""
+        self._timestamp = datetime.now(timezone.utc)
    def getPolicy(self, bucket: str, keypath: str):
        """Policy for accessing the keypath folder in bucket."""
@@ -123,7 +124,8 @@ class AwsStorage(BlobStorageBase):
    async def getCredentials(self, tenant: Tenant):
        """Get credentials to access the s3 bucket for the tenant's folder."""
        # Reuse cached credentials until shortly before they expire.
        if len(self._session_token) > 2 and datetime.now(timezone.utc) < self._timestamp:
            return
        # All tenants share one bucket (self._bucket_name); each tenant.bucket_name
        # is in fact a folder inside self._bucket_name.
        folder = tenant.bucket_name
@@ -153,13 +155,18 @@ class AwsStorage(BlobStorageBase):
        self._access_key = credentials['AccessKeyId']
        self._secret_key = credentials['SecretAccessKey']
        self._session_token = credentials['SessionToken']
+        timeexpire = credentials['Expiration']
+        self._timestamp = timeexpire - timedelta(seconds=30)  # treat the token as expired 30 s early

    async def upload(self, tenant: Tenant, object_name: str, file_data: Any, *,
                     overwrite: bool = True,
+                     if_match=None,
+                     if_not_match=None,
                     auth: Optional = None, content_type: str = None, metadata: dict = None,
-                     timeout: int = 30, **kwargs):
-        bucket = tenant.bucket_name
+                     timeout: int = 30, **kwargs) -> Blob:
+        await self.getCredentials(tenant)
+        bucket = tenant.bucket_name
+        # bucket here is in fact a folder in the seismic ddms bucket; the same
+        # applies to the other operations (download, delete, ...).
+        object_name = f"{bucket}/{object_name}"
        s3_client = boto3.client(
            's3',
@@ -168,6 +175,8 @@ class AwsStorage(BlobStorageBase):
            aws_secret_access_key=self._secret_key,
            aws_session_token=self._session_token
        )
+        stream_data = self._preprocess_data(file_data)
        extra: dict = {}
        if content_type is not None:
            extra['ContentType'] = content_type
@@ -175,19 +184,38 @@ class AwsStorage(BlobStorageBase):
            extra['Metadata'] = metadata
        if auth is not None:
            extra['ACL'] = auth
+        if if_match or if_not_match:
+            extra['etag'] = if_match or if_not_match
        if len(extra) == 0:
-            response = s3_client.upload_fileobj(file_data, self._bucket_name, object_name)
+            response = s3_client.upload_fileobj(stream_data, self._bucket_name, object_name)
        else:
-            response = s3_client.upload_fileobj(file_data, self._bucket_name, object_name, ExtraArgs=extra)
-        return response
+            response = s3_client.upload_fileobj(stream_data, self._bucket_name, object_name, ExtraArgs=extra)
+        metadata = s3_client.head_object(
+            Bucket=self._bucket_name,
+            Key=object_name
+        )
+        return Blob(identifier=object_name,
+                    bucket=tenant.bucket_name,
+                    name=metadata.get('name', object_name),
+                    metadata=metadata,
+                    acl=metadata.get('acl', auth),
+                    content_type=metadata.get('ContentType', content_type),
+                    time_created=metadata.get('timeCreated', None),
+                    time_updated=metadata.get('LastModified', None),
+                    size=metadata.get('ContentLength', -1),
+                    etag=metadata.get('ETag', None)  # boto3's key is 'ETag', not 'Etag'
+                    )

    async def delete(self, tenant: Tenant, object_name: str,
                     *, auth: Optional = None, timeout: int = 10, **kwargs):
-        encoded_object_name = quote(object_name, safe='')
-        bucket = tenant.bucket_name
-        await self.getCredentials(tenant)
+        await self.getCredentials(tenant)
+        bucket = tenant.bucket_name
+        object_name = f"{bucket}/{object_name}"
        s3_client = boto3.client(
            's3',
            region_name=self._region,
@@ -197,7 +225,7 @@ class AwsStorage(BlobStorageBase):
        )
        response = s3_client.delete_object(
            Bucket=self._bucket_name,
-            Key=encoded_object_name,
+            Key=object_name
        )
        return response
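To make the multi-tenant key layout concrete, a small sketch with hypothetical names: all tenants share one physical bucket, and tenant.bucket_name becomes a key prefix.

physical_bucket = "wellbore-ddms-data"      # plays the role of self._bucket_name
tenant_folder = "tenant-a"                  # plays the role of tenant.bucket_name
key = f"{tenant_folder}/logs/run1.parquet"  # object_name after prefixing
# delete_object(Bucket=physical_bucket, Key=key) then removes
# s3://wellbore-ddms-data/tenant-a/logs/run1.parquet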
@@ -214,7 +242,7 @@ class AwsStorage(BlobStorageBase):
            aws_access_key_id=self._access_key,
            aws_secret_access_key=self._secret_key,
            aws_session_token=self._session_token
-            )
+        )
        obj = s3_client.get_object(Bucket=self._bucket_name, Key=object_name)
        return obj['Body'].read()
@@ -235,9 +263,9 @@ class AwsStorage(BlobStorageBase):
        metadata = s3_client.head_object(
            Bucket=self._bucket_name,
-            Key=object_name,
+            Key=object_name
        )
-        return Blob(identifier=metadata.get('ETag', None),
+        return Blob(identifier=object_name,
                    bucket=bucket,
                    name=metadata.get('name', object_name),
                    metadata=metadata,
@@ -245,7 +273,8 @@ class AwsStorage(BlobStorageBase):
                    content_type=metadata.get('ContentType', None),
                    time_created=metadata.get('timeCreated', None),
                    time_updated=metadata.get('LastModified', None),
-                    size=metadata.get('ContentLength', -1)
+                    size=metadata.get('ContentLength', -1),
+                    etag=metadata.get('ETag', None)  # boto3's key is 'ETag', not 'Etag'
                    )

    async def list_objects(self, tenant: Tenant, *args,
@@ -253,14 +282,17 @@ class AwsStorage(BlobStorageBase):
                            max_result: Optional[int] = None, timeout: int = 10, **kwargs) -> List[str]:
        await self.getCredentials(tenant)
        bucket = tenant.bucket_name
+        prefix_name = f"{bucket}/"
        kwargs = {'Bucket': self._bucket_name}
        if prefix is not None:
-            kwargs['Prefix'] = prefix
+            prefix_name = f"{bucket}/{prefix}"
        if max_result is not None:
            kwargs['MaxKeys'] = max_result
+        kwargs['Prefix'] = prefix_name
        s3_client = boto3.client(
            's3',
            region_name=self._region,
@@ -278,3 +310,19 @@ class AwsStorage(BlobStorageBase):
            objects_list.append(key['Key'])
        return objects_list

+    @staticmethod
+    def _preprocess_data(data: Any) -> io.IOBase:
+        # Normalize every supported payload type to a binary file-like
+        # object that boto3's upload_fileobj can consume.
+        if data is None:
+            return io.BytesIO(b'')  # empty binary stream (upload_fileobj expects bytes, not str)
+        if isinstance(data, bytes):
+            return io.BytesIO(data)
+        if isinstance(data, str):
+            return io.BytesIO(data.encode('utf-8'))
+        if isinstance(data, io.IOBase):
+            return data
+        raise TypeError(f'unsupported upload type: "{type(data)}"')
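For reference, a minimal usage sketch of the upload path above (the fixture names and payloads are illustrative, not part of this commit). Because of _preprocess_data, upload accepts bytes, str, or an already-open binary stream:

async def demo(storage: AwsStorage, tenant: Tenant):
    # a str payload is utf-8 encoded and wrapped in BytesIO by _preprocess_data
    blob = await storage.upload(tenant, "well-123/header.json", '{"name": "demo"}',
                                content_type="application/json")
    # a bytes payload is wrapped in BytesIO as-is
    await storage.upload(tenant, "well-123/data.bin", b"\x00\x01")
    print(blob.size, blob.etag)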
# osdu core lib main python
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
-osdu-core-lib-python>=0.4.0, <0.5
+osdu-core-lib-python~=1.1.0
\ No newline at end of file