Commit 943b3276 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

More telemetry added to support the performance work.

parent 587544ee
......@@ -882,6 +882,12 @@ public:
this->_fd->xx_close();
this->_fd.reset();
// Kludge for performance measurements,
// The timers in DataBuffer are global. For some experiments
// it makes sense to print and reset then just before finalize
// and also after closing each file.
InternalZGY::DataBuffer::cleartimers(true);
// If errorflag() is set and the file is new or has been
// successfully written to at least once then the client code is
// strongly advised to delete the file.
......
......@@ -1733,7 +1733,7 @@ ZgyInternalBulk::writeRegion(
// TODO-Performance: Multi-threading of _scaleDataToStorage().
// Need to figure out the cutoff where the buffer is too small and the
// overhead of OpenMP starts getting in the way.
SimpleTimer t1(*_ststimer);
SimpleTimer t1(*_ststimer, timers_on());
data = _scaleDataToStorage(data);
}
......
......@@ -34,6 +34,7 @@
#include <errno.h>
#include <unistd.h>
#include <mutex>
#include <atomic>
using OpenZGY::IOContext;
namespace InternalZGY {
......@@ -56,6 +57,60 @@ namespace {
static int enable = Environment::getNumericEnv("OPENZGY_TIMERS", 0);
return enable > 0;
}
/**
* \brief SummaryTimer that prints its result when going out of scope.
* \details This is a private extension that also knows about total
* number of bytes transfered, so it can report the actuar throughput.
*/
class OPENZGY_TEST_API SummaryPrintingTimerEx : public SummaryPrintingTimer
{
std::atomic<std::int64_t> bytes_read_;
std::atomic<std::int64_t> bytes_written_;
public:
explicit SummaryPrintingTimerEx(const char *name, bool csv = false)
: SummaryPrintingTimer(name, csv)
, bytes_read_(0)
, bytes_written_(0)
{
}
virtual ~SummaryPrintingTimerEx() {
print();
}
static std::string niceNumber(const std::string& label, std::int64_t n) {
if (n > 10*1024*1024)
return label + std::to_string(n/(1024*1024)) + " MB";
else if (n > 10*1024)
return label + std::to_string(n/1024) + " kB";
else if (n != 0)
return label + std::to_string(n) + " bytes";
else
return std::string();
}
virtual void print() {
if (getCount() != 0) {
std::string msg(csv_ ? getCSV() : getValue(true, true));
if (!msg.empty() && msg.back() == '\n')
msg = msg.substr(0, msg.size()-1);
if (csv_)
std::cerr << msg
<< "," << bytes_read_.load()
<< "," << bytes_written_.load()
<< std::endl;
else
std::cerr << msg
<< niceNumber(", R: ", bytes_read_.load())
<< niceNumber(", W: ", bytes_written_.load())
<< std::endl;
}
reset(); // Prevent the base class from printing as well.
}
void addBytesRead(std::int64_t nbytes) {
bytes_read_.fetch_add(nbytes);
}
void addBytesWritten(std::int64_t nbytes) {
bytes_written_.fetch_add(nbytes);
}
};
}
/**
......@@ -106,6 +161,11 @@ LocalFileLinux::LocalFileLinux(const std::string& filename, OpenMode mode, const
, _fd(-1)
, _mutex()
{
// Use the SummaryPrintingTimer extension that keeps track of bytes done.
// The instance allocated in the base class constructor is simply dropped.
_rtimer.reset(new SummaryPrintingTimerEx("File::read"));
_wtimer.reset(new SummaryPrintingTimerEx("File::write"));
switch (mode) {
case OpenMode::ReadOnly:
_fd = ::open(filename.c_str(), O_RDONLY, 0666);
......@@ -229,6 +289,7 @@ LocalFileLinux::xx_read(void *data, std::int64_t offset, std::int64_t size, Usag
SimpleTimer tt(*_rtimer, timers_on());
_validate_read(data, offset, size, xx_eof(), _mode);
ssize_t nbytes = ::pread(_fd, data, size, offset);
static_cast<SummaryPrintingTimerEx*>(_rtimer.get())->addBytesRead(nbytes);
_check_short_read(offset, size, nbytes);
}
......@@ -267,6 +328,7 @@ LocalFileLinux::xx_write(const void* data, std::int64_t offset, std::int64_t siz
ssize_t nbytes = ::pwrite(_fd, data, size, offset);
if (nbytes < 0)
throw OpenZGY::Errors::ZgyIoError(_name, errno);
static_cast<SummaryPrintingTimerEx*>(_wtimer.get())->addBytesWritten(nbytes);
std::lock_guard<std::mutex> lk(_mutex); // protect _eof
_eof = std::max(_eof, offset + nbytes);
if (nbytes != size)
......
......@@ -109,6 +109,7 @@ FileWithPerformanceLogger::xx_close()
void
FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
{
std::unique_lock<std::mutex> lk(_mutex);
// Optional periodic reporting.
const double now = timer.getLastStop() / timer.getFrequency();
if (_sumtimerbeg < _sumtimerend &&
......@@ -116,7 +117,9 @@ FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
now >= _sumtimerbeg + _suminterval)
{
// The timer passed in belongs to the next interval.
lk.unlock();
std::string msg = dumpThroughput(true);
lk.lock();
if (!msg.empty())
std::cout << msg << std::flush;
// Might also have reported and cleared the latency log.
......@@ -142,7 +145,6 @@ FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
const int ibin = (fbin < 0 ? 0 :
fbin > (nbuckets-1) ? nbuckets-1 :
std::floor(fbin+0.5));
std::lock_guard<std::mutex> lk(_mutex);
_histbins[ibin] += 1;
_nsamples += 1;
_statsum += value;
......
......@@ -472,9 +472,16 @@ SummaryTimer::getValue(bool details, bool msonly) const
const char*
SummaryTimer::getCSV() const
{
// Avoid odd characters in name to make it easier for spreadsheets.
char *safename = new char[strlen(getName())+1];
char *dst = safename;
const char *src = getName(); // Instance member, remains valid long enough.
for (; *src; ++src, ++dst)
*dst = isalnum(*src) ? *src : '_';
*dst = '\0';
sprintf_s(pimpl_->buff_, sizeof(pimpl_->buff_),
"TIMER,\"%s\",%d,%.3lf,%.3lf\n",
getName(), getCount(), getTotal(), getAdjusted());
safename, getCount(), getTotal(), getAdjusted());
return pimpl_->buff_;
}
......@@ -521,8 +528,9 @@ SummaryTimer::add(const Timer& t)
/* SummaryPrintingTimer ===============================================*/
/*=========================================================================*/
SummaryPrintingTimer::SummaryPrintingTimer(const char *name)
SummaryPrintingTimer::SummaryPrintingTimer(const char *name, bool csv)
: SummaryTimer(name)
, csv_(csv)
{
}
......@@ -535,8 +543,7 @@ void
SummaryPrintingTimer::print()
{
if (getCount() != 0) {
const char *msg = getValue(true, true);
//const char *msg = getCSV();
const char *msg = csv_ ? getCSV() : getValue(true, true);
fwrite(msg, 1, strlen(msg), stderr);
}
reset();
......
......@@ -194,8 +194,10 @@ public:
*/
class OPENZGY_TEST_API SummaryPrintingTimer : public SummaryTimer
{
protected:
bool csv_;
public:
explicit SummaryPrintingTimer(const char *name);
explicit SummaryPrintingTimer(const char *name, bool csv = false);
virtual ~SummaryPrintingTimer();
virtual void print();
};
......
......@@ -38,6 +38,7 @@
#include <omp.h>
#include <atomic>
#include <cstring>
#include <mutex>
#ifndef _WIN32
#include <signal.h>
#endif
......@@ -92,6 +93,7 @@ public:
bool alpha; // Still unused.
bool dumpsqnr; // Still unused.
bool native;
bool dropcache;
std::string sigpipe;
std::string input;
std::string output;
......@@ -113,6 +115,7 @@ public:
, alpha(false)
, dumpsqnr(false)
, native(true)
, dropcache(false)
, sigpipe()
, input()
, output()
......@@ -154,13 +157,14 @@ public:
//"-D, --dumpsqnr: *Dump table of sqnr vs. compression ratio.",
"-N, --native: Read/write as file's type (default).",
"-F, --float: Read/write as float.",
"-U, --dropcache: Invoke \"dropcache\" before finalize.",
"-p, --sigpipe SIGPIPE disposition.",
"-i, --input FILE.zgy: Input file name. If missing, use random data.",
"-o, --output FILE.zgy: Output file name. If missing, discard data.",
"-s, --size I,J,K,S: Size, if no input file. E.g. 16x16x16x2.",
"-b, --bricksize I,J,K: Chunk size when copying. E.g. 64x64x64 samples.",
"-B, --obricksize I,J,K Brick size in output. E.g. 64x64x64 samples.",
"-O, --osamplesize type Output float int16, or int8"
"-O, --osamplesize type Output float int16, or int8.",
"-g, --algorithm 1,2,N: LOD algorithms as 3 int: lod1,lod2,lodN.",
//"-l, --lod N: *Level of detail, 0 = full resolution.",
"-t, --threads N: Number of threads to use for reading.",
......@@ -184,6 +188,7 @@ public:
<< (alpha ? "--alpha " : "")
<< (dumpsqnr ? "--dumpsqnr " : "")
<< (native ? "--native " : "--float ")
<< (dropcache ? "--dropcache " : "")
<< (sigpipe.empty() ? "" : ("--sigpipe " + sigpipe + " "));
if (fakesize[0]||fakesize[1]||fakesize[2]||fakesize[3])
......@@ -330,7 +335,7 @@ public:
static const char* short_options()
{
return "hvqGuraDNFp:i:o:s:l:b:B:O:g:t:n:Q:";
return "hvqGuraDNFUp:i:o:s:l:b:B:O:g:t:n:Q:";
}
static const struct option *long_options()
......@@ -346,6 +351,7 @@ public:
{"dumpsqnr", no_argument, 0, 'D' },
{"native", no_argument, 0, 'N' },
{"float", no_argument, 0, 'F' },
{"dropcache", no_argument, 0, 'U' },
{"sigpipe", required_argument, 0, 'p' },
{"input", required_argument, 0, 'i' },
{"output", required_argument, 0, 'o' },
......@@ -381,6 +387,7 @@ public:
case 'D': throw std::runtime_error("--dumpsqnr not supported"); //dumpsqnr = true; break;
case 'N': native = true; break;
case 'F': native = false; break;
case 'U': dropcache = true; break;
case 'p': sigpipe = optarg; break;
case 'i':
......@@ -525,8 +532,8 @@ public:
{
if (input.empty() && fakesize[0]*fakesize[1]*fakesize[2]*fakesize[3] == 0)
throw std::runtime_error("Must specify either --input or --size");
if (!output.empty() && threads != 1)
throw std::runtime_error("Multi threading only allowed when discarding data.");
//if (!output.empty() && threads != 1)
// throw std::runtime_error("Multi threading only allowed when discarding data.");
}
};
......@@ -596,6 +603,12 @@ void copychunkT(
r->read(pos, realsize, buffer, 0);
}
{
// OpenZGY will also use an exclusive lock for writes.
// But by setting our own the "wtimer" will exclude the
// wait time. Which would have made "wtimer" not very
// useful in the multithreaded case.
static std::mutex mutex;
std::lock_guard<std::mutex> lk(mutex);
SimpleTimer wt(wtimer);
w->write(pos, realsize, buffer);
}
......@@ -892,6 +905,11 @@ copy(const Options& opt, SummaryPrintingTimer& rtimer, SummaryPrintingTimer& wti
w->close_incomplete();
}
else {
// dropcache simulates having files so huge that the Linux
// buffer cache won't keep the entire output file in memory.
// so there will be a significant I/O cost reading back LOD0.
if (opt.dropcache)
system("/usr/local/bin/dropcache");
// Don't report timing for finalizing a mocked output file.
// Yes it does actually have a (tiny) cost but the user won't
// expect to see finalize reported at all when discarding the output.
......@@ -993,6 +1011,8 @@ int main(int argc, char **argv)
{
try {
Options options(argc, argv);
if (options.dropcache)
system("/usr/local/bin/dropcache");
signals(options);
SummaryPrintingTimer stimer("Tool:sync");
SummaryPrintingTimer ftimer("Tool:finalize");
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment