Commit 09d74bb9 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Honor the token refresh callback.

parent 4cc80a53
...@@ -409,12 +409,18 @@ public: ...@@ -409,12 +409,18 @@ public:
~ZgyReader() ~ZgyReader()
{ {
try { try {
close(); //close(); // See close() for why this may be a bad idea.
_accessor.reset();
_fd.reset();
} }
catch (const std::exception& ex) { catch (const std::exception& ex) {
// We should never get here! // We should never get here!
// Caller should have done an explicit close() so it can handle // Caller should have done an explicit close() so it can handle
// exceptions itself. Exceptions thrown from a destructors are evil. // exceptions itself. Exceptions thrown from a destructors are evil.
// Trying to catch exceptions from the two lines above might already
// be too late. The destructor in _fd does a similar operation
// (blind catch with logging) which makes it even less likely
// thet we get here.
std::cerr << "ERROR closing a file opened for read: " std::cerr << "ERROR closing a file opened for read: "
<< (ex.what() ? ex.what() : "(null)") << (ex.what() ? ex.what() : "(null)")
<< std::endl; << std::endl;
...@@ -512,10 +518,22 @@ public: ...@@ -512,10 +518,22 @@ public:
/** /**
* \brief Close the file and release resources. * \brief Close the file and release resources.
* *
* The %ZgyReader destructor will call %close() if not done already, * Unlike ZgyWriter::close(), forgetting to close a file that was
* catching and swallowing any exception. Unlike ZgyWriter::close() * only open for read is not a major faux pas. It can still lead to
* forgetting to close a file that was only open for read is not a * problems though.
* major faux pas. It is still recommended to explicitly close, though. *
* \li The destructor of _fd will catch and ignore any exceptions
* because if a destructor throws then this will normally
* cause the application to crash.
*
* \li If a callback is used to refresh the token this will not
* happen in our destructor (it won't call xx_close) because
* it is too risky to invoke the callback this late. It might
* not be valid any longer. This means that if the token has
* expired since the last read then the close will fail.
* Exactly why SDAPI requires a token just to close a file is
* a different question. Possibly this is for removing any
* read locks.
*/ */
void close() void close()
{ {
......
...@@ -677,6 +677,10 @@ class SDGenericDatasetWrapper ...@@ -677,6 +677,10 @@ class SDGenericDatasetWrapper
OpenMode disposition_; OpenMode disposition_;
bool virgin_; // If true, the cached CTag should be ok. bool virgin_; // If true, the cached CTag should be ok.
mutable std::mutex mutex_; // Protect all members. mutable std::mutex mutex_; // Protect all members.
std::string saved_token_; // To avoid setting it again.
std::string saved_tokentype_;
OpenZGY::SeismicStoreIOContext::tokencb_t tokenrefresh_;
std::string tokenrefreshtype_;
public: public:
typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr; typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager, SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
...@@ -684,6 +688,8 @@ public: ...@@ -684,6 +688,8 @@ public:
OpenMode disp) OpenMode disp)
: manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true) : manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
, mutex_() , mutex_()
, saved_token_(), saved_tokentype_()
, tokenrefresh_(), tokenrefreshtype_()
{ {
} }
~SDGenericDatasetWrapper(); ~SDGenericDatasetWrapper();
...@@ -768,6 +774,120 @@ public: ...@@ -768,6 +774,120 @@ public:
virgin_ = false; virgin_ = false;
return old != virgin_; return old != virgin_;
} }
/**
* Pass credentials down to the SDAPI layer.
*
* Thread safety: Caller is responsible for preventing concurrent calls
* to update the same manager. CAVEAT: If the manager is cached and
* shared between open files then this raises some challenges.
*/
static void authorizeManagerInSD(seismicdrive::SDManager *manager, const std::string& token, const std::string& tokentype)
{
// TODO-Low: handle impersonation better. SDAPI wants us to tell
// whether we are passing an impersonation token. If so, it will
// attempt to refresh it if expired. The problem is that looking
// at the token (e.g. one provided by an environment variable)
// to decide which type it is can be ridiculously complicated.
// See authorizeManager() in the old code. Also, it would be nice
// to be able to read back the refreshed token.
if (tokentype == "sdtoken")
manager->setAuthProviderFromString(token);
#ifdef HAVE_SAUTH_EXTENSIONS
else if (tokentype == "file")
manager->setAuthProviderFromFile(token);
else if (token.substr(0, 5) == "FILE:")
manager->setAuthProviderFromFile(token.substr(5));
#else
if (tokentype == "file" || token.substr(0, 5) == "FILE:")
throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
else if (tokentype == "imptoken")
manager->setAuthProviderFromImpToken(token);
else if (token.substr(0, 9) == "imptoken:")
manager->setAuthProviderFromImpToken(token.substr(9));
else
manager->setAuthProviderFromString(token);
}
/**
* Pass initial credentials down to the SDAPI layer.
*
* Thread safety: This is meant to be called from the constructor
* when opening a file. So there should not be any race condition.
* CAVEAT: If the manager is cached and shared between open files
* then this raises some challenges.
*/
void authorizeManager(
const std::string& token,
const std::string& tokentype,
const OpenZGY::SeismicStoreIOContext::tokencb_t& tokencb,
const std::string& tokencbtype)
{
std::lock_guard<std::mutex> lk(mutex_);
// Save the refresh callback, if any, for use in reAuthorizeManager().
tokenrefresh_ = tokencb;
tokenrefreshtype_ = tokencbtype;
// Obtain the token to use, preferring the callback. Currently the
// type of token the callback returns is specified when the callback
// is registered. Not when it is invoked. Change that if needed.
std::string newtoken = tokencb ? tokencb() : token;
std::string newtokentype = tokencb ? tokencbtype : tokentype;
authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
// Save what we just set so reAuthorizeManager() won't need to set
// the exact same token again. Don't set if there was an exception
// because then we can try again later. The saved token is NOT
// used for credentials. So it it is possible to store a hash
// instead as long as the risk of collision is negligible.
saved_token_ = newtoken;
saved_tokentype_ = newtokentype;
}
/**
* Pass updated credentials down to the SDAPI layer if needed.
* Needs to be called before any operation that needs credentials.
*
* Thread safety: *this is protected by a lock.
* The lock is temporarily dropped before invoking the refresh callback.
* This means the application provided callback MUST BE THREADSAFE.
* It also means there is technically a race condition here, where a
* particular read operation uses credentials that are a few milliseconds
* out of date.
*
* Alternatively the code here could place a lock are require that the
* callback doesn't do something that couuld cause a deadlock. Not sure
* if that option would be preferable.
*/
void reAuthorizeManager()
{
std::unique_lock<std::mutex> lk(mutex_);
if (tokenrefresh_) {
auto tokenrefresh = this->tokenrefresh_;
auto tokenrefreshtype = this->tokenrefreshtype_;
std::string newtoken;
std::string newtokentype;
// By design, no locks should be held when the callback is invoked
// to avoid the risk of deadlocks. This means that the callback
// itself must be threadsafe.
lk.unlock();
newtoken = tokenrefresh();
newtokentype = tokenrefreshtype;
lk.lock();
if (saved_token_ != newtoken || saved_tokentype_ != newtokentype) {
// In case of exception, always allow trying again.
saved_token_ = std::string();
saved_tokentype_ = std::string();
authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
saved_token_ = newtoken;
saved_tokentype_ = newtokentype;
SeismicStoreFile::_logger(1, "A new token was provided");
}
}
}
}; };
SDGenericDatasetWrapper::~SDGenericDatasetWrapper() SDGenericDatasetWrapper::~SDGenericDatasetWrapper()
...@@ -851,30 +971,12 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c ...@@ -851,30 +971,12 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
// TODO-Low: Cache the manager and possibly the SDUtils instance. // TODO-Low: Cache the manager and possibly the SDUtils instance.
// TODO-Low: handle impersonation better. SDAPI wants us to tell auto datasetwrapper = std::make_shared<SDGenericDatasetWrapper>
// whether we are passing an impersonation token. If so, it will (manager, dataset, mode);
// attempt to refresh it if expired. The problem is that looking
// at the token (e.g. one provided by an environment variable) datasetwrapper->authorizeManager
// to decide which type it is can be ridiculously complicated. (context->_sdtoken, context->_sdtokentype,
// See authorizeManager() in the old code. Also, it would be nice context->_sdtokencb, context->_sdtoken_cbtype);
// to be able to read back the refreshed token.
if (context->_sdtokentype == "sdtoken")
manager->setAuthProviderFromString(context->_sdtoken);
#ifdef HAVE_SAUTH_EXTENSIONS
else if (context->_sdtokentype == "file")
manager->setAuthProviderFromFile(context->_sdtoken);
else if (context->_sdtoken.substr(0, 5) == "FILE:")
manager->setAuthProviderFromFile(context->_sdtoken.substr(5));
#else
if (context->_sdtokentype == "file" || context->_sdtoken.substr(0, 5) == "FILE:")
throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
else if (context->_sdtokentype == "imptoken")
manager->setAuthProviderFromImpToken(context->_sdtoken);
else if (context->_sdtoken.substr(0, 9) == "imptoken:")
manager->setAuthProviderFromImpToken(context->_sdtoken.substr(9));
else
manager->setAuthProviderFromString(context->_sdtoken);
try { try {
switch (mode) { switch (mode) {
...@@ -901,8 +1003,7 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c ...@@ -901,8 +1003,7 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
// TODO-Low: If not, should other SDAPI errors be translated? // TODO-Low: If not, should other SDAPI errors be translated?
} }
this->_dataset = std::make_shared<SDGenericDatasetWrapper> this->_dataset = datasetwrapper;
(manager, dataset, mode);
// Removed this because it causes info() to be populated early, // Removed this because it causes info() to be populated early,
// thereby negating the benefit of lazy evaluation. And, worse, // thereby negating the benefit of lazy evaluation. And, worse,
// causing different behavior when debugging is on. // causing different behavior when debugging is on.
...@@ -912,17 +1013,25 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c ...@@ -912,17 +1013,25 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
SeismicStoreFile::~SeismicStoreFile() SeismicStoreFile::~SeismicStoreFile()
{ {
if (_mode != OpenMode::Closed) { // The calling layer is supposed to do an explicit xx_close() so it
// can catch and handle exceptions, and so we can be sure the token
// callback, if used, is still valid. Do *not* try to re-authorize
// the manager. It might not be safe to invoke the callback any
// more. And do a blind catch of any exception because if we don't
// the application will crash.
if (_mode != OpenMode::Closed && _dataset && _dataset->dataset()) {
try { try {
xx_close(); _dataset->dataset()->close();
} }
catch (const std::exception& ex) { catch (const std::exception& ex) {
// The calling layer is supposed to do an explicit xx_close()
// so it can catch and handle exceptions. This blind catch is
// just a desperate attempt to avoid an application crash.
_logger(0, "EXCEPTION closing file: " + std::string(ex.what())); _logger(0, "EXCEPTION closing file: " + std::string(ex.what()));
} }
catch (...) {
_logger(0, "EXCEPTION closing file.");
}
} }
_dataset.reset();
_mode = OpenMode::Closed;
} }
/** /**
...@@ -984,6 +1093,7 @@ void ...@@ -984,6 +1093,7 @@ void
SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint) SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{ {
this->_validate_read(data, offset, size, this->xx_eof(), this->_mode); this->_validate_read(data, offset, size, this->xx_eof(), this->_mode);
this->_dataset->reAuthorizeManager();
ReadRequest request(offset, size, nullptr); ReadRequest request(offset, size, nullptr);
RawList split = this->_split_by_segment(ReadList{request}); RawList split = this->_split_by_segment(ReadList{request});
if (this->_config->_debug_trace) if (this->_config->_debug_trace)
...@@ -1051,6 +1161,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu ...@@ -1051,6 +1161,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
std::int64_t current_eof = SeismicStoreFile::xx_eof(); // exclude open segment std::int64_t current_eof = SeismicStoreFile::xx_eof(); // exclude open segment
_validate_readv(requests, current_eof, this->_mode); _validate_readv(requests, current_eof, this->_mode);
this->_dataset->reAuthorizeManager();
// For debugging / logging only // For debugging / logging only
const std::int64_t asked = const std::int64_t asked =
...@@ -1112,6 +1223,7 @@ void ...@@ -1112,6 +1223,7 @@ void
SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint) SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{ {
this->_validate_write(data, offset, size, this->_mode); this->_validate_write(data, offset, size, this->_mode);
this->_dataset->reAuthorizeManager();
// TODO-High: Make sure this is nonvirtual *or* don't subtype. // TODO-High: Make sure this is nonvirtual *or* don't subtype.
std::int64_t current_eof = SeismicStoreFile::xx_eof(); // nonvirtual std::int64_t current_eof = SeismicStoreFile::xx_eof(); // nonvirtual
if (_logger(5, "")) if (_logger(5, ""))
...@@ -1169,8 +1281,10 @@ SeismicStoreFile::xx_close() ...@@ -1169,8 +1281,10 @@ SeismicStoreFile::xx_close()
case OpenMode::ReadOnly: case OpenMode::ReadOnly:
case OpenMode::ReadWrite: case OpenMode::ReadWrite:
case OpenMode::Truncate: case OpenMode::Truncate:
if (_dataset->dataset()) if (_dataset->dataset()) {
this->_dataset->reAuthorizeManager();
_dataset->dataset()->close(); _dataset->dataset()->close();
}
break; break;
} }
} }
...@@ -1365,17 +1479,17 @@ SeismicStoreFileDelayedWrite::SeismicStoreFileDelayedWrite(const std::string& fi ...@@ -1365,17 +1479,17 @@ SeismicStoreFileDelayedWrite::SeismicStoreFileDelayedWrite(const std::string& fi
SeismicStoreFileDelayedWrite::~SeismicStoreFileDelayedWrite() SeismicStoreFileDelayedWrite::~SeismicStoreFileDelayedWrite()
{ {
// The SeismicStoreFile destructor will also try to close,
// but at that point we have already done the deed. Harmless.
try { try {
xx_close(); this->_flush(true);
} }
catch (const std::exception& ex) { catch (const std::exception& ex) {
// The calling layer is supposed to do an explicit xx_close() // The calling layer is supposed to do an explicit xx_close()
// so it can catch and handle exceptions. This blind catch is // so it can catch and handle exceptions. This blind catch is
// just a desperate attempt to avoid an application crash. // just a desperate attempt to avoid an application crash.
_relay->_logger(0, "EXCEPTION closing file: " + std::string(ex.what())); _relay->_logger(0, "EXCEPTION flushing file: " + std::string(ex.what()));
} }
// Note: The dataset itself will be closed in _relay's destructor.
// That should happen very shortly.
} }
std::shared_ptr<FileADT> std::shared_ptr<FileADT>
......
...@@ -171,6 +171,16 @@ public: ...@@ -171,6 +171,16 @@ public:
* *
* The callback implementation should not invoke any OpenZGY methods * The callback implementation should not invoke any OpenZGY methods
* except for any calls expressly documented as safe and deadlock-free. * except for any calls expressly documented as safe and deadlock-free.
* The callback implementation must also be thread safe. Set a lock
* if needed. Finally, the callback needs to be legal to call at least
* until close() is called on the reader or writer instance and preferably
* also until the instance goes out of scope.
*
* The library will try to close the reader or writer instance if the
* application fails to do so before the instance goes out of scope.
* E.g. so any locks may be reset. The library *tries* to avoid invoking
* the callback in that situation. On the suspicion that it might have
* gone out of scope. But the application code should not rely on that.
*/ */
SeismicStoreIOContext& sdtokencb(const tokencb_t& value, const std::string& type) { SeismicStoreIOContext& sdtokencb(const tokencb_t& value, const std::string& type) {
this->_sdtokencb = value; this->_sdtokencb = value;
......
...@@ -17,6 +17,9 @@ ...@@ -17,6 +17,9 @@
#include <iostream> #include <iostream>
#include <functional> #include <functional>
#ifndef _WIN32
#include <signal.h>
#endif
static void do_nothing() static void do_nothing()
{ {
...@@ -67,6 +70,19 @@ static void dec_verbose() ...@@ -67,6 +70,19 @@ static void dec_verbose()
--_global_verbose; --_global_verbose;
} }
static void sigpipe_ignore()
{
#ifndef _WIN32
// On Linux the ssl library can cause bogus SIGPIPE signals.
// It should be safe to ignore, because in that case the
// library simply returns a proper error code which the caller
// apparently handles quite well.
::signal(SIGPIPE, SIG_IGN);
#else
// Assuming that Windows doesn't have the same issue.
#endif
}
namespace { namespace {
class Register class Register
{ {
...@@ -82,6 +98,7 @@ namespace { ...@@ -82,6 +98,7 @@ namespace {
// behavior here? // behavior here?
register_test(".quiet", dec_verbose); register_test(".quiet", dec_verbose);
register_test(".verbose", inc_verbose); register_test(".verbose", inc_verbose);
register_test(".sigpipe", sigpipe_ignore);
//register_test(".dummy", 0); //register_test(".dummy", 0);
//register_test(".quiet", [](){--_global_verbose;}); //register_test(".quiet", [](){--_global_verbose;});
//register_test(".verbose", [](){++_global_verbose;}); //register_test(".verbose", [](){++_global_verbose;});
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "test_all.h" #include "test_all.h"
#include "test_utils.h" #include "test_utils.h"
#include "../api.h" #include "../api.h"
#include "../iocontext.h"
#include "../exception.h" #include "../exception.h"
#include "../impl/environment.h" #include "../impl/environment.h"
...@@ -26,6 +27,8 @@ ...@@ -26,6 +27,8 @@
#include <cmath> #include <cmath>
#include <numeric> #include <numeric>
#include <limits> #include <limits>
#include <thread>
#include <chrono>
using namespace OpenZGY; using namespace OpenZGY;
using namespace OpenZGY::Formatters; using namespace OpenZGY::Formatters;
...@@ -304,6 +307,276 @@ void test_readbadpos() ...@@ -304,6 +307,276 @@ void test_readbadpos()
reader->close(); reader->close();
} }
#if HAVE_SD
/**
* Meant to be wrapped in a functor and uses as the token callback
* in a IOContext.
*/
class SDTokenUpdater
{
bool alive_;
int count_;
std::string token_;
mutable std::mutex mutex_;
public:
explicit SDTokenUpdater(const std::string& token)
: alive_(true)
, count_(0)
, token_(token)
, mutex_()
{
}
SDTokenUpdater(const SDTokenUpdater&) = delete;
SDTokenUpdater& operator=(const SDTokenUpdater&) = delete;
std::string operator()() {
std::lock_guard<std::mutex> lk(mutex_);
//if (verbose())
// std::cout << "Fetching token, alive " << alive_ << std::endl;
if (!alive_) {
// Possibly invoked from a static destructor, so use a fairly
// basic way of logging the message.
const char *msg = "FATAL ERROR: Token callback invoked after destruction\n";
fwrite(msg, 1, strlen(msg), stderr);
abort();
}
++count_;
return token_;
}
int clearcount() {
std::lock_guard<std::mutex> lk(mutex_);
int result = count_;
count_ = 0;
return result;
}
void update(const std::string& token) {
std::lock_guard<std::mutex> lk(mutex_);
token_ = token;
}
void die() {
std::lock_guard<std::mutex> lk(mutex_);
alive_ = false;
}
};
bool
read_first_sample(const std::shared_ptr<OpenZGY::IZgyReader>& reader,
bool expect_ok,
const std::string& message)
{
const std::array<std::int64_t,3> orig{0,0,0};
const std::array<std::int64_t,3> size{1,1,1};
float bulk[1]{0};
bool ok = false;
if (verbose())
std::cout << message << " is being tested." << std::endl;
try {
reader->read(orig, size, &bulk[0], 0);
if (!expect_ok)
std::cout << message << " ERROR, should not have worked."
<< std::endl;
else if (std::abs(bulk[0]-0.0039) > 0.001)
std::cout << message << " FAILED with wrong sample value "
<< bulk[0] << std::endl;
else
ok = true;
}
catch (const std::exception& ex) {
if (expect_ok) {
std::cout << message << " FAILED with: " << ex.what() << std::endl;
ok = false;
}
else {
ok = true;
if (verbose())
std::cout << "Got expected: " << ex.what() << std::endl;
}
}
return ok;
}
std::string
cloud_synt2_name()
{
std::string testfolder = InternalZGY::Environment::getStringEnv("OPENZGY_SDTESTDATA", "sd://sntc/testdata");
if (testfolder.back() != '/')
testfolder += "/";
return testfolder + "Synt2.zgy";
}
/**
* When using a token refresh callback, make sure it isn't invoked
* at a point where we cannot be sure it is still valid.
*
* We might decide that the application is required to provide a
* callback that stays valid. In that case this test is N/A.
*/
void
test_tokencb_destroy()
{
using InternalZGY::Environment;
std::string token = Environment::getStringEnv("OPENZGY_TOKEN");
auto callback = std::make_shared<SDTokenUpdater>(token);
std::function<std::string()> functor = [callback]() {return (*callback)();};
auto context = SeismicStoreIOContext()
.sdurl(Environment::getStringEnv("OPENZGY_SDURL"))