Commit a4c4513b authored by Paal Kvamme
Browse files

Allow calling finalize and then continue writing.

parent a815b6d5
......@@ -1268,6 +1268,11 @@ public:
* This method will be called automatically from close(), but in
* that case it is not possible to request a progress callback.
*
* It is valid to call finalize() and then continue writing to the
* file. This might be useful if the application is mixing reads and
* writes in the same open file. The low resolution data will only be
* up to date after a call to finalize().
*
* If the processing raises an exception the data is still marked
* as clean. So a second attempt will do nothing unless the
* caller passes force=True.
......@@ -1356,6 +1361,8 @@ public:
_delete_derived();
if (progress)
progress(0, 0);
// Next finalize must be a full one, so stop tracking.
_accessor_rw->trackedBricksTryEnable(false);
break;
case FinalizeAction::Keep:
......@@ -1365,7 +1372,16 @@ public:
case FinalizeAction::BuildIncremental:
case FinalizeAction::BuildFull:
_create_derived(decimation, progress, action);
try {
_create_derived(decimation, progress, action);
// Reset and start over tracking changes.
_accessor_rw->trackedBricksTryEnable(true);
}
catch(...) {
// Next finalize must be a full one.
_accessor_rw->trackedBricksTryEnable(false);
throw;
}
break;
}
......
......@@ -485,6 +485,9 @@ ZgyInternalBulk::ZgyInternalBulk(
, _ptimer_st(new SummaryPrintingTimerEx("writeAligned[S]"))
, _ptimer_mt(new SummaryPrintingTimerEx("writeAligned[M]"))
, _ststimer(new SummaryPrintingTimerEx("scaleToStorage"))
, _modified_bricks()
, _modified_stats()
, _modified_histo()
{
// If the file is being opened for update there may already be data in it.
// The histogram will show the range of written samples to date, even
......@@ -543,41 +546,9 @@ ZgyInternalBulk::ZgyInternalBulk(
}
// Valid statistics, histogram, and lowres means we might want to do
// an incremental finalize. So keep track of changes. No test for
// existing lowres here, because ZgyInternalMeta::initFromReopen()
// will have cleared the statistics if lowres is missing.
if (metadata_rw) {
const IHistHeaderAccess& hh = metadata_rw->hh();
const IInfoHeaderAccess& ih = metadata_rw->ih();
//std::cout << "\n@@@ hmin " << hh.minvalue() << " hmax " << hh.maxvalue() << " hcnt " << hh.samplecount() << " scnt " << ih.scnt() << std::endl;
if (hh.minvalue() <= hh.maxvalue() && hh.samplecount() && ih.scnt()) {
_modified_bricks.resize(ih.brickoffsets().back(), 0);
_modified_stats.reset(new StatisticData
(ih.scnt(), /*inf=*/0, ih.ssum(), ih.sssq(),
ih.smin(), ih.smax()));
_modified_histo.reset(new HistogramData
(hh.bins(), hh.bincount(),
hh.minvalue(), hh.maxvalue()));
if (_logger(2))
_logger(2, std::stringstream()
<< "Will Track changes. Initial stats (real) "
<< _modified_stats->toString()
<< " histo " << _modified_histo->toString());
// Statistics saved in the ZGY file uses float data, but internal
// calculation needs to be in storage because otherwise any clipped
// values would mess things up.
const std::array<double,2> factors = ih.storagetofloat();
_modified_stats->scale(factors[1], factors[0] + factors[1], 0, 1);
_modified_histo->scale(factors[1], factors[0] + factors[1], 0, 1);
if (_logger(1))
_logger(1, std::stringstream()
<< "Will Track changes. Initial stats (storage) "
<< _modified_stats->toString()
<< " histo " << _modified_histo->toString());
}
}
// an incremental finalize. So keep track of changes. The method
// will only turn it on if valid data exists.
this->trackedBricksTryEnable(true);
}
/**
......@@ -827,6 +798,84 @@ ZgyInternalBulk::readToNewBuffer(
return result;
}
/**
* Start or stop keeping track of changed bricks to support
* incremental finalize. Any existing information is discarded.
* Tracking requires pre-existing statistics, histogram, and low
* resolution bricks. If missing then tracking is turned off,
* regardless of what the caller asked for.
*
* This method must be nonvirtual because it is also called from the
* constructor.
*
* The test for pre-existing low resolution bricks attempted here is
* somewhat paranoid. Checking statistics and histogram should have
* been sufficient because if called from the constructor then
* ZgyInternalMeta::initFromReopen() will have cleared the statistics
* if lowres was missing. If called from finalize() the caller knows
* whether lowres exists or not and if not should not have asked us to
* turn it on.
*/
void
ZgyInternalBulk::trackedBricksTryEnable(bool on)
{
if (!_metadata_rw) {
_logger(1, "Will not track changes. Not open for write.");
_modified_bricks.clear();
_modified_stats.reset();
_modified_histo.reset();
}
else if (on) {
const IHistHeaderAccess& hh = _metadata_rw->hh();
const IInfoHeaderAccess& ih = _metadata_rw->ih();
const std::int64_t nlods = static_cast<std::int64_t>(ih.lodsizes().size());
const std::int64_t havelods = LookupTable::usableBrickLOD
(ih.lodsizes(), ih.brickoffsets(),
_metadata_rw->blup().lup(), _metadata_rw->blup().lupend());
const bool complete = (havelods == nlods &&
hh.samplecount() != 0 &&
hh.minvalue() <= hh.maxvalue() &&
ih.scnt() != 0);
if (complete) {
_modified_bricks.resize(ih.brickoffsets().back(), 0);
_modified_stats.reset(new StatisticData
(ih.scnt(), /*inf=*/0, ih.ssum(), ih.sssq(),
ih.smin(), ih.smax()));
_modified_histo.reset(new HistogramData
(hh.bins(), hh.bincount(),
hh.minvalue(), hh.maxvalue()));
if (_logger(2))
_logger(2, std::stringstream()
<< "Will Track changes. Initial stats (real) "
<< _modified_stats->toString()
<< " histo " << _modified_histo->toString());
// Statistics saved in the ZGY file uses float data, but internal
// calculation needs to be in storage because otherwise any clipped
// values would mess things up.
const std::array<double,2> factors = ih.storagetofloat();
_modified_stats->scale(factors[1], factors[0] + factors[1], 0, 1);
_modified_histo->scale(factors[1], factors[0] + factors[1], 0, 1);
if (_logger(1))
_logger(1, std::stringstream()
<< "Will Track changes. Initial stats (storage) "
<< _modified_stats->toString()
<< " histo " << _modified_histo->toString());
}
else {
_logger(1, "Will not track changes. Missing pre-existing data.");
_modified_bricks.clear();
_modified_stats.reset();
_modified_histo.reset();
}
}
else {
_logger(1, "Will not track changes. Explicitly disabled.");
_modified_bricks.clear();
_modified_stats.reset();
_modified_histo.reset();
}
}
/**
* \brief Mark bricks as dirty.
*
......@@ -840,7 +889,7 @@ ZgyInternalBulk::readToNewBuffer(
* and the third one might not be useful outside of debugging.
*
* Unlike trackedBricksDirty() the region has already been converted
* to a list of bricks but the numbers ate still in samples not bricks.
* to a list of bricks but the numbers are still in samples not bricks.
*/
void
ZgyInternalBulk::_trackedBricksSetDirty(
......@@ -2221,9 +2270,7 @@ ZgyInternalBulk::writeRegion(
if (entire_survey) {
_written_sample_min = std::numeric_limits<double>::infinity();
_written_sample_max = -std::numeric_limits<double>::infinity();
_modified_bricks.resize(0);
_modified_stats.reset();
_modified_histo.reset();
trackedBricksTryEnable(false);
}
const index3_t beg = start;
......@@ -2280,14 +2327,6 @@ ZgyInternalBulk::writeRegion(
survey_beg.data(), survey_size.data()); // clip to survey
if (lod == 0)
trackChanges(new_data, new_valid, _modified_stats.get(), _modified_histo.get(), /*add=*/true);
// Compare with the Python version:
// self._partialCopy(data, // source
// start, // survey coords of source
// data.shape, // size of source
// new_data, // target
// new_start, // survey coords of target
// new_data.shape, // size of target
// survey_beg, survey_end - survey_beg)
data = new_data;
start = new_start;
valid_count = new_valid;
......
......@@ -134,6 +134,7 @@ public: // actually internal
const std::vector<std::uint8_t>& trackedBricks() const {
return _modified_bricks;
}
void trackedBricksTryEnable(bool on);
std::uint8_t trackedBricksDirty(
const std::array<std::int64_t,3>& start,
const std::array<std::int64_t,3>& size,
......
......@@ -217,6 +217,7 @@ enum class TestTwiceFlags : int
step1_compress = 1<<0, // OFF/on.
step1_write = 1<<1, // off/ON. Some +42 samples, a few +41.
step1_finalize = 1<<2, // OFF/ON. Use "Decimate" algorithm.
step1_keepopen = 1<<11, // OFF/on. Finalize but then continue writing.
// After close and reopen:
step2_nometa = 1<<9, // OFF/on. Second open don't change corners etc.
step2_compress = 1<<3, // OFF/on.
......@@ -250,6 +251,7 @@ std::ostream& operator<<(std::ostream& os, TestTwiceFlags value)
<< (((value & F::step1_compress) != F::nothing) ? "F::step1_compress | " : "")
<< (((value & F::step1_write) != F::nothing) ? "F::step1_write | " : "")
<< (((value & F::step1_finalize) != F::nothing) ? "F::step1_finalize | " : "")
<< (((value & F::step1_keepopen) != F::nothing) ? "F::step1_keepopen | " : "")
<< (((value & F::step2_nometa) != F::nothing) ? "F::step2_nometa | " : "")
<< (((value & F::step2_compress) != F::nothing) ? "F::step2_compress | " : "")
<< (((value & F::step2_write) != F::nothing) ? "F::step2_write | " : "")
......@@ -328,11 +330,11 @@ do_test_reopen(const std::string& filename, TestTwiceFlags flags)
const float expect_lod1second = expect_nlods==1 ? -1 : expect_lod0second;
const double expect_coord =
(flagset(TestTwiceFlags::step2_nometa) ? 0 :
(flagset(TestTwiceFlags::step2_nometa )|| flagset(TestTwiceFlags::step1_keepopen) ? 0 :
5);
const double expect_samplerate =
(flagset(TestTwiceFlags::step2_nometa) ? 6 :
(flagset(TestTwiceFlags::step2_nometa) || flagset(TestTwiceFlags::step1_keepopen) ? 6 :
7);
// step1_write writes mostly "42" but one in every 7*13 written samples,
......@@ -432,7 +434,7 @@ do_test_reopen(const std::string& filename, TestTwiceFlags flags)
!flagset(TestTwiceFlags::step1_finalize) ? 88 :
!any_step2_write ? 0 :
!flagset(TestTwiceFlags::step2_fin_incr) ? 88 :
any_step2_write ? -1 : // unknown
any_step2_write ? -21 : // means 1..21
0;
ZgyWriterArgs firstargs = ZgyWriterArgs()
......@@ -478,16 +480,21 @@ do_test_reopen(const std::string& filename, TestTwiceFlags flags)
if (flagset(TestTwiceFlags::step1_finalize)) {
writer->finalize(std::vector<OpenZGY::DecimationType>{OpenZGY::DecimationType::Decimate}, nullptr);
writer->close();
}
else {
writer->close_incomplete();
}
writer.reset();
// CLOSE AND RE-OPEN FILE IF REQUESTED
if (!flagset(TestTwiceFlags::step1_keepopen)) {
if (flagset(TestTwiceFlags::step1_finalize)) {
writer->close();
}
else {
writer->close_incomplete();
}
writer.reset();
writer = IZgyWriter::reopen(secondargs);
}
// SECOND STEP IS TO UPDATE THE FILE THAT WAS JUST WRITTEN.
writer = IZgyWriter::reopen(secondargs);
// Write a region that does not overlap with the first one.
if (flagset(TestTwiceFlags::step2_write)) {
......@@ -622,7 +629,15 @@ do_test_reopen(const std::string& filename, TestTwiceFlags flags)
ok = TEST_EQUAL_FLOAT(actual_stat.ssq, expect_stat_ssq, 0.1) && ok;
ok = TEST_EQUAL(actual_stat.min, expect_stat_min) && ok;
ok = TEST_EQUAL(actual_stat.max, expect_stat_max) && ok;
ok = (expect_brickrw < 0 || TEST_EQUAL(p.total(), expect_brickrw)) && ok;
if (expect_brickrw >= 0) {
ok = (TEST_EQUAL(p.total(), expect_brickrw)) && ok;
}
else {
// In some cases it is too difficult to manually compute the
// expected result but it may be possible to set an upper limit.
ok = (TEST_CHECK(p.total() >= 1)) && ok;
ok = (TEST_CHECK(p.total() <= std::abs(expect_brickrw))) && ok;
}
ok = TEST_EQUAL(p.done(), p.total()) && ok;
// Not testing histogram min/max because it is rather unclear
......@@ -1606,6 +1621,43 @@ test_reopen_incr()
TEST_EQUAL(runs, 3+32);
}
/**
* Similar to the "reopen" case but instead of closing and re-opening,
* the code just does a finalize and then continues writing.
* This may be useful for applications that keep a file open
* for a long time, updating it frequently and assuming that
* other parts of the application can read low resolution
* data and assume it is kept up to date.
*
* This test turns on the keepopen flag, which is only useful
* when both calls to finalize are turned on and an incremental
* second finalize is requested. And might as well only test the
* case with the most "interesting" writes i.e. everything except
* step2_replace. The nometa flag is implied because changing the
* metadata was to be done by the reopen. Compression is not allowed
* (because the test does multiple finalize) which means there aren't
* really that many combinations left to test. In fact, just one.
*/
static void
test_reopen_keepopen()
{
typedef TestTwiceFlags F;
// NOTE(review): LocalFileAutoDelete presumably removes the file when
// it goes out of scope -- confirm against its definition.
LocalFileAutoDelete lad("reopen_keepopen.zgy");
// The one flag combination described above: write and finalize, keep
// the file open, then write more (including read/modify/write) and
// finish with an incremental finalize.
TEST_CHECK(do_test_reopen(lad.name(),
F::step1_write |
F::step1_finalize |
F::step1_keepopen |
F::step2_write |
F::step2_rmw |
F::step2_finalize |
F::step2_fin_incr));
// do_test_reopen() verifies that the second finalize needs less I/O
// than the 88 operations a full one needs, but it doesn't check the
// exact number which means it doesn't check that with and without
// keepopen needs the exact same bulk I/O. Maybe not important.
}
/**
* Opening an empty file created by the old ZGY accessor has some
* challenges with respect to alignment.
......@@ -1713,7 +1765,8 @@ public:
register_test("reopen.empty", test_reopen_empty);
register_test("reopen.bad_histogram", test_reopen_bad_histogram);
register_test("reopen.reopen", test_reopen);
register_test("reopen.reopen_incr", test_reopen_incr);
register_test("reopen.incr", test_reopen_incr);
register_test("reopen.keepopen", test_reopen_keepopen);
register_test("reopen.zgypublic", test_reopen_zgypublic);
register_test("reopen.track_changes", test_reopen_track_changes);
#ifdef HAVE_SD
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment