Commit 94a95ff1 authored by Paal Kvamme

Multi-threading of compression and other cpu-bound tasks on write.

parent ae0b2a4c
@@ -52,7 +52,7 @@
* Expand bricks at survey edge to full size unless r/m/w already did.
* Convert sample position to brick number. I.e. divide by bricksize.
* The next layer (_writeOneBrick) can be called with one brick
* and a time so we can re-use the buffer. On the other hand,
* at a time so we can re-use the buffer. On the other hand,
* passing a list of bricks or even all the bricks in the request
* might help parallelization.
*
@@ -1563,64 +1563,113 @@ ZgyInternalBulk::_writeAlignedRegion(
<< ", type " << (int)file_dtype
<< ")\n");
// Make a temporary DataBuffer that will hold one brick at a time.
// Technically this could be skipped if the application is writing
// exactly one brick. But this risks a number of subtle bugs.
// If the raw data buffer is still whatever was passed from the
// application (i.e. no r/m/w or float->int conversion) then do not
// apply the short cut. Because we cannot control the lifecycle of
// the user's data buffer. It is a shared_ptr but it will typically
// have a bogus destructor. Allowing this buffer in the lower layers
// creates all kinds of corner cases. And how often will we hit
// the shortcut case anyway?
bool do_copy = false;
std::shared_ptr<DataBuffer> brick;
// If the application asked to write just a single brick then we can
// skip the multithreading logic and in most cases skip the copying
// into the temp buffer. That shortcut is probably not worth the
// risk, especially if the buffer belongs to the application and not
// something reallocated as part of the r/m/w processing, because we
// cannot control the lifecycle of the user's data buffer. It is a
// shared_ptr but it will typically have a bogus destructor. Also it
// cannot be modified in place for e.g. byteswapping. Allowing this
// buffer in the lower layers creates all kinds of corner cases. And
// how often will we hit the shortcut case anyway?
//
// One temporary brick buffer is allocated for each brick to be written,
// consuming roughly the same amount of memory as the original request.
// Since the requests are queued up, the code may also end up allocating
// memory for compressed data etc. and perhaps not freeing the
// uncompressed buffer immediately.
//
// If this becomes problematic then consider:
//
// * If multi-threading is disabled at runtime then the "brick" buffer
// can be re-used. _writeOneConstantBrick() or _writeWithRetry(*it)
// would be called inside the loop, one brick at a time.
// * Instead of processing every single brick and then writing all
// at the end, consider splitting huge requests into several
// slightly less huge requests and processing those in sequence.
// (The same applies to the r/m/w logic.)
// If the user-provided data is a scalar then it can almost be used as-is,
// re-using the same brick for every write. But the size needs to be
// adjusted to cover just a single brick. _writeOneConstantBrick()
// ignores the size but _writeOneBrick() etc. might not.
std::shared_ptr<DataBuffer> constbrick;
if (data->isScalar()) {
// Need to change "size" to a single brick.
double value = data->scalarAsDouble();
brick = DataBuffer::makeDataBuffer3d(&value, sizeof(double), bs, data->datatype());
}
else {
brick = DataBuffer::makeDataBuffer3d(nullptr, 0, bs, data->datatype());
do_copy = true;
constbrick = DataBuffer::makeDataBuffer3d(&value, sizeof(double), bs, data->datatype());
}
for (std::size_t ix = 0; ix < worksize; ++ix) {
const index3_t surveypos = work[ix]; // user's start i0,j0,k0 rounded down
const index3_t brickpos = work[ix] / bs; // as above, but in brick coords
if (do_copy) {
brick->fill(defaultstorage);
brick->copyFrom(data.get(), // source
start.data(), surveypos.data(), // in survey coords
survey_beg.data(), survey_size.data()); // clip to srv
}
std::vector<std::shared_ptr<const WriteBrickArgPack>> const_queue(worksize);
std::vector<std::shared_ptr<const WriteNowArgPack>> normal_queue(worksize);
std::atomic<int> errors(0);
#pragma omp parallel for
for (std::int64_t ix = 0; ix < static_cast<std::int64_t>(worksize); ++ix) {
try {
const index3_t surveypos = work[ix]; // user's start i0,j0,k0 rounded down
const index3_t brickpos = work[ix] / bs; // as above, but in brick coords
std::shared_ptr<DataBuffer> brick = constbrick;
if (!brick) {
brick = DataBuffer::makeDataBuffer3d(nullptr, 0, bs, data->datatype());
brick->fill(defaultstorage);
brick->copyFrom(data.get(), // source
start.data(), surveypos.data(), // in survey coords
survey_beg.data(), survey_size.data()); // clip to srv
}
// Note errorhandling:
// If there are any errors during _writeOneBrick() this probably
// means the entire file is a lost cause. This is true also for
// ZgyUserError raised in the file layer, because at that layer
// the "user" is really OpenZGY and not some client code. The only
// acceptable error is ZgySegmentIsClosed, and that will be caught
// and handled at lower levels.
// TODO-Low: Might implement retrying of writes at a lower level.
// In that case we still shouldn't see those errors here.
ErrorsWillCorruptFile watchdog(this);
std::shared_ptr<const WriteBrickArgPack> args =
std::make_shared<const WriteBrickArgPack>
(brickpos, lod, brick, compressor, 0);
args = _writeOneBrick(*args);
if (args->data->isScalar()) {
_writeOneConstantBrick(*args);
std::shared_ptr<const WriteBrickArgPack> args =
std::make_shared<const WriteBrickArgPack>
(brickpos, lod, brick, compressor, 0);
args = _writeOneBrick(*args);
if (args->data->isScalar()) {
#pragma omp critical // paranoia?
const_queue[ix] = args;
}
else {
std::shared_ptr<const WriteNowArgPack> now = _writeOneNormalBrick(*args);
#pragma omp critical // paranoia?
normal_queue[ix] = now;
}
}
else {
std::shared_ptr<const WriteNowArgPack> now =
_writeOneNormalBrick(*args);
_writeWithRetry(*now);
catch (const std::exception& ex)
{
// No effort to log the actual error or to get the loop to
// end earlier. An exception here really shouldn't happen.
errors.fetch_add(1);
}
watchdog.disarm();
}
if (errors.load() != 0)
throw OpenZGY::Errors::ZgyInternalError("Exception preparing buffers");
// Note errorhandling:
// If there are any errors during actual writing this probably
// means the entire file is a lost cause. This is true also for
// ZgyUserError raised in the file layer, because at that layer
// the "user" is really OpenZGY and not some client code. The only
// acceptable error is ZgySegmentIsClosed, and that will be caught
// and handled at lower levels.
// TODO-Low: Might implement retrying of writes at a lower level.
// In that case we still shouldn't see those errors here.
// The _writeWithRetry() method is not threadsafe and *must* be
// called sequentially. It is also highly recommended to write
// the data in the same order as it occurred in the work array.
// Otherwise the bricks get scrambled and reading will be less
// efficient. This limitation prevents us from having a separate
// worker process draining the normal_queue as data becomes
// available. _writeOneConstantBrick() doesn't care about order
// but might as well follow the same rules. That function is
// very lightweight.
ErrorsWillCorruptFile watchdog(this);
for (const auto& it : const_queue)
if (it)
_writeOneConstantBrick(*it);
for (const auto& it : normal_queue)
if (it)
_writeWithRetry(*it);
watchdog.disarm();
}
/*
......
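
For reference, here is a minimal standalone sketch of the "prepare in parallel, write sequentially" pattern the new code follows: CPU-bound work runs inside an OpenMP parallel for, results are stored per index so the original brick order is preserved, exceptions are only counted because they must not escape an OpenMP parallel region, and the non-threadsafe writer drains the queue sequentially afterwards. The Prepared struct and the prepare()/writeSequentially() helpers are placeholders invented for this sketch, not OpenZGY's WriteBrickArgPack, _writeOneNormalBrick() or _writeWithRetry().

// Sketch only: illustrates the pattern used above with placeholder types.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <vector>

// Stand-in for WriteNowArgPack: everything needed to write one brick.
struct Prepared
{
  std::int64_t index;
  std::vector<char> payload; // e.g. the compressed brick
};

// CPU-bound work (compression etc.) that is safe to run concurrently.
static std::shared_ptr<const Prepared> prepare(std::int64_t ix)
{
  return std::make_shared<const Prepared>(Prepared{ix, std::vector<char>(8, 'x')});
}

// File writes must happen one at a time and in the original order.
static void writeSequentially(const Prepared& p)
{
  std::cout << "write brick " << p.index
            << " (" << p.payload.size() << " bytes)\n";
}

int main()
{
  const std::int64_t worksize = 16;
  std::vector<std::shared_ptr<const Prepared>> queue(worksize);
  std::atomic<int> errors(0);

  // Phase 1: prepare all bricks in parallel. Exceptions must not leak
  // out of an OpenMP parallel region, so they are only counted here.
#pragma omp parallel for
  for (std::int64_t ix = 0; ix < worksize; ++ix) {
    try {
      queue[ix] = prepare(ix); // each iteration writes its own slot
    }
    catch (const std::exception&) {
      errors.fetch_add(1);
    }
  }
  if (errors.load() != 0)
    throw std::runtime_error("Exception preparing buffers");

  // Phase 2: drain the queue sequentially, in the original order, so the
  // bricks end up contiguous on file and the writer is never re-entered.
  for (const auto& it : queue)
    if (it)
      writeSequentially(*it);
}

Indexing the result vector by the loop counter, rather than pushing to a shared queue, is what lets phase 1 avoid locking while still preserving the write order required in phase 2.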