Commit 27054ccc authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Merge branch 'kvamme62/parallel-compress' into 'master'

Use multiple threads for ZFP compression when writing a ZGY file

See merge request !33
parents 32f5180e 1bd20df8
Pipeline #21951 passed with stages
in 4 minutes and 29 seconds
This diff is collapsed.
......@@ -41,6 +41,9 @@ class ZgyInternalMeta;
class DataBuffer;
struct IJK;
struct LutInfoEx;
struct WriteBrickArgPack;
struct WriteNowArgPack;
class SummaryPrintingTimer;
/**
* Read or write bulk data. The meta data needs to have been read
......@@ -74,6 +77,9 @@ private:
double _written_sample_min;
double _written_sample_max;
LoggerFn _loggerfn;
std::shared_ptr<SummaryPrintingTimer> _ptimer_st;
std::shared_ptr<SummaryPrintingTimer> _ptimer_mt;
std::shared_ptr<SummaryPrintingTimer> _ststimer;
public:
ZgyInternalBulk(
......@@ -177,35 +183,20 @@ private:
const std::array<std::int64_t,3>& used,
double missingvalue, const compressor_t& compressor);
void _writeWithRetry(
rawdata_t rawdata,
BrickStatus brickstatus,
std::int64_t fileoffset,
const std::array<std::int64_t,3>& brickpos,
std::int32_t lod);
void _writeWithRetry(const WriteNowArgPack& args);
void _writeOneNormalBrick(
const std::shared_ptr<DataBuffer>& data,
std::int64_t fileoffset,
const std::array<std::int64_t,3>& brickpos,
std::int32_t lod,
const compressor_t& compressor);
std::shared_ptr<const WriteNowArgPack>
_writeOneNormalBrick(const WriteBrickArgPack& args);
void _writeOneConstantBrick(
const std::shared_ptr<DataBuffer>& data,
const std::array<std::int64_t,3>& brickpos,
std::int32_t lod);
void _writeOneConstantBrick(const WriteBrickArgPack& args);
bool _mustLeakOldBrick(
const std::shared_ptr<DataBuffer>& data,
const compressor_t& compressor,
BrickStatus brickstatus) const;
void _writeOneBrick(
const std::shared_ptr<DataBuffer>& data,
const std::array<std::int64_t,3>& brickpos,
std::int32_t lod,
const compressor_t& compressor);
std::shared_ptr<const WriteBrickArgPack>
_writeOneBrick(const WriteBrickArgPack& args);
void _writeAlignedRegion(
const std::shared_ptr<DataBuffer>& data,
......
......@@ -1054,6 +1054,10 @@ DataBufferNd<T,NDim>::s_scaleFromFloat(const DataBuffer* in,
auto dst = std::shared_ptr<dst_type>(new dst_type(src->safesize(), src->safestride()));
// TODO-Worry: Beware of non-contiguous buffers.
// It might not be a good idea to convert the padding area.
//
// TODO-Performance: If caller has already computed the value
// range, it might be able to tell us whether clipping or testing
// for infinite or both is needed.
dst_type::value_type *dst_ptr = dst->data();
const typename src_type::value_type *src_ptr = src->data();
const typename src_type::value_type *src_end = src_ptr + src->allocsize();
......
......@@ -33,6 +33,7 @@
#endif
#include <atomic>
#include <omp.h>
namespace {
int sprintf_s(char *buffer, int size, const char *format, ...)
......@@ -112,7 +113,7 @@ namespace InternalZGY {
*/
Timer::Timer(bool enabled, const char* name, int skip, bool startrunning)
: enabled_(enabled), skip_(skip), frequency_(1), overhead_(0),
laps_(0), last_(0), total_(0), begin_(0), end_(0), running_(false), verbose_(enabled ? 1 : 0)
laps_(0), last_(0), total_(0), adjusted_(0), begin_(0), end_(0), running_(false), verbose_(enabled ? 1 : 0)
{
name_[0] = '\0';
buff_[0] = '\0';
......@@ -200,8 +201,10 @@ Timer::doStop()
end_ = getNativeTime();
if (running_) {
last_ = end_ - begin_ - overhead_;
if (laps_ >= skip_)
if (laps_ >= skip_) {
total_ += last_;
adjusted_ += last_ / omp_get_num_threads();
}
++laps_;
running_ = false;
}
......@@ -218,6 +221,7 @@ Timer::doReset()
{
last_ = 0;
total_ = 0;
adjusted_= 0;
laps_ = 0;
running_ = false;
}
......@@ -248,7 +252,7 @@ Timer::getValue(bool details, bool msonly)
if (!enabled_)
return "";
getValue_s(buff_, sizeof(buff_),
getName(), getCount(), getTotal(), getRunning(),
getName(), getCount(), getTotal(), getAdjusted(), getRunning(),
details, msonly);
return buff_;
}
......@@ -258,7 +262,7 @@ Timer::getValue(bool details, bool msonly)
* \copydoc Timer::getValue
*/
void
Timer::getValue_s(char *result, int buffer_size, const char *name, int count, double total, bool running, bool details, bool msonly)
Timer::getValue_s(char *result, int buffer_size, const char *name, int count, double total, double adjusted, bool running, bool details, bool msonly)
{
result[0] = '\0';
......@@ -284,6 +288,17 @@ Timer::getValue_s(char *result, int buffer_size, const char *name, int count, do
if (count > 1)
sprintf_s(result+strlen(result), buffer_size-strlen(result), " in %d calls", count);
if (adjusted != total && adjusted != 0) {
if (msonly)
sprintf_s(result+strlen(result), buffer_size-strlen(result), " adjusted %9.1lf ms", adjusted * 1000.0);
else if (total < 0.00999)
sprintf_s(result+strlen(result), buffer_size-strlen(result), " adjusted %6.1lf us", adjusted * 1000000.0);
else if (total < 9.99)
sprintf_s(result+strlen(result), buffer_size-strlen(result), " adjusted %6.1lf ms", adjusted * 1000.0);
else
sprintf_s(result+strlen(result), buffer_size-strlen(result), " adjusted %6.1lf s ", adjusted * 1.0);
}
//if (getSkip() != 0)
// sprintf_s(result+strlen(result), buffer_size-strlen(result), " (first %d not counted)", getSkip());
......@@ -376,11 +391,12 @@ public:
long long frequency_;
std::atomic<int> count_;
std::atomic<long long> total_;
std::atomic<long long> adjusted_;
std::atomic<long long> last_;
char name_[256]; // Optional name of this timer
char buff_[256]; // Returned from getValue().
explicit Impl(const char *name)
: frequency_(1000*1000), count_(0), total_(0), last_(0)
: frequency_(1000*1000), count_(0), total_(0), adjusted_(0), last_(0)
{
name_[0] = '\0';
buff_[0] = '\0';
......@@ -426,6 +442,12 @@ SummaryTimer::getTotal() const
return static_cast<double>(pimpl_->total_.load()) / pimpl_->frequency_;
}
double
SummaryTimer::getAdjusted() const
{
return static_cast<double>(pimpl_->adjusted_.load()) / pimpl_->frequency_;
}
double
SummaryTimer::getLast() const
{
......@@ -442,11 +464,20 @@ const char*
SummaryTimer::getValue(bool details, bool msonly) const
{
Timer::getValue_s(pimpl_->buff_, sizeof(pimpl_->buff_),
getName(), getCount(), getTotal(), false,
getName(), getCount(), getTotal(), getAdjusted(), false,
details, msonly);
return pimpl_->buff_;
}
const char*
SummaryTimer::getCSV() const
{
sprintf_s(pimpl_->buff_, sizeof(pimpl_->buff_),
"TIMER,\"%s\",%d,%.3lf,%.3lf\n",
getName(), getCount(), getTotal(), getAdjusted());
return pimpl_->buff_;
}
/**
* \brief Clear all counters.
* \details The timer name is unchanged.
......@@ -456,6 +487,7 @@ SummaryTimer::reset()
{
pimpl_->count_ = 0;
pimpl_->total_ = 0;
pimpl_->adjusted_ = 0;
pimpl_->last_ = 0;
}
......@@ -465,10 +497,11 @@ SummaryTimer::reset()
* so it is possible to e.g. add an managed timer to an umanaged one.
*/
void
SummaryTimer::add(int count, double total, double last)
SummaryTimer::add(int count, double total, double adjusted, double last)
{
pimpl_->count_.fetch_add(count);
pimpl_->total_.fetch_add((long long)(total * pimpl_->frequency_));
pimpl_->adjusted_.fetch_add((long long)(adjusted * pimpl_->frequency_));
pimpl_->last_.store((long long)(last * pimpl_->frequency_));
}
......@@ -480,7 +513,7 @@ void
SummaryTimer::add(const Timer& t)
{
if (t.getEnabled()) {
add(t.getCount(), t.getTotal(), t.getLast());
add(t.getCount(), t.getTotal(), t.getAdjusted(), t.getLast());
}
}
......@@ -503,6 +536,7 @@ SummaryPrintingTimer::print()
{
if (getCount() != 0) {
const char *msg = getValue(true, true);
//const char *msg = getCSV();
fwrite(msg, 1, strlen(msg), stderr);
}
reset();
......
......@@ -66,6 +66,7 @@ private: // mutable
int laps_; // Number of accumulated results
long long last_; // Time for last lap
long long total_; // Accumulated total time
long long adjusted_; // Accumulated total time adjusted for thread count
long long begin_; // Time of last call to Start();
long long end_; // Time of last call to Stop();
bool running_; // Guard against two Stop() in a row.
......@@ -86,13 +87,14 @@ public:
double getFrequency() const { return static_cast<double>(frequency_); }
double getLast() const { return static_cast<double>(last_) / static_cast<double>(frequency_); }
double getTotal() const { return static_cast<double>(total_) / static_cast<double>(frequency_); }
double getAdjusted() const { return static_cast<double>(adjusted_) / static_cast<double>(frequency_); }
double getOverhead() const { return static_cast<double>(overhead_) / static_cast<double>(frequency_); }
int getCount() const { return laps_ < skip_ ? 0 : laps_ - skip_; }
const char* getName() const { return name_; }
int getSkip() const { return skip_ < laps_ ? skip_ : laps_; }
bool getRunning() const { return running_; }
int getVerbose() const { return verbose_; }
static void getValue_s(char *buf, int len, const char *name, int count, double total, bool running, bool details, bool msonly);
static void getValue_s(char *buf, int len, const char *name, int count, double total, double adjusted, bool running, bool details, bool msonly);
const char* getValue(bool details = false, bool msonly = false);
// Shouldn't be public but might come in handy if extending Timer.
double getLastStop() const { return end_; }
......@@ -174,13 +176,15 @@ public:
double getFrequency() const;
int getCount() const;
double getTotal() const;
double getAdjusted() const;
double getLast() const;
const char* getName() const;
const char* getValue(bool details, bool msonly) const;
const char* getCSV() const;
// OPERATIONS
void reset();
void add(int count, double total, double last);
void add(int count, double total, double adjusted, double last);
void add(const Timer& t);
};
......@@ -223,6 +227,12 @@ public:
* // timing the following code...
*}
* \endcode
*
* A more obscure feature: If you want to output the results of a
* SummaryPrintngTimer at a specific point then you can call done()
* on any SimpleTimer that is linked to this timer and is still
* in scope, and then call print() on the PrintingTimer to pretend
* that it went out of scope.
*/
class OPENZGY_TEST_API SimpleTimer : public Timer
{
......@@ -235,9 +245,13 @@ public:
}
~SimpleTimer()
{
done();
}
void done() {
if (getEnabled()) {
stop();
owner_.add(*this);
reset();
}
}
};
......
......@@ -98,6 +98,7 @@ public:
std::array<std::int64_t,4> fakesize;
std::array<std::int64_t,3> chunksize;
std::array<std::int64_t,3> obricksize;
OpenZGY::SampleDataType osamplesize;
std::vector<OpenZGY::DecimationType> algorithm;
int lod; // Still unused. Maybe not useful.
int threads;
......@@ -118,6 +119,7 @@ public:
, fakesize(std::array<std::int64_t,4>{0,0,0,0})
, chunksize(std::array<std::int64_t,3>{128,128,0})
, obricksize(std::array<std::int64_t,3>{64,64,64})
, osamplesize(OpenZGY::SampleDataType::unknown)
, algorithm()
, lod()
, threads(1)
......@@ -158,6 +160,7 @@ public:
"-s, --size I,J,K,S: Size, if no input file. E.g. 16x16x16x2.",
"-b, --bricksize I,J,K: Chunk size when copying. E.g. 64x64x64 samples.",
"-B, --obricksize I,J,K Brick size in output. E.g. 64x64x64 samples.",
"-O, --osamplesize type Output float int16, or int8"
"-g, --algorithm 1,2,N: LOD algorithms as 3 int: lod1,lod2,lodN.",
//"-l, --lod N: *Level of detail, 0 = full resolution.",
"-t, --threads N: Number of threads to use for reading.",
......@@ -201,6 +204,21 @@ public:
<< obricksize[1] << "x"
<< obricksize[2] << " ";
switch (osamplesize) {
default:
case OpenZGY::SampleDataType::unknown:
break;
case OpenZGY::SampleDataType::int8:
os << "--osamplesize int8 ";
break;
case OpenZGY::SampleDataType::int16:
os << "--osamplesize int16 ";
break;
case OpenZGY::SampleDataType::float32:
os << "--osamplesize float32 ";
break;
}
if (!algorithm.empty()) {
for (size_t ii=0; ii<algorithm.size(); ++ii)
if (ii == 0)
......@@ -312,7 +330,7 @@ public:
static const char* short_options()
{
return "hvqGuraDNFp:i:o:s:l:b:B:g:t:n:Q:";
return "hvqGuraDNFp:i:o:s:l:b:B:O:g:t:n:Q:";
}
static const struct option *long_options()
......@@ -334,6 +352,7 @@ public:
{"size", required_argument, 0, 's' },
{"bricksize", required_argument, 0, 'b' },
{"obricksize", required_argument, 0, 'B' },
{"osamplesize",required_argument, 0, 'O' },
{"algorithm", required_argument, 0, 'g' },
{"threads", required_argument, 0, 't' },
{"brickcount", required_argument, 0, 'n' },
......@@ -410,6 +429,22 @@ public:
}
break;
case 'O':
if (0==strcmp(optarg, "float32") || 0==strcmp(optarg, "float"))
osamplesize = OpenZGY::SampleDataType::float32;
else if (0==strcmp(optarg, "int16"))
osamplesize = OpenZGY::SampleDataType::int16;
else if (0==strcmp(optarg, "int8"))
osamplesize = OpenZGY::SampleDataType::int8;
else
throw std::runtime_error("command line: sample type must be float32, int16, or int8");
// --osamplesize implies --float, so in fact the --float option is
// now almost redundant. If you want "don't change value type but
// do read/write as float" then --float means you don't need to
// state the existing data type.
native = false;
break;
case 'g': algorithm = getDecimationTypes(optarg); break;
case 'l': throw std::runtime_error("--lod not supported"); //lod = geti(optarg); break;
case 't': threads = geti(optarg); break;
......@@ -421,6 +456,11 @@ public:
help(myname);
exit(1);
}
if (sqnr > 0 &&
osamplesize != OpenZGY::SampleDataType::float32 &&
osamplesize != OpenZGY::SampleDataType::unknown) {
throw std::runtime_error("command line: Don't use --osamplesize with compressed output files");
}
}
#ifdef HAVE_GETOPT
......@@ -642,7 +682,7 @@ suggestRange(float value, OpenZGY::SampleDataType dt, float *lo, float *hi)
}
void
copy(const Options& opt, SummaryPrintingTimer& rtimer, SummaryPrintingTimer& wtimer, SummaryPrintingTimer& rwtimer, SummaryPrintingTimer& ftimer)
copy(const Options& opt, SummaryPrintingTimer& rtimer, SummaryPrintingTimer& wtimer, SummaryPrintingTimer& rwtimer, SummaryPrintingTimer& ftimer, SummaryPrintingTimer& stimer)
{
using namespace OpenZGY;
ProgressWithDots p1, p2;
......@@ -654,10 +694,12 @@ copy(const Options& opt, SummaryPrintingTimer& rtimer, SummaryPrintingTimer& wti
IZgyReader::open(opt.input, &context):
Test::ZgyReaderMock::mock(opt.fakesize);
args.metafrom(r).filename(opt.output);
if (opt.osamplesize != OpenZGY::SampleDataType::unknown)
args.datatype(opt.osamplesize);
if (opt.sqnr > 0)
args.zfp_compressor(static_cast<float>(opt.sqnr))
.zfp_lodcompressor(static_cast<float>(opt.sqnr))
.datatype(SampleDataType::float32);
.datatype(OpenZGY::SampleDataType::float32);
if (opt.obricksize[0]>0 && opt.obricksize[1]>0 && opt.obricksize[2]>0)
args.bricksize(opt.obricksize[0], opt.obricksize[1], opt.obricksize[2]);
args.iocontext(&context);
......@@ -829,19 +871,61 @@ copy(const Options& opt, SummaryPrintingTimer& rtimer, SummaryPrintingTimer& wti
// now instead of having them printed automatically in main(). If the code
// threw an exception we won't get here. But in that case the normal
// mechanism of printing from the timer's destructor is invoked.
// PS, don't use this code as a quick introduction to how simple my
// Timer class is. You might get the wrong impression.
rtimer.print();
wtimer.print();
rwtimer.print();
rwt.done(); // Pretend it went out of scope.
rwtimer.print(); // Ditto for the printing timer that owns it.
// In case Timer logging is enabled in OpenZGY then get the
// read-related timers to output now. close the Some timers are
// printed on close() and some when the writer goes out of scope but
// that is just an implementation detail. To keep them together I
// will destruct the writer immediately afyter closing it.
r->close();
r.reset();
SimpleTimer ft(ftimer);
if (opt.nolod) {
SimpleTimer ft(ftimer, !opt.output.empty());
w->close_incomplete();
}
else {
// Don't report timing for finalizing a mocked output file.
// Yes it does actually have a (tiny) cost but the user won't
// expect to see finalize reported at all when discarding the output.
SimpleTimer ft(ftimer, !opt.output.empty());
w->finalize(opt.algorithm, std::ref(p2));
w->close();
// If Timer logging is also enabled inside OpenZGY there will now
// be multiple lines of output when the output file is finalized
// and closed. Unless we are writing to the mocked dummy output
// file. All these times are related to writing. Even the
// File::read() timer which now reports reads done while making
// LOD bricks. Some timers are printed on close() and some when
// the writer goes out of scope but that is just an implementation
// detail. To keep them together I will destruct the writer
// immediately afyter closing it.
w.reset();
}
#ifndef _WIN32
// This is only for performance measurements and I am only enabling
// it for Linux as I don't know the Windows equivalent. It only
// makes sense when something was written to a local disk. If there
// are any dirty buffers after a read+discard or after a write to
// cloud then they don't belong to us. NOTE: Technically this ought
// to have been a fsync(fd) to only flush our own data. But that
// means exposing fsync from FileADT and that isn't worth the
// trouble. A machine running benchmarks shouldn't have other
// activity anyway.
if (!opt.output.empty() && opt.output.substr(0,5) != "sd://") {
SimpleTimer stim(stimer);
sync();
}
#endif
ftimer.print();
stimer.print();
}
#ifndef _WIN32
......@@ -910,20 +994,14 @@ int main(int argc, char **argv)
try {
Options options(argc, argv);
signals(options);
SummaryPrintingTimer stimer("sync");
SummaryPrintingTimer ftimer("finalize");
SummaryPrintingTimer rwtimer("read+write");
SummaryPrintingTimer wtimer("write");
SummaryPrintingTimer rtimer("read");
PrintingTimer t1("TOTAL");
copy(options, rtimer, wtimer, rwtimer, ftimer);
#ifndef _WIN32 // only for performance measurements on Linux.
{
SimpleTimer stim(stimer);
sync();
}
stimer.print();
#endif
SummaryPrintingTimer stimer("Tool:sync");
SummaryPrintingTimer ftimer("Tool:finalize");
SummaryPrintingTimer rwtimer("Tool:read+write");
SummaryPrintingTimer wtimer("Tool:write");
SummaryPrintingTimer rtimer("Tool:read");
SummaryPrintingTimer ttimer("Tool:TOTAL");
SimpleTimer t1(ttimer);
copy(options, rtimer, wtimer, rwtimer, ftimer, stimer);
}
catch (const std::exception& ex) {
std::string myname(argc >= 1 ? argv[0] : "zgycopyc");
......
......@@ -51,7 +51,7 @@ COPY ./ ./
# Enable these to allow the "test" image to run the copytimes.sh test.
# Note that the oldtools folder has not been committed to git.
#PERFTEST# copy oldtools/ oldtools/
#PERFTEST# copy private/copytimes.sh private/copytimes.sh
#PERFTEST# copy private/copytimes*.sh private/
#PERFTEST# copy oldtools/dropcache /usr/local/bin/dropcache
#PERFTEST# RUN chmod 4555 /usr/local/bin/dropcache
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment