file_parallelizer.cpp
// Copyright 2017-2021, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "file_parallelizer.h"
#include "mtguard.h"
#include "../exception.h"
#include <omp.h>
#include <string.h>
#include <iostream>
#include <algorithm> // std::min

namespace InternalZGY {
#if 0
}
#endif
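// The dummy brace in the never-compiled block above balances the
// namespace's opening brace so brace-matching editors do not indent
// the entire file body.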

/**
 * \brief Help parallelize the decompression and copy-out steps.
 *
 * \details
 * - Intercept calls to xx_readv.
 *
 * - Send a single read request list down to the layer below.
 *   At the level where the parallelizer is injected, each
 *   individual request will almost always be for a single brick.
 *
 * - Wait for all the data to be delivered. In the seismic server
 *   case the accessor will in any case defer sending results until
 *   it has all the requested data. Changing that will not happen
 *   anytime soon.
 *
 * - Use an OpenMP loop to return each brick in a potentially
 *   different thread.
 *
 * The net effect is similar to using parallel loops at the
 * end of SeismicStoreFile::xx_readv() and also in the
 * ConsolidateRequests::_consolidated_delivery() method.
 * In that case no additional buffer copy would have been needed.
 * The first place is for when bricks were not consolidated and the
 * second for when that did happen. The problem is that this may lead
 * to nested parallelization.
 *
 * Hence the code in this file, which is in any case better
 * due to being more modular. The extra buffer copies are
 * still unfortunate, though.
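 *
 * Example (a minimal sketch, not part of the build): how this wrapper
 * is meant to be layered on top of another open FileADT. The
 * "openUnderlying" factory below is a hypothetical stand-in for
 * whatever created the relay; the request list and hint are whatever
 * the caller would normally pass.
 *
 * \code
 *   std::shared_ptr<FileADT> fd = openUnderlying(filename); // hypothetical
 *   fd = FileParallelizer::inject(fd, 8); // no-op when cputhreads <= 1
 *   fd->xx_readv(requests, true, false, true, hint);
 *   // args: parallel_ok=true, immutable_ok=false, transient_ok=true.
 *   // Each request's delivery functor may now run on its own OpenMP thread.
 * \endcode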
 */
FileParallelizer::FileParallelizer(std::shared_ptr<FileADT> relay, std::int64_t cputhreads)
  : FileRelay(relay)
  , _cputhreads(cputhreads)
{
  std::cerr << "Parallelizer has been created with "
            << _cputhreads << " threads\n";
}

FileParallelizer::~FileParallelizer()
{
  std::cerr << "Parallelizer has been destructed\n";
}

void
FileParallelizer::xx_readv(
     const ReadList& requests,
     bool parallel_ok,  // usually true
     bool immutable_ok, // usually false
     bool transient_ok, // usually true
     UsageHint hint)
{
  const bool shortcut = true; // Set false only for debugging.
  const std::int64_t requestcount = requests.size();

  // Shortcut if no parallelizing possible.
  if (!parallel_ok || (shortcut && requests.size() <= 1)) {
    std::cerr << "Parallelizer: Nothing to do\n";
    relay().xx_readv(requests, parallel_ok, immutable_ok, transient_ok, hint);
    return;
  }

  // Future: For each request, try fulfilling it entirely from the cache
  // and remove it from the queue, because that path would have much
  // lower overhead.

  // Negotiating immutable_ok and transient_ok:
  // Currently the data is always copied inside this method to a new buffer.
  // Tell the layer below: parallel_ok, immutable_ok, transient_ok.
  // If/when the delivery is done by smart pointer it may be possible
  // to avoid the copy but negotiating those flags becomes non-trivial.
  //
  // With delivery using smart pointers, SeismicStoreFile::xx_readv()
  // has little or no additional cost for a mutable and not transient
  // buffer except for some corner cases. So pass down the immutable_ok
  // from the caller and pass the transient_ok=false needed here.
  // A future caching module might make this trickier.
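
  // Illustration (matching what the code further down actually does):
  // whatever the caller passed, the layer below currently sees
  //
  //   relay().xx_readv(newrequests, /*parallel_ok=*/true,
  //                    /*immutable_ok=*/true, /*transient_ok=*/true, hint);
  //
  // which is safe only because every byte is copied into a private
  // buffer before it is handed to the caller's delivery functor.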

  // The new request list sends the data to our buffers instead of to caller.
  ReadList newrequests(requests);
  std::vector<std::shared_ptr<char>> buffers(requestcount);
  for (std::int64_t ii = 0; ii < requestcount; ++ii) {
    std::int64_t buflen = newrequests[ii].size;
    std::shared_ptr<char> bufptr(new char[buflen]);
    buffers[ii] = bufptr;
    newrequests[ii].delivery =
      [bufptr,buflen](ReadRequest::data_t data, std::int64_t len) {
        std::cerr << "+";
        // TODO-High: Verify: len can be more than requested due to speculative
        // caching and less due to encountering EOF. The EOF ought to
        // have been tested for already and oversize is just ignored.
        memcpy(bufptr.get(), data, std::min(len, buflen));
        if (buflen > len)
          memset(bufptr.get() + len, 0, buflen - len);
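        // Worked example: with buflen == 4096 and a short read of
        // len == 1000, bytes [0,1000) come from the source and bytes
        // [1000,4096) are zero-filled, so the caller never sees
        // uninitialized memory.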
      };
  }

  // The actual read. It will not return until all is processed.
  // We copy delivered data immediately, so immutable_ok && transient_ok.
  std::cerr << "Parallelizer: n=" << requestcount << " ";
  relay().xx_readv(newrequests, true, true, true, hint);

  // Deliver the buffers that we cached to the caller.
  // Use no more threads than there are requests, and no more than both
  // the configured _cputhreads and what OpenMP will give us.
  const std::int64_t threadcount =
    std::min(requestcount,
             std::min(_cputhreads, (std::int64_t)omp_get_max_threads()));
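
  // C++ exceptions must not escape an OpenMP parallel region, so the
  // loop body runs inside MTGuard (see mtguard.h), which catches
  // anything thrown inside run() and re-throws it after the loop when
  // finished() is called.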
  MTGuard guard;
#pragma omp parallel for num_threads(threadcount)
  for (std::int64_t ii = 0; ii < requestcount; ++ii) {
    guard.run([&](){
      std::cerr << "0123456789"[omp_get_thread_num() % 10];
      _deliver(requests[ii].delivery, buffers[ii], 0, requests[ii].size, transient_ok);
    });
  }
  guard.finished();
  std::cerr << "$\n";
}

/**
 * Inject a parallelizer module.
 */
std::shared_ptr<FileADT>
FileParallelizer::inject(std::shared_ptr<FileADT> file, std::int64_t cputhreads)
{
  if (cputhreads > 1)
    file = std::shared_ptr<FileADT>(new FileParallelizer(file, cputhreads));
  return file;
}

} // namespace