// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "file_sd.h"

#ifdef HAVE_SD // Rest of file

#include "logger.h"
#include "file_consolidate.h"
#include "file_performance.h"
#include "file_parallelizer.h"
#include "fancy_timers.h"
#include "mtguard.h"
#include "../exception.h"
#include "../iocontext.h"

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include <iostream>
#include <sstream>
#include <limits>
#include <string.h>
#include <algorithm>
#include <numeric>
#include <mutex>
#include <omp.h>

#ifndef _WIN32 // TODO-Low: SDAPI/ prefix also on Linux.
#include <SDManager.h>
#include <SDGenericDataset.h>
#include <SDUtils.h>
#include <Constants.h>
#else
#include <SDAPI/SDManager.h>
#include <SDAPI/SDGenericDataset.h>
#include <SDAPI/SDUtils.h>
#include <SDAPI/Constants.h>
#endif

/** \cond SSTORE */

/**
 * \file file_sd.cpp
 * \brief Low level I/O, Seismic Store.
 */

// IByteIO::Disposition -> OpenMode
//OpenMode:    { Closed = 0, ReadOnly, ReadWrite, Truncate, Delete, };
//Disposition: { Create, Read, Update, Delete, Access, List };

// Plans for utilizing code from byteio_sd.cpp in ZGY-Cloud.
//
// cherry pick code from here for read and write.
//     ByteIoSeismicDrive::closeWrite()
//
// bells and whistles for configuration.
//     slurp()
//     expandArgument()
//
// Not now. Don't keep a long lived cache.
//     class SDGenericDatasetCache
//
// Worry: CTag / ETag handling? Not now
//     ByteIoSeismicDrive::readETag()
//
// Might need soon.
//     checkAccess()
//
// Utilities, might need later.
// Need a way for API users to access these directly
//     getTag()
//     setTag()
//     getMetaData()
//     setMetaData()
//     getSeisMeta()
//     setSeisMeta()
//
// maybe later, for tokentype and for error messages.
// TODO-Low: if imp token is refreshed then keep those in a cache.
// Might need support from SDAPI for that.
//     parseJsonString()
//     decodeJwt()
//     getTokenType()

using OpenZGY::IOContext;
using OpenZGY::SeismicStoreIOContext;
namespace InternalZGY {
#if 0
}
#endif

class SDGenericDatasetWrapper;
class DatasetInformation;

/**
 * Access data in seismic store as a linear file even when the dataset
 * has multiple segments. There are some limitations on write.
 *
 *   - Writes starting at EOF are allowed, and will cause a new segment
 *     to be written.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes starting before EOF are only allowed if offset,size exactly
 *     matches a previous write. This will cause that segment to be rewritten.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *
 * For read the class provides a readv() method to do scatter/gather reads.
 * The code will then consolidate adjacent bricks to get larger requests
 * sent to SDAPI. Optionally parallelize requests that cannot be consolidated.
 *
 * Thread safety when used for reading:
 * Designed to be thread safe as no internal data structures should change
 * after the file has been opened. Any lazy-evaluated information needs
 * to do appropriate locking.
 *
 * Thread safety when used for writing:
 * Not thread safe. See SeismicStoreFile::xx_write.
 *
 * Thread safety when closing a file:
 * Not thread safe.
 *
 * The class is noncopyable, with copy and assign methods deleted.
 * Not that users could easily copy it anyway, as the class should
 * always be used via the FileADT interface.
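 *
 * A minimal usage sketch of the write rules above (hypothetical names,
 * path and segment size; real callers normally go through
 * SeismicStoreFileDelayedWrite and the FileADT interface):
 *
 *    std::shared_ptr<FileADT> file = SeismicStoreFile::xx_make_instance
 *      ("sd://tenant/subproject/example.zgy", OpenMode::Truncate, &context);
 *    file->xx_write(buf, 0,         segsize); // ok: starts at EOF, segment 0
 *    file->xx_write(buf, segsize,   segsize); // ok: starts at EOF, segment 1
 *    file->xx_write(buf, 0,         segsize); // ok: exact rewrite of segment 0
 *    file->xx_write(buf, 3*segsize, segsize); // error: would leave a hole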
 */
class SeismicStoreFile : public FileUtilsSeismicStore
{
  SeismicStoreFile(const SeismicStoreFile&) = delete;
  SeismicStoreFile& operator=(const SeismicStoreFile&) = delete;
public:
  typedef std::function<bool(int, const std::string&)> LoggerFn;
public:
  SeismicStoreFile(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFile();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  // Functions from FileADT
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close();
  virtual std::int64_t xx_eof() const;
  virtual bool xx_iscloud() const override;
  // Functions from FileUtilsSeismicStore
  virtual void deleteFile(const std::string& filename, bool missing_ok) const;
  virtual std::string altUrl(const std::string& filename) const;
private:
  void do_write_one(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const bool overwrite);
  void do_write_many(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const std::int64_t blobsize, const bool overwrite);
public:
  // TODO-Low per-instance logging. This is tedious to implement
  // because many of the helper classes will need to hold a logger
  // instance as well, or need the logger passed in each call.
  static bool _logger(int priority, const std::string& message = std::string());
  static bool _logger(int priority, const std::ios& ss);
public:
  // For use by debug_trace, allow SeismicStoreFileDelayedWrite() access
  // to details of the file.
  std::shared_ptr<const DatasetInformation> debug_info();
private:
  /**
   * This class is used by _split_by_segment to describe a request as seen by
   * seismic store.
   *
   * Thread safety:
   * Modification may lead to a data race. This should not be an issue,
   * because instances are only meant to be modified when created or
   * copied or assigned prior to being made available to others.
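   *
   * Worked example (hypothetical sizes): with segments of 100, 500 and
   * 500 bytes, a single request for offset=80, size=60 crosses the
   * boundary between segment 0 and segment 1 and is split by
   * _split_by_segment into (blocknum=0, local_offset=80, local_size=20)
   * followed by (blocknum=1, local_offset=0, local_size=40), with outpos
   * recording where each part belongs in the caller's result.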
   */
  class RawRequest
  {
  public:
    std::int64_t blocknum;	// Seismic store segment a.k.a. block
    std::int64_t local_offset;	// File offset inside this blocknum
    std::int64_t local_size;	// How much to read from this block
    std::int64_t outpos;
    RawRequest(std::int64_t a_blocknum,
               std::int64_t a_offset,
               std::int64_t a_size,
               std::int64_t a_outpos)
      : blocknum(a_blocknum)
      , local_offset(a_offset)
      , local_size(a_size)
      , outpos(a_outpos)
    {
    }
  };
  typedef std::vector<RawRequest> RawList;
  RawList _split_by_segment(const ReadList& requests);
  void _cached_read(/*TODO-SeismicStore seg, offset, view*/);
private:
  OpenMode _mode;
  // TODO-Low: To improve isolation, the user visible context should
  // be copied into an equivalent InternalZGY::SeismicStoreConfig.
  // The downside is that it gets more tedious to maintain.
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SDGenericDatasetWrapper> _dataset;
  static LoggerFn _loggerfn;
  // As long as we don't inherit FileCommon we need our own timers.
  std::shared_ptr<SummaryPrintingTimerEx> _rtimer; // Access is thread safe
  std::shared_ptr<SummaryPrintingTimerEx> _wtimer; // Access is thread safe
};

SeismicStoreFile::LoggerFn SeismicStoreFile::_loggerfn;

/**
 * Improve on SeismicStoreFile, have it buffer large chunks of data before
 * writing it out to a new segment.
 *
 *   - Writes starting at EOF are allowed, and will buffer data in the
 *     "open segment" until explicitly flushed.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes fully inside the open segment are allowed.
 *
 *   - Writes starting before the open segment are only allowed if
 *     offset,size exactly matches a previous write. This will cause that
 *     segment to be rewritten. As a corollary, writes cannot span the
 *     closed segment / open segment boundary.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *     Why we might want this: On opening a file with existing
 *     data bricks we might choose to read the last segment and
 *     turn it into an open segment. Then delete (in memory only)
 *     the last segment. When it is time to flush the data it gets
 *     rewritten. This allows adding bricks to a file, while still
 *     ensuring that all segments except first and last need to be
 *     the same size. Note that there are other tasks such as
 *     incrementally updating statistics and histogram that might
 *     turn out to be a lot of work.
 *
 *   - When used to create ZGY files, caller must honor the convention
 *     that all segments except the first and last must have the same size.
 *
 *   - Caveat: The fact that random writes are sometimes allowed, sometimes
 *     not depending on the segment number violates the principle of
 *     least surprise. And makes for more elaborate testing. For ZGY
 *     it is quite useful though. ZGY can recover from a ZgySegmentIsClosed
 *     exception by abandoning (leaking) the current block and write it
 *     to a new location. With a typical access pattern this will happen
 *     only occasionally.
 *
 * Note that this class doesn't implement FileUtilsSeismicStore, so for
 * that functionality you need to instantiate a regular SeismicStoreFile.
 *
 * Thread safety:
 * Not thread safe by design, as it is only used for files opened for write.
 * If a file is opened for read/write (currently not supported)
 * or when running finalize it is safe to read data as long as no writes
 * can be pending.
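 *
 * Sketch of the buffering behavior (hypothetical buffers and sizes;
 * "segsize" stands for whatever segment size the _flush logic uses):
 *
 *    file->xx_write(a, 0,   100); // buffered in the open segment
 *    file->xx_write(b, 100, 200); // appended to the open segment
 *    file->xx_write(c, 0,   100); // ok: fully inside the (unflushed) open segment
 *    // When enough data has accumulated, _flush() hands complete
 *    // segments to the _relay SeismicStoreFile; xx_close() flushes
 *    // whatever remains.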
 */
class SeismicStoreFileDelayedWrite : public FileADT
{
  OpenMode _mode;
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SeismicStoreFile> _relay;
  std::vector<char> _open_segment;
  UsageHint _usage_hint;
  std::shared_ptr<SummaryPrintingTimerEx> _ctimer; // Access is thread safe

  SeismicStoreFileDelayedWrite(const SeismicStoreFileDelayedWrite&) = delete;
  SeismicStoreFileDelayedWrite& operator=(const SeismicStoreFileDelayedWrite&) = delete;

public:
  SeismicStoreFileDelayedWrite(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFileDelayedWrite();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close() override;
  virtual std::int64_t xx_eof() const override;
  virtual bool xx_iscloud() const override;

private:
  void _flush_part(std::int64_t this_segsize);
  void _flush(bool final_call);
};

/////////////////////////////////////////////////////////////////////////////
//    class DatasetInformation   ////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * \brief Cached information for a SDGenericDataset.
 *
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * All fields will be filled in when any of them are first needed.
 * An exception is thrown if some of the information is inaccessible.
 * For new datasets, the information may either be set explicitly by
 * the writer or it may be set on the first write.
 * For existing datasets the sdapi is queried.
 * Even if the dataset is opened read/write the information is not
 * expected to change. Some other mechanism needs to deal with
 * stale data in that case.
 *
 * Thread safety:
 * All access to SDGenericDatasetWrapper::info_ is protected by
 * SDGenericDatasetWrapper::mutex_. Methods that expect to change data
 * (currently only updateOnWrite()) will need some special handling.
 * TODO-High: there is still a snag here.
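 *
 * Illustration (hypothetical numbers): a dataset stored as segments of
 * 100, 500, 500, 500 and 320 bytes is described by block_count_ = 5,
 * block0_size_ = 100, block1_size_ = 500 and last_block_size_ = 320.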
 */
class DatasetInformation
{
private:
  // Number of blocks on file.
  // On write, this will include the holes if data is written out of order.
  std::int64_t block_count_;

  // Size of first block. On write this might need to be explicitly set if the
  // writer decides to output blocks non sequentially. It is not possible to
  // just assume block 0 is the same size as block 1 because block 1 might
  // be the last block on file and this might have a different size.
  std::int64_t block0_size_;

  // Size of blocks that are neither the first nor last on the file.
  // Zero if there is only a single block on the file.
  std::int64_t block1_size_;

  // Size of the last block, which is allowed to be smaller.
  // Will be zero if there is just one block.
  std::int64_t last_block_size_;

public:
  DatasetInformation();
  explicit DatasetInformation(seismicdrive::SDGenericDataset* sdgd);
  std::string toString() const;

public:
  std::int64_t blockCount() const { return block_count_; }
  std::int64_t block0Size() const { return block0_size_; }
  std::int64_t block1Size() const { return block1_size_; }
  std::int64_t lastBlockSize() const { return last_block_size_; }

public:
  std::int64_t totalSize() const;
  std::vector<std::int64_t> allSizes(std::int64_t open_size) const;
  void getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const;
  void checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const;
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize);
};

DatasetInformation::DatasetInformation()
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
{
}

DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd)
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
{
  try {
    // Note that sdapi is a bit confusing with respect to signed/unsigned.
    // getBlockNum() returns an unsigned (uint64_t).
    // getSize() and getBlockSize() return long long, may return -1 as error.
    // I will treat all of them as signed, since there is no way that
    // there can be more than 2^63 blocks.
    long long nblocks = sdgd->getBlockNum();
    long long nbytes = sdgd->getSize();
    std::vector<std::string> names;
    if (nblocks >= 1)
      names.push_back("0");
    if (nblocks >= 2)
      names.push_back("1");
    if (nblocks >= 3)
      names.push_back(std::to_string(nblocks - 1));
    std::vector<long long> sizearray;
    if (nblocks <= 0) {
    }
#if 0 // Chicken...
    else if (nblocks == 1) {
      // WARNING: getSize is less reliable.
      // I need to trust that each block is only written once.
      // Or that if rewritten, caller passed check_and_overwrite=true.
      // TODO-Low: SDAPI should really have been able to figure out by itself
      // whether the check is needed.
      // TODO-Worry: ensure that ZGY-Cloud doesn't need that flag.
      sizearray.push_back(nbytes);
    }
#endif
    else {
      sizearray = sdgd->getBlocksSize(names);
    }
    if (nblocks < 0)
      throw OpenZGY::Errors::ZgyInternalError("Unable to get block count for SDGenericDataset");
    for (const auto size : sizearray)
      if (size <= 0)
        throw OpenZGY::Errors::ZgyInternalError("Unable to get segment size for SDGenericDataset");

    this->block_count_ = nblocks;
    this->block0_size_ = nblocks >= 1 ? sizearray.at(0) : 0;
    this->block1_size_ = nblocks >= 2 ? sizearray.at(1) : 0;
    this->last_block_size_ = nblocks >= 3 ? sizearray.at(2) : this->block1_size_;

    // Not throwing exceptions here, because sdgd->getSize() is not reliable.
    // But the problem *could* be that the block sizes of all the middle blocks
    // are not the same, which would be a real problem.
    if (nbytes >= 0 && nbytes != (long long)totalSize())
      SeismicStoreFile::_logger(0, "Dataset has inconsistent size");

    SeismicStoreFile::_logger(1, toString());
  }
  catch (const OpenZGY::Errors::ZgyError& ex) {
    SeismicStoreFile::_logger(1, "Oops (DataSetInformation): " + std::string(ex.what()));
    throw;
  }
  catch (const std::exception& ex) {
    SeismicStoreFile::_logger(1, "Oops (DataSetInformation): " + std::string(ex.what()));
    throw OpenZGY::Errors::ZgyInternalError("Seismic Store says: " + std::string(ex.what()));
  }
}

std::string
DatasetInformation::toString() const
{
  std::stringstream ss;
  ss << "Total " << totalSize()
     << " in " << blockCount() <<" segments"
     << " of size "<< block0Size()
     << ", " << block1Size()
     << ", " << lastBlockSize();
  return ss.str();
}

/**
 * Return the total file size, including any holes caused by bricks
 * not written yet. The result is currently computed based on
 * the individual block sizes assuming all blocks except the first and
 * last will have the same size. This is more reliable than asking
 * sdapi for the total size. And if the block size assumption is false
 * then we are completely hosed anyway. It is possible to verify the
 * assumption but it is probably too expensive to do so.
 *
 * Thread safety: Safe because instances are immutable once constructed
 * and made available.
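 *
 * Worked example (hypothetical sizes): with 5 blocks of 100, 500, 500,
 * 500 and 320 bytes the result is 100 + (5 - 2) * 500 + 320 = 1920.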
 */
std::int64_t
DatasetInformation::totalSize() const
{
  switch (block_count_) {
  case 0: return 0;
  case 1: return block0_size_;
  default: return (block0_size_ +
                   (block_count_ - 2) * block1_size_ +
                   last_block_size_);
  }
}

/**
 * Return the total file size broken down into segments, including
 * the "open" segment which size needs to be provided explicitly.
 * This functon is currently only used for debugging.
 */
std::vector<std::int64_t>
DatasetInformation::allSizes(std::int64_t open_size) const
{
  std::vector<std::int64_t> result;
  for (int ii = 0; ii < block_count_; ++ii)
    result.push_back(ii == 0 ? block0_size_ :
                     ii == block_count_-1 ? last_block_size_ :
                     block1_size_);
  if (open_size >= 0)
    result.push_back(open_size);
  return result;
}

/**
 * Do consistency checks before data is written.
 * TODO-Low: Parts might be redundant due to checks in caller.
 * E.g. caller raises SegmentIsClosed if attempting to write "backwards".
 * Throws exceptions on error.
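 *
 * Worked example (hypothetical sizes): with blocks of 100, 500 and 500
 * bytes already on file, appending block 3 with up to 500 bytes is
 * accepted, appending block 3 with 600 bytes throws because it exceeds
 * block1_size_, and writing block 5 throws because it would be sparse.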
 */
void
DatasetInformation::checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const
{
  if (blocknum < 0 || blocknum > std::numeric_limits<int>::max()) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " because it is out of range.");
  }
  if (blocknum != 0 && block0_size_ == 0) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " before size of block 0 is known.");
  }
  if (blocksize < 1) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write less that 1 byte.");
  }
  if (blocknum == 0) {
    // Write or overwrite block 0.
    if (block0_size_ != 0 && block0_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size of block zero");
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    if (block1_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Blocks must have the same size except the first and last");
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // TODO-Low: Technically I might have allowed this.
    if (blocksize != last_block_size_)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size when re-writing the last block");
  }
  else if (blocknum == block_count_) {
    // Append a block, which is not block 0.
    // This is the new "last block" which means what we previously thought
    // was the last block is in fact a middle one. Which means the size
    // of the former last block must be the same as block 1.
    // (Note that the former last block might actually *be* block 1).
    if (block1_size_ != 0 && blocksize > block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " is too large");
    if (block_count_ != 0 && last_block_size_ != block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(block_count_ -1) + " had wrong size");
  }
  else {
    // Trying to write sparse data.
    throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " written out of sequence");
  }
}

/**
 * Update cached size information after data is successfully written.
 * checkOnWrite() must have been called already.
 *
 * Thread safety: NOT thread safe.
 * Do not invoke SDGenericDatasetWrapper::info()->updateOnWrite() directly.
 * Call the thread safe SDGenericDatasetWrapper::updateOnWrite() instead.
 * That one will make sure the smart pointer being updated is unique.
 */
void
DatasetInformation::updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
{
  if (blocknum == 0) {
    // Write or overwrite block 0.
    block0_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    // Size cannot change, so do nothing.
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // Redundant if checkOnWrite() forbids changing size.
    last_block_size_ = blocksize;
  }
  else if (blocknum == block_count_) {
    // Append a block which is not block 0.
    if (block1_size_ == 0)
      block1_size_ = blocksize;
    last_block_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
}

/**
 * Given a linear offset and count, convert this to a block local address.
 * The returned count may be smaller than requested in case the request
 * crossed a segment boundary. In that case the caller will need to read
 * in a loop.
 *
 * "blocks" refer to the block number in Seismic Store, not the potentially
 * larger logical blocks used by SeismicStoreFileDelayedWrite.
 *
 * Postcondition: If blocknum is returned as the last block, local_size
 * will be returned as the requested size. If this were not so, the calling
 * function would be likely to loop forever.
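 *
 * Worked example (hypothetical sizes): with segments of 100, 500, 500
 * and 320 bytes, a read at offset=550, size=100 maps to blocknum=1,
 * local_offset=450, local_size=50; the caller loops, and the remaining
 * 50 bytes at offset=600 map to blocknum=2, local_offset=0, local_size=50.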
 */
void
DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const
{
  if (offset < 0 || size < 0)
    throw OpenZGY::Errors::ZgyInternalError("Offset and size cannot be negative.");
  else if (size > std::numeric_limits<std::int64_t>::max() - offset)
    throw OpenZGY::Errors::ZgyInternalError("Overflow in offset + size.");
  else if (offset + size > totalSize()) {
    if (SeismicStoreFile::_logger(1)) {
      std::stringstream ss;
      ss << "Reading past EOF: read("
         << "off=" << offset
         << ", size=" << size
         << ", end=" << offset+size
         << ") dataset: " << toString()
         << std::endl;
      SeismicStoreFile::_logger(1, ss.str());
    }
    throw OpenZGY::Errors::ZgyInternalError("Reading past EOF");
  }

  if (block_count_ <= 1) {
    // In first block which is also last block.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = size;
  }
  else if (block0_size_ == 0 || block1_size_ == 0) {
    // Can only happen when writing. Files open for read will have the sizes
    // set up when the DatasetInformation is first accessed.
    throw OpenZGY::Errors::ZgyInternalError("getLocalOffset() called before size is known.");
  }
  else if (offset < block0_size_) {
    // In first block, and there are more blocks following.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = std::min(size, block0_size_ - offset);
  }
  else {
    const std::int64_t bnum = std::min(block_count_ - 1, ((offset - block0_size_) / block1_size_) + 1);
    const std::int64_t segment_start = block0_size_ + (bnum - 1) * block1_size_;
    *blocknum = bnum;
    *local_offset = offset - segment_start;
    if (bnum + 1 < block_count_)
      *local_size = std::min(size, block1_size_ - (offset - segment_start));
    else
      *local_size = size; // In last block.
  }
}

/////////////////////////////////////////////////////////////////////////////
//    class SDGenericDatasetWrapper   ///////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * Wrapper around seismicdrive::SDGenericDataset.
 * Instances are typically managed by smart pointers.
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * What this adds compared to a raw SDGenericDataset:
 *
 * - We keep a smart pointer reference to the SDManager that was used
 *   to create the SDGenericDataset. The sdapi documentation does not
 *   state this explicitly, but I assume it is unhealthy to destruct
 *   the manager before the dataset is closed. And using an application
 *   global manager is not an option due to privilege separation.
 *
 * - The instance remembers whether it was created for create, read, etc.
 *
 * - We may later add wrappers for SDGenericDataset members for caching
 *   purposes (block size, file size, etc.) in case the sdapi doesn't.
 *
 * - We may later add wrappers for the other members to tweak the
 *   interface, map error messages, add logging, etc. Or to add
 *   additional checking such as not reading past eof / end of block.
 *
 * Thread safety: The class itself is thread safe. The data being wrapped
 * might not be.
 *
 * All mutable data is protected by this->mutex_. Access methods for
 * those return a smart pointer. If info() is called on one thread
 * while another thread is doing a write operation (which is actually
 * not allowed) then it is unspecified whether the returned value is
 * before or after the write. It is also unspecified whether
 * updateOnWrite() will have any effect in that case. The information
 * is still consistent though, and may be used for logging etc.
 *
 * CAVEAT: Make sure the returned pointer remains in scope long
 * enough. E.g. this is NOT safe, and might crash every 7 years or so.
 * Except on customer sites where it may crash every 7 minutes.
 *
 *    seismicdrive::SDGenericDataset& dataset = *wrapper->dataset(); // NO!!!
 *    foo(dataset); // the returned smart pointer is already deleted.
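 *
 * A safe variant (sketch) keeps the smart pointer alive for as long as
 * the raw reference is needed:
 *
 *    std::shared_ptr<seismicdrive::SDGenericDataset> keep = wrapper->dataset();
 *    foo(*keep); // ok, "keep" holds the instance until it goes out of scope.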
 *
 * TODO-Yagni: _virgin is not used. It is related to the CTag mechanism.
 * It is still being discussed whether that needs to be ported from the
 * old accessor. It might not be needed if we go for immutable ZGY files.
 */
class SDGenericDatasetWrapper
{
  std::shared_ptr<seismicdrive::SDManager> manager_;
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset_;
  std::shared_ptr<const DatasetInformation> info_;
  OpenMode disposition_;
  bool virgin_; // If true, the cached CTag should be ok.
  mutable std::mutex mutex_; // Protect all members.
  std::string saved_token_; // To avoid setting it again.
  std::string saved_tokentype_;
  OpenZGY::SeismicStoreIOContext::tokencb_t tokenrefresh_;
  std::string tokenrefreshtype_;
public:
  typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
  SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
                          std::shared_ptr<seismicdrive::SDGenericDataset> dataset,
                          OpenMode disp)
    : manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
    , mutex_()
    , saved_token_(), saved_tokentype_()
    , tokenrefresh_(), tokenrefreshtype_()
  {
  }
  ~SDGenericDatasetWrapper();
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset() {
    std::lock_guard<std::mutex> lk(mutex_);
    return dataset_;
  }
  std::shared_ptr<seismicdrive::SDManager> manager() {
    std::lock_guard<std::mutex> lk(mutex_);
    return manager_;
  }
  OpenMode disposition() const {
    return disposition_; // immutable, so no lock.
  }
  std::shared_ptr<const DatasetInformation> info() {
    std::lock_guard<std::mutex> lk(mutex_);
    if (!info_) {
      switch (disposition_) {
      case OpenMode::Truncate:
        info_.reset(new DatasetInformation());
        break;
      case OpenMode::ReadOnly:
      case OpenMode::ReadWrite:
        info_.reset(new DatasetInformation(dataset_.get()));
        break;
      case OpenMode::Closed:
      default:
        throw OpenZGY::Errors::ZgyInternalError("DatasetInformation: Dataset not open.");
      }
    }
    return info_;
  }

  void updateDataset(std::shared_ptr<seismicdrive::SDGenericDataset> dataset) {
    std::lock_guard<std::mutex> lk(mutex_);
    dataset_ = dataset;
    info_.reset();
    // This would give a slight performance boost, saving a single call
    // to checkCTag() after a file has changed. This happens so rarely
    // that it isn't worth the extra testing.
    //virgin_ = true;
  }

  /**
   * Adjust the information after data has been written to the cloud.
   *
   * Thread safety:
   * Multiple concurrent writers will have a race condition, but that
   * scenario is expressly forbidden anyway. Smart pointers from
   * info() held by calling code are race free because they are
   * immutable. updateOnWrite() makes a new pointer.
   */
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
  {
    std::shared_ptr<DatasetInformation> updated;
    updated.reset(new DatasetInformation(*this->info()));
    updated->updateOnWrite(blocknum, blocksize);
    info_ = updated;
  }

  bool isTouched() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return !virgin_;
  }

  bool touch() {
    // Called to inform us that getCTag() might return stale data.
    //
    // if this->virgin_, we have just created the wrapper which means
    // we have just opened it. Only "List" entries don't get opened
    // immediately. But those aren't cached so they are not relevant
    // here. Since we just created it, we can call getCTag() without
    // calling checkCTag() first. Saving one database hit.
    //
    // The flag must be reset at least when an existing dataset accessor
    // is being re-used to open a file again. Currently it will be reset
    // much more often (every time the accessor is used to read data)
    // and it won't be set true again by updateDataset(). This will
    // simplify testing and has little impact on performance.
    std::lock_guard<std::mutex> lk(mutex_);
    bool old = virgin_;
    virgin_ = false;
    return old != virgin_;
  }

  /**
   * Pass credentials down to the SDAPI layer.
   *
   * Thread safety: Caller is responsible for preventing concurrent calls
   * to update the same manager. CAVEAT: If the manager is cached and
   * shared between open files then this raises some challenges.
   */
  static void authorizeManagerInSD(seismicdrive::SDManager *manager, const std::string& token, const std::string& tokentype)
  {
    // TODO-Low: handle impersonation better. SDAPI wants us to tell
    // whether we are passing an impersonation token. If so, it will
    // attempt to refresh it if expired. The problem is that looking
    // at the token (e.g. one provided by an environment variable)
    // to decide which type it is can be ridiculously complicated.
    // See authorizeManager() in the old code. Also, it would be nice
    // to be able to read back the refreshed token.
    if (tokentype == "sdtoken")
      manager->setAuthProviderFromString(token);
#ifdef HAVE_SAUTH_EXTENSIONS
    else if (tokentype == "file")
      manager->setAuthProviderFromFile(token);
    else if (token.substr(0, 5) == "FILE:")
      manager->setAuthProviderFromFile(token.substr(5));
#else
    if (tokentype == "file" || token.substr(0, 5) == "FILE:")
      throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
  else if (tokentype == "imptoken")
    manager->setAuthProviderFromImpToken(token);
  else if (token.substr(0, 9) == "imptoken:")
    manager->setAuthProviderFromImpToken(token.substr(9));
  else
    manager->setAuthProviderFromString(token);
  }

  /**
   * Pass initial credentials down to the SDAPI layer.
   *
   * Thread safety: This is meant to be called from the constructor
   * when opening a file. So there should not be any race condition.
   * CAVEAT: If the manager is cached and shared between open files
   * then this raises some challenges.
   */
  void authorizeManager(
       const std::string& token,
       const std::string& tokentype,
       const OpenZGY::SeismicStoreIOContext::tokencb_t& tokencb,
       const std::string& tokencbtype)
  {
    std::lock_guard<std::mutex> lk(mutex_);

    // Save the refresh callback, if any, for use in reAuthorizeManager().
    tokenrefresh_     = tokencb;
    tokenrefreshtype_ = tokencbtype;

    // Obtain the token to use, preferring the callback. Currently the
    // type of token the callback returns is specified when the callback
    // is registered. Not when it is invoked. Change that if needed.
    std::string newtoken     = tokencb ? tokencb() : token;
    std::string newtokentype = tokencb ? tokencbtype : tokentype;

    authorizeManagerInSD(manager_.get(), newtoken, newtokentype);

    // Save what we just set so reAuthorizeManager() won't need to set
    // the exact same token again. Don't set if there was an exception
    // because then we can try again later. The saved token is NOT
    // used for credentials. So it is possible to store a hash
    // instead as long as the risk of collision is negligible.
    saved_token_ = newtoken;
    saved_tokentype_ = newtokentype;
  }

  /**
   * Pass updated credentials down to the SDAPI layer if needed.
   * Needs to be called before any operation that needs credentials.
   *
   * Thread safety: *this is protected by a lock.
   * The lock is temporarily dropped before invoking the refresh callback.
   * This means the application provided callback MUST BE THREADSAFE.
   * It also means there is technically a race condition here, where a
   * particular read operation uses credentials that are a few milliseconds
   * out of date.
   *
   * Alternatively the code here could hold a lock and require that the
   * callback doesn't do something that could cause a deadlock. Not sure
   * if that option would be preferable.
   */
  void reAuthorizeManager()
  {
    std::unique_lock<std::mutex> lk(mutex_);
    if (tokenrefresh_) {
      auto tokenrefresh = this->tokenrefresh_;
      auto tokenrefreshtype = this->tokenrefreshtype_;
      std::string newtoken;
      std::string newtokentype;
      // By design, no locks should be held when the callback is invoked
      // to avoid the risk of deadlocks. This means that the callback
      // itself must be threadsafe.
      lk.unlock();
      newtoken     = tokenrefresh();
      newtokentype = tokenrefreshtype;
      lk.lock();
      if (saved_token_ != newtoken || saved_tokentype_ != newtokentype) {
        // In case of exception, always allow trying again.
        saved_token_     = std::string();
        saved_tokentype_ = std::string();
        authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
        saved_token_     = newtoken;
        saved_tokentype_ = newtokentype;
        SeismicStoreFile::_logger(1, "A new token was provided");
      }
    }
  }
};

SDGenericDatasetWrapper::~SDGenericDatasetWrapper()
{
  info_.reset();
  try {
    // Explicit close so we have control of how to deal with errors. Or not.
    // Hopefully the destructors won't try to close again when they see
    // that we tried to do so ourselves. Note: currently  they will try that.
    if (dataset_)
      dataset_->close();
  }
  catch (const std::exception& ex) {
    if (std::string(ex.what()).find("dataset is not open") == std::string::npos)
      SeismicStoreFile::_logger(0, "SDGenericDataset::close(): " + std::string(ex.what()));
  }
  try {
    // Catching exceptions raised inside a destructor might not work. But I tried.
    dataset_.reset();
  }
  catch (const std::exception& ex) {
    SeismicStoreFile::_logger(0, "SDGenericDataset::dtor(): " + std::string(ex.what()));
  }
  manager_.reset();
}

/////////////////////////////////////////////////////////////////////////////
//    FileADT -> SeismicStoreFile   /////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

FileUtilsSeismicStore::~FileUtilsSeismicStore()
{
}

SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, const IOContext *iocontext)
  : FileUtilsSeismicStore()
  , _mode(mode)
  , _config()
{
  _rtimer.reset(new SummaryPrintingTimerEx(mode == OpenMode::ReadWrite || mode == OpenMode::Truncate ? "Cloud.reread" : "Cloud.read"));
  _wtimer.reset(new SummaryPrintingTimerEx("Cloud.write"));

  // TODO-Low a better way of handling this.
  // Logger passed in iocontext, then storing it per file.
  {
    // Must protect against double init because _loggerfn is global.
    static std::mutex mutex;
    std::lock_guard<std::mutex> lk(mutex);
    if (!_loggerfn)
      _loggerfn = LoggerBase::standardCallback(LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"), "zgy: ", "");
  }

  _logger(3, std::stringstream() << "SeismicStoreFile("
          << "\"" << filename << "\", " << int(mode) << ", *)\n");

  auto context = dynamic_cast<const OpenZGY::SeismicStoreIOContext*>(iocontext);
  if (!context)
    throw OpenZGY::Errors::ZgyUserError("Opening a file from seismic store requires a SeismicStoreIOContext");
  this->_config.reset(new OpenZGY::SeismicStoreIOContext(*context));

  std::unordered_map<std::string, std::string> extra;
  using seismicdrive::api::json::Constants;
  if (!context->_legaltag.empty())
    extra[Constants::kLegalTagLabel] = context->_legaltag;
  if (!context->_writeid.empty())
    extra[Constants::kWriteIdLabel] = context->_writeid;
  if (!context->_seismicmeta.empty())
    extra[Constants::kDDMSSeismicMetadataLabel] = context->_seismicmeta;
  //TODO-Low: if (!context->_pedantic.empty())
  //  extra[Constants::KPedantic] = context->_pedantic;

  // TODO-Low enable logging for manager and/or dataset
  auto manager = std::make_shared<seismicdrive::SDManager>
    (context->_sdurl, context->_sdapikey);
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset;
  if (mode != OpenMode::Closed) {
    _logger(5, std::stringstream()
            << "make dataset using manager "
            << std::hex << (std::uint64_t)manager.get());
    dataset = std::make_shared<seismicdrive::SDGenericDataset>
      (manager.get(), filename);
    _logger(5, "dataset ok");
  }

  // TODO-Low: Cache the manager and possibly the SDUtils instance.

  auto datasetwrapper = std::make_shared<SDGenericDatasetWrapper>
    (manager, dataset, mode);

  datasetwrapper->authorizeManager
    (context->_sdtoken, context->_sdtokentype,
     context->_sdtokencb, context->_sdtoken_cbtype);