Commit 5a081fb7 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

Merge branch 'kvamme62/tweaks' into 'master'

Miscellaneous unrelated tweaks

See merge request !44
parents 33751921 84177b3f
Pipeline #29116 passed with stages
in 6 minutes and 48 seconds
......@@ -108,7 +108,8 @@ data: $(TESTDATARW)/Empty-v1.zgy $(TESTDATARW)/Empty-v3.zgy $(TESTDATARW)/Fancy-
LATEX := $(wildcard /usr/bin/latex)
PDFTK := $(wildcard /usr/bin/pdftk)
#PDFTK :=
# Leaving PDFTK empty (the line below) removes the "work in progress" background page.
PDFTK :=
doxygen:
$(RM) -rf $(DOX_TMP)/apidoc
......
......@@ -103,6 +103,7 @@
#include "compression.h"
#include "environment.h"
#include "fancy_timers.h"
#include "mtguard.h"
#include "../exception.h"
#include <algorithm>
......@@ -1670,10 +1671,10 @@ ZgyInternalBulk::_writeAlignedRegion(
std::vector<std::shared_ptr<const WriteBrickArgPack>> const_queue(worksize);
std::vector<std::shared_ptr<const WriteNowArgPack>> normal_queue(worksize);
std::atomic<int> errors(0);
MTGuard guard;
#pragma omp parallel for if(enable_compress_mt() && worksize > 1)
for (std::int64_t ix = 0; ix < static_cast<std::int64_t>(worksize); ++ix) {
try {
guard.run([&](){
const index3_t surveypos = work[ix]; // user's start i0,j0,k0 rounded down
const index3_t brickpos = work[ix] / bs; // as above, but in brick coords
std::shared_ptr<DataBuffer> brick = constbrick;
......@@ -1699,17 +1700,9 @@ ZgyInternalBulk::_writeAlignedRegion(
#pragma omp critical // paranoia?
normal_queue[ix] = now;
}
}
catch (const std::exception&)
{
// No effort to log the actual error or to get the loop to
// end earlier. An exception here really shouldn't happen.
errors.fetch_add(1);
}
}
if (errors.load() != 0)
throw OpenZGY::Errors::ZgyInternalError("Exception preparing buffers");
});
} // end parallel for loop
guard.finished();
// Note errorhandling:
// If there are any errors during actual writing this probably
......
......@@ -19,6 +19,7 @@
#include "timer.h"
#include "fancy_timers.h"
#include "environment.h"
#include "mtguard.h"
#include "file_performance.h"
#include <vector>
......@@ -281,38 +282,20 @@ LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immuta
// 2**31 bricks.
const int threadcount = std::min(std::min(std::numeric_limits<int>::max(), static_cast<int>(requestcount)), omp_get_max_threads());
// Exceptions thrown out of an OpenMP loop are fatal, so I need to
// handle them here.
std::atomic<int> errors(0);
std::string first_error;
MTGuard guard;
#pragma omp parallel num_threads(threadcount)
{
std::unique_ptr<char[]> data(new char[maxsize]);
#pragma omp for
for (std::int64_t ii=0; ii<requestcount; ++ii) {
if (errors.load() != 0)
continue;
const ReadRequest& r = requests[ii];
try {
guard.run([&](){
this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
r.delivery(data.get(), r.size);
}
catch (const std::exception& ex) {
if (errors.fetch_add(1) == 0) {
auto what = ex.what();
first_error = std::string(what && what[0] ? what : "EXCEPTION");
}
continue;
}
} // end omp for
}
// end parallel
if (errors.load() != 0) {
// The distinction between UserError, EndOfFile, and InternalError
// (and maybe even others) is lost. If it matters I can handle code
// for this as well.
throw OpenZGY::Errors::ZgyInternalError(first_error);
});
}
}
guard.finished();
}
}
......
......@@ -20,6 +20,7 @@
#include "databuffer.h"
#include "fancy_timers.h"
#include "environment.h"
#include "mtguard.h"
#include "../exception.h"
#include <memory.h>
......@@ -985,8 +986,7 @@ void createLodMT(const std::shared_ptr<DataBuffer>& result,
// 4 or 8 bytes. So, always slice the slowest dim.
if (input_slice_dim != -1 && input_slice_dim == output_slice_dim) {
SimpleTimerEx tt(timerMT);
std::atomic<int> errors(0);
std::string first_error;
MTGuard guard;
#pragma omp parallel
{
std::int64_t slices_per_thread = (isize[input_slice_dim]-1) / omp_get_num_threads() + 1;
......@@ -1000,20 +1000,15 @@ void createLodMT(const std::shared_ptr<DataBuffer>& result,
<< " slices per thread."
<< std::endl;
#endif
try {
guard.run([&](){
createLodPart(result, input, algorithm,
hist, bincount, histogram_min, histogram_max,
input_slice_dim,
omp_get_thread_num() * slices_per_thread,
slices_per_thread);
}
catch (const std::exception& ex) {
if (errors.fetch_add(1) == 0)
first_error = std::string(ex.what());
}
});
}
if (errors != 0)
throw OpenZGY::Errors::ZgyInternalError(first_error);
guard.finished();
}
else {
SimpleTimerEx tt(timerST);
......
......@@ -34,6 +34,21 @@ namespace InternalZGY {
* If the OpenMP loop has an ordered section then you probably need
* protect that section explicitly. Using a second call to run()
* on the same instance.
*
* Example usage:
*
* \code
* void example() {
* MTGuard guard;
* #pragma omp parallel for
* for (std::int64_t ii = 0; ii < loopcount; ++ii) {
* guard.run([&](){
* // ACTUAL CODE GOES HERE
* });
* }
* guard.finished();
* }
* \endcode
*/
class OPENZGY_TEST_API MTGuard
{
......@@ -57,6 +72,23 @@ protected:
/**
* Helper class that extends MTGuard to also handle progress reports
* and cancellation from inside an OpenMP parallel region.
*
* Example usage:
*
* \code
* void examplewithprogress() {
* ProgressWithDots p1;
* MTGuardWithProgress guard(std::ref(p1), total);
* #pragma omp parallel for
* for (std::int64_t ii = 0; ii < loopcount; ++ii) {
* guard.run([&](){
* // ACTUAL CODE GOES HERE
* guard.progress();
* });
* }
* guard.finished();
* }
*\endcode
*/
class OPENZGY_TEST_API MTGuardWithProgress: public MTGuard
{
......
......@@ -69,9 +69,8 @@ SeismicStoreIOContext::SeismicStoreIOContext()
maxsize(Environment::getNumericEnv("OPENZGY_MAXSIZE_MB", 64));
maxhole(Environment::getNumericEnv("OPENZGY_MAXHOLE_MB", 2));
aligned(Environment::getNumericEnv("OPENZGY_ALIGNED_MB", 0));
segsize(Environment::getNumericEnv("OPENZGY_SEGSIZE_MB", 1024));
// TODO-High: default segsplit to 8 instead of making it opt-in
segsplit(Environment::getNumericEnv("OPENZGY_SEGSPLIT", 1));
segsize(Environment::getNumericEnv("OPENZGY_SEGSIZE_MB", 256));
segsplit(Environment::getNumericEnv("OPENZGY_SEGSPLIT", 8));
threads(Environment::getNumericEnv("OPENZGY_NUMTHREADS", 1));
legaltag(Environment::getStringEnv("OPENZGY_LEGALTAG"));
writeid(Environment::getStringEnv("OPENZGY_WRITEID"));
......
......@@ -111,6 +111,9 @@ private:
tokencb_t _sdtokencb;
debugtrace_t _debug_trace;
private: // really private. Keep off.
std::int64_t _real_segsize;
public:
/**
* Where to contact the seismic store service.
......@@ -273,30 +276,33 @@ public:
/**
* Segment size used when writing, in MB. Must be between 1 and
* 16*1024 (i.e. 16 GB). Defaults to $OPENZGY_SEGSIZE_MB if not
* specified, or 1024 (i.e. 1 GB). The default should work fine
* in almost all cases.
* specified, or 256 MB. The default should work fine in almost all
* cases. The write buffer needs to allocate segsize * segsplit
* bytes, so make sure to take segsplit into account when deciding
* on the segment size. With both of those left at the default value
* the buffer will need 256 MB * 8 = 2 GB.
*/
SeismicStoreIOContext& segsize(int value)
{
  // Validate before touching any state; the message intentionally
  // mirrors the accepted range of the check below.
  if (value < 0 || value > 16*1024)
    throw OpenZGY::Errors::ZgyUserError("segsize must be between 0 and 16*1024 MB");
  // _real_segsize is the chunk size seen by the cloud backend, in bytes.
  // Widen before multiplying so large MB counts cannot overflow int.
  this->_real_segsize = static_cast<std::int64_t>(value) * (1024*1024);
  // Internally, _segsize is the buffer size to be used, not the
  // smaller chunk size seen by the cloud backend.
  // _segsize is derived, recomputed when either factor changes
  // (see the matching recompute in the segsplit() setter).
  this->_segsize = this->_real_segsize * (this->_segsplit < 1 ? 1 : this->_segsplit);
  return *this;
}
/**
* Maximum number of threads to be used when writing data to the cloud.
* Set to 1 if you don't want multithreaded uploads. Default is 1.
* The default will soon be changed to 8, but for now the split is
* an opt-in feature.
*
* The value of segsplit must divide evenly into segsize.
* Set to 1 if you don't want multithreaded uploads. Default is 8.
*
* Multi-threading is achieved by splitting up the segsize write buffer
* into "segsplit" SDAPI objects. Hence the name. The splitting can
* be observed directly by the caller because it determines the segment
* size seen when reading. So the "split" effect is more important
* than the "maybe-threaded" effect.
* Multi-threading is achieved by using an internal buffer size of
* segsize*segsplit, then later splitting that up into separate
* SDAPI objects than can be uploaded in parallel. So, beware that
* if you increase segsplit you might need to decrease segsize to
* avoid running out of memory.
*
* Using multiple threads for uploads can often improve throughput.
* But if the limiting factor is the bandwidth between the client
......@@ -307,6 +313,7 @@ public:
if (value < 1 || value > 1024)
throw OpenZGY::Errors::ZgyUserError("segsplit must be between 0 and 1024");
this->_segsplit = value;
this->_segsize = this->_real_segsize * (this->_segsplit<1 ? 1 : this->_segsplit);
return *this;
}
......
......@@ -66,8 +66,8 @@ void Test::TestIOContext::test_defaults()
TEST_CHECK(ctx._maxsize == 64 * 1024*1024);
TEST_CHECK(ctx._maxhole == 2 * 1024*1024);
TEST_CHECK(ctx._aligned == 0 * 1024*1024);
TEST_CHECK(ctx._segsize == 1024 * 1024*1024);
TEST_CHECK(ctx._segsplit == 1);
TEST_CHECK(ctx._segsize == 256 * 1024*1024 * ctx._segsplit);
TEST_CHECK(ctx._segsplit == 8);
TEST_CHECK(ctx._threads == 1);
TEST_CHECK(ctx._legaltag == "");
TEST_CHECK(ctx._writeid == "");
......@@ -97,7 +97,7 @@ void Test::TestIOContext::test_setters()
TEST_CHECK(ctx._maxsize == 42 * 1024*1024);
TEST_CHECK(ctx._maxhole == 7 * 1024*1024);
TEST_CHECK(ctx._aligned == 1 * 1024*1024);
TEST_CHECK(ctx._segsize == 15 * 1024*1024);
TEST_CHECK(ctx._segsize == 15 * 1024*1024 * ctx._segsplit);
TEST_CHECK(ctx._segsplit == 3);
TEST_CHECK(ctx._threads == 8);
TEST_CHECK(ctx._legaltag == "illegal");
......
......@@ -129,7 +129,7 @@ public:
, input()
, output()
, fakesize(std::array<std::int64_t,4>{0,0,0,0})
, chunksize(std::array<std::int64_t,3>{128,128,0})
, chunksize(std::array<std::int64_t,3>{64,256,0})
, obricksize(std::array<std::int64_t,3>{64,64,64})
, osamplesize(OpenZGY::SampleDataType::unknown)
, algorithm()
......
......@@ -156,7 +156,8 @@ $(TESTDATARW)/Fancy-int8.zgy: $(TESTDATA)/Fancy-int8.zgy.bz2
.PHONY: doxygen doxyint
LATEX := $(wildcard /usr/bin/latex)
PDFTK := $(wildcard /usr/bin/pdftk)
#PDFTK :=
# Leaving PDFTK empty (the line below) removes the "work in progress" background page.
PDFTK :=
doxygen:
$(RM) -rf $(DOX_TMP)/apidoc
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment