Commit cda00813 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Add telemetry for cloud write similar to what exists for local files.

parent e0a0e2ae
......@@ -19,6 +19,7 @@
#include "logger.h"
#include "file_consolidate.h"
#include "file_performance.h"
#include "fancy_timers.h"
#include "../exception.h"
#include "../iocontext.h"
......@@ -205,6 +206,9 @@ private:
std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
std::shared_ptr<SDGenericDatasetWrapper> _dataset;
static LoggerFn _loggerfn;
// As long as we don't inherit FileCommon we need our own timers.
std::shared_ptr<SummaryPrintingTimerEx> _rtimer; // Access is thread safe
std::shared_ptr<SummaryPrintingTimerEx> _wtimer; // Access is thread safe
};
SeismicStoreFile::LoggerFn SeismicStoreFile::_loggerfn;
......@@ -265,6 +269,7 @@ class SeismicStoreFileDelayedWrite : public FileADT
std::shared_ptr<SeismicStoreFile> _relay;
std::vector<char> _open_segment;
UsageHint _usage_hint;
std::shared_ptr<SummaryPrintingTimerEx> _ctimer; // Access is thread safe
SeismicStoreFileDelayedWrite(const SeismicStoreFileDelayedWrite&) = delete;
SeismicStoreFileDelayedWrite& operator=(const SeismicStoreFileDelayedWrite&) = delete;
......@@ -927,6 +932,9 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
, _mode(mode)
, _config()
{
_rtimer.reset(new SummaryPrintingTimerEx(mode == OpenMode::ReadWrite || mode == OpenMode::Truncate ? "Cloud.reread" : "Cloud.read"));
_wtimer.reset(new SummaryPrintingTimerEx("Cloud.write"));
// TODO-Low a better way of handling this.
// Logger passed in iocontext, then storing it per file.
{
......@@ -1100,11 +1108,13 @@ SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, Us
this->_config->_debug_trace("read", /*need=*/size, /*want=*/size,/*parts*/ split.size(), this->_dataset->info()->allSizes(-1));
for (const RawRequest& it : split) {
// TODO-Low: port _cached_read ?
SimpleTimerEx tt(*this->_rtimer);
this->_dataset->dataset()->readBlock
(static_cast<int>(it.blocknum),
static_cast<char*>(data)+it.outpos,
static_cast<size_t>(it.local_offset),
static_cast<size_t>(it.local_size));
_rtimer->addBytesRead(it.local_size);
}
}
......@@ -1212,11 +1222,13 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
// fancy, keep track of when all the data has need read for each
// of the original requests.
for (const auto& it : work) {
SimpleTimerEx tt(*_rtimer);
this->_dataset->dataset()->readBlock
(static_cast<int>(it.blocknum),
data.get() + it.outpos,
static_cast<size_t>(it.local_offset),
static_cast<size_t>(it.local_size));
_rtimer->addBytesRead(it.local_size);
}
// TODO-Performance, if parallel_ok, can I parallelize only this
......@@ -1260,6 +1272,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
void
SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
SimpleTimerEx tt(*_wtimer);
this->_validate_write(data, offset, size, this->_mode);
this->_dataset->reAuthorizeManager();
// TODO-High: Make sure this is nonvirtual *or* don't subtype.
......@@ -1290,6 +1303,7 @@ SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t s
}
this->_dataset->info()->checkOnWrite(blocknum, size); // Also checks blocknum fits in an int,
this->_dataset->dataset()->writeBlock(static_cast<int>(blocknum), static_cast<const char*>(data), static_cast<std::size_t>(size), overwrite);
_wtimer->addBytesWritten(size);
this->_dataset->updateOnWrite(blocknum, size);
if (this->_config->_debug_trace)
this->_config->_debug_trace(offset == current_eof ? "append" : "write",
......@@ -1328,6 +1342,8 @@ SeismicStoreFile::xx_close()
}
_dataset.reset();
_mode = OpenMode::Closed;
_rtimer.reset();
_wtimer.reset();
}
/**
......@@ -1506,6 +1522,7 @@ SeismicStoreFileDelayedWrite::SeismicStoreFileDelayedWrite(const std::string& fi
, _open_segment()
, _usage_hint(UsageHint::Unknown)
{
this->_ctimer.reset(new SummaryPrintingTimerEx("Cloud.readcache"));
this->_relay.reset(new SeismicStoreFile(filename, mode, iocontext));
// TODO-Low: The relayed file already did this.
......@@ -1563,10 +1580,15 @@ SeismicStoreFileDelayedWrite::xx_read(void *data, std::int64_t offset, std::int6
throw OpenZGY::Errors::ZgyUserError("Trying to read past EOF");
if (closed_size > 0)
this->_relay->xx_read(data, offset, closed_size, this->_usage_hint);
if (opened_size > 0)
if (opened_size > 0) {
// Timing of memcpy is not interesting, but the number of calls
// and the total byte count might be.
SimpleTimerEx tt(*this->_ctimer);
memcpy(static_cast<char*>(data) + closed_size,
this->_open_segment.data() + local_offset,
opened_size);
this->_ctimer->addBytesRead(opened_size);
}
}
/**
......@@ -1735,6 +1757,7 @@ void
SeismicStoreFileDelayedWrite::xx_close()
{
this->_flush(true);
this->_ctimer.reset();
this->_relay->xx_close();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment