Commit c397b74a authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Refactor telemetry for better separation of concerns. Part 1: easy to diff changes.

parent cc6b425d
......@@ -29,11 +29,10 @@ namespace InternalZGY {
}
#endif
std::atomic<int> FileWithPerformanceLogger::_last_id{0};
std::atomic<int> PerformanceLogger::_last_id{0};
FileWithPerformanceLogger::FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval)
: _relay(relay)
, _outfile(outfile)
PerformanceLogger::PerformanceLogger(std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval)
: _outfile(outfile)
, _chunksize(chunksize)
, _mutex()
, _nsamples(0)
......@@ -55,22 +54,40 @@ FileWithPerformanceLogger::FileWithPerformanceLogger(std::shared_ptr<FileADT> re
_id = 1 + _last_id.fetch_add(1);
}
PerformanceLogger::~PerformanceLogger()
{
}
bool
PerformanceLogger::logThisSize(std::int64_t size)
{
return size == _chunksize || _chunksize < 0;
}
FileWithPerformanceLogger::FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval)
: FileRelay(relay)
, _recorder(new PerformanceLogger
(outfile, chunksize, hist_bincount, hist_min, hist_max, interval))
{
}
FileWithPerformanceLogger::~FileWithPerformanceLogger()
{
dumpToFile("destructed");
_recorder->dumpToFile("destructed");
}
void
FileWithPerformanceLogger::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
if (size == _chunksize || _chunksize < 0) {
if (_recorder->logThisSize(size)) {
Timer timer;
_relay->xx_read(data, offset, size, usagehint);
relay().xx_read(data, offset, size, usagehint);
timer.stop();
add(timer, size);
_recorder->add(timer, size);
}
else {
_relay->xx_read(data, offset, size, usagehint);
relay().xx_read(data, offset, size, usagehint);
}
}
......@@ -82,53 +99,30 @@ FileWithPerformanceLogger::xx_readv(const ReadList& requests, bool parallel_ok,
// inside SeismicStoreFile:. Or make the entire consolidate logic
// into a separate module that can be chained. Either way I may need
// multiple histograms to cover different brick sizes.
if ((requests.size() == 1 && requests.front().size == _chunksize) || _chunksize < 0) {
if ((requests.size() == 1 && _recorder->logThisSize(requests.front().size)) ||
_recorder->logThisSize(-1)) {
Timer timer;
_relay->xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
relay().xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
timer.stop();
std::int64_t size = 0;
for (const ReadRequest& it : requests)
size += it.size;
add(timer, size);
_recorder->add(timer, size);
}
else {
_relay->xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
relay().xx_readv(requests, parallel_ok, immutable_ok, transient_ok, usagehint);
}
}
void
FileWithPerformanceLogger::xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown)
{
_relay->xx_write(data, offset, size, usagehint);
}
void
FileWithPerformanceLogger::xx_close()
{
_relay->xx_close();
dumpToFile("closed");
}
std::int64_t
FileWithPerformanceLogger::xx_eof() const
{
return _relay->xx_eof();
}
std::vector<std::int64_t>
FileWithPerformanceLogger::xx_segments(bool complete) const
{
return _relay->xx_segments(complete);
}
bool
FileWithPerformanceLogger::xx_iscloud() const
{
return _relay->xx_iscloud();
relay().xx_close();
_recorder->dumpToFile("closed");
}
void
FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
PerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
{
std::unique_lock<std::mutex> lk(_mutex);
// Optional periodic reporting.
......@@ -187,7 +181,7 @@ FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
* can be used to get the percentile of samples below a certain latency.
*/
std::string
FileWithPerformanceLogger::dumpLatency(bool clear)
PerformanceLogger::dumpLatency(bool clear)
{
std::lock_guard<std::mutex> lk(_mutex);
std::stringstream ss;
......@@ -246,7 +240,7 @@ FileWithPerformanceLogger::dumpLatency(bool clear)
}
std::string
FileWithPerformanceLogger::dumpThroughput(bool clear)
PerformanceLogger::dumpThroughput(bool clear)
{
std::lock_guard<std::mutex> lk(_mutex);
std::stringstream ss;
......@@ -279,7 +273,7 @@ FileWithPerformanceLogger::dumpThroughput(bool clear)
}
void
FileWithPerformanceLogger::dumpToFile(const std::string& comment)
PerformanceLogger::dumpToFile(const std::string& comment)
{
std::string str1 = dumpThroughput(true);
std::string str2 = dumpLatency(true);
......
......@@ -27,6 +27,7 @@
#include <ostream>
#include "file.h"
#include "file_relay.h"
namespace InternalZGY {
#if 0
......@@ -37,12 +38,34 @@ class Timer;
class SummaryTimer;
/**
* \brief FileADT wrapper logging performance statistics.
* Inject code to log throughput, latency, etc.
*
* Note the separation of concerns.
*
* - PerformanceLogger knows how record the information but not how to
* get injected into the production code. The same class can be used
* to inject the logger at different levels ane the code can be
* shared among projects. Albeit typically using copy/paste.
*
* - Timer, PrintingTimer, etc. are low level classes used to store
* the results.
*
* - FileRelay implements the entire interface where the injection
* should happen, making each method just forward to the layer below.
* This is a convenient base class for code that only needs to
* intercept a few of the methods in the interface. Deriving from
* this class makes for cleaner code in the actual interceptor.
* The class only knows about the interface that will be used for the
* intercept. It may be shared if there are multiple loggers written
* for the same interface, In ZGY-Cloud it could have been used by
* class TelemetryFileWrapper. But if it works, don't fix it.
*
* - FileWithPerformanceLogger glues together the two classes above
* and contains the actual code to intercept use of the interface.
*/
class FileWithPerformanceLogger : public FileADT
class PerformanceLogger
{
private:
const std::shared_ptr<FileADT> _relay;
const std::shared_ptr<std::ostream> _outfile;
const std::int64_t _chunksize;
mutable std::mutex _mutex;
......@@ -61,25 +84,44 @@ private:
int _id;
static std::atomic<int> _last_id;
PerformanceLogger(const PerformanceLogger&) = delete;
PerformanceLogger(PerformanceLogger&&) = delete;
PerformanceLogger& operator=(const PerformanceLogger&) = delete;
PerformanceLogger& operator=(PerformanceLogger&&) = delete;
public:
explicit PerformanceLogger(std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval);
virtual ~PerformanceLogger();
bool logThisSize(std::int64_t size);
public:
void add(const Timer& timer, std::int64_t blocksize);
// Nonvirtual; might be called from destructor.
std::string dumpLatency(bool clear);
std::string dumpThroughput(bool clear);
void dumpToFile(const std::string& comment);
};
/**
* \brief FileADT wrapper for logging performance statistics.
*/
class FileWithPerformanceLogger : public FileRelay
{
private:
std::shared_ptr<PerformanceLogger> _recorder;
FileWithPerformanceLogger(const FileWithPerformanceLogger&) = delete;
FileWithPerformanceLogger(FileWithPerformanceLogger&&) = delete;
FileWithPerformanceLogger& operator=(const FileWithPerformanceLogger&) = delete;
FileWithPerformanceLogger& operator=(FileWithPerformanceLogger&&) = delete;
public:
explicit FileWithPerformanceLogger(std::shared_ptr<FileADT> relay, std::shared_ptr<std::ostream> outfile, std::int64_t chunksize, int hist_bincount, double hist_min, double hist_max, int interval);
virtual ~FileWithPerformanceLogger();
// Intercept
virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint) override;
virtual void xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint) override;
virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint) override;
virtual void xx_close() override;
virtual std::int64_t xx_eof() const override;
virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
virtual bool xx_iscloud() const override;
public:
void add(const Timer& timer, std::int64_t blocksize);
// Nonvirtual; might be called from destructor.
std::string dumpLatency(bool clear);
std::string dumpThroughput(bool clear);
void dumpToFile(const std::string& comment);
static std::shared_ptr<FileADT> inject(std::shared_ptr<FileADT> file);
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment