Commit ee22e835 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

In Memory IOManager

parent 83bde617
......@@ -6,6 +6,7 @@ set(SOURCE_FILES
IO/IOManager.cpp
IO/IOManagerAWS.cpp
IO/IOManagerAzure.cpp
IO/IOManagerInMemory.cpp
VDS/VolumeDataPartition.cpp
VDS/VolumeDataChannelMapping.cpp
VDS/VolumeDataLayer.cpp
......@@ -35,6 +36,7 @@ set (PRIVATE_HEADER_FILES
IO/IOManager.h
IO/IOManagerAWS.h
IO/IOManagerAzure.h
IO/IOManagerInMemory.h
VDS/VDS.h
VDS/VolumeDataPartition.h
VDS/VolumeDataChannelMapping.h
......
......@@ -18,6 +18,7 @@
#include "IOManager.h"
#include "IOManagerAWS.h"
#include "IOManagerInMemory.h"
namespace OpenVDS
{
......@@ -43,6 +44,8 @@ IOManager* IOManager::CreateIOManager(const OpenOptions& options, Error &error)
{
case OpenOptions::AWS:
return new IOManagerAWS(static_cast<const AWSOpenOptions &>(options), error);
case OpenOptions::InMemory:
return new IOManagerInMemory(static_cast<const InMemoryOpenOptions &>(options), error);
default:
error.code = -1;
error.string = "Unknwon type for OpenOptions";
......
#include "IOManagerInMemory.h"
namespace OpenVDS
{
class InMemoryRequest : public Request
{
public:
InMemoryRequest(const std::string &objectName, const Error &error)
: Request(objectName)
, m_error(error)
{
}
~InMemoryRequest() override
{
}
void WaitForFinish() override
{
}
bool IsDone() const override
{
return true;
}
bool IsSuccess(Error &error) const override
{
error = m_error;
return m_error.code == 0;
}
void Cancel() override
{
}
private:
Error m_error;
};
IOManagerInMemory::IOManagerInMemory(const InMemoryOpenOptions &openOptions, Error &error)
: m_threadPool(1)
{
}
IOManagerInMemory::~IOManagerInMemory()
{
}
std::shared_ptr<OpenVDS::Request> IOManagerInMemory::Download(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange &range)
{
std::unique_lock<std::mutex> lock(m_mutex);
auto object = m_data[objectName].get();
Error error;
if (object)
{
for (auto &meta : object->metaHeader)
{
handler->HandleMetadata(meta.first, meta.second);
}
std::vector<uint8_t> data = object->data;
handler->HandleData(std::move(data));
}
else
{
error.string = std::string("Object: ") + objectName + " not found.";
error.code = 404;
}
auto request = std::make_shared<InMemoryRequest>(objectName, error);
m_threadPool.Enqueue([handler, error, request]{ handler->Completed(*request, error); });
return request;
}
std::shared_ptr<Request> IOManagerInMemory::Upload(const std::string objectName, const std::string &contentDispostionFilename, const std::string &contentType, const std::vector<std::pair<std::string, std::string> > &metadataHeader, std::shared_ptr<std::vector<uint8_t> > data, std::function<void (const Request &, const Error &)> completedCallback)
{
std::unique_lock<std::mutex> lock(m_mutex);
std::unique_ptr<Object> object(new Object());
object->metaHeader = metadataHeader;
object->data = *data;
m_data[objectName] = std::move(object);
auto r = std::make_shared<InMemoryRequest>(objectName, Error());
if (completedCallback)
m_threadPool.Enqueue([completedCallback, r] { completedCallback(*r, Error()); });
return r;
}
}
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#ifndef IOMANAGERINMEMORY_H
#define IOMANAGERINMEMORY_H
#include "IOManager.h"
#include <VDS/ThreadPool.h>
#include <unordered_map>
#include <string>
#include <vector>
#include <mutex>
#include <cstdint>
namespace OpenVDS
{
struct Object
{
std::vector<std::pair<std::string, std::string>> metaHeader;
std::vector<uint8_t> data;
};
class IOManagerInMemory : public IOManager
{
public:
IOManagerInMemory(const InMemoryOpenOptions &openOptions, Error &error);
~IOManagerInMemory() override;
std::shared_ptr<Request> Download(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange& range = IORange()) override;
std::shared_ptr<Request> Upload(const std::string objectName, const std::string& contentDispostionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request & request, const Error & error)> completedCallback = nullptr) override;
private:
std::unordered_map<std::string, std::unique_ptr<Object>> m_data;
std::mutex m_mutex;
ThreadPool m_threadPool;
};
}
#endif //IOMANAGERINMEMORY_H
......@@ -39,7 +39,8 @@ struct OpenOptions
{
AWS,
Azure,
File
File,
InMemory
};
ConnectionType connectionType;
......@@ -58,6 +59,11 @@ struct AWSOpenOptions : OpenOptions
AWSOpenOptions(std::string const & bucket, std::string const & key, std::string const & region) : OpenOptions(AWS), bucket(bucket), key(key), region(region) {}
};
struct InMemoryOpenOptions : OpenOptions
{
InMemoryOpenOptions() : OpenOptions(InMemory) {}
};
struct Error
{
int code = 0;
......
......@@ -103,6 +103,7 @@ struct MarkJobAsDoneOnExit
int completed = job->completed.fetch_add(1);
if (completed == int(job->pages.size()) - 1)
{
std::unique_lock<std::mutex>(job->completed_mutex);
job->pageAccessor.SetLastUsed(std::chrono::steady_clock::now());
job->pageAccessor.RemoveReference();
job->doneNotify.notify_all();
......@@ -169,7 +170,7 @@ int64_t VolumeDataRequestProcessor::AddJob(const std::vector<VolumeDataChunk>& c
pageAccessor->AddReference();
m_jobs.emplace_back(new Job(GenJobId(), m_jobNotification, *pageAccessor));
m_jobs.emplace_back(new Job(GenJobId(), m_jobNotification, *pageAccessor, m_mutex));
auto &job = m_jobs.back();
job->pages.reserve(chunks.size());
......@@ -225,13 +226,16 @@ bool VolumeDataRequestProcessor::WaitForCompletion(int64_t jobID, int millisecon
return false;
Job *job = job_it->get();
if (millisecondsBeforeTimeout > 0)
if (job->completed < job->pages.size())
{
std::chrono::milliseconds toWait(millisecondsBeforeTimeout);
job_it->get()->doneNotify.wait_for(lock, toWait, [job]{ return job->completed == job->pages.size();});
} else
{
job_it->get()->doneNotify.wait(lock, [job]{ return job->completed == job->pages.size();});
if (millisecondsBeforeTimeout > 0)
{
std::chrono::milliseconds toWait(millisecondsBeforeTimeout);
job_it->get()->doneNotify.wait_for(lock, toWait, [job]{ return job->completed == job->pages.size();});
} else
{
job_it->get()->doneNotify.wait(lock, [job]{ return job->completed == job->pages.size();});
}
}
if (job->completed == job->pages.size() && !job->cancelled)
......
......@@ -55,12 +55,13 @@ struct JobPage
struct Job
{
Job(int64_t jobId, std::condition_variable &doneNotify, VolumeDataPageAccessorImpl &pageAccessor)
Job(int64_t jobId, std::condition_variable &doneNotify, VolumeDataPageAccessorImpl &pageAccessor, std::mutex &completed_mutex)
: jobId(jobId)
, doneNotify(doneNotify)
, pageAccessor(pageAccessor)
, completed(0)
, cancelled(false)
, completed_mutex(completed_mutex)
{}
int64_t jobId;
......@@ -70,6 +71,7 @@ struct Job
std::vector<std::future<Error>> future;
std::atomic_int completed;
std::atomic_bool cancelled;
std::mutex &completed_mutex;
Error completedError;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment