Commit 3dd67794 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Merge branch 'kvamme62/miscellaneous' into 'master'

Miscellaneous Minor Modifications

See merge request !35
parents 164518f4 92ee8d04
Pipeline #23904 passed with stages
in 5 minutes and 31 seconds
......@@ -409,12 +409,18 @@ public:
~ZgyReader()
{
try {
close();
//close(); // See close() for why this may be a bad idea.
_accessor.reset();
_fd.reset();
}
catch (const std::exception& ex) {
// We should never get here!
// Caller should have done an explicit close() so it can handle
// exceptions itself. Exceptions thrown from a destructors are evil.
// Trying to catch exceptions from the two lines above might already
// be too late. The destructor in _fd does a similar operation
// (blind catch with logging) which makes it even less likely
// thet we get here.
std::cerr << "ERROR closing a file opened for read: "
<< (ex.what() ? ex.what() : "(null)")
<< std::endl;
......@@ -427,6 +433,8 @@ public:
* The data is read into a buffer provided by the caller.
* The method will apply conversion storage -> float if needed.
*
* Data is ordered inline(slowest), crossline, vertical(fastest).
*
* The start position refers to the specified lod level.
* At lod 0 start + data.size can be up to the survey size.
* At lod 1 the maximum is just half that, rounded up.
......@@ -512,10 +520,22 @@ public:
/**
* \brief Close the file and release resources.
*
* The %ZgyReader destructor will call %close() if not done already,
* catching and swallowing any exception. Unlike ZgyWriter::close()
* forgetting to close a file that was only open for read is not a
* major faux pas. It is still recommended to explicitly close, though.
* Unlike ZgyWriter::close(), forgetting to close a file that was
* only open for read is not a major faux pas. It can still lead to
* problems though.
*
* \li The destructor of _fd will catch and ignore any exceptions
* because if a destructor throws then this will normally
* cause the application to crash.
*
* \li If a callback is used to refresh the token this will not
* happen in our destructor (it won't call xx_close) because
* it is too risky to invoke the callback this late. It might
* not be valid any longer. This means that if the token has
* expired since the last read then the close will fail.
* Exactly why SDAPI requires a token just to close a file is
* a different question. Possibly this is for removing any
* read locks.
*/
void close()
{
......@@ -619,6 +639,8 @@ public:
*
* This will apply conversion float -> storage if needed.
*
* Data is ordered inline(slowest), crossline, vertical(fastest).
*
* A read/modify/write will be done if the region's start and size
* doesn't align with bricksize. When writing to the cloud this
* read/modify/write may incur performance and size penalties. So
......@@ -767,6 +789,7 @@ public:
bool force = false)
{
if (this->_dirty || force) {
InternalZGY::DataBuffer::cleartimers(true);
this->_dirty = false;
std::shared_ptr<InternalZGY::StatisticData> stats;
std::shared_ptr<InternalZGY::HistogramData> histo;
......@@ -859,6 +882,12 @@ public:
this->_fd->xx_close();
this->_fd.reset();
// Kludge for performance measurements,
// The timers in DataBuffer are global. For some experiments
// it makes sense to print and reset then just before finalize
// and also after closing each file.
InternalZGY::DataBuffer::cleartimers(true);
// If errorflag() is set and the file is new or has been
// successfully written to at least once then the client code is
// strongly advised to delete the file.
......
......@@ -1733,7 +1733,7 @@ ZgyInternalBulk::writeRegion(
// TODO-Performance: Multi-threading of _scaleDataToStorage().
// Need to figure out the cutoff where the buffer is too small and the
// overhead of OpenMP starts getting in the way.
SimpleTimer t1(*_ststimer);
SimpleTimer t1(*_ststimer, timers_on());
data = _scaleDataToStorage(data);
}
......
......@@ -77,6 +77,9 @@ namespace {
* If I need a report each time a DataBuffer goes out of scope
* it would be different.
*
* For slightly more advanced usage, a global cleartimers() can
* be called to output all collected details at that point.
*
* \details Thread safety: Safe because SummaryPrintingTimer is
* thread safe when used correctly.
*/
......@@ -108,6 +111,30 @@ namespace {
static AdHocTimers instance_;
return instance_;
}
void cleartimers(bool show) {
if (show) {
copysubset.print();
copyscalar.print();
scaletos.print();
scaletof.print();
clone.print();
range.print();
fill.print();
allsame.print();
ctor.print();
}
else {
copysubset.reset();
copyscalar.reset();
scaletos.reset();
scaletof.reset();
clone.reset();
range.reset();
fill.reset();
allsame.reset();
ctor.reset();
}
}
};
}
......@@ -1171,6 +1198,15 @@ DataBufferNd<T,NDim>::slice1(int dim, std::int64_t start, std::int64_t size) con
return slice(neworig, newsize);
}
/**
* For ad-hoc performance measurements.
*/
void
DataBuffer::cleartimers(bool show)
{
AdHocTimers::instance().cleartimers(show);
}
// Explicit template instanciation.
OPENZGY_IMPLEMENT_EXPLICIT_TEMPLATE(DataBufferNd<std::int8_t,3>)
OPENZGY_IMPLEMENT_EXPLICIT_TEMPLATE(DataBufferNd<std::uint8_t,3>)
......
......@@ -254,6 +254,7 @@ public:
virtual void check_cstride() const = 0;
virtual std::shared_ptr<DataBuffer> slice1(int dim, std::int64_t start, std::int64_t size) const = 0;
static std::shared_ptr<DataBuffer> makeDataBuffer3d(void *raw, std::int64_t nbytes, const std::array<std::int64_t,3>& size, RawDataType dtype);
static void cleartimers(bool show);
};
/**
......
......@@ -34,6 +34,7 @@
#include <errno.h>
#include <unistd.h>
#include <mutex>
#include <atomic>
using OpenZGY::IOContext;
namespace InternalZGY {
......@@ -56,6 +57,60 @@ namespace {
static int enable = Environment::getNumericEnv("OPENZGY_TIMERS", 0);
return enable > 0;
}
/**
* \brief SummaryTimer that prints its result when going out of scope.
* \details This is a private extension that also knows about total
* number of bytes transfered, so it can report the actuar throughput.
*/
class OPENZGY_TEST_API SummaryPrintingTimerEx : public SummaryPrintingTimer
{
std::atomic<std::int64_t> bytes_read_;
std::atomic<std::int64_t> bytes_written_;
public:
explicit SummaryPrintingTimerEx(const char *name, bool csv = false)
: SummaryPrintingTimer(name, csv)
, bytes_read_(0)
, bytes_written_(0)
{
}
virtual ~SummaryPrintingTimerEx() {
print();
}
static std::string niceNumber(const std::string& label, std::int64_t n) {
if (n > 10*1024*1024)
return label + std::to_string(n/(1024*1024)) + " MB";
else if (n > 10*1024)
return label + std::to_string(n/1024) + " kB";
else if (n != 0)
return label + std::to_string(n) + " bytes";
else
return std::string();
}
virtual void print() {
if (getCount() != 0) {
std::string msg(csv_ ? getCSV() : getValue(true, true));
if (!msg.empty() && msg.back() == '\n')
msg = msg.substr(0, msg.size()-1);
if (csv_)
std::cerr << msg
<< "," << bytes_read_.load()
<< "," << bytes_written_.load()
<< std::endl;
else
std::cerr << msg
<< niceNumber(", R: ", bytes_read_.load())
<< niceNumber(", W: ", bytes_written_.load())
<< std::endl;
}
reset(); // Prevent the base class from printing as well.
}
void addBytesRead(std::int64_t nbytes) {
bytes_read_.fetch_add(nbytes);
}
void addBytesWritten(std::int64_t nbytes) {
bytes_written_.fetch_add(nbytes);
}
};
}
/**
......@@ -106,6 +161,11 @@ LocalFileLinux::LocalFileLinux(const std::string& filename, OpenMode mode, const
, _fd(-1)
, _mutex()
{
// Use the SummaryPrintingTimer extension that keeps track of bytes done.
// The instance allocated in the base class constructor is simply dropped.
_rtimer.reset(new SummaryPrintingTimerEx("File::read"));
_wtimer.reset(new SummaryPrintingTimerEx("File::write"));
switch (mode) {
case OpenMode::ReadOnly:
_fd = ::open(filename.c_str(), O_RDONLY, 0666);
......@@ -229,6 +289,7 @@ LocalFileLinux::xx_read(void *data, std::int64_t offset, std::int64_t size, Usag
SimpleTimer tt(*_rtimer, timers_on());
_validate_read(data, offset, size, xx_eof(), _mode);
ssize_t nbytes = ::pread(_fd, data, size, offset);
static_cast<SummaryPrintingTimerEx*>(_rtimer.get())->addBytesRead(nbytes);
_check_short_read(offset, size, nbytes);
}
......@@ -267,6 +328,7 @@ LocalFileLinux::xx_write(const void* data, std::int64_t offset, std::int64_t siz
ssize_t nbytes = ::pwrite(_fd, data, size, offset);
if (nbytes < 0)
throw OpenZGY::Errors::ZgyIoError(_name, errno);
static_cast<SummaryPrintingTimerEx*>(_wtimer.get())->addBytesWritten(nbytes);
std::lock_guard<std::mutex> lk(_mutex); // protect _eof
_eof = std::max(_eof, offset + nbytes);
if (nbytes != size)
......
......@@ -109,6 +109,7 @@ FileWithPerformanceLogger::xx_close()
void
FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
{
std::unique_lock<std::mutex> lk(_mutex);
// Optional periodic reporting.
const double now = timer.getLastStop() / timer.getFrequency();
if (_sumtimerbeg < _sumtimerend &&
......@@ -116,7 +117,9 @@ FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
now >= _sumtimerbeg + _suminterval)
{
// The timer passed in belongs to the next interval.
lk.unlock();
std::string msg = dumpThroughput(true);
lk.lock();
if (!msg.empty())
std::cout << msg << std::flush;
// Might also have reported and cleared the latency log.
......@@ -142,7 +145,6 @@ FileWithPerformanceLogger::add(const Timer& timer, std::int64_t blocksize)
const int ibin = (fbin < 0 ? 0 :
fbin > (nbuckets-1) ? nbuckets-1 :
std::floor(fbin+0.5));
std::lock_guard<std::mutex> lk(_mutex);
_histbins[ibin] += 1;
_nsamples += 1;
_statsum += value;
......
......@@ -677,6 +677,10 @@ class SDGenericDatasetWrapper
OpenMode disposition_;
bool virgin_; // If true, the cached CTag should be ok.
mutable std::mutex mutex_; // Protect all members.
std::string saved_token_; // To avoid setting it again.
std::string saved_tokentype_;
OpenZGY::SeismicStoreIOContext::tokencb_t tokenrefresh_;
std::string tokenrefreshtype_;
public:
typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
......@@ -684,6 +688,8 @@ public:
OpenMode disp)
: manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
, mutex_()
, saved_token_(), saved_tokentype_()
, tokenrefresh_(), tokenrefreshtype_()
{
}
~SDGenericDatasetWrapper();
......@@ -768,6 +774,120 @@ public:
virgin_ = false;
return old != virgin_;
}
/**
* Pass credentials down to the SDAPI layer.
*
* Thread safety: Caller is responsible for preventing concurrent calls
* to update the same manager. CAVEAT: If the manager is cached and
* shared between open files then this raises some challenges.
*/
static void authorizeManagerInSD(seismicdrive::SDManager *manager, const std::string& token, const std::string& tokentype)
{
// TODO-Low: handle impersonation better. SDAPI wants us to tell
// whether we are passing an impersonation token. If so, it will
// attempt to refresh it if expired. The problem is that looking
// at the token (e.g. one provided by an environment variable)
// to decide which type it is can be ridiculously complicated.
// See authorizeManager() in the old code. Also, it would be nice
// to be able to read back the refreshed token.
if (tokentype == "sdtoken")
manager->setAuthProviderFromString(token);
#ifdef HAVE_SAUTH_EXTENSIONS
else if (tokentype == "file")
manager->setAuthProviderFromFile(token);
else if (token.substr(0, 5) == "FILE:")
manager->setAuthProviderFromFile(token.substr(5));
#else
if (tokentype == "file" || token.substr(0, 5) == "FILE:")
throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
else if (tokentype == "imptoken")
manager->setAuthProviderFromImpToken(token);
else if (token.substr(0, 9) == "imptoken:")
manager->setAuthProviderFromImpToken(token.substr(9));
else
manager->setAuthProviderFromString(token);
}
/**
* Pass initial credentials down to the SDAPI layer.
*
* Thread safety: This is meant to be called from the constructor
* when opening a file. So there should not be any race condition.
* CAVEAT: If the manager is cached and shared between open files
* then this raises some challenges.
*/
void authorizeManager(
const std::string& token,
const std::string& tokentype,
const OpenZGY::SeismicStoreIOContext::tokencb_t& tokencb,
const std::string& tokencbtype)
{
std::lock_guard<std::mutex> lk(mutex_);
// Save the refresh callback, if any, for use in reAuthorizeManager().
tokenrefresh_ = tokencb;
tokenrefreshtype_ = tokencbtype;
// Obtain the token to use, preferring the callback. Currently the
// type of token the callback returns is specified when the callback
// is registered. Not when it is invoked. Change that if needed.
std::string newtoken = tokencb ? tokencb() : token;
std::string newtokentype = tokencb ? tokencbtype : tokentype;
authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
// Save what we just set so reAuthorizeManager() won't need to set
// the exact same token again. Don't set if there was an exception
// because then we can try again later. The saved token is NOT
// used for credentials. So it it is possible to store a hash
// instead as long as the risk of collision is negligible.
saved_token_ = newtoken;
saved_tokentype_ = newtokentype;
}
/**
* Pass updated credentials down to the SDAPI layer if needed.
* Needs to be called before any operation that needs credentials.
*
* Thread safety: *this is protected by a lock.
* The lock is temporarily dropped before invoking the refresh callback.
* This means the application provided callback MUST BE THREADSAFE.
* It also means there is technically a race condition here, where a
* particular read operation uses credentials that are a few milliseconds
* out of date.
*
* Alternatively the code here could place a lock are require that the
* callback doesn't do something that couuld cause a deadlock. Not sure
* if that option would be preferable.
*/
void reAuthorizeManager()
{
std::unique_lock<std::mutex> lk(mutex_);
if (tokenrefresh_) {
auto tokenrefresh = this->tokenrefresh_;
auto tokenrefreshtype = this->tokenrefreshtype_;
std::string newtoken;
std::string newtokentype;
// By design, no locks should be held when the callback is invoked
// to avoid the risk of deadlocks. This means that the callback
// itself must be threadsafe.
lk.unlock();
newtoken = tokenrefresh();
newtokentype = tokenrefreshtype;
lk.lock();
if (saved_token_ != newtoken || saved_tokentype_ != newtokentype) {
// In case of exception, always allow trying again.
saved_token_ = std::string();
saved_tokentype_ = std::string();
authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
saved_token_ = newtoken;
saved_tokentype_ = newtokentype;
SeismicStoreFile::_logger(1, "A new token was provided");
}
}
}
};
SDGenericDatasetWrapper::~SDGenericDatasetWrapper()
......@@ -851,30 +971,12 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
// TODO-Low: Cache the manager and possibly the SDUtils instance.
// TODO-Low: handle impersonation better. SDAPI wants us to tell
// whether we are passing an impersonation token. If so, it will
// attempt to refresh it if expired. The problem is that looking
// at the token (e.g. one provided by an environment variable)
// to decide which type it is can be ridiculously complicated.
// See authorizeManager() in the old code. Also, it would be nice
// to be able to read back the refreshed token.
if (context->_sdtokentype == "sdtoken")
manager->setAuthProviderFromString(context->_sdtoken);
#ifdef HAVE_SAUTH_EXTENSIONS
else if (context->_sdtokentype == "file")
manager->setAuthProviderFromFile(context->_sdtoken);
else if (context->_sdtoken.substr(0, 5) == "FILE:")
manager->setAuthProviderFromFile(context->_sdtoken.substr(5));
#else
if (context->_sdtokentype == "file" || context->_sdtoken.substr(0, 5) == "FILE:")
throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
else if (context->_sdtokentype == "imptoken")
manager->setAuthProviderFromImpToken(context->_sdtoken);
else if (context->_sdtoken.substr(0, 9) == "imptoken:")
manager->setAuthProviderFromImpToken(context->_sdtoken.substr(9));
else
manager->setAuthProviderFromString(context->_sdtoken);
auto datasetwrapper = std::make_shared<SDGenericDatasetWrapper>
(manager, dataset, mode);
datasetwrapper->authorizeManager
(context->_sdtoken, context->_sdtokentype,
context->_sdtokencb, context->_sdtoken_cbtype);
try {
switch (mode) {
......@@ -901,8 +1003,7 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
// TODO-Low: If not, should other SDAPI errors be translated?
}
this->_dataset = std::make_shared<SDGenericDatasetWrapper>
(manager, dataset, mode);
this->_dataset = datasetwrapper;
// Removed this because it causes info() to be populated early,
// thereby negating the benefit of lazy evaluation. And, worse,
// causing different behavior when debugging is on.
......@@ -912,17 +1013,25 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
SeismicStoreFile::~SeismicStoreFile()
{
if (_mode != OpenMode::Closed) {
// The calling layer is supposed to do an explicit xx_close() so it
// can catch and handle exceptions, and so we can be sure the token
// callback, if used, is still valid. Do *not* try to re-authorize
// the manager. It might not be safe to invoke the callback any
// more. And do a blind catch of any exception because if we don't
// the application will crash.
if (_mode != OpenMode::Closed && _dataset && _dataset->dataset()) {
try {
xx_close();
_dataset->dataset()->close();
}
catch (const std::exception& ex) {
// The calling layer is supposed to do an explicit xx_close()
// so it can catch and handle exceptions. This blind catch is
// just a desperate attempt to avoid an application crash.
_logger(0, "EXCEPTION closing file: " + std::string(ex.what()));
}
catch (...) {
_logger(0, "EXCEPTION closing file.");
}
}
_dataset.reset();
_mode = OpenMode::Closed;
}
/**
......@@ -984,6 +1093,7 @@ void
SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
this->_validate_read(data, offset, size, this->xx_eof(), this->_mode);
this->_dataset->reAuthorizeManager();
ReadRequest request(offset, size, nullptr);
RawList split = this->_split_by_segment(ReadList{request});
if (this->_config->_debug_trace)
......@@ -1051,6 +1161,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
std::int64_t current_eof = SeismicStoreFile::xx_eof(); // exclude open segment
_validate_readv(requests, current_eof, this->_mode);
this->_dataset->reAuthorizeManager();
// For debugging / logging only
const std::int64_t asked =
......@@ -1112,6 +1223,7 @@ void
SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
this->_validate_write(data, offset, size, this->_mode);
this->_dataset->reAuthorizeManager();
// TODO-High: Make sure this is nonvirtual *or* don't subtype.
std::int64_t current_eof = SeismicStoreFile::xx_eof(); // nonvirtual
if (_logger(5, ""))
......@@ -1169,8 +1281,10 @@ SeismicStoreFile::xx_close()
case OpenMode::ReadOnly:
case OpenMode::ReadWrite:
case OpenMode::Truncate:
if (_dataset->dataset())
if (_dataset->dataset()) {
this->_dataset->reAuthorizeManager();
_dataset->dataset()->close();
}
break;
}
}
......@@ -1365,17 +1479,17 @@ SeismicStoreFileDelayedWrite::SeismicStoreFileDelayedWrite(const std::string& fi
SeismicStoreFileDelayedWrite::~SeismicStoreFileDelayedWrite()
{
// The SeismicStoreFile destructor will also try to close,
// but at that point we have already done the deed. Harmless.
try {
xx_close();
this->_flush(true);
}
catch (const std::exception& ex) {
// The calling layer is supposed to do an explicit xx_close()
// so it can catch and handle exceptions. This blind catch is
// just a desperate attempt to avoid an application crash.
_relay->_logger(0, "EXCEPTION closing file: " + std::string(ex.what()));
_relay->_logger(0, "EXCEPTION flushing file: " + std::string(ex.what()));
}
// Note: The dataset itself will be closed in _relay's destructor.
// That should happen very shortly.
}
std::shared_ptr<FileADT>
......
......@@ -472,9 +472,16 @@ SummaryTimer::getValue(bool details, bool msonly) const
const char*
SummaryTimer::getCSV() const
{
// Avoid odd characters in name to make it easier for spreadsheets.
char *safename = new char[strlen(getName())+1];
char *dst = safename;
const char *src = getName(); // Instance member, remains valid long enough.
for (; *src; ++src, ++dst)
*dst = isalnum(*src) ? *src : '_';
*dst = '\0';
sprintf_s(pimpl_->buff_, sizeof(pimpl_->buff_),
"TIMER,\"%s\",%d,%.3lf,%.3lf\n",
getName(), getCount(), getTotal(), getAdjusted());
safename, getCount(), getTotal(), getAdjusted());
return pimpl_->buff_;
}
......@@ -521,8 +528,9 @@ SummaryTimer::add(const Timer& t)
/* SummaryPrintingTimer ===============================================*/
/*=========================================================================*/
SummaryPrintingTimer::SummaryPrintingTimer(const char *name)
SummaryPrintingTimer::SummaryPrintingTimer(const char *name, bool csv)
: SummaryTimer(name)
, csv_(csv)
{
}
......@@ -535,8 +543,7 @@ void
SummaryPrintingTimer::print()
{
if (getCount() != 0) {
const char *msg = getValue(true, true);
//const char *msg = getCSV();
const char *msg = csv_ ? getCSV() : getValue(true, true);
fwrite(msg, 1, strlen(msg), stderr);
}
reset();
......
......@@ -194,8 +194,10 @@ public:
*/
class OPENZGY_TEST_API SummaryPrintingTimer : public SummaryTimer
{
protected:
bool csv_;
public:
explicit SummaryPrintingTimer(const char *name);
explicit SummaryPrintingTimer(const char *name, bool csv = false);
virtual ~SummaryPrintingTimer();
virtual void print();
};
......
......@@ -171,6 +171,16 @@ public: