From 30cc8fd40fd827135c47bd8964b27ac036d12d2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B8rgen=20Lind?= Date: Fri, 5 Nov 2021 10:33:39 +0100 Subject: [PATCH 1/2] fix: Reuse AwsStorage when implementing IbmStorage --- src/CMakeLists.txt | 4 + src/src/lib/cloud/providers/aws/AwsStorage.cc | 9 +- src/src/lib/cloud/providers/aws/AwsStorage.h | 2 + src/src/lib/cloud/providers/ibm/IbmStorage.cc | 692 +----------------- src/src/lib/cloud/providers/ibm/IbmStorage.h | 246 +------ 5 files changed, 23 insertions(+), 930 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 87e279f..e46186c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -234,6 +234,10 @@ if (AWS_PROVIDER_ENABLED) endif() if (IBM_PROVIDER_ENABLED) file(GLOB SRC_LIB_PROVIDERS_IBM ${sdapi_SOURCE_DIR}/src/lib/cloud/providers/ibm/*.cc) + if (NOT AWS_PROVIDER_ENABLED) + file(GLOB SRC_LIB_PROVIDERS_IBM_EXTRA ${sdapi_SOURCE_DIR}/src/lib/cloud/providers/aws/*.cc) + list(APPEND SRC_LIB_PROVIDERS_IBM "${SRC_LIB_PROVIDERS_IBM_EXTRA}") + endif() endif() if (GCP_PROVIDER_ENABLED) file(GLOB SRC_LIB_PROVIDERS_GCP ${sdapi_SOURCE_DIR}/src/lib/cloud/providers/gcp/*.cc) diff --git a/src/src/lib/cloud/providers/aws/AwsStorage.cc b/src/src/lib/cloud/providers/aws/AwsStorage.cc index 0245dce..0724c96 100644 --- a/src/src/lib/cloud/providers/aws/AwsStorage.cc +++ b/src/src/lib/cloud/providers/aws/AwsStorage.cc @@ -162,7 +162,14 @@ namespace seismicdrive return nullptr; } Aws::Client::ClientConfiguration config; - config.region = { region.data(), region.size() }; + if (endpoint_override.empty()) + { + config.region = {region.data(), region.size()}; + } + else + { + config.endpointOverride = {endpoint_override.data(), endpoint_override.size()}; + } std::lock_guard guard(mutex); return std::make_shared(s3_credentials, config); diff --git a/src/src/lib/cloud/providers/aws/AwsStorage.h b/src/src/lib/cloud/providers/aws/AwsStorage.h index df98092..1399dd7 100644 --- 
a/src/src/lib/cloud/providers/aws/AwsStorage.h +++ b/src/src/lib/cloud/providers/aws/AwsStorage.h @@ -227,6 +227,8 @@ namespace seismicdrive void setAwsProgress(bool); void setAwsPerformance(bool); + protected: + std::string endpoint_override; private: friend class AwsStorageTest; diff --git a/src/src/lib/cloud/providers/ibm/IbmStorage.cc b/src/src/lib/cloud/providers/ibm/IbmStorage.cc index c0a085f..f508b45 100644 --- a/src/src/lib/cloud/providers/ibm/IbmStorage.cc +++ b/src/src/lib/cloud/providers/ibm/IbmStorage.cc @@ -1,696 +1,14 @@ -/* Licensed Materials - Property of IBM */ -/* (c) Copyright IBM Corp. 2020. All Rights Reserved.*/ - -#define BUILDING_DLL - -#include "auth/auth_provider.h" -#include "shared/config.h" -#include "shared/mex.h" - -#include "cloud/providers/ibm/IbmStorage.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include +#include "IbmStorage.h" namespace seismicdrive { - std::atomic IbmApiInitialize::mCount(0); - - IbmApiInitialize::IbmApiInitialize(Logger &log) - : log(log) - { - mOptions.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Info; - const size_t origCount = mCount++; - if (origCount == 0) - { - log(2) << "now InitAPI"; - Aws::InitAPI(mOptions); - } - } - - IbmApiInitialize::~IbmApiInitialize() - { - const size_t newCount = --mCount; - - if (newCount == 0) - { - log(2) << "now ShutdownAPI"; - Aws::ShutdownAPI(mOptions); - } - } - - namespace - { - void parse_storage_path(const std::string &cs_path, std::string &bucket, std::string &object, Logger &log) - { - bucket.clear(); - object.clear(); - - auto path = cs_path; - if (path.rfind(sdconfig::GCSPREFIX, 0)) - { - throw seismicdrive::SDException(sdmex::gcs::PrefixError(path)); - } - path = path.substr(sdconfig::GCSPREFIX.length()); - - auto pos = path.find("/"); //seismic DDMS uses bucketName/subproject_folder/... 
- bucket = path.substr(0, pos); - object = path.substr(pos + 1); - log(2) << "[sdapi] - bucket - new " + bucket; - log(2) << "[sdapi] - object - new " + object; - // auto pos = path.find("$$"); //seismic DDMS uses bucketName$$subproject_folder/... - // if (pos == std::string::npos) - // { - // bucket = path; - // } - // else - // { - // bucket = path.substr(0, pos); - // object = path.substr(pos + 2); - // } - // bucket = "bucket-name-in-s3"; - // object = "folder-name-within-bucket/object-id(chunkid)"; - } - } - - const char *const ALLOCATION_TAG = "S3_SSDMS_CPP_LIB"; - - IbmStorage::IbmStorage(AuthProvider *auth_provider, const std::string &sdResource, bool sdReadOnly) - : Storage(auth_provider, sdResource, sdReadOnly), - myInit(log), - show_progress(false), - show_performance(false) - { - const char *temp = getenv("AWS_REGION"); - if (temp == NULL) - region = "us-east-1"; - else - region = std::string(temp); - } - - IbmStorage::~IbmStorage() - { - } - - bool IbmStorage::getToken() - { - std::lock_guard guard(mutex); - - sasToken = authProvider->getStorageAuthToken(sdResource, sdReadOnly); - sasToken = sdutils::trimBearer(sasToken); - auto pos = sasToken.find(':'); - if (pos == std::string::npos) - { - return false; - } - auto accessKeyId = sasToken.substr(0, pos); - auto nextId = sasToken.substr(pos + 1); - pos = nextId.find(':'); - if (pos == std::string::npos) - { - return false; - } - auto secretKey = nextId.substr(0, pos); - auto sessionToken = nextId.substr(pos + 1); - - s3_credentials.SetAWSAccessKeyId({ accessKeyId.data(), accessKeyId.size() }); - s3_credentials.SetAWSSecretKey({ secretKey.data(), secretKey.size() }); - s3_credentials.SetSessionToken({ sessionToken.data(), sessionToken.size() }); - return true; - } - - void IbmStorage::setAwsProgress(bool show) - { - show_progress = show; - } - - void IbmStorage::setAwsPerformance(bool show) - { - show_performance = show; - } - - std::shared_ptr IbmStorage::getS3() + IbmStorage::IbmStorage(AuthProvider 
*auth_provider, const std::string &sdresource, bool readonly) + : AwsStorage(auth_provider, sdresource, readonly) { - if (!getToken()) - { - log(0) << "Error with token"; - return nullptr; - } - Aws::Client::ClientConfiguration config; - const char *temp = getenv("IBM_COS_URL"); if (temp == NULL) - ibm_cos_url = "set-it-in-env-pl"; - else - ibm_cos_url = std::string(temp); - - config.endpointOverride = {ibm_cos_url.data(), ibm_cos_url.size()}; - log(2) << "MINIO Endpoint " + config.endpointOverride; - // config.endpointOverride = "minio-endpoint-without-https"; - std::lock_guard guard(mutex); - return std::make_shared(s3_credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - } - - bool IbmStorage::objectExists(const std::string &cs_path, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); - Aws::S3::Model::HeadObjectRequest request; - request.WithBucket(bucketName.c_str()).WithKey(objectName.c_str()); - - auto s3_client = getS3(); - if (s3_client == nullptr) - return false; - - auto response = s3_client->HeadObject(request); - if (response.IsSuccess()) - { - return true; - } - else - { - log(0) << "Error: HeadObject: " + response.GetError().GetMessage(); - return false; - } - } - - void IbmStorage::uploadObject(const std::string &cs_path, const void *data, size_t len, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); - - auto s3_client = getS3(); - if (s3_client == nullptr) - return; - - auto thread_executor = Aws::MakeShared(ALLOCATION_TAG); - - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr 
&handle) - { - log(0) << "Transfer Status = " + std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Upload Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - log(2) << "Dataset start upload. BucketName: " + bucketName + ": Key " + objectName; - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - log(2) << "upload Object length " + std::to_string(len); - const std::shared_ptr input_data = Aws::MakeShared("PutObjectInputStream", std::stringstream::in | std::stringstream::out | std::stringstream::binary); - - input_data->write((const char *)(data), len); //not sure which one is more effecient - - auto transferHandle = transferManager->UploadFile(input_data, //(char *)data, - bucketName.c_str(), objectName.c_str(), "binary/octet-stream", - Aws::Map()); - - transferHandle->WaitUntilFinished(); - size_t retries = 0; - //just make sure we don't fail because an upload part failed. (e.g. 
network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryUpload((char *)data, transferHandle); - transferHandle->WaitUntilFinished(); - } - - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED) - { - log(2) << "Dataset uploading done."; - } - else - { - std::ostringstream ss; - ss << "Error dataset uploading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - } - } - - // from a file - void IbmStorage::uploadObject(const std::string &cs_path, const std::string &file_name, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); - - auto s3_client = getS3(); - if (s3_client == nullptr) - return; - - auto thread_executor = Aws::MakeShared(ALLOCATION_TAG); - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + - std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Upload Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - - log(2) << "File start upload. 
BucketName: " + bucketName + ": Key " + objectName; - - auto t1 = std::chrono::high_resolution_clock::now(); - - auto buffer = Aws::MakeShared("PutObjectInputStream", file_name.c_str(), std::ios_base::in | std::ios_base::binary); - - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - auto transferHandle = transferManager->UploadFile(buffer, - bucketName.c_str(), objectName.c_str(), "multipart/form-data", - Aws::Map()); - - transferHandle->WaitUntilFinished(); - size_t retries = 0; - //just make sure we don't fail because an upload part failed. (e.g. network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryUpload(file_name.c_str(), transferHandle); - transferHandle->WaitUntilFinished(); - } - - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED) - { - log(2) << "File upload done"; - } - else - { - std::ostringstream ss; - ss << "File upload Error uploading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - } - - if (show_performance) - { - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = (std::chrono::duration_cast(t2 - t1).count()) / 1000000.0; - log(0) << "time(sec) for upload " + file_name + " : " + std::to_string(duration); - } - } - - /* @param data start of data buffer to receive object contents - * @param len the read length will be saved/returned in this parameter */ - void IbmStorage::downloadObject(const std::string &cs_path, void *data, size_t &len, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); - - auto s3_client = getS3(); - if (s3_client == nullptr) - return; - - auto thread_executor = Aws::MakeShared(ALLOCATION_TAG); - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - - 
transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + - std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Download Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - log(2) << "Start download"; - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - std::shared_ptr contents = std::make_shared(); - auto createStreamFn = [=]() - { - auto fs = Aws::New(ALLOCATION_TAG, contents->rdbuf()); - return fs; - }; - auto transferHandle = transferManager->DownloadFile(bucketName.c_str(), objectName.c_str(), createStreamFn); - - transferHandle->WaitUntilFinished(); - size_t retries = 0; - //just make sure we don't fail because an download part failed. (e.g. 
network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryDownload(transferHandle); - transferHandle->WaitUntilFinished(); - } - len = transferHandle->GetBytesTransferred(); - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED) - { - log(2) << "Data download with len " + std::to_string(len); - } - else - { - std::ostringstream ss; - ss << "Error downloading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - } - contents->rdbuf()->sgetn((char *)data, len); - } - - /* @param data start of data buffer to receive object contents - * @param offset first byte in the object to return - * @param len number of bytes to download */ - void IbmStorage::downloadObject(const std::string &cs_path, void *data, size_t offset, size_t len, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); - - auto s3_client = getS3(); - if (s3_client == nullptr) - return; - - auto thread_executor = Aws::MakeShared(ALLOCATION_TAG); - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + - std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Download Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - log(2) << "Start download with offset"; - - auto transferManager = 
Aws::Transfer::TransferManager::Create(transferConfig); - std::shared_ptr contents = std::make_shared(); - auto createStreamFn = [=]() - { - auto fs = Aws::New(ALLOCATION_TAG, contents->rdbuf()); - return fs; - }; - - auto transferHandle = transferManager->DownloadFile(bucketName.c_str(), objectName.c_str(), (uint64_t)offset, (uint64_t)len, createStreamFn); - transferHandle->WaitUntilFinished(); - - size_t retries = 0; - //just make sure we don't fail because an download part failed. (e.g. network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryDownload(transferHandle); - transferHandle->WaitUntilFinished(); - } - if (len != transferHandle->GetBytesTransferred()) - { - log(0) << "Data download with len not equal " + std::to_string(len) + ' ' + std::to_string(transferHandle->GetBytesTransferred()); - } - - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED) - { - log(2) << "Data download with len " + std::to_string(len); - } - else - { - std::ostringstream ss; - ss << "Error downloading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - } - - contents->rdbuf()->sgetn((char *)data, len); - } - - /* @param data start of data buffer to receive object contents - * @param len the length of the downloaded object will be saved/returned in the len parameter*/ - void IbmStorage::downloadObject(const std::string &cs_path, void **data, size_t &len, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); //object name ends with /? 
- - auto s3_client = getS3(); - if (s3_client == nullptr) - return; - - auto thread_executor = - Aws::MakeShared(ALLOCATION_TAG); - - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + - std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Download Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - log(2) << "Start download with unspecified length "; - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - - std::shared_ptr contents = std::make_shared(); - auto createStreamFn = [=]() - { - auto fs = Aws::New(ALLOCATION_TAG, contents->rdbuf()); - return fs; - }; - - auto transferHandle = transferManager->DownloadFile(bucketName.c_str(), objectName.c_str(), createStreamFn); - transferHandle->WaitUntilFinished(); - size_t retries = 0; - //just make sure we don't fail because an download part failed. (e.g. 
network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryDownload(transferHandle); - transferHandle->WaitUntilFinished(); - } - len = transferHandle->GetBytesTransferred(); - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED) - { - log(2) << "Data download with len " + std::to_string(len); - } - else - { - std::ostringstream ss; - ss << "Error downloading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - } - std::unique_ptr target(new char[len]); - contents->rdbuf()->sgetn(target.get(), len); - *data = target.release(); - } - - void IbmStorage::deleteObject(const std::string &cs_path, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); //object name ends with /? - - Aws::S3::Model::DeleteObjectRequest object_request; - object_request.WithBucket(bucketName.c_str()).WithKey(objectName.c_str()); - - auto s3_client = getS3(); - if (s3_client == nullptr) - return; - - Aws::S3::Model::DeleteObjectOutcome response = s3_client->DeleteObject(object_request); - if (response.IsSuccess()) - { - log(2) << "deleteObject " + objectName + " in bucket " + bucketName; - } - else - { - auto err = response.GetError(); - log(0) << "Error: DeleteObject: " + err.GetExceptionName() + ":" + err.GetMessage(); - } - } - - long long IbmStorage::objectSize(const std::string &cs_path, const HttpContext * /*pContext*/) - { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName, log); //object name ends with /? 
- - Aws::S3::Model::HeadObjectRequest headObj; - headObj.WithBucket(bucketName.c_str()).WithKey(objectName.c_str()); - long long fileSize = 0; - auto s3_client = getS3(); - if (s3_client == nullptr) - return fileSize; - - auto object = s3_client->HeadObject(headObj); - if (object.IsSuccess()) - { - fileSize = object.GetResultWithOwnership().GetContentLength(); - } + endpoint_override = "set-it-in-env-pl"; else - { - log(0) << "Head Object error: " + - object.GetError().GetExceptionName() + " - " + - object.GetError().GetMessage(); - } - return fileSize; - } - - std::vector IbmStorage::objectSizes(const std::vector &cs_paths, const HttpContext * /*pContext*/) - { - const size_t cs_paths_size = cs_paths.size(); - std::vector sizes(cs_paths_size, std::string::npos); - for (size_t i = 0; i < cs_paths_size; ++i) - { - sizes[i] = objectSize(cs_paths[i]); - } - return sizes; - } - - std::vector> IbmStorage::ls(const std::string &objPath, const HttpContext * /*pContext*/) - { - std::string sanitized_dirname = objPath; - if (!objPath.empty() && objPath.back() != '/') - { - sanitized_dirname += "/"; - } - - std::string bucketName, object_prefix; - parse_storage_path(objPath, bucketName, object_prefix, log); - - std::vector> result; - Aws::S3::Model::ListObjectsRequest objects_request; - Aws::S3::Model::ListObjectsOutcome outcome; - - auto s3_client = getS3(); - if (s3_client == nullptr) - return result; - - objects_request.WithBucket(bucketName.c_str()).WithPrefix(object_prefix.c_str()); - - bool isDone = false; - // bool isFailed = false; - while (!isDone) - { - outcome = s3_client->ListObjects(objects_request); - if (!outcome.IsSuccess()) - break; - //process - Aws::Vector object_list = outcome.GetResult().GetContents(); - - for (auto const &s3_object : object_list) - { - std::string objKey(s3_object.GetKey().c_str(), s3_object.GetKey().size()); - std::pair r1(objKey, (uint64_t)s3_object.GetSize()); - result.push_back(r1); - } - isDone = 
!outcome.GetResult().GetIsTruncated(); - if (!isDone) - { - objects_request.SetMarker(outcome.GetResult().GetContents().back().GetKey()); - } - } - //check isFailed - if (!outcome.IsSuccess()) - { - log(0) << "ListObjects error: " + outcome.GetError().GetExceptionName() + ' ' + outcome.GetError().GetMessage(); - } - return result; - } - - void IbmStorage::lockObject(const std::string & /*cs_path*/, const HttpContext * /*pContext*/) - { - log(0) << "IbmStorage::lockObject Not yet implemented"; - } - - void IbmStorage::waitLockObject(const std::string & /*cs_path*/, const HttpContext * /*pContext*/) - { - log(0) << "IbmStorage::waitLockObject Not yet implemented"; - } - - void IbmStorage::unlockObject(const std::string & /*cs_path*/, const HttpContext * /*pContext*/) - { - log(0) << "IbmStorage::unlockObject Not yet implemented"; - } - - void IbmStorage::setExponentialRetryBackoffPolicy(const ExponentialRetryBackoffPolicy * /*policy*/) - { - log(0) << "IbmStorage::setExponentialRetryBackoffPolicy Not yet implemented"; - } - - void IbmStorage::getChildren(const std::string &objPath, bool /*recursiveList*/, std::string * /*nextPageToken*/, - std::vector *result, const HttpContext * /*pContext*/) - { - if (!result) - { - throw seismicdrive::SDExceptionGCSAccessorError(sdmex::gcs::GetChildrenEmptyResult()); - } - result->clear(); - - std::vector> ret = ls(objPath); - - for (auto item : ret) - { - StorageObjectInfo obi; - obi.name = "gs://" + item.first; - obi.size = item.second; - result->emplace_back(obi); - } + endpoint_override = temp; } } diff --git a/src/src/lib/cloud/providers/ibm/IbmStorage.h b/src/src/lib/cloud/providers/ibm/IbmStorage.h index 9856b59..8db95ce 100644 --- a/src/src/lib/cloud/providers/ibm/IbmStorage.h +++ b/src/src/lib/cloud/providers/ibm/IbmStorage.h @@ -1,248 +1,10 @@ -/* Licensed Materials - Property of IBM */ -/* (c) Copyright IBM Corp. 2020. 
All Rights Reserved.*/ - -#pragma once - -#include "DLL_EXPORT.h" - -#include "auth/auth_provider.h" -#include "cloud/Storage.h" -#include -#include -#include +#include "../aws/AwsStorage.h" namespace seismicdrive { - - class IbmApiInitialize + class IbmStorage : public AwsStorage { - public: - IbmApiInitialize(Logger &log); - ~IbmApiInitialize(); - IbmApiInitialize(const IbmApiInitialize &) = delete; - IbmApiInitialize &operator=(const IbmApiInitialize &) = delete; - - private: - Aws::SDKOptions mOptions; - Logger &log; - static std::atomic mCount; - }; - - class IbmStorage : public Storage - { - public: - IbmStorage() = delete; - - /*******************************************************************************/ /** - * @brief Parameterized contructor - * - * This constructor initializes the accessor object using user-defined arguments. - * - * @param auth_provider is the auth provider - * @param sdresource is the seismic drive subproject resource (optional if the auth provider is a google default one) - * @param readonly define the access policy (read or readwrite) - ******************************************************************************/ + public: IbmStorage(AuthProvider *auth_provider, const std::string &sdresource = "", bool readonly = false); - - // IbmStorage(const std::string& cloud_provider, void* auth_provider, const std::string& sdresource = "", bool readonly = false); - - IbmStorage(const IbmStorage &rhs) = delete; - IbmStorage &operator=(const IbmStorage &) = delete; - - IbmStorage(IbmStorage &&op) = delete; - IbmStorage &operator=(IbmStorage &&op) = delete; - - ~IbmStorage(); - - /* Methods */ - - /*******************************************************************************/ /** - * @brief Checks if an object exists - * @param cs_path is the cloud storage URI of the resource - * @param pContext is the cloud storage context of the access request - * - * @return true if tge object exists, false if the object not exist - 
******************************************************************************/ - bool objectExists(const std::string &cs_path, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Gets the size of an object - * @param cs_path is the cloud storage URI of the resource object - * @param pContext is the cloud storage context of the access request - * - * @return the size of the object in bytes - ******************************************************************************/ - long long objectSize(const std::string &cs_path, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Return sizes of several objects - * - * This method gets the sizes of a list of objects given their Aws cloud storage paths - * - * @param cs_paths is a vector of cloud storage objects URI - * @param pContext is the cloud storage context of the access request - * - * @return size of corresponding object or -1 - ******************************************************************************/ - std::vector objectSizes(const std::vector &cs_paths, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Upload/Create an object from a buffer - * - * This method creates an object from a given data (buffer) and upload it to Aws. 
- * - * @param cs_path is the cloud storage URI of the resource object - * @param data start of data to store in the object - * @param len number of bytes to store - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void uploadObject(const std::string &cs_path, const void *data, size_t len, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Upload (create) an object from a local file - * - * This method creates an object identical to a local file and upload it to Aws. - * - * @param cs_path name of object to create - * @param file_name name of file to be uploaded to object - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void uploadObject(const std::string &cs_path, const std::string &file_name, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Download all or part of an object - * - * This method downloads from an Aws cloud storage object given the start of the buffer with the wanted offset. 
- * This method doesnt check crc32c on partial download - * - * @param cs_path is the cloud storage URI of the resource object - * @param data start of data buffer to receive object contents - * @param offset first byte in the object to return - * @param len number of bytes to download - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void downloadObject(const std::string &cs_path, void *data, size_t offset, size_t len, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Download an object from Aws - * - * This method download an object from Aws from start to end. - * The crc32c is checked on downloaded object. - * - * @param cs_path is the cloud storage URI of the resource object - * @param data start of data buffer to receive object contents - * @param len the read length will be saved/returned in this parameter - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void downloadObject(const std::string &cs_path, void *data, size_t &len, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Download an object from Aws - * - * This method download an object from Aws by dynamically allocating the memory of the "data" buffer with the right object size. - * Memory is allocated in the method and the size will be saved and returned in the len parameter. - * The crc32c is checked on downloaded object. 
- * - * @param cs_path is the cloud storage URI of the resource object - * @param data start of data buffer to receive object contents - * @param len the length of the downloaded object will be saved/returned in the len parameter - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void downloadObject(const std::string &cs_path, void **data, size_t &len, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Delete an object - * @param cs_path is the cloud storage URI of the resource object to delete - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void deleteObject(const std::string &cs_path, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Creates a lock file at a Aws cloud storage path - * - * This method locks the object located in the given path. - * - * @param cs_path is the cloud storage URI of the resource object - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void lockObject(const std::string &cs_path, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief waits for the lock to be released - * - * This method waits for the lock placed on the object in the given Aws cloud storage path to be released. 
- * - * @param cs_path is the cloud storage URI of the resource object - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void waitLockObject(const std::string &cs_path, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Unlock file at a Aws cloud storage path - * - * This method removes the lock from the object in the given path. - * - * @param cs_path is the cloud storage URI of the resource object - * @param pContext is the cloud storage context of the access request - ******************************************************************************/ - void unlockObject(const std::string &cs_path, const HttpContext *pContext = nullptr) override; - - /*******************************************************************************/ /** - * @brief Set a default retry policy strategy for http requests - * - * This method sets a default http retry policy strategy. 
- * - * @param policy is the back-off retry policy to apply on request - ******************************************************************************/ - void setExponentialRetryBackoffPolicy(const ExponentialRetryBackoffPolicy *policy); - - /*******************************************************************************/ /** - * @brief return the object children list - * - * This method return the object children list for a specified storage path prefix - * - * @param objPath the reference resource - * @param recursiveList true to return the recursive object children list - * @param nextPageToken continuation token - * @param result results will be returned in this vector - * @param pContext is the http context of the access request - ******************************************************************************/ - void getChildren(const std::string &objPath, bool recursiveList, std::string *nextPageToken, - std::vector *result, const HttpContext *pContext); - - /*******************************************************************************/ /** - * @brief get all objects in a container resource (prefix) - * - * This method takes a storage path and return a list of all the objects that exists in that path. - * - * @param objPath the reference resource - * @param pContext is the http context of the access request - * - * @return a list of the object names in the given storage path. 
- ******************************************************************************/ - std::vector> ls(const std::string &objPath, const HttpContext *pContext = nullptr) override; - - void multiPartUploadObject(const std::string &cs_path, const std::string &file_name, const HttpContext * /*pContext*/); - bool getToken(); - std::shared_ptr getS3(); - void setAwsProgress(bool); - void setAwsPerformance(bool); - - private: - friend class IbmStorageTest; - - std::unique_ptr context; - - int _loggingMode{0}; - - IbmApiInitialize myInit; - Aws::Auth::AWSCredentials s3_credentials; - std::string region; - std::string ibm_cos_url; - //AWS TransferManager Progress - bool show_progress; - //AWS TransferManager Performance - bool show_performance; }; - -} +} \ No newline at end of file -- GitLab From 3bec58d3853a5a8c5e38faf2d213ef751c5f5408 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B8rgen=20Lind?= Date: Wed, 10 Nov 2021 14:31:40 +0100 Subject: [PATCH 2/2] fix: Aws failing to download objects bigger than 5MB Also throw a SDExceptionStorageError if the S3Client can't be created. The reason to stop using the TransferManager is that it's an API designed to be used for files. Using it with stringstreams is error-prone as seeking to a position after EOF will cause the stream to be set in an error state and the TransferManager doesn't check error states on the stream. Using the S3Client APIs is much more straightforward, and the same functionality is kept.
--- src/src/lib/cloud/providers/aws/AwsStorage.cc | 349 +++++------------- 1 file changed, 92 insertions(+), 257 deletions(-) diff --git a/src/src/lib/cloud/providers/aws/AwsStorage.cc b/src/src/lib/cloud/providers/aws/AwsStorage.cc index 0724c96..755513b 100644 --- a/src/src/lib/cloud/providers/aws/AwsStorage.cc +++ b/src/src/lib/cloud/providers/aws/AwsStorage.cc @@ -100,6 +100,46 @@ namespace seismicdrive { throw seismicdrive::SDExceptionStorageError(std::string(error.GetExceptionName().c_str()) + " : " + std::string(error.GetMessage().c_str())); } + + std::string createRangeString(int64_t rangeMin, int64_t rangeMax) + { + if (rangeMin < 0 && rangeMax < 0) + return ""; + if (rangeMin < 0) + return std::string("bytes=0-") + std::to_string(rangeMax); + if (rangeMax < 0) + return std::string("bytes=") + std::to_string(rangeMin) + "-"; + return std::string("bytes=") + std::to_string(rangeMin) + "-" + std::to_string(rangeMax); + } + + void getObject(AwsStorage &awsStorage, Logger &log, const std::string &cs_path, int64_t rangeMin, int64_t rangeMax, Aws::S3::Model::GetObjectResult &result) + { + auto s3Client = awsStorage.getS3(); + std::string bucketName, objectName; + parse_storage_path(cs_path, bucketName, objectName); + std::string range = createRangeString(rangeMin, rangeMax); + log(2) << "Start download. Bucket: " << bucketName << ", Key: " << objectName << ", Range" << range; + + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(bucketName.c_str()); + request.SetKey(objectName.c_str()); + if (!range.empty()) + request.SetRange(range); + auto outcome = s3Client->GetObject(request); + int retries = 0; + while (!outcome.IsSuccess() && retries++ < 5) + { + outcome = s3Client->GetObject(request); + } + if (!outcome.IsSuccess()) + { + log(0) << std::string("Error downloading: ") + outcome.GetError().GetMessage().c_str(); + throwAwsException(outcome.GetError()); + } + result = outcome.GetResultWithOwnership(); + log(2) << "Done download. 
Bucket: " << bucketName << ", Key: " << objectName << ", ContentLength: " << std::to_string(result.GetContentLength()); + } + } const char *const ALLOCATION_TAG = "S3_SSDMS_CPP_LIB"; @@ -159,7 +199,7 @@ namespace seismicdrive if (!getToken()) { log(0) << "Error with token"; - return nullptr; + throw seismicdrive::SDExceptionStorageError("Failed to create S3 client"); } Aws::Client::ClientConfiguration config; if (endpoint_override.empty()) @@ -183,10 +223,6 @@ namespace seismicdrive request.WithBucket(bucketName.c_str()).WithKey(objectName.c_str()); auto s3_client = getS3(); - if (!s3_client) - { - return false; - } auto response = s3_client->HeadObject(request); if (response.IsSuccess()) { @@ -196,67 +232,49 @@ namespace seismicdrive return false; } - void AwsStorage::uploadObject(const std::string &cs_path, const void *data, size_t len, const HttpContext * /*pContext*/) + class VectorBuf : public std::basic_streambuf> { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName); - - auto s3_client = getS3(); - if (!s3_client) + public: + VectorBuf(const char *data, size_t size) { - return; + char *d = const_cast(data); + setg(d, d, d + size); } - auto thread_executor = - Aws::MakeShared(ALLOCATION_TAG); - - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); + }; - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) + class IOStream : public Aws::IOStream + { + public: + IOStream(const void *data, size_t size) + : Aws::IOStream(&m_buffer), m_buffer(static_cast(data),size) { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Upload 
Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; } - log(2) << "Dataset start upload. BucketName: " + bucketName + ": Key " + objectName; - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - log(2) << "upload Object length " + std::to_string(len); - const std::shared_ptr input_data = Aws::MakeShared("PutObjectInputStream", std::stringstream::in | std::stringstream::out | std::stringstream::binary); - - input_data->write((const char *)(data), len); //not sure which one is more effecient + VectorBuf m_buffer; + }; - auto transferHandle = transferManager->UploadFile(input_data, //(char *)data, - bucketName.c_str(), objectName.c_str(), "binary/octet-stream", - Aws::Map()); + void AwsStorage::uploadObject(const std::string &cs_path, const void *data, size_t len, const HttpContext * /*pContext*/) + { + std::string bucketName, objectName; + parse_storage_path(cs_path, bucketName, objectName); - transferHandle->WaitUntilFinished(); - size_t retries = 0; - //just make sure we don't fail because an upload part failed. (e.g. network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryUpload((char *)data, transferHandle); - transferHandle->WaitUntilFinished(); + auto s3_client = getS3(); + log(2) << "Dataset start upload. 
BucketName: " + bucketName + " Key: " + objectName + " Length: " + std::to_string(len); + auto stream = std::make_shared(data, len); + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(bucketName.c_str()); + request.SetKey(objectName.c_str()); + request.SetBody(stream); + request.SetContentLength(len); + auto outcome = s3_client->PutObject(request); + int retries = 0; + while (!outcome.IsSuccess() && retries++ < 5) + { + outcome = s3_client->PutObject(request); } - - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) + if (!outcome.IsSuccess()) { - std::ostringstream ss; - ss << "Error dataset uploading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - throwAwsException(transferHandle->GetLastError()); + log(0) << std::string("Error dataset uploading: ") + outcome.GetError().GetMessage().c_str(); + throwAwsException(outcome.GetError()); } log(2) << "Dataset uploading done."; } @@ -268,10 +286,6 @@ namespace seismicdrive parse_storage_path(cs_path, bucketName, objectName); auto s3_client = getS3(); - if (!s3_client) - { - return; - } auto thread_executor = Aws::MakeShared(ALLOCATION_TAG); Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); @@ -334,66 +348,12 @@ namespace seismicdrive * @param len the read length will be saved/returned in this parameter */ void AwsStorage::downloadObject(const std::string &cs_path, void *data, size_t &len, const HttpContext * /*pContext*/) { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName); - - auto s3_client = getS3(); - if (!s3_client) - { - return; - } - auto thread_executor = Aws::MakeShared(ALLOCATION_TAG); - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - 
transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + - std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Download Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - log(2) << "Start download"; - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - std::shared_ptr contents = std::make_shared(); - auto createStreamFn = [=]() - { - auto fs = Aws::New(ALLOCATION_TAG, contents->rdbuf()); - return fs; - }; - auto transferHandle = transferManager->DownloadFile(bucketName.c_str(), objectName.c_str(), createStreamFn); - - transferHandle->WaitUntilFinished(); - size_t retries = 0; - //just make sure we don't fail because an download part failed. (e.g. 
network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryDownload(transferHandle); - transferHandle->WaitUntilFinished(); - } - len = transferHandle->GetBytesTransferred(); - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) - { - std::ostringstream ss; - ss << "Error downloading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - throwAwsException(transferHandle->GetLastError()); - } - log(2) << "Data download with length " + std::to_string(len); - contents->rdbuf()->sgetn((char *)data, len); + Aws::S3::Model::GetObjectResult result; + getObject(*this, log, cs_path, -1, int64_t(len - 1), result); + auto contentLength = size_t(result.GetContentLength()); + size_t to_read = std::min(len, contentLength); + result.GetBody().read((char *)data, to_read); + len = to_read; } /* @param data start of data buffer to receive object contents @@ -401,137 +361,24 @@ namespace seismicdrive * @param len number of bytes to download */ void AwsStorage::downloadObject(const std::string &cs_path, void *data, size_t offset, size_t len, const HttpContext * /*pContext*/) { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName); - - auto s3_client = getS3(); - if (!s3_client) - { - return; - } - auto thread_executor = Aws::MakeShared(ALLOCATION_TAG); - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + - std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const 
Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Download Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - log(2) << "Start download with offset"; - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - std::shared_ptr contents = std::make_shared(); - auto createStreamFn = [=]() - { - return Aws::New(ALLOCATION_TAG, contents->rdbuf()); - }; - - auto transferHandle = transferManager->DownloadFile(bucketName.c_str(), objectName.c_str(), (uint64_t)offset, (uint64_t)len, createStreamFn); - transferHandle->WaitUntilFinished(); - - size_t retries = 0; - //just make sure we don't fail because an download part failed. (e.g. network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryDownload(transferHandle); - transferHandle->WaitUntilFinished(); - } - if (len != transferHandle->GetBytesTransferred()) - { - log(0) << "Data download with len not equal " + std::to_string(len) + ' ' + std::to_string(transferHandle->GetBytesTransferred()); - } - - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) - { - std::ostringstream ss; - ss << "Error downloading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - throwAwsException(transferHandle->GetLastError()); - } - log(2) << "Data download with length " + std::to_string(len); - contents->rdbuf()->sgetn((char *)data, len); + Aws::S3::Model::GetObjectResult result; + getObject(*this, log, cs_path, int64_t(offset), int64_t(offset + len - 1), result); + auto contentLength = size_t(result.GetContentLength()); + size_t to_read = std::min(len, contentLength); + if (contentLength < len) + log(2) << "Download less then requested: " << cs_path << ", offset:" << offset << ", len: " << len << ", 
contentLength: " << contentLength; + result.GetBody().read((char *)data, to_read); } /* @param data start of data buffer to receive object contents * @param len the length of the downloaded object will be saved/returned in the len parameter*/ void AwsStorage::downloadObject(const std::string &cs_path, void **data, size_t &len, const HttpContext * /*pContext*/) { - std::string bucketName, objectName; - parse_storage_path(cs_path, bucketName, objectName); //object name ends with /? - - auto s3_client = getS3(); - if (!s3_client) - { - return; - } - auto thread_executor = - Aws::MakeShared(ALLOCATION_TAG); - - Aws::Transfer::TransferManagerConfiguration transferConfig(thread_executor.get()); - - transferConfig.s3Client = s3_client; - // turn on/off update on status and progress - if (show_progress) - { - transferConfig.transferStatusUpdatedCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Transfer Status = " + std::to_string(static_cast(handle->GetStatus())); - }; - - transferConfig.uploadProgressCallback = - [this](const Aws::Transfer::TransferManager *, - const std::shared_ptr &handle) - { - log(0) << "Download Progress: " + std::to_string(handle->GetBytesTransferred()) + - " of " + std::to_string(handle->GetBytesTotalSize()) + " bytes"; - }; - } - log(2) << "Start download with unspecified length"; - auto transferManager = Aws::Transfer::TransferManager::Create(transferConfig); - - std::shared_ptr contents = std::make_shared(); - auto createStreamFn = [=]() - { - return Aws::New(ALLOCATION_TAG, contents->rdbuf()); - }; - - auto transferHandle = transferManager->DownloadFile(bucketName.c_str(), objectName.c_str(), createStreamFn); - transferHandle->WaitUntilFinished(); - size_t retries = 0; - //just make sure we don't fail because an download part failed. (e.g. 
network problems or interuptions) - while (transferHandle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && retries++ < 5) - { - transferManager->RetryDownload(transferHandle); - transferHandle->WaitUntilFinished(); - } - len = transferHandle->GetBytesTransferred(); - thread_executor = nullptr; //this should join all worker thread - if (transferHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) - { - std::ostringstream ss; - ss << "Error downloading: " << transferHandle->GetLastError(); - log(0) << ss.str(); - throwAwsException(transferHandle->GetLastError()); - } - log(2) << "Data download with length " + std::to_string(len); - std::unique_ptr target(new char[len]); - contents->rdbuf()->sgetn(target.get(), len); + Aws::S3::Model::GetObjectResult result; + getObject(*this, log, cs_path, -1, -1, result); + auto contentLength = size_t(result.GetContentLength()); + std::unique_ptr target(new char[contentLength]); + result.GetBody().read((char *)target.get(), contentLength); *data = target.release(); } @@ -544,10 +391,6 @@ namespace seismicdrive object_request.WithBucket(bucketName.c_str()).WithKey(objectName.c_str()); auto s3_client = getS3(); - if (!s3_client) - { - return; - } Aws::S3::Model::DeleteObjectOutcome response = s3_client->DeleteObject(object_request); if (!response.IsSuccess()) { @@ -567,10 +410,6 @@ namespace seismicdrive headObj.WithBucket(bucketName.c_str()).WithKey(objectName.c_str()); long long fileSize = 0; auto s3_client = getS3(); - if (!s3_client) - { - return fileSize; - } auto object = s3_client->HeadObject(headObj); if (object.IsSuccess()) { @@ -612,10 +451,6 @@ namespace seismicdrive Aws::S3::Model::ListObjectsOutcome outcome; auto s3_client = getS3(); - if (!s3_client) - { - return result; - } objects_request.WithBucket(bucketName.c_str()).WithPrefix(object_prefix.c_str()); bool isDone = false; -- GitLab