Commit 35838da2 authored by Paal Kvamme

Add the "segsplit" property to the iocontext.

parent 467b9d03
...@@ -498,6 +498,9 @@ DatasetInformation::checkOnWrite(std::int64_t blocknum, std::int64_t blocksize)
if (blocknum != 0 && block0_size_ == 0) {
throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " before size of block 0 is known.");
}
if (blocksize < 1) {
throw OpenZGY::Errors::ZgyInternalError("Cannot write less than 1 byte.");
}
if (blocknum == 0) {
// Write or overwrite block 0.
if (block0_size_ != 0 && block0_size_ != blocksize)
...@@ -572,6 +575,9 @@ DatasetInformation::updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
* crossed a segment boundary. In that case the caller will need to read
* in a loop.
*
* "blocks" refer to the block number in Seismic Store, not the potentially
* larger logical blocks used by SeismicStoreDelayedWrite.
*
* Postcondition: If blocknum is returned as the last block, local_size
* will be returned as requested size. If this were not so, the calling
* function would be likely to loop forever.
...@@ -1295,18 +1301,40 @@ SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t s
overwrite = true;
this->_dataset->info()->getLocalOffset
(offset, size, &blocknum, &local_offset, &local_size);
// TODO-Low: This test will fail in the parallel upload case
// because local_offset and local_size refer to SDAPI blocks and
// not the larger segments that we are asked to write. local_size
// will usually not be larger than one SDAPI block and will thus
// fail the size check. It is not an immediate concern because
// block 0 should work, and updating other blocks is only needed
// when re-opening a closed segment, which is not yet implemented.
// Maybe check offset+N*(segsize/segsplit) (the last SDAPI block).
// I am unsure whether it is only the test that is wrong or whether
// this case needs more special handling. Worry about that later.
if (local_offset != 0 || local_size != size)
throw OpenZGY::Errors::ZgyInternalError("Cannot write resized segment.");
}
else {
throw OpenZGY::Errors::ZgyUserError("Cannot write segments out of order.");
}
// The segsize configuration belongs to SeismicStoreFileDelayedWrite, but
// the same configuration instance is passed to us, so it is reasonable
// to pick it up here. TODO-Worry: If some code creates a SeismicStoreFile
// instance that is to be used directly, without a SeismicStoreFileDelayedWrite
// wrapper, *and* configures a non-zero segsize and segsplit, then we might
// be in trouble.
const std::int64_t segsize = _config->_segsize;
const std::int64_t segsplit = _config->_segsplit;
if (segsize % segsplit != 0)
throw OpenZGY::Errors::ZgyUserError("segsize must be a multiple of segsplit");
this->_dataset->info()->checkOnWrite(blocknum, size); // Also checks blocknum fits in an int,
this->_dataset->dataset()->writeBlock(static_cast<int>(blocknum), static_cast<const char*>(data), static_cast<std::size_t>(size), overwrite);
_wtimer->addBytesWritten(size);
this->_dataset->updateOnWrite(blocknum, size);
if (this->_config->_debug_trace)
this->_config->_debug_trace
(offset == current_eof ? "append" : "write",
size, size, 1, this->_dataset->info()->allSizes(-1));
}
...@@ -1653,6 +1681,10 @@ SeismicStoreFileDelayedWrite::xx_readv(const ReadList& requests, bool parallel_o
* data cannot span the boundary between flushed and open segments,
* and cannot cover both before and after EOF.
* TODO-Low: Support can be added if it ever turns out to be useful.
* The only scenarios used today are overwriting segment 0 in its
* entirety, which will then always be closed, and appending at EOF,
* which will obviously not have data both before and after EOF and
* will throw ZgySegmentIsClosed if the data spans the open and
* closed parts.
*
* If segsize is zero no buffering is done and each write will either
* create a new segment or completely rewrite an existing segment.
...@@ -1675,6 +1707,7 @@ SeismicStoreFileDelayedWrite::xx_write(const void* data, std::int64_t offset, st
<< ", size=" << size
<< "), current EOF is " << written
<< ", segment size " << this->_config->_segsize
<< ", split into " << this->_config->_segsplit
<< "\n");
if (offset != 0 && this->_config->_segsize != 0 && offset < committed) {
...
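The comments above describe the intended split: each segsize-sized buffer held by SeismicStoreFileDelayedWrite is uploaded by SeismicStoreFile as segsplit SDAPI blocks of segsize/segsplit bytes, with block 0 reserved for the file headers. A minimal sketch of that arithmetic follows; it is not part of this commit, the helper names are made up, and the block numbering is only an assumption based on the "offset+N*(segsize/segsplit)" remark in the TODO comment.

```cpp
// Hypothetical sketch, not code from this commit.
#include <cstdint>
#include <stdexcept>

// Size of one SDAPI block when a segsize buffer is split segsplit ways.
std::int64_t sdapiBlockSize(std::int64_t segsize, std::int64_t segsplit)
{
  if (segsplit < 1 || segsize % segsplit != 0)
    throw std::runtime_error("segsize must be a multiple of segsplit");
  return segsize / segsplit;
}

// First SDAPI block number of logical data segment "segnum" (0-based),
// assuming block 0 holds the headers and every earlier data segment is full.
std::int64_t firstSdapiBlock(std::int64_t segnum, std::int64_t segsplit)
{
  return 1 + segnum * segsplit;
}
```

With the defaults (segsize 1024 MB, segsplit 8) this works out to 128 MB SDAPI objects, which per the doc comment in iocontext.h is also the segment size a reader will observe.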
...@@ -70,6 +70,7 @@ SeismicStoreIOContext::SeismicStoreIOContext()
maxhole(Environment::getNumericEnv("OPENZGY_MAXHOLE_MB", 2));
aligned(Environment::getNumericEnv("OPENZGY_ALIGNED_MB", 0));
segsize(Environment::getNumericEnv("OPENZGY_SEGSIZE_MB", 1024));
segsplit(Environment::getNumericEnv("OPENZGY_SEGSPLIT", 8));
threads(Environment::getNumericEnv("OPENZGY_NUMTHREADS", 1));
legaltag(Environment::getStringEnv("OPENZGY_LEGALTAG"));
writeid(Environment::getStringEnv("OPENZGY_WRITEID"));
...@@ -88,6 +89,7 @@ SeismicStoreIOContext::toString() const
<< " maxhole: " << _maxhole / (1024*1024) << " MB\n"
<< " aligned: " << _aligned / (1024*1024) << " MB\n"
<< " segsize: " << _segsize / (1024*1024) << " MB\n"
<< " segsplit: " << _segsplit << "\n"
<< " threads: " << _threads << "\n"
<< " legaltag: \"" << _legaltag << "\"\n"
<< " writeid: \"" << _writeid << "\"\n"
...
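Because the constructor above seeds every property from an environment variable, the new default of 8 can be overridden without touching code. A minimal sketch, assuming the context is constructed after the variable is set; the header path and namespace are assumptions, and setenv is POSIX-only.

```cpp
// Sketch only: override the OPENZGY_SEGSPLIT default of 8 before the
// constructor reads it through Environment::getNumericEnv().
#include <cstdlib>
#include "iocontext.h"   // header name is an assumption

int main()
{
  ::setenv("OPENZGY_SEGSPLIT", "4", 1);  // POSIX; Windows would use _putenv_s
  OpenZGY::SeismicStoreIOContext ctx;    // now defaults to segsplit(4)
  return 0;
}
```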
...@@ -102,6 +102,7 @@ private:
std::int64_t _maxhole;
std::int64_t _aligned;
std::int64_t _segsize;
std::int64_t _segsplit;
std::int64_t _threads;
std::string _legaltag;
std::string _writeid;
...@@ -283,6 +284,26 @@ public:
return *this;
}
/**
* Maximum number of threads to be used when writing data to the cloud.
* Default is 8. Set to 1 if you don't want multithreaded uploads.
*
* The value of segsplit must divide evenly into segsize.
*
* Multi-threading is achieved by splitting up the segsize write buffer
* into "segsplit" SDAPI objects. Hence the name. The splitting can
* be observed directly by the caller because it determines the segment
* size seen when reading. So the "split" effect is more important
* than the "maybe-threaded" effect.
*/
SeismicStoreIOContext& segsplit(int value)
{
if (value < 1 || value > 1024)
throw OpenZGY::Errors::ZgyUserError("segsplit must be between 1 and 1024");
this->_segsplit = value;
return *this;
}
/**
* Use up to this many parallel requests to seismic store in order
* to speed up processing. Set between 1 and 1024. This applies to
...
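For illustration, a hypothetical caller-side use of the new setter, following the same fluent pattern as the test_setters() test below; the numbers are arbitrary and are not from this commit.

```cpp
// Hypothetical usage, not from this commit: a 64 MB write buffer that
// xx_write would upload as 4 SDAPI objects of 16 MB each.
SeismicStoreIOContext ctx = SeismicStoreIOContext()
    .segsize(64)    // in MB; the byte size must divide evenly by segsplit
    .segsplit(4)    // SDAPI objects per buffer; 1 disables the splitting
    .threads(8);    // parallel requests to seismic store
```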
...@@ -67,6 +67,7 @@ void Test::TestIOContext::test_defaults()
TEST_CHECK(ctx._maxhole == 2 * 1024*1024);
TEST_CHECK(ctx._aligned == 0 * 1024*1024);
TEST_CHECK(ctx._segsize == 1024 * 1024*1024);
TEST_CHECK(ctx._segsplit == 8);
TEST_CHECK(ctx._threads == 1);
TEST_CHECK(ctx._legaltag == "");
TEST_CHECK(ctx._writeid == "");
...@@ -83,6 +84,7 @@ void Test::TestIOContext::test_setters()
.maxhole(7)
.aligned(1)
.segsize(15)
.segsplit(3)
.threads(8)
.legaltag("illegal")
.writeid("WID")
...@@ -96,6 +98,7 @@ void Test::TestIOContext::test_setters()
TEST_CHECK(ctx._maxhole == 7 * 1024*1024);
TEST_CHECK(ctx._aligned == 1 * 1024*1024);
TEST_CHECK(ctx._segsize == 15 * 1024*1024);
TEST_CHECK(ctx._segsplit == 3);
TEST_CHECK(ctx._threads == 8);
TEST_CHECK(ctx._legaltag == "illegal");
TEST_CHECK(ctx._writeid == "WID");
...@@ -111,6 +114,7 @@ void Test::TestIOContext::test_errors()
must_throw("must be between", [&](){ctx.maxhole(2001);});
must_throw("must be between", [&](){ctx.aligned(2000);});
must_throw("must be between", [&](){ctx.segsize(200000);});
must_throw("must be between", [&](){ctx.segsplit(0);});
must_throw("must be between", [&](){ctx.threads(0);});
}
...
...@@ -514,6 +514,8 @@ ZgyCommon_getIOContext(ZgyClass *self, const char *filename, PyObject* obj)
result->aligned((int)ivalue);
if ((ivalue = getLongValuedAttr(obj, "segsize", -1)) != -1)
result->segsize((int)ivalue);
if ((ivalue = getLongValuedAttr(obj, "segsplit", -1)) != -1)
result->segsplit((int)ivalue);
if ((ivalue = getLongValuedAttr(obj, "threads", -1)) != -1)
result->threads((int)ivalue);
if (!(value = getStringValuedAttr(obj, "legaltag")).empty())
...