file_local.cpp 14.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Copyright 2017-2020, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _WIN32 // Entire file is linux only

#include "file.h"
#include "../exception.h"
#include "timer.h"
#include "environment.h"
21
#include "file_performance.h"
22
23
24
25
26
27
28
29
30
31
32
33
34
35

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <iostream>
#include <sstream>
#include <algorithm>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
36
#include <mutex>
37
#include <atomic>
38
#include <omp.h>
39
40
41
42
43
44
45
46
47
48
49
50
51

using OpenZGY::IOContext;
namespace InternalZGY {
#if 0
}
#endif

/**
 * \file: file_local.cpp
 * \brief Low level I/O, regular files.
 */

namespace {
52
53
54
55
  /**
   * \brief Check whether performance logging is enabled.
   * \details Thread safety: Yes, locking handled by compiler.
   */
56
57
58
59
60
  static bool timers_on()
  {
    static int enable = Environment::getNumericEnv("OPENZGY_TIMERS", 0);
    return enable > 0;
  }
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
  /**
   * \brief SummaryTimer that prints its result when going out of scope.
   * \details This is a private extension that also keeps track of the
   * total number of bytes transferred, so the byte counts can be
   * reported on the same line as the timing summary.
   * The counters are atomics, so they may be bumped from several
   * threads at once.
   */
  class OPENZGY_TEST_API SummaryPrintingTimerEx : public SummaryPrintingTimer
  {
    std::atomic<std::int64_t> bytes_read_;
    std::atomic<std::int64_t> bytes_written_;

  public:
    /**
     * \param name Label forwarded to the SummaryPrintingTimer base.
     * \param csv  Forwarded to the base; selects the CSV layout in print().
     */
    explicit SummaryPrintingTimerEx(const char *name, bool csv = false)
      : SummaryPrintingTimer(name, csv)
      , bytes_read_(0)
      , bytes_written_(0)
    {
    }

    /** Emit the summary (if anything was recorded) before going away. */
    virtual ~SummaryPrintingTimerEx() {
      print();
    }

    /**
     * \brief Format a byte count with a coarse unit, prefixed by \p label.
     * \return An empty string when \p n is zero. Otherwise the label
     * followed by the count in MB (above 10 MB), kB (above 10 kB),
     * or plain bytes.
     */
    static std::string niceNumber(const std::string& label, std::int64_t n) {
        const std::int64_t kilo = 1024;
        const std::int64_t mega = kilo * kilo;
        if (n == 0)
          return std::string();
        if (n > 10 * mega)
          return label + std::to_string(n / mega) + " MB";
        if (n > 10 * kilo)
          return label + std::to_string(n / kilo) + " kB";
        return label + std::to_string(n) + " bytes";
    }

    /**
     * \brief Write the timing summary plus byte counts to std::cerr.
     * \details Nothing is written when no samples were recorded.
     * The timer is always reset afterwards.
     */
    virtual void print() {
      if (getCount() != 0) {
        std::string text(csv_ ? getCSV() : getValue(true, true));
        // The base class text may end with a newline; the byte counts
        // need to go on the same line so strip it.
        if (!text.empty() && text.back() == '\n')
          text.pop_back();
        const std::int64_t nread = bytes_read_.load();
        const std::int64_t nwritten = bytes_written_.load();
        if (csv_) {
          std::cerr << text << "," << nread << "," << nwritten << std::endl;
        }
        else {
          std::cerr << text
                    << niceNumber(", R: ", nread)
                    << niceNumber(", W: ", nwritten)
                    << std::endl;
        }
      }
      reset(); // Prevent the base class from printing as well.
    }

    /** Add \p nbytes to the running total of bytes read. */
    void addBytesRead(std::int64_t nbytes) {
      bytes_read_ += nbytes;
    }

    /** Add \p nbytes to the running total of bytes written. */
    void addBytesWritten(std::int64_t nbytes) {
      bytes_written_ += nbytes;
    }
  };
115
116
}

117
118
/**
 * Thread safety when used for reading:
119
120
121
 * Designed to be thread safe as no internal data structures should change
 * after the file has been opened. Any lazy-evaluated information needs
 * to do appropriate locking.
122
123
 *
 * Thread safety when used for writing:
124
125
126
127
 * Yes, but caller won't make use of this because the high level design
 * states that writes (i.e. calls from ZgyWriter) are not thread safe.
 * Besides, other file backends might not be thread safe and those might
 * not be so easy to change.
128
129
130
 *
 * Thread safety when closing a file:
 * Not thread safe.
131
132
133
134
 *
 * The class is noncopyable with copy and assign method deleted.
 * Not that the users could easily copy it anyway, as the class should
 * always be used via the FileADT interface.
135
 */
136
137
class LocalFileLinux : public FileCommon
{
138
139
  LocalFileLinux(const LocalFileLinux&) = delete;
  LocalFileLinux& operator=(const LocalFileLinux&) = delete;
140
141
142
143
144
145
146
147
148
149
150
151
152
public:
  LocalFileLinux(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual ~LocalFileLinux();
  static std::shared_ptr<FileADT> xx_make_instance(const std::string& filename, OpenMode mode, const OpenZGY::IOContext *iocontext);
  virtual void xx_close() override;
  virtual std::int64_t xx_eof() const override;
  virtual bool xx_iscloud() const override;
  virtual void xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_readv(const ReadList& requests, bool parallel_ok=false, bool immutable_ok=false, bool transient_ok=false, UsageHint usagehint=UsageHint::Unknown) override;
  virtual void xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint=UsageHint::Unknown) override;
  virtual std::int64_t _real_eof() const;
private:
  int _fd;
153
  mutable std::mutex _mutex;
154
155
156
157
158
159
};

/////////////////////////////////////////////////////////////////////////////
//    FileADT -> FileCommon -> LocalFileLinux   /////////////////////////////
/////////////////////////////////////////////////////////////////////////////

160
/**
 * \brief Open a regular file and record its current size.
 *
 * \param filename Path handed verbatim to ::open().
 * \param mode     ReadOnly and ReadWrite open an existing file,
 *                 Truncate creates or empties it. Closed (or any
 *                 unrecognized value) leaves the instance without a
 *                 descriptor.
 * \param iocontext (unnamed) Not used by this backend.
 *
 * \throws OpenZGY::Errors::ZgyIoError if the file cannot be opened or
 *         if its size cannot be determined.
 */
LocalFileLinux::LocalFileLinux(const std::string& filename, OpenMode mode, const IOContext*)
  : FileCommon(filename, mode)
  , _fd(-1)
  , _mutex()
{
  // Use the SummaryPrintingTimer extension that keeps track of bytes done.
  // The instance allocated in the base class constructor is simply dropped.
  _rtimer.reset(new SummaryPrintingTimerEx("File::read"));
  _wtimer.reset(new SummaryPrintingTimerEx("File::write"));

  switch (mode) {
  case OpenMode::ReadOnly:
    _fd = ::open(filename.c_str(), O_RDONLY, 0666);
    break;
  case OpenMode::ReadWrite:
    _fd = ::open(filename.c_str(), O_RDWR, 0666);
    break;
  case OpenMode::Truncate:
    _fd = ::open(filename.c_str(), O_RDWR|O_CREAT|O_TRUNC, 0666);
    break;
  case OpenMode::Closed:
  default:
    _fd = -2; // Distinct from -1, which means ::open() failed.
    break;
  }
  if (_fd == -1)
    throw OpenZGY::Errors::ZgyIoError(filename, errno);

  if (_fd >= 0) {
    const off_t end = ::lseek(_fd, 0, SEEK_END);
    if (end == static_cast<off_t>(-1)) {
      // Cannot learn the size (e.g. the name refers to something that
      // is not seekable). Storing -1 as the size would only defer the
      // failure, so report it now. The destructor will not run for a
      // partially constructed object, hence the explicit close.
      // errno is saved first because ::close() may overwrite it.
      const int saved_errno = errno;
      (void)::close(_fd);
      _fd = -2;
      throw OpenZGY::Errors::ZgyIoError(filename, saved_errno);
    }
    _eof = static_cast<std::int64_t>(end);
    // Rewinding is a courtesy only; all I/O in this class uses
    // explicit offsets via ::pread() and ::pwrite().
    (void)::lseek(_fd, 0, SEEK_SET);
    if (false)
      std::cout << "Opened file \"" << filename
      << "\" size " << std::hex << _eof << std::dec << "\n";
  }
}

/**
 * \brief Close the file if the owner forgot to, reporting (but not
 * propagating) any std::exception raised while doing so.
 */
LocalFileLinux::~LocalFileLinux()
{
  if (_mode == OpenMode::Closed)
    return; // Already closed explicitly; nothing left to do.

  // The calling layer is supposed to do an explicit xx_close()
  // so it can catch and handle exceptions. This blind catch is
  // just a desperate attempt to avoid an application crash.
  try {
    xx_close();
  }
  catch (const std::exception& ex) {
    std::cerr << "EXCEPTION closing file: " << ex.what() << std::endl;
  }
}

std::shared_ptr<FileADT>
LocalFileLinux::xx_make_instance(const std::string& filename, OpenMode mode, const IOContext *iocontext)
{
215
216
217
218
219
220
  if (filename.find("://") == std::string::npos) {
    auto file = std::shared_ptr<FileADT>(new LocalFileLinux(filename, mode, iocontext));
    // This is a no-op unless enabled by enviroment variables
    file = FileWithPerformanceLogger::inject(file);
    return file;
  }
221
222
223
224
  else
    return std::shared_ptr<FileADT>();
}

225
226
227
/**
 * \details: Thread safety: No. All other operations must be completed first.
 */
228
229
230
void
LocalFileLinux::xx_close()
{
231
232
233
234
235
236
237
238
239
240
  if (_mode == OpenMode::Closed) {
    // Note: I might "be nice" to the application and simply ignore a duplicate
    // close or a close on a file that was never open in the first place.
    // In that case I should probably check using an atomic_flag.
    // But if the application issues extraneous xx_close(), let alone multiple
    // concurrent calls to xx_close(), this is a bug. That may indicate there
    // is something else wrong as well.
    throw OpenZGY::Errors::ZgyUserError("Attemping to close a file twice.");
  }

241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
  OpenMode mode = _mode;
  _mode = OpenMode::Closed;     // In case we throw.

  switch (mode) {

  default:
  case OpenMode::Closed:
    break;

  case OpenMode::ReadOnly:
  case OpenMode::ReadWrite:
  case OpenMode::Truncate:
    if (::close(_fd) < 0)
      throw OpenZGY::Errors::ZgyIoError(_name, errno);
    _fd = -2;
    break;
  }

  _fd = -2;
  _name = std::string();
  _rtimer.reset();
  _wtimer.reset();
}

265
/**
266
 * \details: Thread safety: Yes, by locking.
267
 */
268
269
270
std::int64_t
LocalFileLinux::xx_eof() const
{
271
  std::lock_guard<std::mutex> lk(_mutex); // protect _eof
272
273
274
  return this->_eof;
}

275
276
277
/**
 * \details: Thread safety: Yes.
 */
278
279
280
281
282
283
bool
LocalFileLinux::xx_iscloud() const
{
  // This backend always answers false. The factory in this file only
  // creates it for names that do not contain a "://" separator.
  return false;
}

284
285
286
/**
 * \details: Thread safety: Yes, assuming that the linux ::pread is thread safe.
 */
287
288
289
290
291
292
void
LocalFileLinux::xx_read(void *data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
  SimpleTimer tt(*_rtimer, timers_on());
  _validate_read(data, offset, size, xx_eof(), _mode);
  ssize_t nbytes = ::pread(_fd, data, size, offset);
293
  static_cast<SummaryPrintingTimerEx*>(_rtimer.get())->addBytesRead(nbytes);
294
295
296
  _check_short_read(offset, size, nbytes);
}

297
298
/**
 * \details: Thread safety: Yes, assuming that the linux ::pread is thread safe.
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
 *
 * If the caller passes parallel_ok=true this means the caller allows and
 * even prefers that we deliver each request on a different thread. This
 * parallelization comes in addition to allowing multiple reads in parallel
 * at the OpenZGY API level.
 *
 * Caveat: Consider carefully whether you want both. If the
 * application uses OpenMP for multi threading then by default nested
 * parallel regions are disabled. You can change this. If the
 * application uses some other mechanism than OpenMP used here might
 * not realize that it is creating nested loops. Or maybe it does, if
 * it uses an application-wide thread pool?
 *
 * Caveat: Since finalize() is single threaded then it should probably
 * enable parallel here. One problem is that the application might
 * still be inside an OpenMP loop, using a lock to make sure that
 * finalize() runs unmolested. OpenMP wikk still see it is inside a
 * parallel region so it might refuse to make one here.
317
 */
318
319
320
321
322
323
324
void
LocalFileLinux::xx_readv(const ReadList& requests, bool parallel_ok, bool immutable_ok, bool transient_ok, UsageHint usagehint)
{
  // Note that I need the xx_read() in this class, not any overrides.
  // If xx_read() is overridden then whoever did that wouldn't expect
  // xx_readv() to change. The fact that I choose to implement one in
  // terms of the other is an implementation detail.
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
  if (!parallel_ok || requests.size() < 2) {
    for (const ReadRequest& r : requests) {
      std::unique_ptr<char[]> data(new char[r.size]);
      this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
      r.delivery(data.get(), r.size);
    }
  }
  else {
    // OpenMP needs signed loop variable on windows.
    const std::int64_t requestcount = requests.size();

    // Re-use buffers within one thread, to avoid lock contention in
    // the CRT. Assume that in almost all cases the requests will have
    // the same size. If this is not true and the sizes vary wildly
    // then we may be wasting memory here. Even more memory than the
    // size being requested.
    // TODO-Low, if number of requests per call is typically less than
    // the number of available threads then the re-use is pointless.
    std::int64_t maxsize = 0;
    for (const ReadRequest& r : requests)
      maxsize = std::max(maxsize, r.size);

    // Cannot use more threads than we have requests, and OpenMP might
    // not be smart enough to see this. Definitely not if the parallel
    // region starts before the for loop, as is needed to reuse
    // buffers. And sorry for the pedantic guard against more than
    // 2**31 bricks.
    const int threadcount = std::min(std::min(std::numeric_limits<int>::max(), static_cast<int>(requestcount)), omp_get_max_threads());

    // Exceptions thrown out of an OpenMP loop are fatal, so I need to
    // handle them here.
    std::atomic<int> errors(0);
    std::string first_error;
#pragma omp parallel num_threads(threadcount)
    {
      std::unique_ptr<char[]> data(new char[maxsize]);
#pragma omp for
      for (std::int64_t ii=0; ii<requestcount; ++ii) {
        if (errors.load() != 0)
          continue;
        const ReadRequest& r = requests[ii];
        try {
          this->LocalFileLinux::xx_read(data.get(), r.offset, r.size, usagehint);
          r.delivery(data.get(), r.size);
        }
        catch (const std::exception& ex) {
          if (errors.fetch_add(1) == 0) {
            auto what = ex.what();
            first_error = std::string(what && what[0] ? what : "EXCEPTION");
          }
          continue;
        }
      } // end omp for
    }
    // end parallel
    if (errors.load() != 0) {
      // The distinction between UserError, EndOfFile, and InternalError
      // (and maybe even others) is lost. If it matters I can handle code
      // for this as well.
      throw OpenZGY::Errors::ZgyInternalError(first_error);
    }
386
387
388
  }
}

389
/**
390
 * \details: Thread safety: Yes.
391
392
393
394
 * OpenZGY is in general not thread safe when writing, but in this
 * low level function it doesn't cost much to synchronize _eof
 * both here and the places it is read.
 */
395
396
397
398
399
400
401
402
403
404
405
406
void
LocalFileLinux::xx_write(const void* data, std::int64_t offset, std::int64_t size, UsageHint usagehint)
{
  SimpleTimer tt(*_wtimer, timers_on());
  _validate_write(data, offset, size, _mode);
  if (false)
    std::cout << "xx_write(*, " << std::hex
              << offset << ", " << size << ", hint=" << (int)usagehint
              << std::dec << ")\n";
  ssize_t nbytes = ::pwrite(_fd, data, size, offset);
  if (nbytes < 0)
      throw OpenZGY::Errors::ZgyIoError(_name, errno);
407
  static_cast<SummaryPrintingTimerEx*>(_wtimer.get())->addBytesWritten(nbytes);
408
  std::lock_guard<std::mutex> lk(_mutex); // protect _eof
409
410
411
412
413
  _eof = std::max(_eof, offset + nbytes);
  if (nbytes != size)
    throw OpenZGY::Errors::ZgyInternalError(_name + ": Short write");
}

414
/**
415
 * \details: Thread safety: Yes. Uses xx_eof().
416
417
 * And should in any case be changed to call ::stat().
 */
418
419
420
421
422
423
424
425
std::int64_t
LocalFileLinux::_real_eof() const
{
  // For now this reports the size tracked by this instance (set in
  // the constructor, grown by xx_write()), read under the lock by
  // xx_eof(). It does not ask the operating system.
  // TODO-Low: Use stat() to get the real file size.
  return xx_eof();
}

namespace {
426
427
428
  /**
   * \details: Thread safety: Yes. add_factory() is synchronized.
   */
429
430
431
432
433
434
435
436
437
438
439
440
441
  class Register
  {
  public:
    Register()
    {
      FileFactory::instance().add_factory(LocalFileLinux::xx_make_instance);
    }
  } dummy;
} // anonymous namespace for registration

} // namespace

#endif // Entire file is linux only