Commit f9865f15 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Update AccessManager

parent d9d92d10
......@@ -46,13 +46,20 @@ VDSHandle *open(const OpenOptions &options, Error &error)
return ret.release();
}
VolumeDataLayout *layout(VDSHandle *handle)
VolumeDataLayout *getLayout(VDSHandle *handle)
{
if (!handle)
return nullptr;
return handle->volumeDataLayout.get();
}
VolumeDataAccessManager *getDataAccessManager(VDSHandle *handle)
{
if (!handle)
return nullptr;
return handle->dataAccessManager.get();
}
const char *addDescriptorString(std::string const &descriptorString, VDSHandle &handle)
{
char *data = new char[descriptorString.size() + 1];
......
......@@ -77,8 +77,8 @@ OPENVDS_EXPORT VDSHandle* open(const OpenOptions& options, Error &error);
OPENVDS_EXPORT VDSHandle* create(const OpenOptions& options, VolumeDataLayoutDescriptor const &layoutDescriptor, std::vector<VolumeDataAxisDescriptor> const &axisDescriptors, std::vector<VolumeDataChannelDescriptor> const &channelDescriptors, MetadataContainer const &metadataContainer, Error &error);
OPENVDS_EXPORT void destroy(VDSHandle *handle);
OPENVDS_EXPORT VolumeDataLayout *layout(VDSHandle *handle);
OPENVDS_EXPORT VolumeDataAccessManager *dataAccessManager(VDSHandle *handle);
OPENVDS_EXPORT VolumeDataLayout *getLayout(VDSHandle *handle);
OPENVDS_EXPORT VolumeDataAccessManager *getDataAccessManager(VDSHandle *handle);
}
#endif //OPENVDS_H
......@@ -18,7 +18,7 @@
#include "MetadataManager.h"
#include <IO/IOManager.h>
#include "VolumeDataAccessManagerImpl.h"
#include <assert.h>
namespace OpenVDS
......@@ -27,25 +27,27 @@ namespace OpenVDS
class MetadataPageTransfer : public TransferHandler
{
public:
MetadataPageTransfer(std::vector<uint8_t> &targetData)
: m_targetData(targetData)
MetadataPageTransfer(MetadataManager *manager, MetadataPage *metadataPage)
: manager(manager)
, metadataPage(metadataPage)
{ }
std::vector<uint8_t> &m_targetData;
void handleData(std::vector<uint8_t> &&data) override
{
m_targetData = data;
manager->pageTransferCompleted(metadataPage, std::move(data));
}
void handleError(Error &error) override
{
}
MetadataManager *manager;
MetadataPage *metadataPage;
};
MetadataManager::MetadataManager(IOManager *iomanager, std::string const &layerUrl, MetadataStatus const &metadataStatus, int pageLimit)
MetadataManager::MetadataManager(IOManager* iomanager, VolumeDataAccessManagerImpl* accessManager, std::string const& layerUrl, MetadataStatus const& metadataStatus, int pageLimit)
: m_iomanager(iomanager)
, m_accessManager(accessManager)
, m_layerUrl(layerUrl)
, m_metadataStatus(metadataStatus)
, m_pageLimit(pageLimit)
......@@ -113,7 +115,7 @@ void MetadataManager::initiateTransfer(MetadataPage *page, std::string const &ur
assert(!page->m_valid && !page->m_activeTransfer);
page->m_activeTransfer = m_iomanager->requestObject(url, std::make_shared<MetadataPageTransfer>(page->m_data));
page->m_activeTransfer = m_iomanager->requestObject(url, std::make_shared<MetadataPageTransfer>(this, page));
}
void MetadataManager::pageTransferError(MetadataPage *page, const char *msg)
......@@ -134,14 +136,16 @@ void MetadataManager::pageTransferError(MetadataPage *page, const char *msg)
//TODO pendingrequestchangedcondition.notify_all
}
void MetadataManager::pageTransferCompleted(MetadataPage *page)
void MetadataManager::pageTransferCompleted(MetadataPage *page, std::vector<uint8_t> &&data)
{
std::unique_lock<std::mutex> lock(m_mutex);
page->m_data = data;
page->m_valid = true;
page->m_activeTransfer = nullptr;
lock.unlock();
m_accessManager->pageTransferCompleted(page);
}
......
......@@ -82,9 +82,12 @@ namespace OpenVDS
typedef std::list<MetadataPage> MetadataPageList;
class IOManager;
class VolumeDataAccessManagerImpl;
class MetadataManager
{
IOManager *m_iomanager;
VolumeDataAccessManagerImpl *m_accessManager;
std::string m_layerUrl;
MetadataStatus m_metadataStatus;
......@@ -100,7 +103,7 @@ namespace OpenVDS
void limitPages();
public:
MetadataManager(IOManager *iomanager, std::string const &layerURL, MetadataStatus const &metadataStatus, int pageLimit);
MetadataManager(IOManager *iomanager, VolumeDataAccessManagerImpl *accessManager, std::string const &layerURL, MetadataStatus const &metadataStatus, int pageLimit);
~MetadataManager();
const char *layerUrl() const { return m_layerUrl.c_str(); }
......@@ -110,7 +113,7 @@ namespace OpenVDS
void pageTransferError(MetadataPage *page, const char *msg);
void pageTransferCompleted(MetadataPage *page);
void pageTransferCompleted(MetadataPage* page, std::vector<uint8_t>&& data);
void initiateTransfer(MetadataPage *page, std::string const &url, bool verbose);
......
......@@ -562,4 +562,45 @@ bool VolumeDataAccessManagerImpl::readChunk(const VolumeDataChunk &chunk, std::v
return true;
}
void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPage)
{
std::unique_lock<std::mutex> lock(m_mutex);
for(auto &pendingRequestKeyValuePair : m_pendingRequests)
{
VolumeDataChunk const volumeDataChunk = pendingRequestKeyValuePair.first;
PendingRequest &pendingRequest = pendingRequestKeyValuePair.second;
if(pendingRequest.m_lockedMetadataPage == metadataPage)
{
MetadataManager *metadataManager = metadataPage->GetManager();
int32_t pageIndex = (int)(volumeDataChunk.chunkIndex / metadataManager->metadataStatus().m_chunkMetadataPageSize);
int32_t entryIndex = (int)(volumeDataChunk.chunkIndex % metadataManager->metadataStatus().m_chunkMetadataPageSize);
assert(pageIndex == metadataPage->PageIndex());
if (metadataPage->IsValid())
{
uint8_t const *metadata = metadataManager->getPageEntry(metadataPage, entryIndex);
ParsedMetadata parsedMetadata = parseMetadata(metadataManager->metadataStatus().m_chunkMetadataByteSize, metadata);
metadataManager->unlockPage(metadataPage);
pendingRequest.m_lockedMetadataPage = nullptr;
int adaptiveLevel;
IORange ioRange = calculateRangeHeaderImpl(parsedMetadata, metadataManager->metadataStatus(), &adaptiveLevel);
std::string url = makeURLForChunk(metadataManager->layerUrlStr(), volumeDataChunk.chunkIndex);
pendingRequest.m_activeTransfer = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel);
}
}
}
m_pendingRequestChangedCondition.notify_all();
}
}
......@@ -179,6 +179,8 @@ public:
bool prepareReadChunkData(const VolumeDataChunk& chunk, int32_t(&pitch)[Dimensionality_Max], bool verbose, Error& error);
bool readChunk(const VolumeDataChunk& chunk, std::vector<uint8_t>& serializedData, std::vector<uint8_t>& metadata, CompressionInfo& compressionInfo, Error& error);
void pageTransferCompleted(MetadataPage* page);
private:
VolumeDataLayout *m_layout;
IOManager *m_ioManager;
......
......@@ -37,6 +37,7 @@ VolumeDataPageAccessorImpl::VolumeDataPageAccessorImpl(VolumeDataAccessManagerIm
, m_layer(layer)
, m_maxPages(maxPages)
, m_isReadWrite(isReadWrite)
, m_isCommitInProgress(false)
{
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment