Commit 172b09c9 authored by Paal Kvamme

Prepare for supporting parallel uploads.

parent 2a436f49
......@@ -1281,8 +1281,7 @@ SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t s
SimpleTimerEx tt(*_wtimer);
this->_validate_write(data, offset, size, this->_mode);
this->_dataset->reAuthorizeManager();
// TODO-High: Make sure this is nonvirtual *or* don't subtype.
std::int64_t current_eof = SeismicStoreFile::xx_eof(); // nonvirtual
std::int64_t current_eof = SeismicStoreFile::xx_eof(); // MUST be nonvirtual
if (_logger(5, ""))
_logger(5, std::stringstream()
<< "SeismicStoreFile.xx_write("
......@@ -1318,20 +1317,100 @@ SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t s
throw OpenZGY::Errors::ZgyUserError("Cannot write segments out of order.");
}
// The segsize configuration belongs to SeismicFileDelayedWrite but
// the same configuration instance is passed to us so it is reasonable
// to pick it up here. TODO-Worry: If some code creates a SeismicFile
// instance that is to be used directly without a SeismicStoreDelayedWrite
// wrapper *and* configures a non-zero segsize and segsplit then we might
// be in trouble.
const std::int64_t segsize = _config->_segsize;
const std::int64_t segsplit = _config->_segsplit;
if (segsize % segsplit != 0)
// The segsize configuration belongs to SeismicStoreFileDelayedWrite but is
// needed here because the blobsize configuration that belongs to us
// isn't stored explicitly. It is stored as segsize/segsplit. The same
// configuration instance is passed to both us and SeismicStoreFileDelayedWrite
// so getting it here is easy even though it technically breaks isolation.
//
// Side issue: If some code creates a SeismicStoreFile instance that is to be
// used directly without a SeismicStoreFileDelayedWrite wrapper then segsplit
// should be set to 1. I am not sure what happens if it isn't.
if (_config->_segsize > 0 && _config->_segsize % _config->_segsplit != 0)
throw OpenZGY::Errors::ZgyUserError("segsize must be a multiple of segsplit");
this->_dataset->info()->checkOnWrite(blocknum, size); // Also checks blocknum fits in an int,
this->_dataset->dataset()->writeBlock(static_cast<int>(blocknum), static_cast<const char*>(data), static_cast<std::size_t>(size), overwrite);
_wtimer->addBytesWritten(size);
this->_dataset->updateOnWrite(blocknum, size);
const std::int64_t blobsize =
(blocknum == 0 || _config->_segsize <= 0 || _config->_segsplit <= 1) ?
size : _config->_segsize / _config->_segsplit;
const int blobcount = (size + blobsize - 1) / blobsize;
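// Worked example (illustrative numbers only): with segsize 1024 MB and
// segsplit 8 the blobsize is 128 MB. A full 1024 MB flush gives
// blobcount = 8. A final flush of 300 MB gives
// blobcount = (300 + 128 - 1) / 128 = 3, with the last SDAPI block
// holding just 44 MB.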
if (size <= blobsize) {
this->_dataset->info()->checkOnWrite(blocknum, size);
this->_dataset->dataset()->writeBlock
(static_cast<int>(blocknum), // cast checked by checkOnWrite()
static_cast<const char*>(data),
static_cast<std::size_t>(size),
overwrite);
_wtimer->addBytesWritten(size);
this->_dataset->updateOnWrite(blocknum, size);
}
else {
// Write one or more SDAPI segments of no more than blobsize bytes each.
//
// Possible scenarios.
//
// \li We never get here for block 0; that one cannot be split and it
// would rarely make sense anyway because block 0 is usually small.
// Calling this code anyway with blobsize == size and blobcount == 1
// should work but is needlessly roundabout and I have not checked
// that it won't hit any corner cases.
//
// \li If size <= blobsize then we might as well use the old code,
// even if the other conditions for writing in parallel pass.
// There will be just one SDAPI block and only one thread will
// be used. The short cut means slightly less code coverage
// when running unit tests with only small files. So, use
// bigger ones.
//
// \li In most cases we get segsize bytes here (typically 1 GB) and
// we are expected to write out the data in segsplit (typically 8)
// identical SDAPI blocks each of size segsize/segsplit.
//
// \li If size != segsize this is the last or
// (if -1 <= blocknum <= segsplit) only segment with bulk data.
// In that case we may end up writing fewer than segsplit SDAPI
// blocks. Possibly just one. And the last SDAPI block can be
// smaller than segsize/segsplit bytes. We can even have
// size < segsize/segsplit meaning all headers are in SDAPI
// block 0 and all data in SDAPI block 1. In the latter case
// there will be no parallelization.
//
// The last two bullets seem to suggest we need to know what segsize is.
// But no, all this code needs to know is that it shall split the
// incoming buffer into chunks of no more than blobsize bytes.
//
// Below, we only need to run consistency checks on the first of the
// segsplit SDAPI blocks written. So we end up checking block 0 (above),
// 1, segsplit+1, ... When checking block 1 the real segment size hasn't
// been established yet so there isn't much to check. Otherwise the check
// asserts that _dataset->info()->block1size() == blobsize. There is
// always a check that blocknum fits in an int, to make the cast safe.
//
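// Example (illustrative): with segsplit == 8 the SDAPI blocks that get
// the full consistency check are 0 (above), then 1, 9, 17, and so on.
//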
this->_dataset->info()->checkOnWrite(blocknum, std::min(size, blobsize));
for (int ii = 0; ii < blobcount; ++ii) {
const int iter_blocknum = ii + static_cast<int>(blocknum);
const std::int64_t iter_offset = ii * blobsize;
const std::int64_t iter_endoffset = std::min(size, (ii+1) * blobsize);
std::int64_t iter_size = iter_endoffset - iter_offset;
const char* iter_data = static_cast<const char*>(data)+iter_offset;
if (iter_size > 0) {
this->_dataset->dataset()->writeBlock
(iter_blocknum, iter_data, iter_size, overwrite);
_wtimer->addBytesWritten(iter_size);
}
}
// The rest of the functions, including updateOnWrite(), are unaware
// that we might have done parallel writes. Keep them in the dark.
// Otherwise there will be errors reported about writing out of sequence.
// Not to mention that updateOnWrite() isn't threadsafe.
for (int ii = 0; ii < blobcount; ++ii) {
const int iter_blocknum = ii + static_cast<int>(blocknum);
const std::int64_t iter_offset = ii * blobsize;
const std::int64_t iter_endoffset = std::min(size, (ii+1) * blobsize);
std::int64_t iter_size = iter_endoffset - iter_offset;
if (iter_size > 0) {
this->_dataset->updateOnWrite(iter_blocknum, iter_size);
}
}
} // end of decision to split into multiple bricks.
if (this->_config->_debug_trace)
this->_config->_debug_trace
(offset == current_eof ? "append" : "write",
......
......@@ -70,7 +70,8 @@ SeismicStoreIOContext::SeismicStoreIOContext()
maxhole(Environment::getNumericEnv("OPENZGY_MAXHOLE_MB", 2));
aligned(Environment::getNumericEnv("OPENZGY_ALIGNED_MB", 0));
segsize(Environment::getNumericEnv("OPENZGY_SEGSIZE_MB", 1024));
segsplit(Environment::getNumericEnv("OPENZGY_SEGSPLIT", 8));
// TODO-High: default segsplit to 8 instead of making it opt-in
segsplit(Environment::getNumericEnv("OPENZGY_SEGSPLIT", 1));
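// Illustrative note: until the default changes, callers opt in either by
// calling segsplit() explicitly or by setting OPENZGY_SEGSPLIT (e.g. to 8)
// in the environment, as read above.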
threads(Environment::getNumericEnv("OPENZGY_NUMTHREADS", 1));
legaltag(Environment::getStringEnv("OPENZGY_LEGALTAG"));
writeid(Environment::getStringEnv("OPENZGY_WRITEID"));
......
......@@ -286,7 +286,9 @@ public:
/**
* Maximum number of threads to be used when writing data to the cloud.
* Default is 8. Set to 1 if you don't want multithreaded uploads.
* Set to 1 if you don't want multithreaded uploads. Default is 1.
* The default will soon be changed to 8, but for now the split is
* an opt-in feature.
*
* The value of segsplit must divide evenly into segsize.
*
......@@ -295,6 +297,10 @@ public:
* be observed directly by the caller because it determines the segment
* size seen when reading. So the "split" effect is more important
* than the "maybe-threaded" effect.
*
* Using multiple threads for uploads can often improve throughput.
* But if the limiting factor is the bandwidth between the client
* and the cloud server then multiple threads won't help.
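*
* Example (illustrative numbers): with segsize(1024) and segsplit(8),
* each 1 GB buffer handed to the writer is stored as 8 SDAPI blocks of
* 128 MB, so the file reads back with a 128 MB segment size.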
*/
SeismicStoreIOContext& segsplit(int value)
{
......
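A minimal opt-in sketch (hypothetical caller code, assuming the fluent setters declared in this header, with segsize given in MB):

    SeismicStoreIOContext context = SeismicStoreIOContext()
        .segsize(1024)   // 1 GB write buffer per segment, as before
        .segsplit(8);    // store each segment as 8 SDAPI blocks of 128 MB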
......@@ -67,7 +67,7 @@ void Test::TestIOContext::test_defaults()
TEST_CHECK(ctx._maxhole == 2 * 1024*1024);
TEST_CHECK(ctx._aligned == 0 * 1024*1024);
TEST_CHECK(ctx._segsize == 1024 * 1024*1024);
TEST_CHECK(ctx._segsplit == 8);
TEST_CHECK(ctx._segsplit == 1);
TEST_CHECK(ctx._threads == 1);
TEST_CHECK(ctx._legaltag == "");
TEST_CHECK(ctx._writeid == "");
......
......@@ -1209,7 +1209,7 @@ def testRmwFile(filename, zgyWriterFactory = newzgy.ZgyWriter):
trace = TraceCallsToSD()
with zgyWriterFactory(filename,
iocontext = SDCredentials(segsize=11, _debug_trace=trace),
iocontext = SDCredentials(segsize=11, segsplit=1, _debug_trace=trace),
size = surveysize,
datatype = SampleDataType.float32,
datarange = (-28,+227),
......@@ -2523,7 +2523,7 @@ def testCloudConsolidateBricks(filename, *, verbose = False):
# Re-create the file with 7 MB segment size, to stress some more code.
iocontext = SDCredentials(aligned=1, maxsize=64, maxhole=1, threads=1,
segsize=7, _debug_trace = trace
segsize=7, segsplit=1, _debug_trace = trace
)
bricksize = np.array((64, 64, 64), dtype=np.int64)
brick = np.product(bricksize) * np.dtype(np.float32).itemsize
......@@ -2623,7 +2623,7 @@ def testCloudConsolidateCreateFile(filename, *, verbose = False):
vprint("Creating test file for cloud consolidate.")
trace = TraceCallsToSD(verbose = verbose)
iocontext = SDCredentials(aligned=0, maxhole=0, maxsize=64, segsize=7,
_debug_trace = trace)
segsplit=1, _debug_trace = trace)
size = np.array((700, 500, 100), dtype=np.int64)
bricksize = np.array((64, 64, 64), dtype=np.int64)
numbricks = (size + bricksize - 1) // bricksize
......