Commit 7cdf94a7 authored by Paal Kvamme
Browse files

Merge branch 'kvamme62/perflogger' into 'master'

Additional telemetry

See merge request !88
parents 92aadc59 9d0b67e7
Pipeline #55478 passed with stages
in 9 minutes and 18 seconds
......@@ -616,6 +616,26 @@ ZgyInternalBulk::readConstantValue(
result = (result * this->_metadata->ih().storagetofloat_slope() +
this->_metadata->ih().storagetofloat_intercept());
}
// Only logging success, because on failure the application will
// need to read the actual data, which will trigger the logging
// in readToExistingBuffer.
if (_logger(2))
_logger(2, std::stringstream()
<< "read(start="
<< "(" << start[0]
<< "," << start[1]
<< "," << start[2]
<< ")"
<< ", size="
<< "(" << size[0]
<< "," << size[1]
<< "," << size[2]
<< ")"
<< ", lod=" << lod
<< std::boolalpha << ", as_float=" << as_float << ")"
<< " => constant " << result);
return std::make_pair(true,result);
}
......@@ -646,6 +666,19 @@ ZgyInternalBulk::readToExistingBuffer(
if (result->datatype() != expected_dtype)
throw OpenZGY::Errors::ZgyUserError("Requested data type not supported for this file.");
if (_logger(2))
_logger(2, std::stringstream()
<< "read(start="
<< "(" << start[0]
<< "," << start[1]
<< "," << start[2]
<< "), size="
<< "(" << result->size3d()[0]
<< "," << result->size3d()[1]
<< "," << result->size3d()[2]
<< "), lod=" << lod
<< std::boolalpha << ", as_float=" << as_float << ")");
// Need a default value to use when trying to read a brick that
// was never written, or to fill in a brick that was only partly
// written. To avoid non intuitive behavior the same value should
......@@ -2351,6 +2384,19 @@ ZgyInternalBulk::writeRegion(
_metadata->ih().datatype()))
throw OpenZGY::Errors::ZgyUserError("Invalid data type in writeRegion");
if (_logger(2))
_logger(2, std::stringstream()
<< "write(start="
<< "(" << start[0]
<< "," << start[1]
<< "," << start[2]
<< "), size="
<< "(" << data->size3d()[0]
<< "," << data->size3d()[1]
<< "," << data->size3d()[2]
<< "), lod=" << lod
<< std::boolalpha << ", is_storage=" << is_storage << ")");
// TODO-Performance: Combining range() and _scaleDataToStorage()
// might save some time.
......
......@@ -147,7 +147,7 @@ LocalFileLinux::xx_make_instance(const std::string& filename, OpenMode mode, con
if (filename.find("://") == std::string::npos) {
auto file = std::shared_ptr<FileADT>(new LocalFileLinux(filename, mode, iocontext));
// This is a no-op unless enabled by environment variables
file = FileWithPerformanceLogger::inject(file);
file = FileWithPerformanceLogger::inject(file, filename);
// This is for ad-hoc testing ONLY. Enable the parallelizer as the
// windows reader does. On Linux (i.e. in this file) it gives
......
......@@ -30,10 +30,10 @@ namespace InternalZGY {
}
#endif
FileWithPerformanceLogger::FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval)
FileWithPerformanceLogger::FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, const std::string& outname, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval, const std::string& srcname)
: FileRelay(relay)
, _recorder(new PerformanceLogger
(outfile, chunksize, hist_bincount, hist_min, hist_max, interval))
(outname, chunksize, hist_bincount, hist_min, hist_max, interval, srcname))
{
}
......@@ -91,12 +91,20 @@ FileWithPerformanceLogger::xx_close()
* Inject a telemetry module if enabled by environment variables.
* If not enabled the telemetry code has zero impact on the system.
*
* OPENZGY_MEASURE_KB = brick size to monitor, or -1 for all reads.
* OPENZGY_MEASURE_BINS = bins in histogram (default 251)
* OPENZGY_MEASURE_TIME = highest latency in ms (default 500)
* OPENZGY_MEASURE_KB = brick size to monitor, or -1 for all reads.
* OPENZGY_MEASURE_LOGFILE = optionally write to this file
* OPENZGY_MEASURE_BINS = bins in histogram (default 251)
* OPENZGY_MEASURE_TIME = highest latency in ms (default 500)
* OPENZGY_MEASURE_INTERVAL = periodic report of throughput and latency
*
* There are three distinct reports being output:
* - Periodic latency and throughput, enabled by MEASURE_INTERVAL, tags CSV0,CSV8
* - Latency statistics for the entire file, tags CSV1,CSV2
* - Latency histogram for the entire file, tags CSV3,CSV4,CSV5,CSV6
* and can be fine tuned by MEASURE_BINS and MEASURE_TIME.
*/
std::shared_ptr<FileADT>
FileWithPerformanceLogger::inject(std::shared_ptr<FileADT> file)
FileWithPerformanceLogger::inject(std::shared_ptr<FileADT> file, const std::string& srcname)
{
int target = Environment::getNumericEnv("OPENZGY_MEASURE_KB", 0);
if (target != 0) {
......@@ -105,13 +113,7 @@ FileWithPerformanceLogger::inject(std::shared_ptr<FileADT> file)
int interval = Environment::getNumericEnv("OPENZGY_MEASURE_INTERVAL", 0);
std::string filename = Environment::getStringEnv("OPENZGY_MEASURE_LOGFILE");
std::shared_ptr<std::ostream> out;
if (!filename.empty()) {
out = std::make_shared<std::ofstream>(filename, std::ofstream::app);
}
else {
out = std::shared_ptr<std::ostream>(&std::cout, [](std::ostream*){});
}
file = std::shared_ptr<FileADT>(new FileWithPerformanceLogger(file, out, target*1024, bincount, 0.0, maxtime, interval));
file = std::shared_ptr<FileADT>(new FileWithPerformanceLogger(file, filename, target*1024, bincount, 0.0, maxtime, interval, srcname));
}
return file;
}
......
......@@ -50,14 +50,14 @@ private:
FileWithPerformanceLogger& operator=(FileWithPerformanceLogger&&) = delete;
public:
explicit FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval);
explicit FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, const std::string& outname, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval, const std::string& srcname);
virtual ~FileWithPerformanceLogger();
// Intercept
virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint) override;
virtual void xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint) override;
virtual void xx_close() override;
static std::shared_ptr<FileADT> inject(std::shared_ptr<FileADT> file);
static std::shared_ptr<FileADT> inject(std::shared_ptr<FileADT> file, const std::string& srcname);
};
} // namespace
......@@ -1218,7 +1218,7 @@ SeismicStoreFile::xx_make_instance(const std::string& filename, OpenMode mode, c
auto file = std::shared_ptr<FileADT>(new SeismicStoreFile(filename, mode, iocontext));
// This is a no-op unless enabled by environment variables.
// Note, this might have been injected after the FileParallelizer instead.
file = FileWithPerformanceLogger::inject(file);
file = FileWithPerformanceLogger::inject(file, filename);
// Improve multi-threading of decompress and copy-out.
auto context = dynamic_cast<const SeismicStoreIOContext*>(iocontext);
......
......@@ -22,6 +22,7 @@
#include "fancy_timers.h"
#include "environment.h"
#include "file_parallelizer.h"
#include "file_performance.h"
#include <vector>
#include <string>
......@@ -146,6 +147,9 @@ LocalFileWindows::xx_make_instance(const std::string& filename, OpenMode mode, c
if (filename.find("://") == std::string::npos) {
auto file = std::shared_ptr<FileADT>(new LocalFileWindows(filename, mode, iocontext));
// This is a no-op unless enabled by environment variables
file = FileWithPerformanceLogger::inject(file, filename);
// The following is to parallelize only decompression and copy-out.
// Unlike the Linux case there is no parallel read at the lowest level,
// which would have automatically parallelized those two.
......
......@@ -21,6 +21,7 @@
#include <iostream>
#include <fstream>
#include <sstream>
#include <iomanip>
#include <cmath>
#include <atomic>
#include <algorithm>
......@@ -32,8 +33,8 @@ namespace InternalZGY {
std::atomic<int> PerformanceLogger::_last_id{0};
PerformanceLogger::PerformanceLogger(std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval)
: _outfile(outfile)
PerformanceLogger::PerformanceLogger(const std::string& outname, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval, const std::string& srcname)
: _outfile()
, _chunksize(chunksize)
, _mutex()
, _nsamples(0)
......@@ -51,8 +52,19 @@ PerformanceLogger::PerformanceLogger(std::shared_ptr<std::ostream> outfile, std:
, _sumbytes(0)
, _first(true)
, _id(0)
, _srcname(srcname)
{
_id = 1 + _last_id.fetch_add(1);
std::string name(outname);
if (!name.empty()) {
std::size_t pos = name.find("{}");
if (pos != std::string::npos)
name = name.substr(0, pos) + std::to_string(_id) + name.substr(pos+2);
_outfile = std::make_shared<std::ofstream>(name, std::ofstream::app);
}
else {
_outfile = std::shared_ptr<std::ostream>(&std::cerr, [](std::ostream*){});
}
}
PerformanceLogger::~PerformanceLogger()
......@@ -134,16 +146,17 @@ PerformanceLogger::dumpLatency(bool clear)
const double binwidth = (_histmax - _histmin) / (nbins - 1);
ss << "CSV1,ID,Samplecount,Histogram min,Histogram max"
<< ",Statistic min,Statistic max,Statistic average,END\n"
<< ",Statistic min,Statistic max,Statistic average,Filename,END\n"
<< "CSV2," << _id << "," << _nsamples
<< "," << _histmin << "," << _histmax
<< "," << _statmin << "," << _statmax
<< "," << _statsum / _nsamples
<< ",\"" << _srcname << "\""
<< ",END\n";
// Dump center value of each histogram bin.
// Note that CSV3, CSV4, CSV5 can all be computed in a
// spreadsheet using a simple formula. Bit I'd like to
// spreadsheet using a simple formula. But I'd like to
// have an (almost) single-click way of making the graph.
ss << "CSV3," << _id << ",Latency";
for (int ii=0; ii<nbins; ++ii)
......@@ -189,7 +202,7 @@ PerformanceLogger::dumpThroughput(bool clear)
std::lock_guard<std::mutex> lk(_mutex);
std::stringstream ss;
if (_first) {
ss << "CSV0,ID,mbytes,time,speed,readcount,END\n";
ss << "CSV0,Timestamp,ID,Data(MB),Time(sec),Speed(MB/s),Readcount,Mean latency(ms),Filename,END\n";
_first = false;
}
if (_sumtimerbeg < _sumtimerend && _sumbytes > 0) {
......@@ -200,11 +213,17 @@ PerformanceLogger::dumpThroughput(bool clear)
// begin and end to a multiple of interval. And add code
// to output a lot of zeros for intervals with no traffic.
ss << "CSV8"
<< std::setprecision(3) << std::fixed
<< "," << _sumtimerbeg
<< "," << _id
<< "," << bytecount / (1024.0*1024.0)
<< "," << elapsed
<< "," << (bytecount / (1024.0*1024.0)) / elapsed
<< "," << _sumtimer->getCount()
<< "," << std::setprecision(0)
<< (_sumtimer->getCount() == 0 ? 0 :
1000.0 * _sumtimer->getTotal() / _sumtimer->getCount())
<< ",\"" << _srcname << "\""
<< ",END\n";
}
if (clear) {
......
......@@ -64,7 +64,7 @@ class SummaryTimer;
class PerformanceLogger
{
private:
const std::shared_ptr<std::ostream> _outfile;
std::shared_ptr<std::ostream> _outfile;
const std::int64_t _chunksize;
mutable std::mutex _mutex;
// Latency a.k.a. round trip time: reported one for each thread.
......@@ -80,6 +80,7 @@ private:
std::int64_t _sumbytes;
bool _first;
int _id;
const std::string _srcname;
static std::atomic<int> _last_id;
PerformanceLogger(const PerformanceLogger&) = delete;
......@@ -88,7 +89,7 @@ private:
PerformanceLogger& operator=(PerformanceLogger&&) = delete;
public:
explicit PerformanceLogger(std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval);
explicit PerformanceLogger(const std::string& outname, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval, const std::string& srcname);
virtual ~PerformanceLogger();
bool logThisSize(std::int64_t size);
public:
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment