IOManagerAzure.cpp 15.2 KB
Newer Older
Jørgen Lind's avatar
Jørgen Lind committed
1
/****************************************************************************
2
3
** Copyright 2020 The Open Group
** Copyright 2020 Bluware, Inc.
4
** Copyright 2020 Microsoft Corp.
Jørgen Lind's avatar
Jørgen Lind committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
**     http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/

#include "IOManagerAzure.h"

Prashant Karbhari's avatar
Prashant Karbhari committed
21
22
#include <cpprest/filestream.h>

Jørgen Lind's avatar
Jørgen Lind committed
23
#include <fmt/format.h>
Prashant Karbhari's avatar
Prashant Karbhari committed
24
25
#include <mutex>
#include <string>
26
27
28
29
30
31
32
33
#include <functional>
#include <algorithm>

#ifdef WIN32
#undef WIN32_LEAN_AND_MEAN // avoid warnings if defined on command line
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#endif
Jørgen Lind's avatar
Jørgen Lind committed
34

Prashant Karbhari's avatar
Prashant Karbhari committed
35
namespace OpenVDS
Jørgen Lind's avatar
Jørgen Lind committed
36
{
37
#ifdef WIN32
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static utility::string_t convertToUtilString(const std::string& str)
{
  utility::string_t ret;
  int len;
  int slength = (int)str.length();
  len = MultiByteToWideChar(CP_UTF8, 0, str.c_str(), slength, nullptr, 0);
  ret.resize(len);
  MultiByteToWideChar(CP_UTF8, 0, str.c_str(), slength, &ret[0], len);
  return ret;
}

static std::string convertFromUtilString(const utility::string_t& s)
{
  int len;
  int slength = (int)s.length() + 1;
  len = WideCharToMultiByte(CP_UTF8, 0, s.c_str(), slength, nullptr, 0, nullptr, nullptr);
  char* buf = new char[len];
  WideCharToMultiByte(CP_UTF8, 0, s.c_str(), slength, buf, len, nullptr, nullptr);
  std::string r(buf);
  delete[] buf;
  return r;
}
Prashant Karbhari's avatar
Prashant Karbhari committed
60

61
#else
62
63
64
65
static utility::string_t convertToUtilString(const std::string& str)
{
  return str;
}
Prashant Karbhari's avatar
Prashant Karbhari committed
66

67
68
69
70
static std::string convertFromUtilString(const utility::string_t& str)
{
  return str;
}
71
#endif
Prashant Karbhari's avatar
Prashant Karbhari committed
72

73
GetHeadRequestAzure::GetHeadRequestAzure(const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler)
74
75
76
77
78
79
  : Request(id)
  , m_handler(handler)
  , m_cancelled(false)
  , m_done(false)
{
}
Prashant Karbhari's avatar
Prashant Karbhari committed
80

81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
GetHeadRequestAzure::~GetHeadRequestAzure()
{
  GetHeadRequestAzure::Cancel();
}

void GetHeadRequestAzure::WaitForFinish()
{
  std::unique_lock<std::mutex> lock(m_mutex);
  m_waitForFinish.wait(lock, [this]
    {
      return m_done;
    });

}

bool GetHeadRequestAzure::IsDone() const
{
  std::unique_lock<std::mutex> lock(m_mutex);
  return m_done;
}

bool GetHeadRequestAzure::IsSuccess(Error& error) const
{
  std::unique_lock<std::mutex> lock(m_mutex);
  if (!m_done)
  {
    error.code = -1;
    error.string = "GetHead not done.";
    return false;
  }
  error = m_error;
  return m_error.code == 0;
}

void GetHeadRequestAzure::Cancel()
{
  //m_cancelTokenSrc.cancel();
  m_cancelled = true;
}

ReadObjectInfoRequestAzure::ReadObjectInfoRequestAzure(const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler)
  : GetHeadRequestAzure(id, handler)
{
}

void ReadObjectInfoRequestAzure::run(azure::storage::cloud_blob_container& container, azure::storage::blob_request_options options, const std::string & requestName, std::weak_ptr<ReadObjectInfoRequestAzure> request)
{
  // set options, we should probably get these through AzureOpenOptions instead of haddong here - default set in the IOMangerAzure
  azure::storage::blob_request_options local_options;
  local_options.set_parallelism_factor(options.parallelism_factor()); //example: (4)
  local_options.set_maximum_execution_time(options.maximum_execution_time()); //example: (std::chrono::milliseconds(10000));

  // set the cancellation token
  m_cancelTokenSrc = pplx::cancellation_token_source();
  m_context = azure::storage::operation_context();

  m_blob = container.get_block_blob_reference(convertToUtilString(requestName));
  //m_task = m_blob.download_range_to_stream_async(m_outStream.create_ostream(), range.start, range.end - range.start, azure::storage::access_condition(), local_options, m_context, m_cancelTokenSrc.get_token());
  m_task = m_blob.download_attributes_async(azure::storage::access_condition(), local_options, m_context, m_cancelTokenSrc.get_token());
  m_task.then([request, this](pplx::task<void> task)
    {
      auto readObjectRequest = request.lock();
      if (!readObjectRequest)
        return;
      try
      {
        // when the task is completed
        task.get();

        if (auto tmp = request.lock())
        {
          m_handler->HandleObjectSize(m_blob.properties().size());
153
154
155
156

          m_handler->HandleObjectLastWriteTime(convertFromUtilString(m_blob.properties().last_modified().to_string(utility::datetime::ISO_8601)));

          // send metadata one at a time to the metadata handler
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
          for (auto it : m_blob.metadata())
          {
            m_handler->HandleMetadata(convertFromUtilString(it.first), convertFromUtilString(it.second));
          }

          // declare success and set completion status
          m_error.code = 0;
          m_done = true;
          m_waitForFinish.notify_all();
          m_handler->Completed(*this, m_error);
        }

      }
      catch (const azure::storage::storage_exception & e)
      {
        // display the erro message, set completion (error) status and return the error to the handler
        ucout << _XPLATSTR("Error message is: ") << e.what() << std::endl;
        m_error.code = -1;
        m_error.string = e.what();
        m_done = true;
        m_waitForFinish.notify_all();
        m_handler->Completed(*this, m_error);
      }
    });
}

DownloadRequestAzure::DownloadRequestAzure(const std::string& id, const std::shared_ptr<TransferDownloadHandler>& handler)
  : GetHeadRequestAzure(id, handler)
185
186
{
}
Prashant Karbhari's avatar
Prashant Karbhari committed
187

188
189
190
191
192
193
194
195
196
197
void DownloadRequestAzure::run(azure::storage::cloud_blob_container& container, azure::storage::blob_request_options options, const std::string& requestName, const IORange& range, std::weak_ptr<DownloadRequestAzure> request)
{
  // set options, we should probably get these through AzureOpenOptions instead of haddong here - default set in the IOMangerAzure
  azure::storage::blob_request_options local_options;
  local_options.set_parallelism_factor(options.parallelism_factor()); //example: (4)
  local_options.set_maximum_execution_time(options.maximum_execution_time()); //example: (std::chrono::milliseconds(10000));

  // set the cancellation token
  m_cancelTokenSrc = pplx::cancellation_token_source();
  m_context = azure::storage::operation_context();
198
  m_requestedRange = range;
199
200

  m_blob = container.get_block_blob_reference(convertToUtilString(requestName));
201
202
  m_task = m_blob.download_range_to_stream_async(m_outStream.create_ostream(), range.start, range.end - range.start, azure::storage::access_condition(), local_options, m_context, m_cancelTokenSrc.get_token());
  m_task.then([request, this](pplx::task<void> downloadTask)
Prashant Karbhari's avatar
Prashant Karbhari committed
203
    {
204
205
206
207
208
209
      auto downloadRequest = request.lock();
      if (!downloadRequest)
        return;
      try
      {
        // when the task is completed
210
        downloadTask.get();
211
        // download properties and metadata
Jørgen Lind's avatar
Jørgen Lind committed
212
213
        //m_blob.download_attributes();
        std::vector<unsigned char> &data = m_outStream.collection();
214

215
216
217
        if (m_context.request_results().size() == 2 && m_context.request_results()[0].http_status_code() == 416 && m_requestedRange.start == 0 && m_requestedRange.end == 0)
          data.clear();

218
219
        if (auto tmp = request.lock())
        {
220
221
          m_handler->HandleObjectSize(m_blob.properties().size());

222
223
224
          m_handler->HandleObjectLastWriteTime(convertFromUtilString(m_blob.properties().last_modified().to_string(utility::datetime::ISO_8601)));

          // send metadata one at a time to the metadata handler
225
          for (auto it : m_blob.metadata())
226
          {
227
228
229
            m_handler->HandleMetadata(convertFromUtilString(it.first), convertFromUtilString(it.second));
          }
          // send data to the data handler
Jørgen Lind's avatar
Jørgen Lind committed
230
          m_handler->HandleData(std::move(data));
231
232
233
234
235
236
237

          // declare success and set completion status
          m_error.code = 0;
          m_done = true;
          m_waitForFinish.notify_all();
          m_handler->Completed(*this, m_error);
        }
Prashant Karbhari's avatar
Prashant Karbhari committed
238

239
240
241
242
243
244
245
246
247
248
249
250
251
      }
      catch (const azure::storage::storage_exception & e)
      {
        // display the erro message, set completion (error) status and return the error to the handler
        ucout << _XPLATSTR("Error message is: ") << e.what() << std::endl;
        m_error.code = -1;
        m_error.string = e.what();
        m_done = true;
        m_waitForFinish.notify_all();
        m_handler->Completed(*this, m_error);
      }
    });
}
Prashant Karbhari's avatar
Prashant Karbhari committed
252

253
254
255
256
257
258
259
UploadRequestAzure::UploadRequestAzure(const std::string& id, std::function<void(const Request & request, const Error & error)> completedCallback)
  : Request(id)
  , m_completedCallback(completedCallback)
  , m_cancelled(false)
  , m_done(false)
{
}
Prashant Karbhari's avatar
Prashant Karbhari committed
260

261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
void UploadRequestAzure::run(azure::storage::cloud_blob_container& container, azure::storage::blob_request_options options, const std::string& requestName, const std::string& contentDispositionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::weak_ptr<UploadRequestAzure> uploadRequest)
{
  // set options, we should probably get these through AzureOpenOptions instead of haddong here - default set in the IOMangerAzure
  azure::storage::blob_request_options local_options;
  local_options.set_parallelism_factor(options.parallelism_factor()); // Example:(4);
  local_options.set_maximum_execution_time(options.maximum_execution_time()); // Example: (std::chrono::seconds(10000));

  m_data = data;
  // Set teh cancellation token
  m_cancelTokenSrc = pplx::cancellation_token_source();
  m_blob = container.get_block_blob_reference(convertToUtilString(requestName));

  m_context = azure::storage::operation_context();
  // Get the provided metadata ready for upload
  for (auto it : metadataHeader)
  {
    m_blob.metadata()[convertToUtilString(it.first)] = convertToUtilString(it.second);
  }
  m_blob.properties().set_content_type(convertToUtilString(contentType));
  m_blob.properties().set_content_disposition(convertToUtilString(contentDispositionFilename));

  m_taskResult = m_blob.upload_from_stream_async(concurrency::streams::bytestream::open_istream(*data), data->size(), azure::storage::access_condition(), local_options, m_context, m_cancelTokenSrc.get_token());
  m_taskResult.then(
284
    [this, uploadRequest](pplx::task<void> uploadTask)
285
286
287
288
289
290
291
    {
      auto request = uploadRequest.lock();
      if (!request)
        return;

      try
      {
292
        uploadTask.get();
Jørgen Lind's avatar
Jørgen Lind committed
293
294
        m_data.reset();

295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
        m_error.code = 0;
        m_done = true;
        m_waitForFinish.notify_all();
        if (m_completedCallback) m_completedCallback(*this, m_error);
      }
      catch (azure::storage::storage_exception & e)
      {
        // On error set the completion (error) status and call the completion callback
        std::string ex_msg;
        ex_msg = std::string(e.what());
        m_error.code = -1;
        m_error.string = ex_msg;
        m_done = true;
        m_waitForFinish.notify_all();
        if (m_completedCallback) m_completedCallback(*this, m_error);
      }
    });
}
Prashant Karbhari's avatar
Prashant Karbhari committed
313

314
315
316
void UploadRequestAzure::WaitForFinish()
{
  std::unique_lock<std::mutex> lock(m_mutex);
Prashant Karbhari's avatar
Prashant Karbhari committed
317

318
  m_waitForFinish.wait(lock, [this]
Prashant Karbhari's avatar
Prashant Karbhari committed
319
    {
320
321
322
      return this->m_done;
    });
}
323

324
325
326
327
328
bool UploadRequestAzure::IsDone() const
{
  std::unique_lock<std::mutex> lock(m_mutex);
  return m_done;
}
Prashant Karbhari's avatar
Prashant Karbhari committed
329

330
331
332
333
334
335
336
337
338
339
340
341
bool UploadRequestAzure::IsSuccess(Error& error) const
{
  std::unique_lock<std::mutex> lock(m_mutex);
  if (!m_done)
  {
    error.code = -1;
    error.string = "Upload not done.";
    return false;
  }
  error = m_error;
  return m_error.code == 0;
}
Prashant Karbhari's avatar
Prashant Karbhari committed
342

343
344
345
346
347
void UploadRequestAzure::Cancel()
{
  m_cancelled = true;
  m_cancelTokenSrc.cancel();
}
Prashant Karbhari's avatar
Prashant Karbhari committed
348

349
IOManagerAzure::IOManagerAzure(const AzureOpenOptions& openOptions, Error& error)
Jørgen Lind's avatar
Jørgen Lind committed
350
  : IOManager(OpenOptions::Azure)
351
  , m_containerStr(openOptions.container)
Jørgen Lind's avatar
Jørgen Lind committed
352
  , m_prefix(openOptions.blob)
353
{
354
  if (openOptions.connectionString.empty() && openOptions.bearerToken.empty())
355
356
  {
    error.code = -1;
357
    error.string = "Azure Config error. Must provide a connection string or a bearer token";
358
359
360
361
362
363
364
365
    return;
  }
  if (m_containerStr.empty())
  {
    error.code = -1;
    error.string = "Azure Config error. Empty container or blob name";
    return;
  }
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393

  if (openOptions.connectionString.size())
  {
    m_storage_account = azure::storage::cloud_storage_account::parse(convertToUtilString(openOptions.connectionString));
  } else
  {
    assert(openOptions.bearerToken.size());
    if (openOptions.accountName.empty())
    {
      error.code = -1;
      error.string = "Azure Config error. Account Name is mandatory when specifying bearer token";
    }
    azure::storage::storage_credentials::bearer_token_credential bearerToken(convertToUtilString(openOptions.bearerToken));
    auto storage_credentials = azure::storage::storage_credentials(convertToUtilString(openOptions.accountName), bearerToken);
    m_storage_account = azure::storage::cloud_storage_account(storage_credentials, true);
  }
  
  try
  {
    m_blobClient = m_storage_account.create_cloud_blob_client();
    m_container = m_blobClient.get_container_reference(convertToUtilString(m_containerStr));
    m_container.create_if_not_exists();
  } catch (azure::storage::storage_exception &ex)
  {
    error.code = -1;
    error.string = ex.what();
    return;
  }
394
395
396
397
398
399
400

  //m_options.set_server_timeout(std::chrono::seconds(60000));

  m_options = azure::storage::blob_request_options();
  m_options.set_parallelism_factor(openOptions.parallelism_factor);
  m_options.set_maximum_execution_time(std::chrono::seconds(openOptions.max_execution_time));
}
Prashant Karbhari's avatar
Prashant Karbhari committed
401

402
403
404
IOManagerAzure::~IOManagerAzure()
{
}
Prashant Karbhari's avatar
Prashant Karbhari committed
405

406
std::shared_ptr<Request> IOManagerAzure::ReadObjectInfo(const std::string &objectName, std::shared_ptr<TransferDownloadHandler> handler)
Jørgen Lind's avatar
Jørgen Lind committed
407
{
408
409
410
  std::shared_ptr<ReadObjectInfoRequestAzure> azureRequest = std::make_shared<ReadObjectInfoRequestAzure>(objectName, handler);
  azureRequest->run(m_container, m_options, objectName, azureRequest);
  return azureRequest;
Jørgen Lind's avatar
Jørgen Lind committed
411
412
}

413
std::shared_ptr<Request> IOManagerAzure::ReadObject(const std::string &requestName, std::shared_ptr<TransferDownloadHandler> handler, const IORange& range)
414
{
Jørgen Lind's avatar
Jørgen Lind committed
415
416
417
  std::string id = requestName.empty() ? m_prefix : m_prefix + "/" + requestName;
  std::shared_ptr<DownloadRequestAzure> azureRequest = std::make_shared<DownloadRequestAzure>(id, handler);
  azureRequest->run(m_container, m_options, id, range, azureRequest);
418
419
  return azureRequest;
}
Prashant Karbhari's avatar
Prashant Karbhari committed
420

421
std::shared_ptr<Request> IOManagerAzure::WriteObject(const std::string &requestName, const std::string& contentDispositionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request & request, const Error & error)> completedCallback)
422
{
Jørgen Lind's avatar
Jørgen Lind committed
423
424
425
  std::string id = requestName.empty() ? m_prefix : m_prefix + "/" + requestName;
  std::shared_ptr<UploadRequestAzure> azureRequest = std::make_shared<UploadRequestAzure>(id, completedCallback);
  azureRequest->run(m_container, m_options, id, contentDispositionFilename, contentType, metadataHeader, data, azureRequest);
426
427
  return azureRequest;
}
Jørgen Lind's avatar
Jørgen Lind committed
428
}