// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "file_sd.h"

#ifdef HAVE_SD // Rest of file

#include "logger.h"
#include "file_consolidate.h"
#include "file_performance.h"
#include "file_parallelizer.h"
#include "fancy_timers.h"
#include "mtguard.h"
#include "../exception.h"
#include "../iocontext.h"
#include "environment.h"

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include <iostream>
#include <sstream>
#include <limits>
#include <string.h>
#include <algorithm>
#include <numeric>
#include <mutex>
#include <omp.h>

// It would have been nice to have similar include paths in Linux and Windows
// but that is a cosmetic issue only and it is only a problem in this file.
#ifndef _WIN32
#include <SDManager.h>
#include <SDGenericDataset.h>
#include <SDUtils.h>
#include <Constants.h>
#include <HttpContext.h>
#include <SDException.h>
#else
#include <SDAPI/SDManager.h>
#include <SDAPI/SDGenericDataset.h>
#include <SDAPI/SDUtils.h>
#include <SDAPI/Constants.h>
#include <SDAPI/HttpContext.h>
#include <SDAPI/SDException.h>
#endif

// SDAPI footprint, might not be accurate.
// Use e.g. to look for places where a try/catch might be needed.
//
// SDGenericDataset constructors, destructor
// \b(open|close|readBlock|writeBlock|deleteBlock|getBlockNum|getBlocksSize|getSize|getConsistencyID|getSerializedContext)\(
// Not used yet in C++, the first two supported in Python+SdGlue.
// \b((get|set)(MetaData|SeismicMeta|Tags|ReadOnlyMode))\(
//
// SDManager constructors, destructor
// \b(setAuthProviderFrom(String|File|ImpToken)|setLogStatus)\(

/** \cond SSTORE */

/**
 * \file file_sd.cpp
 * \brief Low level I/O, Seismic Store.
 */

// IByteIO::Disposition -> OpenMode
//OpenMode:    { Closed = 0, ReadOnly, ReadWrite, Truncate, Delete, };
//Disposition: { Create, Read, Update, Delete, Access, List };

// Plans for utilizing code from byteio_sd.cpp in ZGY-Cloud.
//
// cherry pick code from here for read and write.
//     ByteIoSeismicDrive::closeWrite()
//
// bells and whistles for configuration.
//     slurp()
//     expandArgument()
//
// Not now. Don't keep a long lived cache.
//     class SDGenericDatasetCache
//
// Worry: CTag / ETag handling? Not now
//     ByteIoSeismicDrive::readETag()
//
// Might need soon.
//     checkAccess()
//
// Utilities, might need later.
// Need a way for API users to access these directly
//     getTag()
//     setTag()
//     getMetaData()
//     setMetaData()
//     getSeisMeta()
//     setSeisMeta()
//
// maybe later, for tokentype and for error messages.
// TODO-Low: if imp token is refreshed then keep those in a cache.
// Might need support from SDAPI for that.
//     parseJsonString()
//     decodeJwt()
//     getTokenType()

using OpenZGY::IOContext;
using OpenZGY::SeismicStoreIOContext;
namespace InternalZGY {
#if 0
}
#endif

class SDGenericDatasetWrapper;
class DatasetInformation;

/**
 * Access data in seismic store as a linear file even when the dataset
 * has multiple segments. There are some limitations on write.
 *
 *   - Writes starting at EOF are allowed, and will cause a new segment
 *     to be written.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes starting before EOF are only allowed if offset,size exactly
 *     matches a previous write. This will cause that segment to be rewritten.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *
 * For read the class provides a readv() method to do scatter/gather reads.
 * The code will then consolidate adjacent bricks to get larger brick size
 * sent to SDAPI. Optionally parallelize requests that cannot be consolidated.
 *
 * Thread safety when used for reading:
 * Designed to be thread safe as no internal data structures should change
 * after the file has been opened. Any lazy-evaluated information needs
 * to do appropriate locking.
 *
 * Thread safety when used for writing:
 * Not thread safe. See SeismicStoreFile::xx_write.
 *
 * Thread safety when closing a file:
 * Not thread safe.
 *
 * The class is noncopyable with copy and assign method deleted.
 * Not that the users could easily copy it anyway, as the class should
 * always be used via the FileADT interface.
 */
class SeismicStoreFile : public FileUtilsSeismicStore
{
  SeismicStoreFile(const SeismicStoreFile&) = delete;
  SeismicStoreFile& operator=(const SeismicStoreFile&) = delete;
public:
  typedef std::function<bool(int, const std::string&)> LoggerFn;
public:
  SeismicStoreFile(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFile();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  // Functions from FileADT
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close();
  virtual std::int64_t xx_eof() const;
  virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
  virtual bool xx_iscloud() const override;
  // Functions from FileUtilsSeismicStore
  virtual void deleteFile(const std::string& filename, bool missing_ok) const;
  virtual std::string altUrl(const std::string& filename) const;
private:
  void do_write_one(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const bool overwrite);
  void do_write_many(const void*  const data, const std::int64_t blocknum, const std::int64_t size, const std::int64_t blobsize, const bool overwrite);
public:
  // Needed by SeismicStoreFileDelayedWrite.
  OpenMode _mode() const;
  // The raw SDGenericDataset is needed by SeismicStoreFileDelayedWrite
  // when opening a file for update.
  std::shared_ptr<SDGenericDatasetWrapper> datasetwrapper() const {return _dataset;}
  bool _mylogger(int priority, const std::string& message) const;
  bool _sslogger(int priority, const std::ios& ss) const;
  void _set_backoff(seismicdrive::SDGenericDataset* sdgd);
  std::shared_ptr<seismicdrive::SDGenericDataset>_open_dataset_ro(
       const std::shared_ptr<seismicdrive::SDManager>& manager,
       const std::string& filename,
       const std::unordered_map<std::string, std::string>& extra,
       bool sd_ds_log);
  std::shared_ptr<seismicdrive::SDGenericDataset>_open_dataset_rw(
       const std::shared_ptr<seismicdrive::SDManager>& manager,
       const std::string& filename,
       bool truncate,
       const std::unordered_map<std::string, std::string>& extra,
       bool sd_ds_log);
private:
  /**
   * This class is used by _split_by_segment to describe a request as seen by
   * seismic store.
   *
   * Thread safety:
   * Modification may lead to a data race. This should not be an issue,
   * because instances are only meant to be modified when created or
   * copied or assigned prior to being made available to others.
   */
  class RawRequest
  {
  public:
    std::int64_t blocknum;	// Seismic store segment a.k.a. block
    std::int64_t local_offset;	// File offset inside this blocknum
    std::int64_t local_size;	// How much to read from this block
    std::int64_t outpos;
    RawRequest(std::int64_t a_blocknum,
               std::int64_t a_offset,
               std::int64_t a_size,
               std::int64_t a_outpos)
      : blocknum(a_blocknum)
      , local_offset(a_offset)
      , local_size(a_size)
      , outpos(a_outpos)
    {
    }
  };
  typedef std::vector<RawRequest> RawList;
  RawList _split_by_segment(const ReadList& requests);
  void _cached_read(/*TODO-Low: seg, offset, view*/);
private:
  // TODO-Low: To improve isolation, the user visible context should
  // be copied into an equivalent InternalZGY::SeismicStoreConfig.
  // The downside is that it gets more tedious to maintain.
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SDGenericDatasetWrapper> _dataset;
  LoggerFn _logger;
  // As long as we don't inherit FileCommon we need our own timers.
  std::shared_ptr<SummaryPrintingTimerEx> _rtimer; // Access is thread safe
  std::shared_ptr<SummaryPrintingTimerEx> _wtimer; // Access is thread safe
};
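
// Illustration only (never compiled): a minimal sketch of the write rules
// described in the class comment above, using the xx_* methods declared
// here. The 1 MiB segment size and the way the buffer is produced are
// assumptions made up for the example; real callers normally go through
// the FileADT factory rather than using this class directly.
#if 0
static void example_segment_rules(SeismicStoreFile& file)
{
  const std::int64_t segsize = 1024 * 1024;       // assumed segment size
  const std::vector<char> buf(segsize, 0);
  file.xx_write(buf.data(), 0, segsize);          // OK: at EOF, creates segment 0
  file.xx_write(buf.data(), segsize, segsize);    // OK: at EOF, creates segment 1
  file.xx_write(buf.data(), 0, segsize);          // OK: exact rewrite of segment 0
  //file.xx_write(buf.data(), 3*segsize, segsize); // Error: starts past EOF (hole)
  //file.xx_write(buf.data(), segsize/2, segsize); // Error: no exact match with a
  //                                                // previous write
}
#endif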

/**
 * Improve on SeismicStoreFile, have it buffer large chunks of data before
 * writing it out to a new segment.
 *
 *   - Writes starting at EOF are allowed, and will buffer data in the
 *     "open segment" until explicitly flushed.
 *
 *   - Writes starting past EOF, signifying a hole in the data, are not
 *     allowed.
 *
 *   - Writes fully inside the open segment are allowed.
 *
 *   - Writes starting before the open segment are only allowed if
 *     offset,size exactly matches a previous write. This will cause that
 *     segment to be rewritten. As a corollary, writes cannot span the
 *     closed segment / open segment boundary.
 *
 *   - Possible future extension: For the  last segment only offset
 *     needs to match. This means the last segment may be resized.
 *     Why we might want this: On opening a file with existing
 *     data bricks we might choose to read the last segment and
 *     turn it into an open segment. Then delete (in memory only)
 *     the last segment. When it is time to flush the data it gets
 *     rewritten. This allows adding bricks to a file, while still
 *     ensuring that all segments except first and last need to be
 *     the same size. Note that there are other tasks such as
 *     incrementally updating statistics and histogram that might
 *     turn out to be a lot of work.
 *
 *   - When used to create ZGY files, caller must honor the convention
 *     that all segments except the first and last must have the same size.
 *
 *   - Caveat: The fact that random writes are sometimes allowed, sometimes
 *     not depending on the segment number violates the principle of
 *     least surprise. And makes for more elaborate testing. For ZGY
 *     it is quite useful though. ZGY can recover from a ZgySegmentIsClosed
 *     exception by abandoning (leaking) the current block and write it
 *     to a new location. With a typical access pattern this will happen
 *     only occasionally.
 *
 * Note that this class doesn't implement FileUtilsSeismicStore, so for
 * that functionality you need to instantiate a regular SeismicStoreFile.
 *
 * Thread safety:
 * Not thread safe by design, as it is only used for files opened for write.
 * If a file is opened for read/write (currently not supported)
 * or when running finalize it is safe to read data as long as no writes
 * can be pending.
 */
class SeismicStoreFileDelayedWrite : public FileADT
{
  std::shared_ptr<OpenZGY::SeismicStoreIOContext> _config;
  std::shared_ptr<SeismicStoreFile> _relay;
  std::vector<char> _open_segment;
  UsageHint _usage_hint;
  std::shared_ptr<SummaryPrintingTimerEx> _ctimer; // Access is thread safe

  SeismicStoreFileDelayedWrite(const SeismicStoreFileDelayedWrite&) = delete;
  SeismicStoreFileDelayedWrite& operator=(const SeismicStoreFileDelayedWrite&) = delete;

public:
  SeismicStoreFileDelayedWrite(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~SeismicStoreFileDelayedWrite();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_close() override;
  virtual std::int64_t xx_eof() const override;
  virtual std::vector<std::int64_t> xx_segments(bool complete) const override;
  virtual bool xx_iscloud() const override;

private:
  void _reopen_last_segment();
  void _flush_part(std::int64_t this_segsize);
  void _flush(bool final_call);
};
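
// Illustration only (never compiled): a sketch of the recovery pattern
// mentioned in the class comment above. When a brick must be re-written
// but its segment is already closed, the writer abandons (leaks) the old
// copy and appends the brick at EOF instead. It assumes the error raised
// in that situation is the ZgySegmentIsClosed exception named above; the
// bricksize and the caller's bookkeeping are made up for the example.
#if 0
static std::int64_t example_write_brick(SeismicStoreFileDelayedWrite& file,
                                        const void *brick,
                                        std::int64_t bricksize,
                                        std::int64_t oldpos)
{
  try {
    file.xx_write(brick, oldpos, bricksize);
    return oldpos;                                // Rewritten in place.
  }
  catch (const OpenZGY::Errors::ZgySegmentIsClosed&) {
    const std::int64_t newpos = file.xx_eof();
    file.xx_write(brick, newpos, bricksize);      // Abandon old copy, append.
    return newpos;                                // Caller updates its lookup table.
  }
}
#endif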

/////////////////////////////////////////////////////////////////////////////
//    class DatasetInformation   ////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * \brief Cached information for a SDGenericDataset.
 *
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * All fields will be filled in when any of them are first needed.
 * An exception is thrown if some of the information is inaccessible.
 * For new datasets, the information may either be set explicitly by
 * the writer or it may be set on the first write.
 * For existing datasets the sdapi is queried.
 * Even if the dataset is opened read/write the information is not
 * expected to change. Some other mechanism needs to deal with
 * stale data in that case.
 *
 * Thread safety:
 * All access to SDGenericDatasetWrapper::info_ will be protected by
 * SDGenericDatasetWrapper::mutex_. Methods that expect to change data
 * (currently only updateOnWrite()) will need some special handling.
 * TODO-High: there is still a snag here.
 */
class DatasetInformation
{
private:
  // Number of blocks on file.
  // On write, this will include the holes if data is written out of order.
  std::int64_t block_count_;

  // Size of first block. On write this might need to be explicitly set if the
  // writer decides to output blocks non sequentially. It is not possible to
  // just assume block 0 is the same size as block 1 because block 1 might
  // be the last block on file and this might have a different size.
  std::int64_t block0_size_;

  // Size of blocks that are neither the first nor last on the file.
  // Zero if there is only a single block on the file.
  std::int64_t block1_size_;

  // Size of the last block, which is allowed to be smaller.
  // Will be zero if there is just one block.
  std::int64_t last_block_size_;

  // For debugging
  SeismicStoreFile::LoggerFn logger_;

public:
  explicit DatasetInformation(const SeismicStoreFile::LoggerFn& logger);
  explicit DatasetInformation(seismicdrive::SDGenericDataset* sdgd, const SeismicStoreFile::LoggerFn& logger);
  std::string toString() const;

public:
  std::int64_t blockCount() const { return block_count_; }
  std::int64_t block0Size() const { return block0_size_; }
  std::int64_t block1Size() const { return block1_size_; }
  std::int64_t lastBlockSize() const { return last_block_size_; }

public:
  std::int64_t totalSize() const;
  std::vector<std::int64_t> allSizes(bool complete) const;
  void getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const;
  void checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const;
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize);
};

DatasetInformation::DatasetInformation(const SeismicStoreFile::LoggerFn& logger)
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
  , logger_(logger ? logger : LoggerBase::emptyCallback())
{
}

/**
 * Create and populate an instance.
 * Catching and translating exceptions from SDAPI is done by caller.
 */
DatasetInformation::DatasetInformation(seismicdrive::SDGenericDataset* sdgd, const SeismicStoreFile::LoggerFn& logger)
  : block_count_(0)
  , block0_size_(0)
  , block1_size_(0)
  , last_block_size_(0)
  , logger_(logger ? logger : LoggerBase::emptyCallback())
{
  // Note that sdapi is a bit confusing with respect to signed/unsigned.
  // getBlockNum() returns an unsigned (uint64_t).
  // getSize() and getBlockSize() return long long, may return -1 as error.
  // I will treat all of them as signed, since there is no way that
  // there can be more than 2^63 blocks.
  long long nblocks = sdgd->getBlockNum();
  long long nbytes = sdgd->getSize();
  std::vector<std::string> names;
  if (nblocks >= 1)
    names.push_back("0");
  if (nblocks >= 2)
    names.push_back("1");
  if (nblocks >= 3)
    names.push_back(std::to_string(nblocks - 1));
  std::vector<long long> sizearray;
  if (nblocks <= 0) {
  }
#if 0 // Chicken...
  else if (nblocks == 1) {
    // Not using getSize() because I do not trust it.
    // SDAPI requires a hint check_and_overwrite=true when writing
    // a block that might exist already. If the hint is missing
    // then getSize() will return the wrong result. OpenZGY has
    // no control over who wrote the file. Defensive programming
    // says to just don't use the getSize() until the current
    // behavior (which I consider a bug) is fixed. Suggestion:
    // would it be possible to scan the file on exit and fix up
    // any incorrect size?
    sizearray.push_back(nbytes);
  }
#endif
  else {
    sizearray = sdgd->getBlocksSize(names);
  }
  if (nblocks < 0)
    throw OpenZGY::Errors::ZgyInternalError("Unable to get block count for SDGenericDataset");
  for (const auto size : sizearray)
    if (size <= 0)
      throw OpenZGY::Errors::ZgyInternalError("Unable to get segment size for SDGenericDataset");

  this->block_count_ = nblocks;
  this->block0_size_ = nblocks >= 1 ? sizearray.at(0) : 0;
  this->block1_size_ = nblocks >= 2 ? sizearray.at(1) : 0;
  this->last_block_size_ = nblocks >= 3 ? sizearray.at(2) : this->block1_size_;

  // Not throwing exceptions here, because sdgd->getSize() is not reliable.
  // But the problem *could* be that the block sizes of all the middle blocks
  // is not the same. Which would be a real problem.
  if (nbytes >= 0 && nbytes != (long long)totalSize())
    this->logger_(0, "Dataset has inconsistent size");

  if (this->logger_(1, ""))
    this->logger_(1, toString());
}
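
// Example of the lookup above: a dataset with 5 blocks asks getBlocksSize()
// for the names {"0", "1", "4"} and ends up with block_count_ = 5,
// block0_size_ = size of "0", block1_size_ = size of "1" and
// last_block_size_ = size of "4". With exactly 2 blocks only {"0", "1"}
// are queried and the last block size is taken to be block1_size_.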

std::string
DatasetInformation::toString() const
{
  std::stringstream ss;
  ss << "Total " << totalSize()
     << " in " << blockCount() <<" segments"
     << " of size "<< block0Size()
     << ", " << block1Size()
     << ", " << lastBlockSize();
  return ss.str();
}

/**
 * Return the total file size, including any holes caused by bricks
 * not written yet. The result currently computes the answer based on
 * the individual block sizes assuming all blocks except the first and
 * last will have the same size. This is more reliable than asking
 * sdapi for the total size. And if the block size assumption is false
 * then we are completely hosed anyway. It is possible to verify the
 * assumption but it is probably too expensive to do so.
 *
 * Thread safety: Safe because instances are immutable once constructed
 * and made available.
 */
std::int64_t
DatasetInformation::totalSize() const
{
  switch (block_count_) {
  case 0: return 0;
  case 1: return block0_size_;
  default: return (block0_size_ +
                   (block_count_ - 2) * block1_size_ +
                   last_block_size_);
  }
}
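
// Worked example for totalSize(): with 5 blocks of sizes
// (block0, block1, last) = (1000, 500, 300) the total is
// 1000 + (5-2)*500 + 300 = 2800 bytes. Blocks 1, 2 and 3 are all
// assumed to have the block1 size.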

/**
 * Return the total file size broken down into segments, not including
 * the "open" segment which DatasetInformation doesn't know about.
 */
std::vector<std::int64_t>
DatasetInformation::allSizes(bool complete) const
{
  switch (block_count_) {
  case 0: return std::vector<std::int64_t>{};
  case 1: return std::vector<std::int64_t>{block0_size_};
  case 2: return std::vector<std::int64_t>{block0_size_, last_block_size_};
  default: {
    std::vector<std::int64_t> result;
    result.push_back(block0_size_);
    result.push_back(block1_size_);
    if (complete)
      for (int ii = 0; ii < block_count_ - 3; ++ii)
        result.push_back(block1_size_);
    result.push_back(last_block_size_);
    return result;
  }
  }
}
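
// Example for allSizes(): with segment sizes 100, 50, 50, 30 the result is
// {100, 50, 50, 30} when complete=true and the abbreviated {100, 50, 30}
// when complete=false, i.e. the repeated middle segments are elided.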

/**
 * Do consistency checks before data is written.
 * Some of these might be redundant due to checks in the caller.
 * E.g. caller raises SegmentIsClosed if attempting to write "backwards".
 * Throws exceptions on error.
 */
void
DatasetInformation::checkOnWrite(std::int64_t blocknum, std::int64_t blocksize) const
{
  if (blocknum < 0 || blocknum > std::numeric_limits<int>::max()) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " because it is out of range.");
  }
  if (blocknum != 0 && block0_size_ == 0) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write block " + std::to_string(blocknum) + " before size of block 0 is known.");
  }
  if (blocksize < 1) {
    throw OpenZGY::Errors::ZgyInternalError("Cannot write less that 1 byte.");
  }
  if (blocknum == 0) {
    // Write or overwrite block 0.
    if (block0_size_ != 0 && block0_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size of block zero");
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    if (block1_size_ != blocksize)
      throw OpenZGY::Errors::ZgyInternalError("Blocks must have the same size except the first and last");
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // TODO-Low: Technically I might have allowed this.
    // If update is to be supported then I probably need to.
    if (blocksize != last_block_size_)
      throw OpenZGY::Errors::ZgyInternalError("Cannot change the size when re-writing the last block");
  }
  else if (blocknum == block_count_) {
    // Append a block, which is not block 0.
    // This is the new "last block" which means what we previously thought
    // was the last block is in fact a middle one. Which means the size
    // of the former last block must be the same as block 1.
    // (Note that the former last block might actually *be* block 1).
    if (block1_size_ != 0 && blocksize > block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " is too large");
    if (block_count_ != 0 && last_block_size_ != block1_size_)
      throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(block_count_ -1) + " had wrong size");
  }
  else {
    // Trying to write sparse data.
    throw OpenZGY::Errors::ZgyInternalError("block " + std::to_string(blocknum) + " written out of sequence");
  }
}
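
// Example of the rules above, assuming a nominal segment size of N bytes:
// writing blocks 0, 1, 2, 3 in order with sizes N, N, N, <=N is accepted,
// and so is re-writing block 1 with exactly N bytes. Re-writing block 1
// with a different size, writing block 5 while block 4 does not exist yet,
// or writing any other block before block 0 is known, all throw
// ZgyInternalError.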

/**
 * Update cached size information after data is successfully written.
 * checkOnWrite() must have been called already.
 *
 * Thread safety: NOT thread safe.
 * Do not invoke SDGenericDatasetWrapper::info()->updateOnWrite() directly.
 * Call the thread safe SDGenericDatasetWrapper::updateOnWrite() instead.
 * That one will make sure the smart pointer being updated is unique.
 */
void
DatasetInformation::updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
{
  if (blocknum == 0) {
    // Write or overwrite block 0.
    block0_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
  else if (blocknum + 1 < block_count_) {
    // Overwrite a block in the middle.
    // Size cannot change, so do nothing.
  }
  else if (blocknum + 1 == block_count_) {
    // Overwrite the last block, which is not block 0.
    // Redundant if checkOnWrite() forbids changing size.
    last_block_size_ = blocksize;
  }
  else if (blocknum == block_count_) {
    // Append a block which is not block 0.
    if (block1_size_ == 0)
      block1_size_ = blocksize;
    last_block_size_ = blocksize;
    block_count_ = std::max(block_count_, blocknum + 1);
  }
}

/**
 * Given a linear offset and count, convert this to a block local address.
 * The returned count may be smaller than requested in case the request
 * crossed a segment boundary. In that case the caller will need to read
 * in a loop.
 *
 * "blocks" refer to the block number in Seismic Store, not the potentially
 * larger logical blocks used by SeismicStoreDelayedWrite.
 *
 * Postcondition: If blocknum is returned as the last block, local_size
 * will be returned as requested size. If this were not so, the calling
 * function would be likely to loop forever.
 */
void
DatasetInformation::getLocalOffset(std::int64_t offset, std::int64_t size, std::int64_t *blocknum, std::int64_t *local_offset, std::int64_t *local_size) const
{
  if (offset < 0 || size < 0)
    throw OpenZGY::Errors::ZgyInternalError("Offset and size cannot be negative.");
  else if (size > std::numeric_limits<std::int64_t>::max() - offset)
    throw OpenZGY::Errors::ZgyInternalError("Overflow in offset + size.");
  else if (offset + size > totalSize()) {
    if (this->logger_(1, "")) {
      std::stringstream ss;
      ss << "Reading past EOF: read("
         << "off=" << offset
         << ", size=" << size
         << ", end=" << offset+size
         << ") dataset: " << toString()
         << std::endl;
      this->logger_(1, ss.str());
    }
    throw OpenZGY::Errors::ZgyInternalError("Reading past EOF");
  }

  if (block_count_ <= 1) {
    // In first block which is also last block.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = size;
  }
  else if (block0_size_ == 0 || block1_size_ == 0) {
    // Can only happen when writing. Files open for read will have the sizes
    // set up when the DatasetInformation is first accessed.
    throw OpenZGY::Errors::ZgyInternalError("getLocalOffset() called before size is known.");
  }
  else if (offset < block0_size_) {
    // In first block, and there are more blocks following.
    *blocknum = 0;
    *local_offset = offset;
    *local_size = std::min(size, block0_size_ - offset);
  }
  else {
    const std::int64_t bnum = std::min(block_count_ - 1, ((offset - block0_size_) / block1_size_) + 1);
    const std::int64_t segment_start = block0_size_ + (bnum - 1) * block1_size_;
    *blocknum = bnum;
    *local_offset = offset - segment_start;
    if (bnum + 1 < block_count_)
      *local_size = std::min(size, block1_size_ - (offset - segment_start));
    else
      *local_size = size; // In last block.
  }
}
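
// Illustration only (never compiled): a sketch of the loop a caller is
// expected to run when a linear read crosses one or more segment
// boundaries, relying on the postcondition documented above for
// termination. This mirrors what _split_by_segment() does for real
// requests; read_one_block() is a hypothetical helper made up for the
// example.
#if 0
static void example_linear_read(const DatasetInformation& info,
                                std::int64_t offset, std::int64_t size)
{
  while (size > 0) {
    std::int64_t blocknum = 0, local_offset = 0, local_size = 0;
    info.getLocalOffset(offset, size, &blocknum, &local_offset, &local_size);
    read_one_block(blocknum, local_offset, local_size);
    offset += local_size;
    size   -= local_size;
  }
}
#endif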

/////////////////////////////////////////////////////////////////////////////
//    class SDGenericDatasetWrapper   ///////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

/**
 * Wrapper around seismicdrive::SDGenericDataset.
 * Instances are typically managed by smart pointers.
 * Copied nearly verbatim from byteio_sd.cpp in the old accessor.
 *
 * What this adds compared to a raw SDGenericDataset:
 *
 * - We keep a smart pointer reference to the SDManager that was used
 *   to create the SDGenericDataset. The sdapi documentation does not
 *   state this explicitly, but I assume it is unhealthy to destruct
 *   the manager before the dataset is closed. And using an application
 *   global manager is not an option due to privilege separation.
 *
 * - The instance remembers whether it was created for create, read, etc.
 *
 * - We may later add wrappers for SDGenericDataset members for caching
 *   purposes (block size, file size, etc.) in case the sdapi doesn't.
 *
 * - We may later add wrappers for the other members to tweak the
 *   interface, map error messages, add logging, etc. Or to add
 *   additional checking such as not reading past eof / end of block.
 *
 * Thread safety: The class itself is thread safe. The data being wrapped
 * might not be.
 *
 * All mutable data is protected by this->mutex_. Access methods for
 * those return a smart pointer. If info() is called on one thread
 * while another thread is doing a write operation (which is actually
 * not allowed) then it is unspecified whether the returned value is
 * before or after the write. It is also unspecified whether
 * updateOnWrite() will have any effect in that case. The information
 * is still consistent though, and may be used for logging etc.
 *
 * CAVEAT: Make sure the returned pointer remains in scope long
 * enough. E.g. this is NOT safe, and might crash every 7 years or so.
 * Except on customer sites where it may crash every 7 minutes.
 *
 *    seismicdrive::SDGenericDataset& dataset = *wrapper->dataset(); // NO!!!
 *    foo(dataset); // the returned smart pointer is already deleted.
 *
 * TODO-Low: Yagni: virgin_ is not used. It is related to the CTag mechanism.
 * It is still being discussed whether that needs to be ported from the
 * old accessor. It might not be needed if we go for immutable ZGY files.
 */
class SDGenericDatasetWrapper
{
  std::shared_ptr<seismicdrive::SDManager> manager_;
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset_;
  std::shared_ptr<const DatasetInformation> info_;
  OpenMode disposition_;
  bool virgin_; // If true, the cached CTag should be ok.
  SeismicStoreFile::LoggerFn logger_;
  mutable std::mutex mutex_; // Protect all members.
  std::string saved_token_; // To avoid setting it again.
  std::string saved_tokentype_;
  OpenZGY::SeismicStoreIOContext::tokencb_t tokenrefresh_;
  std::string tokenrefreshtype_;
  std::string tokenmessage_;

public:
  typedef std::shared_ptr<SDGenericDatasetWrapper> Ptr;
  SDGenericDatasetWrapper(std::shared_ptr<seismicdrive::SDManager> manager,
                          std::shared_ptr<seismicdrive::SDGenericDataset> dataset,
                          OpenMode disp,
                          const SeismicStoreFile::LoggerFn& logger)
    : manager_(manager), dataset_(dataset), disposition_(disp), virgin_(true)
    , logger_(logger ? logger : LoggerBase::emptyCallback())
    , mutex_()
    , saved_token_(), saved_tokentype_()
    , tokenrefresh_(), tokenrefreshtype_()
    , tokenmessage_("Token not initialized")
  {
  }
  ~SDGenericDatasetWrapper();
  std::shared_ptr<seismicdrive::SDGenericDataset> dataset() {
    std::lock_guard<std::mutex> lk(mutex_);
    return dataset_;
  }
  std::shared_ptr<seismicdrive::SDManager> manager() {
    std::lock_guard<std::mutex> lk(mutex_);
    return manager_;
  }
  OpenMode disposition() const {
    // This is immutable, except in close() which is not threadsafe
    // anyway, so no lock is needed.
    return disposition_;
  }
  std::shared_ptr<const DatasetInformation> info() {
    std::lock_guard<std::mutex> lk(mutex_);
    if (!info_) {
      try {
      switch (disposition_) {
      case OpenMode::Truncate:
        info_.reset(new DatasetInformation(logger_));
        break;
      case OpenMode::ReadOnly:
      case OpenMode::ReadWrite:
        info_.reset(new DatasetInformation(dataset_.get(), logger_));
        break;
      case OpenMode::Closed:
      default:
        throw OpenZGY::Errors::ZgyInternalError("DatasetInformation: Dataset not open.");
      }
      }
      catch (const std::exception& ex) {
        throwCloudException(ex, "Initialize");
      }
    }
    return info_;
  }

  /**
   * Close the underlying SDGenericDataset.
   * If an exception occurs then the wrapper will still see
   * the dataset as closed. And dataset() will return empty.
   *
   * Caveat: There was at one point a bug where SDGenericDataset::close()
   * threw an appropriate exception (e.g. credentials timed out) then
   * destructing the dataset could also throw. Exceptions from a
   * destructor usually causes the program to terminate. Especially in
   * C++11 and later. There are workarounds with multiple try/catch that
   * *might* help but it should be a lot easier to fix the underlying bug.
   */
  void wrapper_close(bool set_readonly)
  {
    auto victim = dataset_;
    OpenMode disp = disposition_;
    dataset_.reset();
    disposition_ = OpenMode::Closed;
    if (victim) {
      if (set_readonly &&
          (disp == OpenMode::Truncate || disp == OpenMode::ReadWrite))
        victim->setReadonlyMode(true);
      victim->close();
      victim.reset(); // Any throw from SDGenericDataset dtor happens here.
    }
  }

  void updateDataset(std::shared_ptr<seismicdrive::SDGenericDataset> dataset) {
    std::lock_guard<std::mutex> lk(mutex_);
    dataset_ = dataset;
    info_.reset();
    // This would give a slight performance boost, saving a single call
    // to checkCTag() after a file has changed. This happens so rarely
    // that it isn't worth the extra testing.
    //virgin_ = true;
  }

  /**
   * Adjust the information after data has been written to the cloud.
   *
   * Thread safety:
   * Multiple concurrent writers will have a race condition, but that
   * scenario is expressly forbidden anyway. Smart pointers from
   * info() held by calling code are race free because they are
   * immutable. updateOnWrite() makes a new pointer.
   */
  void updateOnWrite(std::int64_t blocknum, std::int64_t blocksize)
  {
    std::shared_ptr<DatasetInformation> updated;
    updated.reset(new DatasetInformation(*this->info()));
    updated->updateOnWrite(blocknum, blocksize);
    info_ = updated;
  }

  bool isTouched() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return !virgin_;
  }

  bool touch() {
    // Called to inform us that getCTag() might return stale data.
    //
    // if this->virgin_, we have just created the wrapper which means
    // we have just opened it. Only "List" entries don't get opened
    // immediately. But those aren't cached so they are not relevant
    // here. Since we just created it, we can call getCTag() without
    // calling checkCTag() first. Saving one database hit.
    //
    // The flag must be reset at least when an existing dataset accessor
    // is being re-used to open a file again. Currently it will be reset
    // much more often (every time the accessor is used to read data)
    // and it won't be set true again by updateDataset(). This will
    // simplify testing and has little impact on performance.
    std::lock_guard<std::mutex> lk(mutex_);
    bool old = virgin_;
    virgin_ = false;
    return old != virgin_;
  }

  /**
   * Called from the catch block after calling some method in SDAPI.
   * If no token was provided, this is an "I told you so" error.
   * It could have been reported earlier but it is possible that
   * the cloud library somehow managed to handle authentication
   * itself. The actual exception reported by SDAPI is in that case
   * less interesting.
   *
   * A missing token is probably a user error (with "user" in this
   * case being the application) and is reported as such.
   *
   * If the exception has already been wrapped by a ZgyException then
   * it will be re-thrown without modification.
   */
  void throwCloudException(const std::exception& ex, const char *message) {
    if (this->logger_(1, "")) {
      std::stringstream ss;
      ss << "Oops (" << message << ")" << " ("
         << tokenmessage_ << "): " << ex.what();
      this->logger_(1, ss.str());
    }
    if (dynamic_cast<const OpenZGY::Errors::ZgyError*>(&ex))
      throw;
    else if (!tokenmessage_.empty())
      throw OpenZGY::Errors::ZgyUserError(tokenmessage_);
    else
      throw OpenZGY::Errors::ZgyInternalError
        (std::string(message) + ": Seismic Store: " + std::string(ex.what()));
    // TODO-Low: Should I just re-throw the sdapi error instead?
    // TODO-Low: Either way, be more consistent about the answer.
  }

  /**
   * Pass credentials down to the SDAPI layer.
   *
   * Thread safety: Caller is responsible for preventing concurrent calls
   * to update the same manager. CAVEAT: If the manager is cached and
   * shared between open files then this raises some challenges.
   */
  static void authorizeManagerInSD(seismicdrive::SDManager *manager, const std::string& token, const std::string& tokentype)
  {
    // TODO-Low: handle impersonation better. SDAPI wants us to tell
    // whether we are passing an impersonation token. If so, it will
    // attempt to refresh it if expired. The problem is that looking
    // at the token (e.g. one provided by an environment variable)
    // to decide which type it is can be ridiculously complicated.
    // See authorizeManager() in the old code. Also, it would be nice
    // to be able to read back the refreshed token.
    if (tokentype == "sdtoken")
      manager->setAuthProviderFromString(token);
#ifdef HAVE_SAUTH_EXTENSIONS
    else if (tokentype == "file")
      manager->setAuthProviderFromFile(token);
    else if (token.substr(0, 5) == "FILE:")
      manager->setAuthProviderFromFile(token.substr(5));
#else
    else if (tokentype == "file" || token.substr(0, 5) == "FILE:")
      throw OpenZGY::Errors::ZgyInternalError("Reading SAuth token from file is not supported");
#endif
    else if (tokentype == "imptoken")
      manager->setAuthProviderFromImpToken(token);
    else if (token.substr(0, 9) == "imptoken:")
      manager->setAuthProviderFromImpToken(token.substr(9));
    else
      manager->setAuthProviderFromString(token);
  }
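
  // Examples of token strings accepted above: a plain access token
  // (tokentype "sdtoken" or unspecified), "FILE:/path/to/tokenfile" or
  // tokentype "file" when built with HAVE_SAUTH_EXTENSIONS, and
  // "imptoken:<serialized impersonation token>" or tokentype "imptoken".
  // The path and token contents are of course just placeholders.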

  /**
   * Pass initial credentials down to the SDAPI layer.
   *
   * Thread safety: This is meant to be called from the constructor
   * when opening a file. So there should not be any race condition.
   * CAVEAT: If the manager is cached and shared between open files
   * then this raises some challenges.
   */
  void authorizeManager(
       const std::string& token,
       const std::string& tokentype,
       const OpenZGY::SeismicStoreIOContext::tokencb_t& tokencb,
       const std::string& tokencbtype)
  {
    std::lock_guard<std::mutex> lk(mutex_);

    // Save the refresh callback, if any, for use in reAuthorizeManager().
    tokenrefresh_     = tokencb;
    tokenrefreshtype_ = tokencbtype;

    // Obtain the token to use, preferring the callback. Currently the
    // type of token the callback returns is specified when the callback
    // is registered. Not when it is invoked. Change that if needed.
    std::string newtoken     = tokencb ? tokencb() : token;
    std::string newtokentype = tokencb ? tokencbtype : tokentype;

    // A missing token at this point is probably an error, but in case
    // the SDAPI library has some tricks up its sleeve don't throw
    // an error unless the SDAPI does so first.
    if (newtoken.empty())
      if (tokencb)
        tokenmessage_ = "Missing access token, first callback returned empty";
      else
        tokenmessage_ = "Missing access token or callback in iocontext";
    else
      tokenmessage_ = "";

    authorizeManagerInSD(manager_.get(), newtoken, newtokentype);

    // Save what we just set so reAuthorizeManager() won't need to set
    // the exact same token again. Don't set if there was an exception
    // because then we can try again later. The saved token is NOT
    // used for credentials. So it is possible to store a hash
    // instead as long as the risk of collision is negligible.
    saved_token_ = newtoken;
    saved_tokentype_ = newtokentype;
  }

  /**
   * Pass updated credentials down to the SDAPI layer if needed.
   * Needs to be called before any operation that needs credentials.
   *
   * Thread safety: *this is protected by a lock.
   * The lock is temporarily dropped before invoking the refresh callback.
   * This means the application provided callback MUST BE THREADSAFE.
   * It also means there is technically a race condition here, where a
   * particular read operation uses credentials that are a few milliseconds
   * out of date.
   *
   * Alternatively the code here could hold a lock and require that the
   * callback doesn't do something that could cause a deadlock. Not sure
   * if that option would be preferable.
   */
  void reAuthorizeManager()
  {
    std::unique_lock<std::mutex> lk(mutex_);
    if (tokenrefresh_) {
      auto tokenrefresh = this->tokenrefresh_;
      auto tokenrefreshtype = this->tokenrefreshtype_;
      std::string newtoken;
      std::string newtokentype;
      // By design, no locks should be held when the callback is invoked
      // to avoid the risk of deadlocks. This means that the callback
      // itself must be threadsafe.
      lk.unlock();
      newtoken     = tokenrefresh();
      newtokentype = tokenrefreshtype;
      lk.lock();
      if (saved_token_ != newtoken || saved_tokentype_ != newtokentype) {
        if (newtoken.empty())
          tokenmessage_ = "Missing access token, callback returned empty";
        else
          tokenmessage_ = "";
        // In case of exception, always allow trying again.
        saved_token_     = std::string();
        saved_tokentype_ = std::string();
        authorizeManagerInSD(manager_.get(), newtoken, newtokentype);
        saved_token_     = newtoken;
        saved_tokentype_ = newtokentype;
        if (newtoken.empty())
          this->logger_(1, "The token was cleared");
        else
          this->logger_(1, "A new token was provided");
      }
    }
  }
};
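
// Illustration only (never compiled): a sketch of how an application would
// supply the token refresh callback that reAuthorizeManager() above ends up
// invoking. It assumes the public SeismicStoreIOContext exposes fluent
// setters along the lines of sdurl(), sdapikey() and sdtokencb(); check
// iocontext.h for the exact names. fetch_fresh_token() is a hypothetical
// application function. As noted above, the callback may be invoked with no
// locks held and must therefore be thread safe.
#if 0
static OpenZGY::SeismicStoreIOContext example_context()
{
  return OpenZGY::SeismicStoreIOContext()
    .sdurl("https://example.com/seistore-svc/api/v3")  // placeholder
    .sdapikey("my-api-key")                            // placeholder
    .sdtokencb([]() { return fetch_fresh_token(); },   // must be thread safe
               "sdtoken");
}
#endif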

/**
 * Automatically close the dataset when the last reference to it goes away.
 * This is a fallback. Please do NOT rely on this behavior. Especially if
 * the dataset is open for write. Instead, make sure xx_close() is called
 * in a timely manner. If we get here with an open dataset then:
 *
 *  - Exceptions will be logged and swallowed.
 *  - The C++ runtime might abort due to exception during unwind.
 *  - Cleanup such as making the dataset read-only might be skipped.
 */
SDGenericDatasetWrapper::~SDGenericDatasetWrapper()
{
  info_.reset();
  try {
    // Explicit close so we have control of how to deal with errors. Or not.
    // Hopefully the destructors won't try to close again when they see
    // that we tried to do so ourselves. Note: currently  they will try that.
    if (dataset_)
      wrapper_close(false);
  }
  catch (const std::exception& ex) {
    if (std::string(ex.what()).find("dataset is not open") == std::string::npos)
      this->logger_(0, "SDGenericDataset::close(): " + std::string(ex.what()));
  }
  manager_.reset();
}

/////////////////////////////////////////////////////////////////////////////
//    FileADT -> SeismicStoreFile   /////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////

FileUtilsSeismicStore::~FileUtilsSeismicStore()
{
}

SeismicStoreFile::SeismicStoreFile(const std::string& filename, OpenMode mode, const IOContext *iocontext)
  : FileUtilsSeismicStore()
  , _config()
{
  auto context = dynamic_cast<const OpenZGY::SeismicStoreIOContext*>(iocontext);
  _logger = ((context && context->_logger) ? context->_logger :
             LoggerBase::standardCallback
             (LoggerBase::getVerboseFromEnv("OPENZGY_VERBOSE"),
              "openzgy-cloud: ", ""));
  if (!context)
    throw OpenZGY::Errors::ZgyUserError("Opening a file from seismic store requires a SeismicStoreIOContext");
  this->_config.reset(new OpenZGY::SeismicStoreIOContext(*context));
  _rtimer.reset(new SummaryPrintingTimerEx(mode == OpenMode::ReadWrite || mode == OpenMode::Truncate ? "Cloud.reread" : "Cloud.read"));
  _wtimer.reset(new SummaryPrintingTimerEx("Cloud.write"));

  if (_logger(3, ""))
    _sslogger(3, std::stringstream() << "SeismicStoreFile("
              << "\"" << filename << "\", " << int(mode) << ", *)\n");

  std::unordered_map<std::string, std::string> extra;
  using seismicdrive::api::json::Constants;
  if (!context->_legaltag.empty())
    extra[Constants::kLegalTagLabel] = context->_legaltag;
  if (!context->_writeid.empty())
    extra[Constants::kWriteIdLabel] = context->_writeid;
  if (!context->_seismicmeta.empty())
    extra[Constants::kDDMSSeismicMetadataLabel] = context->_seismicmeta;
  //TODO-Low: if (!context->_pedantic.empty())
  //  extra[Constants::KPedantic] = context->_pedantic;

  bool sd_mgr_log = Environment::getNumericEnv("OPENZGY_SDMANAGER_LOG",0) > 0;
  bool sd_ds_log = Environment::getNumericEnv("OPENZGY_SDDATASET_LOG",0) > 0;
  auto manager = std::make_shared<seismicdrive::SDManager>
    (context->_sdurl, context->_sdapikey);
  manager->setLogStatus(sd_mgr_log);

  // TODO-Low: Cache the manager and possibly the SDUtils instance.

  auto datasetwrapper = std::make_shared<SDGenericDatasetWrapper>
    (manager, nullptr, mode, _logger);

  datasetwrapper->authorizeManager
    (context->_sdtoken, context->_sdtokentype,
     context->_sdtokencb, context->_sdtoken_cbtype);

  std::shared_ptr<seismicdrive::SDGenericDataset> dataset;
  try {
    switch (mode) {
    case OpenMode::Closed:
      break;
    case OpenMode::ReadOnly:
      dataset = _open_dataset_ro(manager, filename, extra, sd_ds_log);
      break;
    case OpenMode::ReadWrite:
      dataset = _open_dataset_rw(manager, filename, false, extra, sd_ds_log);
      break;
    case OpenMode::Truncate:
      dataset = _open_dataset_rw(manager, filename, true, extra, sd_ds_log);
      break;
    }
  }
  catch (const std::exception& ex) {
    datasetwrapper->throwCloudException(ex, "Open");
  }

  datasetwrapper->updateDataset(dataset);
  this->_dataset = datasetwrapper;
  // Removed this because it causes info() to be populated early,
  // thereby negating the benefit of lazy evaluation. And, worse,
  // causing different behavior when debugging is on.
  //if (_logger(1) && mode != OpenMode::Closed)
  //  _logger(1, this->_dataset->info()->toString());
}

SeismicStoreFile::~SeismicStoreFile()
{
  // The calling layer is supposed to do an explicit xx_close() so it
  // can catch and handle exceptions, and so we can be sure the token
  // callback, if used, is still valid. Do *not* try to re-authorize
  // the manager. It might not be safe to invoke the callback any
  // more. And do a blind catch of any exception because if we don't
  // the application will crash.
  if (_dataset && _dataset->dataset() && _dataset->disposition() != OpenMode::Closed) {
    try {
      _dataset->wrapper_close(_config->_set_ro_after_write);
    }
    catch (const std::exception& ex) {
      _logger(0, "EXCEPTION closing file: " + std::string(ex.what()));
    }
    catch (...) {
      _logger(0, "EXCEPTION closing file.");
    }
  }
  _dataset.reset();
}

/**
 * Convenience for invoking _logger with a simple message. Useful for
 * logging from outside this class, e.g. in the delayed-write class.
 */
bool
SeismicStoreFile::_mylogger(int priority, const std::string& message) const
{
  return _logger(priority, message);
}

/**
 * Convenience for invoking _logger with a stringstream.
 * Due to a somewhat naughty cast, the function can be called as:
 *
 *   if (_logger(pri, ""))
 *    _sslogger(pri, std::stringstream() << some << data << here);
 *
 * The first line is optional. It just prevents the expression in
 * the second line from being evaluated if debugging is disabled.
 *
 * Thread safety: Yes, because the global _logger can only be set
 * while the very first instance is being constructed.
 */
bool
SeismicStoreFile::_sslogger(int priority, const std::ios& ss) const
{
  auto sstream = dynamic_cast<const std::stringstream*>(&ss);
  return _logger(priority, sstream ? sstream->str() : std::string());
}

/**
 * Configure the exponential backoff used by Seismic Store.
 * Only expected to be used for debugging.
 *
 * * -1 => use defaults.
 * *  0 => Turn off exponential backoff completely.
 * * >0 => Set maximum repeat count.
 */
void
SeismicStoreFile::_set_backoff(seismicdrive::SDGenericDataset* sdgd)
{
  const int retries = this->_config->_retry_count;
  if (retries >= 0) {
    seismicdrive::ExponentialRetryBackoffPolicy policy;
    if (retries == 0) {
      policy.enabled = false;
    }
    else {
      policy.enabled = true;
      policy.maxRetry = retries;
      policy.initialWaitingTimeMicroSec = 500 * 1000;
      policy.maxWaitingTimeMicroSec = 32 * 1000 * 1000;
    }
    sdgd->setExponentialRetryBackoffPolicy(&policy);
    if (_logger(2, "")) {
      std::stringstream ss;
      ss << "Backoff " << (policy.enabled ? "enabled" : "disabled")
         << " retries " << policy.maxRetry
         << " start " << (float)policy.initialWaitingTimeMicroSec*1.0e-6
         << " max "   << (float)policy.maxWaitingTimeMicroSec*1.0e-6;
      _logger(2, ss.str());
    }
  }
}
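
// Example of the mapping above: _retry_count == 0 installs a policy with
// enabled=false, so a failed request is not retried at all; _retry_count == 4
// allows up to 4 retries, starting at 0.5 s between attempts and capped at
// 32 s. A negative count (e.g. -1) leaves the SDAPI built-in backoff policy
// untouched.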

/**
 * Allocate and open a SDGenericDataset for read.
 * If dictated by the iocontext, turn on the read-only flag first.
 */
std::shared_ptr<seismicdrive::SDGenericDataset>
SeismicStoreFile::_open_dataset_ro(const std::shared_ptr<seismicdrive::SDManager>& manager, const std::string& filename, const std::unordered_map<std::string, std::string>& extra, bool sd_ds_log)
{
  if (_logger(5, ""))
    _sslogger(5, std::stringstream()
              << "make dataset for reading using manager "
              << std::hex << (std::uint64_t)manager.get());
  auto dataset = std::make_shared<seismicdrive::SDGenericDataset>
    (manager.get(), filename, sd_ds_log);
  _set_backoff(dataset.get());
  dataset->open(seismicdrive::SDDatasetDisposition::READ_ONLY, extra);
  if (this->_config->_force_ro_before_read) {
    if (!dataset->getReadonlyMode()) {
      dataset->setReadonlyMode(true);
      // For robustness, assume SDAPI needs a re-open to clear the read lock.
      // For robustness, assume the dataset instance cannot be re-used.
      dataset->close();
      dataset = std::make_shared<seismicdrive::SDGenericDataset>
        (manager.get(), filename, sd_ds_log);
      _set_backoff(dataset.get());
      dataset->open(seismicdrive::SDDatasetDisposition::READ_ONLY, extra);
      _logger(2, "Readonly flag forced on for \"" + filename + "\"");
    }
    else {
      _logger(2, "Readonly flag already on for \"" + filename + "\"");
    }
  }
  if (_logger(5, ""))
    _sslogger(5, std::stringstream()
              << "dataset for reading is "
              << std::hex << (std::uint64_t)dataset.get());
  return dataset;
}

/**
 * Allocate and open a SDGenericDataset for write.
 *
 * If the file is already open for write elsewhere then SDAPI will throw.
 * If the file already has a read lock this implies that the read-only
 * flag is already off, and the SDAPI will throw.
 *
 * If dictated by the iocontext, turn off the read-only flag first.
 * This change will only happen if the file is currently unlocked.
 * Otherwise the read-only flag has to be off already.
 * This might still be a bad idea. The application assumes all responsibility.
 */
std::shared_ptr<seismicdrive::SDGenericDataset>
SeismicStoreFile::_open_dataset_rw(const std::shared_ptr<seismicdrive::SDManager>& manager, const std::string& filename, bool truncate, const std::unordered_map<std::string, std::string>& extra, bool sd_ds_log)
{
  if (_logger(5, ""))
    _sslogger(5, std::stringstream()
              << "make dataset for writing using manager "
              << std::hex << (std::uint64_t)manager.get());
  const seismicdrive::SDDatasetDisposition disp =
    truncate ?
    seismicdrive::SDDatasetDisposition::OVERWRITE :
    seismicdrive::SDDatasetDisposition::READ_WRITE;
  auto dataset = std::make_shared<seismicdrive::SDGenericDataset>
    (manager.get(), filename, sd_ds_log);
  _set_backoff(dataset.get());
  if (!this->_config->_force_rw_before_write) {
    dataset->open(disp, extra);
  }
  else {
    try {
      // Unlike the open for read case, incorrect read-only state will throw.
      dataset->open(disp, extra);
      _logger(2, "Readonly flag already off for \"" + filename + "\"");
    }
    catch (const seismicdrive::SDException& ex) {
      // TODO-Low: Catch a specific SDAPI exception for "read-only dataset".
      // Currently a SDExceptionSDAccessorError is thrown, which is
      // more about *where* the error occurred and not *what* went wrong.
      // So the catch might as well be on SDException so that fixing
      // SDAPI won't break this code. Or maybe just std::exception?
      dataset = std::make_shared<seismicdrive::SDGenericDataset>
        (manager.get(), filename, sd_ds_log);
      _set_backoff(dataset.get());
      // This might throw if there is a current write lock.
      dataset->open(seismicdrive::SDDatasetDisposition::READ_ONLY, extra);
      if (dataset->getReadonlyMode()) {
        dataset->setReadonlyMode(false);
        dataset->close();
        dataset = std::make_shared<seismicdrive::SDGenericDataset>
          (manager.get(), filename, sd_ds_log);
        _set_backoff(dataset.get());
        // Any second throw will be passed on to the caller.
        dataset->open(disp, extra);
        _logger(2, "Readonly flag forced off for \"" + filename + "\"");
      }
      else {
        _logger(2, "Readonly flag already on? for \"" + filename + "\"");
        throw;
      }
    }
  }
  if (_logger(5, ""))
    _sslogger(5, std::stringstream()
              << "dataset for writing is "
              << std::hex << (std::uint64_t)dataset.get());
  return dataset;
}

std::shared_ptr<FileADT>
SeismicStoreFile::xx_make_instance(const std::string& filename, OpenMode mode, const IOContext *iocontext)
{
  if (filename.substr(0, 5) == "sd://" &&
      (mode != OpenMode::ReadWrite && mode != OpenMode::Truncate)) {
    auto file = std::shared_ptr<FileADT>(new SeismicStoreFile(filename, mode, iocontext));

    // This is a no-op unless enabled by environment variables.
    // Note, this might have been injected after the FileParallelizer instead.
    file = FileWithPerformanceLogger::inject(file, filename);

    // Improve multi-threading of decompress and copy-out.
    auto context = dynamic_cast<const SeismicStoreIOContext*>(iocontext);
    if (context && context->_cputhreads > 1)
      file = FileParallelizer::inject(file, context->_cputhreads);

    return file;
  }
  else
    return std::shared_ptr<FileADT>();
}
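
// For illustration: when both wrappers are enabled by the code above, the
// caller ends up holding a chain of FileADT decorators,
//
//   FileParallelizer -> FileWithPerformanceLogger -> SeismicStoreFile
//
// with every call entering at the outermost layer. Which layers are present
// depends on the environment variables and on _cputhreads, so this shows
// the fullest configuration only.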

/**
 * Thread safety: Designed to be thread safe as long as the underlying
 * SDGenericDataset is. Even when data is being written in another
 * thread.
 */
void
SeismicStoreFile::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
  this->_validate_read(data, offset, size, this->xx_eof(), this->_mode());
  this->_dataset->reAuthorizeManager();
  ReadRequest request(offset, size, nullptr);
  RawList split = this->_split_by_segment(ReadList{request});
  if (this->_config->_debug_trace)
    this->_config->_debug_trace("read", /*need=*/size, /*want=*/size,/*parts*/ split.size(), this->xx_segments(true));
  for (const RawRequest& it : split) {
    // TODO-Low: port _cached_read ?
    SimpleTimerEx tt(*this->_rtimer);
    this->_dataset->dataset()->readBlock
      (static_cast<int>(it.blocknum),
       static_cast<char*>(data)+it.outpos,
       static_cast<size_t>(it.local_offset),
       static_cast<size_t>(it.local_size));
    _rtimer->addBytesRead(it.local_size);
  }
}
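
// Worked example of the splitting done above, assuming (purely for
// illustration) that the dataset's segments i.e. SDAPI blocks have sizes
// 1 MB, 64 MB, 64 MB, ... and that file offset 0 maps to the start of
// block 0. A request for offset 63 MB, size 8 MB then crosses from block 1
// into block 2 and is split into two RawRequests:
//
//   blocknum=1, local_offset=62 MB, local_size=2 MB, outpos=0 MB
//   blocknum=2, local_offset= 0 MB, local_size=6 MB, outpos=2 MB
//
// so the two pieces land back to back in the caller's buffer.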

/**
 * Thread safety: Designed to be thread safe as long as the underlying
 * SDGenericDataset is. Even when data is being written in another
 * thread.
 *
 * TODO-Worry: even with consolidate_overlaps=false, overlapping
 * requests might cause surprises. Since this isn't supposed to
 * happen anyway, maybe just fall back to one brick at a time?
 */
void
SeismicStoreFile::xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint)
{
  if (requests.size() == 1) {
    // Handle this simple case specially. There will be more cases to test
    // but the shortcut might help performance. Especially if the memory
    // allocation can be made more efficient. For testing the shortcut can be
    // made unconditional. But that will disable the consolidate-brick logic.
    // Explicitly use xx_read() in this class, not any overrides. If xx_read()
    // is overridden then whoever did that wouldn't expect xx_readv() to change.
    // The fact that one is implemented using the other is an implementation detail.
    // Note that the delivery function can retain a reference to the data.
    // This is allowed as long as the data is still short lived. If not then
    // this isn't a disaster due to the eviction code in _allocate().
    for (const ReadRequest& r : requests) {
      std::shared_ptr<void> data = _allocate(r.size);
      this->SeismicStoreFile::xx_read(data.get(), r.offset, r.size, usagehint);
      _deliver(r.delivery, data, 0, r.size, transient_ok);
    }
    return;
  }

  // Consolidate adjacent bricks before reading.
  //
  // Remember to use a nonvirtual call when xx_readv() is implemented
  // in terms of xx_read() or vice versa, because this is an
  // implementation detail and overriding one of them should not
  // affect the other. Similarly use a nonvirtual xx_eof() because we
  // are not supposed to know that SeismicStoreFileDelayedWrite makes
  // the file look bigger.
  //
  // This implementation can issue requests in multiple
  // threads, wait for all threads to complete, and then deliver all
  // the results. For this reason it needs to allocate a buffer to
  // hold the entire data to be read.
  //
  // TODO-Performance: Allow read from cloud and copy-out/decompress
  // in parallel inside a single request from the application. Probably
  // not worth the (significant) trouble, and probably won't help
  // multi-threaded applications anyway. Theoretically it might help
  // lowres computation on write. The main caveat is that requests that
  // cross segment boundaries will then need some complicated "partial
  // delivery" mechanism. Or the entire request may need to fall back
  // to the original implementation if a boundary crossing, which is
  // likely to be very rare, is detected.

  std::int64_t current_eof = SeismicStoreFile::xx_eof(); // exclude open segment
  _validate_readv(requests, current_eof, this->_mode());
  this->_dataset->reAuthorizeManager();

  // For debugging / logging only
  const std::int64_t asked =
    std::accumulate(requests.begin(), requests.end(), std::int64_t(0),
                    [](std::int64_t a, const ReadRequest& b) {
                      return a + b.size;
                    });

  ReadList new_requests = ConsolidateRequests::consolidate
    (requests, _config->_maxhole, _config->_maxsize, _config->_aligned,
     false/*consolidate_overlaps*/, current_eof);
  auto work = _split_by_segment(new_requests);
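
  // Example of the consolidation (the numbers are made up): with
  // _maxhole = 2 MB, two requests for (offset 0 MB, size 1 MB) and
  // (offset 2 MB, size 1 MB) are close enough to be merged into a single
  // 3 MB read that also covers the 1 MB hole between them. Reading the
  // extra megabyte in one request is normally cheaper than paying the
  // round trip latency of a second cloud request.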

  // Carefully get the required buffer size. Normally it would be
  // enough to just look at work.back() but there may be some odd
  // corner cases, and I just don't want to worry about those.

  const std::int64_t realsize =
    std::accumulate(work.begin(), work.end(), std::int64_t(0),
                    [](std::int64_t a, const RawRequest& b) {
                      return std::max(a, b.local_size + b.outpos);
                    });
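  // (Each chunk is copied to [outpos, outpos + local_size) of the buffer,
  // so the largest such end offset is the size the buffer needs.)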

  // This would probably work, but the single-brick case is already
  // handled and the cases of two to four int8 bricks or two int16
  // bricks are not that interesting. At least not for applications
  // that read just one brick at a time. Those apps will not get here.
  //std::shared_ptr<void> data = _allocate(r.size);
  std::shared_ptr<char> data(new char[realsize], std::default_delete<char[]>());

  if (this->_config->_debug_trace)
    this->_config->_debug_trace("readv", /*need=*/asked, /*want=*/realsize,/*parts*/ work.size(), this->xx_segments(true));

  // Do the actual reading of the consolidated chunks, possibly using
  // multiple threads.
  //
  //  * Worry: Can there be multiple requests targeting the same area
  //    of the output buffer? Probably not, although there can be multiple
  //    read requests for the same area of the file.
  //
  // If parallel_ok, can I then deliver data as it is received without
  // waiting for the last bit? That allows reading and e.g. decompressing
  // in parallel. Not a big deal if the application has multiple reads in
  // flight. Otherwise this might in theory double the speed.
  //
  //  * Tricky to implement. The receiver doesn't allow partial delivery.
  //    So if one request requires concatenating data from multiple
  //    cloud reads then this needs to wait until the end. Or if really
  //    fancy, keep track of when all the data has been read for each
  //    of the original requests.
  const std::int64_t worksize = work.size();
  const std::int64_t threadcount = std::max(std::min(std::min(
      worksize,
      static_cast<std::int64_t>(omp_get_max_threads())),
      _config->_iothreads),
      static_cast<std::int64_t>(1));
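  // Example: with work.size() == 5 consolidated chunks, omp_get_max_threads()
  // == 8 and _config->_iothreads == 4, the expression above gives
  // min(min(5, 8), 4) = 4 threads. The outer max() only guards against an
  // empty work list or a non-positive _iothreads setting.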