Commit 116ec449 authored by Anuj Gupta

Merge branch 'ibm-deploy-lib-test' into 'master'

Ibm deploy lib test

See merge request !10
parents 4af62f08 0bb5941c
Pipeline #73473 passed with stage in 32 seconds
__version__ = '1.2.0'
__version__ = '1.3.0'
# TODO: this needs to be reviewed
# MUST NOT BE CHANGED MANUALLY
import enum
import io
import mimetypes
@@ -19,7 +18,6 @@ from botocore.client import Config
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
try:
import ujson as json
except ImportError:
@@ -28,6 +26,8 @@ except ImportError:
API_ROOT = 'https://www.googleapis.com/storage/v1/b'
API_ROOT_UPLOAD = 'https://www.googleapis.com/upload/storage/v1/b'
MAX_CONTENT_LENGTH_SIMPLE_UPLOAD = 5 * 1024 * 1024 # 5 MB
# log = logging.getLogger(__name__)
@@ -71,9 +71,10 @@ class IBMObjectStorage(BlobStorageBase):
return f'gs://{bucket}/{object_name}'
# TODO: if `metadata` is set, use multipart upload:
async def upload(self, tenant: Tenant, object_name: str, file_data: Any, *,
async def upload(self, tenant: Tenant, object_name: str, file_data: Any, *, overwrite: bool = True, if_match=None,
if_not_match=None,
auth: Optional = None, content_type: str = None, metadata: dict = None,
timeout: int = 30, **kwargs) -> dict:
timeout: int = 30, **kwargs) -> Blob:
project = tenant.project_id
bucket = tenant.bucket_name
url = f'{API_ROOT_UPLOAD}/{bucket}/o'
@@ -90,18 +91,34 @@ class IBMObjectStorage(BlobStorageBase):
'Content-Type': content_type or '',
})
if not overwrite:  # per Google documentation, ifGenerationMatch=0 fails if the object already exists
parameters['ifGenerationMatch'] = '0'
assert not (if_match and if_not_match), "if_match and if_not_match cannot be set simultaneously"
# Use 'generation' instead of the ETag-based 'If-Match'/'If-None-Match' headers; this is transparent to the caller
if if_match:
parameters['ifGenerationMatch'] = str(if_match)
if if_not_match:
parameters['ifGenerationNotMatch'] = str(if_not_match)
force_resumable_upload: Optional[bool] = None
upload_type = self._decide_upload_type(force_resumable_upload, content_length)
if upload_type == UploadType.SIMPLE:
# if metadata:
# log.warning('metadata will be ignored for upload_type=Simple')
return await self._upload_simple(project, bucket, url, object_name, stream, parameters, headers,
auth=auth, timeout=timeout)
upload_response = await self._upload_simple(project, bucket, url, object_name, stream, parameters, headers,
metadata,
auth=auth, timeout=timeout)
print("upload response", upload_response)
return self.dict_to_blob(upload_response, object_name, bucket, auth, parameters, headers)
if upload_type == UploadType.RESUMABLE:
return await self._upload_resumable(project, bucket, url, object_name, stream, parameters, headers,
auth=auth, timeout=timeout)
upload_response = await self._upload_resumable(project, bucket, url, object_name, stream, parameters, headers,
metadata,
auth=auth, timeout=timeout)
print("upload response", upload_response)
return self.dict_to_blob(upload_response, object_name, bucket, auth, parameters, headers)
raise TypeError(f'upload type {upload_type} not supported')
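A caller-side sketch of the new conditional-upload parameters; this is illustrative only and assumes an already constructed IBMObjectStorage instance (storage), a resolved Tenant (tenant) and a bytes payload, none of which come from this change:

# Hypothetical usage: create-only upload, then a conditional overwrite guarded
# by a previously observed generation value (sent as ifGenerationMatch).
blob = await storage.upload(tenant, 'records/example.json', b'{"status": "new"}',
                            overwrite=False,                 # translated to ifGenerationMatch=0
                            content_type='application/json')
blob = await storage.upload(tenant, 'records/example.json', b'{"status": "updated"}',
                            if_match='1234567890',           # translated to ifGenerationMatch=1234567890
                            content_type='application/json')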
@@ -162,10 +179,9 @@ class IBMObjectStorage(BlobStorageBase):
return content_type, encoding
async def _upload_simple(self, project: str, bucket: str, url: str, object_name: str,
stream: io.IOBase, params: dict, headers: dict,
*, auth: Optional = None, timeout: int = 30) -> dict:
async def _upload_simple(self, project: str, bucket: str, url: str, object_name: str, stream: io.IOBase,
params: dict, headers: dict, metadata: dict, *, auth: Optional = None,
timeout: int = 30) -> dict:
params['name'] = object_name
params['uploadType'] = 'media'
@@ -175,8 +191,10 @@ class IBMObjectStorage(BlobStorageBase):
if self._s3_client is None:
self._s3_client = await self._factory.get_s3_client(self._factory)
print("got s3 client - ", self._s3_client.get_object)
try:
self._s3_client.create_bucket(Bucket=bucket,)
self._s3_client.create_bucket(Bucket=bucket, )
except ClientError as err:
print(err)
@@ -184,12 +202,13 @@ class IBMObjectStorage(BlobStorageBase):
response = self._s3_client.upload_fileobj(stream, bucket, object_name)
# with open(stream.name, "rb") as f:
# response = self._s3_client.upload_fileobj(f, bucket, object_name)
return response
metadata = self._s3_client.head_object(Bucket=bucket,
Key=object_name)
return metadata
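For orientation, a sketch of the parsed dictionary boto3's head_object returns and that _upload_simple now hands back; the values are invented. The raw HTTP headers ('etag', 'content-length', 'date') live under ResponseMetadata['HTTPHeaders'] rather than at the top level of this parsed response:

import datetime

# Illustrative head_object response (values are invented); boto3 parses the
# object headers into the top-level keys shown here.
sample_metadata = {
    'ContentLength': 42,
    'ContentType': 'application/json',
    'ETag': '"9a0364b9e99bb480dd25e1f0284c8555"',
    'LastModified': datetime.datetime(2021, 6, 1, 12, 0, tzinfo=datetime.timezone.utc),
    'Metadata': {},
}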
async def _upload_resumable(self, project: str, bucket: str, url: str, object_name: str,
stream: io.IOBase, params: dict, headers: dict,
*, auth: Optional = None, timeout: int = 30) -> dict:
stream: io.IOBase, params: dict, headers: dict, metadata: dict,
*, auth: Optional = None, timeout: int = 30) -> dict:
params['name'] = object_name
params['uploadType'] = 'media'
@@ -209,10 +228,11 @@ class IBMObjectStorage(BlobStorageBase):
multipart_chunksize=1024 * 20,
use_threads=True)
response = self._s3_client.upload_file(Filename=stream.name, Bucket=bucket,
Key=object_name, Config=config,
Callback=ProgressPercentage(stream.name))
return response
Key=object_name, Config=config,
Callback=ProgressPercentage(stream.name))
metadata = self._s3_client.head_object(Bucket=bucket,
Key=object_name)
return metadata
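ProgressPercentage is referenced above but not defined in this hunk; a minimal sketch of such a callback, following the pattern in the boto3 documentation (boto3 calls it with the number of bytes transferred for each chunk):

import os
import sys
import threading


class ProgressPercentage:
    """Reports upload progress; boto3 invokes __call__ with the bytes sent per chunk."""

    def __init__(self, filename: str):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount: int):
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100 if self._size else 100.0
            sys.stdout.write(f'\r{self._filename}  {self._seen_so_far} / {self._size:.0f} bytes ({percentage:.2f}%)')
            sys.stdout.flush()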
async def delete(self, tenant: Tenant, object_name: str,
*, auth: Optional = None, timeout: int = 10, params: dict = None, **kwargs):
@@ -247,14 +267,14 @@ class IBMObjectStorage(BlobStorageBase):
s3_response_object = self._s3_client.get_object(
Bucket=bucket,
Key=encoded_object_name,
Key=object_name,
)
data = s3_response_object['Body'].read()
return data
async def download_metadata(self, tenant: Tenant, object_name: str,
*, auth: Optional = None, timeout: int = 10, ** kwargs) -> Blob:
*, auth: Optional = None, timeout: int = 10, **kwargs) -> Blob:
bucket = tenant.bucket_name
if self._s3_client is None:
@@ -306,3 +326,18 @@ class IBMObjectStorage(BlobStorageBase):
project: str, bucket: str,
forwarded_auth: Optional = None) -> str:
return "dummy token"
def dict_to_blob(self, upload_resp: dict, object_name: str, bucket: str, auth: Any = None, params: dict = None,
headers: dict = None) -> Blob:
# using 'generation' instead of ETag since 'If-Match' and 'If-None-Match' don't seem to work as documented
# here https://cloud.google.com/storage/docs/json_api/v1/parameters#ifmatch
return Blob(identifier=object_name,
bucket=bucket,
name=params.get('object_name', None),
metadata=upload_resp,
acl=auth,
content_type=headers.get('contentType', None),
time_created=upload_resp.get('Date', None),
time_updated=upload_resp.get('updated', None),
size=upload_resp.get('Content-Length', -1),
etag=upload_resp.get('Etag', None))
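A hedged usage sketch of dict_to_blob with the signature above; the dictionary and header values are illustrative. Note that the lookups above use raw HTTP header names ('Etag', 'Content-Length', 'Date') rather than boto3's parsed head_object keys ('ETag', 'ContentLength', 'LastModified'), so the dictionary passed in needs to carry those header-style keys:

# Illustrative call; `storage` is an IBMObjectStorage instance and the values are invented.
head_resp = {
    'Etag': '"9a0364b9e99bb480dd25e1f0284c8555"',
    'Content-Length': 42,
    'Date': 'Tue, 01 Jun 2021 12:00:00 GMT',
}
blob = storage.dict_to_blob(head_resp,
                            object_name='records/example.json',
                            bucket='tenant-bucket',
                            params={'object_name': 'records/example.json'},
                            headers={'contentType': 'application/json'})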
from typing import Optional
import os
from typing import Optional, List, Any
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from osdu.core.api.storage.tenant import Tenant
from .blob_storage_factory_ibm import IBMBlobStorageFactory
async def get_dask_storage_parameters(tenant: Tenant, directory: Optional[str] = None) -> DaskStorageParameters:
storage = IBMBlobStorageFactory()
base_directory = f'{tenant.bucket_name}/{directory}' if directory else tenant.bucket_name
storage_options = {'anon': False,
                   'use_ssl': True,
                   'key': storage._access_key,
                   'secret': storage._secret_key,
                   'client_kwargs': {"endpoint_url": "https://minio-osdu-minio.osdu-qa-16x64-ba8e38d4c011d627379af1a4280c4e35-0000.sjc03.containers.appdomain.cloud"},
                   'config_kwargs': {"region_name": storage._region}}
return DaskStorageParameters(protocol='https',
base_directory=base_directory,
storage_options=storage_options)
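A sketch of how the returned parameters could be consumed, assuming s3fs/fsspec is installed and that DaskStorageParameters exposes the protocol, base_directory and storage_options fields set above (the helper name is illustrative):

import fsspec


async def list_tenant_files(tenant):
    # Illustrative only: build the parameters, open the matching fsspec filesystem
    # and list the tenant's base directory.
    params = await get_dask_storage_parameters(tenant)
    fs = fsspec.filesystem('s3', **params.storage_options)
    return fs.ls(params.base_directory)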
class DaskStorageParametersFactoryIBM:
_endpointURL = None
_access_key = None
_secret_key = None
_region = None
_bucket_name_prefix = None
_signature_version = 's3v4'
def __init__(self, session=None, service_account_file: Optional[str] = None):
self._endpointURL = os.getenv("COS_URL", "NA")
self._access_key = os.getenv("COS_ACCESS_KEY", "NA")
self._secret_key = os.getenv("COS_SECRET_KEY", "NA")
self._region = os.getenv("COS_REGION", "us-east-1")
self._bucket_name_prefix = os.getenv("BUCKET_PREFIX", "dev")
async def get_dask_storage_parameters(self, tenant: Tenant,
directory: Optional[str] = None) -> DaskStorageParameters:
access_key = self._access_key
print("minio access key", access_key)
secret_key = self._secret_key
print("minio secret key", secret_key)
endpointurl = self._endpointURL
signature_version = self._signature_version
print("minio endpoint url ", endpointurl)
base_directory = f'{tenant.bucket_name}/{directory}' if directory else tenant.bucket_name
print("minio base_directory", base_directory)
storage_options = {"anon": False, "use_ssl": True, "key": access_key, "secret": secret_key, "client_kwargs": {"endpoint_url": endpointurl}, "config_kwargs": {"s3": {'signature_version': signature_version}}}
return DaskStorageParameters(protocol='s3',
base_directory=base_directory,
storage_options=storage_options)
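A minimal sketch of driving the factory; the directory name is illustrative, and the tenant object is assumed to be resolved elsewhere (its construction is not part of this change):

async def build_parameters(tenant):
    # COS_URL, COS_ACCESS_KEY, COS_SECRET_KEY, COS_REGION and BUCKET_PREFIX are read
    # from the environment in __init__; the defaults above apply when they are unset.
    factory = DaskStorageParametersFactoryIBM()
    return await factory.get_dask_storage_parameters(tenant, directory='ingestion')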