Commit df305523 authored by Yunhua Koglin

Merge branch 'daskpara' into 'master'

Add Dask storage support lib for Wellbore DDMS

See merge request !2
parents 93875ddc e7095eea
Pipeline #61623 passed in 1 minute and 17 seconds
......@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = '0.0.1'
__version__ = '0.0.2'
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from osdu.core.api.storage.tenant import Tenant
from .storage_aws import AwsStorage
async def get_dask_storage_parameters(tenant: Tenant, directory: Optional[str] = None) -> DaskStorageParameters:
storage = AwsStorage(session=None, service_account_file=directory)
await storage.getCredentials(tenant)
base_directory = storage._bucket_name + "/" + tenant.bucket_name
storage_options = {'anon': False, 'use_ssl': True, 'key': storage._access_key, 'secret': storage._secret_key, 'token': storage._session_token}
return DaskStorageParameters(protocol='s3',
base_directory=base_directory,
storage_options=storage_options)
\ No newline at end of file
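A minimal usage sketch of how a caller might consume the parameters returned by get_dask_storage_parameters with dask and s3fs. It assumes DaskStorageParameters exposes the protocol, base_directory and storage_options it is constructed with, and the record path is hypothetical.

import asyncio
import dask.dataframe as dd

async def read_bulk(tenant):
    # tenant: the osdu.core Tenant for the current request (obtained elsewhere)
    params = await get_dask_storage_parameters(tenant)
    # storage_options carries the temporary STS key/secret/token understood by s3fs
    path = f"{params.protocol}://{params.base_directory}/some-record/bulk.parquet"  # hypothetical key
    return dd.read_parquet(path, storage_options=params.storage_options)

# df = asyncio.run(read_bulk(tenant))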
......@@ -20,7 +20,8 @@ from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob import Blob
from osdu.core.api.storage.tenant import Tenant
import boto3
from datetime import datetime, timedelta, timezone
from botocore.exceptions import ClientError, WaiterError
try:
import ujson as json
except ImportError:
......@@ -33,7 +34,6 @@ def set_default(obj):
return list(obj)
raise TypeError
class AwsStorage(BlobStorageBase):
def __init__(self, session=None, service_account_file: Optional[str] = None):
self._session = session
......@@ -47,6 +47,7 @@ class AwsStorage(BlobStorageBase):
self._secret_key = ""
self._session_token = ""
self._bucket_name =""
self._timestamp =datetime.now(timezone.utc)
def getPolicy(self, bucket: str, keypath:str):
""" policy for access keypath folder in bucket """
......@@ -123,7 +124,8 @@ class AwsStorage(BlobStorageBase):
async def getCredentials(self, tenant: Tenant):
""" get credentials to access s3 bucket for tenant's folder """
# reuse the cached STS credentials until they are about to expire
if len(self._session_token) > 2 and datetime.now(timezone.utc) < self._timestamp:
return
# all tenants in one bucket(self._bucket_name). Each tenant.bucket_name is in fact, a folder in self._bucket_name
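# e.g. with hypothetical names, object keys end up as
#   <self._bucket_name>/<tenant-a-folder>/<object_name>
#   <self._bucket_name>/<tenant-b-folder>/<object_name>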
folder = tenant.bucket_name
......@@ -153,13 +155,18 @@ class AwsStorage(BlobStorageBase):
self._access_key = credentials['AccessKeyId']
self._secret_key = credentials['SecretAccessKey']
self._session_token = credentials['SessionToken']
timeexpire = credentials['Expiration']
self._timestamp = timeexpire - timedelta(seconds = 30) # expire 30 sec before it should
async def upload(self, tenant: Tenant, object_name: str, file_data: Any, *,
overwrite: bool = True,
if_match=None,
if_not_match=None,
auth: Optional = None, content_type: str = None, metadata: dict = None,
timeout: int = 30, **kwargs):
bucket = tenant.bucket_name
timeout: int = 30, **kwargs) -> Blob:
await self.getCredentials(tenant)
bucket = tenant.bucket_name
# bucket here is in fact, a folder in the seismic ddms bucket. Same for other operations (download, delete...)
object_name = f"{bucket}/{object_name}"
s3_client = boto3.client(
's3',
......@@ -168,6 +175,8 @@ class AwsStorage(BlobStorageBase):
aws_secret_access_key=self._secret_key,
aws_session_token=self._session_token
)
stream_data = self._preprocess_data(file_data)
extra: dict = {}
if content_type is not None:
extra['ContentType'] = content_type
......@@ -175,19 +184,38 @@ class AwsStorage(BlobStorageBase):
extra['Metadata'] = metadata
if auth is not None:
extra['ACL'] = auth
if if_match or if_not_match:
extra['etag'] = if_match or if_not_match
if len(extra)==0:
response = s3_client.upload_fileobj(file_data, self._bucket_name,object_name)
response = s3_client.upload_fileobj(stream_data, self._bucket_name,object_name)
else:
response = s3_client.upload_fileobj(file_data, self._bucket_name, object_name,ExtraArgs=extra)
return response
response = s3_client.upload_fileobj(stream_data, self._bucket_name, object_name,ExtraArgs=extra)
metadata = s3_client.head_object(
Bucket=self._bucket_name,
Key=object_name
)
return Blob(identifier=object_name,
bucket=tenant.bucket_name,
name=metadata.get('name', object_name),
metadata=metadata,
acl=metadata.get('acl', auth),
content_type=metadata.get('ContentType', content_type),
time_created=metadata.get('timeCreated', None),
time_updated=metadata.get('LastModified', None),
size=metadata.get('ContentLength', -1),
etag=metadata.get('ETag', None)
)
async def delete(self, tenant: Tenant, object_name: str,
*, auth: Optional = None, timeout: int = 10, **kwargs):
encoded_object_name = quote(object_name, safe='')
bucket = tenant.bucket_name
await self.getCredentials(tenant)
await self.getCredentials(tenant)
bucket = tenant.bucket_name
object_name = f"{bucket}/{object_name}"
s3_client = boto3.client(
's3',
region_name=self._region,
......@@ -197,7 +225,7 @@ class AwsStorage(BlobStorageBase):
)
response = s3_client.delete_object(
Bucket=self._bucket_name,
Key=encoded_object_name,
Key=object_name
)
return response
......@@ -214,7 +242,7 @@ class AwsStorage(BlobStorageBase):
aws_access_key_id=self._access_key,
aws_secret_access_key=self._secret_key,
aws_session_token=self._session_token
)
)
obj = s3_client.get_object(Bucket=self._bucket_name, Key=object_name)
return obj['Body'].read()
......@@ -235,9 +263,9 @@ class AwsStorage(BlobStorageBase):
metadata = s3_client.head_object(
Bucket=self._bucket_name,
Key=object_name,
Key=object_name
)
return Blob(identifier=metadata.get('ETag', None),
return Blob(identifier=object_name,
bucket=bucket,
name=metadata.get('name', object_name),
metadata=metadata,
......@@ -245,7 +273,8 @@ class AwsStorage(BlobStorageBase):
content_type=metadata.get('ContentType', None),
time_created=metadata.get('timeCreated', None),
time_updated=metadata.get('LastModified', None),
size=metadata.get('ContentLength', -1)
size=metadata.get('ContentLength', -1),
etag=metadata.get('ETag', None)
)
async def list_objects(self, tenant: Tenant, *args,
......@@ -253,14 +282,17 @@ class AwsStorage(BlobStorageBase):
max_result: Optional[int] = None, timeout: int = 10, **kwargs) -> List[str]:
await self.getCredentials(tenant)
bucket = tenant.bucket_name
prefix_name = f"{bucket}/"
kwargs = {'Bucket': self._bucket_name}
if prefix is not None:
kwargs['Prefix'] = prefix
prefix_name = f"{bucket}/{prefix}"
if max_result is not None:
kwargs['MaxKeys'] = max_result
kwargs['Prefix'] = prefix_name
s3_client = boto3.client(
's3',
region_name=self._region,
......@@ -278,3 +310,19 @@ class AwsStorage(BlobStorageBase):
objects_list.append(key['Key'])
return objects_list
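A quick sketch of the prefix behaviour after this change: the tenant folder is always prepended, so callers pass prefixes relative to the tenant and get back full keys (names hypothetical).

# keys = await storage.list_objects(tenant, prefix='record-1/', max_result=100)
# -> ['<tenant-folder>/record-1/bulk.bin', '<tenant-folder>/record-1/meta.json', ...]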
@staticmethod
def _preprocess_data(data: Any) -> io.IOBase:
if data is None:
return io.StringIO('')
if isinstance(data, bytes):
return io.BytesIO(data)
if isinstance(data, str):
return io.BytesIO(data.encode('utf-8'))
if isinstance(data, io.IOBase):
return data
raise TypeError(f'unsupported upload type: "{type(data)}"')
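A short sketch of what this normalization allows at the upload() call site; storage, tenant and the object names are placeholders, assuming an AwsStorage instance and a Tenant as above.

async def upload_examples(storage, tenant):
    await storage.upload(tenant, 'record-1/bulk.bin', b'\x00\x01\x02')    # bytes -> BytesIO
    await storage.upload(tenant, 'record-1/meta.json', '{"rows": 42}')    # str -> utf-8 BytesIO
    with open('chunk.bin', 'rb') as f:                                    # file-like objects pass through
        await storage.upload(tenant, 'record-1/chunk.bin', f)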
# osdu core lib main python
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
osdu-core-lib-python>=0.4.0, <0.5
osdu-core-lib-python~=1.1.0
\ No newline at end of file
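For reference, this is the constraint pip resolves against the OSDU package registry declared above, e.g. pip install --extra-index-url https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/ "osdu-core-lib-python~=1.1.0".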