Commit dc3cc61e authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Update IOManager API

parent 9b1c65c1
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
namespace OpenVDS namespace OpenVDS
{ {
ObjectRequester::~ObjectRequester() Request::~Request()
{} {}
TransferHandler::~TransferHandler() TransferHandler::~TransferHandler()
{} {}
......
...@@ -32,10 +32,10 @@ namespace OpenVDS ...@@ -32,10 +32,10 @@ namespace OpenVDS
virtual void handleError(Error &error) = 0; virtual void handleError(Error &error) = 0;
}; };
class ObjectRequester class Request
{ {
public: public:
virtual ~ObjectRequester(); virtual ~Request();
virtual void waitForFinish() = 0; virtual void waitForFinish() = 0;
virtual bool isDone() const = 0; virtual bool isDone() const = 0;
virtual bool isSuccess(Error &error) const = 0; virtual bool isSuccess(Error &error) const = 0;
...@@ -52,7 +52,8 @@ namespace OpenVDS ...@@ -52,7 +52,8 @@ namespace OpenVDS
{ {
public: public:
virtual ~IOManager(); virtual ~IOManager();
virtual std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) = 0; virtual std::shared_ptr<Request> downloadObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) = 0;
virtual std::shared_ptr<Request> uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const IORange &range = IORange()) = 0;
static IOManager *createIOManager(const OpenOptions &options, Error &error); static IOManager *createIOManager(const OpenOptions &options, Error &error);
}; };
......
...@@ -63,10 +63,10 @@ namespace OpenVDS ...@@ -63,10 +63,10 @@ namespace OpenVDS
struct AsyncCallerContext struct AsyncCallerContext
{ {
AsyncCallerContext(ObjectRequesterAWS *back) AsyncCallerContext(DownloadRequestAWS *back)
: back(back) : back(back)
{} {}
ObjectRequesterAWS *back; DownloadRequestAWS *back;
std::mutex mutex; std::mutex mutex;
}; };
...@@ -117,7 +117,7 @@ namespace OpenVDS ...@@ -117,7 +117,7 @@ namespace OpenVDS
} }
} }
ObjectRequesterAWS::ObjectRequesterAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferHandler>& handler, const IORange &range) DownloadRequestAWS::DownloadRequestAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferHandler>& handler, const IORange &range)
: m_handler(handler) : m_handler(handler)
, m_context(std::make_shared<AsyncCallerContext>(this)) , m_context(std::make_shared<AsyncCallerContext>(this))
, m_done(false) , m_done(false)
...@@ -136,31 +136,31 @@ namespace OpenVDS ...@@ -136,31 +136,31 @@ namespace OpenVDS
client.GetObjectAsync(object_request, bounded_callback); client.GetObjectAsync(object_request, bounded_callback);
} }
ObjectRequesterAWS::~ObjectRequesterAWS() DownloadRequestAWS::~DownloadRequestAWS()
{ {
cancel(); cancel();
} }
void ObjectRequesterAWS::waitForFinish() void DownloadRequestAWS::waitForFinish()
{ {
std::unique_lock<std::mutex> lock(m_context->mutex); std::unique_lock<std::mutex> lock(m_context->mutex);
if (m_done) if (m_done)
return; return;
m_waitForFinish.wait(lock); m_waitForFinish.wait(lock);
} }
bool ObjectRequesterAWS::isDone() const bool DownloadRequestAWS::isDone() const
{ {
std::unique_lock<std::mutex> lock(m_context->mutex); std::unique_lock<std::mutex> lock(m_context->mutex);
return m_done; return m_done;
} }
bool ObjectRequesterAWS::isSuccess(Error& error) const bool DownloadRequestAWS::isSuccess(Error& error) const
{ {
std::unique_lock<std::mutex> lock(m_context->mutex); std::unique_lock<std::mutex> lock(m_context->mutex);
error = m_error; error = m_error;
return m_error.code == 0; return m_error.code == 0;
} }
void ObjectRequesterAWS::cancel() void DownloadRequestAWS::cancel()
{ {
std::unique_lock<std::mutex> lock(m_context->mutex); std::unique_lock<std::mutex> lock(m_context->mutex);
m_context->back = nullptr; m_context->back = nullptr;
...@@ -192,9 +192,15 @@ namespace OpenVDS ...@@ -192,9 +192,15 @@ namespace OpenVDS
deinitizlieAWSSDK(); deinitizlieAWSSDK();
} }
std::shared_ptr<ObjectRequester> IOManagerAWS::requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range) std::shared_ptr<Request> IOManagerAWS::downloadObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range)
{ {
std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName; std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName;
return std::make_shared<ObjectRequesterAWS>(*m_s3Client.get(), m_bucket, id, handler, range); return std::make_shared<DownloadRequestAWS>(*m_s3Client.get(), m_bucket, id, handler, range);
}
std::shared_ptr<Request> IOManagerAWS::uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const IORange& range)
{
std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName;
return std::shared_ptr<Request>();
} }
} }
...@@ -26,11 +26,11 @@ ...@@ -26,11 +26,11 @@
namespace OpenVDS namespace OpenVDS
{ {
struct AsyncCallerContext; struct AsyncCallerContext;
class ObjectRequesterAWS : public ObjectRequester class DownloadRequestAWS : public Request
{ {
public: public:
ObjectRequesterAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, const std::shared_ptr<TransferHandler> &handler, const IORange &range); DownloadRequestAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, const std::shared_ptr<TransferHandler> &handler, const IORange &range);
~ObjectRequesterAWS(); ~DownloadRequestAWS();
void waitForFinish() override; void waitForFinish() override;
bool isDone() const override; bool isDone() const override;
...@@ -50,7 +50,8 @@ namespace OpenVDS ...@@ -50,7 +50,8 @@ namespace OpenVDS
IOManagerAWS(const AWSOpenOptions &openOptions, Error &error); IOManagerAWS(const AWSOpenOptions &openOptions, Error &error);
~IOManagerAWS(); ~IOManagerAWS();
std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) override; std::shared_ptr<Request> downloadObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) override;
std::shared_ptr<Request> uploadObject(const std::string objectName, std::shared_ptr<std::vector<uint8_t>> data, const IORange& range = IORange()) override;
private: private:
std::string m_region; std::string m_region;
std::string m_bucket; std::string m_bucket;
......
...@@ -118,7 +118,7 @@ void MetadataManager::initiateTransfer(VolumeDataAccessManagerImpl *accessManage ...@@ -118,7 +118,7 @@ void MetadataManager::initiateTransfer(VolumeDataAccessManagerImpl *accessManage
assert(!page->m_valid && !page->m_activeTransfer); assert(!page->m_valid && !page->m_activeTransfer);
page->m_activeTransfer = m_iomanager->requestObject(url, std::make_shared<MetadataPageTransfer>(this, accessManager, page)); page->m_activeTransfer = m_iomanager->downloadObject(url, std::make_shared<MetadataPageTransfer>(this, accessManager, page));
} }
void MetadataManager::pageTransferError(MetadataPage* page, const char* msg) void MetadataManager::pageTransferError(MetadataPage* page, const char* msg)
......
...@@ -51,7 +51,7 @@ namespace OpenVDS ...@@ -51,7 +51,7 @@ namespace OpenVDS
int64_t m_adaptiveLevelSizes[WAVELET_ADAPTIVE_LEVELS]; int64_t m_adaptiveLevelSizes[WAVELET_ADAPTIVE_LEVELS];
}; };
class ObjectRequester; class Request;
class MetadataManager; class MetadataManager;
class MetadataPage class MetadataPage
{ {
...@@ -63,7 +63,7 @@ namespace OpenVDS ...@@ -63,7 +63,7 @@ namespace OpenVDS
bool m_valid; bool m_valid;
int m_lockCount; int m_lockCount;
std::shared_ptr<ObjectRequester> m_activeTransfer; std::shared_ptr<Request> m_activeTransfer;
std::vector<uint8_t> m_data; std::vector<uint8_t> m_data;
public: public:
......
...@@ -1035,7 +1035,7 @@ bool downloadAndParseVDSJson(VDSHandle& handle, Error& error) ...@@ -1035,7 +1035,7 @@ bool downloadAndParseVDSJson(VDSHandle& handle, Error& error)
std::shared_ptr<SyncTransferHandler> syncTransferHandler = std::make_shared<SyncTransferHandler>(); std::shared_ptr<SyncTransferHandler> syncTransferHandler = std::make_shared<SyncTransferHandler>();
syncTransferHandler->error = &error; syncTransferHandler->error = &error;
syncTransferHandler->data = &volumedatalayout_json; syncTransferHandler->data = &volumedatalayout_json;
auto req = handle.ioManager->requestObject("VolumeDataLayout", syncTransferHandler); auto req = handle.ioManager->downloadObject("VolumeDataLayout", syncTransferHandler);
req->waitForFinish(); req->waitForFinish();
if (!req->isSuccess(error) || volumedatalayout_json.empty()) if (!req->isSuccess(error) || volumedatalayout_json.empty())
{ {
...@@ -1044,7 +1044,7 @@ bool downloadAndParseVDSJson(VDSHandle& handle, Error& error) ...@@ -1044,7 +1044,7 @@ bool downloadAndParseVDSJson(VDSHandle& handle, Error& error)
} }
std::vector<uint8_t> layerstatus_json; std::vector<uint8_t> layerstatus_json;
syncTransferHandler->data = &layerstatus_json; syncTransferHandler->data = &layerstatus_json;
req = handle.ioManager->requestObject("LayerStatus", syncTransferHandler); req = handle.ioManager->downloadObject("LayerStatus", syncTransferHandler);
req->waitForFinish(); req->waitForFinish();
if (!req->isSuccess(error) || layerstatus_json.empty()) if (!req->isSuccess(error) || layerstatus_json.empty())
{ {
......
...@@ -370,7 +370,7 @@ bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &ch ...@@ -370,7 +370,7 @@ bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &ch
lock.lock(); lock.lock();
auto transferHandler = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel); auto transferHandler = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel);
m_pendingRequests[chunk] = PendingRequest(m_ioManager->requestObject(url, transferHandler, ioRange), transferHandler); m_pendingRequests[chunk] = PendingRequest(m_ioManager->downloadObject(url, transferHandler, ioRange), transferHandler);
return true; return true;
} }
...@@ -460,7 +460,7 @@ void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPa ...@@ -460,7 +460,7 @@ void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPa
std::string url = makeURLForChunk(metadataManager->layerUrlStr(), volumeDataChunk.chunkIndex); std::string url = makeURLForChunk(metadataManager->layerUrlStr(), volumeDataChunk.chunkIndex);
auto transferHandler = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel); auto transferHandler = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel);
pendingRequest.m_activeTransfer = m_ioManager->requestObject(url, transferHandler, ioRange); pendingRequest.m_activeTransfer = m_ioManager->downloadObject(url, transferHandler, ioRange);
pendingRequest.m_transferHandle = transferHandler; pendingRequest.m_transferHandle = transferHandler;
} }
} }
......
...@@ -81,7 +81,7 @@ struct PendingRequest ...@@ -81,7 +81,7 @@ struct PendingRequest
{ {
MetadataPage* m_lockedMetadataPage; MetadataPage* m_lockedMetadataPage;
std::shared_ptr<ObjectRequester> m_activeTransfer; std::shared_ptr<Request> m_activeTransfer;
std::shared_ptr<ReadChunkTransfer> m_transferHandle; std::shared_ptr<ReadChunkTransfer> m_transferHandle;
PendingRequest() : m_lockedMetadataPage(nullptr) PendingRequest() : m_lockedMetadataPage(nullptr)
...@@ -91,7 +91,7 @@ struct PendingRequest ...@@ -91,7 +91,7 @@ struct PendingRequest
explicit PendingRequest(MetadataPage* lockedMetadataPage) : m_lockedMetadataPage(lockedMetadataPage), m_activeTransfer(nullptr) explicit PendingRequest(MetadataPage* lockedMetadataPage) : m_lockedMetadataPage(lockedMetadataPage), m_activeTransfer(nullptr)
{ {
} }
explicit PendingRequest(std::shared_ptr<ObjectRequester> activeTransfer, std::shared_ptr<ReadChunkTransfer> handler) : m_lockedMetadataPage(nullptr), m_activeTransfer(activeTransfer), m_transferHandle(handler) explicit PendingRequest(std::shared_ptr<Request> activeTransfer, std::shared_ptr<ReadChunkTransfer> handler) : m_lockedMetadataPage(nullptr), m_activeTransfer(activeTransfer), m_transferHandle(handler)
{ {
} }
}; };
...@@ -180,7 +180,8 @@ public: ...@@ -180,7 +180,8 @@ public:
bool prepareReadChunkData(const VolumeDataChunk& chunk, bool verbose, Error& error); bool prepareReadChunkData(const VolumeDataChunk& chunk, bool verbose, Error& error);
bool readChunk(const VolumeDataChunk& chunk, std::vector<uint8_t>& serializedData, std::vector<uint8_t>& metadata, CompressionInfo& compressionInfo, Error& error); bool readChunk(const VolumeDataChunk& chunk, std::vector<uint8_t>& serializedData, std::vector<uint8_t>& metadata, CompressionInfo& compressionInfo, Error& error);
void pageTransferCompleted(MetadataPage* page); void pageTransferCompleted(MetadataPage* page);
IOManager *getIoManager() const { return m_ioManager; }
private: private:
VolumeDataLayout *m_layout; VolumeDataLayout *m_layout;
IOManager *m_ioManager; IOManager *m_ioManager;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "VolumeDataPageAccessorImpl.h" #include "VolumeDataPageAccessorImpl.h"
#include "VolumeDataChunk.h" #include "VolumeDataChunk.h"
#include "VolumeDataLayer.h" #include "VolumeDataLayer.h"
#include "VolumeDataAccessManagerImpl.h"
#include <OpenVDS/VolumeDataChannelDescriptor.h> #include <OpenVDS/VolumeDataChannelDescriptor.h>
#include <algorithm> #include <algorithm>
...@@ -175,8 +176,9 @@ void VolumeDataPageImpl::setBufferData(std::vector<uint8_t>&& blob, const int(&p ...@@ -175,8 +176,9 @@ void VolumeDataPageImpl::setBufferData(std::vector<uint8_t>&& blob, const int(&p
void VolumeDataPageImpl::writeBack(VolumeDataLayer* volumeDataLayer, std::unique_lock<std::mutex>& pageListMutexLock) void VolumeDataPageImpl::writeBack(VolumeDataLayer* volumeDataLayer, std::unique_lock<std::mutex>& pageListMutexLock)
{ {
assert(m_isDirty); assert(m_isDirty);
IOManager *iomanager = m_volumeDataPageAccessor->getManager()->getIoManager();
//iomanager->
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment