Commit a26ffb69 authored by Paal Kvamme's avatar Paal Kvamme

Make LocalFileLinux::xx_readv() honor the request for parallel delivery. But don't use the feature yet.
parent c1b799d6
......@@ -555,6 +555,34 @@ ZgyInternalBulk::readToExistingBuffer(
<< requests.size() << " read requests are queued\n");
if (!requests.empty())
this->_file->xx_readv(requests, false, false, true, UsageHint::Data);
// TODO-Performance: Passing true in the second argument above
// could help performance a lot. Especially for reading compressed files
// where the user sends large requests without multithreading. Also
// when finalizing compressed files. parallel_ok=true will cause the
// decompression step to be multi-threaded. Also the conversion to
// float (if needed) and the copy-out to the application's buffer will
// be multi-threaded. But there are caveats:
//
// * I am not confident enough about _deliverOneBrick() being
// thread safe.
//
// * The cloud backend doesn't honor the parallel_ok argument.
// While this would be a very good idea it is also rather
// difficult to implement.
//
// * There is a risk of creating too many threads if the application
// is doing its own multi threading. Ideally the user should
// be able to configure this.
//
// * Ditto for the low resolution generator. It can probably speed
// up by reading with 4 (but only 4) threads. So this isn't as
// efficient as setting parallel_ok=true here with respect to
// speeding up compression. But it might help I/O against the
// cloud, which the setting here won't.
//
// * See comments in LocalFileLinux::xx_readv() and _deliverOneBrick().
// And GenLodImpl::_calculate().
}
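
A minimal sketch of what acting on this TODO might eventually look like, assuming a hypothetical opt-in setting (here called prefer_parallel_delivery, which does not exist in the code shown) so that an application doing its own multithreading can keep the current behaviour:

// Hypothetical sketch only. "prefer_parallel_delivery" stands in for
// whatever configuration knob ends up controlling this.
const bool parallel_ok = this->prefer_parallel_delivery && requests.size() > 1;
if (!requests.empty())
  this->_file->xx_readv(requests, parallel_ok, false, true, UsageHint::Data);
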
/**
......@@ -904,6 +932,18 @@ ZgyInternalBulk::_partsNeeded(
* might end up set to "raw" which means it must not be used after the
* delivery function returns. Mitigated by adding a clear() member to
* DataBuffer. That doesn't help if somebody already has a raw pointer though.
*
* Thread safety: Maybe.
*
* Multithreading by having multiple read requests from the API layer is
* safe, as those requests don't share any mutable data.
*
* TODO-Worry: Multithreading by having the low level xx_readv() deliver
* the requested bricks in parallel MIGHT be thread safe. But beware of
* overlapping requests. Also, is it technically allowed for multiple
* threads to share the same output array even if they write to different
* parts of it? If the array isn't 8-byte aligned (on x86_64) then the
* answer is probably no. In general I am not sure.
*/
void
ZgyInternalBulk::_deliverOneBrick(
......
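
As a side note to the TODO-Worry above, here is a small self-contained sketch of the pattern in question: several OpenMP threads filling disjoint slices of one shared output buffer. Under the C++11 memory model distinct array elements are distinct memory locations, so non-overlapping writes do not race even for char-sized elements; alignment affects performance (false sharing), not correctness. The brick size and fill pattern are made up purely for illustration.

#include <cstdint>
#include <vector>
#include <omp.h>

int main()
{
  const std::int64_t bricksize = 64 * 64 * 64;  // made-up brick size
  const int brickcount = 8;
  std::vector<char> shared_output(bricksize * brickcount);
#pragma omp parallel for
  for (int ii = 0; ii < brickcount; ++ii) {
    // Each thread writes only to its own, non-overlapping slice of the
    // shared buffer, mimicking a parallel _deliverOneBrick().
    char *slice = shared_output.data() + ii * bricksize;
    for (std::int64_t jj = 0; jj < bricksize; ++jj)
      slice[jj] = static_cast<char>(ii);
  }
  return 0;
}
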
......@@ -35,6 +35,7 @@
#include <unistd.h>
#include <mutex>
#include <atomic>
#include <omp.h>
using OpenZGY::IOContext;
namespace InternalZGY {
......@@ -295,6 +296,24 @@ LocalFileLinux::xx_read(void *data, std::int64_t offset, std::int64_t size, Usag
/**
* \details: Thread safety: Yes, assuming that the linux ::pread is thread safe.
*
* If the caller passes parallel_ok=true this means the caller allows and
* even prefers that we deliver each request on a different thread. This
* parallelization comes in addition to allowing multiple reads in parallel
* at the OpenZGY API level.
*
* Caveat: Consider carefully whether you want both. If the
* application uses OpenMP for multi threading then by default nested
* parallel regions are disabled. You can change this. If the
* application uses some other mechanism, then the OpenMP used here might
* not realize that it is creating nested loops. Or maybe it does, if
* it uses an application-wide thread pool?
*
* Caveat: Since finalize() is single threaded, it should probably
* enable parallel here. One problem is that the application might
* still be inside an OpenMP loop, using a lock to make sure that
* finalize() runs unmolested. OpenMP will still see that it is inside a
* parallel region so it might refuse to make one here.
*/
void
LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint)
......@@ -303,10 +322,67 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta
// If xx_read() is overridden then whoever did that wouldn't expect
// xx_readv() to change. The fact that I choose to implement one in
// terms of the other is an implementation detail.
for (const ReadRequest& r : requests) {
std::unique_ptr<char[]> data(new char[r.size]);
this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size);
if (!parallel_ok || requests.size() < 2) {
for (const ReadRequest& r : requests) {
std::unique_ptr<char[]> data(new char[r.size]);
this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size);
}
}
else {
// OpenMP needs signed loop variable on windows.
const std::int64_t requestcount = requests.size();
// Re-use buffers within one thread, to avoid lock contention in
// the CRT. Assume that in almost all cases the requests will have
// the same size. If this is not true and the sizes vary wildly
// then we may be wasting memory here. Even more memory than the
// size being requested.
// TODO-Low, if number of requests per call is typically less than
// the number of available threads then the re-use is pointless.
std::int64_t maxsize = 0;
for (const ReadRequest& r : requests)
maxsize = std::max(maxsize, r.size);
// Cannot use more threads than we have requests, and OpenMP might
// not be smart enough to see this. Definitely not if the parallel
// region starts before the for loop, as is needed to reuse
// buffers. And sorry for the pedantic guard against more than
// 2**31 bricks.
const int threadcount = static_cast<int>(std::min(std::min(static_cast<std::int64_t>(std::numeric_limits<int>::max()), requestcount), static_cast<std::int64_t>(omp_get_max_threads())));
// Exceptions thrown out of an OpenMP loop are fatal, so I need to
// handle them here.
std::atomic<int> errors(0);
std::string first_error;
#pragma omp parallel num_threads(threadcount)
{
std::unique_ptr<char[]> data(new char[maxsize]);
#pragma omp for
for (std::int64_t ii=0; ii<requestcount; ++ii) {
if (errors.load() != 0)
continue;
const ReadRequest& r = requests[ii];
try {
this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size);
}
catch (const std::exception& ex) {
if (errors.fetch_add(1) == 0) {
auto what = ex.what();
first_error = std::string(what && what[0] ? what : "EXCEPTION");
}
continue;
}
} // end omp for
}
// end parallel
if (errors.load() != 0) {
// The distinction between UserError, EndOfFile, and InternalError
// (and maybe even others) is lost. If it matters I can add handling
// for this as well.
throw OpenZGY::Errors::ZgyInternalError(first_error);
}
}
}
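
The caveat about nested parallel regions in the doc comment above can be seen with a small standalone program (not part of the commit): inside an outer OpenMP region, a nested parallel region is by default given a team of just one thread unless the maximum number of active levels is raised.

#include <cstdio>
#include <omp.h>

int main()
{
#pragma omp parallel num_threads(2)
  {
    // With the default of a single active parallelism level, this inner
    // region typically gets only one thread per outer thread.
#pragma omp parallel num_threads(4)
    {
#pragma omp single
      std::printf("inner team has %d thread(s) at nesting level %d\n",
                  omp_get_num_threads(), omp_get_level());
    }
  }
  // Calling omp_set_max_active_levels(2) before the outer region lets the
  // inner one actually get its 4 threads.
  return 0;
}
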
......
......@@ -1192,6 +1192,25 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
this->_config->_debug_trace("readv", /*need=*/asked, /*want=*/realsize,/*parts*/ work.size(), this->_dataset->info()->allSizes(-1));
// Do the actual reading, sequentially, one consolidated chunk at a time.
// TODO-Performance, can this easily be parallelized?
//
// * I probably need a config option for max threads to avoid
// overloading the data server.
//
// * Worry: Can there be multiple requests targeting the same area
// of the output buffer? Probably not although there can be multiple
// read requests for the same area of the file.
//
// If parallel_ok, can I then deliver data as it is received without
// waiting for the last bit? That allows reading and e.g. decompressing
// in parallel. Not a big deal if application has multiple reads in
// flight. Otherwise this might in theory double the speed.
//
// * Tricky to implement. Receiver doesn't allow partial delivery.
// So if one request requires concatenating data from multiple
// cloud reads then this needs to wait until the end. Or, if really
// fancy, keep track of when all the data has been read for each
// of the original requests.
for (const auto& it : work) {
this->_dataset->dataset()->readBlock
(static_cast<int>(it.blocknum),
......@@ -1200,6 +1219,25 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
static_cast<size_t>(it.local_size));
}
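
A rough sketch of the simplest variant hinted at in the comment above, parallelizing just this loop over consolidated chunks. The thread cap (cloud_read_threads) is a hypothetical configuration value matching the first bullet, it is assumed rather than verified that the underlying dataset tolerates concurrent readBlock() calls, and the work container is assumed to support indexing.

// Sketch only; "cloud_read_threads" is a hypothetical config value.
const std::int64_t workcount = static_cast<std::int64_t>(work.size());
const int threadcount =
  std::max(1, static_cast<int>(std::min(workcount, static_cast<std::int64_t>(cloud_read_threads))));
#pragma omp parallel for num_threads(threadcount)
for (std::int64_t ii = 0; ii < workcount; ++ii) {
  const auto& it = work[ii];
  // The same readBlock() call as in the sequential loop above goes here,
  // using "it". Exceptions must be caught locally, as in
  // LocalFileLinux::xx_readv(), because they cannot propagate out of an
  // OpenMP loop.
}
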
// TODO-Performance, if parallel_ok, can I parallelize only this
// loop if it gets too difficult to do it inside the above loop?
// This can help speed up ZgyInternalBulk::readToExistingBuffer().
//
// * At this level, each request might be a large consolidated
// read. This means that in addition to the code here, class
// ConsolidateRequests::_consolidated_delivery might also need to
// change. This is the functor responsible for breaking the jumbo
// requests back into the original requests. It might also need
// to do parallelizing. Which means issues with nested OpenMP
// loops. One unpalatable alternative is to break encapsulation
// and do some of _consolidated_delivery's processing here. Ouch.
//
// * If I decide I won't even try to do the concurrent read and
// process described earlier then there is a (I think) much
// simpler alternative. Make a "parallelizer" adaptor that first
// does all the reads and then returns them in parallel. I may
// need to allocate yet another buffer for this, though.
std::int64_t pos = 0;
for (const ReadRequest& rr : new_requests) {
std::int64_t this_size = std::max((std::int64_t)0, std::min(rr.size, current_eof - rr.offset));
......
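
A sketch of the "parallelizer adaptor" idea from the comment above: do the reads first, still sequentially, then run the delivery functors in parallel afterwards. The ReadRequest members (offset, size, delivery) match the ones used elsewhere in this diff; the helper name, the indexing into ReadList, and the assumption that the file object exposes an xx_read() like the one shown for LocalFileLinux are illustrative only, and as noted it costs one extra buffer per request.

// Hypothetical helper, not part of the commit. Needs <cstdint>, <memory>,
// <vector> and <omp.h>.
static void
parallel_delivery_adaptor(SeismicStoreFile& file, const ReadList& requests, UsageHint usagehint)
{
  // Phase 1: sequential reads into private per-request buffers.
  std::vector<std::unique_ptr<char[]>> buffers;
  buffers.reserve(requests.size());
  for (const ReadRequest& r : requests) {
    buffers.emplace_back(new char[r.size]);
    file.xx_read(buffers.back().get(), r.offset, r.size, usagehint);
  }
  // Phase 2: run the delivery functors (decompress, convert, copy out) in
  // parallel. Exceptions would need the same catch-and-rethrow treatment
  // as in LocalFileLinux::xx_readv().
  const std::int64_t requestcount = static_cast<std::int64_t>(requests.size());
#pragma omp parallel for
  for (std::int64_t ii = 0; ii < requestcount; ++ii)
    requests[ii].delivery(buffers[ii].get(), requests[ii].size);
}
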