Commit ec89f182 authored by Paal Kvamme's avatar Paal Kvamme

Optimized memory allocator useful in a few places.

parent fd61bbf4
Pipeline #50215 passed with stages in 8 minutes and 23 seconds
......@@ -23,6 +23,8 @@
#include <functional>
#include <iostream>
#include <sstream>
#include <mutex>
#include <algorithm>
using OpenZGY::IOContext;
namespace InternalZGY {
......@@ -30,6 +32,22 @@ namespace InternalZGY {
}
#endif
namespace {
/**
* Return non-zero if the private memory allocator should be used in certain
* circumstances. The default is 1, which enables it in those places that will
* most likely benefit and that appear safe. Set to 0 to get an idea of how
* large the saving is. Note that this might result in disappointment, as the
* system malloc() is fairly efficient already.
*/
static int
malloc_shortcut()
{
static int enable = Environment::getNumericEnv("OPENZGY_MALLOC_SHORTCUT", 1);
return enable;
}
}
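For benchmarking, the pool can be switched on and off through the environment variable alone. A minimal sketch of an A/B measurement (hypothetical test harness, not part of this commit, and assuming a POSIX platform for setenv()); note that the setting must be in place before the first allocation, because malloc_shortcut() latches the value in a function-local static:
// Hypothetical A/B test: run once with OPENZGY_MALLOC_SHORTCUT=1 (the
// default) and once with 0 to see how much the pool actually saves.
#include <cstdlib> // setenv(), POSIX
int main()
{
  // Must happen before the first FileADT::_allocate() call, because
  // malloc_shortcut() caches the value in a function-local static.
  ::setenv("OPENZGY_MALLOC_SHORTCUT", "0", 1);
  // ... open a ZGY file and issue reads as usual ...
  return 0;
}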
/////////////////////////////////////////////////////////////////////////////
// FileADT (base class) /////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
......@@ -219,6 +237,76 @@ FileADT::_deliver(
}
}
/**
* Rudimentary pool of scratch buffers to avoid alloc/dealloc overhead.
* Allocated data is returned as a std::shared_ptr that takes care of release.
* Once allocated, the memory might not be released to the CRT until the
* application exits. The function should only be used for short lived data.
* Also, do NOT create any std::weak_ptr instances referencing the result.
*/
std::shared_ptr<void>
FileADT::_allocate(std::int64_t size)
{
const std::int64_t minsize{16*1024}, maxsize{1024*1024};
const std::int64_t highwater{500}, lowwater{200};
static std::vector<std::shared_ptr<void>> cache;
static std::mutex mutex;
static size_t hint{0};
std::shared_ptr<void> result;
if (size >= minsize && size <= maxsize && malloc_shortcut() >= 1) {
std::lock_guard<std::mutex> lk(mutex);
if (hint >= cache.size())
hint = 0;
// Start searching for an available entry at the point we left off last.
// This might *slightly* improve performance at the cost of messier code.
// And be careful about the corner case where the cache is empty.
std::vector<std::shared_ptr<void>>::const_iterator it, start = cache.begin() + hint;
for (it = start; it != cache.end(); ++it) {
if (it->unique()) {
hint = it - cache.begin() + 1;
return *it;
}
}
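// Wrap around and search the entries before the starting hint.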
for (it = cache.begin(); it != start; ++it) {
if (it->unique()) {
hint = it - cache.begin() + 1;
return *it;
}
}
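// Nothing reusable in the pool; allocate a new full-size buffer,
// hand it out, and (below) cache it for later reuse.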
result = std::shared_ptr<void>(::malloc(maxsize), [](void* p){::free(p);});
if (result) {
// This eviction method is crude but is not expected to be needed at all.
// Assuming that all allocated data really is short lived, but for some
// reason there is so much of it that keeping it allocated after
// everything settles down is not practical. Whether free or in-use
// pointers are evicted doesn't really matter because in-use data will
// be freed very soon anyway. The reason for the shuffle is that if
// long lived data is allocated by accident it will eventually get
// evicted from the cache and become regular allocated data.
// To force the eviction code to be executed, try to read a large file
// using 1,000 or so threads.
if ((std::int64_t)cache.size() > highwater && highwater > lowwater) {
//std::cerr << "FileADT::_allocate() is evicting entries." << std::endl;
std::random_shuffle(cache.begin(), cache.end());
cache.resize(lowwater);
}
cache.push_back(result);
}
}
else {
// Bypass everything and just do a normal malloc.
result = std::shared_ptr<void>(::malloc(size), [](void* p){::free(p);});
}
if (!result) {
// This should be a std::bad_alloc but it might not be legal to throw that
// exception from user code. Using new instead of malloc would make the
// issue moot, but malloc seems safer both with respect to type punning
// and alignment issues.
throw std::runtime_error("Failed to allocate " + std::to_string(size) + " bytes");
}
return result;
}
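A minimal usage sketch (illustrative only; _allocate() is internal to the file layer and the 64 KB size is just an example). Requests between 16 KB and 1 MB are served from the pool, and the buffer is recycled once the last shared_ptr reference is dropped:
{
  std::shared_ptr<void> buf = FileADT::_allocate(64*1024);
  ::memset(buf.get(), 0, 64*1024); // scratch use; keep it short lived
} // Last reference dropped here. The underlying 1 MB block stays in the
  // pool and can be handed out again by the next _allocate() call.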
std::shared_ptr<FileADT>
FileADT::factory(const std::string& filename, OpenMode mode, const IOContext *iocontext)
{
......
......@@ -258,6 +258,7 @@ protected:
public: // Actually internal. Used by ConsolidateRequests.
static void _deliver(const ReadRequest::delivery_t& fn, const ReadRequest::data_t& data, std::int64_t offset, std::int64_t size, bool transient);
static void _deliver_now(const ReadRequest::delivery_t& fn, const ReadRequest::data_t& data, std::int64_t size, bool transient);
static std::shared_ptr<void> _allocate(std::int64_t size);
public:
static std::shared_ptr<FileADT> factory(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
......
......@@ -271,9 +271,12 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta
// If xx_read() is overridden then whoever did that wouldn't expect
// xx_readv() to change. The fact that I choose to implement one in
// terms of the other is an implementation detail.
// Note that the delivery function can retain a reference to the data.
// This is allowed as long as the data is still short lived. Even if it
// is not, this isn't a disaster, thanks to the eviction code in _allocate().
if (!parallel_ok || requests.size() < 2) {
for (const ReadRequest& r : requests) {
std::shared_ptr<char> data(new char[r.size], std::default_delete<char[]>());
std::shared_ptr<void> data = _allocate(r.size);
this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
_deliver(r.delivery, data, 0, r.size, transient_ok);
}
......
......@@ -1167,15 +1167,24 @@ SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, Us
void
SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint)
{
#if 0
// For now just implement xx_readv in terms of xx_read.
for (const ReadRequest& r : requests) {
std::shared_ptr<char> data(new char[r.size], std::default_delete<char[]>());
this->SeismicStoreFile::xx_read(data.get(), r.offset, r.size, usagehint);
_deliver(r.delivery, data, 0, r.size, transient_ok);
if (requests.size() == 1) {
// Handle this simple case specially. There will be more cases to test,
// but the shortcut might help performance, especially if the memory
// allocation can be made more efficient. For testing, the shortcut can be
// made unconditional, but that will disable the consolidate-brick logic.
// Explicitly use xx_read() in this class, not any overrides. If xx_read()
// is overridden then whoever did that wouldn't expect xx_readv() to change.
// The fact that one is implemented using the other is an implementation detail.
// Note that the delivery function can retain a reference to the data.
// This is allowed as long as the data is still short lived. Even if it
// is not, this isn't a disaster, thanks to the eviction code in _allocate().
for (const ReadRequest& r : requests) {
std::shared_ptr<void> data = _allocate(r.size);
this->SeismicStoreFile::xx_read(data.get(), r.offset, r.size, usagehint);
_deliver(r.delivery, data, 0, r.size, transient_ok);
}
return;
}
return;
#endif
// Consolidate adjacent bricks before reading.
//
......@@ -1227,6 +1236,11 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
return std::max(a, b.local_size + b.outpos);
});
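For readers of this hunk: the lambda above is only the tail of a size computation. A hypothetical reconstruction of its shape (the container name "work" and its element type are assumptions; the actual declarations are elided from this diff):
// realsize = highest end offset (outpos + local_size) over all chunks.
const std::int64_t realsize =
  std::accumulate(work.begin(), work.end(), std::int64_t(0),
                  [](std::int64_t a, const ConsolidatedChunk& b) {
                    return std::max(a, b.local_size + b.outpos);
                  });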
// This would probably work, but the single-brick case is already
// handled, and the cases for two to four int8 bricks or two int16
// bricks are not that interesting. At least not for applications
// that read just one brick at a time. Those apps will not get here.
//std::shared_ptr<void> data = _allocate(r.size);
std::shared_ptr<char> data(new char[realsize], std::default_delete<char[]>());
if (this->_config->_debug_trace)
......