From 874fad90d8c0d7ae158c6b80b66245c9a2b30cf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B8rgen=20Lind?= Date: Thu, 24 Jun 2021 13:38:08 +0200 Subject: [PATCH] Adding AWSCurl backend --- CMake/BuildAWSSDK.cmake | 10 + src/OpenVDS/CMakeLists.txt | 4 + src/OpenVDS/IO/IOManager.cpp | 11 +- src/OpenVDS/IO/IOManagerAWSCurl.cpp | 315 +++++++++++++++++++++ src/OpenVDS/IO/IOManagerAWSCurl.h | 55 ++++ src/OpenVDS/IO/IOManagerAzurePresigned.cpp | 6 - src/OpenVDS/IO/IOManagerCurl.cpp | 8 +- 7 files changed, 398 insertions(+), 11 deletions(-) create mode 100644 src/OpenVDS/IO/IOManagerAWSCurl.cpp create mode 100644 src/OpenVDS/IO/IOManagerAWSCurl.h diff --git a/CMake/BuildAWSSDK.cmake b/CMake/BuildAWSSDK.cmake index bd70b321..ca213f1b 100644 --- a/CMake/BuildAWSSDK.cmake +++ b/CMake/BuildAWSSDK.cmake @@ -4,6 +4,8 @@ macro(BuildAWSSDK) list(APPEND AWS_LIBS_LIST "bin/aws-cpp-sdk-s3.lib") list(APPEND AWS_LIBS_LIST "bin/aws-cpp-sdk-sts.lib") list(APPEND AWS_LIBS_LIST "bin/aws-cpp-sdk-transfer.lib") + list(APPEND AWS_LIBS_LIST "lib/aws-crt-cpp.lib") + list(APPEND AWS_LIBS_LIST "lib/aws-c-common.lib") @@ -32,6 +34,14 @@ macro(BuildAWSSDK) list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-cpp-sdk-s3.so") list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-cpp-sdk-sts.so") list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-cpp-sdk-transfer.so") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-crt-cpp.a") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-c-s3.a") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-c-auth.a") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-c-http.a") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-c-compression.a") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-c-cal.a") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-c-io.a") + list(APPEND AWS_DLLS_LIST "lib${LIBSUFFIX}/libaws-c-common.a") endif() BuildExternal(aws-cpp-sdk ${aws-cpp-sdk_VERSION} "" ${aws-cpp-sdk_SOURCE_DIR} "${AWS_LIBS_LIST}" "${AWS_DLLS_LIST}" "" "" "-DBUILD_ONLY=s3$sts$transfer;-DENABLE_TESTING=OFF") diff --git a/src/OpenVDS/CMakeLists.txt b/src/OpenVDS/CMakeLists.txt index 05695ce4..3fa60a7f 100644 --- a/src/OpenVDS/CMakeLists.txt +++ b/src/OpenVDS/CMakeLists.txt @@ -5,6 +5,7 @@ set(SOURCE_FILES IO/Win_File.cpp IO/IOManager.cpp IO/IOManagerAWS.cpp + IO/IOManagerAWSCurl.cpp IO/IOManagerAzure.cpp IO/IOManagerAzureSdkForCpp.cpp IO/IOManagerInMemory.cpp @@ -49,6 +50,7 @@ set (PRIVATE_HEADER_FILES IO/File.h IO/IOManager.h IO/IOManagerAWS.h + IO/IOManagerAWSCurl.h IO/IOManagerAzure.h IO/IOManagerAzureSdkForCpp.h IO/IOManagerInMemory.h @@ -113,6 +115,7 @@ set_source_files_properties(VDS/FSE/fse_decompress.cpp PROPERTIES COMPILE_FLAGS #https://github.com/aws/aws-sdk-cpp/blob/266d7682c3d07d41618c41a3521811e1219c1bc7/aws-cpp-sdk-core/include/aws/core/utils/memory/stl/AWSString.h#L21 if (NOT MSVC) set_source_files_properties(IO/IOManagerAWS.cpp PROPERTIES COMPILE_FLAGS -fvisibility=default) + set_source_files_properties(IO/IOManagerAWSCurl.cpp PROPERTIES COMPILE_FLAGS -fvisibility=default) endif() add_library(openvds_objects OBJECT @@ -169,6 +172,7 @@ endif() if (DISABLE_AWS_IOMANAGER) target_compile_definitions(openvds_objects PRIVATE OPENVDS_NO_AWS_IOMANAGER) set_source_files_properties(IO/IOManagerAWS.cpp PROPERTIES HEADER_FILE_ONLY TRUE) + set_source_files_properties(IO/IOManagerAWSCurl.cpp PROPERTIES HEADER_FILE_ONLY TRUE) else() if (aws-cpp-sdk_EXTERNAL_LIBS) target_link_libraries(openvds_objects PRIVATE ${aws-cpp-sdk_EXTERNAL_LIBS}) diff --git a/src/OpenVDS/IO/IOManager.cpp b/src/OpenVDS/IO/IOManager.cpp index 4e412465..122750ca 100644 --- a/src/OpenVDS/IO/IOManager.cpp +++ b/src/OpenVDS/IO/IOManager.cpp @@ -23,6 +23,7 @@ #ifndef OPENVDS_NO_AWS_IOMANAGER #include "IOManagerAWS.h" +#include "IOManagerAWSCurl.h" #endif #ifndef OPENVDS_NO_AZURE_IOMANAGER #include "IOManagerAzure.h" @@ -51,10 +52,16 @@ IOManager* IOManager::CreateIOManager(const OpenOptions& options, IOManager::Acc { switch(options.connectionType) { -#ifndef OPENVDS_NO_AWS_IOMANAGER case OpenOptions::AWS: - return new IOManagerAWS(static_cast(options), error); +#ifndef OPENVDS_NO_AWS_IOMANAGER + { + bool useAwsCurl = getBooleanEnvironmentVariable("OPENVDS_AWSCURL"); + if (useAwsCurl) + return new IOManagerAWSCurl(static_cast(options), error); + return new IOManagerAWS(static_cast(options), error); + } #endif + break; case OpenOptions::Azure: { #ifndef OPENVDS_NO_AZURE_SDK_FOR_CPP_IOMANAGER diff --git a/src/OpenVDS/IO/IOManagerAWSCurl.cpp b/src/OpenVDS/IO/IOManagerAWSCurl.cpp new file mode 100644 index 00000000..07945491 --- /dev/null +++ b/src/OpenVDS/IO/IOManagerAWSCurl.cpp @@ -0,0 +1,315 @@ +#include "IOManagerAWSCurl.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace OpenVDS +{ + + +static aws_byte_cursor createByteCursor(const std::string& a) +{ + return { a.size(),(uint8_t*) a.c_str() }; +} +template +static aws_byte_cursor createByteCursor(const char(&a)[SIZE]) +{ + return { SIZE - 1, (uint8_t*)a }; +} + +template +static Aws::Crt::Http::HttpHeader createHttpHeader(const A& key, const B& value) +{ + Aws::Crt::Http::HttpHeader header; + header.name = createByteCursor(key); + header.value = createByteCursor(value); + return header; +} + +class BucketLocationTransferDownloadHandler : public TransferDownloadHandler +{ +public: + BucketLocationTransferDownloadHandler() + : TransferDownloadHandler() + {} + void HandleObjectSize(int64_t size) override {} + void HandleObjectLastWriteTime(const std::string& lastWriteTimeISO8601) override {} + void HandleMetadata(const std::string& key, const std::string& header) override { meta[key] = header; }; + void HandleData(std::vector&& data) override { this->data = std::move(data); } + void Completed(const Request& request, const Error& error) override {}; + + std::vector data; + std::map meta; +}; + +static const std::string& empty_sha256() +{ + static std::string empty("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); + return empty; +} + +static std::string GetBucketLocation(const std::shared_ptr &credsProvider, const std::string &bucket, CurlHandler &curlHandler) +{ + Aws::Crt::Auth::AwsSigningConfig signingConfig; + signingConfig.SetCredentialsProvider(credsProvider); + signingConfig.SetService("s3"); + signingConfig.SetSignatureType(Aws::Crt::Auth::SignatureType::HttpRequestViaHeaders); + signingConfig.SetSignedBodyHeader(Aws::Crt::Auth::SignedBodyHeaderType::None); + signingConfig.SetRegion("us-west-1"); + std::string host = fmt::format("{}.s3.us-west-1.amazonaws.com", bucket); + std::string url = "https://" + host + "/?location"; + auto crtrequest = std::make_shared(); + crtrequest->SetPath(createByteCursor(url)); + crtrequest->SetMethod(createByteCursor("GET")); + crtrequest->AddHeader(createHttpHeader("Host", host)); + + Aws::Crt::Auth::Sigv4HttpRequestSigner requestSign; + int errorCode = 0; + requestSign.SignRequest(crtrequest, signingConfig, [&errorCode](const std::shared_ptr& request, int code) + { + errorCode = code; + }); + if (errorCode != AWS_ERROR_SUCCESS) + return ""; + + std::vector headers; + int headerCount = (int)crtrequest->GetHeaderCount(); + for (int i = 0; i < headerCount; i++) + { + auto h = crtrequest->GetHeader(i); + std::string name((const char*)h->name.ptr, h->name.len); + std::string value((const char*)h->value.ptr, h->value.len); + std::string header = name + ": " + value; + headers.emplace_back(std::move(header)); + } + headers.emplace_back("x-amz-content-sha256: " + empty_sha256()); + + auto handler = std::make_shared(); + std::shared_ptr request = std::make_shared("", handler); + curlHandler.addDownloadRequest(request, url, headers, convertToISO8601, CurlDownloadHandler::GET); + + Error error; //don't propogate error + request->WaitForFinish(error); + if (error.code) + return ""; + std::string xmlContent((const char*)handler->data.data(), handler->data.size()); + auto it = xmlContent.find("LocationConstraint"); + if (it == std::string::npos) + return ""; + auto start = xmlContent.find('>', it); + if (start == std::string::npos) + return ""; + start++; + auto end = xmlContent.find('<', start + 1); + if (end == std::string::npos) + return ""; + return xmlContent.substr(start, end - start); +} + +void assignByteCursorFromString(Aws::Crt::ByteCursor& cursor, const std::string& source) +{ + cursor.ptr = (uint8_t*)source.data(); + cursor.len = source.size(); +} + +IOManagerAWSCurl::IOManagerAWSCurl(const AWSOpenOptions& openOptions, Error& error) + : IOManager(OpenOptions::AWS) + , m_curlHandler(error) + , m_apiHandle(openOptions.disableInitApi ? nullptr : new Aws::Crt::ApiHandle()) + , m_eventLoopGroup(1) + , m_hostResolver(m_eventLoopGroup, 1000, 1000) + , m_clientBootstrap(m_eventLoopGroup, m_hostResolver) + , m_region(openOptions.region) + , m_bucket(openOptions.bucket) + , m_path(openOptions.key) +{ + if (error.code) + return; + + if (openOptions.accessKeyId.size()) + { + m_accessKeyId = openOptions.accessKeyId; + m_secretAccessKey = openOptions.secretKey; + m_sessionToken = openOptions.sessionToken; + Aws::Crt::Auth::CredentialsProviderStaticConfig config; + assignByteCursorFromString(config.AccessKeyId, m_accessKeyId); + assignByteCursorFromString(config.SecretAccessKey, m_secretAccessKey); + if (m_sessionToken.size()) + assignByteCursorFromString(config.SessionToken, m_sessionToken); + m_credentialsProvider = Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderStatic(config); + } + else + { + Aws::Crt::Auth::CredentialsProviderChainDefaultConfig config; + config.Bootstrap = &m_clientBootstrap; + m_credentialsProvider = Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderChainDefault(config); + } + + m_signingConfig.SetCredentialsProvider(m_credentialsProvider); + m_signingConfig.SetService("s3"); + m_signingConfig.SetSignatureType(Aws::Crt::Auth::SignatureType::HttpRequestViaHeaders); + m_signingConfig.SetSignedBodyHeader(Aws::Crt::Auth::SignedBodyHeaderType::None); + m_signingConfig.SetRegion(m_region.c_str()); + + if (m_region.empty() && openOptions.endpointOverride.empty()) + { + m_region = GetBucketLocation(m_credentialsProvider, m_bucket, m_curlHandler); + m_signingConfig.SetRegion(m_region.c_str()); + } + + m_protocol = "https"; + if (openOptions.endpointOverride.size()) + { + if (openOptions.endpointOverride.rfind("https://", 0) == 0) + { + m_host = openOptions.endpointOverride.substr(8); + } + else if (openOptions.endpointOverride.rfind("http://", 0) == 0) + { + m_host = openOptions.endpointOverride.substr(7); + m_protocol = "http"; + } + else + { + m_host = openOptions.endpointOverride; + } + } + else + { + m_host = fmt::format("{}.s3.{}.amazonaws.com", m_bucket, m_region); + } + +} + +IOManagerAWSCurl::~IOManagerAWSCurl() +{ + +} + +static std::string getUrlInternal(const std::string &protocol, const std::string& host, const std::string &path, const std::string& objectName) +{ + if (objectName.empty()) + { + assert(path.size()); + return fmt::format("{}://{}/{}", protocol, host, path); + } + if (path.empty()) + { + assert(objectName.size()); + return fmt::format("{}://{}/{}", protocol, host, objectName); + } + return fmt::format("{}://{}/{}/{}", protocol, host, path, objectName); +} + +static std::string getUrl(const std::string &protocol, const std::string& host, const std::string& path, const std::string& objectName) +{ + Aws::Http::URI url(getUrlInternal(protocol, host, path, objectName).c_str()); + return url.GetURIString().c_str(); +} + +static std::vector signRequest(const std::string& host, const std::string& url, Aws::Crt::Auth::AwsSigningConfig& signingConfig, const std::string& verb, const std::string& payloadHash, const std::map headerMap = std::map()) +{ + auto crtrequest = std::make_shared(); + crtrequest->SetPath(createByteCursor(url)); + crtrequest->SetMethod(createByteCursor(verb)); + crtrequest->AddHeader(createHttpHeader("Host", host)); + signingConfig.SetSignedBodyHeader(Aws::Crt::Auth::SignedBodyHeaderType::XAmzContentSha256); + signingConfig.SetSignedBodyValue(payloadHash.c_str()); + + for (auto h : headerMap) + crtrequest->AddHeader(createHttpHeader(h.first, h.second)); + + Aws::Crt::Auth::Sigv4HttpRequestSigner requestSign; + requestSign.SignRequest(crtrequest, signingConfig, [](const std::shared_ptr& request, int code) + { + if (code != AWS_ERROR_SUCCESS) + throw std::runtime_error("unexpected AWS signing failure"); + }); + + std::vector headers; + int headerCount = (int)crtrequest->GetHeaderCount(); + for (int i = 0; i < headerCount; i++) + { + auto h = crtrequest->GetHeader(i); + std::string name((const char *)h->name.ptr, h->name.len); + std::string value((const char *)h->value.ptr, h->value.len); + std::string header = name + ": " + value; + headers.emplace_back(std::move(header)); + } + + return headers; +} + +std::shared_ptr IOManagerAWSCurl::ReadObjectInfo(const std::string& objectName, std::shared_ptr handler) +{ + std::string url = getUrl(m_protocol, m_host, m_path, objectName); + std::shared_ptr request = std::make_shared(objectName, handler); + std::vector headers = signRequest(m_host, url, m_signingConfig, "HEAD", empty_sha256()); + m_curlHandler.addDownloadRequest(request, url, headers, convertToISO8601, CurlDownloadHandler::HEADER); + return request; +} + +std::shared_ptr IOManagerAWSCurl::ReadObject(const std::string& objectName, std::shared_ptr handler, const IORange& range) +{ + std::string url = getUrl(m_protocol, m_host, m_path, objectName); + std::map headerMap; + if (range.start != range.end) + { + headerMap["Reange"] = fmt::format("bytes={}-{}", range.start, range.end); + } + auto headers = signRequest(m_host, url, m_signingConfig, "GET", empty_sha256(), headerMap); + std::shared_ptr request = std::make_shared(objectName, handler); + m_curlHandler.addDownloadRequest(request, url, headers, convertToISO8601, CurlDownloadHandler::GET); + return request; +} + +std::string hashData(const std::vector& data) +{ +// Aws::Crt::ByteCursor inputhash; +// inputhash.len = data.size(); +// inputhash.ptr = const_cast(data.data()); +// +// Aws::Crt::ByteBuf buf; +// aws_byte_buf_init(&buf, Aws::Crt::g_allocator, Aws::Crt::Crypto::SHA256_DIGEST_SIZE); +// Aws::Crt::Crypto::ComputeSHA256(inputhash, buf); +// +// Aws::Crt::ByteCursor inputencoded; +// inputencoded.len = buf.len; +// inputencoded.ptr = buf.buffer; +// +// Aws::Crt::ByteBuf buf2; +// aws_byte_buf_init(&buf2, Aws::Crt::g_allocator, Aws::Crt::Crypto::SHA256_DIGEST_SIZE * 16); +// +// aws_hex_encode(&inputencoded, &buf2); +// std::string sha256((const char*)buf2.buffer, buf2.len); +// aws_byte_buf_clean_up(&buf); +// aws_byte_buf_clean_up(&buf2); + + (void)data; + return "UNSIGNED-PAYLOAD"; +} + +std::shared_ptr IOManagerAWSCurl::WriteObject(const std::string& objectName, const std::string& contentDispostionFilename, const std::string& contentType, const std::vector>& metadataHeader, std::shared_ptr> data, std::function completedCallback) +{ + std::string url = getUrl(m_protocol, m_host, m_path, objectName); + std::shared_ptr request = std::make_shared(objectName, completedCallback); + std::map headerMap; + auto headers = signRequest(m_host, url, m_signingConfig, "PUT", hashData(*data), headerMap); + if (contentDispostionFilename.size()) + headers.push_back(fmt::format("content-disposition: attachment; filename=\"{}\"", contentDispostionFilename)); + if (contentType.size()) + headers.push_back(fmt::format("content-type: {}", contentType)); + if (data->size()) + headers.push_back(fmt::format("content-length: {}", data->size())); + m_curlHandler.addUploadRequest(request, url, headers, data); + return request; +} +} \ No newline at end of file diff --git a/src/OpenVDS/IO/IOManagerAWSCurl.h b/src/OpenVDS/IO/IOManagerAWSCurl.h new file mode 100644 index 00000000..5b2be521 --- /dev/null +++ b/src/OpenVDS/IO/IOManagerAWSCurl.h @@ -0,0 +1,55 @@ +#ifndef IOMANAGERAWSCURL_H +#define IOMANAGERAWSCURL_H + +#include + +#include "IOManager.h" +#include "IOManagerCurl.h" + +#define AWS_CRT_CPP_USE_IMPORT_EXPORT +#include +#include +#include + +#include + +namespace OpenVDS +{ + + struct InitAws + { + InitAws(); + ~InitAws(); + + static std::mutex mutex; + static int count; + }; + class IOManagerAWSCurl : public IOManager + { + public: + IOManagerAWSCurl(const AWSOpenOptions &openOptions, Error &error); + ~IOManagerAWSCurl() override; + + std::shared_ptr ReadObjectInfo(const std::string &objectName, std::shared_ptr handler) override; + std::shared_ptr ReadObject(const std::string &objectName, std::shared_ptr handler, const IORange& range = IORange()) override; + std::shared_ptr WriteObject(const std::string &objectName, const std::string& contentDispostionFilename, const std::string& contentType, const std::vector>& metadataHeader, std::shared_ptr> data, std::function completedCallback = nullptr) override; + private: + CurlHandler m_curlHandler; + std::unique_ptr m_apiHandle; + Aws::Crt::Io::EventLoopGroup m_eventLoopGroup; + Aws::Crt::Io::DefaultHostResolver m_hostResolver; + Aws::Crt::Io::ClientBootstrap m_clientBootstrap; + std::shared_ptr m_credentialsProvider; + Aws::Crt::Auth::AwsSigningConfig m_signingConfig; + std::string m_region; + std::string m_bucket; + std::string m_path; + std::string m_protocol; + std::string m_host; + std::string m_accessKeyId; + std::string m_secretAccessKey; + std::string m_sessionToken; + }; +} + +#endif //IOMANAGERAWSCURL_H \ No newline at end of file diff --git a/src/OpenVDS/IO/IOManagerAzurePresigned.cpp b/src/OpenVDS/IO/IOManagerAzurePresigned.cpp index 58d8ca50..03f8da8a 100644 --- a/src/OpenVDS/IO/IOManagerAzurePresigned.cpp +++ b/src/OpenVDS/IO/IOManagerAzurePresigned.cpp @@ -82,12 +82,6 @@ namespace OpenVDS std::shared_ptr request = std::make_shared(objectName, completedCallback); std::vector headers; headers.emplace_back("x-ms-blob-type: BlockBlob"); - if (contentDispostionFilename.size()) - headers.push_back(fmt::format("content-disposition: attachment; filename=\"{}\"", contentDispostionFilename)); - if (contentType.size()) - headers.push_back(fmt::format("content-type: {}", contentType)); - if (data->size()) - headers.push_back(fmt::format("content-length: {}", data->size())); for (auto metaTag : metadataHeader) { headers.push_back(fmt::format("{}{}: {}", "x-ms-meta-", metaTag.first, metaTag.second)); diff --git a/src/OpenVDS/IO/IOManagerCurl.cpp b/src/OpenVDS/IO/IOManagerCurl.cpp index 043d196f..0b96beb4 100644 --- a/src/OpenVDS/IO/IOManagerCurl.cpp +++ b/src/OpenVDS/IO/IOManagerCurl.cpp @@ -104,7 +104,7 @@ static size_t curlReadCallback(char *buffer, size_t size, size_t nitems, void *u static void curlAddRequests(UVEventLoopData *eventLoopData) { - int maxConcurrentRequests = 64; + int maxConcurrentRequests = 40; int to_add = maxConcurrentRequests - int(eventLoopData->processingRequests.size()); assert(to_add >= 0); to_add = std::min(to_add, int(eventLoopData->queuedRequests.size())); @@ -823,8 +823,10 @@ CurlHandler::CurlHandler(Error& error) curl_multi_setopt(m_eventLoopData.curlMulti, CURLMOPT_SOCKETDATA, &m_eventLoopData); curl_multi_setopt(m_eventLoopData.curlMulti, CURLMOPT_TIMERFUNCTION, curlTimerCallback); curl_multi_setopt(m_eventLoopData.curlMulti, CURLMOPT_TIMERDATA, &m_eventLoopData); - curl_multi_setopt(m_eventLoopData.curlMulti, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX); - curl_multi_setopt(m_eventLoopData.curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, 128L); + +#if LIBCURL_VERSION_MAJOR > 7 || (LIBCURL_VERSION_MAJOR == 7 && LIBCURL_VERSION_MINOR > 29) + curl_multi_setopt(m_eventLoopData.curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, 32); +#endif uv_prepare_init(m_eventLoopData.loop, &m_eventLoopData.beforeBlock); m_eventLoopData.beforeBlock.data = &m_eventLoopData; -- GitLab