file_sd.cpp 90.9 KB
Newer Older
1
// Copyright 2017-2021, Schlumberger
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "file_sd.h"

#ifdef HAVE_SD // Rest of file

#include "logger.h"
20
#include "file_consolidate.h"
21
#include "file_performance.h"
22
#include "file_parallelizer.h"
23
#include "fancy_timers.h"
24
#include "mtguard.h"
25
26
#include "../exception.h"
#include "../iocontext.h"
27
#include "environment.h"
28
29
30
31
32
33
34
35
36
37
38

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include <iostream>
#include <sstream>
#include <limits>
#include <string.h>
#include <algorithm>
39
#include <numeric>
40
#include <mutex>
41
#include <omp.h>
42

Paal Kvamme's avatar
Paal Kvamme committed
43
44
45
// It would have been nice to have similar include paths in Linux and Windows
// but that is a cosmetic issue only and it is only a problem in this file.
#ifndef _WIN32
46
47
48
49
#include <SDManager.h>
#include <SDGenericDataset.h>
#include <SDUtils.h>
#include <Constants.h>
50
#include <HttpContext.h>
51
52
53
54
55
#else
#include <SDAPI/SDManager.h>
#include <SDAPI/SDGenericDataset.h>
#include <SDAPI/SDUtils.h>
#include <SDAPI/Constants.h>
56
#include <SDAPI/HttpContext.h>
57
58
#endif

59
60
61
62
63
64
65
66
67
68
69
// SDAPI footprint, might not be accurate.
// Use e.g. the regular expressions below with grep to look for places
// where a try/catch might be needed.
//
// SDGenericDataset constructors, destructor
// \b(open|close|readBlock|writeBlock|deleteBlock|getBlockNum|getBlocksSize|getSize|getConsistencyID|getSerializedContext)\(
// Not used yet in C++, the first two supported in Python+SdGlue.
// \b((get|set)(MetaData|SeismicMeta|Tags|ReadOnlyMode))\(
//
// SDManager constructors, destructor
// \b(setAuthProviderFrom(String|File|ImpToken)|setLogStatus)\(

70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/** \cond SSTORE */

/**
 * \file: file_sd.cpp
 * \brief Low level I/O, Seismic Store.
 */

// IByteIO::Disposition -> OpenMode
//OpenMode:    { Closed = 0, ReadOnly, ReadWrite, Truncate, Delete, };
//Disposition: { Create, Read, Update, Delete, Access, List };

// Plans for utilizing code from byteio_sd.cpp in ZGY-Cloud.
//
// cherry pick code from here for read and write.
//     ByteIoSeismicDrive::closeWrite()
//
// bells and whistles for configuration.
//     slurp()
//     expandArgument()
//
// Not now. Don't keep a long lived cache.
//     class SDGenericDatasetCache
//
// Worry: CTag / ETag handling? Not now
//     ByteIoSeismicDrive::readETag()
//
// Might need soon.
//     checkAccess()
//
// Utilities, might need later.
// Need a way for API users to access these directly
//     getTag()
//     setTag()
//     getMetaData()
//     setMetaData()
//     getSeisMeta()
//     setSeisMeta()
//
// maybe later, for tokentype and for error messages.
// TODO-Low: if imp token is refreshed then keep those in a cache.
// Might need support from SDAPI for that.
//     parseJsonString()
//     decodeJwt()
//     getTokenType()

using OpenZGY::IOContext;
116
using OpenZGY::SeismicStoreIOContext;
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
namespace InternalZGY {
#if 0
}
#endif

// Forward declarations; both classes are defined further down in this file.
class DatasetInformation;
class SDGenericDatasetWrapper;

/**
 * Access data in seismic store as a linear file even when the dataset
 * has multiple segments. There are some limitations on write.
 *
 *   - Writes starting at EOF are allowed, and will cause a new segment
 *     to be written.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes starting before EOF are only allowed if offset,size exactly
 *     matches a previous write. This will cause that segment to be rewritten.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *
 * For read the class provides a readv() method to do scatter/gather reads.
 * The code will then consolidate adjacent bricks to get larger brick size
 * sent to SDAPI. Optionally parallelize requests that cannot be consolidated.
144
 *
145
146
147
148
149
150
 * Thread safety when used for reading:
 * Designed to be thread safe as no internal data structures should change
 * after the file has been opened. Any lazy-evaluated information needs
 * to do appropriate locking.
 *
 * Thread safety when used for writing:
151
 * Not thread safe. See SeismicStoreFile::xx_write.
152
153
154
 *
 * Thread safety when closing a file:
 * Not thread safe.
155
156
157
158
 *
 * The class is noncopyable with copy and assign method deleted.
 * Not that the users could easily copy it anyway, as the class should
 * always be used via the FileADT interface.
159
160
161
 */
class SeismicStoreFile : public FileUtilsSeismicStore
{
162
163
  SeismicStoreFile(const SeismicStoreFile&) = delete;
  SeismicStoreFile& operator=(const SeismicStoreFile&) = delete;
164
165
166
167
168
169
170
171
172
173
174
175
public:
  typedef std::function<bool(int, const std::string&)> LoggerFn;
public:
  SeismicStoreFile(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFile();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  // Functions from FileADT
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close();
  virtual std::int64_t xx_eof() const;
176
  virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
177
178
179
  virtual bool xx_iscloud() const override;
  // Functions from FileUtilsSeismicStore
  virtual void deleteFile(const std::string& filename, bool missing_ok) const;
180
  virtual std::string altUrl(const std::string& filename) const;
181
182
183
private:
  void do_write_one(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const bool overwrite);
  void do_write_many(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const std::int64_t blobsize, const bool overwrite);
184
public:
185
186
  // Needed by SeismicStoreFileDelayedWrite.
  OpenMode _mode() const;
187
188
189
  // The raw SDGenericDataset is needed by SeismicStoreFileDelayedWrite
  // when opening a file for update.
  std::shared_ptr<SDGenericDatasetWrapper> datasetwrapper() const {return _dataset;}
190
191
  bool _mylogger(int priority, const std::string& message) const;
  bool _sslogger(int priority, const std::ios& ss) const;
192
private:
193
194
195
  /**
   * This class is used by _split_by_segment to describe a request as seen by
   * seismic store.
196
197
198
199
200
   *
   * Thread safety:
   * Modification may lead to a data race. This should not be an issue,
   * because instances are only meant to be modified when created or
   * copied or assigned prior to being made available to others.
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
   */
  class RawRequest
  {
  public:
    std::int64_t blocknum;	// Seismic store segment a.k.a. block
    std::int64_t local_offset;	// File offset inside this blocknum
    std::int64_t local_size;	// How much to read from this block
    std::int64_t outpos;
    RawRequest(std::int64_t a_blocknum,
               std::int64_t a_offset,
               std::int64_t a_size,
               std::int64_t a_outpos)
      : blocknum(a_blocknum)
      , local_offset(a_offset)
      , local_size(a_size)
      , outpos(a_outpos)
    {
    }
  };
  typedef std::vector<RawRequest> RawList;
  RawList _split_by_segment(const ReadList& requests);
Paal Kvamme's avatar
Paal Kvamme committed
222
  void _cached_read(/*TODO-Low: seg, offset, view*/);
223
private:
224
225
226
  // TODO-Low: To improve isolation, the user visible context should
  // be copied into an equivalent InternalZGY::SeismicStoreConfig.
  // The downside is that it gets more tedious to maintain.
227
228
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SDGenericDatasetWrapper> _dataset;
229
  LoggerFn _logger;
230
231
232
  // As long as we don't inherit FileCommon we need our own timers.
  std::shared_ptr<SummaryPrintingTimerEx> _rtimer; // Access is thread safe
  std::shared_ptr<SummaryPrintingTimerEx> _wtimer; // Access is thread safe
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
};

/**
 * Improve on SeismicStoreFile, have it buffer large chunks of data before
 * writing it out to a new segment.
 *
 *   - Writes starting at EOF are allowed, and will buffer data in the
 *     "open segment" until explicitly flushed.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes fully inside the open segment are allowed.
 *
 *   - Writes starting before the open segment are only allowed if
 *     offset,size exactly matches a previous write. This will cause that
 *     segment to be rewritten. As a corollary, writes canot span the
 *     closed segment / open segment boundary.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *     Why we might want this: On opening a file with existing
 *     data bricks we might choose to read the last segment and
 *     turn it into an open segment. Then delete (in memory only)
 *     the last segment. When it is time to flush the data it gets
 *     rewritten. This allows adding bricks to a file, while still
 *     ensuring that all segments except first and last need to be
 *     the same size. Note that there are other tasks such as
 *     incrementally updating statistics and histogram that might
 *     turn out to be a lot of work.
 *
 *   - When used to create ZGY files, caller must honor the convention
 *     that all segments except the first and last must have the same size.
 *
 *   - Caveat: The fact that random writes are sometimes allowed, sometimes
 *     not depending on the segment number violates the principle of
 *     least surprise. And makes for more elaborate testing. For ZGY
 *     it is quite useful though. ZGY can recover from a ZgySegmentIsClosed
 *     exception by abandoning (leaking) the current block and write it
 *     to a new location. With a typical access pattern this will happen
 *     only occasionally.
 *
 * Note that this class doesn't implement FileUtilsSeismicStore, so for
 * that functionality you need to instanciate a regular SeismicStoreFile.
277
278
279
 *
 * Thread safety:
 * Not thread safe by design, as it is only used for files opened for write.
280
281
282
 * If a file is opened for read/write (currently not supported)
 * or when running finalize it is safe to read data as long as no writes
 * can be pending.
283
284
285
286
287
288
289
 */
class SeismicStoreFileDelayedWrite : public FileADT
{
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SeismicStoreFile> _relay;
  std::vector<char> _open_segment;
  UsageHint _usage_hint;
290
  std::shared_ptr<SummaryPrintingTimerEx> _ctimer; // Access is thread safe
291

292
293
294
  SeismicStoreFileDelayedWrite(const SeismicStoreFileDelayedWrite&) = delete;
  SeismicStoreFileDelayedWrite& operator=(const SeismicStoreFileDelayedWrite&) = delete;

295
296
297
298
299
300
301
302
303
public:
  SeismicStoreFileDelayedWrite(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFileDelayedWrite();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close() override;
  virtual std::int64_t xx_eof() const override;
304
  virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
305
306
307
  virtual bool xx_iscloud() const override;

private:
308
  void _reopen_last_segment();
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
  void _flush_part(std::int64_t this_segsize);
  void _flush(bool final_call);
};

/////////////////////////////////////////////////////////////////////////////
//    class DatasetInformation   ////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * \brief Cached information for a SDGenericDataset.
 *
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * All fields will be filled in when any of them are first needed.
 * An exception is thrown if some of the information is inaccessible.
 * For new datasets, the information may either be set explicitly by
 * the writer or it may be set on the first write.
 * For existing datasets the sdapi is queried.
 * Even if the dataset is opened read/write the information is not
 * expected to change. Some other mechanism needs to deal with
 * stale data in that case.
330
331
 *
 * Thread safety:
332
333
334
335
 * All access SDGenericDatasetWrapper::info_ will be protected by
 * SDGenericDatasetWrapper::mutex_. Methods that expect to change data
 * (currently only updateOnWrite()) will need some special handling.
 * TODO-High: there is still a snag here.
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
 */
class DatasetInformation
{
private:
  // Number of blocks on file.
  // On write, this will include the holes if data is written out of order.
  std::int64_t block_count_;

  // Size of first block. On write this might need to be explicitly set if the
  // writer decides to output blocks non sequentially. It is not possible to
  // just assume block 0 is the same size as block 1 because block 1 might
  // be the last block on file and this might have a different size.
  std::int64_t block0_size_;

  // Size of blocks that are neither the first nor last on the file.
  // Zero if there is only a single block on the file.
  std::int64_t block1_size_;

  // Size of the last block, which is allowed to be smaller.
  // Will be zero if there is just one block.
  std::int64_t last_block_size_;

358
  // For debugging
359
  SeismicStoreFile::LoggerFn logger_;
360

361
public:
362
363
  explicit DatasetInformation(const SeismicStoreFile::LoggerFn& logger);
  explicit DatasetInformation(seismicdrive::SDGenericDataset* sdgd, const SeismicStoreFile::LoggerFn& logger);
364
365
366
367
368
369
370
371
372
373
  std::string toString() const;

public:
  std::int64_t blockCount() const { return block_count_; }
  std::int64_t block0Size() const { return block0_size_; }
  std::int64_t block1Size() const { return block1_size_; }
  std::int64_t lastBlockSize() const { return last_block_size_; }

public:
  std::int64_t totalSize() const;
374
  std::vector<std::int64_t> allSizes(bool complete) const;
375
376
377
378
379
  void getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const;
  void checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const;
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize);
};

380
DatasetInformation::DatasetInformation(const SeismicStoreFile::LoggerFn& logger)
381
382
383
384
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
385
  , logger_(logger ? logger : LoggerBase::emptyCallback())
386
387
388
{
}

389
390
391
392
/**
 * Create and poplulate an instance.
 * Catching and translating exceptions from SDAPI is done by caller.
 */
393
DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd, const SeismicStoreFile::LoggerFn& logger)
394
395
396
397
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
398
  , logger_(logger ? logger : LoggerBase::emptyCallback())
399
{
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
  // Note that sdapi is a bit confusing with respect to signed/unsigned.
  // getBlockNum() returns an unsigned (uint64_t).
  // getSize() and getBlockSize() return long long, may return -1 as error.
  // I will treat all of them as signed, since there is no way that
  // there can be more than 2^63 blocks.
  long long nblocks = sdgd->getBlockNum();
  long long nbytes = sdgd->getSize();
  std::vector<std::string> names;
  if (nblocks >= 1)
    names.push_back("0");
  if (nblocks >= 2)
    names.push_back("1");
  if (nblocks >= 3)
    names.push_back(std::to_string(nblocks - 1));
  std::vector<long long> sizearray;
  if (nblocks <= 0) {
416
  }
417
418
419
420
421
422
423
424
425
426
427
428
#if 0 // Chicken...
  else if (nblocks == 1) {
    // Not using getSize() because I do not trust it.
    // SDAPI requires a hint check_and_overwrite=true when writing
    // a block that might exist already. If the hint is missing
    // then getSize() will return the wrong result. OpenZGY has
    // no control over who wrote the file. Defensive programming
    // says to just don't use the getSize() until the current
    // behavior (which I consider a bug) is fixed. Suggestion:
    // would it be possible to scan the file on exit and fix up
    // any incorrect size?
    sizearray.push_back(nbytes);
429
  }
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
#endif
  else {
    sizearray = sdgd->getBlocksSize(names);
  }
  if (nblocks < 0)
    throw OpenZGY::Errors::ZgyInternalError("Unable to get block count for SDGenericDataset");
  for (const auto size : sizearray)
    if (size <= 0)
      throw OpenZGY::Errors::ZgyInternalError("Unable to get segment size for SDGenericDataset");

  this->block_count_ = nblocks;
  this->block0_size_ = nblocks >= 1 ? sizearray.at(0) : 0;
  this->block1_size_ = nblocks >= 2 ? sizearray.at(1) : 0;
  this->last_block_size_ = nblocks >= 3 ? sizearray.at(2) : this->block1_size_;

  // Not throwing exceptions here, because sdgd->getSize() is not reliable.
  // But the problem *could* be that the block sizes of all the middle blocks
  // is not the same. Which would be a real problem.
  if (nbytes >= 0 && nbytes != (long long)totalSize())
449
    this->logger_(0, "Dataset has inconsistent size");
450

451
452
  if (this->logger_(1, ""))
    this->logger_(1, toString());
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
}

/**
 * One-line summary of the cached sizes, used in log messages.
 */
std::string
DatasetInformation::toString() const
{
  std::ostringstream out;
  out << "Total " << totalSize();
  out << " in " << blockCount() << " segments";
  out << " of size " << block0Size();
  out << ", " << block1Size();
  out << ", " << lastBlockSize();
  return out.str();
}

/**
 * Return the total file size, including any holes caused by bricks not
 * written yet. The answer is computed from the individual block sizes,
 * assuming every block except the first and last has the same size.
 * This is more reliable than asking sdapi for the total size. If the
 * block size assumption is false we are completely hosed anyway;
 * verifying it is possible but probably too expensive.
 *
 * Thread safety: Safe because instances are immutable once constructed
 * and made available.
 */
std::int64_t
DatasetInformation::totalSize() const
{
  if (block_count_ == 0)
    return 0;
  if (block_count_ == 1)
    return block0_size_;
  // First block, then (count - 2) equally sized middle blocks, then the last.
  const std::int64_t middle_count = block_count_ - 2;
  return block0_size_ + middle_count * block1_size_ + last_block_size_;
}

/**
492
493
 * Return the total file size broken down into segments, not including
 * the "open" segment which DatasetInformation doesn't know about.
494
495
 */
std::vector<std::int64_t>
496
DatasetInformation::allSizes(bool complete) const
497
{
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
  switch (block_count_) {
  case 0: return std::vector<std::int64_t>{};
  case 1: return std::vector<std::int64_t>{block0_size_};
  case 2: return std::vector<std::int64_t>{block0_size_, last_block_size_};
  default: {
    std::vector<std::int64_t> result;
    result.push_back(block0_size_);
    result.push_back(block1_size_);
    if (complete)
      for (int ii = 0; ii < block_count_ - 3; ++ii)
        result.push_back(block1_size_);
    result.push_back(last_block_size_);
    return result;
  }
  }
513
514
515
516
}

/**
 * Do consistency checks before data is written.
Paal Kvamme's avatar
Paal Kvamme committed
517
 * Some of these might be redundant due to checks in the caller.
518
519
520
521
522
523
524
525
526
527
528
529
 * E.g. caller raises SegmentIsClosed if attempting to write "backwards".
 * Throws exceptions on error.
 */
void
DatasetInformation::checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const
{
  if (blocknum < 0 || blocknum > std::numeric_limits<int>::max()) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " because it is out of range.");
  }
  if (blocknum != 0 && block0_size_ == 0) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " before size of block 0 is known.");
  }
530
531
532
  if (blocksize < 1) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write less that 1 byte.");
  }
533
534
535
536
537
538
539
540
541
542
543
544
545
  if (blocknum == 0) {
    // Write or overwrite block 0.
    if (block0_size_ != 0 && block0_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size of block zero");
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    if (block1_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Blocks must have the same size except the first and last");
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // TODO-Low: Technically I might have allowed this.
Paal Kvamme's avatar
Paal Kvamme committed
546
    // If update is to be supported then I probably need to.
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
    if (blocksize != last_block_size_)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size when re-writing the last block");
  }
  else if (blocknum == block_count_) {
    // Append a block, which is not block 0.
    // This is the new "last block" which means what we previous thought
    // was the last block is in fact a middle one. Which means the size
    // of the former last block must be the same as block 1.
    // (Note that the former last block might actually *be* block 1).
    if (block1_size_ != 0 && blocksize > block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " is too large");
    if (block_count_ != 0 && last_block_size_ != block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(block_count_ -1) + " had wrong size");
  }
  else {
    // Trying to write sparse data.
    throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " written out of sequence");
  }
}

/**
 * Record the new size information after data has been written
 * successfully. checkOnWrite() must already have been called for the
 * same blocknum and blocksize.
 *
 * Thread safety: NOT thread safe.
 * Do not invoke SDGenericDatasetWrapper::info()->updateOnWrite() directly.
 * Call the thread safe SDGenericDatasetWrapper::updateOnWrite() instead,
 * which will make sure the smart pointer being updated is unique.
 */
void
DatasetInformation::updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
{
  if (blocknum == 0) {
    // Write or overwrite block 0; the file now holds at least one block.
    block0_size_ = blocksize;
    if (block_count_ < 1)
      block_count_ = 1;
    return;
  }

  if (blocknum + 1 < block_count_) {
    // Overwrite a middle block. Its size cannot change, so nothing to do.
    return;
  }

  if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // Redundant if checkOnWrite() forbids changing size.
    last_block_size_ = blocksize;
    return;
  }

  if (blocknum == block_count_) {
    // Append a block which is not block 0. The first such append also
    // establishes the size of all middle blocks.
    if (block1_size_ == 0)
      block1_size_ = blocksize;
    last_block_size_ = blocksize;
    block_count_ = blocknum + 1;
  }
  // Any other blocknum would be a sparse write, which checkOnWrite()
  // rejects; nothing is recorded for it here.
}

/**
 * Given a linear offset and count, convert this to a block local address.
 * The returned count may be smaller than requested in case the request
 * crossed a segment boundary. In that case the caller will need to read
 * in a loop.
 *
608
609
610
 * "blocks" refer to the block number in Seismic Store, not the potentially
 * larger logical blocks used by SeismicStoreDelayedWrite.
 *
611
612
613
614
615
616
617
618
619
620
621
622
 * Postcondition: If blocknum is returned as the last block, local_size
 * will be returned as requested size. If this were not so, the calling
 * function would be likely to loop forever.
 */
void
DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const
{
  if (offset < 0 || size < 0)
    throw OpenZGY::Errors::ZgyInternalError("Offset and size cannot be negative.");
  else if (size > std::numeric_limits<std::int64_t>::max() - offset)
    throw OpenZGY::Errors::ZgyInternalError("Overflow in offset + size.");
  else if (offset + size > totalSize()) {
623
    if (this->logger_(1, "")) {
624
625
626
627
628
629
630
      std::stringstream ss;
      ss << "Reading past EOF: read("
         << "off=" << offset
         << ", size=" << size
         << ", end=" << offset+size
         << ") dataset: " << toString()
         << std::endl;
631
      this->logger_(1, ss.str());
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
    }
    throw OpenZGY::Errors::ZgyInternalError("Reading past EOF");
  }

  if (block_count_ <= 1) {
    // In first block which is also last block.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = size;
  }
  else if (block0_size_ == 0 || block1_size_ == 0) {
    // Can only happen when writing. Files open for read will have the sizes
    // set up when the DatasetInformation is first accessed.
    throw OpenZGY::Errors::ZgyInternalError("getLocalOffset() called before size is known.");
  }
  else if (offset < block0_size_) {
    // In first block, and there are more blocks following.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = std::min(size, block0_size_ - offset);
  }
  else {
    const std::int64_t bnum = std::min(block_count_ - 1, ((offset - block0_size_) / block1_size_) + 1);
    const std::int64_t segment_start = block0_size_ + (bnum - 1) * block1_size_;
    *blocknum = bnum;
    *local_offset = offset - segment_start;
    if (bnum + 1 < block_count_)
      *local_size = std::min(size, block1_size_ - (offset - segment_start));
    else
      *local_size = size; // In last block.
  }
}

/////////////////////////////////////////////////////////////////////////////
//    class SDGenericDatasetWrapper   ///////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * Wrapper around seismicdrive::SDGenericDataset.
 * Instances are typically managed by smart pointers.
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * What this adds compared to a raw SDGenericDataset:
 *
 * - We keep a smart pointer reference to the SDManager that was used
 *   to create the SDGenericDataset. The sdapi documentation does not
 *   state this explicitly, but I assume it is unhealthy to destruct
 *   the manager before the dataset is closed. And using an application
 *   global manager is not an option due to privilege separation.
 *
 * - The instance remembers whether it was created for create, read, etc.
 *
 * - We may later add wrappers for SDGenericDataset members for caching
 *   purposes (block size, file size, etc.) in case the sdapi doesn't.
 *
 * - We may later add wrappers for the other members to tweak the
 *   interface, map error messages, add logging, etc. Or to add
 *   additional checking such as not reading past eof / end of block.
690
 *
691
692
 * Thread safety: The class itself is thread safe. The data being wrapped
 * might not be.
693
 *
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
 * All mutable data is protected by this->mutex_. Access methods for
 * those return a smart pointer. If info() is called on one thread
 * while another thread is doing a write operation (which is actually
 * not allowed) then it is unspecified whether the returned value is
 * before or after the write. It is also unspecified whether
 * updateOnWrite() will have any effect in that case. The information
 * is still consistent though, and may be used for logging etc.
 *
 * CAVEAT: Make sure the returned pointer remains in scope long
 * enough. E.g. this is NOT safe, and might crash every 7 years or so.
 * Except on customer sites where it may crash every 7 minutes.
 *
 *    seismicdrive::SDGenericDataset& dataset = *wrapper->dataset(); // NO!!!
 *    foo(dataset); // the returned smart pointer is already deleted.
 *
Paal Kvamme's avatar
Paal Kvamme committed
709
 * TODO-Low: Yagni: virgin_ is not used. It is related to the CTag mechanism.
710
711
 * It is still being discussed whether that needs to be ported from the
 * old accessor. It might not be needed if we go for immutable ZGY files.
712
713
714
715
716
 */
class SDGenericDatasetWrapper
{
  std::shared_ptr<seismicdrive::SDManager> manager_;
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset_;
717
  std::shared_ptr<const DatasetInformation> info_;
718
719
  OpenMode disposition_;
  bool virgin_; // If true, the cached CTag should be ok.
720
  SeismicStoreFile::LoggerFn logger_;
721
  mutable std::mutex mutex_; // Protect all members.
722
723
724
725
  std::string saved_token_; // To avoid setting it again.
  std::string saved_tokentype_;
  OpenZGY::SeismicStoreIOContext::tokencb_t tokenrefresh_;
  std::string tokenrefreshtype_;
726
727
  std::string tokenmessage_;

728
729
730
731
public:
  typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
  SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
                          std::shared_ptr<seismicdrive::SDGenericDataset> dataset,
732
733
                          OpenMode disp,
                          const SeismicStoreFile::LoggerFn& logger)
734
    : manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
735
    , logger_(logger ? logger : LoggerBase::emptyCallback())
736
    , mutex_()
737
738
    , saved_token_(), saved_tokentype_()
    , tokenrefresh_(), tokenrefreshtype_()
739
    , tokenmessage_("Token not initialzed")
740
741
742
  {
  }
  ~SDGenericDatasetWrapper();
743
744
745
746
747
748
749
750
751
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset() {
    std::lock_guard<std::mutex> lk(mutex_);
    return dataset_;
  }
  std::shared_ptr<seismicdrive::SDManager> manager() {
    std::lock_guard<std::mutex> lk(mutex_);
    return manager_;
  }
  /**
   * The mode the dataset was opened with, or OpenMode::Closed once
   * close() has run. Read without taking mutex_: the value is fixed at
   * construction and only close() changes it, and close() is not
   * expected to run concurrently with other calls.
   */
  OpenMode disposition() const {
    const OpenMode mode = this->disposition_;
    return mode;
  }
  std::shared_ptr<const DatasetInformation> info() {
757
    std::lock_guard<std::mutex> lk(mutex_);
758
    if (!info_) {
759
      try {
760
761
      switch (disposition_) {
      case OpenMode::Truncate:
762
        info_.reset(new DatasetInformation(logger_));
763
764
765
        break;
      case OpenMode::ReadOnly:
      case OpenMode::ReadWrite:
766
        info_.reset(new DatasetInformation(dataset_.get(), logger_));
767
768
769
770
771
        break;
      case OpenMode::Closed:
      default:
        throw OpenZGY::Errors::ZgyInternalError("DatasetInformation: Dataset not open.");
      }
772
773
774
775
      }
      catch (const std::exception& ex) {
        throwCloudException(ex, "Initialize");
      }
776
    }
777
    return info_;
778
  }
779

780
781
782
783
784
785
786
787
788
  void close()
  {
    auto victim = dataset_;
    dataset_.reset();
    disposition_ = OpenMode::Closed;
    victim->close();
    victim.reset(); // Any throw from SDGenericDataset dtor happens here.
  }

789
  /**
   * Replace the wrapped dataset, e.g. after it was reopened.
   * The cached information belonged to the previous dataset and is
   * dropped, so the next info() call recomputes it.
   */
  void updateDataset(std::shared_ptr<seismicdrive::SDGenericDataset> dataset) {
    std::lock_guard<std::mutex> guard(mutex_);
    dataset_ = dataset;
    info_.reset();
    // Setting virgin_ = true here would save one checkCTag() call after a
    // file has changed. That is rare enough that the extra testing it
    // would require is not worth it, so virgin_ is left alone.
    //virgin_ = true;
  }
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819

  /**
   * Adjust the information after data has been written to the cloud.
   *
   * Thread safety:
   * Multiple concurrent writers will have a race condition, but that
   * scenario is expressly forbidden anyway. Smart pointers from
   * info() held by callig code are race free because they are
   * immutable. updateOnWrite() makes a new pointer.
   */
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
  {
    std::shared_ptr<DatasetInformation> updated;
    updated.reset(new DatasetInformation(*this->info()));
    updated->updateOnWrite(blocknum, blocksize);
    info_ = updated;
  }

  /// True once touch() has been called at least once.
  bool isTouched() const {
    std::lock_guard<std::mutex> guard(mutex_);
    const bool touched = !virgin_;
    return touched;
  }
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834

  bool touch() {
    // Called to inform us that getCTag() might return stale data.
    //
    // if this->virgin_, we have just created the wrapper which means
    // we have just opened it. Only "List" entries don't get opened
    // immediately. But those aren't cached so they are not relevant
    // here. Since we just created it, we can call getCTag() without
    // calling checkCTag() first. Saving one database hit.
    //
    // The flag must be reset at least when an existing dataset accessor
    // is being re-used to open a file again. Currently it will be reset
    // much more often (every time the accessor is used to read data)
    // and it won't be set true again by updateDataset(). This will
    // simplify testing and has little impact on performance.
835
    std::lock_guard<std::mutex> lk(mutex_);
836
837
838
839
    bool old = virgin_;
    virgin_ = false;
    return old != virgin_;
  }
840

841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
  /**
   * Called from the catch block after calling some method in SDAPI.
   * If no token was provided, this is an "I told you so" error.
   * It could have been reported earlier but it is possible that
   * the cloud library somehow managed to handle authenticcation
   * itself. The actual exception reported by SDAPI is in that case
   * less interesting.
   *
   * A missing token is probably a user error (with "user" in this
   * case being the application) and is reported as such.
   *
   * If the exception has alredy been wrapped by a ZgyException then
   * it will be re-thrown without modification.
   */
  void throwCloudException(const std::exception& ex, const char *message) {
856
    if (this->logger_(1, "")) {
857
858
859
      std::stringstream ss;
      ss << "Oops (" << message << ")" << " ("
         << tokenmessage_ << "): " << ex.what();
860
      this->logger_(1, ss.str());
861
    }
862
863
864
865
866
867
868
869
870
871
872
    if (dynamic_cast<const OpenZGY::Errors::ZgyError*>(&ex))
      throw;
    else if (!tokenmessage_.empty())
      throw OpenZGY::Errors::ZgyUserError(tokenmessage_);
    else
      throw OpenZGY::Errors::ZgyInternalError
        (std::string(message) + ": Seismic Store: " + std::string(ex.what()));
    // TODO-Low: Should I just re-throw the sdapi error instead?
    // TODO-Low: Either way, be more consistent about the answer.
  }

873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
  /**
   * Pass credentials down to the SDAPI layer.
   *
   * The token kind is decided by these checks, in order; the first match
   * wins:
   *   tokentype "sdtoken"          -> setAuthProviderFromString(token)
   *   tokentype "file"             -> setAuthProviderFromFile(token)
   *   token prefixed "FILE:"       -> setAuthProviderFromFile(rest)
   *   tokentype "imptoken"         -> setAuthProviderFromImpToken(token)
   *   token prefixed "imptoken:"   -> setAuthProviderFromImpToken(rest)
   *   anything else                -> setAuthProviderFromString(token)
   * Without HAVE_SAUTH_EXTENSIONS the two file variants throw
   * ZgyInternalError instead.
   *
   * Thread safety: Caller is responsible for preventing concurrent calls
   * to update the same manager. CAVEAT: If the manager is cached and
   * shared between open files then this raises some challenges.
   */
  static void authorizeManagerInSD(seismicdrive::SDManager *manager, const std::string& token, const std::string& tokentype)
  {
    // TODO-Low: handle impersonation better. SDAPI wants us to tell
    // whether we are passing an impersonation token. If so, it will
    // attempt to refresh it if expired. The problem is that looking
    // at the token (e.g. one provided by an environment variable)
    // to decide which type it is can be ridiculously complicated.
    // See authorizeManager() in the old code. Also, it would be nice
    // to be able to read back the refreshed token.
    if (tokentype == "sdtoken")
      manager->setAuthProviderFromString(token);
#ifdef HAVE_SAUTH_EXTENSIONS
    else if (tokentype == "file")
      manager->setAuthProviderFromFile(token);
    else if (token.substr(0, 5) == "FILE:")
      manager->setAuthProviderFromFile(token.substr(5));
#else
    // This must be "else if". A plain "if" would start a new chain, and
    // an "sdtoken" would then be handed to SDAPI a second time below.
    else if (tokentype == "file" || token.substr(0, 5) == "FILE:")
      throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
    else if (tokentype == "imptoken")
      manager->setAuthProviderFromImpToken(token);
    else if (token.substr(0, 9) == "imptoken:")
      manager->setAuthProviderFromImpToken(token.substr(9));
    else
      manager->setAuthProviderFromString(token);
  }

  /**
   * Pass initial credentials down to the SDAPI layer.
   *
   * If tokencb is set, it is remembered for reAuthorizeManager(), and
   * the token it returns (of type tokencbtype) is used. Otherwise token
   * and tokentype are used. An empty token does not fail here; instead
   * tokenmessage_ is set so that a later SDAPI failure is reported as a
   * missing token.
   *
   * Thread safety: This is meant to be called from the constructor
   * when opening a file. So there should not be any race condition.
   * As in reAuthorizeManager(), mutex_ is released while the
   * application callback runs, to avoid the risk of deadlocks; the
   * callback must therefore be threadsafe.
   * CAVEAT: If the manager is cached and shared between open files
   * then this raises some challenges.
   */
  void authorizeManager(
       const std::string& token,
       const std::string& tokentype,
       const OpenZGY::SeismicStoreIOContext::tokencb_t& tokencb,
       const std::string& tokencbtype)
  {
    std::unique_lock<std::mutex> lk(mutex_);

    // Save the refresh callback, if any, for use in reAuthorizeManager().
    tokenrefresh_     = tokencb;
    tokenrefreshtype_ = tokencbtype;

    // Obtain the token to use, preferring the callback. Currently the
    // type of token the callback returns is specified when the callback
    // is registered. Not when it is invoked. Change that if needed.
    std::string newtoken;
    std::string newtokentype;
    if (tokencb) {
      lk.unlock(); // Never hold mutex_ while running application code.
      newtoken = tokencb();
      lk.lock();
      newtokentype = tokencbtype;
    }
    else {
      newtoken     = token;
      newtokentype = tokentype;
    }

    // A missing token at this point is probably an error, but in case
    // the SDAPI library has some tricks up its sleeve don't throw
    // an error unless the SDAPI does so first.
    if (newtoken.empty()) {
      if (tokencb)
        tokenmessage_ = "Missing access token, first callback returned empty";
      else
        tokenmessage_ = "Missing access token or callback in iocontext";
    }
    else {
      tokenmessage_ = "";
    }

    authorizeManagerInSD(manager_.get(), newtoken, newtokentype);

    // Save what we just set so reAuthorizeManager() won't need to set
    // the exact same token again. Don't set if there was an exception
    // because then we can try again later. The saved token is NOT
    // used for credentials. So it is possible to store a hash
    // instead as long as the risk of collision is negligible.
    saved_token_ = newtoken;
    saved_tokentype_ = newtokentype;
  }

  /**
   * Pass updated credentials down to the SDAPI layer if needed.
   * Needs to be called before any operation that needs credentials.
   *
   * Thread safety: *this is protected by a lock.
   * The lock is temporarily dropped before invoking the refresh callback.
   * This means the application provided callback MUST BE THREADSAFE.
   * It also means there is technically a race condition here, where a
   * particular read operation uses credentials that are a few milliseconds
   * out of date.
   *
   * Alternatively the code here could place a lock are require that the
   * callback doesn't do something that couuld cause a deadlock. Not sure
   * if that option would be preferable.
   */
  void reAuthorizeManager()
  {
    std::unique_lock<std::mutex> lk(mutex_);
    if (tokenrefresh_) {
      auto tokenrefresh = this->tokenrefresh_;
      auto tokenrefreshtype = this->tokenrefreshtype_;
      std::string newtoken;
      std::string newtokentype;
      // By design, no locks should be held when the callback is invoked
      // to avoid the risk of deadlocks. This means that the callback
      // itself must be threadsafe.
      lk.unlock();
      newtoken     = tokenrefresh();
      newtokentype = tokenrefreshtype;
      lk.lock();
      if (saved_token_ != newtoken || saved_tokentype_ != newtokentype) {
987
988
989
990
        if (newtoken.empty())
          tokenmessage_ = "Missing access token, callback returned empty";
        else
          tokenmessage_ = "";
991
992
993
994
995
996
        // In case of exception, always allow trying again.
        saved_token_     = std::string();
        saved_tokentype_ = std::string();
        authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
        saved_token_     = newtoken;
        saved_tokentype_ = newtokentype;
997
        if (newtoken.empty())
998
          this->logger_(1, "The token was cleared");
999
        else
1000
          this->logger_(1, "A new token was provided");