Commit e358019c authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Propagate the per-instance logger also down to the cloud layer.

parent c337b827
......@@ -190,8 +190,8 @@ public:
// TODO-Low per-instance logging. This is tedious to implement
// because many of the helper classes will need to hold a logger
// instance as well, or need the logger passed in each call.
static bool _logger(int priority, const std::string& message = std::string());
static bool _logger(int priority, const std::ios& ss);
bool _logger(int priority, const std::string& message = std::string()) const;
bool _logger(int priority, const std::ios& ss) const;
private:
/**
* This class is used by _split_by_segment to describe a request as seen by
......@@ -229,14 +229,12 @@ private:
// The downside is that it gets more tedious to maintain.
std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
std::shared_ptr<SDGenericDatasetWrapper> _dataset;
static LoggerFn _loggerfn;
LoggerFn _loggerfn;
// As long as we don't inherit FileCommon we need our own timers.
std::shared_ptr<SummaryPrintingTimerEx> _rtimer; // Access is thread safe
std::shared_ptr<SummaryPrintingTimerEx> _wtimer; // Access is thread safe
};
SeismicStoreFile::LoggerFn SeismicStoreFile::_loggerfn;
/**
* Improve on SeismicStoreFile, have it buffer large chunks of data before
* writing it out to a new segment.
......@@ -360,9 +358,12 @@ private:
// Will be zero if there is just one block.
std::int64_t last_block_size_;
// For debugging
SeismicStoreFile::LoggerFn loggerfn_;
public:
DatasetInformation();
explicit DatasetInformation(seismicdrive::SDGenericDataset* sdgd);
explicit DatasetInformation(const SeismicStoreFile::LoggerFn& logger);
explicit DatasetInformation(seismicdrive::SDGenericDataset* sdgd, const SeismicStoreFile::LoggerFn& logger);
std::string toString() const;
public:
......@@ -379,11 +380,12 @@ public:
void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize);
};
DatasetInformation::DatasetInformation()
DatasetInformation::DatasetInformation(const SeismicStoreFile::LoggerFn& logger)
: block_count_(0)
, block0_size_(0)
, block1_size_(0)
, last_block_size_(0)
, loggerfn_(logger ? logger : LoggerBase::emptyCallback())
{
}
......@@ -391,11 +393,12 @@ DatasetInformation::DatasetInformation()
* Create and poplulate an instance.
* Catching and translating exceptions from SDAPI is done by caller.
*/
DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd)
DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd, const SeismicStoreFile::LoggerFn& logger)
: block_count_(0)
, block0_size_(0)
, block1_size_(0)
, last_block_size_(0)
, loggerfn_(logger ? logger : LoggerBase::emptyCallback())
{
// Note that sdapi is a bit confusing with respect to signed/unsigned.
// getBlockNum() returns an unsigned (uint64_t).
......@@ -446,9 +449,10 @@ DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd)
// But the problem *could* be that the block sizes of all the middle blocks
// is not the same. Which would be a real problem.
if (nbytes >= 0 && nbytes != (long long)totalSize())
SeismicStoreFile::_logger(0, "Dataset has inconsistent size");
this->loggerfn_(0, "Dataset has inconsistent size");
SeismicStoreFile::_logger(1, toString());
if (this->loggerfn_(1, ""))
this->loggerfn_(1, toString());
}
std::string
......@@ -619,7 +623,7 @@ DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::
else if (size > std::numeric_limits<std::int64_t>::max() - offset)
throw OpenZGY::Errors::ZgyInternalError("Overflow in offset + size.");
else if (offset + size > totalSize()) {
if (SeismicStoreFile::_logger(1)) {
if (this->loggerfn_(1, "")) {
std::stringstream ss;
ss << "Reading past EOF: read("
<< "off=" << offset
......@@ -627,7 +631,7 @@ DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::
<< ", end=" << offset+size
<< ") dataset: " << toString()
<< std::endl;
SeismicStoreFile::_logger(1, ss.str());
this->loggerfn_(1, ss.str());
}
throw OpenZGY::Errors::ZgyInternalError("Reading past EOF");
}
......@@ -716,6 +720,7 @@ class SDGenericDatasetWrapper
std::shared_ptr<const DatasetInformation> info_;
OpenMode disposition_;
bool virgin_; // If true, the cached CTag should be ok.
SeismicStoreFile::LoggerFn loggerfn_;
mutable std::mutex mutex_; // Protect all members.
std::string saved_token_; // To avoid setting it again.
std::string saved_tokentype_;
......@@ -727,8 +732,10 @@ public:
typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
std::shared_ptr<seismicdrive::SDGenericDataset> dataset,
OpenMode disp)
OpenMode disp,
const SeismicStoreFile::LoggerFn& logger)
: manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
, loggerfn_(logger ? logger : LoggerBase::emptyCallback())
, mutex_()
, saved_token_(), saved_tokentype_()
, tokenrefresh_(), tokenrefreshtype_()
......@@ -755,11 +762,11 @@ public:
try {
switch (disposition_) {
case OpenMode::Truncate:
info_.reset(new DatasetInformation());
info_.reset(new DatasetInformation(loggerfn_));
break;
case OpenMode::ReadOnly:
case OpenMode::ReadWrite:
info_.reset(new DatasetInformation(dataset_.get()));
info_.reset(new DatasetInformation(dataset_.get(), loggerfn_));
break;
case OpenMode::Closed:
default:
......@@ -849,10 +856,12 @@ public:
* it will be re-thrown without modification.
*/
void throwCloudException(const std::exception& ex, const char *message) {
SeismicStoreFile::_logger(1, std::stringstream()
<< "Oops (" << message << ")"
<< " (" << tokenmessage_ << "): "
<< ex.what());
if (this->loggerfn_(1, "")) {
std::stringstream ss;
ss << "Oops (" << message << ")" << " ("
<< tokenmessage_ << "): " << ex.what();
this->loggerfn_(1, ss.str());
}
if (dynamic_cast<const OpenZGY::Errors::ZgyError*>(&ex))
throw;
else if (!tokenmessage_.empty())
......@@ -989,9 +998,9 @@ public:
saved_token_ = newtoken;
saved_tokentype_ = newtokentype;
if (newtoken.empty())
SeismicStoreFile::_logger(1, "The token was cleared");
this->loggerfn_(1, "The token was cleared");
else
SeismicStoreFile::_logger(1, "A new token was provided");
this->loggerfn_(1, "A new token was provided");
}
}
}
......@@ -1019,13 +1028,14 @@ public:
policy.maxWaitingTimeMicroSec = 32 * 1000 * 1000;
}
dataset_->setExponentialRetryBackoffPolicy(&policy);
if (SeismicStoreFile::_logger(2, ""))
SeismicStoreFile::_logger
(2, std::stringstream()
<< "Backoff " << (policy.enabled ? "enabled" : "disabled")
if (this->loggerfn_(2, "")) {
std::stringstream ss;
ss << "Backoff " << (policy.enabled ? "enabled" : "disabled")
<< " retries " << policy.maxRetry
<< " start " << (float)policy.initialWaitingTimeMicroSec*1.0e-6
<< " max " << (float)policy.maxWaitingTimeMicroSec*1.0e-6);
<< " max " << (float)policy.maxWaitingTimeMicroSec*1.0e-6;
this->loggerfn_(2, ss.str());
}
}
}
};
......@@ -1042,14 +1052,14 @@ SDGenericDatasetWrapper::~SDGenericDatasetWrapper()
}
catch (const std::exception& ex) {
if (std::string(ex.what()).find("dataset is not open") == std::string::npos)
SeismicStoreFile::_logger(0, "SDGenericDataset::close(): " + std::string(ex.what()));
this->loggerfn_(0, "SDGenericDataset::close(): " + std::string(ex.what()));
}
try {
// Catch exceptions raised inside a destructor might not work. But I tried.
dataset_.reset();
}
catch (const std::exception& ex) {
SeismicStoreFile::_logger(0, "SDGenericDataset::dtor(): " + std::string(ex.what()));
this->loggerfn_(0, "SDGenericDataset::dtor(): " + std::string(ex.what()));
}
manager_.reset();
}
......@@ -1062,31 +1072,29 @@ FileUtilsSeismicStore::~FileUtilsSeismicStore()
{
}
extern "C" void BREAK()
{
std::cerr << "\nBROKE\n" << std::endl;
}
SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, const IOContext *iocontext)
: FileUtilsSeismicStore()
, _config()
{
auto context = dynamic_cast<const OpenZGY::SeismicStoreIOContext*>(iocontext);
_loggerfn = ((context && context->_logger) ? context->_logger :
LoggerBase::standardCallback
(LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"),
"openzgy-cloud: ", ""));
if (!context)
throw OpenZGY::Errors::ZgyUserError("Opening a file from seismic store requires a SeismicStoreIOContext");
this->_config.reset(new OpenZGY::SeismicStoreIOContext(*context));
_rtimer.reset(new SummaryPrintingTimerEx(mode == OpenMode::ReadWrite || mode == OpenMode::Truncate ? "Cloud.reread" : "Cloud.read"));
_wtimer.reset(new SummaryPrintingTimerEx("Cloud.write"));
// TODO-Low a better way of handling this.
// Logger passed in iocontext, then storing it per file.
{
// Must protect against double init because _loggerfn is global.
static std::mutex mutex;
std::lock_guard<std::mutex> lk(mutex);
if (!_loggerfn)
_loggerfn = LoggerBase::standardCallback(LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"), "openzgy-cloud: ", "");
}
_logger(3, std::stringstream() << "SeismicStoreFile("
<< "\"" << filename << "\", " << int(mode) << ", *)\n");
auto context = dynamic_cast<const OpenZGY::SeismicStoreIOContext*>(iocontext);
if (!context)
throw OpenZGY::Errors::ZgyUserError("Opening a file from seismic store requires a SeismicStoreIOContext");
this->_config.reset(new OpenZGY::SeismicStoreIOContext(*context));
std::unordered_map<std::string, std::string> extra;
using seismicdrive::api::json::Constants;
if (!context->_legaltag.empty())
......@@ -1116,7 +1124,7 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
// TODO-Low: Cache the manager and possibly the SDUtils instance.
auto datasetwrapper = std::make_shared<SDGenericDatasetWrapper>
(manager, dataset, mode);
(manager, dataset, mode, _loggerfn);
datasetwrapper->authorizeManager
(context->_sdtoken, context->_sdtokentype,
......@@ -1184,7 +1192,7 @@ SeismicStoreFile::~SeismicStoreFile()
* while the very first instance is being constructed.
*/
bool
SeismicStoreFile::_logger(int priority, const std::string& message)
SeismicStoreFile::_logger(int priority, const std::string& message) const
{
return _loggerfn(priority, message);
}
......@@ -1203,7 +1211,7 @@ SeismicStoreFile::_logger(int priority, const std::string& message)
* while the very first instance is being constructed.
*/
bool
SeismicStoreFile::_logger(int priority, const std::ios& ss)
SeismicStoreFile::_logger(int priority, const std::ios& ss) const
{
auto sstream = dynamic_cast<const std::stringstream*>(&ss);
return _logger(priority, sstream ? sstream->str() : std::string());
......@@ -1733,9 +1741,9 @@ SeismicStoreFile::deleteFile(const std::string& filename, bool missing_ok) const
}
catch (const std::exception& ex) {
if (std::string(ex.what()).find("does not exist") != std::string::npos) {
SeismicStoreFile::_logger(1, std::stringstream()
<< "Deleting already deleted"
<< " \"" << filename << "\"\n");
this->_logger(1, std::stringstream()
<< "Deleting already deleted"
<< " \"" << filename << "\"\n");
if (!missing_ok)
_dataset->throwCloudException(ex, "Delete");
}
......
......@@ -18,6 +18,7 @@
#include "test_utils.h"
#include "test_all.h"
#include "../impl/environment.h"
#include "../impl/logger.h"
#include <sstream>
#include <chrono>
......@@ -207,14 +208,21 @@ CloudFileAutoDelete::remove(const std::string& name, const OpenZGY::SeismicStore
if (ctx) {
// Using the low level delete functionality;
// I could also have used OpenZGY::IZgyUtils.
// The low level code needs a logger attached here,
// unless I am ok with the default.
OpenZGY::SeismicStoreIOContext localctx(*ctx);
localctx.logger
(InternalZGY::LoggerBase::standardCallback
(InternalZGY::LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"),
"openzgy-testutils: ", ""));
std::shared_ptr<InternalZGY::FileADT> fd = InternalZGY::FileFactory::instance().create
(name, InternalZGY::OpenMode::Closed, ctx);
(name, InternalZGY::OpenMode::Closed, &localctx);
auto fd_sd = dynamic_cast<InternalZGY::FileUtilsSeismicStore*>(fd.get());
if (fd_sd) {
fd_sd->deleteFile(name, /*missing_ok=*/true);
}
else {
throw OpenZGY::Errors::ZgyInternalError("CloudFileAutoDelete::remove founbd no FileUtils");
throw OpenZGY::Errors::ZgyInternalError("CloudFileAutoDelete::remove found no FileUtils");
}
}
else {
......@@ -281,6 +289,12 @@ default_sd_context()
using InternalZGY::Environment;
static OpenZGY::SeismicStoreIOContext instance =
OpenZGY::SeismicStoreIOContext()
// Enable to prove that logging is configurable, for seismic storeat least.
#if 0
.logger(InternalZGY::LoggerBase::standardCallback
(InternalZGY::LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"),
"openzgy-unittest: ", ""))
#endif
.sdurl(Environment::getStringEnv("OPENZGY_SDURL"))
.sdapikey(Environment::getStringEnv("OPENZGY_SDAPIKEY"))
.sdtoken(Environment::getStringEnv("OPENZGY_TOKEN"), "");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment