Commit 4376ec82 authored by Morten Ofstad's avatar Morten Ofstad
Browse files

Changed VolumeDataAccessManager so it uses the completion callback to take...

Changed VolumeDataAccessManager so it uses the completion callback to take requests out of the pending requests and started putting things in place so the metadatapage can be updated when the request completes. Changed some places to use fmt::format and removed the getDimensionsGroupString() method since we were only using it to build a string that was identical to the one you get from getDimensionGroupName().
parent c9fc03ec
......@@ -30,6 +30,8 @@
#include "VDS/VolumeDataAccessManagerImpl.h"
#include "VDS/VolumeDataRequestProcessor.h"
#include <fmt/format.h>
namespace OpenVDS
{
VDSHandle *open(const OpenOptions &options, Error &error)
......@@ -92,12 +94,12 @@ std::string getLayerName(VolumeDataLayer const &volumeDataLayer)
{
if(volumeDataLayer.getChannelIndex() == 0)
{
return std::string(DimensionGroupUtil::getDimensionGroupName(volumeDataLayer.getChunkDimensionGroup())) + "LOD" + std::to_string(volumeDataLayer.getLOD());
return fmt::format("{}LOD{}", DimensionGroupUtil::getDimensionGroupName(volumeDataLayer.getChunkDimensionGroup()), volumeDataLayer.getLOD());
}
else
{
assert(strlen(volumeDataLayer.getVolumeDataChannelDescriptor().getName()) != 0);
return std::string(volumeDataLayer.getVolumeDataChannelDescriptor().getName()) + std::string(DimensionGroupUtil::getDimensionGroupName(volumeDataLayer.getPrimaryChannelLayer().getChunkDimensionGroup())) + "LOD" + std::to_string(volumeDataLayer.getLOD());
assert(std::string(volumeDataLayer.getVolumeDataChannelDescriptor().getName()) != "");
return fmt::format("{}{}LOD{}", volumeDataLayer.getVolumeDataChannelDescriptor().getName(), DimensionGroupUtil::getDimensionGroupName(volumeDataLayer.getPrimaryChannelLayer().getChunkDimensionGroup()), volumeDataLayer.getLOD());
}
}
......
......@@ -1358,51 +1358,6 @@ const char * getDimensionGroupName(DimensionGroup dimensionGroup)
assert(dimensionGroup >= 0 && dimensionGroup < DimensionGroup_Max);
return apzDimensionNames[dimensionGroup];
}
const char* getDimensionsGroupString(DimensionsND dimensions)
{
switch(dimensions)
{
case Dimensions_012: return "012";
case Dimensions_013: return "013";
case Dimensions_014: return "014";
case Dimensions_015: return "015";
case Dimensions_023: return "023";
case Dimensions_024: return "024";
case Dimensions_025: return "025";
case Dimensions_034: return "034";
case Dimensions_035: return "035";
case Dimensions_045: return "045";
case Dimensions_123: return "123";
case Dimensions_124: return "124";
case Dimensions_125: return "125";
case Dimensions_134: return "134";
case Dimensions_135: return "135";
case Dimensions_145: return "145";
case Dimensions_234: return "234";
case Dimensions_235: return "235";
case Dimensions_245: return "245";
case Dimensions_345: return "345";
case Dimensions_01: return "01";
case Dimensions_02: return "02";
case Dimensions_03: return "03";
case Dimensions_04: return "04";
case Dimensions_05: return "05";
case Dimensions_12: return "12";
case Dimensions_13: return "13";
case Dimensions_14: return "14";
case Dimensions_15: return "15";
case Dimensions_23: return "23";
case Dimensions_24: return "24";
case Dimensions_25: return "25";
case Dimensions_34: return "34";
case Dimensions_35: return "35";
case Dimensions_45: return "45";
default:
;
}
return "";
}
bool isRemappingPossible(DimensionGroup dimensionGroupA, DimensionGroup dimensionGroupB)
{
......
......@@ -144,8 +144,6 @@ namespace DimensionGroupUtil
const char *getDimensionGroupName(DimensionGroup dimensionGroup);
const char *getDimensionsGroupString(DimensionsND dimensions);
bool isRemappingPossible(DimensionGroup dimensionGroupA, DimensionGroup dimensionGroupB);
DimensionGroup unionGroups(DimensionGroup dimensionGroupA, DimensionGroup dimensionGroupB);
......
......@@ -29,6 +29,7 @@
#include <inttypes.h>
#include <assert.h>
#include <atomic>
#include <fmt/format.h>
namespace OpenVDS
{
......@@ -317,30 +318,15 @@ static MetadataManager *getMetadataMangerForLayer(LayerMetadataContainer const &
return metadataManager;
}
static std::string createUrlForChunk(const std::string &layerUrl, uint64_t chunk)
static inline std::string createUrlForChunk(const std::string &layerName, uint64_t chunk)
{
char url[1024];
snprintf(url, sizeof(url), "%s/%" PRIu64, layerUrl.c_str(), chunk);
return url;
}
static std::string createBaseUrl(const VolumeDataLayer *layer)
{
int32_t channel = layer->getChannelIndex();
const char *channelName = channel > 0 ? layer->getLayout()->getChannelName(layer->getChannelIndex()) : "";
int32_t lod = layer->getLOD();
const char *dimensions_string = DimensionGroupUtil::getDimensionsGroupString(DimensionGroupUtil::getDimensionsNDFromDimensionGroup(layer->getChunkDimensionGroup()));
char layerURL[1024];
snprintf(layerURL, sizeof(layerURL), "%sDimensions_%sLOD%d", channelName, dimensions_string, lod);
return layerURL;
return layerName + "/" + std::to_string(chunk);
}
bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &chunk, bool verbose, Error &error)
{
std::string layerURL = createBaseUrl(chunk.layer);
auto metadataManager = getMetadataMangerForLayer(m_handle.layerMetadataContainer, layerURL);
std::string layerName = getLayerName(*chunk.layer);
auto metadataManager = getMetadataMangerForLayer(m_handle.layerMetadataContainer, layerName);
//do fallback
if (!metadataManager)
{
......@@ -362,8 +348,7 @@ bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &ch
if (initiateTransfer)
{
char url[1000];
snprintf(url, sizeof(url), "%s/ChunkMetadata/%d", layerURL.c_str(), pageIndex);
std::string url = fmt::format("{}/ChunkMetadata/{}", layerName, pageIndex);
metadataManager->initiateTransfer(this, metadataPage, url, verbose);
}
......@@ -387,7 +372,7 @@ bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &ch
IORange ioRange = calculateRangeHeaderImpl(parsedMetadata, metadataManager->metadataStatus(), &adaptiveLevel);
std::string url = createUrlForChunk(layerURL, chunk.chunkIndex);
std::string url = createUrlForChunk(layerName, chunk.chunkIndex);
lock.lock();
auto transferHandler = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel);
......@@ -487,8 +472,8 @@ void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPa
}
m_pendingRequestChangedCondition.notify_all();
}
static int64_t gen_upload_jobid()
static int64_t createUploadJobId()
{
static std::atomic< std::int64_t > id(0);
return --id;
......@@ -496,37 +481,49 @@ static int64_t gen_upload_jobid()
int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk& chunk, std::shared_ptr<std::vector<uint8_t>> data)
{
std::string url = createUrlForChunk(createBaseUrl(chunk.layer), chunk.chunkIndex);
m_pendingUploadRequests.erase(std::remove_if(m_pendingUploadRequests.begin(), m_pendingUploadRequests.end(), [this](PendingUploadRequest &request){
Error error;
bool done = request.request->isDone();
if (done && !request.request->isSuccess(error))
this->m_uploadErrors.emplace_back(new UploadError(error, request.request->getObjectName()));
return done;
}), m_pendingUploadRequests.end());
m_pendingUploadRequests.push_back({gen_upload_jobid(), m_ioManager->uploadObject(url, data)});
return 0;
std::string layerName = getLayerName(*chunk.layer);
auto metadataManager = getMetadataMangerForLayer(m_handle.layerMetadataContainer, layerName);
MetadataPage* lockedMetadataPage = nullptr;
std::string url = createUrlForChunk(layerName, chunk.chunkIndex);
int64_t jobId = createUploadJobId();
auto completedCallback = [this, jobId](const Request &request, const Error &error)
{
std::unique_lock<std::mutex> lock(m_mutex);
if(error.code != 0)
{
m_uploadErrors.emplace_back(new UploadError(error, request.getObjectName()));
}
m_pendingUploadRequests.erase(jobId);
};
// add new pending upload request
std::unique_lock<std::mutex> lock(m_mutex);
m_pendingUploadRequests[jobId] = PendingUploadRequest(m_ioManager->uploadObject(url, data, completedCallback), lockedMetadataPage);
return jobId;
}
void VolumeDataAccessManagerImpl::flushUploadQueue()
{
std::unique_lock<std::mutex> lock(m_mutex);
Error error;
for (auto &upload : m_pendingUploadRequests)
while(true)
{
upload.request->waitForFinish();
if (!upload.request->isSuccess(error))
{
m_uploadErrors.emplace_back(new UploadError(error, upload.request->getObjectName()));
error = Error();
}
std::unique_lock<std::mutex> lock(m_mutex);
if(m_pendingUploadRequests.empty()) break;
Request &request = *m_pendingUploadRequests.begin()->second.m_request;
lock.unlock();
request.waitForFinish();
}
m_pendingUploadRequests.clear();
error = Error();
Error error;
serializeAndUploadLayerStatus(m_handle, error);
if(error.code != 0)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_uploadErrors.emplace_back(new UploadError(error, "LayerStatus"));
}
}
......@@ -578,7 +575,7 @@ void VolumeDataAccessManagerImpl::getCurrentUploadError(const char** objectId, i
void VolumeDataAccessManagerImpl::addUploadError(const Error& error, VolumeDataLayer* layer, uint64_t chunk)
{
std::string urlString = createUrlForChunk(createBaseUrl(layer), chunk);
std::string urlString = createUrlForChunk(getLayerName(*layer), chunk);
std::unique_lock<std::mutex> lock(m_mutex);
m_uploadErrors.emplace_back(new UploadError(error, urlString));
}
......
......@@ -123,8 +123,15 @@ inline bool operator<(const VolumeDataChunk &a, const VolumeDataChunk &b)
struct PendingUploadRequest
{
int64_t jobID;
std::shared_ptr<Request> request;
std::shared_ptr<Request> m_request;
MetadataPage* m_lockedMetadataPage;
PendingUploadRequest() : m_request(), m_lockedMetadataPage()
{
}
PendingUploadRequest(std::shared_ptr<Request> request, MetadataPage* lockedMetadataPage) : m_request(request), m_lockedMetadataPage(lockedMetadataPage) {}
};
struct UploadError
......@@ -219,7 +226,7 @@ private:
std::mutex m_mutex;
std::condition_variable m_pendingRequestChangedCondition;
std::map<VolumeDataChunk, PendingDownloadRequest> m_pendingDownloadRequests;
std::vector<PendingUploadRequest> m_pendingUploadRequests;
std::map<int64_t, PendingUploadRequest> m_pendingUploadRequests;
std::vector<std::unique_ptr<UploadError>> m_uploadErrors;
uint32_t m_currentErrorIndex;
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment