Commit 6d4fa43b authored by Jørgen Lind

Request processor

Only requestVolumeSubset is implemented so far
parent da95a179
......@@ -66,6 +66,7 @@ set (PRIVATE_HEADER_FILES
VDS/ValueConversion.h
VDS/VolumeSampler.h
VDS/VolumeDataRequestProcessor.h
VDS/ThreadPool.h
)
set (EXPORTED_HEADER_FILES
......
......@@ -82,6 +82,33 @@ public:
bool initializeDataBlock(const DataBlockDescriptor &descriptor, DataBlock &dataBlock, Error &error);
bool initializeDataBlock(VolumeDataChannelDescriptor::Format format, VolumeDataChannelDescriptor::Components components, Dimensionality dimensionality, int32_t (&size)[DataStoreDimensionality_Max], DataBlock &dataBlock, Error &error);
static int32_t getVoxelFormatByteSize(VolumeDataChannelDescriptor::Format format)
{
int32_t iRetval = -1;
switch (format) {
case VolumeDataChannelDescriptor::Format_R64:
case VolumeDataChannelDescriptor::Format_U64:
iRetval = 8;
break;
case VolumeDataChannelDescriptor::Format_R32:
case VolumeDataChannelDescriptor::Format_U32:
iRetval = 4;
break;
case VolumeDataChannelDescriptor::Format_U16:
iRetval = 2;
break;
case VolumeDataChannelDescriptor::Format_U8:
case VolumeDataChannelDescriptor::Format_1Bit:
iRetval = 1;
break;
default:
fprintf(stderr, "Unknown voxel format");
abort();
}
return iRetval;
}
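A quick illustration of how this helper can be used; makeVoxelBuffer and voxelCount are hypothetical, not part of this commit:
// Hypothetical sketch: size a contiguous buffer for voxelCount elements of
// the given format. Note that Format_1Bit reports one byte per element above,
// so bit-packed data is over-allocated here rather than packed 8 voxels/byte.
static std::vector<uint8_t> makeVoxelBuffer(VolumeDataChannelDescriptor::Format format, size_t voxelCount)
{
  return std::vector<uint8_t>(size_t(getVoxelFormatByteSize(format)) * voxelCount);
}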
static uint32_t getElementSize(VolumeDataChannelDescriptor::Format format, VolumeDataChannelDescriptor::Components components)
{
switch(format)
......@@ -141,6 +168,15 @@ inline int32_t getAllocatedByteSizeForSize(const int32_t size)
return (size + 7) & -8;
}
template <typename T>
inline T dataBlock_ReadElement(const T *ptBuffer, size_t iElement) { return ptBuffer[iElement]; }
template <>
inline bool dataBlock_ReadElement(const bool *ptBuffer, size_t iElement) { return (reinterpret_cast<const unsigned char *>(ptBuffer)[iElement / 8] & (1 << (iElement % 8))) != 0; }
template <typename T>
inline void dataBlock_WriteElement(T *ptBuffer, size_t iElement, T tValue) { ptBuffer[iElement] = tValue; }
template <>
inline void dataBlock_WriteElement(bool *ptBuffer, size_t iElement, bool tValue) { if(tValue) { reinterpret_cast<unsigned char *>(ptBuffer)[iElement / 8] |= (1 << (iElement % 8)); } else { reinterpret_cast<unsigned char *>(ptBuffer)[iElement / 8] &= ~(1 << (iElement % 8)); } }
}
#endif //DATABLOCK_H
\ No newline at end of file
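The bool specializations above pack eight elements per byte, addressing element i at bit (i % 8) of byte (i / 8). A standalone sketch of the same technique, kept separate from the OpenVDS types:

#include <cstddef>
#include <cstdio>

static bool readBit(const unsigned char *buffer, size_t i)
{
  return (buffer[i / 8] & (1 << (i % 8))) != 0; // same test as dataBlock_ReadElement<bool>
}

static void writeBit(unsigned char *buffer, size_t i, bool value)
{
  if (value)
    buffer[i / 8] |= (1 << (i % 8));  // set the bit, as dataBlock_WriteElement<bool> does
  else
    buffer[i / 8] &= ~(1 << (i % 8)); // clear it
}

int main()
{
  unsigned char bits[2] = {};
  writeBit(bits, 3, true);
  writeBit(bits, 9, true);
  printf("%d %d %d\n", readBit(bits, 3), readBit(bits, 9), readBit(bits, 4)); // prints: 1 1 0
  return 0;
}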
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#pragma once
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
class ThreadPool
{
public:
ThreadPool(size_t);
template <class F, class... Args>
auto enqueue(F&& f, Args&& ... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0; i < threads; ++i)
workers.emplace_back(
[this]
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]
{
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
template <class F, class... Args>
auto ThreadPool::enqueue(F && f, Args && ... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop)
{
fprintf(stderr, "enqueue on stopped ThreadPool");
abort();
}
tasks.emplace([task]()
{
(*task)();
});
}
condition.notify_one();
return res;
}
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
worker.join();
}
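The pool is the standard shared-queue worker design: enqueue wraps the callable in a packaged_task, pushes a type-erased thunk onto the queue, and hands back a future; the destructor sets stop and joins. A minimal usage sketch, assuming the header is reachable as ThreadPool.h:

#include "ThreadPool.h"
#include <cstdio>

int main()
{
  ThreadPool pool(4);
  // enqueue() binds the arguments and returns a std::future for the result.
  auto sum = pool.enqueue([](int a, int b) { return a + b; }, 2, 3);
  printf("%d\n", sum.get()); // prints 5; ~ThreadPool() then joins the workers
  return 0;
}

One portability note: std::result_of, used in enqueue's return type, was deprecated in C++17 and removed in C++20; std::invoke_result is the modern equivalent.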
......@@ -221,6 +221,7 @@ VolumeDataAccessManagerImpl::VolumeDataAccessManagerImpl(VDSHandle &handle)
: m_handle(handle)
, m_ioManager(handle.ioManager.get())
, m_currentErrorIndex(0)
, m_requestProcessor(*this)
{
}
......@@ -293,18 +294,19 @@ VolumeDataAccessor* VolumeDataAccessManagerImpl::cloneVolumeDataAccessor(VolumeD
bool VolumeDataAccessManagerImpl::isCompleted(int64_t requestID)
{
return false;
return m_requestProcessor.isCompleted(requestID);
}
bool VolumeDataAccessManagerImpl::isCanceled(int64_t requestID)
{
return false;
return m_requestProcessor.isCanceled(requestID);
}
bool VolumeDataAccessManagerImpl::waitForCompletion(int64_t requestID, int millisecondsBeforeTimeout)
{
return false;
return m_requestProcessor.waitForCompletion(requestID, millisecondsBeforeTimeout);
}
void VolumeDataAccessManagerImpl::cancel(int64_t requestID)
{
m_requestProcessor.cancel(requestID);
}
float VolumeDataAccessManagerImpl::getCompletionFactor(int64_t requestID)
{
......
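With the stubs now delegating to m_requestProcessor, callers can block or poll on a request id. A hypothetical caller-side loop (accessManager and requestID stand in for a real manager and an id returned by an earlier request call):

// Wait in 100 ms slices so the caller can notice cancellation in between.
while (!accessManager->waitForCompletion(requestID, 100))
{
  if (accessManager->isCanceled(requestID))
    break;
}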
......@@ -26,13 +26,13 @@
#include "VolumeDataChunk.h"
#include "VolumeDataLayer.h"
#include "VolumeDataRequestProcessor.h"
#include <map>
#include "Base64.h"
namespace OpenVDS
{
class LayerMetadataContainer;
class MetadataPage;
class ReadChunkTransfer : public TransferDownloadHandler
......@@ -243,7 +243,7 @@ public:
private:
VDSHandle &m_handle;
IOManager *m_ioManager;
LayerMetadataContainer *m_layerMetadataContainer;
VolumeDataRequestProcessor m_requestProcessor;
IntrusiveList<VolumeDataPageAccessorImpl, &VolumeDataPageAccessorImpl::m_volumeDataPageAccessorListNode> m_volumeDataPageAccessorList;
std::mutex m_mutex;
std::condition_variable m_pendingRequestChangedCondition;
......@@ -252,5 +252,6 @@ private:
std::vector<std::unique_ptr<UploadError>> m_uploadErrors;
uint32_t m_currentErrorIndex;
};
}
#endif //VOLUMEDATAACCESSMANAGERIMPL_H
\ No newline at end of file
......@@ -189,8 +189,10 @@ VolumeDataPage* VolumeDataPageAccessorImpl::createPage(int64_t chunk)
return page;
}
VolumeDataPage* VolumeDataPageAccessorImpl::readPage(int64_t chunk)
VolumeDataPage* VolumeDataPageAccessorImpl::prepareReadPage(int64_t chunk, bool *needToCallReadPreparePage)
{
assert(needToCallReadPreparePage);
*needToCallReadPreparePage = true;
std::unique_lock<std::mutex> pageMutexLocker(m_pagesMutex);
if(!m_layer)
......@@ -226,6 +228,7 @@ VolumeDataPage* VolumeDataPageAccessorImpl::readPage(int64_t chunk)
}
m_pagesFound++;
*needToCallReadPreparePage = false;
return *page_it;
}
}
......@@ -256,17 +259,27 @@ VolumeDataPage* VolumeDataPageAccessorImpl::readPage(int64_t chunk)
return nullptr;
}
pageMutexLocker.unlock();
return page;
}
bool VolumeDataPageAccessorImpl::readPreparedPaged(VolumeDataPage* page)
{
VolumeDataPageImpl *pageImpl = static_cast<VolumeDataPageImpl *>(page);
Error error;
VolumeDataChunk volumeDataChunk = m_layer->getChunkFromIndex(pageImpl->getChunkIndex());
std::vector<uint8_t> serialized_data;
std::vector<uint8_t> metadata;
CompressionInfo compressionInfo;
std::unique_lock<std::mutex> pageMutexLocker(m_pagesMutex, std::defer_lock);
if (!m_accessManager->readChunk(volumeDataChunk, serialized_data, metadata, compressionInfo, error))
{
pageMutexLocker.lock();
page->unPin();
pageImpl->unPin();
fprintf(stderr, "Failed when waiting for chunk: %s\n", error.string.c_str());
return nullptr;
return false;
}
std::vector<uint8_t> page_data;
......@@ -274,9 +287,9 @@ VolumeDataPage* VolumeDataPageAccessorImpl::readPage(int64_t chunk)
if (!VolumeDataStore::deserializeVolumeData(volumeDataChunk, serialized_data, metadata, compressionInfo.GetCompressionMethod(), compressionInfo.GetAdaptiveLevel(), m_layer->getFormat(), dataBlock, page_data, error))
{
pageMutexLocker.lock();
page->unPin();
pageImpl->unPin();
fprintf(stderr, "Failed when deserializing chunk: %s\n", error.string.c_str());
return nullptr;
return false;
}
int pitch[Dimensionality_Max] = {};
......@@ -290,19 +303,31 @@ VolumeDataPage* VolumeDataPageAccessorImpl::readPage(int64_t chunk)
}
pageMutexLocker.lock();
page->setBufferData(dataBlock, pitch, std::move(page_data));
pageImpl->setBufferData(dataBlock, pitch, std::move(page_data));
m_pagesRead++;
m_pageReadCondition.notify_all();
if(!m_layer)
{
page->unPin();
pageImpl->unPin();
page = nullptr;
}
limitPageListSize(m_maxPages, pageMutexLocker);
return m_layer;
}
VolumeDataPage* VolumeDataPageAccessorImpl::readPage(int64_t chunk)
{
bool foundExisting;
VolumeDataPage *page = prepareReadPage(chunk, &foundExisting);
if (!page)
return nullptr;
if (foundExisting)
return page;
if (!readPreparedPaged(page))
return nullptr;
return page;
}
......
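readPage is now a composition of the two new entry points: prepareReadPage takes the page-list lock, pins or creates the page, and reports through needToCallReadPreparePage whether the chunk still has to be fetched, while readPreparedPaged does the blocking download and deserialization without holding the lock. That split is what lets the request processor push the slow half onto a worker thread. A hedged sketch of the asynchronous usage (pool is a hypothetical ThreadPool; the real call site is VolumeDataRequestProcessor::addJob):

// Illustration only: issue the blocking read off-thread.
std::future<bool> readChunkAsync(VolumeDataPageAccessorImpl *accessor, int64_t chunkIndex, ThreadPool &pool)
{
  bool needsRead = false;
  VolumeDataPage *page = accessor->prepareReadPage(chunkIndex, &needsRead);
  if (!page || !needsRead)
    return {}; // error, or the page was already resident
  return pool.enqueue([accessor, page] { return accessor->readPreparedPaged(page); });
}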
......@@ -75,8 +75,8 @@ public:
int addReference() override;
int removeReference() override;
VolumeDataPage *prepareReadPage(int64_t chunk);
void readPreparedPaged(VolumeDataPage *page);
VolumeDataPage* prepareReadPage(int64_t chunk, bool *needToCallReadPreparePage);
bool readPreparedPaged(VolumeDataPage *page);
VolumeDataPage *createPage(int64_t chunk) override;
VolumeDataPage *readPage(int64_t chunk) override;
......
......@@ -28,15 +28,6 @@
namespace OpenVDS
{
template <typename T>
inline T dataBlock_ReadElement(const T *ptBuffer, size_t iElement) { return ptBuffer[iElement]; }
template <>
inline bool dataBlock_ReadElement(const bool *ptBuffer, size_t iElement) { return (reinterpret_cast<const unsigned char *>(ptBuffer)[iElement / 8] & (1 << (iElement % 8))) != 0; }
template <typename T>
inline void dataBlock_WriteElement(T *ptBuffer, size_t iElement, T tValue) { ptBuffer[iElement] = tValue; }
template <>
inline void dataBlock_WriteElement(bool *ptBuffer, size_t iElement, bool tValue) { if(tValue) { reinterpret_cast<unsigned char *>(ptBuffer)[iElement / 8] |= (1 << (iElement % 8)); } else { reinterpret_cast<unsigned char *>(ptBuffer)[iElement / 8] &= ~(1 << (iElement % 8)); } }
template <bool isTargetDim0Contigous, bool isSourceDim0Contigous, typename T>
static void dataBlock_BlockCopyWithExplicitContiguity(T * __restrict ptTarget, const T * __restrict ptSource, int32_t iTargetIndex, int32_t iSourceIndex, int32_t (&targetPitch)[4], int32_t (&sourcePitch)[4], int32_t (&size)[4])
......
......@@ -31,7 +31,7 @@ class VolumeDataLayer;
class VolumeDataPageAccessorImpl;
class VolumeDataPageImpl : public VolumeDataPage
{
public:
private:
VolumeDataPageAccessorImpl * m_volumeDataPageAccessor;
int64_t m_chunk;
......@@ -58,6 +58,7 @@ public:
VolumeDataPageImpl(VolumeDataPageImpl const &) = delete;
int64_t getChunkIndex() const { return m_chunk; }
const DataBlock &getDataBlock() const { return m_dataBlock;}
// All these methods require the caller to hold a lock
bool isPinned();
......@@ -72,6 +73,7 @@ public:
void setBufferData(const DataBlock& dataBlock, int32_t(&pitch)[Dimensionality_Max], std::vector<uint8_t>&& blob);
void writeBack(VolumeDataLayer *volumeDataLayer, std::unique_lock<std::mutex> &pageListMutexLock);
void * getBufferInternal(int (&anPitch)[Dimensionality_Max], bool isReadWrite);
void * getRawBufferInternal() { return m_blob.data(); }
bool isCopyMarginNeeded(VolumeDataPageImpl *targetPage);
void copyMargin(VolumeDataPageImpl *targetPage);
......
......@@ -26,12 +26,17 @@
#include <cstdint>
#include <algorithm>
#include <atomic>
#include <thread>
#include <chrono>
#include <fmt/format.h>
namespace OpenVDS
{
VolumeDataRequestProcessor::VolumeDataRequestProcessor(VolumeDataAccessManagerImpl& manager)
: m_manager(manager)
, m_threadPool(std::thread::hardware_concurrency())
{}
static int64_t gen_jobid()
......@@ -40,7 +45,49 @@ static int64_t gen_jobid()
return ++id;
}
int64_t VolumeDataRequestProcessor::addJob(std::vector<VolumeDataChunk>& chunks, std::function<void(VolumeDataPage* page)> processor)
struct MarkJobAsDoneOnExit
{
MarkJobAsDoneOnExit(Job *job)
: job(job)
{}
~MarkJobAsDoneOnExit()
{
int completed = job->completed.fetch_add(1);
if (completed == int(job->pages.size()) - 1)
{
job->doneNotify.notify_all();
}
}
Job *job;
};
static Error processPageInJob(Job *job, size_t pageIndex, VolumeDataPageAccessorImpl *pageAccessor, std::function<bool(VolumeDataPageImpl *page, const VolumeDataChunk &chunk, Error &error)> processor)
{
MarkJobAsDoneOnExit jobDone(job);
Error error;
JobPage& jobPage = job->pages[pageIndex];
if (jobPage.needToReadPage)
{
if (!pageAccessor->readPreparedPaged(jobPage.page))
{
error.code = -1;
error.string = fmt::format("Failed to read page {}.", jobPage.page->getChunkIndex());
return error;
}
}
if (job->cancelled)
{
error.code = -4;
error.string = fmt::format("Request: {} has been cancelled.", job->jobId);
return error;
}
processor(jobPage.page, jobPage.chunk, error);
return error;
}
int64_t VolumeDataRequestProcessor::addJob(const std::vector<VolumeDataChunk>& chunks, std::function<bool(VolumeDataPageImpl * page, const VolumeDataChunk &volumeDataChunk, Error & error)> processor)
{
auto layer = chunks.front().layer;
DimensionsND dimensions = DimensionGroupUtil::getDimensionsNDFromDimensionGroup(layer->getChunkDimensionGroup());
......@@ -48,23 +95,94 @@ int64_t VolumeDataRequestProcessor::addJob(std::vector<VolumeDataChunk>& chunks,
int lod = layer->getLOD();
auto layout = layer->getLayout();
std::unique_lock<std::mutex> lock(m_mutex);
PageAccessorKey key = { dimensions, lod, channel };
auto page_accessor_it = m_page_accessors.find(key);
if (page_accessor_it == m_page_accessors.end())
auto page_accessor_it = m_pageAccessors.find(key);
if (page_accessor_it == m_pageAccessors.end())
{
auto pa = static_cast<VolumeDataPageAccessorImpl *>(m_manager.createVolumeDataPageAccessor(layout, dimensions, lod, channel, 100, OpenVDS::VolumeDataAccessManager::AccessMode_ReadOnly));
auto insert_result = m_page_accessors.insert({key, pa});
auto insert_result = m_pageAccessors.insert({key, pa});
assert(insert_result.second);
page_accessor_it = insert_result.first;
}
m_jobs.emplace_back(gen_jobid());
auto job = m_jobs.back();
m_jobs.emplace_back(new Job(gen_jobid(), m_jobNotification));
auto &job = m_jobs.back();
job->pages.reserve(chunks.size());
VolumeDataPageAccessorImpl *pageAccessor = page_accessor_it->second;
for (const auto &c : chunks)
{
job.chunks.push_back(page_accessor_it->second->readPage(c.chunkIndex));
job->pages.emplace_back();
JobPage &jobPage = job->pages.back();
jobPage.page = static_cast<VolumeDataPageImpl *>(pageAccessor->prepareReadPage(c.chunkIndex, &jobPage.needToReadPage));
jobPage.chunk = c;
size_t index = job->pages.size() - 1;
Job *jobPtr = job.get(); // capture the raw pointer: a reference into m_jobs could dangle if the container reallocates before the task runs
job->future.push_back(m_threadPool.enqueue([jobPtr, index, pageAccessor, processor] { return processPageInJob(jobPtr, index, pageAccessor, processor); }));
}
return 0;
return job->jobId;
}
bool VolumeDataRequestProcessor::isCompleted(int64_t jobID)
{
std::unique_lock<std::mutex> lock(m_mutex);
auto job_it = std::find_if(m_jobs.begin(), m_jobs.end(), [jobID](const std::unique_ptr<Job> &job) { return job->jobId == jobID; });
if (job_it == m_jobs.end())
return false;
if (job_it->get()->completed == job_it->get()->pages.size() && !job_it->get()->cancelled)
{
m_jobs.erase(job_it);
return true;
}
return false;
}
bool VolumeDataRequestProcessor::isCanceled(int64_t jobID)
{
std::unique_lock<std::mutex> lock(m_mutex);
auto job_it = std::find_if(m_jobs.begin(), m_jobs.end(), [jobID](const std::unique_ptr<Job> &job) { return job->jobId == jobID; });
if (job_it == m_jobs.end())
return false;
if (job_it->get()->completed == job_it->get()->pages.size() && job_it->get()->cancelled)
{
m_jobs.erase(job_it);
return true;
}
return false;
}
bool VolumeDataRequestProcessor::waitForCompletion(int64_t jobID, int millisecondsBeforeTimeout)
{
std::unique_lock<std::mutex> lock(m_mutex);
auto job_it = std::find_if(m_jobs.begin(), m_jobs.end(), [jobID](const std::unique_ptr<Job> &job) { return job->jobId == jobID; });
if (job_it == m_jobs.end())
return false;
Job *job = job_it->get();
if (millisecondsBeforeTimeout > 0)
{
std::chrono::milliseconds toWait(millisecondsBeforeTimeout);
job_it->get()->doneNotify.wait_for(lock, toWait, [job]{ return job->completed == job->pages.size();});
} else
{
job_it->get()->doneNotify.wait(lock, [job]{ return job->completed == job->pages.size();});
}
if (job->completed == job->pages.size() && !job->cancelled)
{
m_jobs.erase(job_it);
return true;
}
return false;
}
void VolumeDataRequestProcessor::cancel(int64_t jobID)
{
std::unique_lock<std::mutex> lock(m_mutex);
auto job_it = std::find_if(m_jobs.begin(), m_jobs.end(), [jobID](std::unique_ptr<Job> &job) { return job->jobId == jobID; });
if (job_it == m_jobs.end())
return;
job_it->get()->cancelled = true;
}
}
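End to end: addJob resolves (or creates) a page accessor per {dimensions, LOD, channel} key, prepares one page per chunk, and enqueues processPageInJob for each; MarkJobAsDoneOnExit's destructor counts completions and fires doneNotify for waitForCompletion. A hypothetical caller-side sketch (requestProcessor and chunks are assumed to be built elsewhere, e.g. by requestVolumeSubset):

int64_t jobId = requestProcessor.addJob(chunks,
  [](VolumeDataPageImpl *page, const VolumeDataChunk &chunk, Error &error)
  {
    // Per-chunk work goes here, e.g. copying from page->getRawBufferInternal()
    // into the caller's subset buffer.
    return true;
  });

if (!requestProcessor.waitForCompletion(jobId, 0)) // 0 = wait without timeout
  fprintf(stderr, "Request %lld did not complete\n", (long long)jobId);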
......@@ -19,9 +19,11 @@
#define VOLUMEDATAREQUESTPROCESSOR_H
#include <OpenVDS/VolumeData.h>
#include <OpenVDS/OpenVDS.h>
#include "VolumeDataPageAccessorImpl.h"
#include "VolumeDataChunk.h"
#include "ThreadPool.h"
#include <stdint.h>
#include <map>
......@@ -44,14 +46,27 @@ struct PageAccessorKey
}
};
struct JobPage
{
VolumeDataPageImpl *page;
VolumeDataChunk chunk;
bool needToReadPage;
};
struct Job
{
Job(int64_t job_id)
: job_id(job_id)
Job(int64_t jobId, std::condition_variable &doneNotify)
: jobId(jobId)
, doneNotify(doneNotify)
{}
int64_t job_id;
std::vector<VolumeDataPage *> chunks;
std::function<void(VolumeDataPage *page)> processor;
int64_t jobId;
std::condition_variable &doneNotify;
std::vector<JobPage> pages;
std::vector<std::future<Error>> future;
std::atomic_int completed{0}; // brace-init: copy-initializing a std::atomic is ill-formed before C++17
std::atomic_bool cancelled{false};
Error completedError;
};
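The Job fields implement a simple fan-in: workers bump completed, the last one signals doneNotify, and the waiters in isCompleted/waitForCompletion test the counter against pages.size(). A standalone sketch of the pattern; note it takes the mutex around notify_all, which the MarkJobAsDoneOnExit destructor above skips and which can in principle race with a waiter that has just tested its predicate:

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
  const int taskCount = 8;
  std::mutex m;
  std::condition_variable doneNotify;
  std::atomic_int completed{0};

  std::vector<std::thread> workers;
  for (int i = 0; i < taskCount; ++i)
    workers.emplace_back([&]
    {
      // ... per-chunk work would run here ...
      if (completed.fetch_add(1) == taskCount - 1)
      {
        std::lock_guard<std::mutex> lock(m); // pairs with the waiter's lock to avoid a missed wakeup
        doneNotify.notify_all();
      }
    });

  {
    std::unique_lock<std::mutex> lock(m);
    doneNotify.wait(lock, [&] { return completed == taskCount; });
  }
  printf("all %d tasks done\n", taskCount);
  for (std::thread &worker : workers)
    worker.join();
  return 0;
}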