Commit 32add379 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Merge branch feature/jorgen.lind/request_io_manager_api with refs/heads/master...

Merge branch feature/jorgen.lind/request_io_manager_api with refs/heads/master into refs/merge-requests/377/train
parents 6939f7c0 5dd9ada6
Pipeline #34901 passed with stages
in 19 minutes and 46 seconds
......@@ -11,16 +11,15 @@ include_directories(${dmsCMakeDir}/src/lib)
include_directories(${dmsCMakeDir}/src/lib/cloud/providers)
if (NOT DISABLE_AZURE_IOMANAGER)
add_definitions( -DHAS_AZURE_BLOB_STORAGE_PROVIDER )
set(BUILD_POLYCLOUD ON)
add_definitions(-DPOLYCLOUD)
file(GLOB SRC_LIB_PROVIDERS_AZURE ${dmsCMakeDir}/src/lib/cloud/providers/azure/*.cc)
endif()
# retrieve sources
file(GLOB SRC_CORE ${dmsCMakeDir}/src/core/*.cc)
file(GLOB SRC_LIB_ACCESSORS ${dmsCMakeDir}/src/lib/accessors/*.cc)
file(GLOB SRC_LIB_PROVIDERS ${dmsCMakeDir}/src/lib/cloud/providers/*.cc)
if(BUILD_POLYCLOUD)
file(GLOB SRC_LIB_PROVIDERS_AZURE ${dmsCMakeDir}/src/lib/cloud/providers/azure/*.cc)
endif()
file(GLOB SRC_LIB_HTTP ${dmsCMakeDir}/src/lib/http/*.cc)
file(GLOB SRC_LIB_JSON ${dmsCMakeDir}/src/lib/json/*.cpp)
file(GLOB SRC_LIB_AUTH ${dmsCMakeDir}/src/lib/auth/*.cc)
......
......@@ -53,6 +53,7 @@ set (PRIVATE_HEADER_FILES
IO/IOManagerAzurePresigned.h
IO/IOManagerGoogle.h
IO/IOManagerHttp.h
IO/IOManagerRequestImpl.h
VDS/VDS.h
VDS/VolumeDataPartition.h
VDS/VolumeDataChannelMapping.h
......
......@@ -42,9 +42,7 @@ namespace OpenVDS
public:
Request(const std::string &objectName);
virtual ~Request();
virtual void WaitForFinish() = 0;
virtual bool IsDone() const = 0;
virtual bool IsSuccess(Error &error) const = 0;
virtual bool WaitForFinish(Error &error) = 0;
virtual void Cancel() = 0;
const std::string &GetObjectName() const { return m_objectName; }
......
......@@ -136,55 +136,28 @@ namespace OpenVDS
};
GetOrHeadRequestAWS::GetOrHeadRequestAWS(const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler)
: Request(id)
: RequestImpl(id)
, m_handler(handler)
, m_cancelled(false)
, m_done(false)
{
}
GetOrHeadRequestAWS::~GetOrHeadRequestAWS()
{
GetOrHeadRequestAWS::Cancel();
}
void GetOrHeadRequestAWS::WaitForFinish()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_waitForFinish.wait(lock, [this]{ return m_done; });
}
bool GetOrHeadRequestAWS::IsDone() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_done;
}
bool GetOrHeadRequestAWS::IsSuccess(Error& error) const
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_done)
{
error.code = -1;
error.string = "Download not done.";
return false;
}
error = m_error;
return m_error.code == 0;
}
void GetOrHeadRequestAWS::Cancel()
{
m_cancelled = true;
}
static void readobjectinfo_callback(const Aws::S3::S3Client *client, const Aws::S3::Model::HeadObjectRequest& objreq, const Aws::S3::Model::HeadObjectOutcome &getObjectOutcome, std::weak_ptr<ReadObjectInfoRequestAWS> weak_request)
{
auto objReq = weak_request.lock();
if (!objReq || objReq->m_cancelled)
if (!objReq)
return;
std::unique_lock<std::mutex> lock(objReq->m_mutex, std::defer_lock);
RequestStateHandler requestStateHandler(*objReq);
if (requestStateHandler.isCancelledRequested())
{
return;
}
if (getObjectOutcome.IsSuccess())
{
Aws::S3::Model::HeadObjectResult result = const_cast<Aws::S3::Model::HeadObjectOutcome&>(getObjectOutcome).GetResultWithOwnership();
......@@ -199,20 +172,14 @@ namespace OpenVDS
{
objReq->m_handler->HandleMetadata(convertAwsString(it.first), convertAwsString(it.second));
}
lock.lock();
}
else
{
lock.lock();
auto s3error = getObjectOutcome.GetError();
objReq->m_error.code = int(s3error.GetResponseCode());
objReq->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str();
}
objReq->m_done = true;
objReq->m_waitForFinish.notify_all();
lock.unlock();
objReq->m_handler->Completed(*objReq, objReq->m_error);
}
......@@ -238,10 +205,15 @@ namespace OpenVDS
{
auto objReq = weak_request.lock();
if (!objReq || objReq->m_cancelled)
if (!objReq)
return;
std::unique_lock<std::mutex> lock(objReq->m_mutex, std::defer_lock);
RequestStateHandler requestStateHandler(*objReq);
if (requestStateHandler.isCancelledRequested())
{
return;
}
if (getObjectOutcome.IsSuccess())
{
Aws::S3::Model::GetObjectResult result = const_cast<Aws::S3::Model::GetObjectOutcome&>(getObjectOutcome).GetResultWithOwnership();
......@@ -266,19 +238,14 @@ namespace OpenVDS
retrieved_object.read((char*)&data[0], content_length);
objReq->m_handler->HandleData(std::move(data));
}
lock.lock();
}
else
{
lock.lock();
auto s3error = getObjectOutcome.GetError();
objReq->m_error.code = int(s3error.GetResponseCode());
objReq->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str();
}
objReq->m_done = true;
objReq->m_waitForFinish.notify_all();
lock.unlock();
objReq->m_handler->Completed(*objReq, objReq->m_error);
}
......@@ -309,7 +276,11 @@ namespace OpenVDS
if (!objReq || objReq->m_cancelled)
return;
std::unique_lock<std::mutex> lock(objReq->m_mutex);
RequestStateHandler requestStateHandler(*objReq);
if (requestStateHandler.isCancelledRequested())
{
return;
}
if (!outcome.IsSuccess())
{
auto s3error = outcome.GetError();
......@@ -317,19 +288,13 @@ namespace OpenVDS
objReq->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str();
}
objReq->m_done = true;
objReq->m_waitForFinish.notify_all();
Error error = objReq->m_error;
lock.unlock();
if (objReq->m_completedCallback)
objReq->m_completedCallback(*objReq, error);
objReq->m_completedCallback(*objReq, objReq->m_error);
}
UploadRequestAWS::UploadRequestAWS(const std::string& id, std::function<void(const Request & request, const Error & error)> completedCallback)
: Request(id)
: RequestImpl(id)
, m_completedCallback(completedCallback)
, m_cancelled(false)
, m_done(false)
{
}
......@@ -355,33 +320,6 @@ namespace OpenVDS
}
void UploadRequestAWS::WaitForFinish()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_waitForFinish.wait(lock, [this]{ return this->m_done; });
}
bool UploadRequestAWS::IsDone() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_done;
}
bool UploadRequestAWS::IsSuccess(Error& error) const
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_done)
{
error.code = -1;
error.string = "Download not done.";
return false;
}
error = m_error;
return m_error.code == 0;
}
void UploadRequestAWS::Cancel()
{
m_cancelled = true;
}
IOManagerAWS::IOManagerAWS(const AWSOpenOptions& openOptions, Error &error)
: IOManager(OpenOptions::AWS)
, m_region(openOptions.region)
......
......@@ -19,6 +19,7 @@
#define IOMANAGERAWS_H
#include "IOManager.h"
#include "IOManagerRequestImpl.h"
#include <vector>
#include <string>
......@@ -31,24 +32,13 @@ namespace OpenVDS
using AsyncDownloadContext = AsyncContext<DownloadRequestAWS>;
using AsyncUploadContext = AsyncContext<UploadRequestAWS>;
class GetOrHeadRequestAWS : public Request
class GetOrHeadRequestAWS : public RequestImpl
{
public:
GetOrHeadRequestAWS(const std::string &id, const std::shared_ptr<TransferDownloadHandler>& handler);
~GetOrHeadRequestAWS() override;
void WaitForFinish() override;
bool IsDone() const override;
bool IsSuccess(Error &error) const override;
void Cancel() override;
std::shared_ptr<TransferDownloadHandler> m_handler;
std::atomic_bool m_cancelled;
bool m_done;
Error m_error;
std::condition_variable m_waitForFinish;
mutable std::mutex m_mutex;
};
class ReadObjectInfoRequestAWS : public GetOrHeadRequestAWS
......@@ -86,23 +76,14 @@ namespace OpenVDS
VectorBuf m_buffer;
};
class UploadRequestAWS : public Request
class UploadRequestAWS : public RequestImpl
{
public:
UploadRequestAWS(const std::string &id, std::function<void(const Request & request, const Error & error)> completedCallback);
void run(Aws::S3::S3Client& client, const std::string& bucket, const std::string& contentDispostionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::weak_ptr<UploadRequestAWS> uploadRequest);
void WaitForFinish() override;
bool IsDone() const override;
bool IsSuccess(Error &error) const override;
void Cancel() override;
std::function<void(const Request &request, const Error &error)> m_completedCallback;
std::shared_ptr<IOStream> m_stream;
std::atomic_bool m_cancelled;
bool m_done;
Error m_error;
std::condition_variable m_waitForFinish;
mutable std::mutex m_mutex;
};
class IOManagerAWS : public IOManager
......
......@@ -71,51 +71,19 @@ static std::string convertFromUtilString(const utility::string_t& str)
#endif
GetHeadRequestAzure::GetHeadRequestAzure(const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler)
: Request(id)
: RequestImpl(id)
, m_handler(handler)
, m_cancelled(false)
, m_done(false)
{
}
GetHeadRequestAzure::~GetHeadRequestAzure()
{
GetHeadRequestAzure::Cancel();
}
void GetHeadRequestAzure::WaitForFinish()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_waitForFinish.wait(lock, [this]
{
return m_done;
});
}
bool GetHeadRequestAzure::IsDone() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_done;
}
bool GetHeadRequestAzure::IsSuccess(Error& error) const
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_done)
{
error.code = -1;
error.string = "GetHead not done.";
return false;
}
error = m_error;
return m_error.code == 0;
}
void GetHeadRequestAzure::Cancel()
{
//m_cancelTokenSrc.cancel();
m_cancelled = true;
m_cancelTokenSrc.cancel();
RequestImpl::Cancel();
}
ReadObjectInfoRequestAzure::ReadObjectInfoRequestAzure(const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler)
......@@ -142,30 +110,25 @@ void ReadObjectInfoRequestAzure::run(azure::storage::cloud_blob_container& conta
auto readObjectRequest = request.lock();
if (!readObjectRequest)
return;
RequestStateHandler requestStateHandler(*readObjectRequest);
if (requestStateHandler.isCancelledRequested())
{
return;
}
try
{
// when the task is completed
task.get();
m_handler->HandleObjectSize(m_blob.properties().size());
if (auto tmp = request.lock())
{
m_handler->HandleObjectSize(m_blob.properties().size());
m_handler->HandleObjectLastWriteTime(convertFromUtilString(m_blob.properties().last_modified().to_string(utility::datetime::ISO_8601)));
m_handler->HandleObjectLastWriteTime(convertFromUtilString(m_blob.properties().last_modified().to_string(utility::datetime::ISO_8601)));
// send metadata one at a time to the metadata handler
for (auto it : m_blob.metadata())
{
m_handler->HandleMetadata(convertFromUtilString(it.first), convertFromUtilString(it.second));
}
// declare success and set completion status
m_error.code = 0;
m_done = true;
m_waitForFinish.notify_all();
m_handler->Completed(*this, m_error);
// send metadata one at a time to the metadata handler
for (auto it : m_blob.metadata())
{
m_handler->HandleMetadata(convertFromUtilString(it.first), convertFromUtilString(it.second));
}
}
catch (const azure::storage::storage_exception & e)
{
......@@ -173,10 +136,8 @@ void ReadObjectInfoRequestAzure::run(azure::storage::cloud_blob_container& conta
ucout << _XPLATSTR("Error message is: ") << e.what() << std::endl;
m_error.code = -1;
m_error.string = e.what();
m_done = true;
m_waitForFinish.notify_all();
m_handler->Completed(*this, m_error);
}
m_handler->Completed(*this, m_error);
});
}
......@@ -211,6 +172,11 @@ void DownloadRequestAzure::run(azure::storage::cloud_blob_container& container,
auto downloadRequest = request.lock();
if (!downloadRequest)
return;
RequestStateHandler requestStateHandler(*downloadRequest);
if (requestStateHandler.isCancelledRequested())
{
return;
}
try
{
// when the task is completed
......@@ -222,27 +188,17 @@ void DownloadRequestAzure::run(azure::storage::cloud_blob_container& container,
if (m_context.request_results().size() == 2 && m_context.request_results()[0].http_status_code() == 416 && m_requestedRange.start == 0 && m_requestedRange.end == 0)
data.clear();
if (auto tmp = request.lock())
m_handler->HandleObjectSize(m_blob.properties().size());
m_handler->HandleObjectLastWriteTime(convertFromUtilString(m_blob.properties().last_modified().to_string(utility::datetime::ISO_8601)));
// send metadata one at a time to the metadata handler
for (auto it : m_blob.metadata())
{
m_handler->HandleObjectSize(m_blob.properties().size());
m_handler->HandleObjectLastWriteTime(convertFromUtilString(m_blob.properties().last_modified().to_string(utility::datetime::ISO_8601)));
// send metadata one at a time to the metadata handler
for (auto it : m_blob.metadata())
{
m_handler->HandleMetadata(convertFromUtilString(it.first), convertFromUtilString(it.second));
}
// send data to the data handler
m_handler->HandleData(std::move(data));
// declare success and set completion status
m_error.code = 0;
m_done = true;
m_waitForFinish.notify_all();
m_handler->Completed(*this, m_error);
m_handler->HandleMetadata(convertFromUtilString(it.first), convertFromUtilString(it.second));
}
// send data to the data handler
m_handler->HandleData(std::move(data));
}
catch (const azure::storage::storage_exception & e)
{
......@@ -250,18 +206,14 @@ void DownloadRequestAzure::run(azure::storage::cloud_blob_container& container,
ucout << _XPLATSTR("Error message is: ") << e.what() << std::endl;
m_error.code = -1;
m_error.string = e.what();
m_done = true;
m_waitForFinish.notify_all();
m_handler->Completed(*this, m_error);
}
m_handler->Completed(*this, m_error);
});
}
UploadRequestAzure::UploadRequestAzure(const std::string& id, std::function<void(const Request & request, const Error & error)> completedCallback)
: Request(id)
: RequestImpl(id)
, m_completedCallback(completedCallback)
, m_cancelled(false)
, m_done(false)
{
}
......@@ -293,16 +245,15 @@ void UploadRequestAzure::run(azure::storage::cloud_blob_container& container, az
auto request = uploadRequest.lock();
if (!request)
return;
RequestStateHandler requestStateHandler(*request);
if (requestStateHandler.isCancelledRequested())
{
return;
}
try
{
uploadTask.get();
m_data.reset();
m_error.code = 0;
m_done = true;
m_waitForFinish.notify_all();
if (m_completedCallback) m_completedCallback(*this, m_error);
}
catch (azure::storage::storage_exception & e)
{
......@@ -311,45 +262,16 @@ void UploadRequestAzure::run(azure::storage::cloud_blob_container& container, az
ex_msg = std::string(e.what());
m_error.code = -1;
m_error.string = ex_msg;
m_done = true;
m_waitForFinish.notify_all();
if (m_completedCallback) m_completedCallback(*this, m_error);
}
});
}
void UploadRequestAzure::WaitForFinish()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_waitForFinish.wait(lock, [this]
{
return this->m_done;
if (m_completedCallback)
m_completedCallback(*this, m_error);
});
}
bool UploadRequestAzure::IsDone() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_done;
}
bool UploadRequestAzure::IsSuccess(Error& error) const
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_done)
{
error.code = -1;
error.string = "Upload not done.";
return false;
}
error = m_error;
return m_error.code == 0;
}
void UploadRequestAzure::Cancel()
{
m_cancelled = true;
RequestImpl::Cancel();
m_cancelTokenSrc.cancel();
}
......
......@@ -22,6 +22,7 @@
//#include "IOManagerAzure.h"
#include "IOManager.h"
#include "IOManagerRequestImpl.h"
#include <vector>
#include <string>
......@@ -34,23 +35,15 @@
namespace OpenVDS
{
class GetHeadRequestAzure : public Request
class GetHeadRequestAzure : public RequestImpl
{
public:
GetHeadRequestAzure(const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler);
~GetHeadRequestAzure() override;
void WaitForFinish() override;
bool IsDone() const override;
bool IsSuccess(Error& error) const override;
void Cancel() override;
std::shared_ptr<TransferDownloadHandler> m_handler;
std::atomic_bool m_cancelled;
bool m_done;
Error m_error;
std::condition_variable m_waitForFinish;
mutable std::mutex m_mutex;
azure::storage::cloud_block_blob m_blob;
pplx::cancellation_token_source m_cancelTokenSrc;
pplx::task<void> m_task;
......@@ -77,23 +70,15 @@ namespace OpenVDS
IORange m_requestedRange;
};
class UploadRequestAzure : public Request
class UploadRequestAzure : public RequestImpl
{
public:
UploadRequestAzure(const std::string& id, std::function<void(const Request & request, const Error & error)> completedCallback);
void run(azure::storage::cloud_blob_container& container, azure::storage::blob_request_options options, const std::string& requestName, const std::string& contentDispositionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::weak_ptr<UploadRequestAzure> uploadRequest);
void WaitForFinish() override;
bool IsDone() const override;
bool IsSuccess(Error& error) const override;
void Cancel() override;
std::function<void(const Request & request, const Error & error)> m_completedCallback;
std::shared_ptr<std::vector<uint8_t>> m_data;
std::atomic_bool m_cancelled;
bool m_done;
Error m_error;
std::condition_variable m_waitForFinish;
mutable std::mutex m_mutex;
azure::storage::cloud_block_blob m_blob;
azure::storage::operation_context m_context;
pplx::cancellation_token_source m_cancelTokenSrc;
......
......@@ -236,6 +236,11 @@ static void cancelledDownloadCB(uv_async_t *handle)
}
}
curl_easy_cleanup(cancelled->curlEasy);