Commit 0a1e8564 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Rework the IOManager apis to also have callback for completed uploads

parent 1263c353
...@@ -28,10 +28,13 @@ Request::Request(const std::string& objectName) ...@@ -28,10 +28,13 @@ Request::Request(const std::string& objectName)
Request::~Request() Request::~Request()
{} {}
TransferHandler::~TransferHandler() TransferDownloadHandler::~TransferDownloadHandler()
{} {
void TransferHandler::handleMetadata(const std::string& key, const std::string& header) }
{} void TransferDownloadHandler::handleMetadata(const std::string& key, const std::string& header)
{
}
IOManager::~IOManager() IOManager::~IOManager()
{} {}
IOManager* IOManager::createIOManager(const OpenOptions& options, Error &error) IOManager* IOManager::createIOManager(const OpenOptions& options, Error &error)
......
...@@ -18,18 +18,22 @@ ...@@ -18,18 +18,22 @@
#ifndef IOMANAGER_H #ifndef IOMANAGER_H
#define IOMANAGER_H #define IOMANAGER_H
#include <memory>
#include <OpenVDS/OpenVDS.h> #include <OpenVDS/OpenVDS.h>
#include <memory>
#include <map>
#include <functional>
namespace OpenVDS namespace OpenVDS
{ {
class TransferHandler class Request;
class TransferDownloadHandler
{ {
public: public:
virtual ~TransferHandler(); virtual ~TransferDownloadHandler();
virtual void handleMetadata(const std::string &key, const std::string &header); virtual void handleMetadata(const std::string &key, const std::string &header);
virtual void handleData(std::vector<uint8_t> &&data) = 0; virtual void handleData(std::vector<uint8_t> &&data) = 0;
virtual void handleError(Error &error) = 0; virtual void completed(const Request &request, const Error &error) = 0;
}; };
class Request class Request
...@@ -42,6 +46,7 @@ namespace OpenVDS ...@@ -42,6 +46,7 @@ namespace OpenVDS
virtual bool isSuccess(Error &error) const = 0; virtual bool isSuccess(Error &error) const = 0;
virtual void cancel() = 0; virtual void cancel() = 0;
const std::string &getObjectName() const { return m_objectName; } const std::string &getObjectName() const { return m_objectName; }
private: private:
std::string m_objectName; std::string m_objectName;
}; };
...@@ -56,8 +61,13 @@ namespace OpenVDS ...@@ -56,8 +61,13 @@ namespace OpenVDS
{ {
public: public:
virtual ~IOManager(); virtual ~IOManager();
virtual std::shared_ptr<Request> downloadObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) = 0; virtual std::shared_ptr<Request> downloadObject(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange &range = IORange()) = 0;
virtual std::shared_ptr<Request> uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const IORange &range = IORange()) = 0; virtual std::shared_ptr<Request> uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const std::map<std::string, std::string> &metadataHeader, std::function<void(const Request &request, const Error &error)> completedCallback = nullptr) = 0;
std::shared_ptr<Request> uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request &request, const Error &error)> completedCallback = nullptr)
{
return uploadObject(objectName, data, std::map<std::string, std::string>(), completedCallback);
}
static IOManager *createIOManager(const OpenOptions &options, Error &error); static IOManager *createIOManager(const OpenOptions &options, Error &error);
}; };
......
...@@ -111,7 +111,7 @@ namespace OpenVDS ...@@ -111,7 +111,7 @@ namespace OpenVDS
auto s3error = getObjectOutcome.GetError(); auto s3error = getObjectOutcome.GetError();
objReq->m_error.code = int(s3error.GetResponseCode()); objReq->m_error.code = int(s3error.GetResponseCode());
objReq->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str(); objReq->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str();
objReq->m_handler->handleError(objReq->m_error); objReq->m_handler->completed(*objReq, objReq->m_error);
return; return;
} }
...@@ -130,9 +130,10 @@ namespace OpenVDS ...@@ -130,9 +130,10 @@ namespace OpenVDS
retrieved_object.read((char *)&data[0], content_length); retrieved_object.read((char *)&data[0], content_length);
objReq->m_handler->handleData(std::move(data)); objReq->m_handler->handleData(std::move(data));
} }
objReq->m_handler->completed(*objReq, objReq->m_error);
} }
DownloadRequestAWS::DownloadRequestAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferHandler>& handler, const IORange &range) DownloadRequestAWS::DownloadRequestAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler, const IORange &range)
: Request(id) : Request(id)
, m_handler(handler) , m_handler(handler)
, m_context(std::make_shared<AsyncDownloadContext>(this)) , m_context(std::make_shared<AsyncDownloadContext>(this))
...@@ -197,12 +198,16 @@ namespace OpenVDS ...@@ -197,12 +198,16 @@ namespace OpenVDS
uploadReq->m_error.code = int(s3error.GetResponseCode()); uploadReq->m_error.code = int(s3error.GetResponseCode());
uploadReq->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str(); uploadReq->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str();
} }
if (uploadReq->m_completedCallback)
uploadReq->m_completedCallback(*uploadReq, uploadReq->m_error);
} }
UploadRequestAWS::UploadRequestAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, std::shared_ptr<std::vector<uint8_t>> data, const IORange& range) UploadRequestAWS::UploadRequestAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, std::shared_ptr<std::vector<uint8_t>> data, const std::map<std::string, std::string>& metadataHeader, std::function<void(const Request & request, const Error & error)> completedCallback)
: Request(id) : Request(id)
, m_context(std::make_shared<AsyncUploadContext>(this)) , m_context(std::make_shared<AsyncUploadContext>(this))
, m_data(data) , m_data(data)
, m_metadataHeader(metadataHeader)
, m_completedCallback(completedCallback)
, m_vectorBuf(*data) , m_vectorBuf(*data)
, m_stream(std::make_shared<Aws::IOStream>(&m_vectorBuf)) , m_stream(std::make_shared<Aws::IOStream>(&m_vectorBuf))
, m_done(false) , m_done(false)
...@@ -213,14 +218,6 @@ namespace OpenVDS ...@@ -213,14 +218,6 @@ namespace OpenVDS
put.SetBody(m_stream); put.SetBody(m_stream);
put.SetContentType("binary/octet-stream"); put.SetContentType("binary/octet-stream");
put.SetContentLength(data->size()); put.SetContentLength(data->size());
if (range.end)
{
assert(false);
//Have to use the Multi upload api instead. Maybe use TransferManager?
// char rangeHeaderBuffer[100];
// snprintf(rangeHeaderBuffer, sizeof(rangeHeaderBuffer), "bytes=%zu-%zu", range.start, range.end);
// object_request.SetRange(rangeHeaderBuffer);
}
using namespace std::placeholders; using namespace std::placeholders;
auto bounded_callback = std::bind(&upload_callback, _1, _2, _3, _4, m_context); auto bounded_callback = std::bind(&upload_callback, _1, _2, _3, _4, m_context);
...@@ -277,15 +274,15 @@ namespace OpenVDS ...@@ -277,15 +274,15 @@ namespace OpenVDS
deinitizlieAWSSDK(); deinitizlieAWSSDK();
} }
std::shared_ptr<Request> IOManagerAWS::downloadObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range) std::shared_ptr<Request> IOManagerAWS::downloadObject(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange &range)
{ {
std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName; std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName;
return std::make_shared<DownloadRequestAWS>(*m_s3Client.get(), m_bucket, id, handler, range); return std::make_shared<DownloadRequestAWS>(*m_s3Client.get(), m_bucket, id, handler, range);
} }
std::shared_ptr<Request> IOManagerAWS::uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const IORange& range) std::shared_ptr<Request> IOManagerAWS::uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const std::map<std::string, std::string>& metadataHeader, std::function<void(const Request &request, const Error &error)> completedCallback)
{ {
std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName; std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName;
return std::make_shared<UploadRequestAWS>(*m_s3Client.get(), m_bucket, id, data, range); return std::make_shared<UploadRequestAWS>(*m_s3Client.get(), m_bucket, id, data, metadataHeader, completedCallback);
} }
} }
...@@ -34,7 +34,7 @@ namespace OpenVDS ...@@ -34,7 +34,7 @@ namespace OpenVDS
class DownloadRequestAWS : public Request class DownloadRequestAWS : public Request
{ {
public: public:
DownloadRequestAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, const std::shared_ptr<TransferHandler> &handler, const IORange &range); DownloadRequestAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, const std::shared_ptr<TransferDownloadHandler> &handler, const IORange &range);
~DownloadRequestAWS() override; ~DownloadRequestAWS() override;
void waitForFinish() override; void waitForFinish() override;
...@@ -42,7 +42,7 @@ namespace OpenVDS ...@@ -42,7 +42,7 @@ namespace OpenVDS
bool isSuccess(Error &error) const override; bool isSuccess(Error &error) const override;
void cancel() override; void cancel() override;
std::shared_ptr<TransferHandler> m_handler; std::shared_ptr<TransferDownloadHandler> m_handler;
std::shared_ptr<AsyncDownloadContext> m_context; std::shared_ptr<AsyncDownloadContext> m_context;
Error m_error; Error m_error;
bool m_done; bool m_done;
...@@ -61,7 +61,7 @@ namespace OpenVDS ...@@ -61,7 +61,7 @@ namespace OpenVDS
class UploadRequestAWS : public Request class UploadRequestAWS : public Request
{ {
public: public:
UploadRequestAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, std::shared_ptr<std::vector<uint8_t>> data, const IORange &range); UploadRequestAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, std::shared_ptr<std::vector<uint8_t>> data, const std::map<std::string, std::string>& metadataHeader, std::function<void(const Request & request, const Error & error)> completedCallback);
void waitForFinish() override; void waitForFinish() override;
bool isDone() const override; bool isDone() const override;
bool isSuccess(Error &error) const override; bool isSuccess(Error &error) const override;
...@@ -69,6 +69,8 @@ namespace OpenVDS ...@@ -69,6 +69,8 @@ namespace OpenVDS
std::shared_ptr<AsyncUploadContext> m_context; std::shared_ptr<AsyncUploadContext> m_context;
std::shared_ptr<std::vector<uint8_t>> m_data; std::shared_ptr<std::vector<uint8_t>> m_data;
std::map<std::string, std::string> m_metadataHeader;
std::function<void(const Request &request, const Error &error)> m_completedCallback;
VectorBuf m_vectorBuf; VectorBuf m_vectorBuf;
std::shared_ptr<Aws::IOStream> m_stream; std::shared_ptr<Aws::IOStream> m_stream;
Error m_error; Error m_error;
...@@ -82,8 +84,8 @@ namespace OpenVDS ...@@ -82,8 +84,8 @@ namespace OpenVDS
IOManagerAWS(const AWSOpenOptions &openOptions, Error &error); IOManagerAWS(const AWSOpenOptions &openOptions, Error &error);
~IOManagerAWS() override; ~IOManagerAWS() override;
std::shared_ptr<Request> downloadObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) override; std::shared_ptr<Request> downloadObject(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange& range = IORange()) override;
std::shared_ptr<Request> uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const IORange& range = IORange()) override; std::shared_ptr<Request> uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const std::map<std::string, std::string>& metadataHeader, std::function<void(const Request &request, const Error &error)> completedCallback) override;
private: private:
std::string m_region; std::string m_region;
std::string m_bucket; std::string m_bucket;
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
namespace OpenVDS namespace OpenVDS
{ {
class MetadataPageTransfer : public TransferHandler class MetadataPageTransfer : public TransferDownloadHandler
{ {
public: public:
MetadataPageTransfer(MetadataManager *manager, VolumeDataAccessManagerImpl *accessManager, MetadataPage *metadataPage) MetadataPageTransfer(MetadataManager *manager, VolumeDataAccessManagerImpl *accessManager, MetadataPage *metadataPage)
...@@ -38,7 +38,7 @@ void handleData(std::vector<uint8_t> &&data) override ...@@ -38,7 +38,7 @@ void handleData(std::vector<uint8_t> &&data) override
manager->pageTransferCompleted(accessManager, metadataPage, std::move(data)); manager->pageTransferCompleted(accessManager, metadataPage, std::move(data));
} }
void handleError(Error &error) override void completed(const Request &request, const Error &error) override
{ {
} }
......
...@@ -1130,14 +1130,14 @@ writeJson(Json::Value root) ...@@ -1130,14 +1130,14 @@ writeJson(Json::Value root)
return result; return result;
} }
class SyncTransferHandler : public TransferHandler class SyncTransferHandler : public TransferDownloadHandler
{ {
public: public:
void handleData(std::vector<uint8_t> &&data) override void handleData(std::vector<uint8_t> &&data) override
{ {
*(this->data) = data; *(this->data) = data;
} }
void handleError(Error &error) override void completed(const Request &request, const Error &error) override
{ {
*(this->error) = error; *(this->error) = error;
} }
......
...@@ -35,7 +35,7 @@ namespace OpenVDS ...@@ -35,7 +35,7 @@ namespace OpenVDS
class LayerMetadataContainer; class LayerMetadataContainer;
class MetadataPage; class MetadataPage;
class ReadChunkTransfer : public TransferHandler class ReadChunkTransfer : public TransferDownloadHandler
{ {
public: public:
ReadChunkTransfer(CompressionMethod compressionMethod, int adaptiveLevel) ReadChunkTransfer(CompressionMethod compressionMethod, int adaptiveLevel)
...@@ -63,7 +63,8 @@ public: ...@@ -63,7 +63,8 @@ public:
{ {
m_data = data; m_data = data;
} }
void handleError(Error& error) override
void completed(const Request &req, const Error & error) override
{ {
m_error = error; m_error = error;
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment