Commit 1f996965 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Make class DatasetInformation thread safe.

parent 87c834a4
Pipeline #19665 passed with stages
in 5 minutes and 57 seconds
......@@ -240,6 +240,8 @@ FileCommon::_real_eof() const
/**
* \brief Throw a descriptive error if there was something wrong with the read.
* \details Currently works for local files only. TODO-Low fix?
* If fixing this then make sure all implementations of xx_eof()
* and _real_eof() are also thread safe.
*/
void
FileCommon::_check_short_read(std::int64_t offset, std::int64_t size, std::int64_t got) const
......
......@@ -274,9 +274,11 @@ protected:
public:
FileCommon(const std::string& filename, OpenMode mode);
// NEW functions
// WARNING: users responsible for thread safety.
// TODO-Worry: All implementations need to be thread safe.
virtual std::int64_t _real_eof() const;
// WARNING: calls xx_eof() and _real_eof() which might not be threadsafe.
// TODO-Worry: calls xx_eof() and _real_eof() which need to be threadsafe.
// Currently not true for SeismicStoreFileDelayedWrite, but this
// check method won't be used there.
void _check_short_read(std::int64_t offset, std::int64_t size, std::int64_t got) const;
};
......
......@@ -164,7 +164,7 @@ public:
public:
// For use by debug_trace, allow SeismicStoreFileDelayedWrite() access
// to details of the file.
const DatasetInformation& debug_info();
std::shared_ptr<const DatasetInformation> debug_info();
private:
/**
* This class is used by _split_by_segment to describe a request as seen by
......@@ -303,11 +303,10 @@ private:
* stale data in that case.
*
* Thread safety:
* Modification may lead to a data race. In particular,
* SeismicStoreFile::xx_write() will call our updateOnWrite(). The
* other methods are const, and updateOnWrite() is only called from
* that one place. Due to the high level design of writes not
* needing to be thread safe, this should not be a problem.
* All access to SDGenericDatasetWrapper::info_ will be protected by
* SDGenericDatasetWrapper::mutex_. Methods that expect to change data
* (currently only updateOnWrite()) will need some special handling.
* TODO-High: there is still a snag here.
*/
class DatasetInformation
{
......@@ -445,9 +444,8 @@ DatasetInformation::toString() const
* then we are completely hosed anyway. It is possible to verify the
* assumption but it is probably too expensive to do so.
*
* Thread safety: Safe as long as updateOnWrite is not called.
* So it is safe for files opened read only or where the caller
* can guarantee there are no writes going on.
* Thread safety: Safe because instances are immutable once constructed
* and made available.
*/
std::int64_t
DatasetInformation::totalSize() const
......@@ -530,6 +528,11 @@ DatasetInformation::checkOnWrite(std::int64_t blocknum, std::int64_t blocksize)
/**
* Update cached size information after data is successfully written.
* checkOnWrite() must have been called already.
*
* Thread safety: NOT thread safe.
* Do not invoke SDGenericDatasetWrapper::info()->updateOnWrite() directly.
* Call the thread safe SDGenericDatasetWrapper::updateOnWrite() instead.
* That one will make sure the smart pointer being updated is unique.
*/
void
DatasetInformation::updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
......@@ -646,18 +649,33 @@ DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::
* Thread safety: The class itself is thread safe. The data being wrapped
* might not be.
*
* TODO-Medium: _virgin is not protected. However, this member may be
* YAGNI unless we need to bring back the CTag mechanism. Which might
* not be needed if we go for immutable ZGY files.
* All mutable data is protected by this->mutex_. Access methods for
* those return a smart pointer. If info() is called on one thread
* while another thread is doing a write operation (which is actually
* not allowed) then it is unspecified whether the returned value is
* before or after the write. It is also unspecified whether
* updateOnWrite() will have any effect in that case. The information
* is still consistent though, and may be used for logging etc.
*
* CAVEAT: Make sure the returned pointer remains in scope long
* enough. E.g. this is NOT safe, and might crash every 7 years or so.
* Except on customer sites where it may crash every 7 minutes.
*
* seismicdrive::SDGenericDataset& dataset = *wrapper->dataset(); // NO!!!
* foo(dataset); // the returned smart pointer is already deleted.
*
* TODO-Yagni: _virgin is not used. It is related to the CTag mechanism.
* It is still being discussed whether that needs to be ported from the
* old accessor. It might not be needed if we go for immutable ZGY files.
*/
class SDGenericDatasetWrapper
{
std::shared_ptr<seismicdrive::SDManager> manager_;
std::shared_ptr<seismicdrive::SDGenericDataset> dataset_;
std::unique_ptr<DatasetInformation> info_;
std::shared_ptr<const DatasetInformation> info_;
OpenMode disposition_;
bool virgin_; // If true, the cached CTag should be ok.
std::mutex mutex_; // Protect lazy-eval info_ and (less importantly) dataset_.
mutable std::mutex mutex_; // Protect all members.
public:
typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
......@@ -668,10 +686,18 @@ public:
{
}
~SDGenericDatasetWrapper();
seismicdrive::SDGenericDataset& dataset() { return *dataset_; }
std::shared_ptr<seismicdrive::SDManager> manager() { return manager_; }
OpenMode disposition() const { return disposition_; }
DatasetInformation& info() {
/**
 * Return a copy of the smart pointer to the wrapped dataset.
 * The copy is made while holding mutex_. Callers should keep the
 * returned pointer alive for as long as they use the dataset.
 */
std::shared_ptr<seismicdrive::SDGenericDataset> dataset() {
  std::lock_guard<std::mutex> guard(mutex_);
  std::shared_ptr<seismicdrive::SDGenericDataset> current = dataset_;
  return current;
}
/**
 * Return a copy of the smart pointer to the manager.
 * The copy is made while holding mutex_. Callers should keep the
 * returned pointer alive for as long as they use the manager.
 */
std::shared_ptr<seismicdrive::SDManager> manager() {
  std::lock_guard<std::mutex> guard(mutex_);
  std::shared_ptr<seismicdrive::SDManager> current = manager_;
  return current;
}
/**
 * Return the stored disposition_ value.
 */
OpenMode disposition() const {
  // No lock is taken here: disposition_ is immutable.
  return disposition_;
}
std::shared_ptr<const DatasetInformation> info() {
std::lock_guard<std::mutex> lk(mutex_);
if (!info_) {
switch (disposition_) {
......@@ -687,20 +713,40 @@ public:
throw OpenZGY::Errors::ZgyInternalError("DatasetInformation: Dataset not open.");
}
}
return *info_;
return info_;
}
/**
 * Replace the wrapped dataset and drop the cached DatasetInformation.
 * Both members are changed under mutex_. After this call info_ is
 * empty, so the next info() call takes its "not yet populated" path.
 */
void updateDataset(std::shared_ptr<seismicdrive::SDGenericDataset> dataset) {
  {
    // Keep the critical section limited to the two member updates.
    std::lock_guard<std::mutex> guard(mutex_);
    dataset_ = dataset;
    info_.reset();
  }
  // Setting virgin_ back to true here would save a single call to
  // checkCTag() after a file has changed. That happens so rarely
  // that the small performance gain isn't worth the extra testing.
  //virgin_ = true;
}
bool isTouched() const { return !virgin_; }
/**
* Adjust the information after data has been written to the cloud.
*
* Thread safety:
* Multiple concurrent writers will have a race condition, but that
* scenario is expressly forbidden anyway. Smart pointers from
* info() held by callig code are race free because they are
* immutable. updateOnWrite() makes a new pointer.
*/
void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
{
std::shared_ptr<DatasetInformation> updated;
updated.reset(new DatasetInformation(*this->info()));
updated->updateOnWrite(blocknum, blocksize);
info_ = updated;
}
/**
 * Return true when virgin_ is false. The flag is read under mutex_.
 */
bool isTouched() const {
  std::lock_guard<std::mutex> guard(mutex_);
  const bool untouched = virgin_;
  return !untouched;
}
bool touch() {
// Called to inform us that getCTag() might return stale data.
......@@ -716,6 +762,7 @@ public:
// much more often (every time the accessor is used to read data)
// and it won't be set true again by updateDataset(). This will
// simplify testing and has little impact on performance.
std::lock_guard<std::mutex> lk(mutex_);
bool old = virgin_;
virgin_ = false;
return old != virgin_;
......@@ -859,7 +906,7 @@ SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, c
// thereby negating the benefit of lazy evaluation. And, worse,
// causing different behavior when debugging is on.
//if (_logger(1) && mode != OpenMode::Closed)
// _logger(1, this->_dataset->info().toString());
// _logger(1, this->_dataset->info()->toString());
}
SeismicStoreFile::~SeismicStoreFile()
......@@ -935,10 +982,10 @@ SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, Us
ReadRequest request(offset, size, nullptr);
RawList split = this->_split_by_segment(ReadList{request});
if (this->_config->_debug_trace)
this->_config->_debug_trace("read", /*need=*/size, /*want=*/size,/*parts*/ split.size(), this->_dataset->info().allSizes(-1));
this->_config->_debug_trace("read", /*need=*/size, /*want=*/size,/*parts*/ split.size(), this->_dataset->info()->allSizes(-1));
for (const RawRequest& it : split) {
// TODO-Low: port _cached_read ?
this->_dataset->dataset().readBlock
this->_dataset->dataset()->readBlock
(static_cast<int>(it.blocknum),
static_cast<char*>(data)+it.outpos,
static_cast<size_t>(it.local_offset),
......@@ -950,8 +997,6 @@ SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, Us
* Thread safety: Designed to be thread safe as long as the underlying
* SDGenericDataset is. Even when data is being written in another
* thread.
* TODO-High: Thread safety concerns about calling xx_eof().
* Is it threadsafe? Are we getting the right overload?
*/
void
SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint)
......@@ -999,7 +1044,6 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
// need to fall back to the original implementation if a boundary crossing
// (which is likely to be very rare) is detected.
// TODO-High: Concerns about thread safety.
std::int64_t current_eof = SeismicStoreFile::xx_eof(); // exclude open segment
_validate_readv(requests, current_eof, this->_mode);
......@@ -1029,11 +1073,11 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
std::unique_ptr<char[]> data(new char[realsize]);
if (this->_config->_debug_trace)
this->_config->_debug_trace("readv", /*need=*/asked, /*want=*/realsize,/*parts*/ work.size(), this->_dataset->info().allSizes(-1));
this->_config->_debug_trace("readv", /*need=*/asked, /*want=*/realsize,/*parts*/ work.size(), this->_dataset->info()->allSizes(-1));
// Do the actual reading, sequentially, one consolidated chunk at a time.
for (const auto& it : work) {
this->_dataset->dataset().readBlock
this->_dataset->dataset()->readBlock
(static_cast<int>(it.blocknum),
data.get() + it.outpos,
static_cast<size_t>(it.local_offset),
......@@ -1076,12 +1120,12 @@ SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t s
bool overwrite{false};
if (offset == current_eof) {
// Sequential write from current EOF. Create a new segment.
blocknum = this->_dataset->info().blockCount();
blocknum = this->_dataset->info()->blockCount();
}
else if (offset < current_eof) {
// Rewrite existing block. Resizing not allowed.
overwrite = true;
this->_dataset->info().getLocalOffset
this->_dataset->info()->getLocalOffset
(offset, size, &blocknum, &local_offset, &local_size);
if (local_offset != 0 || local_size != size)
throw OpenZGY::Errors::ZgyInternalError("Cannot write resized segment.");
......@@ -1089,12 +1133,12 @@ SeismicStoreFile::xx_write(const void* data, std::int64_t offset, std::int64_t s
else {
throw OpenZGY::Errors::ZgyUserError("Cannot write segments out of order.");
}
this->_dataset->info().checkOnWrite(blocknum, size); // Also checks blocknum fits in an int,
this->_dataset->dataset().writeBlock(static_cast<int>(blocknum), static_cast<const char*>(data), static_cast<std::size_t>(size), overwrite);
this->_dataset->info().updateOnWrite(blocknum, size);
this->_dataset->info()->checkOnWrite(blocknum, size); // Also checks blocknum fits in an int,
this->_dataset->dataset()->writeBlock(static_cast<int>(blocknum), static_cast<const char*>(data), static_cast<std::size_t>(size), overwrite);
this->_dataset->updateOnWrite(blocknum, size);
if (this->_config->_debug_trace)
this->_config->_debug_trace(offset == current_eof ? "append" : "write",
size, size, 1, this->_dataset->info().allSizes(-1));
size, size, 1, this->_dataset->info()->allSizes(-1));
}
/**
......@@ -1120,7 +1164,8 @@ SeismicStoreFile::xx_close()
case OpenMode::ReadOnly:
case OpenMode::ReadWrite:
case OpenMode::Truncate:
_dataset->dataset().close();
if (_dataset->dataset())
_dataset->dataset()->close();
break;
}
}
......@@ -1129,16 +1174,20 @@ SeismicStoreFile::xx_close()
}
/**
* Thread safety: Only if no writes may be pending.
* _dataset->info() uses a lock, but that is not effective if
* updateOnWrite() is called.
* Thread safety: Yes.
*
* If info() is called on one thread while another thread is doing a
* write operation (which is actually not allowed) then it is
* unspecified whether the returned value is before or after the
* write.
*
* CAVEAT: Note that the method is overridden in
* SeismicStoreFileDelayedWrite, and that one might not be safe.
*/
std::int64_t
SeismicStoreFile::xx_eof() const
{
return _dataset->info().totalSize();
return _dataset->info()->totalSize();
}
/**
......@@ -1161,7 +1210,9 @@ SeismicStoreFile::deleteFile(const std::string& filename, bool missing_ok) const
<< "\"" << filename << "\", "
<< "missing_ok=" << std::boolalpha << missing_ok
<< ")\n");
seismicdrive::SDUtils utils(_dataset->manager().get());
// Make sure the returned smart pointer doesn't go out of scope.
std::shared_ptr<seismicdrive::SDManager> smart_manager = _dataset->manager();
seismicdrive::SDUtils utils(smart_manager.get());
try {
utils.deleteDataset(filename);
}
......@@ -1187,7 +1238,10 @@ SeismicStoreFile::altUrl(const std::string& filename) const
// Should I strip off any "?context= first? It doesn't make sense
// to create an alturl from another alturl. Probably doesn't
// matter much either way.
seismicdrive::SDGenericDataset dataset(_dataset->manager().get(), filename);
// Make sure the returned smart pointer doesn't go out of scope.
std::shared_ptr<seismicdrive::SDManager> smart_manager = _dataset->manager();
seismicdrive::SDGenericDataset dataset(smart_manager.get(), filename);
dataset.open(seismicdrive::SDDatasetDisposition::READ_ONLY);
const std::string wid = dataset.getConsistencyID();
const std::string ctx = dataset.getSerializedContext();
......@@ -1213,11 +1267,9 @@ SeismicStoreFile::altUrl(const std::string& filename) const
}
/**
* Thread safety: Only if no writes may be pending.
* _dataset->info() uses a lock, but that is not effective if
* updateOnWrite() is called.
* Thread safety: Yes.
*/
const DatasetInformation&
std::shared_ptr<const DatasetInformation>
SeismicStoreFile::debug_info()
{
return this->_dataset->info();
......@@ -1250,8 +1302,6 @@ SeismicStoreFile::debug_info()
* that is expected to be a very rare occurrence.
*
* Thread safety: Only if no writes may be pending.
* _dataset->info() uses a lock, but that is not effective if
* updateOnWrite() is called.
*/
SeismicStoreFile::RawList
SeismicStoreFile::_split_by_segment(const ReadList& requests)
......@@ -1263,7 +1313,7 @@ SeismicStoreFile::_split_by_segment(const ReadList& requests)
std::int64_t blocknum{0}, local_offset{0}, local_size{0};
while (size > 0) {
// Normally just one iteration i.e. no crossing seg boundary.
this->_dataset->info().getLocalOffset
this->_dataset->info()->getLocalOffset
(offset, size, &blocknum, &local_offset, &local_size);
result.push_back(RawRequest
(blocknum, local_offset, local_size, outpos));
......@@ -1471,7 +1521,7 @@ SeismicStoreFileDelayedWrite::xx_write(const void* data, std::int64_t offset, st
if (offset == 0 || this->_config->_segsize <= 0 || offset < committed) {
this->_relay->xx_write(data, offset, size, usagehint);
if (this->_config->_debug_trace)
this->_config->_debug_trace("flush", size, size, 1, this->_relay->debug_info().allSizes(this->_open_segment.size()));
this->_config->_debug_trace("flush", size, size, 1, this->_relay->debug_info()->allSizes(this->_open_segment.size()));
return;
}
......@@ -1516,7 +1566,7 @@ SeismicStoreFileDelayedWrite::xx_write(const void* data, std::int64_t offset, st
this->_usage_hint = UsageHint::Mixed;
if (this->_config->_debug_trace)
this->_config->_debug_trace("queue", size, size, 1, this->_relay->debug_info().allSizes(this->_open_segment.size()));
this->_config->_debug_trace("queue", size, size, 1, this->_relay->debug_info()->allSizes(this->_open_segment.size()));
this->_flush(false);
}
......@@ -1565,7 +1615,7 @@ SeismicStoreFileDelayedWrite::_flush_part(std::int64_t this_segsize)
this->_relay->xx_write(seg.data(), this->_relay->xx_eof(), this_segsize, this->_usage_hint);
seg.erase(seg.begin(), seg.begin() + this_segsize);
if (this->_config->_debug_trace)
this->_config->_debug_trace("flush", this_segsize, this_segsize, 1, this->_relay->debug_info().allSizes(seg.size()));
this->_config->_debug_trace("flush", this_segsize, this_segsize, 1, this->_relay->debug_info()->allSizes(seg.size()));
}
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment