Commit a91282c6 authored by Morten Ofstad

Update chunk metadata pages and upload dirty pages when flushing the upload queue.

parent 5fc94a7c
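
In outline, this commit threads chunk-metadata bookkeeping through the upload path: writing a chunk now records its metadata in the owning metadata page and marks that page dirty, and flushing the upload queue re-uploads every dirty page. The following is a minimal, self-contained sketch of that dirty-page pattern; the types and names are simplified stand-ins for illustration, not the OpenVDS classes shown in the diff below.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <list>
#include <mutex>
#include <vector>

// Simplified stand-ins for MetadataPage / MetadataManager from the diff below.
struct Page
{
  std::vector<uint8_t> data;
  bool dirty = false;
};

class PageCache
{
public:
  PageCache(int entrySize, int entriesPerPage)
    : m_entrySize(entrySize), m_entriesPerPage(entriesPerPage) {}

  Page &addPage()
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_pages.push_back(Page{std::vector<uint8_t>(size_t(m_entrySize) * m_entriesPerPage), false});
    return m_pages.back();
  }

  // Copy one fixed-size entry into a page and mark it dirty (cf. setPageEntry).
  void setEntry(Page &page, int entryIndex, const uint8_t *entry)
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    page.dirty = true;
    std::copy(entry, entry + m_entrySize, page.data.begin() + size_t(entryIndex) * m_entrySize);
  }

  // Upload every dirty page; clear the flag only on success so failed pages
  // are retried on the next flush (cf. uploadDirtyPages).
  void flushDirty(const std::function<bool(const std::vector<uint8_t>&)> &upload)
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    for (Page &page : m_pages)
      if (page.dirty && upload(page.data))
        page.dirty = false;
  }

private:
  std::mutex m_mutex;
  int m_entrySize;
  int m_entriesPerPage;
  std::list<Page> m_pages;
};

int main()
{
  PageCache cache(8, 1024);              // 8-byte entries, 1024 entries per page
  Page &page = cache.addPage();
  uint64_t hash = 0x1234;                // stand-in for a chunk's metadata hash
  cache.setEntry(page, 42, reinterpret_cast<const uint8_t*>(&hash));
  cache.flushDirty([](const std::vector<uint8_t> &data) {
    std::printf("uploading %zu bytes\n", data.size());
    return true;                         // pretend the upload succeeded
  });
  return 0;
}
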
......@@ -20,6 +20,8 @@
#include <IO/IOManager.h>
#include "VolumeDataAccessManagerImpl.h"
#include <assert.h>
#include <algorithm>
#include <fmt/format.h>
namespace OpenVDS
{
......@@ -112,6 +114,15 @@ MetadataManager::lockPage(int pageIndex, bool* initiateTransfer)
return page;
}
void
MetadataManager::initPage(MetadataPage* page)
{
std::unique_lock<std::mutex> lock(m_mutex);
page->m_data.resize(m_metadataStatus.m_chunkMetadataByteSize * m_metadataStatus.m_chunkMetadataPageSize);
page->m_valid = true;
lock.unlock();
}
void MetadataManager::initiateTransfer(VolumeDataAccessManagerImpl *accessManager, MetadataPage* page, std::string const& url, bool verbose)
{
std::unique_lock<std::mutex> lock(m_mutex);
......@@ -121,6 +132,23 @@ void MetadataManager::initiateTransfer(VolumeDataAccessManagerImpl *accessManage
page->m_activeTransfer = m_iomanager->download(url, std::make_shared<MetadataPageTransfer>(this, accessManager, page));
}
void MetadataManager::uploadDirtyPages(VolumeDataAccessManagerImpl *accessManager)
{
std::unique_lock<std::mutex> lock(m_mutex);
for(auto &page : m_pageList)
{
if(page.IsDirty())
{
bool success = accessManager->writeMetadataPage(&page, page.m_data);
if(success)
{
page.m_dirty = false;
}
}
}
}
void MetadataManager::pageTransferError(MetadataPage* page, const char* msg)
{
std::string
......@@ -149,7 +177,6 @@ void MetadataManager::pageTransferCompleted(VolumeDataAccessManagerImpl* accessM
accessManager->pageTransferCompleted(page);
}
uint8_t const *MetadataManager::getPageEntry(MetadataPage *page, int entryIndex) const
{
assert(page->IsValid());
......@@ -157,6 +184,17 @@ uint8_t const *MetadataManager::getPageEntry(MetadataPage *page, int entryIndex)
return &page->m_data[entryIndex * m_metadataStatus.m_chunkMetadataByteSize];
}
void MetadataManager::setPageEntry(MetadataPage *page, int entryIndex, uint8_t const *metadata, int metadataLength)
{
std::unique_lock<std::mutex> lock(m_mutex);
assert(page->IsValid());
assert(metadataLength == m_metadataStatus.m_chunkMetadataByteSize);
page->m_dirty = true;
std::copy(metadata, metadata + metadataLength, &page->m_data[entryIndex * m_metadataStatus.m_chunkMetadataByteSize]);
}
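
For orientation, the page buffer is a flat array of fixed-size entries: initPage sizes it to m_chunkMetadataByteSize * m_chunkMetadataPageSize bytes, and getPageEntry/setPageEntry address entry i at byte offset i * m_chunkMetadataByteSize. A small worked example with assumed sizes (the real values come from the layer's MetadataStatus):

#include <cassert>
#include <cstdint>
#include <vector>

int main()
{
  // Assumed, illustrative values; the real ones come from MetadataStatus.
  const int chunkMetadataByteSize = 24;   // bytes of metadata per chunk
  const int chunkMetadataPageSize = 1024; // chunks covered by one page

  // initPage(): one contiguous buffer holding one entry per chunk in the page.
  std::vector<uint8_t> pageData(size_t(chunkMetadataByteSize) * chunkMetadataPageSize);

  // setPageEntry()/getPageEntry(): entry i lives at offset i * entry size.
  int entryIndex = 42;
  uint8_t *entry = &pageData[size_t(entryIndex) * chunkMetadataByteSize];
  assert(entry - pageData.data() == 42 * 24); // byte offset 1008
  (void)entry;
  return 0;
}
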
void MetadataManager::unlockPage(MetadataPage *page)
{
assert(page);
......
......@@ -61,6 +61,7 @@ namespace OpenVDS
int m_pageIndex;
bool m_valid;
bool m_dirty;
int m_lockCount;
std::shared_ptr<Request> m_activeTransfer;
......@@ -70,11 +71,13 @@ namespace OpenVDS
MetadataManager *GetManager() { return m_manager; }
int PageIndex() const { return m_pageIndex; }
bool IsValid() const { return m_valid; }
bool IsDirty() const { return m_dirty; }
MetadataPage(MetadataManager *manager, int pageIndex)
: m_manager(manager)
, m_pageIndex(pageIndex)
, m_valid(false)
, m_dirty(false)
, m_lockCount(0)
, m_activeTransfer(nullptr)
{}
......@@ -110,13 +113,17 @@ namespace OpenVDS
MetadataPage *lockPage(int pageIndex, bool *initiateTransfer);
void initPage(MetadataPage* page);
void pageTransferError(MetadataPage *page, const char *msg);
void pageTransferCompleted(VolumeDataAccessManagerImpl *accessManager, MetadataPage* page, std::vector<uint8_t>&& data);
void initiateTransfer(VolumeDataAccessManagerImpl* accessManager, MetadataPage* page, std::string const& url, bool verbose);
void uploadDirtyPages(VolumeDataAccessManagerImpl* accessManager);
uint8_t const *getPageEntry(MetadataPage *page, int entry) const;
void setPageEntry(MetadataPage *page, int entryIndex, uint8_t const *metadata, int metadataLength);
void unlockPage(MetadataPage *page);
......
......@@ -382,7 +382,7 @@ bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &ch
return true;
}
bool VolumeDataAccessManagerImpl::readChunk(const VolumeDataChunk &chunk, std::vector<uint8_t> &serializedData, std::vector<uint8_t> &metadata, CompressionInfo &compressionInfo, Error &error)
{
std::unique_lock<std::mutex> lock(m_mutex);
......@@ -475,6 +475,32 @@ void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPa
m_pendingRequestChangedCondition.notify_all();
}
bool VolumeDataAccessManagerImpl::writeMetadataPage(MetadataPage* metadataPage, const std::vector<uint8_t> &data)
{
assert(metadataPage->IsValid());
MetadataManager *metadataManager = metadataPage->GetManager();
std::string url = fmt::format("{}/ChunkMetadata/{}", metadataManager->layerUrlStr(), metadataPage->PageIndex());
std::string contentDispositionName = fmt::format("{}_ChunkMetadata_{}", metadataManager->layerUrlStr(), metadataPage->PageIndex());
Error error;
auto req = m_ioManager->uploadBinary(url, contentDispositionName, std::vector<std::pair<std::string, std::string>>(), std::make_shared<std::vector<uint8_t>>(data));
req->waitForFinish();
bool success = req->isSuccess(error);
if(error.code != 0)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_uploadErrors.emplace_back(new UploadError(error, "LayerStatus"));
}
return success;
}
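
As the diff shows, each metadata page is uploaded as a single object whose name is derived from the layer URL and the page index. A small sketch of that key construction with fmt, using an assumed layer URL (in the real code it comes from MetadataManager::layerUrlStr(), and the upload itself goes through OpenVDS's IOManager, which is not reproduced here):

#include <fmt/format.h>
#include <string>

int main()
{
  // Assumed example layer URL; not taken from an actual VDS.
  std::string layerUrl = "Dimensions_012_LOD0";
  int pageIndex = 7;

  // Object key and content-disposition name as formatted in writeMetadataPage().
  std::string url = fmt::format("{}/ChunkMetadata/{}", layerUrl, pageIndex);
  std::string contentDispositionName = fmt::format("{}_ChunkMetadata_{}", layerUrl, pageIndex);

  fmt::print("{}\n{}\n", url, contentDispositionName);
  // -> Dimensions_012_LOD0/ChunkMetadata/7
  //    Dimensions_012_LOD0_ChunkMetadata_7
  return 0;
}
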
static int64_t createUploadJobId()
{
static std::atomic< std::int64_t > id(0);
......@@ -504,11 +530,22 @@ int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk &ch
auto metadataManager = getMetadataMangerForLayer(m_handle.layerMetadataContainer, layerName);
MetadataPage* lockedMetadataPage = nullptr;
int pageIndex = (int)(chunk.chunkIndex / metadataManager->metadataStatus().m_chunkMetadataPageSize);
int entryIndex = (int)(chunk.chunkIndex % metadataManager->metadataStatus().m_chunkMetadataPageSize);
bool initiateTransfer;
MetadataPage* lockedMetadataPage = metadataManager->lockPage(pageIndex, &initiateTransfer);
// Newly created page
if(initiateTransfer)
{
metadataManager->initPage(lockedMetadataPage);
}
int64_t jobId = createUploadJobId();
auto completedCallback = [this, jobId](const Request &request, const Error &error)
auto completedCallback = [this, hash, metadataManager, lockedMetadataPage, entryIndex, jobId](const Request &request, const Error &error)
{
std::unique_lock<std::mutex> lock(m_mutex);
......@@ -516,8 +553,14 @@ int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk &ch
{
m_uploadErrors.emplace_back(new UploadError(error, request.getObjectName()));
}
else
{
metadataManager->setPageEntry(lockedMetadataPage, entryIndex, reinterpret_cast<const uint8_t *>(&hash), sizeof(hash));
}
m_pendingUploadRequests.erase(jobId);
lockedMetadataPage->GetManager()->unlockPage(lockedMetadataPage);
};
std::vector<char> base64Hash;
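
The page/entry addressing used above follows directly from the metadata page size: a chunk's metadata lives on page chunkIndex / m_chunkMetadataPageSize at entry chunkIndex % m_chunkMetadataPageSize, and the locked page is only released in the upload-completed callback after the chunk hash has been written into that entry. A worked example of the index arithmetic with an assumed page size:

#include <cstdint>
#include <cstdio>

int main()
{
  // Assumed page size; the real value is MetadataStatus::m_chunkMetadataPageSize.
  const int64_t chunkMetadataPageSize = 1024;
  int64_t chunkIndex = 70000;

  // Same arithmetic as in requestWriteChunk():
  int pageIndex  = int(chunkIndex / chunkMetadataPageSize); // 68
  int entryIndex = int(chunkIndex % chunkMetadataPageSize); // 368
  std::printf("page %d, entry %d\n", pageIndex, entryIndex);
  return 0;
}
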
......@@ -541,8 +584,16 @@ void VolumeDataAccessManagerImpl::flushUploadQueue()
request.waitForFinish();
}
for(auto it = m_handle.layerMetadataContainer.managers.begin(); it != m_handle.layerMetadataContainer.managers.end(); ++it)
{
auto metadataManager = it->second.get();
metadataManager->uploadDirtyPages(this);
}
Error error;
serializeAndUploadLayerStatus(m_handle, error);
if(error.code != 0)
{
std::unique_lock<std::mutex> lock(m_mutex);
......@@ -568,7 +619,7 @@ void VolumeDataAccessManagerImpl::forceClearAllUploadErrors()
int32_t VolumeDataAccessManagerImpl::uploadErrorCount()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_uploadErrors.size() - m_currentErrorIndex;
return int32_t(m_uploadErrors.size() - m_currentErrorIndex);
}
void VolumeDataAccessManagerImpl::getCurrentUploadError(const char** objectId, int32_t* errorCode, const char** errorString)
......
......@@ -205,7 +205,8 @@ public:
bool prepareReadChunkData(const VolumeDataChunk& chunk, bool verbose, Error& error);
bool readChunk(const VolumeDataChunk& chunk, std::vector<uint8_t>& serializedData, std::vector<uint8_t>& metadata, CompressionInfo& compressionInfo, Error& error);
void pageTransferCompleted(MetadataPage* page);
void pageTransferCompleted(MetadataPage* metadataPage);
bool writeMetadataPage(MetadataPage* metadataPage, const std::vector<uint8_t> &data);
int64_t requestWriteChunk(const VolumeDataChunk &chunk, const DataBlock &dataBlock, const std::vector<uint8_t> &data);
......