Commit 06b7cf08 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Upload files

with some new upload error checking api
parent c0017cbb
......@@ -21,8 +21,13 @@
namespace OpenVDS
{
Request::Request(const std::string& objectName)
: m_objectName(objectName)
{
}
Request::~Request()
{}
TransferHandler::~TransferHandler()
{}
void TransferHandler::handleMetadata(const std::string& key, const std::string& header)
......
......@@ -35,11 +35,15 @@ namespace OpenVDS
class Request
{
public:
Request(const std::string &objectName);
virtual ~Request();
virtual void waitForFinish() = 0;
virtual bool isDone() const = 0;
virtual bool isSuccess(Error &error) const = 0;
virtual void cancel() = 0;
const std::string &getObjectName() const { return m_objectName; }
private:
std::string m_objectName;
};
struct IORange
......
......@@ -25,7 +25,9 @@
#include <aws/s3/model/BucketLocationConstraint.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/core/utils/logging/DefaultLogSystem.h>
#include <aws/core/utils/logging/AWSLogging.h>
#include <mutex>
#include <functional>
......@@ -48,10 +50,15 @@ namespace OpenVDS
static void initializeAWSSDK()
{
std::unique_lock<std::mutex> lock(initialize_sdk_mutex);
initialize_sdk++;
if (initialize_sdk == 1)
{
Aws::Utils::Logging::InitializeAWSLogging(
Aws::MakeShared<Aws::Utils::Logging::DefaultLogSystem>(
"OpenVDS-S3 Integration", Aws::Utils::Logging::LogLevel::Trace, "aws_sdk_"));
Aws::InitAPI(initialize_sdk_options);
}
}
......@@ -62,6 +69,7 @@ namespace OpenVDS
initialize_sdk--;
if (!initialize_sdk)
{
Aws::Utils::Logging::ShutdownAWSLogging();
Aws::ShutdownAPI(initialize_sdk_options);
}
......@@ -125,7 +133,8 @@ namespace OpenVDS
}
DownloadRequestAWS::DownloadRequestAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferHandler>& handler, const IORange &range)
: m_handler(handler)
: Request(id)
, m_handler(handler)
, m_context(std::make_shared<AsyncDownloadContext>(this))
, m_done(false)
{
......@@ -191,7 +200,7 @@ namespace OpenVDS
}
UploadRequestAWS::UploadRequestAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, std::shared_ptr<std::vector<uint8_t>> data, const IORange& range)
: Request()
: Request(id)
, m_context(std::make_shared<AsyncUploadContext>(this))
, m_data(data)
, m_vectorBuf(*data)
......@@ -202,6 +211,14 @@ namespace OpenVDS
put.SetBucket(std::move(convertStdString(bucket)));
put.SetKey(std::move(convertStdString(id)));
put.SetBody(m_stream);
if (range.end)
{
assert(false);
//Have to use the Multi upload api instead. Maybe use TransferManager?
// char rangeHeaderBuffer[100];
// snprintf(rangeHeaderBuffer, sizeof(rangeHeaderBuffer), "bytes=%zu-%zu", range.start, range.end);
// object_request.SetRange(rangeHeaderBuffer);
}
using namespace std::placeholders;
auto bounded_callback = std::bind(&upload_callback, _1, _2, _3, _4, m_context);
......@@ -211,6 +228,8 @@ namespace OpenVDS
void UploadRequestAWS::waitForFinish()
{
std::unique_lock<std::mutex> lock(m_context->mutex);
if (m_done)
return;
m_waitForFinish.wait(lock);
}
bool UploadRequestAWS::isDone() const
......
......@@ -540,6 +540,12 @@ public:
/// A factor (between 0 and 1) indicating how much of the request has been completed.
/// </returns>
virtual float getCompletionFactor(int64_t requestID) = 0;
virtual void flushUploadQueue() = 0;
virtual void clearUploadErrors() = 0;
virtual void forceClearAllUploadErrors() = 0;
virtual int32_t uploadErrorCount() = 0;
virtual void getCurrentUploadError(const char **objectId, int32_t *errorCode, const char **errorString) = 0;
};
template<> inline VolumeDataReadWriteAccessor<IntVector4, double> *VolumeDataAccessManager::createVolumeDataAccessor<IntVector4, double> (VolumeDataPageAccessor* volumeDataPageAccessor, float replacementNoValue) { return create4DVolumeDataAccessorR64 (volumeDataPageAccessor, replacementNoValue); }
......
......@@ -571,9 +571,11 @@ main(int argc, char *argv[])
std::string persistentID;
std::string fileInfoFileName;
int brickSize;
bool force = false;
options.add_option("", "i", "file-info", "A JSON file (generated by the SEGYScan tool) containing information about the input SEG-Y file.", cxxopts::value<std::string>(fileInfoFileName), "<file>");
options.add_option("", "b", "brick-size", "The brick size for the volume data store.", cxxopts::value<int>(brickSize)->default_value("64"), "<value>");
options.add_option("", "f", "force", "Continue on upload error.", cxxopts::value<bool>(force), "");
options.add_option("", "", "bucket", "Bucket to upload to.", cxxopts::value<std::string>(bucket), "<string>");
options.add_option("", "", "region", "Region of bucket to upload to.", cxxopts::value<std::string>(region), "<string>");
options.add_option("", "", "prefix", "Top-level prefix to prepend to all object-keys.", cxxopts::value<std::string>(prefix), "<string>");
......@@ -751,6 +753,19 @@ main(int argc, char *argv[])
for(int64_t chunk = 0; chunk < amplitudeAccessor->getChunkCount(); chunk++)
{
int32_t errorCount = accessManager->uploadErrorCount();
for (int i = 0; i < errorCount; i++)
{
const char *object_id;
int32_t error_code;
const char *error_string;
accessManager->getCurrentUploadError(&object_id, &error_code, &error_string);
fprintf(stderr, "Failed to upload object: %s. Error code %d: %s\n", object_id, error_code, error_string);
}
if (errorCount && !force)
{
return EXIT_FAILURE;
}
int
min[OpenVDS::Dimensionality_Max], max[OpenVDS::Dimensionality_Max];
......
......@@ -219,6 +219,15 @@ VolumeDataAccessManagerImpl::VolumeDataAccessManagerImpl(VDSHandle* handle)
{
}
VolumeDataAccessManagerImpl::~VolumeDataAccessManagerImpl()
{
flushUploadQueue();
if (m_uploadErrors.size())
{
fprintf(stderr, "VolumeDataAccessManager destructor: there where upload errors\n");
}
}
VolumeDataLayout const* VolumeDataAccessManagerImpl::getVolumeDataLayout() const
{
return m_layout;
......@@ -302,23 +311,29 @@ static MetadataManager *getMetadataMangerForLayer(LayerMetadataContainer *contai
return metadataManager;
}
static std::string makeURLForChunk(const std::string &layerUrl, uint64_t chunk)
static std::string createUrlForChunk(const std::string &layerUrl, uint64_t chunk)
{
char url[1000];
char url[1024];
snprintf(url, sizeof(url), "%s/%" PRIu64, layerUrl.c_str(), chunk);
return std::string(url);
return url;
}
bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &chunk, bool verbose, Error &error)
static std::string createBaseUrl(const VolumeDataLayer *layer)
{
int32_t channel = chunk.layer->getChannelIndex();
const char *channelName = channel > 0 ? chunk.layer->getLayout()->getChannelName(chunk.layer->getChannelIndex()) : "";
int32_t lod = chunk.layer->getLOD();
const char *dimensions_string = DimensionGroupUtil::getDimensionsGroupString(DimensionGroupUtil::getDimensionsNDFromDimensionGroup(chunk.layer->getChunkDimensionGroup()));
char layerURL[1000];
int32_t channel = layer->getChannelIndex();
const char *channelName = channel > 0 ? layer->getLayout()->getChannelName(layer->getChannelIndex()) : "";
int32_t lod = layer->getLOD();
const char *dimensions_string = DimensionGroupUtil::getDimensionsGroupString(DimensionGroupUtil::getDimensionsNDFromDimensionGroup(layer->getChunkDimensionGroup()));
char layerURL[1024];
snprintf(layerURL, sizeof(layerURL), "%sDimensions_%sLOD%d", channelName, dimensions_string, lod);
return layerURL;
}
bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &chunk, bool verbose, Error &error)
{
std::string layerURL = createBaseUrl(chunk.layer);
auto metadataManager = getMetadataMangerForLayer(m_layerMetadataContainer, layerURL);
//do fallback
if (!metadataManager)
......@@ -350,7 +365,7 @@ bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &ch
// Check if the page is not valid and we need to add the request later when the metadata page transfer completes
if (!metadataPage->IsValid())
{
m_pendingRequests[chunk] = PendingRequest(metadataPage);
m_pendingDownloadRequests[chunk] = PendingDownloadRequest(metadataPage);
return true;
}
......@@ -366,11 +381,11 @@ bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &ch
IORange ioRange = calculateRangeHeaderImpl(parsedMetadata, metadataManager->metadataStatus(), &adaptiveLevel);
std::string url = makeURLForChunk(layerURL, chunk.chunkIndex);
std::string url = createUrlForChunk(layerURL, chunk.chunkIndex);
lock.lock();
auto transferHandler = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel);
m_pendingRequests[chunk] = PendingRequest(m_ioManager->downloadObject(url, transferHandler, ioRange), transferHandler);
m_pendingDownloadRequests[chunk] = PendingDownloadRequest(m_ioManager->downloadObject(url, transferHandler, ioRange), transferHandler);
return true;
}
......@@ -379,15 +394,15 @@ bool VolumeDataAccessManagerImpl::readChunk(const VolumeDataChunk &chunk, std::v
{
std::unique_lock<std::mutex> lock(m_mutex);
auto pendingRequestIterator = m_pendingRequests.find(chunk);
auto pendingRequestIterator = m_pendingDownloadRequests.find(chunk);
if(pendingRequestIterator == m_pendingRequests.end())
if(pendingRequestIterator == m_pendingDownloadRequests.end())
{
error.code = -1;
error.string = "Missing request for chunk: " + std::to_string(chunk.chunkIndex);
return false;
}
PendingRequest& pendingRequest = pendingRequestIterator->second;
PendingDownloadRequest& pendingRequest = pendingRequestIterator->second;
if (!pendingRequest.m_activeTransfer)
{
......@@ -400,7 +415,7 @@ bool VolumeDataAccessManagerImpl::readChunk(const VolumeDataChunk &chunk, std::v
auto activeTransfer = pendingRequest.m_activeTransfer;
auto transferHandler = pendingRequest.m_transferHandle;
m_pendingRequests.erase(pendingRequestIterator);
m_pendingDownloadRequests.erase(pendingRequestIterator);
lock.unlock();
......@@ -429,10 +444,10 @@ void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPa
{
std::unique_lock<std::mutex> lock(m_mutex);
for(auto &pendingRequestKeyValuePair : m_pendingRequests)
for(auto &pendingRequestKeyValuePair : m_pendingDownloadRequests)
{
VolumeDataChunk const volumeDataChunk = pendingRequestKeyValuePair.first;
PendingRequest &pendingRequest = pendingRequestKeyValuePair.second;
PendingDownloadRequest &pendingRequest = pendingRequestKeyValuePair.second;
if(pendingRequest.m_lockedMetadataPage == metadataPage)
{
......@@ -457,7 +472,7 @@ void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPa
IORange ioRange = calculateRangeHeaderImpl(parsedMetadata, metadataManager->metadataStatus(), &adaptiveLevel);
std::string url = makeURLForChunk(metadataManager->layerUrlStr(), volumeDataChunk.chunkIndex);
std::string url = createUrlForChunk(metadataManager->layerUrlStr(), volumeDataChunk.chunkIndex);
auto transferHandler = std::make_shared<ReadChunkTransfer>(metadataManager->metadataStatus().m_compressionMethod, adaptiveLevel);
pendingRequest.m_activeTransfer = m_ioManager->downloadObject(url, transferHandler, ioRange);
......@@ -467,5 +482,81 @@ void VolumeDataAccessManagerImpl::pageTransferCompleted(MetadataPage* metadataPa
}
m_pendingRequestChangedCondition.notify_all();
}
static int64_t gen_upload_jobid()
{
static std::atomic< std::int64_t > id(0);
return --id;
}
int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk& chunk, std::shared_ptr<std::vector<uint8_t>> data)
{
std::string url = createUrlForChunk(createBaseUrl(chunk.layer), chunk.chunkIndex);
m_pendingUploadRequests.erase(std::remove_if(m_pendingUploadRequests.begin(), m_pendingUploadRequests.end(), [this](PendingUploadRequest &request){
Error error;
bool done = request.request->isDone();
if (done && !request.request->isSuccess(error))
this->m_uploadErrors.emplace_back(new UploadError(error, request.request->getObjectName()));
return done;
}), m_pendingUploadRequests.end());
m_pendingUploadRequests.push_back({gen_upload_jobid(), m_ioManager->uploadObject(url, data)});
return 0;
}
void VolumeDataAccessManagerImpl::flushUploadQueue()
{
std::unique_lock<std::mutex> lock(m_mutex);
Error error;
for (auto &upload : m_pendingUploadRequests)
{
upload.request->waitForFinish();
if (!upload.request->isSuccess(error))
{
m_uploadErrors.emplace_back(new UploadError(error, upload.request->getObjectName()));
}
}
}
void VolumeDataAccessManagerImpl::clearUploadErrors()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_uploadErrors.erase(m_uploadErrors.begin(), m_uploadErrors.begin() + m_currentErrorIndex);
m_currentErrorIndex = 0;
}
void VolumeDataAccessManagerImpl::forceClearAllUploadErrors()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_uploadErrors.clear();
m_currentErrorIndex = 0;
}
int32_t VolumeDataAccessManagerImpl::uploadErrorCount()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_uploadErrors.size() - m_currentErrorIndex;
}
void VolumeDataAccessManagerImpl::getCurrentUploadError(const char** objectId, int32_t* errorCode, const char** errorString)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_currentErrorIndex >= m_uploadErrors.size())
{
if (objectId)
*objectId = nullptr;
if (errorCode)
*errorCode = 0;
if (errorString)
*errorString = nullptr;
}
const auto &error = m_uploadErrors[m_currentErrorIndex];
m_currentErrorIndex++;
lock.unlock();
*objectId = error->urlObject.c_str();
*errorCode = error->error.code;
*errorString = error->error.string.c_str();
}
}
......@@ -77,21 +77,22 @@ public:
std::vector<uint8_t> m_data;
std::vector<uint8_t> m_metadata;
};
struct PendingRequest
struct PendingDownloadRequest
{
MetadataPage* m_lockedMetadataPage;
std::shared_ptr<Request> m_activeTransfer;
std::shared_ptr<ReadChunkTransfer> m_transferHandle;
PendingRequest() : m_lockedMetadataPage(nullptr)
PendingDownloadRequest() : m_lockedMetadataPage(nullptr)
{
}
explicit PendingRequest(MetadataPage* lockedMetadataPage) : m_lockedMetadataPage(lockedMetadataPage), m_activeTransfer(nullptr)
explicit PendingDownloadRequest(MetadataPage* lockedMetadataPage) : m_lockedMetadataPage(lockedMetadataPage), m_activeTransfer(nullptr)
{
}
explicit PendingRequest(std::shared_ptr<Request> activeTransfer, std::shared_ptr<ReadChunkTransfer> handler) : m_lockedMetadataPage(nullptr), m_activeTransfer(activeTransfer), m_transferHandle(handler)
explicit PendingDownloadRequest(std::shared_ptr<Request> activeTransfer, std::shared_ptr<ReadChunkTransfer> handler) : m_lockedMetadataPage(nullptr), m_activeTransfer(activeTransfer), m_transferHandle(handler)
{
}
};
......@@ -119,10 +120,27 @@ static bool operator<(const VolumeDataChunk &a, const VolumeDataChunk &b)
return DimensionGroupUtil::getDimensionsNDFromDimensionGroup(a.layer->getChunkDimensionGroup()) < DimensionGroupUtil::getDimensionsNDFromDimensionGroup(b.layer->getChunkDimensionGroup());
}
struct PendingUploadRequest
{
int64_t jobID;
std::shared_ptr<Request> request;
};
struct UploadError
{
UploadError(const Error &error, const std::string &urlObject)
: error(error)
, urlObject(urlObject)
{}
Error error;
std::string urlObject;
};
class VolumeDataAccessManagerImpl : public VolumeDataAccessManager
{
public:
VolumeDataAccessManagerImpl(VDSHandle *handle);
~VolumeDataAccessManagerImpl();
VolumeDataLayout const *getVolumeDataLayout() const override;
VolumeDataPageAccessor *createVolumeDataPageAccessor(VolumeDataLayout const *volumeDataLayout, DimensionsND dimensionsND, int lod, int channel, int maxPages, AccessMode accessMode) override;
......@@ -181,7 +199,16 @@ public:
bool readChunk(const VolumeDataChunk& chunk, std::vector<uint8_t>& serializedData, std::vector<uint8_t>& metadata, CompressionInfo& compressionInfo, Error& error);
void pageTransferCompleted(MetadataPage* page);
int64_t requestWriteChunk(const VolumeDataChunk &chunk, std::shared_ptr<std::vector<uint8_t>> data);
IOManager *getIoManager() const { return m_ioManager; }
void flushUploadQueue() override;
void clearUploadErrors() override;
void forceClearAllUploadErrors() override;
int32_t uploadErrorCount() override;
void getCurrentUploadError(const char **objectId, int32_t *errorCode, const char **errorString) override;
private:
VolumeDataLayout *m_layout;
IOManager *m_ioManager;
......@@ -189,7 +216,10 @@ private:
IntrusiveList<VolumeDataPageAccessorImpl, &VolumeDataPageAccessorImpl::m_volumeDataPageAccessorListNode> m_volumeDataPageAccessorList;
std::mutex m_mutex;
std::condition_variable m_pendingRequestChangedCondition;
std::map<VolumeDataChunk, PendingRequest> m_pendingRequests;
std::map<VolumeDataChunk, PendingDownloadRequest> m_pendingDownloadRequests;
std::vector<PendingUploadRequest> m_pendingUploadRequests;
std::vector<std::unique_ptr<UploadError>> m_uploadErrors;
uint32_t m_currentErrorIndex;
};
}
#endif //VOLUMEDATAACCESSMANAGERIMPL_H
\ No newline at end of file
......@@ -395,6 +395,10 @@ void VolumeDataPageAccessorImpl::limitPageListSize(int maxPages, std::unique_loc
}
}
int64_t VolumeDataPageAccessorImpl::requestWritePage(int64_t chunk, std::shared_ptr<std::vector<uint8_t>> data)
{
return m_accessManager->requestWriteChunk({ m_layer, chunk }, data);
}
/////////////////////////////////////////////////////////////////////////////
// Commit
......
......@@ -24,6 +24,7 @@
#include <list>
#include <mutex>
#include <condition_variable>
#include <vector>
namespace OpenVDS
{
......@@ -78,10 +79,13 @@ public:
VolumeDataPage *createPage(int64_t chunk) override;
VolumeDataPage *readPage(int64_t chunk) override;
int64_t requestWritePage(int64_t chunk, std::shared_ptr<std::vector<uint8_t>> data);
void commit() override;
bool isReadWrite() const { return m_isReadWrite; }
VolumeDataAccessManagerImpl *getManager() const { return m_accessManager; }
};
......
......@@ -176,7 +176,10 @@ void VolumeDataPageImpl::setBufferData(std::vector<uint8_t>&& blob, const int(&p
void VolumeDataPageImpl::writeBack(VolumeDataLayer* volumeDataLayer, std::unique_lock<std::mutex>& pageListMutexLock)
{
assert(m_isDirty);
IOManager *iomanager = m_volumeDataPageAccessor->getManager()->getIoManager();
std::shared_ptr<std::vector<uint8_t>> to_write = std::make_shared<std::vector<uint8_t>>();
*to_write = m_blob;
m_volumeDataPageAccessor->requestWritePage(m_chunk, to_write);
///IOManager *iomanager = m_volumeDataPageAccessor->wri
//iomanager->
}
......
......@@ -30,7 +30,7 @@ class VolumeDataPageAccessorImpl;
class VolumeDataPageImpl : public VolumeDataPage
{
private:
const VolumeDataPageAccessorImpl * m_volumeDataPageAccessor;
VolumeDataPageAccessorImpl * m_volumeDataPageAccessor;
int64_t m_chunk;
......
......@@ -34,7 +34,7 @@ VolumeDataRequestProcessor::VolumeDataRequestProcessor(VolumeDataAccessManagerIm
: m_manager(manager)
{}
int64_t gen_jobid()
static int64_t gen_jobid()
{
static std::atomic< std::int64_t > id(0);
return ++id;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment