Commit 852f5b29 authored by Paal Kvamme

Multi-threading of decompress and copy-out for seismic store.

parent f1094442
......@@ -158,13 +158,27 @@ public:
* functor will not try to modify the data buffer.
* Pass False e.g. if the functor may need to byteswap
* the data it has read from file.
* With the current implementation the bulk layer
* will unconditionally pass false because it doesn't
* know yet whether byteswap and/or subtiling is needed.
* In the current implementation this doesn't add much
* cost to the cloud reader, so this is probably ok.
* transient_ok: If true the caller promises that the delivery
* functor will not keep a reference to the data buffer
* after the functor returns.
* With smart pointers it is possible to check whether
* the delivery functor kept its promise and signal
* a fatal error if it didn't. The reason that the code
* doesn't just allow keeping a pointer and look at the
* refcount on return is that this might make a future
* cache module less efficient.
*
* The delivery functor is called as
* fn(void* data, std::int64_t size)
*
* size can in some cases be more than originally requested due to
* caching, and possibly less if end of file was encountered.
*
* FUTURE: a new argument partial_ok may be set to True if it is ok to
* call the delivery functor with less data than requested, and to keep
* calling it until all data has been delivered. The signature of the
......
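To make the transient_ok contract concrete, here is a small self-contained sketch (illustrative only; the Delivery alias and the buffer names are invented and are not part of the OpenZGY headers). A delivery functor that honours transient_ok copies the bytes before it returns, so the reader is free to reuse or free its own buffer afterwards.

#include <cstdint>
#include <cstring>
#include <functional>
#include <vector>

// Hypothetical stand-in for the delivery functor type described above.
using Delivery = std::function<void(const void* data, std::int64_t size)>;

int main()
{
  std::vector<char> result;                    // caller-owned destination
  Delivery fn = [&result](const void* data, std::int64_t size) {
    // transient_ok == true: copy immediately, keep no reference to 'data'.
    const char* p = static_cast<const char*>(data);
    result.assign(p, p + size);
  };

  char internal[16] = "internal buffer";       // stands in for the reader's buffer
  fn(internal, sizeof(internal));              // deliver
  std::memset(internal, 0, sizeof(internal));  // the reader may now reuse it
  return result.size() == sizeof(internal) ? 0 : 1;
}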
......@@ -269,7 +269,7 @@ ConsolidateRequests::_join_requests(
ReadList new_requests;
for (const ReadList& group : all_requests) {
// Output of groupsize tells us what to read in order to cover
// all the requests in this group. We can that create a single
// all the requests in this group. We can then create a single
// ReadRequest for the group.
std::pair<std::int64_t,std::int64_t> info =
_groupsize(group, force_align, eof);
......
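_groupsize() itself is outside this hunk. Assuming it works roughly as the comment describes, a standalone sketch of computing one covering (offset, size) pair for a group of requests, with optional alignment and clipping to end of file, could look like this (function and parameter names are made up):

#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Illustrative only: one read that covers all (offset, size) requests in a group.
static std::pair<std::int64_t, std::int64_t>
cover_group(const std::vector<std::pair<std::int64_t, std::int64_t>>& group,
            std::int64_t force_align, std::int64_t eof)
{
  // Assumes the group is not empty.
  std::int64_t begin = group.front().first;
  std::int64_t end   = group.front().first + group.front().second;
  for (const auto& r : group) {
    begin = std::min(begin, r.first);
    end   = std::max(end, r.first + r.second);
  }
  if (force_align > 1) {                       // round outward to the alignment
    begin = (begin / force_align) * force_align;
    end   = ((end + force_align - 1) / force_align) * force_align;
  }
  end = std::min(end, eof);                    // never read past end of file
  return {begin, end - begin};
}

int main()
{
  std::vector<std::pair<std::int64_t, std::int64_t>> group{{100, 50}, {180, 20}};
  auto info = cover_group(group, 64, 1000);    // expect offset 64, size 192
  return (info.first == 64 && info.second == 192) ? 0 : 1;
}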
......@@ -240,10 +240,11 @@ LocalFileLinux::xx_read(void *data, std::int64_t offset, std::int64_t size, Usag
* not realize that it is creating nested loops. Or maybe it does, if
* it uses an application-wide thread pool?
*
* TODO-Test:
* Caveat: Since finalize() is single threaded, it should probably
* enable parallel here. One problem is that the application might
* still be inside an OpenMP loop, using a lock to make sure that
* finalize() runs unmolested. OpenMP wikk still see it is inside a
* finalize() runs unmolested. OpenMP will still see it is inside a
* parallel region so it might refuse to make one here.
*/
void
......@@ -271,6 +272,12 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta
// size being requested.
// TODO-Low: if the number of requests per call is typically less than
// the number of available threads, the re-use is pointless.
// TODO-High: If !transient_ok then we need one buffer per request.
// In this case it isn't too late to detect this here:
// If the (soon to be) smart pointer isn't unique after delivery
// then discard it and create a new one. In that case, might as
// well do the same if the buffer is too small. Not needing the
// up front calculation of maxsize.
std::int64_t maxsize = 0;
for (const ReadRequest& r : requests)
maxsize = std::max(maxsize, r.size);
......
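As a rough illustration of the trade-off in the TODO above (types and names below are invented, this is not the LocalFileLinux code): with transient_ok a single scratch buffer sized to the largest request can be reused, while without it each request needs its own allocation that the caller may keep.

#include <algorithm>
#include <cstdint>
#include <memory>
#include <vector>

struct Req { std::int64_t offset; std::int64_t size; };   // simplified request

int main()
{
  std::vector<Req> requests{{0, 4096}, {8192, 65536}, {70000, 1024}};
  const bool transient_ok = true;   // caller promises not to keep the buffer

  if (transient_ok) {
    // One shared scratch buffer, sized up front to the largest request.
    std::int64_t maxsize = 0;
    for (const Req& r : requests)
      maxsize = std::max(maxsize, r.size);
    std::unique_ptr<char[]> scratch(new char[maxsize]);
    // ... read each request into 'scratch' and deliver it immediately ...
  }
  else {
    // One buffer per request, since the caller may hold on to the pointer.
    std::vector<std::shared_ptr<char>> buffers;
    for (const Req& r : requests)
      buffers.emplace_back(new char[r.size], std::default_delete<char[]>());
    // ... read each request into its own buffer and hand it out ...
  }
  return 0;
}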
// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "file_parallelizer.h"
#include "mtguard.h"
#include "../exception.h"
#include <omp.h>
#include <string.h>
#include <iostream>
namespace InternalZGY {
#if 0
}
#endif
/**
* \brief Help parallelize the decompression and copy-out steps.
*
* \details
* - Intercept calls to xx_readv.
*
* - Send a single read request list down to the layer below.
* At the level where the parallelizer is injected, each
* individual request will almost always be for a single brick.
*
* - Wait for all the data to be delivered. In the seismic server
* case the accessor will in any case defer sending results until
* it has all the requested data. Changing that will not happen
* anytime soon.
*
* - Use an OpenMP loop to return each brick in a potentially
* different thread.
*
* The net effect is similar to using parallel loops at the
* end of SeismicStoreFile::xx_readv() and also in the
* ConsolidateRequests::_consolidated_delivery() method.
* The first place handles bricks that were not consolidated
* and the second handles bricks that were. Doing it there would
* have avoided the additional buffer copy, but it might lead
* to nested parallelization.
*
* Hence the code in this file, which is in any case better
* due to being more modular. The extra buffer copies are
* unfortunate, though.
*/
FileParallelizer::FileParallelizer(std::shared_ptr<FileADT> relay, std::int64_t cputhreads)
: FileRelay(relay)
, _cputhreads(cputhreads)
{
std::cerr << "Parallelizer has been created with "
<< _cputhreads << " threads\n";
}
FileParallelizer::~FileParallelizer()
{
std::cerr << "Parallelizer has been destructed\n";
}
void
FileParallelizer::xx_readv(
const ReadList& requests,
bool parallel_ok, // usually true
bool immutable_ok, // usually false
bool transient_ok, // usually true
UsageHint hint)
{
const bool shortcut = false; // Set false only for debugging.
const std::int64_t requestcount = requests.size();
// Shortcut if no parallelizing possible.
if (!parallel_ok || (shortcut && requests.size() <= 1)) {
std::cerr << "Parallelizer: Nothing to do\n";
relay().xx_readv(requests, parallel_ok, immutable_ok, transient_ok, hint);
return;
}
// Future: For each request, try fulfilling it entirely from the cache
// and remove it from the queue, because that part would have much
// lower overhead.
// Negotiating immutable_ok and transient_ok:
// Currently the data is always copied inside this method to a new buffer.
// Tell the layer below: parallel_ok, immutable_ok, transient_ok.
// If/when the delivery is done by smart pointer it may be possible
// to avoid the copy, but negotiating those flags becomes non-trivial.
//
// With delivery using smart pointers, SeismicStoreFile::xx_readv()
// has little or no additional cost for a mutable and not transient
// buffer except for some corner cases. So pass down the immutable_ok
// from the caller and pass the transient_ok=false needed here.
// A future caching module might make this trickier.
// The new request list sends the data to our buffers instead of to caller.
ReadList newrequests(requests);
std::vector<std::shared_ptr<char>> buffers(requestcount);
for (std::int64_t ii = 0; ii < requestcount; ++ii) {
std::int64_t buflen = newrequests[ii].size;
std::shared_ptr<char> bufptr(new char[buflen], std::default_delete<char[]>());
buffers[ii] = bufptr;
newrequests[ii].delivery =
[bufptr,buflen](const void* data, std::int64_t len) {
std::cerr << "+";
// TODO-High: Verify: len can be more than requested due to speculative
// caching and less due to encountering EOF. The EOF ought to
// have been tested for already and oversize is just ignored.
memcpy(bufptr.get(), data, std::min(len, buflen));
if (buflen > len)
memset(bufptr.get() + len, 0, buflen - len);
};
}
// The actual read. It will not return until all is processed.
// We copy delivered data immediately, so immutable_ok && transient_ok.
std::cerr << "Parallelizer: n=" << requestcount << " ";
relay().xx_readv(newrequests, true, true, true, hint);
// Deliver the buffers that we cached to the caller.
const std::int64_t threadcount = std::min(requestcount, (std::int64_t)omp_get_max_threads());
MTGuard guard;
#pragma omp parallel for num_threads(threadcount)
for (std::int64_t ii = 0; ii < requestcount; ++ii) {
guard.run([&](){
std::cerr << "0123456789"[omp_get_thread_num() % 10];
requests[ii].delivery(buffers[ii].get(), requests[ii].size);
});
}
guard.finished();
std::cerr << "$\n";
}
/**
* Inject a parallelizer module.
*/
std::shared_ptr<FileADT>
FileParallelizer::inject(std::shared_ptr<FileADT> file, std::int64_t cputhreads)
{
if (cputhreads > 1)
file = std::shared_ptr<FileADT>(new FileParallelizer(file, cputhreads));
return file;
}
} // namespace
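MTGuard comes from mtguard.h and its exact interface is not shown in this commit. Assuming it follows the common pattern of capturing the first exception thrown inside an OpenMP loop and rethrowing it after the loop (exceptions must not escape an OpenMP structured block), a minimal standalone equivalent of the guard.run()/guard.finished() usage above is:

#include <exception>
#include <iostream>
#include <mutex>
#include <stdexcept>

int main()
{
  std::exception_ptr first_error;   // first exception thrown by any iteration
  std::mutex mtx;

  #pragma omp parallel for
  for (int ii = 0; ii < 16; ++ii) {
    try {
      if (ii == 7)
        throw std::runtime_error("simulated delivery failure");
      // ... deliver buffer ii to its requester here ...
    } catch (...) {
      // Exceptions must not escape the OpenMP block; remember the first one.
      std::lock_guard<std::mutex> lock(mtx);
      if (!first_error)
        first_error = std::current_exception();
    }
  }

  if (first_error) {                // corresponds to guard.finished()
    try { std::rethrow_exception(first_error); }
    catch (const std::exception& ex) { std::cerr << ex.what() << "\n"; return 1; }
  }
  return 0;
}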
// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "file_relay.h"
namespace InternalZGY {
#if 0
}
#endif
/**
* Static class for (in some situations) speeding up read from cloud.
* See SeismicStoreFile::xx_readv() for some explanation.
*/
class FileParallelizer : public FileRelay
{
std::int64_t _cputhreads;
public:
FileParallelizer(std::shared_ptr<FileADT> relay, std::int64_t cputhreads);
FileParallelizer(const FileParallelizer&) = delete;
FileParallelizer& operator=(const FileParallelizer&) = delete;
virtual ~FileParallelizer();
virtual void xx_readv(const ReadList& requests,
bool parallel_ok,
bool immutable_ok,
bool transient_ok,
UsageHint hint) override;
static std::shared_ptr<FileADT> inject(std::shared_ptr<FileADT> file, std::int64_t cputhreads);
};
} // namespace
// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "file_parallelizer.h"
#include "../exception.h"
namespace InternalZGY {
#if 0
}
#endif
FileRelay::FileRelay(std::shared_ptr<FileADT> relay)
: _relay(relay)
{
if (!_relay)
throw OpenZGY::Errors::ZgyInternalError("FileRelay created with no target");
}
FileRelay::~FileRelay()
{
}
} // namespace
// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "file.h"
namespace InternalZGY {
#if 0
}
#endif
/**
* \brief A FileADT that forwards all requests to another instance.
*
* \details
* This class is meant to be extended to add special handling of some
* of FileADT members. Used by itself it is a no-op. The class is
* typically used when just a few methods are to be intercepted and
* it makes sense to have a default that just passes on the call.
*/
class FileRelay : public FileADT
{
private:
std::shared_ptr<FileADT> _relay;
FileRelay(const FileRelay&) = delete;
FileRelay& operator=(const FileRelay&) = delete;
protected:
const FileADT& relay() const { return *_relay; };
FileADT& relay() { return *_relay; };
public:
explicit FileRelay(std::shared_ptr<FileADT> relay);
virtual ~FileRelay();
virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint hint) override {
_relay->xx_read(data, offset, size, hint);
}
virtual void xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint hint) override {
_relay->xx_readv(requests, parallel_ok, immutable_ok, transient_ok, hint);
}
virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint hint) override {
_relay->xx_write(data, offset, size, hint);
}
virtual void xx_close() override {
_relay->xx_close();
}
virtual std::int64_t xx_eof() const override {
return _relay->xx_eof();
}
virtual bool xx_iscloud() const override {
return _relay->xx_iscloud();
}
};
} // namespace
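A minimal sketch of the relay/decorator idea using a tiny mock interface instead of the real FileADT, so it compiles on its own: the base relay forwards every call, and a subclass overrides only the one it cares about, which is exactly how FileParallelizer overrides xx_readv.

#include <cstdint>
#include <iostream>
#include <memory>

// Simplified stand-ins for FileADT / FileRelay; not the real interfaces.
struct MiniFile {
  virtual ~MiniFile() = default;
  virtual void read(std::int64_t offset, std::int64_t size) = 0;
  virtual void close() = 0;
};

struct MiniRelay : MiniFile {                 // forwards everything by default
  explicit MiniRelay(std::shared_ptr<MiniFile> relay) : _relay(relay) {}
  void read(std::int64_t o, std::int64_t s) override { _relay->read(o, s); }
  void close() override { _relay->close(); }
protected:
  std::shared_ptr<MiniFile> _relay;
};

struct Backend : MiniFile {
  void read(std::int64_t o, std::int64_t s) override
  { std::cout << "read " << s << " bytes at " << o << "\n"; }
  void close() override { std::cout << "closed\n"; }
};

struct LoggingRelay : MiniRelay {             // intercepts only read()
  using MiniRelay::MiniRelay;
  void read(std::int64_t o, std::int64_t s) override {
    std::cout << "intercepted: ";             // special handling goes here
    MiniRelay::read(o, s);                    // then forward to the target
  }
};

int main()
{
  std::shared_ptr<MiniFile> f(new Backend());
  f = std::make_shared<LoggingRelay>(f);      // inject the wrapper
  f->read(0, 4096);
  f->close();
  return 0;
}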
......@@ -19,6 +19,7 @@
#include "logger.h"
#include "file_consolidate.h"
#include "file_performance.h"
#include "file_parallelizer.h"
#include "fancy_timers.h"
#include "mtguard.h"
#include "../exception.h"
......@@ -96,6 +97,7 @@
// getTokenType()
using OpenZGY::IOContext;
using OpenZGY::SeismicStoreIOContext;
namespace InternalZGY {
#if 0
}
......@@ -1095,8 +1097,15 @@ SeismicStoreFile::xx_make_instance(const std::string& filename, OpenMode mode, c
if (filename.substr(0, 5) == "sd://" &&
(mode != OpenMode::ReadWrite && mode != OpenMode::Truncate)) {
auto file = std::shared_ptr<FileADT>(new SeismicStoreFile(filename, mode, iocontext));
// This is a no-op unless enabled by enviroment variables
// This is a no-op unless enabled by enviroment variables.
// Note: this could have been injected after the FileParallelizer instead.
file = FileWithPerformanceLogger::inject(file);
// Improve multi-threading of decompress and copy-out.
auto context = dynamic_cast<const SeismicStoreIOContext*>(iocontext);
if (context && context->_cputhreads > 1)
file = FileParallelizer::inject(file, context->_cputhreads);
return file;
}
else
......@@ -1133,6 +1142,10 @@ SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, Us
* Thread safety: Designed to be thread safe as long as the underlying
* SDGenericDataset is. Even when data is being written in another
* thread.
*
* TODO-Worry: even with consolidate_overlaps=false, overlapping
* requests might cause surprises. Since this isn't supposed to
* happen anyway, maybe just fall back to one brick at a time?
*/
void
SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint)
......
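To see why overlapping requests are a worry for a consolidated read (the names below are illustrative, not the ConsolidateRequests API): one large read covers several original requests, and each request is handed the window starting at its offset minus the group's start offset. With overlaps two deliveries alias the same bytes, which is only safe if both treat the buffer as read-only.

#include <cstdint>
#include <iostream>
#include <vector>

struct Slice { std::int64_t offset; std::int64_t size; };  // one original request

int main()
{
  // One consolidated read starting at group_offset covers both slices below.
  const std::int64_t group_offset = 1000;
  const std::vector<char> consolidated(4096, 'x');          // pretend payload
  const std::vector<Slice> slices{{1000, 512}, {1256, 512}}; // note: overlapping

  for (const Slice& s : slices) {
    const char* data = consolidated.data() + (s.offset - group_offset);
    // Each delivery sees its own window into the shared buffer; with
    // overlaps the windows alias, so deliveries must not modify the data
    // (this is what immutable_ok is meant to guarantee).
    std::cout << "deliver " << s.size << " bytes at file offset "
              << s.offset << "\n";
    (void)data;
  }
  return 0;
}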
......@@ -96,12 +96,14 @@
<ClCompile Include="..\src\impl\environment.cpp" />
<ClCompile Include="..\src\impl\fancy_timers.cpp" />
<ClCompile Include="..\src\impl\file.cpp" />
<ClCompile Include="..\src\impl\file_consolidate.cpp" />
<ClCompile Include="..\src\impl\file_local.cpp" />
<ClCompile Include="..\src\impl\file_performance.cpp" />
<ClCompile Include="..\src\impl\file_windows.cpp" />
<ClCompile Include="..\src\impl\file_consolidate.cpp" />
<ClCompile Include="..\src\impl\file_local.cpp" />
<ClCompile Include="..\src\impl\file_parallelizer.cpp" />
<ClCompile Include="..\src\impl\file_performance.cpp" />
<ClCompile Include="..\src\impl\file_relay.cpp" />
<ClCompile Include="..\src\impl\file_sd.cpp" />
<ClCompile Include="..\src\impl\file_smallcache.cpp" />
<ClCompile Include="..\src\impl\file_windows.cpp" />
<ClCompile Include="..\src\impl\genlod.cpp" />
<ClCompile Include="..\src\impl\guid.cpp" />
<ClCompile Include="..\src\impl\histogrambuilder.cpp" />
......@@ -136,8 +138,10 @@
<ClInclude Include="..\src\impl\environment.h" />
<ClInclude Include="..\src\impl\fancy_timers.h" />
<ClInclude Include="..\src\impl\file.h" />
<ClInclude Include="..\src\impl\file_consolidate.h" />
<ClInclude Include="..\src\impl\file_performance.h" />
<ClInclude Include="..\src\impl\file_consolidate.h" />
<ClInclude Include="..\src\impl\file_parallelizer.h" />
<ClInclude Include="..\src\impl\file_performance.h" />
<ClInclude Include="..\src\impl\file_relay.h" />
<ClInclude Include="..\src\impl\file_sd.h" />
<ClInclude Include="..\src\impl\file_smallcache.h" />
<ClInclude Include="..\src\impl\genlod.h" />
......
......@@ -53,9 +53,15 @@
<ClCompile Include="..\src\impl\file_consolidate.cpp">
<Filter>src\impl</Filter>
</ClCompile>
<ClCompile Include="..\src\impl\file_parallelizer.cpp">
<Filter>src\impl</Filter>
</ClCompile>
<ClCompile Include="..\src\impl\file_performance.cpp">
<Filter>src\impl</Filter>
</ClCompile>
<ClCompile Include="..\src\impl\file_relay.cpp">
<Filter>src\impl</Filter>
</ClCompile>
<ClCompile Include="..\src\impl\file_sd.cpp">
<Filter>src\impl</Filter>
</ClCompile>
......@@ -157,9 +163,15 @@
<ClInclude Include="..\src\impl\file_consolidate.h">
<Filter>src\impl</Filter>
</ClInclude>
<ClInclude Include="..\src\impl\file_parallelizer.h">
<Filter>src\impl</Filter>
</ClInclude>
<ClInclude Include="..\src\impl\file_performance.h">
<Filter>src\impl</Filter>
</ClInclude>
<ClInclude Include="..\src\impl\file_relay.h">
<Filter>src\impl</Filter>
</ClInclude>
<ClInclude Include="..\src\impl\file_sd.h">
<Filter>src\impl</Filter>
</ClInclude>
......