Commit 448188e3 authored by Paal Kvamme's avatar Paal Kvamme
Browse files

More bells and whistles in the zgycopyc tool.

parent de5d1d8c
......@@ -32,6 +32,8 @@
#include <iostream>
#include <sstream>
#include <fstream>
#include <iomanip>
#include <stdexcept>
#include <stdlib.h>
#include <chrono>
......@@ -68,21 +70,6 @@ using InternalZGY::Environment;
using InternalZGY::MTGuardWithProgress;
using OpenZGY::SeismicStoreIOContext;
/**
 * Testing convenience that returns an IOContext with credentials filled in.
 *
 * The endpoint and API key are read from OPENZGY_SDURL and OPENZGY_SDAPIKEY.
 * Doing so here is redundant because the library already falls back to
 * those variables. The token is read from OPENZGY_TOKEN; when that is
 * empty a hard coded default is used instead.
 */
SeismicStoreIOContext getContext()
{
  // Resolve the token first so the fallback is easy to see.
  std::string token = Environment::getStringEnv("OPENZGY_TOKEN");
  if (token == "")
    token = "FILE:carbon.slbapp.com";

  SeismicStoreIOContext result;
  result.sdurl(Environment::getStringEnv("OPENZGY_SDURL"));
  result.sdapikey(Environment::getStringEnv("OPENZGY_SDAPIKEY"));
  result.sdtoken(token, "");
  return result;
}
/*=========================================================================*/
/* OPTION PROCESSING ==================================================*/
/*=========================================================================*/
......@@ -105,6 +92,12 @@ public:
std::string sigpipe;
std::string input;
std::string output;
std::string src_sdurl;
std::string src_sdkey;
std::string src_token;
std::string dst_sdurl;
std::string dst_sdkey;
std::string dst_token;
std::array<std::int64_t,4> fakesize;
std::array<std::int64_t,3> chunksize;
std::array<std::int64_t,3> obricksize;
......@@ -115,6 +108,7 @@ public:
int brickcount;
int sqnr;
int omp_nest;
int noisefactor;
Options(int argc, char **argv)
: myname(argc >= 1 ? argv[0] : "zgycopyc")
, verbose(2)
......@@ -128,6 +122,12 @@ public:
, sigpipe()
, input()
, output()
, src_sdurl()
, src_sdkey()
, src_token()
, dst_sdurl()
, dst_sdkey()
, dst_token()
, fakesize(std::array<std::int64_t,4>{0,0,0,0})
, chunksize(std::array<std::int64_t,3>{64,256,0})
, obricksize(std::array<std::int64_t,3>{64,64,64})
......@@ -138,6 +138,7 @@ public:
, brickcount(0)
, sqnr(0)
, omp_nest(-1)
, noisefactor(0)
{
if (myname.find_last_of("/\\") != std::string::npos)
myname = myname.substr(myname.find_last_of("/\\")+1);
......@@ -181,6 +182,13 @@ public:
"-n, --brickcount N: Only copy the first N bricks.",
"-Q, --sqnr QUALITY: Compression quality. Uncompressed if absent.",
"-z, --omp-nest N: Levels of OpenMP nesting.",
" --noise factor Add noise when copying.",
" --src-sdurl url: Seismic Store endpoint to copy from.",
" --src-sdkey key: Seismic Store API key.",
" --src-token token: Seismic Store authorization.",
" --dst-sdurl url: Seismic Store endpoint to copy from.",
" --dst-sdkey key: Seismic Store API key.",
" --dst-token token: Seismic Store authorization.",
};
std::cerr << "Usage: " << myname << " options...\n";
for (const std::string& s : help_options)
......@@ -202,7 +210,14 @@ public:
<< (dumpsqnr ? "--dumpsqnr " : "")
<< (native ? "--native " : "--float ")
<< (dropcache ? "--dropcache " : "")
<< (sigpipe.empty() ? "" : ("--sigpipe " + sigpipe + " "));
<< (sigpipe.empty() ? "" : ("--sigpipe " + sigpipe + " "))
<< (noisefactor ? "--noise " + std::to_string(noisefactor) : std::string())
<< (src_sdurl.empty() ? "" : "--src-sdurl " + src_sdurl)
<< (src_sdkey.empty() ? "" : "--src-sdkey " + src_sdkey)
<< (src_token.empty() ? "" : "--src-token " + src_token)
<< (dst_sdurl.empty() ? "" : "--dst-sdurl " + dst_sdurl)
<< (dst_sdkey.empty() ? "" : "--dst-sdkey " + dst_sdkey)
<< (dst_token.empty() ? "" : "--dst-token " + dst_token);
if (fakesize[0]||fakesize[1]||fakesize[2]||fakesize[3])
os << "--size "
......@@ -381,6 +396,13 @@ public:
{"sqnr", required_argument, 0, 'Q' },
{"snr", required_argument, 0, 'Q' },
{"omp-nest", required_argument, 0, 'z' },
{"noise", required_argument, 0, '\001' },
{"src-sdurl", required_argument, 0, '\002' },
{"src-sdkey", required_argument, 0, '\003' },
{"src-token", required_argument, 0, '\004' },
{"dst-sdurl", required_argument, 0, '\005' },
{"dst-sdkey", required_argument, 0, '\006' },
{"dst-token", required_argument, 0, '\007' },
{0, 0, 0, 0 }
};
return result;
......@@ -475,6 +497,14 @@ public:
case 'Q': sqnr = geti(optarg); break;
case 'z': omp_nest = geti(optarg); break;
case 1: noisefactor = geti(optarg); break;
case 2: src_sdurl = optarg; break;
case 3: src_sdkey = optarg; break;
case 4: src_token = optarg; break;
case 5: dst_sdurl = optarg; break;
case 6: dst_sdkey = optarg; break;
case 7: dst_token = optarg; break;
default:
throw std::runtime_error("command line: unknown option");
help(myname);
......@@ -558,6 +588,128 @@ public:
/* END OPTION PROCESSING ==============================================*/
/*=========================================================================*/
/**
* Read from file and cache the SAuth token to use.
* Refresh the cache every 5 minutes.
*
* This allows the token to be updated while the copy is in progress.
* Which may be needed if the copy takes more than 60 minutes and
* the user is using a regular token.
*
* The way the user updates the token is a pretty huge kludge:
*
* repeat as needed (and at least 5 minutes before token expires)
*
* sdutil auth login
* sdutil auth idtoken > /tmp/token$$.new
* mv /tmp/token$$.new /tmp/token$$
*
* zgycopyc --src-token /tmp/token$$ --dst-token /tmp/token$$
*/
class SDTokenUpdater
{
  mutable std::mutex mutex_;  // Guards token_ and lastrefresh_.
  std::string filename_;      // File the token is (re-)read from.
  std::string token_;         // Most recently read token.
  time_t lastrefresh_;        // When token_ was last read; 0 means never.
  int verbose_;               // Controls diagnostics written to std::cerr.

public:
  /**
   * Remember which file to read. The file is not opened until the
   * first time the token is requested.
   */
  explicit SDTokenUpdater(const std::string& filename, int verbose)
    : mutex_()
    , filename_(filename)
    , token_()
    , lastrefresh_(0)
    , verbose_(verbose)
  {
  }

  SDTokenUpdater(const SDTokenUpdater&) = delete;
  SDTokenUpdater& operator=(const SDTokenUpdater&) = delete;

  ~SDTokenUpdater() {
    if (verbose_ >= 3)
      std::cerr << "Token updater for \"" << filename_
                << "\" is done." << std::endl;
  }

  /**
   * Return the cached token if it is less than 5 minutes since we cached it.
   * Otherwise read the possibly refreshed token from file.
   *
   * Throws std::runtime_error if the file cannot be opened or read.
   * In that case the previously cached token is left untouched.
   */
  std::string operator()() {
    std::lock_guard<std::mutex> lk(mutex_);
    if (::time(nullptr) - lastrefresh_ > 300) {
      std::ifstream file(filename_);
      if (!file.good())
        throw std::runtime_error("Cannot open token file \"" + filename_ + "\".");
      // Read into a temporary so a failed read cannot clobber the cache.
      std::string fresh;
      std::getline(file, fresh);
      // Tolerate one leading empty line before the actual token.
      if (file.good() && fresh.empty())
        std::getline(file, fresh);
      if (file.fail() || file.bad())
        throw std::runtime_error("Cannot read token file \"" + filename_ + "\".");
      const bool changed = !token_.empty() && fresh != token_;
      token_.swap(fresh);
      lastrefresh_ = ::time(nullptr);
      if (verbose_ >= 3 || (verbose_ >= 2 && changed)) {
        // Only the tail of a long token is shown.
        std::cerr << "\nToken refresh: "
                  << (token_.empty() ? "<empty>" :
                      token_.size() < 20 ? token_ :
                      "..." + token_.substr(token_.size()-5))
                  << std::endl;
      }
    }
    return token_;
  }

  /**
   * Create an instance and wrap it in a lambda in a way that handles
   * reference counting properly. The instance should be destructed
   * when the last functor referencing it goes out of scope.
   * Note that IOContext is cloneable, hence the functor must be
   * copyable. While SDTokenUpdater is noncopyable.
   *
   * Yes this is rather pedantic. If I just leak the SDTokenUpdater
   * instances this would not have been needed. But just in case I
   * need to copy the code somewhere else I'll try to do it right.
   */
  static SeismicStoreIOContext::tokencb_t create(const std::string& filename, int verbose)
  {
    // The shared_ptr captured by value keeps the updater alive for
    // as long as any copy of the lambda exists.
    auto updater = std::make_shared<SDTokenUpdater>(filename, verbose);
    auto lambda = [updater]() -> std::string {return (*updater)();};
    return lambda;
  }
};
/**
 * Build the IOContext used for either the input or the output file.
 *
 * \param opt     Parsed command line. The src_* members are consulted
 *                when \p read is true, the dst_* members otherwise.
 * \param read    True for the context to read with, false for writing.
 * \param verbose The context is dumped to std::cerr if this is >= 3.
 *
 * An endpoint or API key given on the command line wins; otherwise
 * OPENZGY_SDURL and OPENZGY_SDAPIKEY are consulted. A token option is
 * handed to SDTokenUpdater as a file name; without it the value of
 * OPENZGY_TOKEN is used as the token itself.
 */
SeismicStoreIOContext getContext(const Options& opt, bool read, int verbose)
{
  // Pick the overrides belonging to the requested side of the copy.
  const std::string& url      = read ? opt.src_sdurl : opt.dst_sdurl;
  const std::string& key      = read ? opt.src_sdkey : opt.dst_sdkey;
  const std::string& tokenarg = read ? opt.src_token : opt.dst_token;

  SeismicStoreIOContext context;

  if (url.empty())
    context.sdurl(Environment::getStringEnv("OPENZGY_SDURL"));
  else
    context.sdurl(url);

  if (key.empty())
    context.sdapikey(Environment::getStringEnv("OPENZGY_SDAPIKEY"));
  else
    context.sdapikey(key);

  if (tokenarg.empty())
    context.sdtoken(Environment::getStringEnv("OPENZGY_TOKEN"), "");
  else
    context.sdtokencb(SDTokenUpdater::create(tokenarg, opt.verbose), "");

  if (verbose >= 3) {
    const char* const direction = read ? "read" : "write";
    std::cerr << direction << " context:\n" << context.toString();
  }
  return context;
}
static std::int64_t
roundup(std::int64_t in, std::int64_t step)
{
......@@ -601,6 +753,26 @@ void randomize(std::vector<std::array<std::int64_t,3>>& vec)
}
}
/**
 * Add pseudo-random noise, in place, to a buffer of samples.
 *
 * Each sample is first scaled by a random factor in the range
 * 1-1/factor to 1 and then offset by a random amount between
 * 0 and valuerange/factor. The offset is added to samples that are
 * zero or negative and subtracted from positive samples.
 *
 * \param data       Buffer holding size3d[0]*size3d[1]*size3d[2] samples.
 * \param size3d     Size of the buffer in each of the three dimensions.
 * \param factor     Noise is 1/factor of the value range. No-op if <= 0.
 * \param valuerange Width of the value range the noise is relative to.
 *                   Must not be negative.
 *
 * NOTE(review): the random engine is a function-local static shared by
 * every caller of a given instantiation and is not synchronized here.
 * Confirm whether callers may invoke this concurrently.
 */
template<typename T>
void
addNoise(T *data, const std::array<std::int64_t,3>& size3d, int factor, float valuerange)
{
  if (factor <= 0)
    return;
  const float noiselevel = valuerange / factor;
  const float noisefactor = 1.0f / factor;
  // Seeded from the clock once per instantiation, on first use.
  static auto seed = std::chrono::system_clock::now().time_since_epoch().count();
  static std::default_random_engine generator(static_cast<unsigned>(seed));
  std::uniform_real_distribution<float> add_distribution(0.0f, noiselevel);
  std::uniform_real_distribution<float> mul_distribution(1.0f-noisefactor, 1.0f);
  const T* end = data + (size3d[0] * size3d[1] * size3d[2]);
  for (T* ptr = data; ptr < end; ++ptr) {
    const float add = add_distribution(generator);
    const float mul = mul_distribution(generator);
    // The sign of the offset depends on the sample value, not on the
    // pointer; the original compared the pointer itself against zero.
    *ptr = static_cast<T>(*ptr * mul + (*ptr <= 0 ? add : -add));
  }
}
void readchunk(
const std::shared_ptr<OpenZGY::IZgyReader>& r,
const std::shared_ptr<OpenZGY::IZgyWriter>& w,
......@@ -608,8 +780,10 @@ void readchunk(
const std::array<std::int64_t,3>& chunksize,
const std::array<std::int64_t,3>& surveysize,
void *buffer, OpenZGY::SampleDataType dt,
SummaryPrintingTimerEx& rtimer)
SummaryPrintingTimerEx& rtimer,
int noisefactor)
{
const float range = std::max(0.0f, r->datarange()[1] - r->datarange()[0]);
const std::array<std::int64_t,3> realsize =
minimum(chunksize, sub(surveysize, pos));
if (realsize[0] > 0 && realsize[1] > 0 && realsize[2] > 0) {
......@@ -619,14 +793,17 @@ void readchunk(
case OpenZGY::SampleDataType::int8:
bps=1;
r->read(pos, realsize, static_cast<std::int8_t*>(buffer), 0);
addNoise(static_cast<std::int8_t*>(buffer), realsize, noisefactor, range);
break;
case OpenZGY::SampleDataType::int16:
r->read(pos, realsize, static_cast<std::int16_t*>(buffer), 0);
addNoise(static_cast<std::int16_t*>(buffer), realsize, noisefactor, range);
bps=2;
break;
case OpenZGY::SampleDataType::float32:
default:
r->read(pos, realsize, static_cast<float*>(buffer), 0);
addNoise(static_cast<float*>(buffer), realsize, noisefactor, range);
bps=4;
break;
}
......@@ -801,9 +978,10 @@ copy(const Options& opt, SummaryPrintingTimerEx& rtimer, SummaryPrintingTimerEx&
ProgressWithDots p2(opt.verbose >= 1 ? 51 : 0);
ZgyWriterArgs args;
SeismicStoreIOContext context(getContext());
SeismicStoreIOContext rcontext(getContext(opt, true, opt.verbose));
SeismicStoreIOContext wcontext(getContext(opt, false, opt.verbose));
std::shared_ptr<IZgyReader> r = !opt.input.empty() ?
IZgyReader::open(opt.input, &context):
IZgyReader::open(opt.input, &rcontext):
Test::ZgyReaderMock::mock(opt.fakesize);
args.metafrom(r).filename(opt.output);
if (opt.osamplesize != OpenZGY::SampleDataType::unknown)
......@@ -814,7 +992,7 @@ copy(const Options& opt, SummaryPrintingTimerEx& rtimer, SummaryPrintingTimerEx&
.datatype(OpenZGY::SampleDataType::float32);
if (opt.obricksize[0]>0 && opt.obricksize[1]>0 && opt.obricksize[2]>0)
args.bricksize(opt.obricksize[0], opt.obricksize[1], opt.obricksize[2]);
args.iocontext(&context);
args.iocontext(&wcontext);
// Existing files with integral storage and all samples set to
// the same value need special handling to appear consistent.
......@@ -884,7 +1062,6 @@ copy(const Options& opt, SummaryPrintingTimerEx& rtimer, SummaryPrintingTimerEx&
if (!done.is_lock_free())
throw std::runtime_error("Consider using plain int for atomics");
if (opt.verbose >= 2) {
static auto ssize = [](SampleDataType dt)
{
......@@ -926,6 +1103,8 @@ copy(const Options& opt, SummaryPrintingTimerEx& rtimer, SummaryPrintingTimerEx&
<< " (" << tsize(r->size()) * ssize(r->datatype()) << " MB)"
<< (opt.randomize ? " (in random order)" : "")
<< "\n";
if (opt.noisefactor)
outstring << "Add 1/" << opt.noisefactor << " noise\n";
std::cerr << outstring.str() << std::flush;
}
......@@ -956,7 +1135,7 @@ copy(const Options& opt, SummaryPrintingTimerEx& rtimer, SummaryPrintingTimerEx&
guard.run([&]()
{
readchunk(r, w, tasklist[task], bs, surveysize,
buf.get(), dt, rtimer);
buf.get(), dt, rtimer, opt.noisefactor);
});
#pragma omp ordered
guard.run([&]()
......
......@@ -50,13 +50,13 @@ def find_center(r, maxsize):
return tuple(cropsize), tuple(cropoffset)
def copy_open_file(r, w, *, progress, offset=(0,0,0), noisefactor=0):
    """
    Copy all bulk data from the open reader r to the open writer w.

    The data is read as float32 via readall(), cropped to w.size
    starting at offset, and written to w at the position that
    readall() reports for each chunk.

    If noisefactor is greater than zero, uniformly distributed noise
    centered on zero is added to each chunk before it is written.
    The peak-to-peak amplitude is the width of the reader's datarange
    divided by noisefactor. No noise is added if that width is zero.
    """
    # The noise level is relative to the value range of the input file.
    noiselevel = abs(r.datarange[1] - r.datarange[0]) / noisefactor if noisefactor > 0 else None
    for datastart, datasize, data in readall(r, dtype=np.float32, cropsize=w.size, cropoffset=offset, progress=progress):
        if noiselevel:
            # Note, maybe multiply with a random factor as well,
            # if the goal is to test compression when white noise
            # is present but where the input used to be integral.
            data += (np.random.random_sample(data.shape) - 0.5) * noiselevel
        w.write(datastart, data)
def read_and_discard(r, cropsize, *, progress, offset=(0,0,0), noisefactor=0):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment