From c7c774c37f3becaa6bb1912d55580cd41220d74b Mon Sep 17 00:00:00 2001
From: Paal Kvamme
Date: Mon, 5 Jul 2021 15:35:08 +0200
Subject: [PATCH 1/3] Add -T, --uthreads option to zgycopyc.

As -t, --threads, but sacrifices optimal ordering for increased
parallelization. This makes OpenZGY's zgycopyc more similar to
ZGY-Public's zgycopy.
---
 native/src/tools/zgycopyc.cpp | 37 ++++++++++++++++++++++++++++-------
 1 file changed, 30 insertions(+), 7 deletions(-)

diff --git a/native/src/tools/zgycopyc.cpp b/native/src/tools/zgycopyc.cpp
index 481ae39..1a6b8af 100644
--- a/native/src/tools/zgycopyc.cpp
+++ b/native/src/tools/zgycopyc.cpp
@@ -91,6 +91,7 @@ public:
   bool dumpsqnr; // Still unused.
   bool native;
   bool dropcache;
+  bool ordered_write;
   std::string sigpipe;
   std::string input;
   std::string output;
@@ -122,6 +123,7 @@ public:
     , dumpsqnr(false)
     , native(true)
     , dropcache(false)
+    , ordered_write(false)
     , sigpipe()
     , input()
     , output()
@@ -184,6 +186,7 @@ public:
     "-f, --finalize type: full, incremental, keep, etc.",
     //"-l, --lod N: *Level of detail, 0 = full resolution.",
     "-t, --threads N: Number of threads to use for reading.",
+    "-T, --uthreads N: As -t, but writes may be unordered.",
     "-n, --brickcount N: Only copy the first N bricks.",
     "-Q, --sqnr QUALITY: Compression quality. Uncompressed if absent.",
     "-z, --omp-nest N: Levels of OpenMP nesting.",
@@ -272,7 +275,7 @@ public:
     if (lod != 0)
       os << "--lod=" << lod << " ";
     if (threads != 1)
-      os << "--threads=" << threads << " ";
+      os << (ordered_write ? "--threads=" : "--uthreads=") << threads << " ";
     if (brickcount >= 0)
       os << "--brickcount=" << brickcount << " ";
     if (sqnr > 0)
@@ -414,7 +417,7 @@ public:
   static const char* short_options()
   {
-    return "hvqGuraDNFUp:i:o:s:l:b:B:O:g:t:n:Q:";
+    return "hvqGuraDNFUp:i:o:s:l:b:B:O:g:t:T:n:Q:";
   }

   static const struct option *long_options()
@@ -441,6 +444,7 @@ public:
       {"algorithm",   required_argument, 0, 'g' },
       {"finalize" ,   required_argument, 0, 'f' },
       {"threads",     required_argument, 0, 't' },
+      {"uthreads",    required_argument, 0, 'T' },
       {"brickcount",  required_argument, 0, 'n' },
       {"lod",         required_argument, 0, 'l' },
       {"sqnr",        required_argument, 0, 'Q' },
@@ -543,7 +547,8 @@ public:
       case 'g': algorithm = getDecimationTypes(optarg); break;
       case 'f': finalize = getFinalizeAction(optarg); break;
       case 'l': throw std::runtime_error("--lod not supported"); //lod = geti(optarg); break;
-      case 't': threads = geti(optarg); break;
+      case 't': threads = geti(optarg); ordered_write = true; break;
+      case 'T': threads = geti(optarg); ordered_write = false; break;
       case 'n': brickcount = geti(optarg); break;
       case 'Q': sqnr = geti(optarg); break;
       case 'z': omp_nest = geti(optarg); break;
@@ -1226,20 +1231,38 @@ copy(const Options& opt, SummaryPrintingTimerEx& rtimer, SummaryPrintingTimerEx&
     std::cerr << outstring.str() << std::flush;
   }
   std::shared_ptr<void> buf(malloc(bufbytes), [](void *d){::free(d);});
+  if (opt.ordered_write) {
 #pragma omp for ordered schedule(dynamic,1)
-  for (std::int64_t task = 0; task < total; ++task) {
-    guard.run([&]()
+    for (std::int64_t task = 0; task < total; ++task) {
+      guard.run([&]()
        {
          readchunk(r, w, tasklist[task], bs, surveysize,
                    buf.get(), dt, rtimer, opt.noisefactor);
        });
 #pragma omp ordered
-    guard.run([&]()
+      guard.run([&]()
        {
          writechunk(r, w, tasklist[task], bs, surveysize,
                    buf.get(), dt, wtimer);
        });
-    guard.progress();
+      guard.progress();
+    }
+  }
+  else {
+#pragma omp for schedule(dynamic,1)
+    for (std::int64_t task = 0; task < total; ++task) {
+      guard.run([&]()
+       {
+         readchunk(r, w, tasklist[task], bs, surveysize,
+                   buf.get(), dt, rtimer, opt.noisefactor);
+       });
+      guard.run([&]()
+       {
+         writechunk(r, w, tasklist[task], bs, surveysize,
+                   buf.get(), dt, wtimer);
+       });
+      guard.progress();
+    }
   }
 }
 rwt.stop();
--
GitLab
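
The functional difference between -t and -T is exactly the `ordered` clause in the first loop above: with it, reads still run in parallel but each write must wait until all earlier tasks have written, so one slow read can stall every writer behind it; without it, each write starts as soon as its own read completes. Below is a minimal standalone sketch of the two loop shapes (illustration only, not part of the patch; compile with e.g. g++ -fopenmp demo.cpp):

    // Sketch of the -t (ordered) and -T (unordered) loop shapes in zgycopyc.
    #include <cstdio>
    #include <omp.h>

    int main()
    {
      const int total = 8;
    #pragma omp parallel
      {
        // -t: reads run in parallel, but the "ordered" block serializes
        // writes into ascending task order. A thread whose read finished
        // early may sit idle waiting for its turn to write.
    #pragma omp for ordered schedule(dynamic,1)
        for (int task = 0; task < total; ++task) {
          std::printf("read  %d on thread %d\n", task, omp_get_thread_num());
    #pragma omp ordered
          std::printf("write %d (always in ascending order)\n", task);
        }
        // -T: no "ordered" clause, so each write starts as soon as its own
        // read completes. Better thread utilization, but bricks land in the
        // output in whatever order the tasks happen to finish.
    #pragma omp for schedule(dynamic,1)
        for (int task = 0; task < total; ++task) {
          std::printf("read  %d on thread %d\n", task, omp_get_thread_num());
          std::printf("write %d (in completion order)\n", task);
        }
      }
      return 0;
    }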
From fd61bbf4bee0d9ef5634dead4b93c875e042ccbd Mon Sep 17 00:00:00 2001
From: Paal Kvamme
Date: Mon, 5 Jul 2021 15:35:32 +0200
Subject: [PATCH 2/3] Tweak compiler options for databuffer.cpp (only).
---
 native/windows/OpenZGY.vcxproj | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/native/windows/OpenZGY.vcxproj b/native/windows/OpenZGY.vcxproj
index ea36f64..d9d90f8 100644
--- a/native/windows/OpenZGY.vcxproj
+++ b/native/windows/OpenZGY.vcxproj
@@ -92,7 +92,10 @@
       $(SolutionDir)..\..\build\temp\zfpbuildnative\zfp-0.5.5\include;%(AdditionalIncludeDirectories)
-
+
+      false
+      false
+
--
GitLab

From ec89f182775a4023874aaa112ab0c46c32019fee Mon Sep 17 00:00:00 2001
From: Paal Kvamme
Date: Mon, 5 Jul 2021 15:38:22 +0200
Subject: [PATCH 3/3] Optimized memory allocator useful in a few places.
---
 native/src/impl/file.cpp       | 88 ++++++++++++++++++++++++++++++++++
 native/src/impl/file.h         |  1 +
 native/src/impl/file_local.cpp |  5 +-
 native/src/impl/file_sd.cpp    | 30 ++++++++----
 4 files changed, 115 insertions(+), 9 deletions(-)

diff --git a/native/src/impl/file.cpp b/native/src/impl/file.cpp
index 8aef358..07ee8c3 100644
--- a/native/src/impl/file.cpp
+++ b/native/src/impl/file.cpp
@@ -23,6 +23,8 @@
 #include
 #include
 #include
+#include <mutex>
+#include <algorithm>

 using OpenZGY::IOContext;
 namespace InternalZGY {
 }
 #endif

+namespace {
+  /**
+   * Return non-zero if the private memory allocator should be used in certain
+   * circumstances. The default is 1, which enables it in those places that
+   * will most likely benefit and that appear safe. Set it to 0 to get an idea
+   * of how large the saving is. Note that this might result in disappointment,
+   * as the system malloc() is fairly efficient already.
+   */
+  static int
+  malloc_shortcut()
+  {
+    static int enable = Environment::getNumericEnv("OPENZGY_MALLOC_SHORTCUT", 1);
+    return enable;
+  }
+}

 /////////////////////////////////////////////////////////////////////////////
 // FileADT (base class) /////////////////////////////////////////////////
 /////////////////////////////////////////////////////////////////////////////

@@ -219,6 +237,76 @@ FileADT::_deliver(
   }
 }

+/**
+ * Rudimentary pool of scratch buffers to avoid alloc/dealloc overhead.
+ * Allocated data is returned as a std::shared_ptr that takes care of release.
+ * Once allocated, the memory might not be released to the CRT until the
+ * application exits. The function should only be used for short lived data.
+ * Also, do NOT create any std::weak_ptr instances referencing the result.
+ */
+std::shared_ptr<void>
+FileADT::_allocate(std::int64_t size)
+{
+  const std::int64_t minsize{16*1024}, maxsize{1024*1024};
+  const std::int64_t highwater{500}, lowwater{200};
+  static std::vector<std::shared_ptr<void>> cache;
+  static std::mutex mutex;
+  static size_t hint{0};
+  std::shared_ptr<void> result;
+  if (size >= minsize && size <= maxsize && malloc_shortcut() >= 1) {
+    std::lock_guard<std::mutex> lk(mutex);
+    if (hint >= cache.size())
+      hint = 0;
+    // Start searching for an available entry at the point we left off last.
+    // This might *slightly* improve performance at the cost of messier code.
+    // And be careful about the corner case where the cache is empty.
+    std::vector<std::shared_ptr<void>>::const_iterator it, start = cache.begin() + hint;
+    for (it = start; it != cache.end(); ++it) {
+      if (it->unique()) {
+        hint = it - cache.begin() + 1;
+        return *it;
+      }
+    }
+    for (it = cache.begin(); it != start; ++it) {
+      if (it->unique()) {
+        hint = it - cache.begin() + 1;
+        return *it;
+      }
+    }
+    result = std::shared_ptr<void>(::malloc(maxsize), [](void* p){::free(p);});
+    if (result) {
+      // This eviction method is crude but is not expected to be needed at all.
+      // Assuming that all allocated data really is short lived, but for some
+      // reason there is so much of it that keeping it allocated after
+      // everything settles down is not practical. Whether free or in-use
+      // pointers are evicted doesn't really matter because in-use data will
+      // be freed very soon anyway. The reason for the shuffle is that if
+      // long lived data is allocated by accident it will eventually get
+      // evicted from the cache and become regular allocated data.
+      // To force the eviction code to be executed, try to read a large file
+      // using 1,000 or so threads.
+      if ((std::int64_t)cache.size() > highwater && highwater > lowwater) {
+        //std::cerr << "FileADT::_allocate() is evicting entries." << std::endl;
+        std::random_shuffle(cache.begin(), cache.end());
+        cache.resize(lowwater);
+      }
+      cache.push_back(result);
+    }
+  }
+  else {
+    // Bypass everything and just do a normal malloc.
+    result = std::shared_ptr<void>(::malloc(size), [](void* p){::free(p);});
+  }
+  if (!result) {
+    // This should be a std::bad_alloc but it might not be legal to throw that
+    // exception from user code. Using new instead of malloc would make the
+    // issue moot, but malloc seems safer both with respect to type punning
+    // and alignment issues.
+    throw std::runtime_error("Failed to allocate " + std::to_string(size) + " bytes");
+  }
+  return result;
+}
+
 std::shared_ptr<FileADT>
 FileADT::factory(const std::string& filename, OpenMode mode, const IOContext *iocontext)
 {
diff --git a/native/src/impl/file.h b/native/src/impl/file.h
index 4c29686..3b938b2 100644
--- a/native/src/impl/file.h
+++ b/native/src/impl/file.h
@@ -258,6 +258,7 @@ protected:
 public: // Actually internal. Used by ConsolidateRequests.
   static void _deliver(const ReadRequest::delivery_t& fn, const ReadRequest::data_t& data, std::int64_t offset, std::int64_t size, bool transient);
   static void _deliver_now(const ReadRequest::delivery_t& fn, const ReadRequest::data_t& data, std::int64_t size, bool transient);
+  static std::shared_ptr<void> _allocate(std::int64_t size);
 public:
   static std::shared_ptr<FileADT> factory(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
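
The search-plus-recycle logic above is compact but subtle: a cache entry is free for reuse exactly when the cache itself holds the only reference to it, which is why the doc comment forbids creating std::weak_ptr instances from the result (a weak_ptr would not show up in the reference count and could observe the buffer being handed out again). Here is a minimal standalone sketch of the same idea, with invented names (BufferPool, acquire) and without the search hint and high/low-water eviction of the real code:

    // Recycling-pool sketch: an entry whose use_count() is 1 is referenced
    // only by the pool itself, so it can be handed out again.
    #include <cstdlib>
    #include <memory>
    #include <mutex>
    #include <vector>

    class BufferPool
    {
    public:
      std::shared_ptr<void> acquire()
      {
        std::lock_guard<std::mutex> lk(mutex_);
        for (const std::shared_ptr<void>& entry : cache_) {
          if (entry.use_count() == 1)
            return entry;            // Recycle; copying bumps the refcount.
        }
        std::shared_ptr<void> fresh(::malloc(kBufSize), [](void* p){::free(p);});
        if (fresh)
          cache_.push_back(fresh);   // Keep one reference for later reuse.
        return fresh;
      }
    private:
      static constexpr std::size_t kBufSize = 64 * 1024;
      std::mutex mutex_;
      std::vector<std::shared_ptr<void>> cache_;
    };

    int main()
    {
      BufferPool pool;
      const void* first = nullptr;
      {
        std::shared_ptr<void> a = pool.acquire();
        first = a.get();
      }                              // 'a' dropped; pool's copy is unique again.
      std::shared_ptr<void> b = pool.acquire();
      return b.get() == first ? 0 : 1;  // 0: the same block was recycled.
    }

The real code spells the test it->unique(), which is equivalent to use_count() == 1; note that unique() was deprecated in C++17 and removed in C++20, so use_count() is the more future-proof spelling.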
diff --git a/native/src/impl/file_local.cpp b/native/src/impl/file_local.cpp
index 5cd5daa..d736f8e 100644
--- a/native/src/impl/file_local.cpp
+++ b/native/src/impl/file_local.cpp
@@ -271,9 +271,12 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta
   // If xx_read() is overridden then whoever did that wouldn't expect
   // xx_readv() to change. The fact that I choose to implement one in
   // terms of the other is an implementation detail.
+  // Note that the delivery function can retain a reference to the data.
+  // This is allowed as long as the data is still short lived. If not, then
+  // this isn't a disaster, due to the eviction code in _allocate().
   if (!parallel_ok || requests.size() < 2) {
     for (const ReadRequest& r : requests) {
-      std::shared_ptr<char> data(new char[r.size], std::default_delete<char[]>());
+      std::shared_ptr<void> data = _allocate(r.size);
       this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
       _deliver(r.delivery, data, 0, r.size, transient_ok);
     }
diff --git a/native/src/impl/file_sd.cpp b/native/src/impl/file_sd.cpp
index 588b3bb..2cf73e7 100644
--- a/native/src/impl/file_sd.cpp
+++ b/native/src/impl/file_sd.cpp
@@ -1167,15 +1167,24 @@ SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, Us
 void
 SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint)
 {
-#if 0
-  // For now just implement xx_readv in terms of xx_read.
-  for (const ReadRequest& r : requests) {
-    std::shared_ptr<char> data(new char[r.size], std::default_delete<char[]>());
-    this->SeismicStoreFile::xx_read(data.get(), r.offset, r.size, usagehint);
-    _deliver(r.delivery, data, 0, r.size, transient_ok);
+  if (requests.size() == 1) {
+    // Handle this simple case specially. There will be more cases to test,
+    // but the shortcut might help performance, especially if the memory
+    // allocation can be made more efficient. For testing, the shortcut can be
+    // made unconditional, but that would disable the consolidate-brick logic.
+    // Explicitly use xx_read() in this class, not any overrides. If xx_read()
+    // is overridden then whoever did that wouldn't expect xx_readv() to change.
+    // The fact that one is implemented using the other is an implementation detail.
+    // Note that the delivery function can retain a reference to the data.
+    // This is allowed as long as the data is still short lived. If not, then
+    // this isn't a disaster, due to the eviction code in _allocate().
+    for (const ReadRequest& r : requests) {
+      std::shared_ptr<void> data = _allocate(r.size);
+      this->SeismicStoreFile::xx_read(data.get(), r.offset, r.size, usagehint);
+      _deliver(r.delivery, data, 0, r.size, transient_ok);
+    }
+    return;
   }
-  return;
-#endif

   // Consolidate adjacent bricks before reading.
   //
       return std::max(a, b.local_size + b.outpos);
     });
+  // This would probably work, but the single-brick case is already
+  // handled, and the cases of two to four int8 bricks or two int16
+  // bricks are not that interesting. At least not for applications
+  // that read just one brick at a time. Those apps will not get here.
+  //std::shared_ptr<void> data = _allocate(r.size);
   std::shared_ptr<char> data(new char[realsize], std::default_delete<char[]>());
   if (this->_config->_debug_trace)
--
GitLab
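
A practical note on the OPENZGY_MALLOC_SHORTCUT switch from patch 3: since malloc_shortcut() latches the value in a function-local static, the variable must be set before the process first allocates through this path; flipping it afterwards has no effect. That makes A/B timing runs simple: launch once with the variable set to 0 for plain malloc(), and once with it at 1 (the default) for the pool. A standalone sketch of the latch behavior, where getenv/atoi stand in for Environment::getNumericEnv, whose exact semantics are assumed:

    // The environment variable is sampled once, on first call, then cached.
    #include <cstdlib>

    static int malloc_shortcut_demo()
    {
      static int enable = [](){
        const char* v = std::getenv("OPENZGY_MALLOC_SHORTCUT");
        return v ? std::atoi(v) : 1;             // Default is 1 (enabled).
      }();
      return enable;
    }

    int main()
    {
      const int first = malloc_shortcut_demo();  // Value latched here.
      setenv("OPENZGY_MALLOC_SHORTCUT", "0", 1); // POSIX; _putenv on Windows.
      return malloc_shortcut_demo() == first ? 0 : 1;  // 0: still latched.
    }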