Commit 32538d8a authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Add code to re-open the last segment in a cloud file. Add unit tests for...

Add code to re-open the last segment in a cloud file. Add unit tests for update on cloud both at the FileADT level and at the IZgyWriter level.
parent 50f08894
......@@ -897,7 +897,7 @@ public:
// or it may have been written by the old ZGY-Cloud.
// Distinguishing those two is not always possible because
// ZGY-Cloud can also put everything in the same segment.
throw Errors::ZgyUserError
throw Errors::ZgyUpdateRules
("Only files uploaded by OpenZGY can be updated.");
}
if (headersize != segsizes[0]) {
......@@ -907,7 +907,7 @@ public:
// or the old ZGY-Cloud. If there is more than one segment
// or eof is > headersize then there is something weird going on.
// Probably not useful to report on that case though.
throw Errors::ZgyUserError
throw Errors::ZgyUpdateRules
("Only files uploaded by OpenZGY can be updated. Bad Header size.");
}
}
......@@ -1552,6 +1552,11 @@ public:
*/
void close()
{
// TODO-@@@: If the file has never been written to and the error
// flag is set then discard everything and do NOT write any data.
// This can in some cases avoid corrupting a file that was opened
// for write and then has an error thrown.
// The same logic may be needed in _close_internal.
if (_fd) {
finalize(std::vector<DecimationType>
{DecimationType::LowPass, DecimationType::WeightedAverage},
......
......@@ -169,6 +169,9 @@ private:
void do_write_one(const void* const data, const std::int64_t blocknum, const std::int64_t size, const bool overwrite);
void do_write_many(const void* const data, const std::int64_t blocknum, const std::int64_t size, const std::int64_t blobsize, const bool overwrite);
public:
// The raw SDGenericDataset is needed by SeismicStoreFileDelayedWrite
// when opening a file for update.
std::shared_ptr<SDGenericDatasetWrapper> datasetwrapper() const {return _dataset;}
// TODO-Low per-instance logging. This is tedious to implement
// because many of the helper classes will need to hold a logger
// instance as well, or need the logger passed in each call.
......@@ -294,6 +297,7 @@ public:
virtual bool xx_iscloud() const override;
private:
void _reopen_last_segment();
void _flush_part(std::int64_t this_segsize);
void _flush(bool final_call);
};
......@@ -1320,18 +1324,21 @@ SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t s
overwrite = true;
this->_dataset->info()->getLocalOffset
(offset, size, &blocknum, &local_offset, &local_size);
// Normally we get here to overwrite blob 0, and that is ok.
// TODO-Low: This code needs more work if/when allowing update.
// This test will fail in the parallel upload case
// because local_offset and local_size refers to SDAPI blocks and
// not the larger segments that we are asked to write. local_size
// will usually not be larger than one SDAPI block and will thus
// fail the size check. It is not an immediate concern because
// block 0 should work, and updating other blocks is only needed
// when re-opening a closed segment. Which is not yet implemented.
// Maybe check offset+N*(segsize/segsplit) (last SDAPI block).
// I am unsure whether it is only the test that is wrong or whether
// this case needs more special handling. Worry about that later.
// Normally we only get here to overwrite blob 0, and that is ok.
// Writing block 0 is not multi-threaded and does not resize.
// If opening an existing file for update it depends on how that
// is handled elsewhere. Hopefully we still won't get here.
// If that happens then there are several caveats:
// - May need to allow resizing the last brick, which in turn
// invalidates some assumptions about immutable information.
// - The test below will fail in the parallel upload case
// because local_offset and local_size refers to SDAPI blocks and
// not the larger segments that we are asked to write. local_size
// will usually not be larger than one SDAPI block and will thus
// fail the size check.
// - Maybe check offset+N*(segsize/segsplit) (last SDAPI block)?
// - I am unsure whether it is only the test that is wrong or whether
// this case needs more special handling.
if (local_offset != 0 || local_size != size)
throw OpenZGY::Errors::ZgyInternalError("Cannot write resized segment.");
}
......@@ -1694,12 +1701,6 @@ SeismicStoreFile::_cached_read(/*TODO-Low: seg, offset, view*/)
// FileADT -> SeismicStoreFile -> SeismicStoreFileDelayedWrite /////////
/////////////////////////////////////////////////////////////////////////////
OpenMode _mode;
std::shared_ptr<OpenZGY::IOContext> _config;
std::shared_ptr<SeismicStoreFile> _relay;
std::vector<char> _open_segment;
UsageHint _usage_hint;
SeismicStoreFileDelayedWrite::SeismicStoreFileDelayedWrite(const std::string& filename, OpenMode mode, const IOContext *iocontext)
: FileADT()
, _mode(mode)
......@@ -1717,6 +1718,9 @@ SeismicStoreFileDelayedWrite::SeismicStoreFileDelayedWrite(const std::string& fi
if (!context)
throw OpenZGY::Errors::ZgyUserError("Opening a file from seismic store requires a SeismicStoreIOContext");
this->_config.reset(new OpenZGY::SeismicStoreIOContext(*context));
if (mode == OpenMode::ReadWrite)
this->_reopen_last_segment();
}
SeismicStoreFileDelayedWrite::~SeismicStoreFileDelayedWrite()
......@@ -1993,6 +1997,109 @@ SeismicStoreFileDelayedWrite::xx_iscloud() const
return this->_relay->xx_iscloud();
}
/**
*
* Re-open the last segment by reading it into memory. This is
* needed if the last segment was not full. In the unlikely case
* where the last write managed to exactly fill the segment it isn't
* technically needed but is done anyway to avoid corner cases.
*
* Note that if there is just one segment there is nothing more
* to be done. Our callers are responsible for reading and parsing
* the headers on open and writing them back on close.
*
* Do some consistency checks, over and above what the ZgyWriter
* does to make sure this file was written by OpenZGY and that
* critical parameters such as the segment size was not changed.
*
* Instead of throwing an exception the code could have silently changed
* segsize to a valid number. But there would be several caveats.
*
* - The underlying SeismicStoreFile needs the segment size in xx_write()
* and its value might have been cached in the constructor.
*
* - If segsize needs to change, what should the code do with segsplit?
* Regardless of what is chosen it won't be intuitive.
*
* - Some cases might end up with odd segment size. E.g. if there are
* two segments, original segment size 2 GB, new segment size 1 GB
* and the second (not full) segment is 1025 MB then that would
* become the new segment size.
*
* Called from constructor, so calling virtual methods in this class
* won't work.
*
* Thread safety: No.
*/
void
SeismicStoreFileDelayedWrite::_reopen_last_segment()
{
  // Segment size requested for this open; compared below against the
  // segment sizes actually present in the file.
  const std::int64_t user_segsize = this->_config->_real_segsize;
  const std::vector<std::int64_t> segments = this->_relay->xx_segments(false);
  std::shared_ptr<SDGenericDatasetWrapper> wrapper =
    this->_relay->datasetwrapper();
  // Number of SDAPI blocks currently stored, including block 0.
  const std::int64_t numseg =
    static_cast<std::int64_t>(wrapper->dataset()->getBlockNum());
  // Remember the current end-of-file so the sanity check at the end can
  // verify that re-opening did not change the apparent file size.
  const std::int64_t numbytes = this->_relay->xx_eof();
  if (numseg >= 2) {
    //const std::int64_t file_segsize = dataset->getBlockSize(1);
    //const std::int64_t file_lastsize = dataset->getBlockSize(numseg-1);
    // Or trust the xx_segments
    const std::int64_t file_segsize = segments[1];
    const std::int64_t file_lastsize = segments.back();
    // CHECK the user supplied segment size did not change
    if (numseg == 2) {
      // The second and also last segment contains data but is
      // probably not full.
      if (file_segsize > user_segsize) {
        throw OpenZGY::Errors::ZgyUpdateRules("This ZGY file was written with a larger segment size, or it was not uploaded by OpenZGY");
        // If segments[1] <= segsize*segsplit this might technically have
        // worked, but would be a really obscure corner case causing the
        // file to be rewritten with a different segment size.
      }
    }
    else { // numseg > 2
      // The second segment is now full, so this must have been the
      // segment size the file was originally written with.
      if (file_segsize != user_segsize) {
        throw OpenZGY::Errors::ZgyUpdateRules("This ZGY file was written with a different segment size, or it was not uploaded by OpenZGY");
      }
    }
    // FOOL both SeismicStoreFile and SeismicStoreFileDelayedWrite to
    // believe we are still writing the file for the first time,
    // with no way except for the OpenMode to say it is wrong.
    //
    // Need to re-open the last segment by reading it into the open
    // buffer and deleting it from the cloud file.
    std::vector<char>& seg = this->_open_segment;
    seg.resize(file_lastsize);
    // NOTE(review): presumably refreshes credentials before the long
    // read/delete sequence below -- confirm against SDAPI docs.
    wrapper->reAuthorizeManager();
    wrapper->dataset()->readBlock
      (static_cast<int>(numseg-1), seg.data(), 0, seg.size());
    // This is the point of no return when it comes to preserving
    // the file unchanged in case an error happens. So, maybe not
    // delete that segment quite yet, and just logically delete it
    // instead? This is risky because e.g. xx_eof in the relayed
    // instance will need to know about the subterfuge.
    // See more caveats in SeismicStoreFile::xx_write().
    wrapper->dataset()->deleteBlock(std::to_string(numseg-1));
    // Force the cached DatasetInformation to get updated to
    // reflect the new state of the file, i.e. one segment less.
    wrapper->updateDataset(wrapper->dataset());
    // CHECK that the code didn't mess things up.
    // Should really be testing this->xx_eof() but because this
    // method is called from the constructor our own virtuals
    // will not work.
    if (this->_relay->xx_eof() + std::int64_t(seg.size()) != numbytes)
      throw OpenZGY::Errors::ZgyInternalError("Bug: eof was changed by reopen.");
    if (static_cast<std::int64_t>(wrapper->dataset()->getBlockNum()) !=numseg-1)
      throw OpenZGY::Errors::ZgyInternalError("Bug: sdapi didn't delete.");
  }
}
/**
* Flush "this_segsize" pending bytes. Leave any residual data
* in the open segment buffer.
......
......@@ -307,6 +307,16 @@ public:
return *this;
}
/**
 * For unit tests only. Allows setting the segment size to any byte
 * count, bypassing the usual MB-granularity setter. Also refreshes
 * the derived total write-buffer size (_segsize), which covers
 * _segsplit segments (treated as at least 1).
 */
SeismicStoreIOContext& segsizebytes(std::int64_t nbytes)
{
  const auto splits = (this->_segsplit < 1) ? 1 : this->_segsplit;
  this->_real_segsize = nbytes;
  this->_segsize = nbytes * splits;
  return *this;
}
/**
* Applies to write() and finalize().
*
......
......@@ -38,6 +38,7 @@ using namespace OpenZGY::Errors;
using namespace InternalZGY;
using Test_Utils::LocalFileAutoDelete;
using Test_Utils::CloudFileAutoDelete;
using Test_Utils::must_throw;
namespace {
#if 0
......@@ -185,7 +186,6 @@ test_sdfiledelete()
TEST_CHECK(ex.what() && std::string(ex.what()).find("does not exist") != std::string::npos);
}
}
#endif
/**
* Python one-liner to show the expiry time of the OPENZGY_TOKEN
......@@ -225,6 +225,138 @@ test_sdtoken()
}
}
static void
update_sdreopen(const std::string& filename, std::int64_t start, std::int64_t nbytes, bool tweak)
{
OpenZGY::SeismicStoreIOContext context =
OpenZGY::SeismicStoreIOContext(*Test_Utils::default_sd_context())
.segsizebytes(2048)
.segsplit(nbytes==10000 ? 8 : 1);
std::shared_ptr<FileADT> fd =
FileFactory::instance().create
(filename,
start==0 && nbytes == 256 && !tweak ? OpenMode::Truncate : OpenMode::ReadWrite,
&context);
std::vector<std::int16_t> data(nbytes/sizeof(std::int16_t));
for (std::int64_t ii=0; ii<(std::int64_t)data.size(); ++ii)
data[ii] = static_cast<std::int16_t>((start / sizeof(std::int16_t)) + ii);
if (tweak && data.size() > 42)
data[42] *= -1;
fd->xx_write(data.data(), start, nbytes, UsageHint::Unknown);
fd->xx_close();
fd.reset();
}
/**
 * Read back the whole test file and verify both the segment layout and
 * the int16 ramp contents written by update_sdreopen(). Returns true
 * when everything matched; the TEST_EQUAL macro additionally records
 * any failure with the test framework.
 */
static bool
check_sdreopen(const std::string& filename, std::int64_t expect_bytes, bool tweak)
{
  bool ok = true;
  std::shared_ptr<FileADT> fd =
    FileFactory::instance().create
    (filename, OpenMode::ReadOnly, Test_Utils::default_context());
  // TODO-@@@: Currently xx_segments() won't check all blocks.
  // Can change this but beware use of the debug callback gets expensive.
  const std::vector<std::int64_t> segs = fd->xx_segments(true);
  // Layout assumed by this test: segment 0 is 256 bytes and every
  // other segment is 2048 bytes except possibly the last. 1792 is
  // 2048-256, so the modulus maps expect_bytes to the size of the
  // (possibly partial) final segment, in the range 1..2048.
  const std::int64_t expect_num_segs = 1 + (expect_bytes - 256 + 2047) / 2048;
  const std::int64_t expect_lastseg = ((expect_bytes + 1792 - 1) % 2048) + 1;
  if (TEST_EQUAL((std::int64_t)segs.size(), expect_num_segs)) {
    for (std::int64_t ii=0; ii<expect_num_segs; ++ii)
      if (!TEST_EQUAL(segs[ii], (ii==0 ? 256 :
                                 ii==expect_num_segs-1 ? expect_lastseg :
                                 2048)))
      {
        ok = false;
      }
  }
  else {
    ok = false;
  }
  if (TEST_EQUAL(fd->xx_eof(), expect_bytes)) {
    // The file should contain consecutive int16 values 0, 1, 2, ...
    // except that update_sdreopen(tweak=true) negated element 42.
    std::vector<std::int16_t> data(expect_bytes/sizeof(std::int16_t));
    fd->xx_read(data.data(), 0, expect_bytes, UsageHint::Unknown);
    for (std::int64_t ii=0; ii < (std::int64_t)data.size(); ++ii) {
      std::int16_t e = static_cast<std::int16_t>((tweak && ii==42) ? -ii : ii);
      if (data[ii] != e) {
        // Only report the first mismatch; bail out so a completely
        // wrong file does not flood the log with failures.
        if (!TEST_EQUAL(data[ii], e)) {
          ok = false;
          break;
        }
      }
    }
  }
  else {
    ok = false;
  }
  fd->xx_close();
  fd.reset();
  return ok;
}
/**
 * End-to-end test of re-opening a cloud file at the FileADT level:
 * repeatedly append data, closing and re-opening the file in between,
 * and verify the segment layout and contents after every step.
 */
static void
test_sdreopen()
{
  // Segment size is 2048 bytes except seg0 which is 256 bytes.
  //
  // The test will make 7 writes to the file (the initial create
  // plus 6 appends):
  // Bytes written: 256, 2048+42, 12, 4096, 2048-42-12, 998, 10000
  // Total size: 256, 2346, 2358, 6454, 8448, 9446, 19446
  // Segments: 1, 3, 3, 5, 5, 6, 11
  //
  // Rationale for choosing those sizes is explained before each test.
  //
  // In (2) and (6) there is no need to re-open the last segment but in (6)
  // it will probably happen anyway to simplify the code.
  CloudFileAutoDelete cad("sdreopen.dat", Test_Utils::default_sd_context());
  // (1) Initial create and establish size of segment 0.
  update_sdreopen(cad.name(), 0, 256, false);
  TEST_CHECK(check_sdreopen(cad.name(), 256, false));
  // (2) Write a bit more than one segment, last segment will be odd size.
  update_sdreopen(cad.name(), 256, 2048+42, false);
  TEST_CHECK(check_sdreopen(cad.name(), 2346, false));
  // (3) Write less than one segment, last segment odd size before and after.
  update_sdreopen(cad.name(), 2346, 12, false);
  TEST_CHECK(check_sdreopen(cad.name(), 2358, false));
  // (4) Write more than one segment, last segment odd size before and after.
  update_sdreopen(cad.name(), 2358, 4096, false);
  TEST_CHECK(check_sdreopen(cad.name(), 6454, false));
  // (5) Write less than one segment, exactly filling up last segment.
  update_sdreopen(cad.name(), 6454, 2048-42-12, false);
  TEST_CHECK(check_sdreopen(cad.name(), 8448, false));
  // (6) Write less than one segment, last segment full before, odd after.
  update_sdreopen(cad.name(), 8448, 998, false);
  TEST_CHECK(check_sdreopen(cad.name(), 9446, false));
  // (7) Write lots of data with multi threaded upload.
  update_sdreopen(cad.name(), 9446, 10000, false);
  TEST_CHECK(check_sdreopen(cad.name(), 19446, false));
  // (8) Overwrite segment 0, no change in size.
  update_sdreopen(cad.name(), 0, 256, true);
  TEST_CHECK(check_sdreopen(cad.name(), 19446, true));
  // (*) Overwrite segment 0 with a smaller size.
  must_throw("Cannot change the size of block zero", [&](){
    update_sdreopen(cad.name(), 0, 200, false);});
  // (*) Overwrite segment 0 with a larger size.
  must_throw("Cannot write resized segment", [&](){
    update_sdreopen(cad.name(), 0, 2048, false);});
  // (*) Overwrite closed segment with no size change.
  // Support might be added for that if needed.
  must_throw("has already been flushed", [&](){
    update_sdreopen(cad.name(), 512, 2048, false);});
}
#endif // HAVE_SD
} // namespace for tests
namespace {
......@@ -233,11 +365,12 @@ namespace {
public:
Register()
{
  // NOTE(review): "aaa.sdtoken" appears both outside and inside the
  // HAVE_SD guard here; this looks like diff residue from moving the
  // registration into the #ifdef. Confirm the merged file registers
  // it only once (inside HAVE_SD).
  register_test("aaa.sdtoken", test_sdtoken);
  register_test("file.localfilefactory", test_localfilefactory);
#ifdef HAVE_SD
  register_test("aaa.sdtoken", test_sdtoken);
  register_test("file.sdfilefactory", test_sdfilefactory);
  register_test("file.sdfiledelete", test_sdfiledelete);
  register_test("file.sdreopen", test_sdreopen);
#endif
}
} dummy;
......
......@@ -1821,6 +1821,211 @@ test_reopen_zgypublic()
}
}
/**
 * Build a vector of "count" int16 elements where every "step"-th
 * element (indices 0, step, 2*step, ...) holds an increasing value
 * starting at (start % 32768). All other elements stay zero.
 *
 * Robustness fix: the original looped forever when step <= 0 and
 * passed a negative count straight to the vector constructor. Such
 * arguments now yield an empty / untouched result instead.
 *
 * @param start seed for the first stored value (taken modulo 32768).
 * @param count total number of elements in the result.
 * @param step  distance between consecutive stored values; must be > 0
 *              for any values to be stored.
 * @return the populated vector.
 */
static std::vector<std::int16_t>
make_sequence(std::int64_t start, std::int64_t count, std::int64_t step)
{
  std::vector<std::int16_t> result(count > 0 ? count : 0);
  if (step <= 0)
    return result; // guard: a non-positive stride would loop forever.
  std::int16_t value{static_cast<std::int16_t>(start % 32768)};
  for (std::int64_t ii = 0; ii < (std::int64_t)result.size(); ii += step) {
    // Assuming no rollover.
    result[ii] = value++;
  }
  return result;
}
/**
 * Render a vector as "(a, b, c)" with each element divided by "scale"
 * before being formatted with default ostream rules. An empty vector
 * renders as "()".
 */
template<typename T>
static std::string
array_to_string(const std::vector<T>& vec, double scale)
{
  if (vec.empty())
    return "()";
  std::stringstream out;
  out << "(";
  const char* separator = "";
  for (const T& element : vec) {
    out << separator << (element / scale);
    separator = ", ";
  }
  out << ")";
  return out.str();
}
#ifdef HAVE_SD
/**
* Extract the segment information from FileStatistics -> xx_segments()
* as the abbreviated 3-element vector of (first, all_middle, last)
* and expand that to a list of all the segment sizes.
* Yes this is way too roundabout.
* TODO-@@@-Test: For a unit test the code should really have checked
* each and every segment using SDAPI to see that all except the first
* and last are the same size.
*
* Note that if the FileStatistics come from a file opened for write
* then the result depends on the segsplit setting. Data still in the
* write buffer is assigned to the last segment; it won't be split
* into the correctly sized segments until xx_close().
*/
static std::vector<std::int64_t>
list_segments(std::shared_ptr<const OpenZGY::FileStatistics> fs, int verbose)
{
  // segmentSizes() may be abbreviated to (first, middle, last); with
  // 1 or 2 entries the list is already explicit.
  const std::vector<std::int64_t>& segs = fs->segmentSizes();
  const std::int64_t fsize = fs->fileSize();
  // Recover the total segment count from the abbreviated form: the
  // bytes between the first and last segment are covered by
  // middle-sized segments, so ceil-divide that span by segs[1] and
  // add the first and last segment. A result of 0 flags unexpected
  // input (more than 3 entries); the loop below is then a no-op and
  // only segs[0] and segs[2] are emitted.
  const std::int64_t numsegs =
    (fsize <= segs[0] && segs.size() == 1 ? 1 :
     fsize <= segs[0] + segs[1] && segs.size() == 2 ? 2 :
     segs.size() != 3 ? 0 :
     2 + (fsize - segs[0] - segs[2] + (segs[1] - 1)) / segs[1]);
  std::vector<std::int64_t> result;
  if (segs.size() < 3) {
    // Already one entry per segment; return as-is.
    result = segs;
  }
  else {
    result.push_back(segs[0]);
    for (std::int64_t seg = 1; seg < numsegs - 1; ++seg)
      result.push_back(segs[1]);
    result.push_back(segs[2]);
  }
  if (verbose)
    std::cout << "segments " << array_to_string(result, 512*1024)
              << " bytes " << fsize
              << std::endl;
  return result;
}
/**
 * Convenience overload: open the named file from seismic store
 * read-only, grab its FileStatistics, and delegate to the
 * FileStatistics-based overload.
 */
static std::vector<std::int64_t>
list_segments(const std::string& name, int verbose)
{
  std::shared_ptr<const OpenZGY::FileStatistics> stats;
  {
    std::shared_ptr<OpenZGY::IZgyReader> reader =
      OpenZGY::IZgyReader::open(name, Test_Utils::default_sd_context());
    stats = reader->filestats();
    reader->close();
  }
  return list_segments(stats, verbose);
}
static void
test_reopen_sd()
{
typedef OpenZGY::IZgyWriter::size3i_t size3i_t;
Test_Utils::CloudFileAutoDelete cad("reopen_sd.zgy", Test_Utils::default_sd_context());
OpenZGY::SeismicStoreIOContext context =
OpenZGY::SeismicStoreIOContext(*Test_Utils::default_sd_context())
.segsplit(8)
.segsize(2);
if (verbose())
std::cout << std::endl;
std::shared_ptr<OpenZGY::IZgyWriter> writer =
OpenZGY::IZgyWriter::open(ZgyWriterArgs()
.iocontext(&context)
.filename(cad.name())
.size(2*64, 3*64, 15*64) // 45 MB
.datatype(SampleDataType::int16)
.datarange(-32768,+32767));
// Write 11 bricks, 5.5 MB (2.75 segs), plus 12 lowres => 5.75 data segs
// Lowres: lod1 6 bricks, lod2 3 bricks, lod3 2 bricks, lod4 1 brick
std::vector<std::int16_t> data1 = make_sequence(1, 11*64, 64);
writer->write(size3i_t{0,0,0}, size3i_t{1,1,11*64}, data1.data());
writer->finalize(std::vector<OpenZGY::DecimationType>{OpenZGY::DecimationType::Decimate}, nullptr);
// Check segment size before closing; note that the last segment
// is the data buffer which can (if segsplit>1) cover more than
// one real segment.
{
std::vector<std::int64_t> check = list_segments(writer->filestats(), verbose());
if (TEST_EQUAL(check.size(), (std::size_t)2)) {
TEST_EQUAL(check[0], 512*1024);
TEST_EQUAL(check[1/**/], 23*512*1024);
}
}
writer->close();
// This re-opens the file for read to check the segment sizes.
{
std::vector<std::int64_t> check = list_segments(cad.name(), verbose());
if (TEST_EQUAL(check.size(), (std::size_t)7)) {
TEST_EQUAL(check[0], 512*1024);
TEST_EQUAL(check[1], 4*512*1024);
TEST_EQUAL(check[2], 4*512*1024);
TEST_EQUAL(check[3], 4*512*1024);
TEST_EQUAL(check[4], 4*512*1024);
TEST_EQUAL(check[5], 4*512*1024);
TEST_EQUAL(check[6], 3*512*1024);
}
}
// Write 11 bricks in addition to the 23 already present. Total now 34.
// One header segment and 9 data segments.
// Also the same 12 lowres bricks need to be updated. The three last
// ones are found in the re-opened last segment and can be updated there.
// Provided that segsplit is large enough to hold everything we want
// to update this time around. 8 is sufficient, i.e. 32 bricks.
// The remaining 9 cause more space to be allocated so we end up with
// 43 bricks or 10.75 data bricks total.
writer =
OpenZGY::IZgyWriter::reopen(ZgyWriterArgs()
.iocontext(&context)
.filename(cad.name()));
std::vector<std::int16_t> data2 = make_sequence(11*64+1, 11*64, 64);
writer->write(size3i_t{64,0,0}, size3i_t{1,1,11*64}, data2.data());
writer->finalize(std::vector<OpenZGY::DecimationType>{OpenZGY::DecimationType::Decimate}, nullptr);
// Check segment size before closing; see notes above.
// All but the last segments already written will remain.
{
std::vector<std::int64_t> check = list_segments(writer->filestats(), verbose());
if (TEST_EQUAL(check.size(), (std::size_t)7)) {
// Previous write.
TEST_EQUAL(check[0], 512*1024);
TEST_EQUAL(check[1], 4*512*1024);
TEST_EQUAL(check[2], 4*512*1024);
TEST_EQUAL(check[3], 4*512*1024);
TEST_EQUAL(check[4], 4*512*1024);
TEST_EQUAL(check[5], 4*512*1024);
// Reopened 3 bricks, this and the new 15 bricks still not flushed.
TEST_EQUAL(check[6], 23*512*1024);
}
}
writer->close();
std::shared_ptr<OpenZGY::IZgyReader> reader =
OpenZGY::IZgyReader::open(cad.name(), &context);
{
std::vector<std::int64_t> check =
list_segments(reader->filestats(), verbose());
if (TEST_EQUAL(check.size(), (std::size_t)12)) {
TEST_EQUAL(check[0], 512*1024);
TEST_EQUAL(check[1], 4*512*1024);
TEST_EQUAL(check[2], 4*512*1024);
TEST_EQUAL(check[3], 4*512*1024);
TEST_EQUAL(check[4], 4*512*1024);
TEST_EQUAL(check[5], 4*512*1024);
TEST_EQUAL(check[6], 4*512*1024);
TEST_EQUAL(check[7], 4*512*1024);
TEST_EQUAL(check[8], 4*512*1024);
TEST_EQUAL(check[9], 4*512*1024);
TEST_EQUAL(check[10], 4*512*1024);
TEST_EQUAL(check[11], 3*512*1024);
}
}
std::vector<std::int16_t> check1(11*64);
std::vector<std::int16_t> check2(11*64);
reader->read(size3i_t{0,0,0}, size3i_t{1,1,11*64}, check1.data(), 0);