// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/** file: impl/bulk.cpp
 *
 * This file contains a number of classes and free functions dealing with
 * bulk data access. There is a similar file impl/meta.cpp that deals with
 * metadata. Nothing in this file should be directly visible to users of
 * the public API.
 *
 *     impl.bulk.ZgyInternalBulk:
 *
 *         * Internal methods to read and write bulk.
 *         * Needs access to much of the meta data information.
 *           Currently this is handled by giving this class references to
 *           the ZgyInternalMeta instance. This gives access to more
 *           metadata than the class actually needs.
 *         * Refactoring notes: There are a couple of sections in this
 *           class that ought to have been separated out. As it is now
 *           the class does too much.
 *
 *             - Write support
 *             - Lod generation
 *             - Histogram generation
 *
 *     Code flow for writing:
 *
 *     The main logic is a chain of methods where each one might simply
 *     have called the next level down. But to make multi-threading easier
 *     to implement (at the cost of obfuscating the code a bit),
 *     _writeOneBrick() and _writeOneNormalBrick() instead return to the
 *     calling _writeAlignedRegion() an argument package that should be
 *     sent to the next method in the chain.
 *
 *     The order of calls is:
 *
 *     writeRegion (arbitrary region, user's value_type)
 *
 *         Apply conversion float -> storage and read/modify/write logic. Keep
 *         track of min and max sample range for the file as a whole.
 *
 *     _writeAlignedRegion (arbitrary aligned region, native vt)
 *
 *         Apply splitting into bricks.
 *         Expand bricks at survey edge to full size unless r/m/w already did.
 *         Convert sample position to brick number. I.e. divide by bricksize.
 *
 *     _writeOneBrick (single brick, native vt)
 *
 *         Might convert from a scalar to a regular buffer or vice versa.
 *         Might decide whether to update an existing brick, and where.
 *         Might decide to veto the compression.
 *         The next step depends on whether the buffer ended up scalar
 *         (_writeOneConstantBrick) or not (_writeOneNormalBrick eventually
 *         followed by _writeWithRetry)
 *
 *     _writeOneNormalBrick
 *
 *         Apply padding, compression, byte swapping, and convert from
 *         DataBuffer to void* + size suitable for the file i/o layer.
 *
 *     _writeOneConstantBrick
 *         Apply conversion of the constant value to a lookup table entry.
 *         Pass on the write request to the function that updates the lut.
 *
 *     _writeWithRetry
 *
 *        Allocate space on the bulk file as needed.
 *        Handle retries needed if the segment (on the cloud) was closed.
 *        Update the lookup table if a new block was allocated.
 *
 *     Code flow for reading
 *
 *     readToNewBuffer
 *     readToExistingBuffer | readConstantValue
 *     _partsNeeded
 *     _getBrickFilePosition
 *     _deliverOneBrick
 *     _file.xx_readv
 */
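
// Illustrative sketch, not part of this file (the exact spelling of the
// public wrapper's method is an assumption): the write chain documented
// above is normally entered from application code roughly like this,
// ending up in ZgyInternalBulk::writeRegion() with a float buffer.
//
//   std::shared_ptr<OpenZGY::IZgyWriter> writer = ...; // opened elsewhere
//   const std::array<std::int64_t,3> start{0, 0, 0};
//   const std::array<std::int64_t,3> size{64, 64, 64};
//   std::vector<float> data(64 * 64 * 64, 0.0f);
//   writer->write(start, size, data.data());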

#include "enum.h"
#include "types.h"
#include "bulk.h"
#include "meta.h"
#include "file.h"
#include "databuffer.h"
#include "arrayops.h"
#include "structaccess.h"
#include "subtiling.h"
#include "logger.h"
#include "compression.h"
#include "environment.h"
#include "fancy_timers.h"
#include "mtguard.h"
#include "../exception.h"

#include <algorithm>
#include <limits>
#include <cmath>

namespace InternalZGY {
#if 0
}
#endif

using namespace InternalZGY::ArrayOps;
using namespace InternalZGY::Formatters;

namespace {
  /**
   * For testing only; might be removed. Apps should have no reason to
   * reset this variable because it is unlikely that they have another
   * multi-threaded loop going at the same time.
   */
  static bool enable_compress_mt()
  {
    static int enable = Environment::getNumericEnv("OPENZGY_ENABLE_COMPRESS_MT", 1);
    return enable > 0;
  }
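
  // Example: setting OPENZGY_ENABLE_COMPRESS_MT=0 in the environment
  // disables the multi-threaded compression loop; the default (1, or
  // any value greater than zero) leaves it enabled. The variable is
  // read once and cached for the lifetime of the process.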
}

struct IJK
{
  std::int64_t i0;
  std::int64_t j0;
  std::int64_t k0;
  std::int64_t ni;
  std::int64_t nj;
  std::int64_t nk;
  std::ostream& dump(std::ostream &os) const {
    os << "start (" << i0 << ", " << j0 << ", " << k0 << ")"
       << " size (" << ni << ", " << nj << ", " << nk << ")";
    return os;
  }
  friend std::ostream& operator<<(std::ostream& os, const IJK& ijk) {
    return ijk.dump(os);
  }
};

/**
 * Argument package used by _writeOneBrick(), _writeOneNormalBrick(),
 * _writeOneConstantBrick(). One size fits all, which is in general
 * not a good idea, but it is a compromise between readability
 * and ease of implementing multi-threading.
 *
 * The original implementation processed a single brick at a time.
 * Using this class it becomes reasonably easy to pass around
 * lists of bricks to be handled.
 *
 *   * brickpos, lod: Always set and not modified by layers below.
 *   * data:          Actual bulk data to output. May be changed.
 *   * compressor:    Might be cleared. N/A if brick ends up constant.
 *   * fileoffset:    Only when known. Never known yet in _writeOneBrick(),
 *                    and N/A for constant bricks.
 */
struct WriteBrickArgPack
{
  std::array<std::int64_t,3>  brickpos;
  std::int32_t                lod;
  std::shared_ptr<DataBuffer> data;
  compressor_t                compressor; // TODO-Low, caller use std::ref?
  //bool                        use_compressor; // or this?
  std::int64_t                fileoffset;
  WriteBrickArgPack(const std::array<std::int64_t,3>& brickpos_in,
                    std::int32_t lod_in,
                    const std::shared_ptr<DataBuffer>& data_in,
                    const compressor_t& compressor_in,
                    std::int64_t fileoffset_in)
    : brickpos(brickpos_in)
    , lod(lod_in)
    , data(data_in)
    , compressor(compressor_in)
    , fileoffset(fileoffset_in)
  {
  }
  std::string toString() const
  {
    std::stringstream ss;
    ss << "pos=" << brickpos
       << ", lod=" << lod
       << ", size=" << data->size3d();
    if (data->isScalar())
      ss << ", scalar=" << data->scalarAsDouble();
    if (compressor)
      ss << ", compressor=*";
    if (fileoffset)
      ss << ", fileoffset=" << std::hex << fileoffset << std::dec;
    return ss.str();
  }
};
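
// Construction sketch with hypothetical values: a full brick at the
// survey origin, lod 0, no compressor, and the file offset left as 0
// because it is never known yet in _writeOneBrick().
//
//   std::shared_ptr<DataBuffer> buf =
//     DataBuffer::makeDataBuffer3d(nullptr, 0, bricksize, dtype);
//   WriteBrickArgPack args(std::array<std::int64_t,3>{0, 0, 0},
//                          /*lod=*/0, buf, compressor_t(),
//                          /*fileoffset=*/0);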

/**
 * Argument package used by _writeWithRetry(), which is at a lower level
 * than _writeOneBrick() etc., where it got too awkward to use the
 * one-size-fits-all rule.
 *
 * There are new fields rawdata (replaces data) and brickstatus.
 * Also the compressor is no longer needed as compression is now done.
 */
struct WriteNowArgPack
{
  std::array<std::int64_t,3>  brickpos;
  std::int32_t                lod;
  std::int64_t                fileoffset;
  rawdata_t                   rawdata;
  BrickStatus                 brickstatus;
  WriteNowArgPack(const std::array<std::int64_t,3>& brickpos_in,
                  std::int32_t lod_in,
                  std::int64_t fileoffset_in,
                  const rawdata_t rawdata_in,
                  BrickStatus brickstatus_in)
    : brickpos(brickpos_in)
    , lod(lod_in)
    , fileoffset(fileoffset_in)
    , rawdata(rawdata_in)
    , brickstatus(brickstatus_in)
  {
  }
  std::string toString() const
  {
    std::stringstream ss;
    ss << "pos=" << brickpos
       << ", lod=" << lod
       << ", size=" << rawdata.second;
    if (fileoffset)
      ss << ", fileoffset=" << std::hex << fileoffset << std::dec;
    // Should use symbolic names for enums, but this is just for verbose logs.
    ss << ", brickstatus=" << (int)brickstatus;
    return ss.str();
  }
};

/**
 * TODO-Low might want to fold this into LookupTable::LutInfo.
 *
 * Add position in samples. LookupTable::getBrickFilePosition()
 * cannot easily store this because it only has the brick position
 * and would need to know the brick size to get sample position.
 *
 * Add constvalue after decoding.
 */
struct LutInfoEx : public LookupTable::LutInfo
{
  // Inherited: status, offset_in_file, size_in_file, raw_constant;
  IJK    survey_position;
  double double_constvalue;

  LutInfoEx(const LookupTable::LutInfo& info, const IJK& pos_in, double constvalue_in)
    : LookupTable::LutInfo(info)
    , survey_position(pos_in)
    , double_constvalue(constvalue_in)
  {
  }
};

/**
 * Duplicated between impl/bulk.cpp and impl/meta.cpp but sets
 * different flags.
 *
 * Start a critical section where any exception means that the
 * owner class should be permanently flagged with _is_bad = True.
 * Typically this is used to prevent secondary errors after a
 * write failure that has most likely corrupted the entire file.
 * The exception itself will not be caught.
 *
 * The _is_bad flag normally means that any further attempts
 * to access this class, at least for writing, will raise a
 * ZgyCorruptedFile exception, regardless of what the exception
 * was that caused the flag to be set.
 *
 * C++ note: Unlike Python it isn't trivial (or portable) to check
 * whether a destructor is being called due to leaving scope normally
 * or due to unwinding an exception. So in the C++ version the code
 * should explicitly call disarm() at the end of the critical section.
 *
 * Thread safety:
 * The class itself is not thread safe but this should not be an issue,
 * because instances are meant to be short lived. Typically used inside
 * a method and not accessible outside.
 */
class ZgyInternalBulk::ErrorsWillCorruptFile
{
  ZgyInternalBulk *_owner;
public:
  explicit ErrorsWillCorruptFile(ZgyInternalBulk* owner) : _owner(owner)
  {
  }
  ~ErrorsWillCorruptFile()
  {
    if (_owner)
      _owner->set_errorflag(true);
    _owner = nullptr;
  }
  void disarm()
  {
    _owner = nullptr;
  }
};
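
// Usage sketch (hypothetical call site): an exception thrown anywhere
// between constructing the guard and calling disarm() flags the file
// as corrupted; disarm() is reached only on the success path.
//
//   void ZgyInternalBulk::someCriticalUpdate()
//   {
//     ErrorsWillCorruptFile guard(this);
//     // ... writes that would corrupt the file if they failed halfway ...
//     guard.disarm();
//   }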

/**
 * ZgyInternalBulk is used both from ZgyReader, which is in general not
 * allowed to change any state to help make it thread safe, and from the
 * non threadsafe ZgyWriter.
 *
 * To mitigate the risk of accidentally modifying data using a ZgyReader
 * there is both a const and a mutable pointer to the underlying
 * ZgyInternalMeta. When instantiated from a ZgyReader the mutable pointer
 * will be empty. When instantiated from a ZgyWriter the two pointers
 * will be identical. This mitigation by itself will cause a null pointer
 * exception if trying to modify data that shouldn't be changed.
 * This of course assumes that ZgyInternalMeta is const-correct so that
 * no state can be changed by using a const pointer to a ZgyInternalMeta
 * instance.
 *
 * An additional mitigation is to not use the _metadata_rw directly but
 * instead call a _get_metadata_rw() method. If ZgyInternalBulk is
 * const-correct then you get a compile time error if trying to call
 * _get_metadata_rw() from inside a method declared as const. If not,
 * _get_metadata_rw() is still preferable because it can raise a proper
 * error message instead of the null pointer exception.
 */
ZgyInternalBulk::ZgyInternalBulk(
    const std::shared_ptr<FileADT>& file,
    const std::shared_ptr<const ZgyInternalMeta>& metadata,
    const std::shared_ptr<ZgyInternalMeta>& metadata_rw,
    bool compressed_write,
    const LoggerFn& logger)
  : _file(file)
  , _metadata(metadata)
  , _metadata_rw(metadata_rw)
  , _update_mode(compressed_write? UpdateMode::Constant : UpdateMode::Always)
  , _compressed_write(compressed_write)
  , _is_bad(false)
  , _written_sample_min(std::numeric_limits<double>::infinity())
  , _written_sample_max(-std::numeric_limits<double>::infinity())
  , _loggerfn(logger ? logger : LoggerBase::standardCallback(LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"), "openzgy-bulk:", ""))
  , _ptimer_st(new SummaryPrintingTimerEx("writeAligned[S]"))
  , _ptimer_mt(new SummaryPrintingTimerEx("writeAligned[M]"))
  , _ststimer(new SummaryPrintingTimerEx("scaleToStorage"))
{
}
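
// Instantiation sketch (hypothetical; mirrors the class comment above):
// a reader passes an empty mutable metadata pointer while a writer
// passes the same object twice. An empty logger selects the standard
// callback.
//
//   auto rbulk = std::make_shared<ZgyInternalBulk>
//     (file, meta, nullptr, /*compressed_write=*/false, LoggerFn());
//   auto wbulk = std::make_shared<ZgyInternalBulk>
//     (file, meta_rw, meta_rw, /*compressed_write=*/false, LoggerFn());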

/**
 * \brief Get hint about all constant region.
 *
 * Check to see if the specified region is known to have all samples
 * set to the same value. Returns a pair of (is_const, const_value).
 *
 * The function only makes inexpensive checks so it might return
 * is_const=false even if the region was in fact constant. It will not
 * make the opposite mistake. This method is only intended as a hint
 * to improve performance.
 *
 * For int8 and int16 files the caller may specify whether to scale
 * the values or not. Even if unscaled the function returns the value
 * as a double.
 */
std::pair<bool,double>
ZgyInternalBulk::readConstantValue(
      const std::array<std::int64_t,3>& start,
      const std::array<std::int64_t,3>& size,
      int32_t lod, bool as_float) const
{
  _validateUserPosition(start, size, lod);
  const double defaultstorage = this->_metadata->ih().defaultstorage();
  std::vector<LutInfoEx> bricklist = _partsNeeded(start, size, lod);
  const double nan = std::numeric_limits<double>::quiet_NaN();
  double result = nan;
  bool first = true;
  for (const LutInfoEx& brick : bricklist) {
    if (_logger(5))
      _logger(5, std::stringstream()
              << "brick " << brick.survey_position
              << " " << (int)brick.status << "\n");
    switch (brick.status) {
    case BrickStatus::Constant:
      // Two NaN values should compare equal in this test.
      if (!first && result != brick.double_constvalue &&
          !(std::isnan(result) && std::isnan(brick.double_constvalue)))
        return std::make_pair(false,nan);
      result = brick.double_constvalue;
      break;
    case BrickStatus::Missing:
      if (!first && result != defaultstorage)
        return std::make_pair(false,nan);
      result = defaultstorage;
      break;
    default:
      return std::make_pair(false,nan);
    }
    first = false;
  }
  if (as_float && std::isfinite(result)) {
    result = (result * this->_metadata->ih().storagetofloat_slope() +
              this->_metadata->ih().storagetofloat_intercept());
  }
  return std::make_pair(true,result);
}
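
// Usage sketch (hypothetical caller): probe a region before deciding
// whether a full read is needed at all.
//
//   std::pair<bool,double> c = bulk->readConstantValue
//     (std::array<std::int64_t,3>{0, 0, 0},
//      std::array<std::int64_t,3>{64, 64, 64},
//      /*lod=*/0, /*as_float=*/true);
//   if (c.first) {
//     // Every sample in the region is c.second; no bulk read needed.
//   }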

/**
 * Read bulk data starting at "start" in index space and store the
 * result in the provided DataBuffer. Start should be in the range
 * (0,0,0) to Size-1. The count of samples to read is implied by the
 * size of the DataBuffer that is passed in. The valid data types for
 * the result are float32 (in which case samples stored as int8 or
 * int16 will be scaled) or the file's storage value type (in which
 * case there is no scaling). It is valid to pass a size that
 * includes the padding area between the survey and the end of the
 * current brick. But not past that point.
 */
void
ZgyInternalBulk::readToExistingBuffer(
      const std::shared_ptr<DataBuffer>& result,
      const std::array<std::int64_t,3>& start,
      int32_t lod, bool as_float) const
{
  _validateUserPosition(start, result->size3d(), lod);
  RawDataType expected_dtype = as_float ? RawDataType::Float32 : _metadata->ih().datatype();
  // When called from ZgyReader.read(), the check on expected_dtype represents
  // a user error trying to convert to an integral type while the others
  // should not be possible to trigger.
  if (!result || result->isScalar() || !result->voidData())
    throw OpenZGY::Errors::ZgyInternalError("Reading to missing or wrong buffer type.");
  if (result->datatype() != expected_dtype)
    throw OpenZGY::Errors::ZgyUserError("Requested data type not supported for this file.");

  // Need a default value to use when trying to read a brick that
  // was never written, or to fill in a brick that was only partly
  // written. To avoid non intuitive behavior the same value should
  // be used for both cases, and it should be a value that would
  // also be allowed to store as a regular sample. Use the value
  // that becomes 0.0 after conversion to float. Or as close as
  // possible to that value if the coding range is not zero centric.
  // Always use zero for floating point data. (Not NaN...)
  // Dead traces don't have any special handling apart from the
  // alpha flag. They still contain whatever data was written to them.

  const double defaultstorage = this->_metadata->ih().defaultstorage();
  const double defaultvalue = this->_metadata->ih().defaultvalue();

  // Make a separate pass to gather all the bricks we need to read.
  // The lower levels might fetch some of them in parallel and we might be
  // able to combine bricks to read larger blocks at a time. Both changes
  // can have a dramatic performance impact on cloud access.

  std::vector<LutInfoEx> bricklist = _partsNeeded(start, result->size3d(), lod);

  // After all bricks have been processed, the padding past the
  // end of the survey might still not have been touched. Just in
  // case the request did in fact include such samples we will
  // initialize the entire result buffer to the default value.
  // TODO-Performance can I refactor the code so this isn't needed?
  // Or can I make it fill only the actual padding area?
  result->fill(as_float ? defaultvalue : defaultstorage);

  std::vector<ReadRequest> requests;
  for (const LutInfoEx& brick : bricklist) {
    // Functor that accepts a raw void* data + size and copies it into the
    // correct place in result. result, start, and as_float come straight
    // from user code. b.position and b.status vary per brick which is why
    // I copy them to make sure the up to date contents are captured. raw and
    // rawsize (the arguments to the lambda) are what was read from storage.

    const std::array<std::int64_t,3> bpos
      {brick.survey_position.i0,
       brick.survey_position.j0,
       brick.survey_position.k0};
    const BrickStatus bstatus = brick.status;
    auto deliverance = [this,result,start,as_float,bpos,bstatus](ReadRequest::data_t raw, std::int64_t rawsize) {
                         this->_deliverOneBrick
                           (result, start, bpos, raw, rawsize,
                            bstatus, as_float);
                       };
    switch (brick.status) {
    case BrickStatus::Missing:
      if (_logger(2))
        _logger(2, std::stringstream()
                << "  Reading brick at " << brick.survey_position
                << " not found, use " << defaultstorage << "\n");
      deliverance(std::make_shared<double>(defaultstorage), sizeof(double));
      break;

    case BrickStatus::Constant:
      if (_logger(2))
        _logger(2, std::stringstream()
                << "  Reading constant brick at "
                << brick.survey_position << "\n");
      deliverance(std::make_shared<double>(brick.double_constvalue), sizeof(double));
      break;

    case BrickStatus::Normal:
      if (_logger(2))
        _logger(2, std::stringstream()
                << "  Reading brick at " << brick.survey_position
                << " from file offset " << std::hex
                << (std::intptr_t)brick.offset_in_file << std::dec << "\n");
      requests.push_back(ReadRequest{brick.offset_in_file, this->_metadata->ih().bytesperbrick(), deliverance});
      break;

    case BrickStatus::Compressed:
      if (_logger(2))
        _logger(2, std::stringstream()
                << "  Reading compressed brick at " << brick.survey_position
                << " from file offset " << std::hex
                << (std::intptr_t)brick.offset_in_file << std::dec
                << " size " << brick.size_in_file << "\n");
      // TODO-Worry obscure corner case, might need to re-try if we didn't
      // get enough data. I try to prevent this with the rule that
      // no compressed brick may be larger than the uncompressed version.
      requests.push_back(ReadRequest{brick.offset_in_file, brick.size_in_file, deliverance});
      break;

    default:
      throw OpenZGY::Errors::ZgyInternalError("Internal error, bad brick status");
    }
  }
  if (_logger(2))
    _logger(2, std::stringstream()
            << requests.size() << " read requests are queued\n");
  if (!requests.empty())
    this->_file->xx_readv(requests, true, false, true, UsageHint::Data);

  // Note-Performance: If passing true in the second argument above this
  // could help performance a lot. Especially for reading compressed files
  // where the user sends large requests without multithreading. Also
  // when finalizing compressed files. parallel_ok=true will cause the
  // decompression step to be multi-threaded. Also the conversion to
  // float (if needed) and the copy-out to the application's buffer will
  // be multi-threaded. But there are caveats:
  //
  // * _deliverOneBrick() must be thread safe.
  //
  // * The cloud backend doesn't honor the parallel_ok argument.
  //   While this would be a very good idea it is also rather difficult
  //   to implement.
  //
  // * There is a risk of creating too many threads if the application
  //   is doing its own multi threading. Ideally the user should
  //   be able to configure this.
  //
  // * Ditto for the low resolution generator. It can probably speed
  //   up by reading with 4 (but only 4) threads. So this isn't as
  //   efficient as setting parallel_ok=true here with respect to
  //   speeding up compression. But it might help I/O against the
  //   cloud. Which the setting here won't.
  //
  // * See comments in LocalFileLinux::xx_readv() and _deliverOneBrick().
  //   And GenLodImpl::_calculate().
}

/**
 * Read bulk data starting at "start" in index space size "size".
 * Return the result in a newly allocated DataBuffer. The result will
 * either be a scalar (constant-value) buffer or a regular buffer.
 *
 * Pass check_constant=true to check extra hard for all-constant data.
 * A region written with all samples identical, as opposed to a region
 * flagged as constant without taking up space, will also be detected.
 *
 * Start should be in the range (0,0,0) to Size-1. It is valid to pass
 * a size that includes the padding area between the survey and the
 * end of the current brick. But not past that point.
 */
std::shared_ptr<DataBuffer>
ZgyInternalBulk::readToNewBuffer(
    const std::array<std::int64_t,3>& start,
    const std::array<std::int64_t,3>& size,
    int32_t lod, bool as_float, bool check_constant) const
{
  _validateUserPosition(start, size, lod);
  std::shared_ptr<DataBuffer> result;
  RawDataType dtype = (as_float ?
                       RawDataType::Float32 :
                       this->_metadata->ih().datatype());
  std::pair<bool,double> cvalue = this->readConstantValue(start, size, lod, as_float);
  if (cvalue.first) {
    result = DataBuffer::makeScalarBuffer3d(cvalue.second, size, dtype);
  }
  else {
    result = DataBuffer::makeDataBuffer3d(nullptr, 0, size, dtype);
    this->readToExistingBuffer(result, start, lod, as_float);
    if (check_constant && result->isAllSame(result->size3d().data())) {
      double scalar = result->scalarAsDouble(); // returns the first value
      result = DataBuffer::makeScalarBuffer3d(scalar, size, dtype);
    }
  }
  return result;
}
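
// Usage sketch (hypothetical caller): let the library decide between a
// scalar and a regular buffer.
//
//   std::shared_ptr<DataBuffer> buf = bulk->readToNewBuffer
//     (start, size, /*lod=*/0, /*as_float=*/true, /*check_constant=*/false);
//   if (buf->isScalar()) {
//     // All samples in the region equal buf->scalarAsDouble().
//   }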

/**
 * Mitigate the problem that ZgyInternalBulk is used both for reading
 * and writing, making it difficult to keep track of thread safety.
 * The function below forces the code to be explicit about read vs. write.
 *
 * See ZgyInternalBulk::ZgyInternalBulk for more details.
 */
std::shared_ptr<ZgyInternalMeta>
ZgyInternalBulk::_get_metadata_rw()
{
  if (!_metadata_rw)
    throw OpenZGY::Errors::ZgyUserError("ZGY file not open for write");
  return _metadata_rw;
}

/**
 * Convenience for invoking _loggerfn with a simple message.
 * This isn't much different from invoking the callback directly.
 * But it makes debugging slightly simpler to have an easy place
 * to set a breakpoint. It also adds more symmetry with respect
 * to the stringstream version, which does add value.
 */
bool
ZgyInternalBulk::_logger(int priority, const std::string& message) const
{
  return _loggerfn(priority, message);
}

/**
 * Convenience for invoking _loggerfn with a stringstream.
 * Due to a somewhat naughty cast, the function can be called as:
 *
 *   if (_logger(pri))
 *     _logger(pri, std::stringstream() << some << data << here);
 *
 * The first line is optional. It just prevents the expression in
 * the second line from being evaluated if debugging is disabled.
 */
bool
ZgyInternalBulk::_logger(int priority, const std::ios& ss) const
{
  auto sstream = dynamic_cast<const std::stringstream*>(&ss);
  return _logger(priority, sstream ? sstream->str() : std::string());
}

/**
 * Check that i,j,k,lod are all inside valid bounds. Throw if not.
 * If used to validate an alpha tile then k should be passed as 0.
 * This is similar to LookupTable::_validatePosition() but checks
 * the entire region, not just one brick at a time. And the input
 * is in trace numbers not brick numbers.
 *
 * The exception message is currently rather verbose.
 * I might want to shorten it a bit; skipping the actual position.
 *
 * Note on test coverage: For defensive coding these checks are done
 * both here at a high level and in lookuptable. This hurts coverage
 * since it might not be possible to get those other checks to fail.
 * And some errors may be caught even earlier e.g. if trying to make
 * a DataBuffer with negative size.
 */
void
ZgyInternalBulk::_validateUserPosition(
    const std::array<std::int64_t,3>& start,
    const std::array<std::int64_t,3>& size,
    int32_t lod) const
{
  std::string error;
  const IInfoHeaderAccess& ih = this->_metadata->ih();
  const std::array<std::int64_t,3> one{1,1,1} ;
  const std::array<std::int64_t,3> bs  = this->_metadata->ih().bricksize();
  const std::array<std::int64_t,3> end = ((start + size + bs - one) / bs) * bs;
  const std::int64_t nlods = static_cast<std::int64_t>(ih.lodsizes().size());
  if (lod < 0 || lod >= nlods) {
    std::stringstream ss;
    ss << "Requested lod " << lod <<
      " is outside the valid range 0 to " << nlods << " inclusive";
    _logger(1, ss.str() + "\n");
    throw OpenZGY::Errors::ZgyUserError(ss.str());
  }
  const std::array<std::int64_t,3>& ssize = ih.lodsizes()[lod] * bs;
  if (start[0] < 0 || end[0] > ssize[0] || size[0] <= 0 ||
      start[1] < 0 || end[1] > ssize[1] || size[1] <= 0 ||
      start[2] < 0 || end[2] > ssize[2] || size[2] <= 0) {
    std::stringstream ss;
    ss << "Requested region"
       << " from (" << start[0] << ", " << start[1] << ", " << start[2] << ")"
       << " to (" << end[0] << ", " << end[1] << ", " << end[2] << ")"
       << " lod " << lod
       << " is empty or outside the valid range"
       << " (0, 0, 0)"
       << " to " << ssize[0] << ", " << ssize[1] << ", " << ssize[2] << ")"
       ;
    _logger(1, ss.str() + "\n");
    throw OpenZGY::Errors::ZgyUserError(ss.str());
  }
}
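
// Worked example, assuming a 64^3 brick size and a lod-0 survey of
// (112, 64, 64) samples so that ssize is brick-aligned to (128, 64, 64):
// start=(64,0,0) with size=(64,64,64) is accepted because it only
// extends into the padding, while start=(128,0,0) with any positive
// size throws ZgyUserError since it lies entirely past the padded
// survey.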

/**
 * Scale integral data from storage according to the coding range.
 * The operation is a no-op if file_dtype is float. The data must
 * be known to be in the storage domain; we don't try to guess based on
 * the DataBuffer's valuetype.
 */
std::shared_ptr<DataBuffer>
ZgyInternalBulk::_scaleDataToFloat(
      const std::shared_ptr<DataBuffer>& data) const
{
  // The data buffer type should match file_dtype but I won't bother to check.
  // Also, if result ends up empty this means no conversion is needed.
  std::array<double,2> factors = this->_metadata->ih().storagetofloat();
  std::shared_ptr<DataBuffer> result = data->scaleToFloat(factors);
  return result ? result : data;
}

std::shared_ptr<DataBuffer>
ZgyInternalBulk::_scaleDataToStorage(
      const std::shared_ptr<DataBuffer>& data) const
{
  std::array<double,2> factors = this->_metadata->ih().storagetofloat();
  std::shared_ptr<DataBuffer> result =
    data->scaleToStorage(factors, _metadata->ih().datatype());
  return result ? result : data;
}
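
// Worked example with a made-up coding range: if storagetofloat()
// yields slope=0.25 and intercept=50 then _scaleDataToFloat() maps a
// stored sample of 4 to 4*0.25 + 50 = 51, and _scaleDataToStorage()
// applies the inverse, (51 - 50)/0.25 = 4, presumably rounded and
// clamped to the storage type's range as _encodeConstantT() does for
// constants.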

namespace {
  template<typename T> double _decodeConstantT(std::uint32_t in)
  {
    if (in == 1) // ZGY v1 could only store constant-zero.
      return 0;
    T value;
    byteswapT(&in);
    memcpy(&value, &in, sizeof(T));
    byteswapT(&value);
    return static_cast<double>(value);
  }

  template<typename T> std::uint32_t _encodeConstantT(double in)
  {
    if (std::numeric_limits<T>::is_integer)
      in = std::max((double)std::numeric_limits<T>::lowest(),
                    std::min((double)std::numeric_limits<T>::max(),
                             std::round(in)));
    T value = static_cast<T>(in);
    std::uint32_t result{0};
    byteswapT(&value);
    memcpy(&result, &value, sizeof(T));
    byteswapT(&result);
    return result;
  }
}

/**
 * Decode the constant brick value from the re-purposed file offset
 * stored in the lookup table in the ZGY file. Return as a double,
 * which should have enough precision to precisely represent all
 * supported value types.
 *
 * In ZGY v2 and later the actual constant value is stored on the file
 * as little-endian in the first 1, 2, or 4 bytes in the 64-bit slot
 * that normally holds the file offset. The v1 format does not store
 * arbitrary constants. If it had, it would be subject to the swapping
 * of low and high 32-bit words.
 *
 * Interpreting that value in C++ safely and portably is tricky.
 *
 * The input to this method has already been byteswapped to machine
 * byte order and contains the least significant 32 bits of the stored
 * file offset. So the correct handling is to mask out the least
 * significant bytes (however many we need) and return that. It may be
 * simpler to temporarily byte swap back to little endian so the code
 * can copy "first N bytes" instead. memcpy seems to be the safest
 * approach because it avoids casts that are technically illegal.
 *
 * The same reasoning applies to the inverse _encodeConstantT().
 * The returned result will be byteswapped before being written out,
 * so the constant needs to be stored in the least significant part
 * of the result and not the first N bytes of the result.
 *
 * TODO-Low: Idea: Could I convert a 0x80xxx pointer to the pointer's address
 * and set the size to one sample? This might actually work...
 * The "1" pointer from the v1 format might even be treated the same way;
 * the upper 23 bits of the pointer will be all zero so we could
 * point to that. Just be careful to not overwrite it. And caller
 * may need the not-converted version to test whether this is a constant.
 * Or can it simply test for size == 1 sample?
 *
 * TODO-Low: Idea: passing the pointer to the first byte might actually
 * be easier to deal with.
 */
/*static*/ double
ZgyInternalBulk::_decodeConstant(std::uint32_t in, RawDataType dtype)
{
  switch (dtype) {
  case RawDataType::SignedInt8:    return _decodeConstantT<std::int8_t>(in);
  case RawDataType::UnsignedInt8:  return _decodeConstantT<std::uint8_t>(in);
  case RawDataType::SignedInt16:   return _decodeConstantT<std::int16_t>(in);
  case RawDataType::UnsignedInt16: return _decodeConstantT<std::uint16_t>(in);
  case RawDataType::SignedInt32:   return _decodeConstantT<std::int32_t>(in);
  case RawDataType::UnsignedInt32: return _decodeConstantT<std::uint32_t>(in);
  case RawDataType::Float32:       return _decodeConstantT<float>(in);
  case RawDataType::IbmFloat32:
  default: throw OpenZGY::Errors::ZgyInternalError("Unrecognized type enum");
  }
}

/**
 * Inverse of _decodeConstant(). See that function for details.
 */
/*static*/ std::int32_t
ZgyInternalBulk::_encodeConstant(double in, RawDataType dtype)
{
  switch (dtype) {
  case RawDataType::SignedInt8:    return _encodeConstantT<std::int8_t>(in);
  case RawDataType::UnsignedInt8:  return _encodeConstantT<std::uint8_t>(in);
  case RawDataType::SignedInt16:   return _encodeConstantT<std::int16_t>(in);
  case RawDataType::UnsignedInt16: return _encodeConstantT<std::uint16_t>(in);
  case RawDataType::SignedInt32:   return _encodeConstantT<std::int32_t>(in);
  case RawDataType::UnsignedInt32: return _encodeConstantT<std::uint32_t>(in);
  case RawDataType::Float32:       return _encodeConstantT<float>(in);
  case RawDataType::IbmFloat32:
  default: throw OpenZGY::Errors::ZgyInternalError("Unrecognized type enum");
  }
}
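
// Round-trip sketch: the encode/decode pair is expected to be lossless
// for values representable in the target type; out-of-range inputs are
// clamped and non-integral inputs rounded by _encodeConstantT().
//
//   std::int32_t raw = ZgyInternalBulk::_encodeConstant
//     (-42.0, RawDataType::SignedInt8);
//   double back = ZgyInternalBulk::_decodeConstant
//     (static_cast<std::uint32_t>(raw), RawDataType::SignedInt8);
//   // back == -42.0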

/**
 * Return the list of bricks needed to cover the entire area given by
 * start and size. The resulting list gives the sample position
 * relative to the start of the survey. To get the brick position you
 * need to divide by brick size. Also compute file offsets etc. from
 * the lookup tables. This will indirectly validate start and size.
 *
 * Implementation note: Don't collect the brick list in one loop and
 * then compute lookup info later. If requested size is ridiculously
 * large this may cause us to run out of memory before getting to the
 * validation stage.
 */
std::vector<LutInfoEx>
ZgyInternalBulk::_partsNeeded(
      const std::array<std::int64_t,3>& start,
      const std::array<std::int64_t,3>& size,
      int32_t lod) const
{
  std::vector<LutInfoEx> bricklist;
  const IInfoHeaderAccess& ih = this->_metadata->ih();
  const std::array<std::int64_t,3> bs = this->_metadata->ih().bricksize();
  const ILookupTableAccess& blup = this->_metadata->blup();
  const std::array<std::int64_t,3>
    brick0{(start[0] / bs[0]) * bs[0],
           (start[1] / bs[1]) * bs[1],
           (start[2] / bs[2]) * bs[2]};
  const std::array<std::int64_t,3>
    brickN{((start[0] + size[0] - 1) / bs[0]) * bs[0],
           ((start[1] + size[1] - 1) / bs[1]) * bs[1],
           ((start[2] + size[2] - 1) / bs[2]) * bs[2]};
  std::array<std::int64_t,3> iter;
  for (iter[0] = brick0[0]; iter[0] <= brickN[0]; iter[0] += bs[0]) {
    for (iter[1] = brick0[1]; iter[1] <= brickN[1]; iter[1] += bs[1]) {
      for (iter[2] = brick0[2]; iter[2] <= brickN[2]; iter[2] += bs[2]) {
        IJK pos{iter[0], iter[1], iter[2], bs[0], bs[1], bs[2]};
        LookupTable::LutInfo info = LookupTable::getBrickFilePosition
          (pos.i0/pos.ni, pos.j0/pos.nj, pos.k0/pos.nk, lod,
           /**/ih.lodsizes(), ih.brickoffsets(), blup.lup(), blup.lupend(),
           ih.bytesperbrick());
        bricklist.push_back
          (LutInfoEx(info, pos,
                     _decodeConstant(info.raw_constant, ih.datatype())));
      }
    }
  }
  return bricklist;
}
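
// Worked example, assuming a 64^3 brick size: start=(100,0,0) with
// size=(30,4,4) touches samples 100..129 along the first axis, so
// brick0=(64,0,0) and brickN=(128,0,0). The loops then emit exactly
// two LutInfoEx entries, with survey positions (64,0,0) and (128,0,0).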

/**
 * This is the final step in readToExistingBuffer(). The data has
 * been read from storage, so now it needs to be copied back to
 * the user. This function may be invoked multiple times if data
 * was needed from more than one brick.
 *
 * Arguments:
 *    result   -- user's buffer which was passed to readToExistingBuffer().
 *    start    -- user's supplied position as a 3-tuple of index values.
 *    as_float -- user's choice about converting data.
 *    startpos -- position of the start of this brick.
 *    raw      -- bulk data for this brick as it exists on file.
 *    rawsize  -- size in BYTES of "raw".
 *    brickstatus -- normal, compressed, constant, ...
 *
 * This low level function technically deals with raw bytes as input
 * and the higher level DataBuffer as output.
 *
 * The function will always be called to deliver one full brick. If the raw
 * data only contains a single sample then this is a constant-value brick;
 * otherwise there should be exactly bricksize-in-bytes available, after
 * any decompression has been done.
 * If the brick size is (1,1,1) (not even possible right now) then the
 * distinction between constant-value and regular goes away.
 *
 * TODO-Worry: beware this kludge: If passing sizeof(double) data then this also
 * represents a constant-value brick where the constant needs to be cast
 * to the correct type. There are some highly theoretical cases where this
 * becomes ambiguous.
 *
 * Caveat: If the data pointer is not correctly aligned to the type of data
 * then the returned DataBuffer will also have a misaligned pointer.
 * Another caveat: Even though result is reference counted, its buffer
 * might end up set to "raw" which means it must not be used after the
 * delivery function returns. Mitigated by adding a clear() member to
 * DataBuffer. That doesn't help if somebody already has a raw pointer though.
 *
 * Thread safety: Yes, but TODO-Worry this is not trivial to verify.
 *
 * Multithreading by having multiple read requests from the API layer is
 * safe. Those requests don't share any mutable data. Specifically,
 * "result" will be different for each read request.
 *
 * Multithreading by having the low level xx_readv() deliver the
 * requested bricks in parallel is also supposed to be thread safe.
 * The result parameter is shared between threads but different
 * threads will be writing to different parts of the buffer due to
 * "bpos" being different. (Note 1,2). The raw pointer might also be
 * referring to shared data. E.g. a cache. (Note 3). But since the
 * data will be destined for different parts of the result buffer it
 * shouldn't be possible for the same bytes to be sent to different
 * threads.
 *
 * Note 1:
 *
 * No two threads should be trying to write to the same location in
 * result. But there are no guarantees of alignment. E.g., thread#1
 * might write the first 7 bytes, thread#2 the next 7 bytes. The
 * assumption is that if the hardware needs to do a read/modify/write
 * due to the memory bus or cache line being wider than 8 bits (which
 * they definitely are) then the hardware or the compiler is
 * responsible for preventing a race condition. For C++11 this appears
 * to be the case. C++11 1.7 [intro.memory]/p2, irrelevant note
 * omitted:
 *
 *    A memory location is either an object of scalar type or a
 *    maximal sequence of adjacent bit-fields all having non-zero
 *    width. Two or more threads of execution (1.10) can update
 *    and access separate memory locations without interfering
 *    with each other.
 *
 * Note 2:
 *
 * In the lower layers there are situations where the same block can
 * be requested twice in a single request. The reason for that is that
 * the caller needs two regions in the file and padding causes the two
 * to overlap. Once we get here the padding shouldn't be visible.
 *
 * Note 3:
 *
 * Besides, if xx_readv() was called with immutable_ok=true then there
 * will be no race condition because "raw" will only be read from. And
 * if immutable_ok=false e.g. because of byteswapping then "raw" needs
 * to be a private copy of the data anyway.
 */
void
ZgyInternalBulk::_deliverOneBrick(
      const std::shared_ptr<DataBuffer>& result,
      const std::array<std::int64_t,3>& start,
      const std::array<std::int64_t,3>& startpos,
      const ReadRequest::data_t& raw, std::int64_t rawsize,
      BrickStatus brickstatus, bool as_float) const
{
  if (_logger(5))
    _logger(5, std::stringstream()
            << "  Special delivery! type " << (int)brickstatus
            << " user buffer start "
            <<"(" << start[0] << ", " << start[1] << ", " << start[2] << ")"
            << " this brick start "
            <<"(" << startpos[0] <<
            ", " << startpos[1]
            << ", " << startpos[2] << ")"
            << " got " << rawsize << " bytes.\n");

  const RawDataType dtype = _metadata->ih().datatype();
  const std::array<std::int64_t,3> bricksize = _metadata->ih().bricksize();

  // TODO-Low const-correctness? Might not be worth the trouble.
  std::shared_ptr<DataBuffer> data;

  // TODO-Low this might not be the right place.
  // Note that if the file contained compressed integral data
  // and the application asked for float data back, the decompressor
  // would be responsible for the int to float conversion but NOT the
  // scaling from storage values to real values. Currently this is
  // N/A because compressed files are always float. There is almost
  // nothing to gain by compressing integral data.
  // The one case I can think of is lossless compression of
  // discrete data such as classification. Not supported but might
  // be supported in the future. In that case retrieving float
  // does not make sense and must be disallowed.

  // TODO-Low, to make it clear what is going on I should probably
  // rename Compressed to CompressedFloat.

  switch (brickstatus) {