// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "file_sd.h"

#ifdef HAVE_SD // Rest of file

#include "logger.h"
#include "file_consolidate.h"
#include "file_performance.h"
#include "file_parallelizer.h"
#include "fancy_timers.h"
#include "mtguard.h"
#include "../exception.h"
#include "../iocontext.h"
#include "environment.h"

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include <iostream>
#include <sstream>
#include <limits>
#include <string.h>
#include <algorithm>
#include <numeric>
#include <mutex>
#include <omp.h>

// It would have been nice to have similar include paths in Linux and Windows
// but that is a cosmetic issue only and it is only a problem in this file.
#ifndef _WIN32
#include <SDManager.h>
#include <SDGenericDataset.h>
#include <SDUtils.h>
#include <Constants.h>
#else
#include <SDAPI/SDManager.h>
#include <SDAPI/SDGenericDataset.h>
#include <SDAPI/SDUtils.h>
#include <SDAPI/Constants.h>
#endif

/** \cond SSTORE */

/**
 * \file: file_sd.cpp
 * \brief Low level I/O, Seismic Store.
 */

// IByteIO::Disposition -> OpenMode
//OpenMode:    { Closed = 0, ReadOnly, ReadWrite, Truncate, Delete, };
//Disposition: { Create, Read, Update, Delete, Access, List };

// Plans for utilizing code from byteio_sd.cpp in ZGY-Cloud.
//
// cherry pick code from here for read and write.
//     ByteIoSeismicDrive::closeWrite()
//
// bells and whistles for configuration.
//     slurp()
//     expandArgument()
//
// Not now. Don't keep a long lived cache.
//     class SDGenericDatasetCache
//
// Worry: CTag / ETag handling? Not now
//     ByteIoSeismicDrive::readETag()
//
// Might need soon.
//     checkAccess()
//
// Utilities, might need later.
// Need a way for API users to access these directly
//     getTag()
//     setTag()
//     getMetaData()
//     setMetaData()
//     getSeisMeta()
//     setSeisMeta()
//
// maybe later, for tokentype and for error messages.
// TODO-Low: if imp token is refreshed then keep those in a cache.
// Might need support from SDAPI for that.
//     parseJsonString()
//     decodeJwt()
//     getTokenType()

using OpenZGY::IOContext;
using OpenZGY::SeismicStoreIOContext;
namespace InternalZGY {
#if 0
}
#endif

class SDGenericDatasetWrapper;
class DatasetInformation;

/**
 * Access data in seismic store as a linear file even when the dataset
 * has multiple segments. There are some limitations on write.
 *
 *   - Writes starting at EOF are allowed, and will cause a new segment
 *     to be written.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes starting before EOF are only allowed if offset,size exactly
 *     matches a previous write. This will cause that segment to be rewritten.
 *
 *   - Possible future extension: For the last segment only offset
 *     needs to match. This means the last segment may be resized.
 *
 * For read the class provides a readv() method to do scatter/gather reads.
 * The code will then consolidate adjacent bricks to get larger brick size
 * sent to SDAPI. Optionally parallelize requests that cannot be consolidated.
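 *
 * A minimal usage sketch of the write contract described above; the file
 * name, buffer, and sizes are hypothetical:
 *
 *    std::shared_ptr<FileADT> file = SeismicStoreFile::xx_make_instance
 *      ("sd://tenant/project/example.zgy", OpenMode::Truncate, &context);
 *    file->xx_write(buf, 0, 1000);     // ok, creates segment 0.
 *    file->xx_write(buf, 1000, 1000);  // ok, starts at EOF, creates segment 1.
 *    file->xx_write(buf, 0, 1000);     // ok, exact match, rewrites segment 0.
 *    file->xx_write(buf, 500, 1000);   // throws, does not match a segment.
 *    file->xx_write(buf, 9000, 1000);  // throws, starts past EOF.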
 *
 * Thread safety when used for reading:
 * Designed to be thread safe as no internal data structures should change
 * after the file has been opened. Any lazy-evaluated information needs
 * to do appropriate locking.
 *
 * Thread safety when used for writing:
 * Not thread safe. See SeismicStoreFile::xx_write.
 *
 * Thread safety when closing a file:
 * Not thread safe.
 *
 * The class is noncopyable, with copy and assign methods deleted.
 * Not that the users could easily copy it anyway, as the class should
 * always be used via the FileADT interface.
 */
class SeismicStoreFile : public FileUtilsSeismicStore
{
  SeismicStoreFile(const SeismicStoreFile&) = delete;
  SeismicStoreFile& operator=(const SeismicStoreFile&) = delete;
public:
  typedef std::function<bool(int, const std::string&)> LoggerFn;
public:
  SeismicStoreFile(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFile();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  // Functions from FileADT
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close();
  virtual std::int64_t xx_eof() const;
  virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
  virtual bool xx_iscloud() const override;
  // Functions from FileUtilsSeismicStore
  virtual void deleteFile(const std::string& filename, bool missing_ok) const;
  virtual std::string altUrl(const std::string& filename) const;
private:
  void do_write_one(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const bool overwrite);
  void do_write_many(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const std::int64_t blobsize, const bool overwrite);
public:
  // TODO-Low per-instance logging. This is tedious to implement
  // because many of the helper classes will need to hold a logger
  // instance as well, or need the logger passed in each call.
  static bool _logger(int priority, const std::string& message = std::string());
  static bool _logger(int priority, const std::ios& ss);
private:
  /**
   * This class is used by _split_by_segment to describe a request as seen by
   * seismic store.
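   *
   * Illustration (hypothetical sizes): with segment 0 being 100 bytes and
   * all later segments 1000 bytes, a single request (offset=50, size=200,
   * outpos=0) gets split into two entries:
   *
   *    RawRequest(0, 50,  50,  0);   // tail of segment 0
   *    RawRequest(1,  0, 150, 50);   // head of segment 1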
   *
   * Thread safety:
   * Modification may lead to a data race. This should not be an issue,
   * because instances are only meant to be modified when created or
   * copied or assigned prior to being made available to others.
   */
  class RawRequest
  {
  public:
    std::int64_t blocknum;	// Seismic store segment a.k.a. block
    std::int64_t local_offset;	// File offset inside this blocknum
    std::int64_t local_size;	// How much to read from this block
    std::int64_t outpos;
    RawRequest(std::int64_t a_blocknum,
               std::int64_t a_offset,
               std::int64_t a_size,
               std::int64_t a_outpos)
      : blocknum(a_blocknum)
      , local_offset(a_offset)
      , local_size(a_size)
      , outpos(a_outpos)
    {
    }
  };
  typedef std::vector<RawRequest> RawList;
  RawList _split_by_segment(const ReadList& requests);
  void _cached_read(/*TODO-Low: seg, offset, view*/);
private:
  OpenMode _mode;
  // TODO-Low: To improve isolation, the user visible context should
  // be copied into an equivalent InternalZGY::SeismicStoreConfig.
  // The downside is that it gets more tedious to maintain.
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SDGenericDatasetWrapper> _dataset;
  static LoggerFn _loggerfn;
  // As long as we don't inherit FileCommon we need our own timers.
  std::shared_ptr<SummaryPrintingTimerEx> _rtimer; // Access is thread safe
  std::shared_ptr<SummaryPrintingTimerEx> _wtimer; // Access is thread safe
};

SeismicStoreFile::LoggerFn SeismicStoreFile::_loggerfn;

/**
 * Improve on SeismicStoreFile, have it buffer large chunks of data before
 * writing it out to a new segment.
 *
 *   - Writes starting at EOF are allowed, and will buffer data in the
 *     "open segment" until explicitly flushed.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes fully inside the open segment are allowed.
 *
 *   - Writes starting before the open segment are only allowed if
 *     offset,size exactly matches a previous write. This will cause that
 *     segment to be rewritten. As a corollary, writes cannot span the
 *     closed segment / open segment boundary.
 *
 *   - Possible future extension: For the last segment only offset
 *     needs to match. This means the last segment may be resized.
 *     Why we might want this: On opening a file with existing
 *     data bricks we might choose to read the last segment and
 *     turn it into an open segment. Then delete (in memory only)
 *     the last segment. When it is time to flush the data it gets
 *     rewritten. This allows adding bricks to a file, while still
 *     ensuring that all segments except first and last need to be
 *     the same size. Note that there are other tasks such as
 *     incrementally updating statistics and histogram that might
 *     turn out to be a lot of work.
 *
 *   - When used to create ZGY files, caller must honor the convention
 *     that all segments except the first and last must have the same size.
 *
 *   - Caveat: The fact that random writes are sometimes allowed, sometimes
 *     not depending on the segment number violates the principle of
 *     least surprise. And makes for more elaborate testing. For ZGY
 *     it is quite useful though. ZGY can recover from a ZgySegmentIsClosed
 *     exception by abandoning (leaking) the current block and write it
 *     to a new location. With a typical access pattern this will happen
 *     only occasionally.
 *
 * Note that this class doesn't implement FileUtilsSeismicStore, so for
 * that functionality you need to instantiate a regular SeismicStoreFile.
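 *
 * A minimal sketch of the intended write pattern; names and sizes are
 * hypothetical and the segment size is assumed to be configured in the
 * iocontext:
 *
 *    std::shared_ptr<FileADT> file = SeismicStoreFileDelayedWrite::xx_make_instance
 *      ("sd://tenant/project/example.zgy", OpenMode::Truncate, &context);
 *    file->xx_write(brick, file->xx_eof(), bricksize); // buffered in memory
 *    file->xx_write(brick, file->xx_eof(), bricksize); // still buffered
 *    // Once the open segment reaches the configured segment size,
 *    // _flush_part() hands complete segments to the SeismicStoreFile relay.
 *    file->xx_close(); // flushes the remaining, possibly smaller, last segment.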
 *
 * Thread safety:
 * Not thread safe by design, as it is only used for files opened for write.
 * If a file is opened for read/write (currently not supported)
 * or when running finalize it is safe to read data as long as no writes
 * can be pending.
 */
class SeismicStoreFileDelayedWrite : public FileADT
{
  OpenMode _mode;
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SeismicStoreFile> _relay;
  std::vector<char> _open_segment;
  UsageHint _usage_hint;
  std::shared_ptr<SummaryPrintingTimerEx> _ctimer; // Access is thread safe

  SeismicStoreFileDelayedWrite(const SeismicStoreFileDelayedWrite&) = delete;
  SeismicStoreFileDelayedWrite& operator=(const SeismicStoreFileDelayedWrite&) = delete;

public:
  SeismicStoreFileDelayedWrite(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFileDelayedWrite();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close() override;
  virtual std::int64_t xx_eof() const override;
  virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
  virtual bool xx_iscloud() const override;

private:
  void _flush_part(std::int64_t this_segsize);
  void _flush(bool final_call);
};

/////////////////////////////////////////////////////////////////////////////
//    class DatasetInformation   ////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * \brief Cached information for a SDGenericDataset.
 *
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * All fields will be filled in when any of them are first needed.
 * An exception is thrown if some of the information is inaccessible.
 * For new datasets, the information may either be set explicitly by
 * the writer or it may be set on the first write.
 * For existing datasets the sdapi is queried.
 * Even if the dataset is opened read/write the information is not
 * expected to change. Some other mechanism needs to deal with
 * stale data in that case.
 *
 * Thread safety:
 * All access to SDGenericDatasetWrapper::info_ will be protected by
 * SDGenericDatasetWrapper::mutex_. Methods that expect to change data
 * (currently only updateOnWrite()) will need some special handling.
 * TODO-High: there is still a snag here.
 */
class DatasetInformation
{
private:
  // Number of blocks on file.
  // On write, this will include the holes if data is written out of order.
  std::int64_t block_count_;

  // Size of first block. On write this might need to be explicitly set if the
  // writer decides to output blocks non sequentially. It is not possible to
  // just assume block 0 is the same size as block 1 because block 1 might
  // be the last block on file and this might have a different size.
  std::int64_t block0_size_;

  // Size of blocks that are neither the first nor last on the file.
  // Zero if there is only a single block on the file.
  std::int64_t block1_size_;

  // Size of the last block, which is allowed to be smaller.
  // Will be zero if there is just one block.
  std::int64_t last_block_size_;
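
  // Illustration (hypothetical sizes): a file stored as five segments of
  // 1, 64, 64, 64 and 12 MB gives block_count_ = 5, block0_size_ = 1 MB,
  // block1_size_ = 64 MB and last_block_size_ = 12 MB.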

public:
  DatasetInformation();
  explicit DatasetInformation(seismicdrive::SDGenericDataset* sdgd);
  std::string toString() const;

public:
  std::int64_t blockCount() const { return block_count_; }
  std::int64_t block0Size() const { return block0_size_; }
  std::int64_t block1Size() const { return block1_size_; }
  std::int64_t lastBlockSize() const { return last_block_size_; }

public:
  std::int64_t totalSize() const;
  std::vector<std::int64_t> allSizes(bool complete) const;
  void getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const;
  void checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const;
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize);
};

DatasetInformation::DatasetInformation()
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
{
}

DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd)
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
{
  try {
    // Note that sdapi is a bit confusing with respect to signed/unsigned.
    // getBlockNum() returns an unsigned (uint64_t).
    // getSize() and getBlockSize() return long long, may return -1 as error.
    // I will treat all of them as signed, since there is no way that
    // there can be more than 2^63 blocks.
    long long nblocks = sdgd->getBlockNum();
    long long nbytes = sdgd->getSize();
    std::vector<std::string> names;
    if (nblocks >= 1)
      names.push_back("0");
    if (nblocks >= 2)
      names.push_back("1");
    if (nblocks >= 3)
      names.push_back(std::to_string(nblocks - 1));
    std::vector<long long> sizearray;
    if (nblocks <= 0) {
    }
#if 0 // Chicken...
    else if (nblocks == 1) {
      // Not using getSize() because I do not trust it.
      // SDAPI requires a hint check_and_overwrite=true when writing
      // a block that might exist already. If the hint is missing
      // then getSize() will return the wrong result. OpenZGY has
      // no control over who wrote the file. Defensive programming
      // says to just not use getSize() until the current
      // behavior (which I consider a bug) is fixed. Suggestion:
      // would it be possible to scan the file on exit and fix up
      // any incorrect size?
      sizearray.push_back(nbytes);
    }
#endif
    else {
      sizearray = sdgd->getBlocksSize(names);
    }
    if (nblocks < 0)
      throw OpenZGY::Errors::ZgyInternalError("Unable to get block count for SDGenericDataset");
    for (const auto size : sizearray)
      if (size <= 0)
        throw OpenZGY::Errors::ZgyInternalError("Unable to get segment size for SDGenericDataset");

    this->block_count_ = nblocks;
    this->block0_size_ = nblocks >= 1 ? sizearray.at(0) : 0;
    this->block1_size_ = nblocks >= 2 ? sizearray.at(1) : 0;
    this->last_block_size_ = nblocks >= 3 ? sizearray.at(2) : this->block1_size_;

    // Not throwing exceptions here, because sdgd->getSize() is not reliable.
    // But the problem *could* be that the block sizes of all the middle blocks
    // are not the same. Which would be a real problem.
    if (nbytes >= 0 && nbytes != (long long)totalSize())
      SeismicStoreFile::_logger(0, "Dataset has inconsistent size");

    SeismicStoreFile::_logger(1, toString());
  }
  catch (const OpenZGY::Errors::ZgyError& ex) {
    SeismicStoreFile::_logger(1, "Oops (DataSetInformation): " + std::string(ex.what()));
    throw;
  }
  catch (const std::exception& ex) {
    SeismicStoreFile::_logger(1, "Oops (DataSetInformation): " + std::string(ex.what()));
    throw OpenZGY::Errors::ZgyInternalError("Seismic Store says: " + std::string(ex.what()));
  }
}

std::string
DatasetInformation::toString() const
{
  std::stringstream ss;
  ss << "Total " << totalSize()
     << " in " << blockCount() << " segments"
     << " of size " << block0Size()
     << ", " << block1Size()
     << ", " << lastBlockSize();
  return ss.str();
}

/**
 * Return the total file size, including any holes caused by bricks
 * not written yet. The result currently computes the answer based on
 * the individual block sizes assuming all blocks except the first and
 * last will have the same size. This is more reliable than asking
 * sdapi for the total size. And if the block size assumption is false
 * then we are completely hosed anyway. It is possible to verify the
 * assumption but it is probably too expensive to do so.
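 *
 * Worked example (hypothetical sizes): five blocks of 1, 64, 64, 64 and
 * 12 MB give 1 + (5 - 2) * 64 + 12 = 205 MB.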
 *
 * Thread safety: Safe because instances are immutable once constructed
 * and made available.
 */
std::int64_t
DatasetInformation::totalSize() const
{
  switch (block_count_) {
  case 0: return 0;
  case 1: return block0_size_;
  default: return (block0_size_ +
                   (block_count_ - 2) * block1_size_ +
                   last_block_size_);
  }
}

/**
 * Return the total file size broken down into segments, not including
 * the "open" segment which DatasetInformation doesn't know about.
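 *
 * Example (hypothetical sizes): for five blocks of 1, 64, 64, 64 and 12 MB,
 * allSizes(false) returns {1, 64, 12} MB (first, middle, last) while
 * allSizes(true) returns {1, 64, 64, 64, 12} MB.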
 */
std::vector<std::int64_t>
DatasetInformation::allSizes(bool complete) const
{
  switch (block_count_) {
  case 0: return std::vector<std::int64_t>{};
  case 1: return std::vector<std::int64_t>{block0_size_};
  case 2: return std::vector<std::int64_t>{block0_size_, last_block_size_};
  default: {
    std::vector<std::int64_t> result;
    result.push_back(block0_size_);
    result.push_back(block1_size_);
    if (complete)
      for (int ii = 0; ii < block_count_ - 3; ++ii)
        result.push_back(block1_size_);
    result.push_back(last_block_size_);
    return result;
  }
  }
}

/**
 * Do consistency checks before data is written.
 * Some of these might be redundant due to checks in the caller.
 * E.g. caller raises SegmentIsClosed if attempting to write "backwards".
 * Throws exceptions on error.
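 *
 * Illustration (hypothetical sizes): with block_count_ == 3 and block
 * sizes 1, 64 and 12 MB,
 *
 *   - checkOnWrite(1, 64 MB) passes: exact overwrite of a middle block.
 *   - checkOnWrite(2, 12 MB) passes: the last block keeps its size.
 *   - checkOnWrite(3, 64 MB) throws: the current last block (12 MB) does
 *     not match block1_size_, so it cannot become a middle block.
 *   - checkOnWrite(5, any size) throws: written out of sequence.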
 */
void
DatasetInformation::checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const
{
  if (blocknum < 0 || blocknum > std::numeric_limits<int>::max()) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " because it is out of range.");
  }
  if (blocknum != 0 && block0_size_ == 0) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " before size of block 0 is known.");
  }
  if (blocksize < 1) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write less than 1 byte.");
  }
  if (blocknum == 0) {
    // Write or overwrite block 0.
    if (block0_size_ != 0 && block0_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size of block zero");
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    if (block1_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Blocks must have the same size except the first and last");
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // TODO-Low: Technically I might have allowed this.
    // If update is to be supported then I probably need to.
    if (blocksize != last_block_size_)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size when re-writing the last block");
  }
  else if (blocknum == block_count_) {
    // Append a block, which is not block 0.
    // This is the new "last block", which means what we previously thought
    // was the last block is in fact a middle one. Which means the size
    // of the former last block must be the same as block 1.
    // (Note that the former last block might actually *be* block 1).
    if (block1_size_ != 0 && blocksize > block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " is too large");
    if (block_count_ != 0 && last_block_size_ != block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(block_count_ -1) + " had wrong size");
  }
  else {
    // Trying to write sparse data.
    throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " written out of sequence");
  }
}

/**
 * Update cached size information after data is successfully written.
 * checkOnWrite() must have been called already.
 *
 * Thread safety: NOT thread safe.
 * Do not invoke SDGenericDatasetWrapper::info()->updateOnWrite() directly.
 * Call the thread safe SDGenericDatasetWrapper::updateOnWrite() instead.
 * That one will make sure the smart pointer being updated is unique.
 */
void
DatasetInformation::updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
{
  if (blocknum == 0) {
    // Write or overwrite block 0.
    block0_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    // Size cannot change, so do nothing.
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // Redundant if checkOnWrite() forbids changing size.
    last_block_size_ = blocksize;
  }
  else if (blocknum == block_count_) {
    // Append a block which is not block 0.
    if (block1_size_ == 0)
      block1_size_ = blocksize;
    last_block_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
}

/**
 * Given a linear offset and count, convert this to a block local address.
 * The returned count may be smaller than requested in case the request
 * crossed a segment boundary. In that case the caller will need to read
 * in a loop.
 *
 * "blocks" refer to the block number in Seismic Store, not the potentially
 * larger logical blocks used by SeismicStoreDelayedWrite.
 *
 * Postcondition: If blocknum is returned as the last block, local_size
 * will be returned as requested size. If this were not so, the calling
 * function would be likely to loop forever.
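 *
 * Worked example (hypothetical sizes): with block_count_ = 4 and block
 * sizes 100, 1000, 1000 and 500 bytes,
 *
 *   - getLocalOffset(50, 200, ...) yields blocknum 0, local_offset 50,
 *     local_size 50 (clipped at the end of block 0).
 *   - getLocalOffset(1250, 600, ...) yields blocknum 2, local_offset 150,
 *     local_size 600.
 *   - getLocalOffset(2150, 400, ...) yields blocknum 3 (the last block),
 *     local_offset 50, local_size 400 (never clipped in the last block).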
 */
void
DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const
{
  if (offset < 0 || size < 0)
    throw OpenZGY::Errors::ZgyInternalError("Offset and size cannot be negative.");
  else if (size > std::numeric_limits<std::int64_t>::max() - offset)
    throw OpenZGY::Errors::ZgyInternalError("Overflow in offset + size.");
  else if (offset + size > totalSize()) {
    if (SeismicStoreFile::_logger(1)) {
      std::stringstream ss;
      ss << "Reading past EOF: read("
         << "off=" << offset
         << ", size=" << size
         << ", end=" << offset+size
         << ") dataset: " << toString()
         << std::endl;
      SeismicStoreFile::_logger(1, ss.str());
    }
    throw OpenZGY::Errors::ZgyInternalError("Reading past EOF");
  }

  if (block_count_ <= 1) {
    // In first block which is also last block.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = size;
  }
  else if (block0_size_ == 0 || block1_size_ == 0) {
    // Can only happen when writing. Files open for read will have the sizes
    // set up when the DatasetInformation is first accessed.
    throw OpenZGY::Errors::ZgyInternalError("getLocalOffset() called before size is known.");
  }
  else if (offset < block0_size_) {
    // In first block, and there are more blocks following.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = std::min(size, block0_size_ - offset);
  }
  else {
    const std::int64_t bnum = std::min(block_count_ - 1, ((offset - block0_size_) / block1_size_) + 1);
    const std::int64_t segment_start = block0_size_ + (bnum - 1) * block1_size_;
    *blocknum = bnum;
    *local_offset = offset - segment_start;
    if (bnum + 1 < block_count_)
      *local_size = std::min(size, block1_size_ - (offset - segment_start));
    else
      *local_size = size; // In last block.
  }
}

/////////////////////////////////////////////////////////////////////////////
//    class SDGenericDatasetWrapper   ///////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * Wrapper around seismicdrive::SDGenericDataset.
 * Instances are typically managed by smart pointers.
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * What this adds compared to a raw SDGenericDataset:
 *
 * - We keep a smart pointer reference to the SDManager that was used
 *   to create the SDGenericDataset. The sdapi documentation does not
 *   state this explicitly, but I assume it is unhealthy to destruct
 *   the manager before the dataset is closed. And using an application
 *   global manager is not an option due to privilege separation.
 *
 * - The instance remembers whether it was created for create, read, etc.
 *
 * - We may later add wrappers for SDGenericDataset members for caching
 *   purposes (block size, file size, etc.) in case the sdapi doesn't.
 *
 * - We may later add wrappers for the other members to tweak the
 *   interface, map error messages, add logging, etc. Or to add
 *   additional checking such as not reading past eof / end of block.
 *
 * Thread safety: The class itself is thread safe. The data being wrapped
 * might not be.
 *
 * All mutable data is protected by this->mutex_. Access methods for
 * those return a smart pointer. If info() is called on one thread
 * while another thread is doing a write operation (which is actually
 * not allowed) then it is unspecified whether the returned value is
 * before or after the write. It is also unspecified whether
 * updateOnWrite() will have any effect in that case. The information
 * is still consistent though, and may be used for logging etc.
 *
 * CAVEAT: Make sure the returned pointer remains in scope long
 * enough. E.g. this is NOT safe, and might crash every 7 years or so.
 * Except on customer sites where it may crash every 7 minutes.
 *
 *    seismicdrive::SDGenericDataset& dataset = *wrapper->dataset(); // NO!!!
 *    foo(dataset); // the returned smart pointer is already deleted.
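 *
 * The safe counterpart keeps the returned smart pointer alive for as long
 * as the raw reference is needed:
 *
 *    std::shared_ptr<seismicdrive::SDGenericDataset> dataset = wrapper->dataset();
 *    foo(*dataset); // ok, "dataset" keeps the instance alive.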
 *
 * TODO-Low: Yagni: virgin_ is not used. It is related to the CTag mechanism.
 * It is still being discussed whether that needs to be ported from the
 * old accessor. It might not be needed if we go for immutable ZGY files.
 */
class SDGenericDatasetWrapper
{
  std::shared_ptr<seismicdrive::SDManager> manager_;
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset_;
  std::shared_ptr<const DatasetInformation> info_;
  OpenMode disposition_;
  bool virgin_; // If true, the cached CTag should be ok.
  mutable std::mutex mutex_; // Protect all members.
  std::string saved_token_; // To avoid setting it again.
  std::string saved_tokentype_;
  OpenZGY::SeismicStoreIOContext::tokencb_t tokenrefresh_;
  std::string tokenrefreshtype_;
public:
  typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
  SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
                          std::shared_ptr<seismicdrive::SDGenericDataset> dataset,
                          OpenMode disp)
    : manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
    , mutex_()
    , saved_token_(), saved_tokentype_()
    , tokenrefresh_(), tokenrefreshtype_()
  {
  }
  ~SDGenericDatasetWrapper();
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset() {
    std::lock_guard<std::mutex> lk(mutex_);
    return dataset_;
  }
  std::shared_ptr<seismicdrive::SDManager> manager() {
    std::lock_guard<std::mutex> lk(mutex_);
    return manager_;
  }
  OpenMode disposition() const {
    return disposition_; // immutable, so no lock.
  }
  std::shared_ptr<const DatasetInformation> info() {
    std::lock_guard<std::mutex> lk(mutex_);
    if (!info_) {
      switch (disposition_) {
      case OpenMode::Truncate:
        info_.reset(new DatasetInformation());
        break;
      case OpenMode::ReadOnly:
      case OpenMode::ReadWrite:
        info_.reset(new DatasetInformation(dataset_.get()));
        break;
      case OpenMode::Closed:
      default:
        throw OpenZGY::Errors::ZgyInternalError("DatasetInformation: Dataset not open.");
      }
    }
    return info_;
  }

  void updateDataset(std::shared_ptr<seismicdrive::SDGenericDataset> dataset) {
    std::lock_guard<std::mutex> lk(mutex_);
    dataset_ = dataset;
    info_.reset();
    // This would give a slight performance boost, saving a single call
    // to checkCTag() after a file has changed. This happens so rarely
    // that it isn't worth the extra testing.
    //virgin_ = true;
  }

  /**
   * Adjust the information after data has been written to the cloud.
   *
   * Thread safety:
   * Multiple concurrent writers will have a race condition, but that
   * scenario is expressly forbidden anyway. Smart pointers from
   * info() held by calling code are race free because they are
   * immutable. updateOnWrite() makes a new pointer.
   */
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
  {
    std::shared_ptr<DatasetInformation> updated;
    updated.reset(new DatasetInformation(*this->info()));
    updated->updateOnWrite(blocknum, blocksize);
    info_ = updated;
  }

  bool isTouched() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return !virgin_;
  }

  bool touch() {
    // Called to inform us that getCTag() might return stale data.
    //
    // if this->virgin_, we have just created the wrapper which means
    // we have just opened it. Only "List" entries don't get opened
    // immediately. But those aren't cached so they are not relevant
    // here. Since we just created it, we can call getCTag() without
    // calling checkCTag() first. Saving one database hit.
    //
    // The flag must be reset at least when an existing dataset accessor
    // is being re-used to open a file again. Currently it will be reset
    // much more often (every time the accessor is used to read data)
    // and it won't be set true again by updateDataset(). This will
    // simplify testing and has little impact on performance.
    std::lock_guard<std::mutex> lk(mutex_);
    bool old = virgin_;
    virgin_ = false;
    return old != virgin_;
  }

  /**
   * Pass credentials down to the SDAPI layer.
   *
   * Thread safety: Caller is responsible for preventing concurrent calls
   * to update the same manager. CAVEAT: If the manager is cached and
   * shared between open files then this raises some challenges.
   */
  static void authorizeManagerInSD(seismicdrive::SDManager *manager, const std::string& token, const std::string& tokentype)
  {
    // TODO-Low: handle impersonation better. SDAPI wants us to tell
    // whether we are passing an impersonation token. If so, it will
    // attempt to refresh it if expired. The problem is that looking
    // at the token (e.g. one provided by an environment variable)
    // to decide which type it is can be ridiculously complicated.
    // See authorizeManager() in the old code. Also, it would be nice
    // to be able to read back the refreshed token.
    if (tokentype == "sdtoken")
      manager->setAuthProviderFromString(token);
#ifdef HAVE_SAUTH_EXTENSIONS
    else if (tokentype == "file")
      manager->setAuthProviderFromFile(token);
    else if (token.substr(0, 5) == "FILE:")
      manager->setAuthProviderFromFile(token.substr(5));
#else
    else if (tokentype == "file" || token.substr(0, 5) == "FILE:")
      throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
    else if (tokentype == "imptoken")
      manager->setAuthProviderFromImpToken(token);
    else if (token.substr(0, 9) == "imptoken:")
      manager->setAuthProviderFromImpToken(token.substr(9));
    else
      manager->setAuthProviderFromString(token);
  }

  /**
   * Pass initial credentials down to the SDAPI layer.
   *
   * Thread safety: This is meant to be called from the constructor
   * when opening a file. So there should not be any race condition.
   * CAVEAT: If the manager is cached and shared between open files
   * then this raises some challenges.
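   *
   * Hypothetical sketch: register a refresh callback instead of a fixed
   * token, so reAuthorizeManager() can re-query it before I/O as needed.
   * fetch_fresh_token() is an assumed application-provided helper that
   * returns a plain SAuth token as a string.
   *
   *    wrapper->authorizeManager("", "",
   *        [](){ return fetch_fresh_token(); }, "sdtoken");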
   */
  void authorizeManager(
       const std::string& token,
       const std::string& tokentype,
       const OpenZGY::SeismicStoreIOContext::tokencb_t& tokencb,
       const std::string& tokencbtype)
  {
    std::lock_guard<std::mutex> lk(mutex_);

    // Save the refresh callback, if any, for use in reAuthorizeManager().
    tokenrefresh_     = tokencb;
    tokenrefreshtype_ = tokencbtype;

    // Obtain the token to use, preferring the callback. Currently the
    // type of token the callback returns is specified when the callback
    // is registered. Not when it is invoked. Change that if needed.
    std::string newtoken     = tokencb ? tokencb() : token;
    std::string newtokentype = tokencb ? tokencbtype : tokentype;

    authorizeManagerInSD(manager_.get(), newtoken, newtokentype);

    // Save what we just set so reAuthorizeManager() won't need to set
    // the exact same token again. Don't set if there was an exception
    // because then we can try again later. The saved token is NOT
    // used for credentials. So it is possible to store a hash
    // instead as long as the risk of collision is negligible.
    saved_token_ = newtoken;
    saved_tokentype_ = newtokentype;
  }

  /**
   * Pass updated credentials down to the SDAPI layer if needed.
   * Needs to be called before any operation that needs credentials.
   *
   * Thread safety: *this is protected by a lock.
   * The lock is temporarily dropped before invoking the refresh callback.
   * This means the application provided callback MUST BE THREADSAFE.
   * It also means there is technically a race condition here, where a
   * particular read operation uses credentials that are a few milliseconds
   * out of date.
   *
   * Alternatively the code here could hold the lock and require that the
   * callback doesn't do something that could cause a deadlock. Not sure
   * if that option would be preferable.
   */
  void reAuthorizeManager()
  {
    std::unique_lock<std::mutex> lk(mutex_);
    if (tokenrefresh_) {
      auto tokenrefresh = this->tokenrefresh_;
      auto tokenrefreshtype = this->tokenrefreshtype_;
      std::string newtoken;
      std::string newtokentype;
      // By design, no locks should be held when the callback is invoked
      // to avoid the risk of deadlocks. This means that the callback
      // itself must be threadsafe.
      lk.unlock();
      newtoken     = tokenrefresh();
      newtokentype = tokenrefreshtype;
      lk.lock();
      if (saved_token_ != newtoken || saved_tokentype_ != newtokentype) {
        // In case of exception, always allow trying again.
        saved_token_     = std::string();
        saved_tokentype_ = std::string();
        authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
        saved_token_     = newtoken;
        saved_tokentype_ = newtokentype;
        SeismicStoreFile::_logger(1, "A new token was provided");
      }
    }
  }
};

SDGenericDatasetWrapper::~SDGenericDatasetWrapper()
{
  info_.reset();
  try {
    // Explicit close so we have control of how to deal with errors. Or not.
    // Hopefully the destructors won't try to close again when they see
    // that we tried to do so ourselves. Note: currently they will try that.
    if (dataset_)
      dataset_->close();
  }
  catch (const std::exception& ex) {
    if (std::string(ex.what()).find("dataset is not open") == std::string::npos)
      SeismicStoreFile::_logger(0, "SDGenericDataset::close(): " + std::string(ex.what()));
  }
  try {
    // Catch exceptions raised inside a destructor might not work. But I tried.
    dataset_.reset();
  }
  catch (const std::exception& ex) {
    SeismicStoreFile::_logger(0, "SDGenericDataset::dtor(): " + std::string(ex.what()));
  }
  manager_.reset();
}

/////////////////////////////////////////////////////////////////////////////
//    FileADT -> SeismicStoreFile   /////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

FileUtilsSeismicStore::~FileUtilsSeismicStore()
{
}

SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, const IOContext *iocontext)
  : FileUtilsSeismicStore()
  , _mode(mode)
  , _config()
{
  _rtimer.reset(new SummaryPrintingTimerEx(mode == OpenMode::ReadWrite || mode == OpenMode::Truncate ? "Cloud.reread" : "Cloud.read"));
  _wtimer.reset(new SummaryPrintingTimerEx("Cloud.write"));

  // TODO-Low: a better way of handling this would be to pass the logger
  // in the iocontext and store it per file instead of using a global.
  {
    // Must protect against double init because _loggerfn is global.
    static std::mutex mutex;
    std::lock_guard<std::mutex> lk(mutex);
    if (!_loggerfn)
      _loggerfn = LoggerBase::standardCallback(LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"), "zgy: ", "");
  }

  _logger(3, std::stringstream() << "SeismicStoreFile("
          << "\"" << filename << "\", " << int(mode) << ", *)\n");

  auto context = dynamic_cast<const OpenZGY::SeismicStoreIOContext*>(iocontext);
  if (!context)
    throw OpenZGY::Errors::ZgyUserError("Opening a file from seismic store requires a SeismicStoreIOContext");
  this->_config.reset(new OpenZGY::SeismicStoreIOContext(*context));

  std::unordered_map<std::string, std::string> extra;
  using seismicdrive::api::json::Constants;
  if (!context->_legaltag.empty())
    extra[Constants::kLegalTagLabel] = context->_legaltag;
  if (!context->_writeid.empty())
    extra[Constants::kWriteIdLabel] = context->_writeid;
  if (!context->_seismicmeta.empty())
    extra[Constants::kDDMSSeismicMetadataLabel] = context->_seismicmeta;
  //TODO-Low: if (!context->_pedantic.empty())
  //  extra[Constants::KPedantic] = context->_pedantic;

  bool sd_mgr_log = Environment::getNumericEnv("OPENZGY_SDMANAGER_LOG",0) > 0;
  bool sd_ds_log = Environment::getNumericEnv("OPENZGY_SDDATASET_LOG",0) > 0;
  auto manager = std::make_shared<seismicdrive::SDManager>
    (context->_sdurl, context->_sdapikey);
  manager->setLogStatus(sd_mgr_log);
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset;
  if (mode != OpenMode::Closed) {
    _logger(5, std::stringstream()
            << "make dataset using manager "
            << std::hex << (std::uint64_t)manager.get());