Commit d80f9217 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Prepare for more smart pointers: Move pointer arithmetic to a separate function.

parent 800ec8fc
...@@ -135,6 +135,47 @@ FileADT::_validate_readv(const ReadList& requests, std::int64_t eof, OpenMode mo ...@@ -135,6 +135,47 @@ FileADT::_validate_readv(const ReadList& requests, std::int64_t eof, OpenMode mo
} }
} }
/**
* Convenience function to invoke a delivery functor with optional
* pointer arithmetic, delivering just a part of the buffer.
*
* If and when the delivery function is changed to use a smart pointer:
*
* Optionally check that the called function did not retain a pointer
* to the data if it promised not to do that.
*
* If the contract about not retaining references is broken then raise
* an exception. Even if the code happens to have allocate a unique
* buffer so it doesn't really care. Note: In some cases, e.g. if a
* proper cache is involved, the functor might end up retaining a
* pointer aliased to the entire cache. Which would be fatal and would
* warrant an abort(). The code here will still just throw an
* exception, though.
*
* If the functor states that it needs to retain a pointer then make
* sure it gets a smart pointer that is aliased to the entire buffer.
*/
void
FileADT::_deliver(
const ReadRequest::delivery_t& fn,
const void *data,
std::int64_t offset,
std::int64_t size,
bool transient)
{
if (!data)
throw OpenZGY::Errors::ZgyInternalError("Attempt to deliver null data");
if (!fn)
return; // Caller doesn't need the data. This is ok.
if (offset == 0) {
fn(data, size);
}
else {
auto dumb_ptr = static_cast<const char*>(data) + offset;
fn(dumb_ptr, size);
}
}
std::shared_ptr<FileADT> std::shared_ptr<FileADT>
FileADT::factory(const std::string& filename, OpenMode mode, const IOContext *iocontext) FileADT::factory(const std::string& filename, OpenMode mode, const IOContext *iocontext)
{ {
......
...@@ -232,6 +232,8 @@ protected: ...@@ -232,6 +232,8 @@ protected:
static void _validate_read(void *data, std::int64_t offset, std::int64_t size, std::int64_t eof, OpenMode mode); static void _validate_read(void *data, std::int64_t offset, std::int64_t size, std::int64_t eof, OpenMode mode);
static void _validate_write(const void *data, std::int64_t offset, std::int64_t size, OpenMode mode); static void _validate_write(const void *data, std::int64_t offset, std::int64_t size, OpenMode mode);
static void _validate_readv(const ReadList& requests, std::int64_t eof, OpenMode mode); static void _validate_readv(const ReadList& requests, std::int64_t eof, OpenMode mode);
public: // Actually internal. Used by ConsolidateRequests.
static void _deliver(const ReadRequest::delivery_t& fn, const void *data, std::int64_t offset, std::int64_t size, bool transient);
public: public:
static std::shared_ptr<FileADT> factory(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext); static std::shared_ptr<FileADT> factory(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
......
...@@ -362,7 +362,8 @@ ConsolidateRequests::_consolidated_delivery( ...@@ -362,7 +362,8 @@ ConsolidateRequests::_consolidated_delivery(
if (rr.delivery) { if (rr.delivery) {
std::int64_t end = std::min(rr.offset + rr.size - begin, size); std::int64_t end = std::min(rr.offset + rr.size - begin, size);
std::int64_t beg = std::min(rr.offset - begin, end); std::int64_t beg = std::min(rr.offset - begin, end);
rr.delivery(static_cast<const char*>(data) + beg, end - beg); // Caller will check the transient_ok flag. we won't.
FileADT::_deliver(rr.delivery, data, beg, end - beg, false);
} }
} }
}; };
......
...@@ -258,7 +258,7 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta ...@@ -258,7 +258,7 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta
for (const ReadRequest& r : requests) { for (const ReadRequest& r : requests) {
std::shared_ptr<char> data(new char[r.size]); std::shared_ptr<char> data(new char[r.size]);
this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint); this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size); _deliver(r.delivery, data.get(), 0, r.size, transient_ok);
} }
} }
else { else {
...@@ -298,7 +298,7 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta ...@@ -298,7 +298,7 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta
const ReadRequest& r = requests[ii]; const ReadRequest& r = requests[ii];
guard.run([&](){ guard.run([&](){
this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint); this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size); _deliver(r.delivery, data.get(), 0, r.size, transient_ok);
}); });
} }
} }
......
...@@ -136,7 +136,7 @@ FileParallelizer::xx_readv( ...@@ -136,7 +136,7 @@ FileParallelizer::xx_readv(
for (std::int64_t ii = 0; ii < requestcount; ++ii) { for (std::int64_t ii = 0; ii < requestcount; ++ii) {
guard.run([&](){ guard.run([&](){
std::cerr << "0123456789"[omp_get_thread_num() % 10]; std::cerr << "0123456789"[omp_get_thread_num() % 10];
requests[ii].delivery(buffers[ii].get(), requests[ii].size); _deliver(requests[ii].delivery, buffers[ii].get(), 0, requests[ii].size, transient_ok);
}); });
} }
guard.finished(); guard.finished();
......
...@@ -1155,7 +1155,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu ...@@ -1155,7 +1155,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
for (const ReadRequest& r : requests) { for (const ReadRequest& r : requests) {
std::shared_ptr<char> data(new char[r.size]); std::shared_ptr<char> data(new char[r.size]);
this->SeismicStoreFile::xx_read(data.get(), r.offset, r.size, usagehint); this->SeismicStoreFile::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size); _deliver(r.delivery, data.get(), 0, r.size, transient_ok);
} }
return; return;
#endif #endif
...@@ -1296,8 +1296,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu ...@@ -1296,8 +1296,7 @@ SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
// TODO-Worry: If this_size != rr.size, can this ever happen? // TODO-Worry: If this_size != rr.size, can this ever happen?
// If yes then we might have lost track of where in the buffer // If yes then we might have lost track of where in the buffer
// we should copy out from. This wory also applies to the Python code. // we should copy out from. This wory also applies to the Python code.
if (rr.delivery) _deliver(rr.delivery, data.get(), pos, this_size, transient_ok);
rr.delivery(data.get() + pos, this_size);
pos += this_size; pos += this_size;
} }
} }
...@@ -1810,7 +1809,7 @@ SeismicStoreFileDelayedWrite::xx_readv(const ReadList& requests, bool parallel_o ...@@ -1810,7 +1809,7 @@ SeismicStoreFileDelayedWrite::xx_readv(const ReadList& requests, bool parallel_o
for (const ReadRequest& r : requests) { for (const ReadRequest& r : requests) {
std::shared_ptr<char> data(new char[r.size]); std::shared_ptr<char> data(new char[r.size]);
this->SeismicStoreFileDelayedWrite::xx_read(data.get(), r.offset, r.size, usagehint); this->SeismicStoreFileDelayedWrite::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size); _deliver(r.delivery, data.get(), 0, r.size, transient_ok);
} }
} }
} }
......
...@@ -237,7 +237,7 @@ LocalFileWindows::xx_readv(const ReadList& requests, bool parallel_ok, bool immu ...@@ -237,7 +237,7 @@ LocalFileWindows::xx_readv(const ReadList& requests, bool parallel_ok, bool immu
for (const ReadRequest& r : requests) { for (const ReadRequest& r : requests) {
std::unique_ptr<char> data(new char[r.size]); std::unique_ptr<char> data(new char[r.size]);
this->LocalFileWindows::xx_read(data.get(), r.offset, r.size, usagehint); this->LocalFileWindows::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size); _deliver(r.delivery, data.get(), 0, r.size, transient_ok);
} }
} }
......
...@@ -175,7 +175,7 @@ run_test(const ReadList& list_in, ...@@ -175,7 +175,7 @@ run_test(const ReadList& list_in,
// Deliver my data according to the consolidated list. // Deliver my data according to the consolidated list.
for (const ReadRequest& rr : result) for (const ReadRequest& rr : result)
rr.delivery(reinterpret_cast<char*>(data.data()) + rr.offset, rr.size); FileADT::_deliver(rr.delivery, data.data(), rr.offset, rr.size, false);
// Check that all the data that was originally requested // Check that all the data that was originally requested
// was delivered to the correct place in a timely manner. // was delivered to the correct place in a timely manner.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment