Commit 8530153c authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Fix race in cleanupthread

parent 7a972d27
Pipeline #6109 passed with stages
in 8 minutes and 44 seconds
......@@ -37,13 +37,13 @@ namespace OpenVDS
OPENVDS_EXPORT int _cleanupthread_timeoutseconds = 30;
static void CleanupThread(std::atomic_bool &exit, std::condition_variable &wakeup, std::map<PageAccessorKey, VolumeDataPageAccessorImpl *> &pageAccessors, std::mutex &pageAccessorsMutex)
static void CleanupThread(PageAccessorNotifier &pageAccessorNotifier, std::map<PageAccessorKey, VolumeDataPageAccessorImpl *> &pageAccessors)
{
auto long_block = std::chrono::hours(24 * 32 * 12);
auto in_progress_block = std::chrono::seconds(_cleanupthread_timeoutseconds);
while(!exit)
while(!pageAccessorNotifier.exit)
{
std::unique_lock<std::mutex> lock(pageAccessorsMutex);
std::unique_lock<std::mutex> lock(pageAccessorNotifier.mutex);
std::chrono::seconds waitFor = long_block;
for (auto &it: pageAccessors)
{
......@@ -68,21 +68,25 @@ static void CleanupThread(std::atomic_bool &exit, std::condition_variable &wakeu
}
}
}
wakeup.wait_for(lock, waitFor);
pageAccessorNotifier.dirty = false;
pageAccessorNotifier.jobNotification.wait_for(lock, waitFor, [&pageAccessorNotifier]
{
return pageAccessorNotifier.exit || pageAccessorNotifier.dirty;
}
);
}
}
VolumeDataRequestProcessor::VolumeDataRequestProcessor(VolumeDataAccessManagerImpl& manager)
: m_manager(manager)
, m_threadPool(std::thread::hardware_concurrency())
, m_cleanupExit(false)
, m_cleanupThread([this]() { CleanupThread(m_cleanupExit, m_jobNotification, m_pageAccessors, m_mutex); } )
, m_pageAccessorNotifier(m_mutex)
, m_cleanupThread([this]() { CleanupThread(m_pageAccessorNotifier, m_pageAccessors); } )
{}
VolumeDataRequestProcessor::~VolumeDataRequestProcessor()
{
m_cleanupExit = true;
m_jobNotification.notify_all();
m_pageAccessorNotifier.setExit();
m_cleanupThread.join();
for (auto &pair : m_pageAccessors)
{
......@@ -111,11 +115,11 @@ struct MarkJobAsDoneOnExit
}
if (++job->pagesProcessed == job->pagesCount)
{
std::unique_lock<std::mutex> lock(job->completed_mutex);
std::unique_lock<std::mutex> lock(job->pageAccessorNotifier.mutex);
job->pageAccessor.SetLastUsed(std::chrono::steady_clock::now());
job->pageAccessor.RemoveReference();
job->done = true;
job->doneNotify.notify_all();
job->pageAccessorNotifier.setDirtyNoLock();
}
}
Job *job;
......@@ -208,7 +212,7 @@ int64_t VolumeDataRequestProcessor::AddJob(const std::vector<VolumeDataChunk>& c
pageAccessor->AddReference();
m_jobs.emplace_back(new Job(GenJobId(), m_jobNotification, *pageAccessor, int(chunks.size()), m_mutex));
m_jobs.emplace_back(new Job(GenJobId(), m_pageAccessorNotifier, *pageAccessor, int(chunks.size())));
auto &job = m_jobs.back();
job->pages.reserve(chunks.size());
......@@ -346,14 +350,14 @@ bool VolumeDataRequestProcessor::WaitForCompletion(int64_t jobID, int millisecon
if (millisecondsBeforeTimeout > 0)
{
std::chrono::milliseconds toWait(millisecondsBeforeTimeout);
job->doneNotify.wait_for(lock, toWait, [job]
job->pageAccessorNotifier.jobNotification.wait_for(lock, toWait, [job]
{
return job->done.load();
});
}
else
{
job->doneNotify.wait(lock, [job]
job->pageAccessorNotifier.jobNotification.wait(lock, [job]
{
return job->done.load();
});
......@@ -381,7 +385,7 @@ void VolumeDataRequestProcessor::Cancel(int64_t jobID)
if (job_it == m_jobs.end())
return;
job_it->get()->cancelled = true;
m_jobNotification.notify_all();
m_pageAccessorNotifier.setDirtyNoLock();
}
float VolumeDataRequestProcessor::GetCompletionFactor(int64_t jobID)
......
......@@ -56,21 +56,53 @@ struct JobPage
VolumeDataChunk chunk;
};
struct PageAccessorNotifier
{
PageAccessorNotifier(std::mutex &mutex)
: dirty(false)
, exit(false)
, mutex(mutex)
{}
void setDirty()
{
std::unique_lock<std::mutex> lock(mutex);
dirty = true;
jobNotification.notify_all();
}
void setDirtyNoLock()
{
dirty = true;
jobNotification.notify_all();
}
void setExit()
{
std::unique_lock<std::mutex> lock(mutex);
exit = true;
jobNotification.notify_all();
}
bool dirty;
bool exit;
std::mutex &mutex;
std::condition_variable jobNotification;
};
struct Job
{
Job(int64_t jobId, std::condition_variable &doneNotify, VolumeDataPageAccessorImpl &pageAccessor, int pagesCount, std::mutex &completed_mutex)
Job(int64_t jobId, PageAccessorNotifier &pageAccessorNotifier, VolumeDataPageAccessorImpl &pageAccessor, int pagesCount)
: jobId(jobId)
, doneNotify(doneNotify)
, pageAccessorNotifier(pageAccessorNotifier)
, pageAccessor(pageAccessor)
, pagesProcessed(0)
, done(false)
, cancelled(false)
, pagesCount(pagesCount)
, completed_mutex(completed_mutex)
{}
int64_t jobId;
std::condition_variable &doneNotify;
PageAccessorNotifier &pageAccessorNotifier;
VolumeDataPageAccessorImpl &pageAccessor;
std::vector<JobPage> pages;
std::vector<std::future<Error>> future;
......@@ -78,7 +110,6 @@ struct Job
std::atomic_bool done;
std::atomic_bool cancelled;
int pagesCount;
std::mutex &completed_mutex;
Error completedError;
};
......@@ -101,9 +132,8 @@ private:
std::map<PageAccessorKey, VolumeDataPageAccessorImpl *> m_pageAccessors;
std::vector<std::unique_ptr<Job>> m_jobs;
std::mutex m_mutex;
std::condition_variable m_jobNotification;
ThreadPool m_threadPool;
std::atomic_bool m_cleanupExit;
PageAccessorNotifier m_pageAccessorNotifier;
std::thread m_cleanupThread;
};
......
......@@ -419,17 +419,17 @@ void VolumeDataStoreVDSFile::SetMetadataStatus(std::string const &layerName, Met
assert(0 && "Not implemented");
}
VolumeDataStoreVDSFile::VolumeDataStoreVDSFile(VDS &vds, const std::string &fileName, Mode mode, Error &error)
VolumeDataStoreVDSFile::VolumeDataStoreVDSFile(VDS &vds, const std::string &vdsFileName, Mode mode, Error &error)
: m_vds(vds)
, m_isVDSObjectFilePresent(false)
, m_isVolumeDataLayoutFilePresent(false)
, m_dataStore(HueBulkDataStore::Open(fileName.c_str()), &HueBulkDataStore::Close)
, m_dataStore(HueBulkDataStore::Open(vdsFileName.c_str()), &HueBulkDataStore::Close)
{
if(mode == ReadWrite)
{
if(!m_dataStore->IsOpen())
{
m_dataStore.reset(HueBulkDataStore::CreateNew(fileName.c_str(), false));
m_dataStore.reset(HueBulkDataStore::CreateNew(vdsFileName.c_str(), false));
}
else
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment