Commit 4dce11a0 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Free unused pageaccessors for VolumeRequestProcessor

parent 3288c4b6
......@@ -240,6 +240,7 @@ public:
int32_t UploadErrorCount() override;
void GetCurrentUploadError(const char **objectId, int32_t *errorCode, const char **errorString) override;
int CountActivePages() { return m_requestProcessor.CountActivePages(); }
private:
VDS &m_vds;
IOManager *m_ioManager;
......
......@@ -43,6 +43,7 @@ VolumeDataPageAccessorImpl::VolumeDataPageAccessorImpl(VolumeDataAccessManagerIm
, m_references(0)
, m_isReadWrite(isReadWrite)
, m_isCommitInProgress(false)
, m_lastUsed(std::chrono::steady_clock::now())
{
}
......
......@@ -25,6 +25,8 @@
#include <mutex>
#include <condition_variable>
#include <vector>
#include <atomic>
#include <chrono>
namespace OpenVDS
{
......@@ -42,9 +44,10 @@ private:
int m_pagesRead;
int m_pagesWritten;
int m_maxPages;
int m_references;
std::atomic_int m_references;
bool m_isReadWrite;
bool m_isCommitInProgress;
std::atomic<std::chrono::time_point<std::chrono::steady_clock>> m_lastUsed;
std::list<VolumeDataPageImpl *> m_pages;
std::condition_variable m_pageReadCondition;
std::condition_variable m_commitFinishedCondition;
......@@ -74,6 +77,7 @@ public:
int AddReference() override;
int RemoveReference() override;
int GetReferenceCount() const { return m_references.load(); }
VolumeDataPage* PrepareReadPage(int64_t chunk, bool *needToCallReadPreparePage);
bool ReadPreparedPaged(VolumeDataPage *page);
......@@ -91,6 +95,10 @@ public:
bool IsReadWrite() const { return m_isReadWrite; }
VolumeDataAccessManagerImpl *GetManager() const { return m_accessManager; }
void SetLastUsed(std::chrono::time_point<std::chrono::steady_clock> lastUsed) { m_lastUsed = lastUsed; }
std::chrono::time_point<std::chrono::steady_clock> GetLastUsed() const { return m_lastUsed.load(); }
};
}
......
......@@ -34,13 +34,53 @@
namespace OpenVDS
{
static void CleanupThread(std::atomic_bool &exit, std::condition_variable &wakeup, std::map<PageAccessorKey, VolumeDataPageAccessorImpl *> &pageAccessors, std::mutex &pageAccessorsMutex)
{
auto long_block = std::chrono::hours(24 * 32 * 12);
auto in_progress_block = std::chrono::seconds(30);
while(!exit)
{
std::unique_lock<std::mutex> lock(pageAccessorsMutex);
std::chrono::seconds waitFor = long_block;
for (auto &it: pageAccessors)
{
auto &pageAccessor = it.second;
int ref = pageAccessor->GetReferenceCount();
if (ref > 0)
{
if (waitFor > in_progress_block)
waitFor = in_progress_block;
}
else
{
if (pageAccessor->GetMaxPages() > 0)
{
auto duration = pageAccessor->GetLastUsed() - (std::chrono::steady_clock::now() - in_progress_block);
if (duration < std::chrono::seconds(0))
pageAccessor->SetMaxPages(0);
else if (duration < waitFor)
{
waitFor = std::chrono::duration_cast<std::chrono::seconds>(duration) + std::chrono::seconds(1);
}
}
}
}
wakeup.wait_for(lock, waitFor);
}
}
VolumeDataRequestProcessor::VolumeDataRequestProcessor(VolumeDataAccessManagerImpl& manager)
: m_manager(manager)
, m_threadPool(std::thread::hardware_concurrency())
, m_cleanupExit(false)
, m_cleanupThread([this]() { CleanupThread(m_cleanupExit, m_jobNotification, m_pageAccessors, m_mutex); } )
{}
VolumeDataRequestProcessor::~VolumeDataRequestProcessor()
{
m_cleanupExit = true;
m_jobNotification.notify_all();
m_cleanupThread.join();
for (auto &pair : m_pageAccessors)
{
m_manager.DestroyVolumeDataPageAccessor(pair.second);
......@@ -61,8 +101,10 @@ struct MarkJobAsDoneOnExit
~MarkJobAsDoneOnExit()
{
int completed = job->completed.fetch_add(1);
if (completed == job->pages.size() - 1)
if (completed == int(job->pages.size()) - 1)
{
job->pageAccessor.SetLastUsed(std::chrono::steady_clock::now());
job->pageAccessor.RemoveReference();
job->doneNotify.notify_all();
}
}
......@@ -112,21 +154,25 @@ int64_t VolumeDataRequestProcessor::AddJob(const std::vector<VolumeDataChunk>& c
if (page_accessor_it == m_pageAccessors.end())
{
auto pa = static_cast<VolumeDataPageAccessorImpl *>(m_manager.CreateVolumeDataPageAccessor(layout, dimensions, lod, channel, maxPages, OpenVDS::VolumeDataAccessManager::AccessMode_ReadOnly));
auto insert_result = m_pageAccessors.insert({key, pa});
auto insert_result = m_pageAccessors.emplace(key, pa);
assert(insert_result.second);
page_accessor_it = insert_result.first;
}
m_jobs.emplace_back(new Job(GenJobId(), m_jobNotification));
auto &job = m_jobs.back();
job->pages.reserve(chunks.size());
VolumeDataPageAccessorImpl *pageAccessor = page_accessor_it->second;
assert(pageAccessor);
if(pageAccessor->GetMaxPages() < maxPages)
{
pageAccessor->SetMaxPages(maxPages);
}
pageAccessor->AddReference();
m_jobs.emplace_back(new Job(GenJobId(), m_jobNotification, *pageAccessor));
auto &job = m_jobs.back();
job->pages.reserve(chunks.size());
for (const auto &c : chunks)
{
job->pages.emplace_back();
......@@ -205,4 +251,13 @@ void VolumeDataRequestProcessor::Cancel(int64_t jobID)
job_it->get()->cancelled = true;
}
int VolumeDataRequestProcessor::CountActivePages()
{
std::unique_lock<std::mutex> lock(m_mutex);
int ret = 0;
for (auto &pa : m_pageAccessors)
ret += pa.second->GetMaxPages();
return ret;
}
}
......@@ -55,15 +55,17 @@ struct JobPage
struct Job
{
Job(int64_t jobId, std::condition_variable &doneNotify)
Job(int64_t jobId, std::condition_variable &doneNotify, VolumeDataPageAccessorImpl &pageAccessor)
: jobId(jobId)
, doneNotify(doneNotify)
, pageAccessor(pageAccessor)
, completed(0)
, cancelled(false)
{}
int64_t jobId;
std::condition_variable &doneNotify;
VolumeDataPageAccessorImpl &pageAccessor;
std::vector<JobPage> pages;
std::vector<std::future<Error>> future;
std::atomic_int completed;
......@@ -83,6 +85,7 @@ public:
bool WaitForCompletion(int64_t requestID, int millisecondsBeforeTimeout = 0);
void Cancel(int64_t requestID);
int CountActivePages();
private:
VolumeDataAccessManagerImpl &m_manager;
std::map<PageAccessorKey, VolumeDataPageAccessorImpl *> m_pageAccessors;
......@@ -90,6 +93,8 @@ private:
std::mutex m_mutex;
std::condition_variable m_jobNotification;
ThreadPool m_threadPool;
std::atomic_bool m_cleanupExit;
std::thread m_cleanupThread;
};
}
......
......@@ -45,9 +45,9 @@ target_compile_definitions(segy_tests PRIVATE -DTEST_DATA_PATH="${TEST_DATA_PATH
target_include_directories(segy_tests PRIVATE ../src/SEG-Y)
add_test_executable(vds_integration_tests
VDS/VDSIntegrationTest.cpp
VDS/ParseVDSJsonTest.cpp
VDS/DeserializeVolumeDataTest.cpp
VDS/RequestVolumeCleanupThread.cpp
)
target_compile_definitions(vds_integration_tests PRIVATE -DTEST_DATA_PATH="${PROJECT_SOURCE_DIR}/tests/VDS")
target_compile_definitions(vds_integration_tests PRIVATE -DTEST_AWS_REGION="${TEST_AWS_REGION}")
......@@ -55,6 +55,7 @@ target_compile_definitions(vds_integration_tests PRIVATE -DTEST_AWS_BUCKET="${TE
target_compile_definitions(vds_integration_tests PRIVATE -DTEST_AWS_OBJECTID="${TEST_AWS_OBJECTID}")
add_test_executable(openvds_integration_tests
OpenVDS/OpenVDSIntegrationTest.cpp
OpenVDS/DataAccess.cpp
OpenVDS/RequestVolumeSamples.cpp
OpenVDS/RequestVolumeTraces.cpp
......
......@@ -21,7 +21,7 @@
#include <gtest/gtest.h>
GTEST_TEST(OpenVDS_integration, DownloadJson)
GTEST_TEST(OpenVDS_integration, OpenClose)
{
OpenVDS::Error error;
OpenVDS::AWSOpenOptions options;
......
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#include <OpenVDS/OpenVDS.h>
#include <cstdlib>
#include <gtest/gtest.h>
#include <OpenVDS/VolumeDataLayout.h>
#include <VDS/VolumeDataAccessManagerImpl.h>
#include <chrono>
TEST(VDS_integration, RequestVolumeCleanupThread)
{
OpenVDS::Error error;
OpenVDS::AWSOpenOptions options;
options.region = TEST_AWS_REGION;
options.bucket = TEST_AWS_BUCKET;
options.key = TEST_AWS_OBJECTID;
if(options.region.empty() || options.bucket.empty() || options.key.empty())
{
GTEST_SKIP() << "Environment variables not set";
}
ASSERT_TRUE(options.region.size() && options.bucket.size() && options.key.size());
std::unique_ptr<OpenVDS::VDS, decltype(&OpenVDS::Close)> handle(OpenVDS::Open(options, error), &OpenVDS::Close);
ASSERT_TRUE(handle);
OpenVDS::VolumeDataAccessManagerImpl *dataAccessManager = static_cast<OpenVDS::VolumeDataAccessManagerImpl *>(OpenVDS::GetDataAccessManager(handle.get()));
ASSERT_TRUE(dataAccessManager);
OpenVDS::VolumeDataLayout *layout = OpenVDS::GetLayout(handle.get());
int sampleCount0 = layout->GetDimensionNumSamples(0);
std::vector<float> buffer(10 * sampleCount0);
int sampleCount1 = layout->GetDimensionNumSamples(1);
int sampleCount2 = layout->GetDimensionNumSamples(2);
float tracePos[10][6];
for (int trace = 0; trace < 10; trace++)
{
tracePos[trace][0] = 0;
tracePos[trace][1] = (float)(trace * (sampleCount1 / 10));
tracePos[trace][2] = (float)(trace * (sampleCount2 / 10));
tracePos[trace][3] = 0;
tracePos[trace][4] = 0;
tracePos[trace][5] = 0;
}
int64_t requestId = dataAccessManager->RequestVolumeTraces(buffer.data(), layout, OpenVDS::Dimensions_012, 0, 0, tracePos, 10, OpenVDS::InterpolationMethod::Nearest, 0);
int activePages = dataAccessManager->CountActivePages();
ASSERT_GT(activePages, 0);
std::this_thread::sleep_for(std::chrono::seconds(10));
activePages = dataAccessManager->CountActivePages();
ASSERT_GT(activePages, 0);
dataAccessManager->WaitForCompletion(requestId);
activePages = dataAccessManager->CountActivePages();
ASSERT_GT(activePages, 0);
std::this_thread::sleep_for(std::chrono::seconds(21));
activePages = dataAccessManager->CountActivePages();
ASSERT_EQ(activePages, 0);
auto pageAccessor = dataAccessManager->CreateVolumeDataPageAccessor(layout, OpenVDS::Dimensions_012, 0, 0, 1000, OpenVDS::VolumeDataAccessManager::AccessMode_ReadOnly);
auto valueReader = dataAccessManager->Create3DInterpolatingVolumeDataAccessorR32(pageAccessor, 0.0f, OpenVDS::InterpolationMethod::Nearest);
std::vector<float> verify(10 * sampleCount0);
for (int trace = 0; trace < 10; trace++)
{
for (int i = 0; i < sampleCount0; i++)
{
verify[trace * sampleCount0 + i] = valueReader->GetValue(OpenVDS::FloatVector3(tracePos[trace][2], tracePos[trace][1], float(i)));
}
}
ASSERT_EQ(buffer, verify);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment