Commit dbf7c7b5 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Add an OpenMP loop to multi-thread upload to cloud. This is still opt-in.

parent cd8a52b0
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "file_consolidate.h" #include "file_consolidate.h"
#include "file_performance.h" #include "file_performance.h"
#include "fancy_timers.h" #include "fancy_timers.h"
#include "mtguard.h"
#include "../exception.h" #include "../exception.h"
#include "../iocontext.h" #include "../iocontext.h"
...@@ -35,6 +36,7 @@ ...@@ -35,6 +36,7 @@
#include <algorithm> #include <algorithm>
#include <numeric> #include <numeric>
#include <mutex> #include <mutex>
#include <omp.h>
#ifndef _WIN32 // TODO-Low: SDAPI/ prefix also on Linux. #ifndef _WIN32 // TODO-Low: SDAPI/ prefix also on Linux.
#include <SDManager.h> #include <SDManager.h>
...@@ -1358,6 +1360,11 @@ SeismicStoreFile::do_write_one( ...@@ -1358,6 +1360,11 @@ SeismicStoreFile::do_write_one(
const std::int64_t size, const std::int64_t size,
const bool overwrite) const bool overwrite)
{ {
if (_logger(1, "")) {
_logger(1, std::stringstream()
<< "do_write_one(*, " << blocknum << ", " << size << ", "
<< std::boolalpha << overwrite << ")");
}
this->_dataset->info()->checkOnWrite(blocknum, size); this->_dataset->info()->checkOnWrite(blocknum, size);
this->_dataset->dataset()->writeBlock this->_dataset->dataset()->writeBlock
(static_cast<int>(blocknum), // cast checked by checkOnWrite() (static_cast<int>(blocknum), // cast checked by checkOnWrite()
...@@ -1424,18 +1431,32 @@ SeismicStoreFile::do_write_many( ...@@ -1424,18 +1431,32 @@ SeismicStoreFile::do_write_many(
this->_dataset->info()->checkOnWrite(blocknum, std::min(size, blobsize)); this->_dataset->info()->checkOnWrite(blocknum, std::min(size, blobsize));
const int blobcount = (size + blobsize - 1) / blobsize; const int blobcount = (size + blobsize - 1) / blobsize;
MTGuard guard;
#pragma omp parallel for num_threads(blobcount)
for (int ii = 0; ii < blobcount; ++ii) { for (int ii = 0; ii < blobcount; ++ii) {
const int iter_blocknum = ii + static_cast<int>(blocknum); if (/*blocknum == 1 &&*/ ii == 0 && _logger(1, "")) {
const std::int64_t iter_offset = ii * blobsize; _logger(1, std::stringstream()
const std::int64_t iter_endoffset = std::min(size, (ii+1) * blobsize); << "do_write_many(*, "
std::int64_t iter_size = iter_endoffset - iter_offset; << blocknum << ".." << (blocknum + blobcount - 1) << ", "
const char* iter_data = static_cast<const char*>(data)+iter_offset; << size << ", "
if (iter_size > 0) { << blobsize << ", "
this->_dataset->dataset()->writeBlock << std::boolalpha << overwrite << ")"
(iter_blocknum, iter_data, iter_size, overwrite); << " using " << omp_get_num_threads() << " threads");
_wtimer->addBytesWritten(iter_size);
} }
guard.run([&](){
const int iter_blocknum = ii + static_cast<int>(blocknum);
const std::int64_t iter_offset = ii * blobsize;
const std::int64_t iter_endoffset = std::min(size, (ii+1) * blobsize);
std::int64_t iter_size = iter_endoffset - iter_offset;
const char* iter_data = static_cast<const char*>(data)+iter_offset;
if (iter_size > 0) {
this->_dataset->dataset()->writeBlock
(iter_blocknum, iter_data, iter_size, overwrite);
_wtimer->addBytesWritten(iter_size);
}
});
} }
guard.finished();
// The rest of the functions, including updateOnWrite(), are unaware // The rest of the functions, including updateOnWrite(), are unaware
// that we might have done parallel writes. Keep them in the dark. // that we might have done parallel writes. Keep them in the dark.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment