Commit 01e1055b authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Merge branch 'kvamme62/single-brick-shortcut' into 'master'

Single brick shortcut

See merge request !81
parents 524e4a34 3fa8f6b2
Pipeline #50225 passed with stages
in 14 minutes and 44 seconds
......@@ -598,12 +598,14 @@ public:
virtual void read(const size3i_t& start, const size3i_t& size, float* data, int lod) const override
{
throw_if_not_readable();
std::shared_ptr<float> fakeshared = fake_shared(data);
auto databuffer = std::make_shared<InternalZGY::DataBufferNd<float,3>>(fakeshared, size);
_accessor->readToExistingBuffer(databuffer, start, lod, true);
databuffer.reset();
if (!fakeshared.unique())
throw Errors::ZgyInternalError("A Reference to the user's buffer was retained.");
if (!_accessor->expeditedRead(start, size, data, lod, InternalZGY::RawDataType::Float32)) {
std::shared_ptr<float> fakeshared = fake_shared(data);
auto databuffer = std::make_shared<InternalZGY::DataBufferNd<float,3>>(fakeshared, size);
_accessor->readToExistingBuffer(databuffer, start, lod, true);
databuffer.reset();
if (!fakeshared.unique())
throw Errors::ZgyInternalError("A Reference to the user's buffer was retained.");
}
}
/**
......@@ -615,12 +617,14 @@ public:
virtual void read(const size3i_t& start, const size3i_t& size, std::int16_t* data, int lod) const override
{
throw_if_not_readable();
std::shared_ptr<std::int16_t> fakeshared = fake_shared(data);
auto databuffer = std::make_shared<InternalZGY::DataBufferNd<std::int16_t,3>>(fakeshared, size);
_accessor->readToExistingBuffer(databuffer, start, lod, false);
databuffer.reset();
if (!fakeshared.unique())
throw Errors::ZgyInternalError("A Reference to the user's buffer was retained.");
if (!_accessor->expeditedRead(start, size, data, lod, InternalZGY::RawDataType::SignedInt16)) {
std::shared_ptr<std::int16_t> fakeshared = fake_shared(data);
auto databuffer = std::make_shared<InternalZGY::DataBufferNd<std::int16_t,3>>(fakeshared, size);
_accessor->readToExistingBuffer(databuffer, start, lod, false);
databuffer.reset();
if (!fakeshared.unique())
throw Errors::ZgyInternalError("A Reference to the user's buffer was retained.");
}
}
/**
......@@ -632,12 +636,14 @@ public:
virtual void read(const size3i_t& start, const size3i_t& size, std::int8_t* data, int lod) const override
{
throw_if_not_readable();
std::shared_ptr<std::int8_t> fakeshared = fake_shared(data);
auto databuffer = std::make_shared<InternalZGY::DataBufferNd<std::int8_t,3>>(fakeshared, size);
_accessor->readToExistingBuffer(databuffer, start, lod, false);
databuffer.reset();
if (!fakeshared.unique())
throw Errors::ZgyInternalError("A Reference to the user's buffer was retained.");
if (!_accessor->expeditedRead(start, size, data, lod, InternalZGY::RawDataType::SignedInt8)) {
std::shared_ptr<std::int8_t> fakeshared = fake_shared(data);
auto databuffer = std::make_shared<InternalZGY::DataBufferNd<std::int8_t,3>>(fakeshared, size);
_accessor->readToExistingBuffer(databuffer, start, lod, false);
databuffer.reset();
if (!fakeshared.unique())
throw Errors::ZgyInternalError("A Reference to the user's buffer was retained.");
}
}
/**
......
......@@ -133,6 +133,17 @@ namespace {
return enable > 0;
}
/**
* Return non-zero if there should be special case handling of the trivial
* case of reading exactly one thread. The feature is on by default.
*/
static bool
expedited_read()
{
static int enable = Environment::getNumericEnv("OPENZGY_EXPEDITED_READ", 1);
return enable > 0;
}
/**
* \brief Add or subtract this buffer's samples in the statistics.
*
......@@ -860,6 +871,97 @@ ZgyInternalBulk::trackedBricksTryEnable(bool on)
}
}
namespace {
template<typename T>
static void
fillMeT(void* data, const std::array<std::int64_t,3>& size, double value)
{
std::fill(static_cast<T*>(data),
static_cast<T*>(data) + size[0]*size[1]*size[2],
static_cast<T>(value));
}
static void
fillMe(void* data, const std::array<std::int64_t,3>& size, double value, RawDataType dtype)
{
switch (dtype) {
case RawDataType::Float32: fillMeT<float>(data, size, value); break;
case RawDataType::SignedInt16:fillMeT<std::int16_t>(data, size, value); break;
case RawDataType::SignedInt8: fillMeT<std::int8_t>(data, size, value); break;
default: throw OpenZGY::Errors::ZgyInternalError("Unrecognized valuetype." + std::to_string(int(dtype)));
}
}
}
/**
* More efficient read of a region that happes to be exactly one brick,
* not compressed, not converted int to float, and possibly other limitatons.
* This can bypass a lot of logic and will elide at least one memory
* allocation. Return false if the conditions are not met. The caller can
* invoke this function unconditionally as long as readToExistingBuffer()
* is used as a fallback.
*/
bool
ZgyInternalBulk::expeditedRead(const std::array<std::int64_t,3>& start, const std::array<std::int64_t,3>& size, void* data, int lod, RawDataType result_type) const
{
// TODO-Performance: Used in ZgyWriter::read(), not just ZgyReader::read().
// This ought to work but will require more testing.
if (!expedited_read())
return false;
if (this->_metadata->fh().version() == 1)
return false;
if (result_type != this->_metadata->ih().datatype())
return false;
const std::array<std::int64_t,3> bs = this->_metadata->ih().bricksize();
if ((size[0] != bs[0]) ||
(size[1] != bs[1]) ||
(size[2] != bs[2]) ||
(start[0] % bs[0]) != 0 ||
(start[1] % bs[1]) != 0 ||
(start[2] % bs[2]) != 0)
return false;
// TODO-Performance: Could be simplified.
_validateUserPosition(start, size, lod); // Could be simplified.
// TODO-Performance: Could be simplified.
std::vector<LutInfoEx> bricklist = _partsNeeded(start, size, lod);
if (bricklist.size() != 1)
throw OpenZGY::Errors::ZgyInternalError("expeditedRead messed up.");
const LutInfoEx& brick = bricklist.front();
switch (brick.status) {
case BrickStatus::Missing:
_logger(2, "Expedited read missing");
fillMe(data, size, this->_metadata->ih().defaultstorage(), result_type);
break;
case BrickStatus::Constant:
_logger(2, "Expedited read constant");
fillMe(data, size, brick.double_constvalue, result_type);
break;
case BrickStatus::Normal:
if (brick.size_in_file != size[0] * size[1] * size[2] *
static_cast<std::int64_t>(RawDataTypeDetails(result_type).size)) {
throw OpenZGY::Errors::ZgyInternalError("Bad size in expeditedRead.");
}
_logger(2, "Expedited read regular");
this->_file->xx_read(data, brick.offset_in_file, brick.size_in_file, UsageHint::Data);
// TODO-High: Fill area outside survey with defaultstorage.
break;
case BrickStatus::Compressed:
_logger(2, "Expedited read compressed (not implemented)");
return false;
default:
throw OpenZGY::Errors::ZgyInternalError("Internal error, bad brick status");
}
return true;
}
/**
* \brief Mark bricks as dirty.
*
......@@ -1118,7 +1220,7 @@ ZgyInternalBulk::_validateUserPosition(
<< " lod " << lod
<< " is empty or outside the valid range"
<< " (0, 0, 0)"
<< " to " << ssize[0] << ", " << ssize[1] << ", " << ssize[2] << ")"
<< " to (" << ssize[0] << ", " << ssize[1] << ", " << ssize[2] << ")"
;
_logger(1, ss.str() + "\n");
throw OpenZGY::Errors::ZgyUserError(ss.str());
......
......@@ -110,6 +110,11 @@ public:
const std::array<std::int64_t,3>& size,
int32_t lod, bool as_float, bool check_constant) const;
bool expeditedRead(
const std::array<std::int64_t,3>& start,
const std::array<std::int64_t,3>& size,
void* data, int lod, RawDataType result_type) const;
void writeRegion(
const std::shared_ptr<DataBuffer>& data,
const std::array<std::int64_t,3>& start,
......
......@@ -289,6 +289,7 @@ FileADT::_allocate(std::int64_t size)
//std::cerr << "FileADT::_allocate() is evicting entries." << std::endl;
std::random_shuffle(cache.begin(), cache.end());
cache.resize(lowwater);
hint = 0;
}
cache.push_back(result);
}
......
......@@ -18,6 +18,7 @@
#include "../iocontext.h"
#include "../exception.h"
#include "../impl/environment.h"
#include "../impl/mtguard.h"
#include <iostream>
#include <iomanip>
......@@ -35,6 +36,7 @@
using namespace OpenZGY;
using namespace OpenZGY::Formatters;
using Test_Utils::LocalFileAutoDelete;
using Test_Utils::CloudFileAutoDelete;
using Test_Utils::must_throw;
namespace {
......@@ -2340,6 +2342,231 @@ test_readwrite_cloud()
}
#endif
namespace {
template<typename T>
static std::string
formatMe(std::int64_t pos, std::int64_t ii, std::int64_t jj, std::int64_t kk, const T* value)
{
std::stringstream ss;
ss << "Pos " << pos << " brick ("
<< ii << "," << jj << "," << kk
<< ") value ("
<< (double)value[0] << "," << (double)value[1] << "," << (double)value[2]
<< ")";
return ss.str();
}
}
/**
* Test that reading a file BAT i.e. one brick at a time works.
* These now trigger some performance tweaks in the accessor.
*
* The test file has 4*5*6 = 120 bricks and is written in optimal order.
* most bricks store (i/64,j/64,k/64) in first three bytes to make it
* simple to check that the contents are correct. Every 7th brick will
* be missing. Every 13th brick will be filled with a constant value
* -(i+j+k)/64
*/
template<typename T, int bricksize, bool compression>
static void
do_testbat(const std::string& filename)
{
typedef IZgyWriter::size3i_t size3i_t;
const size3i_t bs{bricksize, bricksize, bricksize};
const size3i_t bsm1{bricksize-1, bricksize, bricksize};
const size3i_t size{4*bs[0]-10, 5*bs[1]-20, 6*bs[2]-23};
const SampleDataType dt = (sizeof(T) == 4 ? SampleDataType::float32 :
sizeof(T) == 2 ? SampleDataType::int16 :
SampleDataType::int8);
#ifdef HAVE_SD
SeismicStoreIOContext sd_context(*Test_Utils::default_sd_context());
sd_context.segsize(3); // Try to trigger read crossing seg boundary.
sd_context.segsplit(7);
const IOContext * const context = &sd_context;
#else
const IOContext * const context = nullptr;
#endif
ZgyWriterArgs args = ZgyWriterArgs()
.iocontext(context)
.size(size[0], size[1], size[2])
.bricksize(bs[0], bs[1], bs[2])
.datatype(dt)
.datarange(-1, +1)
.filename(filename);
if (compression)
args.zfp_compressor(99);
std::shared_ptr<OpenZGY::IZgyWriter> writer = IZgyWriter::open(args);
if (!TEST_CHECK(bool(writer)))
return;
std::vector<T> data(bs[0]*bs[1]*bs[2], 42);
std::int64_t pos{0};
for (std::int64_t ii=0; ii<size[0]; ii += bs[0]) {
for (std::int64_t jj=0; jj<size[1]; jj += bs[1]) {
for (std::int64_t kk=0; kk<size[2]; kk += bs[2]) {
if ((pos % 7) == 0) {
// missing brick
}
else if ((pos % 13) == 0) {
// Constant-value
data[0] = static_cast<T>(-(ii/bs[0]) - (jj/bs[1]) - (kk/bs[2]));
writer->writeconst(size3i_t{ii,jj,kk}, bs, data.data());
}
else {
data[0] = static_cast<T>(ii/bs[0]);
data[1] = static_cast<T>(jj/bs[1]);
data[2] = static_cast<T>(kk/bs[2]);
writer->write(size3i_t{ii,jj,kk}, bs, data.data());
}
++pos;
}
}
}
writer->finalize(std::vector<OpenZGY::DecimationType>{
OpenZGY::DecimationType::Average
}, nullptr);
writer->close();
writer.reset();
// Test plan:
// Read one brick at a time using 10 threads. Can trigger both cases.
// Not tested: Second open is for read/write.
// Compression should trigger malloc hack, not the shortcut.
// Compressed reads might also cross segment boundary.
{
std::shared_ptr<OpenZGY::IZgyReader> reader =
IZgyReader::open(filename, context);
if (!TEST_CHECK(bool(reader)))
return;
if (verbose())
reader->filestats()->dump(std::cout, "");
InternalZGY::MTGuard guard;
#pragma omp parallel num_threads(10)
{
std::vector<T> check1(bs[0]*bs[1]*bs[2], 0);
std::vector<T> check2(bs[0]*bs[1]*bs[2], 0);
#pragma omp for schedule(dynamic,1)
for (int pos = 0; pos < 4*5*6; ++pos) {
int tmppos = pos;
const int kk = tmppos % 6; tmppos /= 6;
const int jj = tmppos % 5; tmppos /= 5;
const int ii = tmppos % 4;
std::fill(check1.begin(), check1.end(), static_cast<T>(-88));
std::fill(check2.begin(), check2.end(), static_cast<T>(-66));
guard.run([&](){
// Will trigger both tweaks, "brick shortcut" has precedence.
reader->read(size3i_t{ii*bs[0],jj*bs[1],kk*bs[2]}, bs, check1.data());
// Will trigger only the malloc tweak.
reader->read(size3i_t{ii*bs[0],jj*bs[1],kk*bs[2]}, bsm1, check2.data());
std::array<float,3> expect;
if ((pos % 7) == 0) {
expect = std::array<float,3>{0,0,0};
}
else if ((pos % 13) == 0) {
T value = static_cast<T>(-(ii+jj+kk));
expect = std::array<float,3>{(float)value, (float)value, (float)value};
}
else {
expect = std::array<float,3>{(float)ii, (float)jj, (float)kk};
}
if (check1[0]!=expect[0]||check1[1]!=expect[1] ||check1[2]!=expect[2]) {
std::string expect_str = formatMe(pos, ii, jj, kk, expect.data());
std::string actual_str = formatMe(pos, ii, jj, kk, check1.data());
TEST_EQUAL(actual_str, expect_str);
throw std::runtime_error("Mismatch in check 1");
}
if (check2[0]!=expect[0]||check2[1]!=expect[1] ||check2[2]!=expect[2]) {
std::string expect_str = formatMe(pos, ii, jj, kk, expect.data());
std::string actual_str = formatMe(pos, ii, jj, kk, check2.data());
TEST_EQUAL(actual_str, expect_str);
throw std::runtime_error("Mismatch in check 2");
}
// Too much hassle to check of the buffer if it is an edge brick.
// I want to check the others in case the read went across
// a segment boundary. That triggers soecuak case handling.
if (ii != 3 && jj != 4 && kk != 5 && (pos%7) != 0 && (pos%13) != 0) {
for (auto it = check1.begin() + 3; it != check1.end(); ++it)
if (*it != 42)
if (!TEST_EQUAL((double)*it, 42))
throw std::runtime_error("Mismatch in check 1 last part");
for (auto it = check2.begin() + 3; it != check2.end() - 1*bsm1[1]*bsm1[2]; ++it)
if (*it != 42)
if (!TEST_EQUAL((double)*it, 42))
throw std::runtime_error("Mismatch in check 2 last part");
}
});
} // loop
} // parallel region
try {
guard.finished();
}
catch (const std::exception& ex) {
TEST_EQUAL(std::string(ex.what()), std::string("success"));
}
}
}
static void
test_bat_local_1()
{
LocalFileAutoDelete lad("bat1.zgy");
do_testbat<std::int8_t, 32, false>(lad.name());
}
static void
test_bat_local_2()
{
LocalFileAutoDelete lad("bat2.zgy");
do_testbat<std::int16_t, 32, false>(lad.name());
}
static void
test_bat_local_4()
{
LocalFileAutoDelete lad("bat4.zgy");
do_testbat<float, 64, false>(lad.name());
}
static void
test_bat_local_zfp()
{
LocalFileAutoDelete lad("bat.zgy");
do_testbat<float, 64, true>(lad.name());
}
#ifdef HAVE_SD
static void
test_bat_sd_1()
{
CloudFileAutoDelete cad("bat1.zgy", Test_Utils::default_sd_context());
do_testbat<std::int8_t, 32, false>(cad.name());
}
static void
test_bat_sd_2()
{
CloudFileAutoDelete cad("bat2.zgy", Test_Utils::default_sd_context());
do_testbat<std::int16_t, 32, false>(cad.name());
}
static void
test_bat_sd_4()
{
CloudFileAutoDelete cad("bat4.zgy", Test_Utils::default_sd_context());
do_testbat<float, 64, false>(cad.name());
}
static void
test_bat_sd_zfp()
{
CloudFileAutoDelete cad("bat4.zgy", Test_Utils::default_sd_context());
do_testbat<float, 64, true>(cad.name());
}
#endif
class Register
{
public:
......@@ -2403,6 +2630,16 @@ public:
register_test("api.readwrite", test_readwrite_local);
#ifdef HAVE_SD
register_test("api.readwrite_cloud", test_readwrite_cloud);
#endif
register_test("api.bat_local_1", test_bat_local_1);
register_test("api.bat_local_2", test_bat_local_2);
register_test("api.bat_local_4", test_bat_local_4);
register_test("api.bat_local_zfp", test_bat_local_zfp);
#ifdef HAVE_SD
register_test("api.bat_sd_1", test_bat_sd_1);
register_test("api.bat_sd_2", test_bat_sd_2);
register_test("api.bat_sd_4", test_bat_sd_4);
register_test("api.bat_sd_zfp", test_bat_sd_zfp);
#endif
}
} dummy;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment