// Copyright 2017-2020, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "file_sd.h"

#ifdef HAVE_SD // Rest of file

#include "logger.h"
#include "file_consolidate.h"
#include "file_performance.h"

#include "../exception.h"
#include "../iocontext.h"

#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <numeric>
#include <sstream>
#include <string>
#include <string.h>
#include <unordered_map>
#include <vector>

#ifndef _WIN32 // TODO-Low: SDAPI/ prefix also on Linux.
#include <SDManager.h>
#include <SDGenericDataset.h>
#include <SDUtils.h>
#include <Constants.h>
#else
#include <SDAPI/SDManager.h>
#include <SDAPI/SDGenericDataset.h>
#include <SDAPI/SDUtils.h>
#include <SDAPI/Constants.h>
#endif

/** \cond SSTORE */

/**
 * \file: file_sd.cpp
 * \brief Low level I/O, Seismic Store.
 */

// IByteIO::Disposition -> OpenMode
//OpenMode:    { Closed = 0, ReadOnly, ReadWrite, Truncate, Delete, };
//Disposition: { Create, Read, Update, Delete, Access, List };

// Plans for utilizing code from byteio_sd.cpp in ZGY-Cloud.
//
// cherry pick code from here for read and write.
//     ByteIoSeismicDrive::closeWrite()
//
// bells and whistles for configuration.
//     slurp()
//     expandArgument()
//
// Not now. Don't keep a long lived cache.
//     class SDGenericDatasetCache
//
// Worry: CTag / ETag handling? Not now
//     ByteIoSeismicDrive::readETag()
//
// Might need soon.
//     checkAccess()
//
// Utilities, might need later.
// Need a way for API users to access these directly
//     getTag()
//     setTag()
//     getMetaData()
//     setMetaData()
//     getSeisMeta()
//     setSeisMeta()
//
// maybe later, for tokentype and for error messages.
// TODO-Low: if imp token is refreshed then keep those in a cache.
// Might need support from SDAPI for that.
//     parseJsonString()
//     decodeJwt()
//     getTokenType()

using OpenZGY::IOContext;
namespace InternalZGY {
#if 0
}
#endif

class SDGenericDatasetWrapper;
class DatasetInformation;

/**
 * Access data in seismic store as a linear file even when the dataset
 * has multiple segments. There are some limitations on write.
 *
 *   - Writes starting at EOF are allowed, and will cause a new segment
 *     to be written.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes starting before EOF are only allowed if offset,size exactly
 *     matches a previous write. This will cause that segment to be rewritten.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *
 * For read the class provides a readv() method to do scatter/gather reads.
 * The code will then consolidate adjacent bricks to get larger brick size
 * sent to SDAPI. Optionally parallelize requests that cannot be consolidated.
123
 *
124
125
126
127
128
129
 * Thread safety when used for reading:
 * Designed to be thread safe as no internal data structures should change
 * after the file has been opened. Any lazy-evaluated information needs
 * to do appropriate locking.
 *
 * Thread safety when used for writing:
130
 * Not thread safe. See SeismicStoreFile::xx_write.
131
132
133
 *
 * Thread safety when closing a file:
 * Not thread safe.
134
135
136
137
 *
 * The class is noncopyable with copy and assign method deleted.
 * Not that the users could easily copy it anyway, as the class should
 * always be used via the FileADT interface.
138
139
140
 */
class SeismicStoreFile : public FileUtilsSeismicStore
{
141
142
  SeismicStoreFile(const SeismicStoreFile&) = delete;
  SeismicStoreFile& operator=(const SeismicStoreFile&) = delete;
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
public:
  typedef std::function<bool(int, const std::string&)> LoggerFn;
public:
  SeismicStoreFile(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFile();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  // Functions from FileADT
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close();
  virtual std::int64_t xx_eof() const;
  virtual bool xx_iscloud() const override;
  // Functions from FileUtilsSeismicStore
  virtual void deleteFile(const std::string& filename, bool missing_ok) const;
158
  virtual std::string altUrl(const std::string& filename) const;
159
160
161
162
163
164
165
166
167
public:
  // TODO-Low per-instance logging. This is tedious to implement
  // because many of the helper classes will need to hold a logger
  // instance as well, or need the logger passed in each call.
  static bool _logger(int priority, const std::string& message = std::string());
  static bool _logger(int priority, const std::ios& ss);
public:
  // For use by debug_trace, allow SeismicStoreFileDelayedWrite() access
  // to details of the file.
168
  std::shared_ptr<const DatasetInformation> debug_info();
169
private:
170
171
172
  /**
   * This class is used by _split_by_segment to describe a request as seen by
   * seismic store.
173
174
175
176
177
   *
   * Thread safety:
   * Modification may lead to a data race. This should not be an issue,
   * because instances are only meant to be modified when created or
   * copied or assigned prior to being made available to others.
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
   */
  class RawRequest
  {
  public:
    std::int64_t blocknum;	// Seismic store segment a.k.a. block
    std::int64_t local_offset;	// File offset inside this blocknum
    std::int64_t local_size;	// How much to read from this block
    std::int64_t outpos;
    RawRequest(std::int64_t a_blocknum,
               std::int64_t a_offset,
               std::int64_t a_size,
               std::int64_t a_outpos)
      : blocknum(a_blocknum)
      , local_offset(a_offset)
      , local_size(a_size)
      , outpos(a_outpos)
    {
    }
  };
  typedef std::vector<RawRequest> RawList;
  RawList _split_by_segment(const ReadList& requests);
199
200
201
  void _cached_read(/*TODO-SeismicStore seg, offset, view*/);
private:
  OpenMode _mode;
202
203
204
  // TODO-Low: To improve isolation, the user visible context should
  // be copied into an equivalent InternalZGY::SeismicStoreConfig.
  // The downside is that it gets more tedious to maintain.
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SDGenericDatasetWrapper> _dataset;
  static LoggerFn _loggerfn;
};

SeismicStoreFile::LoggerFn SeismicStoreFile::_loggerfn;

/**
 * Improve on SeismicStoreFile, have it buffer large chunks of data before
 * writing it out to a new segment.
 *
 *   - Writes starting at EOF are allowed, and will buffer data in the
 *     "open segment" until explicitly flushed.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes fully inside the open segment are allowed.
 *
 *   - Writes starting before the open segment are only allowed if
 *     offset,size exactly matches a previous write. This will cause that
 *     segment to be rewritten. As a corollary, writes canot span the
 *     closed segment / open segment boundary.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *     Why we might want this: On opening a file with existing
 *     data bricks we might choose to read the last segment and
 *     turn it into an open segment. Then delete (in memory only)
 *     the last segment. When it is time to flush the data it gets
 *     rewritten. This allows adding bricks to a file, while still
 *     ensuring that all segments except first and last need to be
 *     the same size. Note that there are other tasks such as
 *     incrementally updating statistics and histogram that might
 *     turn out to be a lot of work.
 *
 *   - When used to create ZGY files, caller must honor the convention
 *     that all segments except the first and last must have the same size.
 *
 *   - Caveat: The fact that random writes are sometimes allowed, sometimes
 *     not depending on the segment number violates the principle of
 *     least surprise. And makes for more elaborate testing. For ZGY
 *     it is quite useful though. ZGY can recover from a ZgySegmentIsClosed
 *     exception by abandoning (leaking) the current block and write it
 *     to a new location. With a typical access pattern this will happen
 *     only occasionally.
 *
 * Note that this class doesn't implement FileUtilsSeismicStore, so for
 * that functionality you need to instanciate a regular SeismicStoreFile.
254
255
256
 *
 * Thread safety:
 * Not thread safe by design, as it is only used for files opened for write.
257
258
259
 * If a file is opened for read/write (currently not supported)
 * or when running finalize it is safe to read data as long as no writes
 * can be pending.
260
261
262
263
264
265
266
267
268
 */
class SeismicStoreFileDelayedWrite : public FileADT
{
  OpenMode _mode;
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SeismicStoreFile> _relay;
  std::vector<char> _open_segment;
  UsageHint _usage_hint;

269
270
271
  SeismicStoreFileDelayedWrite(const SeismicStoreFileDelayedWrite&) = delete;
  SeismicStoreFileDelayedWrite& operator=(const SeismicStoreFileDelayedWrite&) = delete;

272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
public:
  SeismicStoreFileDelayedWrite(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFileDelayedWrite();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close() override;
  virtual std::int64_t xx_eof() const override;
  virtual bool xx_iscloud() const override;

private:
  void _flush_part(std::int64_t this_segsize);
  void _flush(bool final_call);
};

/////////////////////////////////////////////////////////////////////////////
//    class DatasetInformation   ////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * \brief Cached information for a SDGenericDataset.
 *
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * All fields will be filled in when any of them are first needed.
 * An exception is thrown if some of the information is inaccessible.
 * For new datasets, the information may either be set explicitly by
 * the writer or it may be set on the first write.
 * For existing datasets the sdapi is queried.
 * Even if the dataset is opened read/write the information is not
 * expected to change. Some other mechanism needs to deal with
 * stale data in that case.
 *
 * Thread safety:
 * All access SDGenericDatasetWrapper::info_ will be protected by
 * SDGenericDatasetWrapper::mutex_. Methods that expect to change data
 * (currently only updateOnWrite()) will need some special handling.
 * TODO-High: there is still a snag here.
 */
class DatasetInformation
{
private:
  // Number of blocks on file.
  // On write, this will include the holes if data is written out of order.
  std::int64_t block_count_;

  // Size of first block. On write this might need to be explicitly set if the
  // writer decides to output blocks non sequentially. It is not possible to
  // just assume block 0 is the same size as block 1 because block 1 might
  // be the last block on file and this might have a different size.
  std::int64_t block0_size_;

  // Size of blocks that are neither the first nor last on the file.
  // Zero if there is only a single block on the file.
  std::int64_t block1_size_;

  // Size of the last block, which is allowed to be smaller.
  // Will be zero if there is just one block.
  std::int64_t last_block_size_;

public:
  DatasetInformation();
  explicit DatasetInformation(seismicdrive::SDGenericDataset* sdgd);
  std::string toString() const;

public:
  // Trivial accessors for the cached block layout.
  std::int64_t blockCount() const { return block_count_; }
  std::int64_t block0Size() const { return block0_size_; }
  std::int64_t block1Size() const { return block1_size_; }
  std::int64_t lastBlockSize() const { return last_block_size_; }

public:
  std::int64_t totalSize() const;
  std::vector<std::int64_t> allSizes(std::int64_t open_size) const;
  void getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const;
  void checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const;
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize);
};

DatasetInformation::DatasetInformation()
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
{
}

/**
 * Populate the cached block layout by querying an already open
 * SDGenericDataset. Only blocks 0, 1 and the last block are queried;
 * all middle blocks are assumed to have the same size as block 1
 * (see totalSize() for the same assumption).
 *
 * \throws OpenZGY::Errors::ZgyInternalError if the block count is
 * negative or any queried block size is <= 0. Any other std::exception
 * from sdapi is logged and re-thrown as ZgyInternalError.
 */
DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd)
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
{
  try {
    // Note that sdapi is a bit confusing with respect to signed/unsigned.
    // getBlockNum() returns an unsigned (uint64_t).
    // getSize() and getBlockSize() return long long, may return -1 as error.
    // I will treat all of them as signed, since there is no way that
    // there can be more than 2^63 blocks.
    long long nblocks = sdgd->getBlockNum();
    long long nbytes = sdgd->getSize();
    // Only ask for the sizes of block 0, block 1, and the last block.
    std::vector<std::string> names;
    if (nblocks >= 1)
      names.push_back("0");
    if (nblocks >= 2)
      names.push_back("1");
    if (nblocks >= 3)
      names.push_back(std::to_string(nblocks - 1));
    std::vector<long long> sizearray;
    if (nblocks <= 0) {
    }
#if 0 // Chicken...
    else if (nblocks == 1) {
      // WARNING: getSize is less reliable.
      // I need to trust that each block is only written once.
      // Or that if rewritten, caller passed check_and_overwrite=true.
      // TODO-Low: SDAPI should really have been able to figure out by itself
      // whether the check is needed.
      // TODO-Worry: ensure that ZGY-Cloud doesn't need that flag.
      sizearray.push_back(nbytes);
    }
#endif
    else {
      sizearray = sdgd->getBlocksSize(names);
    }
    if (nblocks < 0)
      throw OpenZGY::Errors::ZgyInternalError("Unable to get block count for SDGenericDataset");
    for (const auto size : sizearray)
      if (size <= 0)
        throw OpenZGY::Errors::ZgyInternalError("Unable to get segment size for SDGenericDataset");

    this->block_count_ = nblocks;
    this->block0_size_ = nblocks >= 1 ? sizearray.at(0) : 0;
    this->block1_size_ = nblocks >= 2 ? sizearray.at(1) : 0;
    this->last_block_size_ = nblocks >= 3 ? sizearray.at(2) : this->block1_size_; // with 2 blocks, block 1 *is* the last.

    // Not throwing exceptions here, because sdgd->getSize() is not reliable.
    // But the problem *could* be that the block sizes of all the middle blocks
    // is not the same. Which would be a real problem.
    if (nbytes >= 0 && nbytes != (long long)totalSize())
      SeismicStoreFile::_logger(0, "Dataset has inconsistent size");

    SeismicStoreFile::_logger(1, toString());
  }
  catch (const OpenZGY::Errors::ZgyError& ex) {
    SeismicStoreFile::_logger(1, "Oops (DataSetInformation): " + std::string(ex.what()));
    throw;
  }
  catch (const std::exception& ex) {
    SeismicStoreFile::_logger(1, "Oops (DataSetInformation): " + std::string(ex.what()));
    throw OpenZGY::Errors::ZgyInternalError("Seismic Store says: " + std::string(ex.what()));
  }
}

/**
 * One-line human readable summary of the cached block layout,
 * intended for logging.
 */
std::string
DatasetInformation::toString() const
{
  std::stringstream result;
  result << "Total " << totalSize();
  result << " in " << blockCount() << " segments";
  result << " of size " << block0Size();
  result << ", " << block1Size();
  result << ", " << lastBlockSize();
  return result.str();
}

/**
 * Return the total file size, including any holes caused by bricks
 * not written yet. The result currently computes the answer based on
 * the individual block sizes assuming all blocks except the first and
 * last will have the same size. This is more reliable than asking
 * sdapi for the total size. And if the block size assumption is false
 * then we are completely hosed anyway. It is possible to verify the
 * assumption but it is probably too expensive to do so.
 *
 * Thread safety: Safe because instances are immutable once constructed
 * and made available.
 */
std::int64_t
DatasetInformation::totalSize() const
{
  if (block_count_ == 0)
    return 0;
  if (block_count_ == 1)
    return block0_size_;
  // Two or more blocks: first + identical middle blocks + last.
  const std::int64_t middle_bytes = (block_count_ - 2) * block1_size_;
  return block0_size_ + middle_bytes + last_block_size_;
}

/**
 * Return the total file size broken down into segments, including
 * the "open" segment whose size needs to be provided explicitly
 * because only the caller knows it. Pass a negative open_size to
 * leave the open segment out of the result.
 * This function is currently only used for debugging.
 */
std::vector<std::int64_t>
DatasetInformation::allSizes(std::int64_t open_size) const
{
  std::vector<std::int64_t> result;
  result.reserve(block_count_ + 1);
  // Loop index is std::int64_t, not int, so it matches block_count_
  // exactly; a plain int mixes signed widths and would (in theory)
  // misbehave for absurdly large block counts.
  for (std::int64_t ii = 0; ii < block_count_; ++ii)
    result.push_back(ii == 0 ? block0_size_ :
                     ii == block_count_-1 ? last_block_size_ :
                     block1_size_);
  if (open_size >= 0)
    result.push_back(open_size);
  return result;
}

/**
 * Do consistency checks before data is written.
 * TODO-Low: Parts might be redundant due to checks in caller.
 * E.g. caller raises SegmentIsClosed if attempting to write "backwards".
 *
 * Enforces the layout convention: every block except the first and the
 * last must have the same size, a block may only be appended at the
 * current end or re-written in place with an unchanged size, and
 * sparse writes (past the current block count) are rejected.
 *
 * \param blocknum  Segment number the caller is about to write.
 * \param blocksize Size in bytes of the data about to be written.
 * \throws OpenZGY::Errors::ZgyInternalError on any rule violation.
 */
void
DatasetInformation::checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const
{
  if (blocknum < 0 || blocknum > std::numeric_limits<int>::max()) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " because it is out of range.");
  }
  if (blocknum != 0 && block0_size_ == 0) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " before size of block 0 is known.");
  }
  if (blocknum == 0) {
    // Write or overwrite block 0.
    if (block0_size_ != 0 && block0_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size of block zero");
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    if (block1_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Blocks must have the same size except the first and last");
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // TODO-Low: Technically I might have allowed this.
    if (blocksize != last_block_size_)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size when re-writing the last block");
  }
  else if (blocknum == block_count_) {
    // Append a block, which is not block 0.
    // This is the new "last block" which means what we previous thought
    // was the last block is in fact a middle one. Which means the size
    // of the former last block must be the same as block 1.
    // (Note that the former last block might actually *be* block 1).
    if (block1_size_ != 0 && blocksize > block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " is too large");
    if (block_count_ != 0 && last_block_size_ != block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(block_count_ -1) + " had wrong size");
  }
  else {
    // Trying to write sparse data.
    throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " written out of sequence");
  }
}

/**
 * Update cached size information after data is successfully written.
 * checkOnWrite() must have been called already, so the combination of
 * blocknum and blocksize is known to be legal.
 *
 * Thread safety: NOT thread safe.
 * Do not invoke SDGenericDatasetWrapper::info()->updateOnWrite() directly.
 * Call the thread safe SDGenericDatasetWrapper::updateOnWrite() instead.
 * That one will make sure the smart pointer being updated is unique.
 */
void
DatasetInformation::updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
{
  if (blocknum == 0) {
    // Write or overwrite block 0.
    block0_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    // Size cannot change, so do nothing.
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // Redundant if checkOnWrite() forbids changing size.
    last_block_size_ = blocksize;
  }
  else if (blocknum == block_count_) {
    // Append a block which is not block 0.
    // The first such append also establishes the common middle-block size.
    if (block1_size_ == 0)
      block1_size_ = blocksize;
    last_block_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
  // else: sparse write (blocknum > block_count_); checkOnWrite()
  // has already rejected that case, so silently ignoring it here is safe.
}

/**
 * Given a linear offset and count, convert this to a block local address.
 * The returned count may be smaller than requested in case the request
 * crossed a segment boundary. In that case the caller will need to read
 * in a loop.
 *
 * \param offset       Linear file offset of the first byte wanted.
 * \param size         Number of bytes wanted.
 * \param blocknum     Output: segment holding the first byte.
 * \param local_offset Output: offset relative to the start of *blocknum.
 * \param local_size   Output: bytes obtainable from *blocknum; capped at
 *                     the segment boundary except in the last segment.
 * \throws OpenZGY::Errors::ZgyInternalError for negative offset or size,
 *         arithmetic overflow in offset+size, or reading past EOF.
 *
 * Postcondition: If blocknum is returned as the last block, local_size
 * will be returned as requested size. If this were not so, the calling
 * function would be likely to loop forever.
 */
void
DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const
{
  if (offset < 0 || size < 0)
    throw OpenZGY::Errors::ZgyInternalError("Offset and size cannot be negative.");
  else if (size > std::numeric_limits<std::int64_t>::max() - offset)
    throw OpenZGY::Errors::ZgyInternalError("Overflow in offset + size.");
  else if (offset + size > totalSize()) {
    if (SeismicStoreFile::_logger(1)) {
      std::stringstream ss;
      ss << "Reading past EOF: read("
         << "off=" << offset
         << ", size=" << size
         << ", end=" << offset+size
         << ") dataset: " << toString()
         << std::endl;
      SeismicStoreFile::_logger(1, ss.str());
    }
    throw OpenZGY::Errors::ZgyInternalError("Reading past EOF");
  }

  if (block_count_ <= 1) {
    // In first block which is also last block.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = size;
  }
  else if (block0_size_ == 0 || block1_size_ == 0) {
    // Can only happen when writing. Files open for read will have the sizes
    // set up when the DatasetInformation is first accessed.
    throw OpenZGY::Errors::ZgyInternalError("getLocalOffset() called before size is known.");
  }
  else if (offset < block0_size_) {
    // In first block, and there are more blocks following.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = std::min(size, block0_size_ - offset);
  }
  else {
    // In block 1 or later; those all have size block1_size_ except the last.
    const std::int64_t bnum = std::min(block_count_ - 1, ((offset - block0_size_) / block1_size_) + 1);
    const std::int64_t segment_start = block0_size_ + (bnum - 1) * block1_size_;
    *blocknum = bnum;
    *local_offset = offset - segment_start;
    if (bnum + 1 < block_count_)
      *local_size = std::min(size, block1_size_ - (offset - segment_start));
    else
      *local_size = size; // In last block.
  }
}

/////////////////////////////////////////////////////////////////////////////
//    class SDGenericDatasetWrapper   ///////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * Wrapper around seismicdrive::SDGenericDataset.
 * Instances are typically managed by smart pointers.
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * What this adds compared to a raw SDGenericDataset:
 *
 * - We keep a smart pointer reference to the SDManager that was used
 *   to create the SDGenericDataset. The sdapi documentation does not
 *   state this explicitly, but I assume it is unhealthy to destruct
 *   the manager before the dataset is closed. And using an application
 *   global manager is not an option due to privilege separation.
 *
 * - The instance remembers whether it was created for create, read, etc.
 *
 * - We may later add wrappers for SDGenericDataset members for caching
 *   purposes (block size, file size, etc.) in case the sdapi doesn't.
 *
 * - We may later add wrappers for the other members to tweak the
 *   interface, map error messages, add logging, etc. Or to add
 *   additional checking such as not reading past eof / end of block.
649
 *
650
651
 * Thread safety: The class itself is thread safe. The data being wrapped
 * might not be.
652
 *
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
 * All mutable data is protected by this->mutex_. Access methods for
 * those return a smart pointer. If info() is called on one thread
 * while another thread is doing a write operation (which is actually
 * not allowed) then it is unspecified whether the returned value is
 * before or after the write. It is also unspecified whether
 * updateOnWrite() will have any effect in that case. The information
 * is still consistent though, and may be used for logging etc.
 *
 * CAVEAT: Make sure the returned pointer remains in scope long
 * enough. E.g. this is NOT safe, and might crash every 7 years or so.
 * Except on customer sites where it may crash every 7 minutes.
 *
 *    seismicdrive::SDGenericDataset& dataset = *wrapper->dataset(); // NO!!!
 *    foo(dataset); // the returned smart pointer is already deleted.
 *
 * TODO-Yagni: _virgin is not used. It is related to the CTag mechanism.
 * It is still being discussed whether that needs to be ported from the
 * old accessor. It might not be needed if we go for immutable ZGY files.
671
672
673
674
675
 */
class SDGenericDatasetWrapper
{
  std::shared_ptr<seismicdrive::SDManager> manager_;
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset_;
676
  std::shared_ptr<const DatasetInformation> info_;
677
678
  OpenMode disposition_;
  bool virgin_; // If true, the cached CTag should be ok.
679
  mutable std::mutex mutex_; // Protect all members.
680
681
682
683
684
685
public:
  typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
  SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
                          std::shared_ptr<seismicdrive::SDGenericDataset> dataset,
                          OpenMode disp)
    : manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
686
    , mutex_()
687
688
689
  {
  }
  ~SDGenericDatasetWrapper();
690
691
692
693
694
695
696
697
698
699
700
701
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset() {
    std::lock_guard<std::mutex> lk(mutex_);
    return dataset_;
  }
  std::shared_ptr<seismicdrive::SDManager> manager() {
    std::lock_guard<std::mutex> lk(mutex_);
    return manager_;
  }
  OpenMode disposition() const {
    return disposition_; // immutable, so no lock.
  }
  std::shared_ptr<const DatasetInformation> info() {
702
    std::lock_guard<std::mutex> lk(mutex_);
703
704
705
706
707
708
709
710
711
712
713
714
715
716
    if (!info_) {
      switch (disposition_) {
      case OpenMode::Truncate:
        info_.reset(new DatasetInformation());
        break;
      case OpenMode::ReadOnly:
      case OpenMode::ReadWrite:
        info_.reset(new DatasetInformation(dataset_.get()));
        break;
      case OpenMode::Closed:
      default:
        throw OpenZGY::Errors::ZgyInternalError("DatasetInformation: Dataset not open.");
      }
    }
717
    return info_;
718
  }
719

720
  void updateDataset(std::shared_ptr<seismicdrive::SDGenericDataset> dataset) {
721
722
723
    std::lock_guard<std::mutex> lk(mutex_);
    dataset_ = dataset;
    info_.reset();
724
725
726
727
728
    // This would give a slight performance boost, saving a single call
    // to checkCTag() after a file has changed. This happens so rarely
    // that it isn't worth the extra testing.
    //virgin_ = true;
  }
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750

  /**
   * Adjust the information after data has been written to the cloud.
   *
   * Thread safety:
   * Multiple concurrent writers will have a race condition, but that
   * scenario is expressly forbidden anyway. Smart pointers from
   * info() held by callig code are race free because they are
   * immutable. updateOnWrite() makes a new pointer.
   */
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
  {
    std::shared_ptr<DatasetInformation> updated;
    updated.reset(new DatasetInformation(*this->info()));
    updated->updateOnWrite(blocknum, blocksize);
    info_ = updated;
  }

  bool isTouched() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return !virgin_;
  }
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765

  bool touch() {
    // Called to inform us that getCTag() might return stale data.
    //
    // if this->virgin_, we have just created the wrapper which means
    // we have just opened it. Only "List" entries don't get opened
    // immediately. But those aren't cached so they are not relevant
    // here. Since we just created it, we can call getCTag() without
    // calling checkCTag() first. Saving one database hit.
    //
    // The flag must be reset at least when an existing dataset accessor
    // is being re-used to open a file again. Currently it will be reset
    // much more often (every time the accessor is used to read data)
    // and it won't be set true again by updateDataset(). This will
    // simplify testing and has little impact on performance.
766
    std::lock_guard<std::mutex> lk(mutex_);
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
    bool old = virgin_;
    virgin_ = false;
    return old != virgin_;
  }
};

/**
 * Close the dataset explicitly while we can still log errors;
 * exceptions are caught and logged because a destructor must not throw.
 * NOTE(review): only std::exception is caught here; a non-standard
 * exception type from sdapi would still escape. Confirm sdapi only
 * throws std::exception derivatives.
 */
SDGenericDatasetWrapper::~SDGenericDatasetWrapper()
{
  info_.reset();
  try {
    // Explicit close so we have control of how to deal with errors. Or not.
    // Hopefully the destructors won't try to close again when they see
    // that we tried to do so ourselves. Note: currently  they will try that.
    if (dataset_)
      dataset_->close();
  }
  catch (const std::exception& ex) {
    // "dataset is not open" is expected and harmless; keep quiet about it.
    if (std::string(ex.what()).find("dataset is not open") == std::string::npos)
      SeismicStoreFile::_logger(0, "SDGenericDataset::close(): " + std::string(ex.what()));
  }
  try {
    // Catch exceptions raised inside a destructor might not work. But I tried.
    dataset_.reset();
  }
  catch (const std::exception& ex) {
    SeismicStoreFile::_logger(0, "SDGenericDataset::dtor(): " + std::string(ex.what()));
  }
  manager_.reset();
}

/////////////////////////////////////////////////////////////////////////////
//    FileADT -> SeismicStoreFile   /////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

// Out-of-line definition kept (rather than defined in the header) so
// the class has a single home for its vtable. Nothing to clean up.
FileUtilsSeismicStore::~FileUtilsSeismicStore() = default;

/**
 * Open a dataset on seismic store, or (for OpenMode::Closed) create
 * an instance with no open dataset, usable only for file management.
 *
 * \param filename  "sd://" url of the dataset.
 * \param mode      Closed, ReadOnly, ReadWrite, or Truncate; mapped to
 *                  the corresponding SDDatasetDisposition on open.
 * \param iocontext Must be a SeismicStoreIOContext; supplies service
 *                  url, api key, auth token, and optional create-time
 *                  metadata (legal tag, write id, seismic metadata).
 *
 * \throws OpenZGY::Errors::ZgyUserError if iocontext has the wrong type.
 * \throws OpenZGY::Errors::ZgyInternalError if SDAPI fails to open.
 */
SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, const IOContext *iocontext)
  : FileUtilsSeismicStore()
  , _mode(mode)
  , _config()
{
  // TODO-Low a better way of handling this.
  // Logger passed in iocontext, then storing it per file.
  {
    // Must protect against double init because _loggerfn is global.
    static std::mutex mutex;
    std::lock_guard<std::mutex> lk(mutex);
    if (!_loggerfn)
      _loggerfn = LoggerBase::standardCallback(LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"), "zgy: ", "");
  }

  _logger(3, std::stringstream() << "SeismicStoreFile("
          << "\"" << filename << "\", " << int(mode) << ", *)\n");

  auto context = dynamic_cast<const OpenZGY::SeismicStoreIOContext*>(iocontext);
  if (!context)
    throw OpenZGY::Errors::ZgyUserError("Opening a file from seismic store requires a SeismicStoreIOContext");
  // Keep a private copy so later changes to the caller's context
  // cannot affect this instance.
  this->_config.reset(new OpenZGY::SeismicStoreIOContext(*context));

  // Optional metadata, only meaningful when creating a new dataset.
  std::unordered_map<std::string, std::string> extra;
  using seismicdrive::api::json::Constants;
  if (!context->_legaltag.empty())
    extra[Constants::kLegalTagLabel] = context->_legaltag;
  if (!context->_writeid.empty())
    extra[Constants::kWriteIdLabel] = context->_writeid;
  if (!context->_seismicmeta.empty())
    extra[Constants::kDDMSSeismicMetadataLabel] = context->_seismicmeta;
  //TODO-Low: if (!context->_pedantic.empty())
  //  extra[Constants::KPedantic] = context->_pedantic;

  // TODO-Low enable logging for manager and/or dataset
  auto manager = std::make_shared<seismicdrive::SDManager>
    (context->_sdurl, context->_sdapikey);
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset;
  if (mode != OpenMode::Closed) {
    _logger(5, std::stringstream()
            << "make dataset using manager "
            << std::hex << (std::uint64_t)manager.get());
    dataset = std::make_shared<seismicdrive::SDGenericDataset>
      (manager.get(), filename);
    _logger(5, "dataset ok");
  }

  // TODO-Low: Cache the manager and possibly the SDUtils instance.

  // TODO-Low: handle impersonation better. SDAPI wants us to tell
  // whether we are passing an impersonation token. If so, it will
  // attempt to refresh it if expired. The problem is that looking
  // at the token (e.g. one provided by an environment variable)
  // to decide which type it is can be ridiculously complicated.
  // See authorizeManager() in the old code. Also, it would be nice
  // to be able to read back the refreshed token.
  //
  // NOTE(review): in the !HAVE_SAUTH_EXTENSIONS build the "file"
  // check below starts a new if-chain, so a token with
  // _sdtokentype=="sdtoken" falls through to the final else and
  // setAuthProviderFromString() gets called twice. Looks harmless
  // but should be confirmed and tidied.
  if (context->_sdtokentype == "sdtoken")
    manager->setAuthProviderFromString(context->_sdtoken);
#ifdef HAVE_SAUTH_EXTENSIONS
  else if (context->_sdtokentype == "file")
    manager->setAuthProviderFromFile(context->_sdtoken);
  else if (context->_sdtoken.substr(0, 5) == "FILE:")
    manager->setAuthProviderFromFile(context->_sdtoken.substr(5));
#else
  if (context->_sdtokentype == "file" || context->_sdtoken.substr(0, 5) == "FILE:")
    throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
  else if (context->_sdtokentype == "imptoken")
    manager->setAuthProviderFromImpToken(context->_sdtoken);
  else if (context->_sdtoken.substr(0, 9) == "imptoken:")
    manager->setAuthProviderFromImpToken(context->_sdtoken.substr(9));
  else
    manager->setAuthProviderFromString(context->_sdtoken);

  try {
    switch (mode) {
    case OpenMode::Closed:
      break;
    case OpenMode::ReadOnly:
      dataset->open(seismicdrive::SDDatasetDisposition::READ_ONLY, extra);
      break;
    case OpenMode::ReadWrite:
      dataset->open(seismicdrive::SDDatasetDisposition::READ_WRITE, extra);
      break;
    case OpenMode::Truncate:
      dataset->open(seismicdrive::SDDatasetDisposition::OVERWRITE, extra);
      break;
    }
  }
  catch (const std::exception& ex) {
    SeismicStoreFile::_logger(1, std::stringstream()
                              << "Oops (Opening file mode "
                              << int(mode) << "): "
                              << ex.what());
    throw OpenZGY::Errors::ZgyInternalError("Seismic Store says: " + std::string(ex.what()));
    // TODO-Low: Should I just re-throw the sdapi error?
    // TODO-Low: If not, should other SDAPI errors be translated?
  }

  this->_dataset = std::make_shared<SDGenericDatasetWrapper>
    (manager, dataset, mode);

  // Removed this because it causes info() to be populated early,
  // thereby negating the benefit of lazy evaluation. And, worse,
  // causing different behavior when debugging is on.
  //if (_logger(1) && mode != OpenMode::Closed)
  //  _logger(1, this->_dataset->info()->toString());
}

// Destructor. If the file is still open this is a bug in the caller,
// but close it anyway as a last resort.
SeismicStoreFile::~SeismicStoreFile()
{
  // A never-opened (or management-only) instance has nothing to close.
  if (_mode == OpenMode::Closed)
    return;
  try {
    xx_close();
  }
  catch (const std::exception& ex) {
    // The calling layer is supposed to do an explicit xx_close()
    // so it can catch and handle exceptions. This blind catch is
    // just a desperate attempt to avoid an application crash.
    _logger(0, "EXCEPTION closing file: " + std::string(ex.what()));
  }
}

/**
 * Convenience for invoking _loggerfn with a simple message.
 * This isn't much different from invoking the callback directly.
 * But it makes debugging slightly simpler to have an easy place
 * to set a breakpoint. It also adds more symmetry with respect
 * to the stringstream version, which does add value.
 *
 * Thread safety: Yes, because the global _loggerfn can only be set
 * while the very first instance is being constructed.
 */
bool
SeismicStoreFile::_logger(int priority, const std::string& message)
{
  return _loggerfn(priority, message);
}

/**
 * Convenience for invoking _loggerfn with a stringstream.
 * Due to a somewhat naughty cast (streaming into a temporary
 * std::stringstream yields a std::ios&), the function can be
 * called as:
 *
 *   if(_logger(pri))
 *     _logger(pri, std::stringstream() << some << data << here);
 *
 * The first line is optional. It just prevents the expression in
 * the second line from being evaluated if logging is disabled.
 *
 * Thread safety: Yes, because the global _loggerfn can only be set
 * while the very first instance is being constructed.
 */
bool
SeismicStoreFile::_logger(int priority, const std::ios& ss)
{
  // The cast recovers the stringstream; if somebody passed a plain
  // std::ios this degrades to logging an empty message.
  auto sstream = dynamic_cast<const std::stringstream*>(&ss);
  return _logger(priority, sstream ? sstream->str() : std::string());
}

/**
 * Factory method. Returns a new SeismicStoreFile when this backend
 * handles the given filename and mode, otherwise an empty pointer so
 * the next backend can be tried. Write modes are declined here; they
 * are handled by a different class.
 */
std::shared_ptr<FileADT>
SeismicStoreFile::xx_make_instance(const std::string& filename, OpenMode mode, const IOContext *iocontext)
{
  const bool cloud_url = filename.substr(0, 5) == "sd://";
  const bool writable  = (mode == OpenMode::ReadWrite || mode == OpenMode::Truncate);
  if (!cloud_url || writable)
    return std::shared_ptr<FileADT>();
  auto file = std::shared_ptr<FileADT>(new SeismicStoreFile(filename, mode, iocontext));
  // This is a no-op unless enabled by environment variables.
  file = FileWithPerformanceLogger::inject(file);
  return file;
}

978
979
980
981
982
/**
 * Thread safety: Designed to be thread safe as long as the underlying
 * SDGenericDataset is. Even when data is being written in another
 * thread.
 */
983
984
985
986
987
void
SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
  this->_validate_read(data, offset, size, this->xx_eof(), this->_mode);
  ReadRequest request(offset, size, nullptr);
988
  RawList split = this->_split_by_segment(ReadList{request});
989
  if (this->_config->_debug_trace)
990
    this->_config->_debug_trace("read", /*need=*/size, /*want=*/size,/*parts*/ split.size(), this->_dataset->info()->allSizes(-1));
991
  for (const RawRequest& it : split) {
992
    // TODO-Low: port _cached_read ?
993
    this->_dataset->dataset()->readBlock
994
995
996
997
      (static_cast<int>(it.blocknum),
       static_cast<char*>(data)+it.outpos,
       static_cast<size_t>(it.local_offset),
       static_cast<size_t>(it.local_size));
998
999
1000
  }
}