Commit a5b52d6d authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Merge branch 'kvamme62/refactor-telemetry' into 'master'

Performance analysis tools

See merge request !84
parents cc507911 d0811bb7
Pipeline #50255 passed with stages
in 20 minutes and 40 seconds
......@@ -13,6 +13,7 @@
// limitations under the License.
#include "file_performance.h"
#include "perflogger.h"
#include "timer.h"
#include "environment.h"
#include "../exception.h"
......@@ -21,6 +22,8 @@
#include <fstream>
#include <sstream>
#include <cmath>
#include <atomic>
#include <algorithm>
namespace InternalZGY {
#if 0
......@@ -28,45 +31,29 @@ namespace InternalZGY {
#endif
FileWithPerformanceLogger::FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval)
: _relay(relay)
, _outfile(outfile)
, _chunksize(chunksize)
, _mutex()
, _nsamples(0)
, _histmin(hist_min)
, _histmax(hist_max)
, _statmin(std::numeric_limits<double>::max())
, _statmax(std::numeric_limits<double>::lowest())
, _statsum(0)
, _statssq(0)
, _histbins(hist_bincount, 0)
, _sumtimer(new SummaryTimer("summary"))
, _sumtimerbeg(std::numeric_limits<double>::max())
, _sumtimerend(std::numeric_limits<double>::lowest())
, _suminterval(interval)
, _sumbytes(0)
: FileRelay(relay)
, _recorder(new PerformanceLogger
(outfile, chunksize, hist_bincount, hist_min, hist_max, interval))
{
}
FileWithPerformanceLogger::~FileWithPerformanceLogger()
{
std::string str1 = dumpThroughput(true);
std::string str2 = dumpLatency(true);
if (_outfile && (!str1.empty() || !str2.empty()))
*_outfile << str1 + str2 << std::flush;
_recorder->dumpToFile("destructed");
}
void
FileWithPerformanceLogger::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
if (size == _chunksize || _chunksize < 0) {
if (_recorder->logThisSize(size)) {
Timer timer;
_relay->xx_read(data, offset, size, usagehint);
relay().xx_read(data, offset, size, usagehint);
timer.stop();
add(timer, size);
_recorder->add(timer, size);
}
else {
_relay->xx_read(data, offset, size, usagehint);
relay().xx_read(data, offset, size, usagehint);
}
}
......@@ -78,199 +65,26 @@ FileWithPerformanceLogger::xx_readv(const ReadList& requests, bool parallel_ok,
// inside SeismicStoreFile:. Or make the entire consolidate logic
// into a separate module that can be chained. Either way I may need
// multiple histograms to cover different brick sizes.
if ((requests.size() == 1 && requests.front().size == _chunksize) || _chunksize < 0) {
if ((requests.size() == 1 && _recorder->logThisSize(requests.front().size)) ||
_recorder->logThisSize(-1)) {
Timer timer;
_relay->xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
relay().xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
timer.stop();
std::int64_t size = 0;
for (const ReadRequest& it : requests)
size += it.size;
add(timer, size);
_recorder->add(timer, size);
}
else {
_relay->xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
relay().xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
}
}
void
FileWithPerformanceLogger::xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown)
{
_relay->xx_write(data, offset, size, usagehint);
}
void
FileWithPerformanceLogger::xx_close()
{
_relay->xx_close();
std::string str1 = dumpThroughput(true);
std::string str2 = dumpLatency(true);
if (_outfile && (!str1.empty() || !str2.empty()))
*_outfile << str1 + str2 << std::flush;
}
std::int64_t
FileWithPerformanceLogger::xx_eof() const
{
return _relay->xx_eof();
}
std::vector<std::int64_t>
FileWithPerformanceLogger::xx_segments(bool complete) const
{
return _relay->xx_segments(complete);
}
bool
FileWithPerformanceLogger::xx_iscloud() const
{
return _relay->xx_iscloud();
}
void
FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
{
std::unique_lock<std::mutex> lk(_mutex);
// Optional periodic reporting.
const double now = timer.getLastStop() / timer.getFrequency();
if (_sumtimerbeg < _sumtimerend &&
_suminterval > 0 &&
now >= _sumtimerbeg + _suminterval)
{
// The timer passed in belongs to the next interval.
lk.unlock();
std::string msg = dumpThroughput(true);
lk.lock();
if (_outfile && !msg.empty())
*_outfile << msg << std::flush;
// Might also have reported and cleared the latency log.
}
// Update throughput logging.
_sumtimer->add(timer);
_sumtimerend = now;
// ASSERT timer.getCount() == 1 otherwise we don't know the first start.
// class Timer would need to keep track of first_start.
_sumtimerbeg = std::min(_sumtimerbeg, _sumtimerend - timer.getTotal());
_sumbytes += blocksize;
// Update latency a.k.a. round trip time logging.
// Note _hist{min,max} is the center of first and last bin.
// Example: min=0, max=1000, nbuckets = 101
// gives binwidth=10, bin(3) = 0, bin(7)=1
// which is correct because bin 0 covers 0 +/- 5.
const double value = timer.getTotal()*1000;
const int nbuckets = static_cast<int>(_histbins.size());
const double binwidth = (_histmax - _histmin) / (nbuckets - 1);
const double fbin = (value - _histmin) / binwidth;
const int ibin = (fbin < 0 ? 0 :
fbin > (nbuckets-1) ? nbuckets-1 :
static_cast<int>(std::floor(fbin+0.5)));
_histbins[ibin] += 1;
_nsamples += 1;
_statsum += value;
_statssq += value*value;
if (_statmin > value) _statmin = value;
if (_statmax < value) _statmax = value;
}
/**
* Format the results as CSV that is simple to paste into a spreadsheet.
* Use a distinct start/end marker so the csv data can be easily
* extracted from a log file. Even if the previous line did not end in
* a newline.
*
* Quick instructions for the spreadsheet:
* Select the CSV3,CSV4,CSV5 cells except for the first and last columns.
* Make a line graph: data series in rows, first row and first column as label.
* This shows distribution of latency as well as cumulative counts; the latter
* can be used to get the percentile of samples below a certain latency.
*/
std::string
FileWithPerformanceLogger::dumpLatency(bool clear)
{
std::lock_guard<std::mutex> lk(_mutex);
std::stringstream ss;
if (_nsamples != 0) {
const int nbins = static_cast<int>(_histbins.size());
const double binwidth = (_histmax - _histmin) / (nbins - 1);
ss << "CSV1,Samplecount,Histogram min,Histogram max"
<< ",Statistic min,Statistic max,Statistic average,END\n"
<< "CSV2," << _nsamples
<< "," << _histmin << "," << _histmax
<< "," << _statmin << "," << _statmax
<< "," << _statsum / _nsamples
<< ",END\n";
// Dump center value of each histogram bin.
// Note that CSV3, CSV4, CSV5 can all be computed in a
* spreadsheet using a simple formula. But I'd like to
// have an (almost) single-click way of making the graph.
ss << "CSV3,Latency";
for (int ii=0; ii<nbins; ++ii)
ss << "," << _histmin + binwidth * ii;
ss << ",END\n";
// Dump the bins as percentages
ss << "CSV4,Frequency%";
for (const std::int64_t count : _histbins)
ss << "," << count / (double)_nsamples;
ss << ",END\n";
// Dump the cumulative counts.
ss << "CSV5,Cumulative%";
std::int64_t summed = 0;
for (const std::int64_t count : _histbins) {
summed += count;
ss << "," << summed / (double)_nsamples;
}
ss << ",END\n";
// Dump the bins as raw counts
ss << "CSV6,Count";
for (const std::int64_t count : _histbins)
ss << "," << count;
ss << ",END\n";
if (clear) {
_nsamples = 0;
_statmin = std::numeric_limits<double>::max();
_statmax = std::numeric_limits<double>::lowest();
_statsum= 0;
_statssq = 0;
_histbins = std::vector<std::int64_t>(_histbins.size(), 0);
}
}
return ss.str();
}
std::string
FileWithPerformanceLogger::dumpThroughput(bool clear)
{
std::lock_guard<std::mutex> lk(_mutex);
std::stringstream ss;
if (_sumtimerbeg < _sumtimerend && _sumbytes > 0) {
const double bytecount = static_cast<double>(_sumbytes);
const double elapsed = _sumtimerend - _sumtimerbeg;
// Note, if you want to emulate an interval timer running
// regardless of whether there was any traffic then round
// begin and end to a multiple of interval. And add code
// to output a lot of zeros for intervals with no traffic.
ss << "CSV7,mbytes,time,speed,readcount,END\n";
ss << "CSV8"
<< "," << bytecount / (1024.0*1024.0)
<< "," << elapsed
<< "," << (bytecount / (1024.0*1024.0)) / elapsed
<< "," << _sumtimer->getCount()
<< ",END\n";
}
if (clear) {
_sumtimer->reset();
_sumtimerbeg = std::numeric_limits<double>::max();
_sumtimerend = std::numeric_limits<double>::lowest();
_sumbytes = 0;
}
return ss.str();
relay().xx_close();
_recorder->dumpToFile("closed");
}
/**
......
......@@ -27,54 +27,36 @@
#include <ostream>
#include "file.h"
#include "file_relay.h"
namespace InternalZGY {
#if 0
}
#endif
class Timer;
class SummaryTimer;
class PerformanceLogger;
/**
* \brief FileADT wrapper logging performance statistics.
* \brief FileADT wrapper for logging performance statistics.
*/
class FileWithPerformanceLogger : public FileADT
class FileWithPerformanceLogger : public FileRelay
{
private:
const std::shared_ptr<FileADT> _relay;
const std::shared_ptr<std::ostream> _outfile;
const std::int64_t _chunksize;
mutable std::mutex _mutex;
// Latency a.k.a. round trip time: reported one for each thread.
std::int64_t _nsamples;
const double _histmin, _histmax;
double _statmin, _statmax, _statsum, _statssq;
std::vector<std::int64_t> _histbins;
// Throughput: Report transfered bytes vs. elapsed time all threads.
std::shared_ptr<SummaryTimer> _sumtimer;
double _sumtimerbeg;
double _sumtimerend;
const double _suminterval;
std::int64_t _sumbytes;
std::shared_ptr<PerformanceLogger> _recorder;
FileWithPerformanceLogger(const FileWithPerformanceLogger&) = delete;
FileWithPerformanceLogger(FileWithPerformanceLogger&&) = delete;
FileWithPerformanceLogger& operator=(const FileWithPerformanceLogger&) = delete;
FileWithPerformanceLogger& operator=(FileWithPerformanceLogger&&) = delete;
public:
explicit FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval);
virtual ~FileWithPerformanceLogger();
// Intercept
virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint) override;
virtual void xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint) override;
virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint) override;
virtual void xx_close() override;
virtual std::int64_t xx_eof() const override;
virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
virtual bool xx_iscloud() const override;
public:
void add(const Timer& timer, std::int64_t blocksize);
std::string dumpLatency(bool clear);
std::string dumpThroughput(bool clear);
static std::shared_ptr<FileADT> inject(std::shared_ptr<FileADT> file);
};
......
// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "file_performance.h"
#include "perflogger.h"
#include "timer.h"
#include "environment.h"
#include "../exception.h"
#include <string.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <cmath>
#include <atomic>
#include <algorithm>
namespace InternalZGY {
#if 0
}
#endif
// Each PerformanceLogger gets a unique id so that records written by
// several concurrently open files into a shared log can be told apart.
std::atomic<int> PerformanceLogger::_last_id{0};

/**
 * \brief Collect latency and throughput telemetry for file reads.
 *
 * \param outfile       Stream that receives the CSV reports. May be null,
 *                      in which case nothing is ever written.
 * \param chunksize     Only transfers of exactly this many bytes are
 *                      logged (see logThisSize()); negative logs all.
 * \param hist_bincount Number of bins in the latency histogram.
 * \param hist_min      Center value of the first histogram bin.
 * \param hist_max      Center value of the last histogram bin.
 *                      Units are milliseconds, assuming Timer::getTotal()
 *                      returns seconds -- TODO confirm against class Timer.
 * \param interval      Seconds between periodic throughput reports;
 *                      <= 0 disables the periodic reporting in add().
 */
PerformanceLogger::PerformanceLogger(std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval)
  : _outfile(outfile)
  , _chunksize(chunksize)
  , _mutex()
  , _nsamples(0)
  , _histmin(hist_min)
  , _histmax(hist_max)
  // Sentinels: any real sample will shrink _statmin / grow _statmax.
  , _statmin(std::numeric_limits<double>::max())
  , _statmax(std::numeric_limits<double>::lowest())
  , _statsum(0)
  , _statssq(0)
  , _histbins(hist_bincount, 0)
  , _sumtimer(new SummaryTimer("summary"))
  // Same sentinel trick: _sumtimerbeg > _sumtimerend means "no data yet".
  , _sumtimerbeg(std::numeric_limits<double>::max())
  , _sumtimerend(std::numeric_limits<double>::lowest())
  , _suminterval(interval)
  , _sumbytes(0)
  , _first(true)
  , _id(0)
{
  // Ids are assigned here, not in the init list; first instance gets id 1.
  _id = 1 + _last_id.fetch_add(1);
}
// No cleanup needed here; the owning wrapper is responsible for calling
// dumpToFile() before the logger goes away.
PerformanceLogger::~PerformanceLogger() = default;
/**
 * \brief Decide whether a transfer of this many bytes should be recorded.
 *
 * A negative configured chunk size means every transfer is logged;
 * otherwise only transfers exactly matching the chunk size count.
 */
bool
PerformanceLogger::logThisSize(std::int64_t size)
{
  if (_chunksize < 0)
    return true;
  return size == _chunksize;
}
/**
 * \brief Record one completed transfer in both the throughput and the
 * latency statistics.
 *
 * \param timer     A stopped Timer covering exactly this transfer.
 * \param blocksize Number of bytes that were transferred.
 *
 * Thread safety: all state is guarded by _mutex. The lock is released
 * temporarily around dumpThroughput() because that method acquires the
 * same (non-recursive) mutex itself.
 */
void
PerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
{
  std::unique_lock<std::mutex> lk(_mutex);
  // Optional periodic reporting: if this sample ends after the current
  // reporting interval has expired, flush and clear the interval first.
  const double now = timer.getLastStop() / timer.getFrequency();
  if (_sumtimerbeg < _sumtimerend &&
      _suminterval > 0 &&
      now >= _sumtimerbeg + _suminterval)
  {
    // The timer passed in belongs to the next interval.
    // NOTE(review): while the lock is dropped, another thread may add
    // samples; those would then be attributed to the new interval.
    lk.unlock();
    std::string msg = dumpThroughput(true);
    lk.lock();
    if (_outfile && !msg.empty())
      *_outfile << msg << std::flush;
    // Might also have reported and cleared the latency log.
  }

  // Update throughput logging.
  _sumtimer->add(timer);
  _sumtimerend = now;
  // ASSERT timer.getCount() == 1 otherwise we don't know the first start.
  // class Timer would need to keep track of first_start.
  _sumtimerbeg = std::min(_sumtimerbeg, _sumtimerend - timer.getTotal());
  _sumbytes += blocksize;

  // Update latency a.k.a. round trip time logging.
  // Note _hist{min,max} is the center of first and last bin.
  // Example: min=0, max=1000, nbuckets = 101
  // gives binwidth=10, bin(3) = 0, bin(7)=1
  // which is correct because bin 0 covers 0 +/- 5.
  const double value = timer.getTotal()*1000; // presumably milliseconds -- assumes getTotal() is seconds
  const int nbuckets = static_cast<int>(_histbins.size());
  const double binwidth = (_histmax - _histmin) / (nbuckets - 1);
  const double fbin = (value - _histmin) / binwidth;
  // Clamp out-of-range samples into the first/last bin; round to nearest.
  const int ibin = (fbin < 0 ? 0 :
                    fbin > (nbuckets-1) ? nbuckets-1 :
                    static_cast<int>(std::floor(fbin+0.5)));
  _histbins[ibin] += 1;
  _nsamples += 1;
  _statsum += value;
  _statssq += value*value; // sum of squares, kept for a future stddev report
  if (_statmin > value) _statmin = value;
  if (_statmax < value) _statmax = value;
}
/**
 * Format the latency results as CSV that is simple to paste into a
 * spreadsheet. Use a distinct start/end marker so the csv data can be
 * easily extracted from a log file. Even if the previous line did not
 * end in a newline.
 *
 * Quick instructions for the spreadsheet:
 * Select the CSV3,CSV4,CSV5 cells except for the first and last columns.
 * Make a line graph: data series in rows, first row and first column as label.
 * This shows distribution of latency as well as cumulative counts; the latter
 * can be used to get the percentile of samples below a certain latency.
 *
 * \param clear If true, reset all latency statistics after reporting so
 *              the next report starts from scratch.
 * \return The CSV text, or an empty string if there were no samples.
 */
std::string
PerformanceLogger::dumpLatency(bool clear)
{
  std::lock_guard<std::mutex> lk(_mutex);
  std::stringstream ss;
  if (_nsamples != 0) {
    const int nbins = static_cast<int>(_histbins.size());
    const double binwidth = (_histmax - _histmin) / (nbins - 1);
    // CSV1/CSV2: one-line summary statistics, tagged with this logger's id.
    ss << "CSV1,ID,Samplecount,Histogram min,Histogram max"
       << ",Statistic min,Statistic max,Statistic average,END\n"
       << "CSV2," << _id << "," << _nsamples
       << "," << _histmin << "," << _histmax
       << "," << _statmin << "," << _statmax
       << "," << _statsum / _nsamples
       << ",END\n";
    // Dump center value of each histogram bin.
    // Note that CSV3, CSV4, CSV5 can all be computed in a
    // spreadsheet using a simple formula. But I'd like to
    // have an (almost) single-click way of making the graph.
    ss << "CSV3," << _id << ",Latency";
    for (int ii=0; ii<nbins; ++ii)
      ss << "," << _histmin + binwidth * ii;
    ss << ",END\n";
    // Dump the bins as percentages
    ss << "CSV4," << _id << ",Frequency%";
    for (const std::int64_t count : _histbins)
      ss << "," << count / (double)_nsamples;
    ss << ",END\n";
    // Dump the cumulative counts.
    ss << "CSV5," << _id << ",Cumulative%";
    std::int64_t summed = 0;
    for (const std::int64_t count : _histbins) {
      summed += count;
      ss << "," << summed / (double)_nsamples;
    }
    ss << ",END\n";
    // Dump the bins as raw counts
    ss << "CSV6," << _id << ",Count";
    for (const std::int64_t count : _histbins)
      ss << "," << count;
    ss << ",END\n";
    if (clear) {
      // Reset to the same sentinel state the constructor established.
      _nsamples = 0;
      _statmin = std::numeric_limits<double>::max();
      _statmax = std::numeric_limits<double>::lowest();
      _statsum= 0;
      _statssq = 0;
      _histbins = std::vector<std::int64_t>(_histbins.size(), 0);
    }
  }
  return ss.str();
}
/**
 * \brief Format total bytes transferred vs. elapsed wall time as CSV.
 *
 * The CSV0 header row is emitted only once per logger instance (guarded
 * by _first), even on a call that has no data to report; the CSV8 data
 * rows carry this logger's id so several loggers can share one file.
 *
 * \param clear If true, reset the throughput counters after reporting
 *              so the next report covers a fresh interval.
 * \return The CSV text; may be empty, or contain only the header row.
 */
std::string
PerformanceLogger::dumpThroughput(bool clear)
{
  std::lock_guard<std::mutex> lk(_mutex);
  std::stringstream ss;
  if (_first) {
    ss << "CSV0,ID,mbytes,time,speed,readcount,END\n";
    _first = false;
  }
  // _sumtimerbeg < _sumtimerend is false while the sentinels from the
  // constructor (or the last clear) are still in place, i.e. no samples.
  if (_sumtimerbeg < _sumtimerend && _sumbytes > 0) {
    const double bytecount = static_cast<double>(_sumbytes);
    const double elapsed = _sumtimerend - _sumtimerbeg;
    // Note, if you want to emulate an interval timer running
    // regardless of whether there was any traffic then round
    // begin and end to a multiple of interval. And add code
    // to output a lot of zeros for intervals with no traffic.
    ss << "CSV8"
       << "," << _id
       << "," << bytecount / (1024.0*1024.0)
       << "," << elapsed
       << "," << (bytecount / (1024.0*1024.0)) / elapsed
       << "," << _sumtimer->getCount()
       << ",END\n";
  }
  if (clear) {
    _sumtimer->reset();
    _sumtimerbeg = std::numeric_limits<double>::max();
    _sumtimerend = std::numeric_limits<double>::lowest();
    _sumbytes = 0;
  }
  return ss.str();
}
void
PerformanceLogger::dumpToFile(const std::string& comment)
{
std::string str1 = dumpThroughput(true);
std::string str2 = dumpLatency(true);
std::string str3 = "CSV9," + std::to_string(_id) + "," + comment + ",END\n";
if (_outfile && (!str1.empty() || !str2.empty()))
*_outfile << str1 + str2 + str3 << std::flush;
}
} // namespace
// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
/**
* \file: perflogger.h
* \brief Telemetry.
*/