Commit 385f0c4b authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Add rudimentary retry on chunk upload

parent 439a4ce1
......@@ -554,7 +554,16 @@ int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk &ch
if(error.code != 0)
{
m_uploadErrors.emplace_back(new UploadError(error, request.getObjectName()));
auto &uploadRequest = m_pendingUploadRequests[jobId];
if (uploadRequest.attempts < 2)
{
uploadRequest.startNewUpload(*m_ioManager);
return;
}
else
{
m_uploadErrors.emplace_back(new UploadError(error, request.getObjectName()));
}
}
else
{
......@@ -572,7 +581,7 @@ int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk &ch
meta_map.emplace_back("vdschunkmetadata", std::string(base64Hash.begin(), base64Hash.end()));
// add new pending upload request
std::unique_lock<std::mutex> lock(m_mutex);
m_pendingUploadRequests[jobId] = PendingUploadRequest(m_ioManager->uploadBinary(url, contentDispositionName, meta_map, to_write, completedCallback), lockedMetadataPage);
m_pendingUploadRequests[jobId] = PendingUploadRequest(*m_ioManager, url, contentDispositionName, meta_map, to_write, completedCallback);
return jobId;
}
......@@ -582,7 +591,7 @@ void VolumeDataAccessManagerImpl::flushUploadQueue()
{
std::unique_lock<std::mutex> lock(m_mutex);
if(m_pendingUploadRequests.empty()) break;
Request &request = *m_pendingUploadRequests.begin()->second.m_request;
Request &request = *m_pendingUploadRequests.begin()->second.request;
lock.unlock();
request.waitForFinish();
}
......
......@@ -123,15 +123,36 @@ inline bool operator<(const VolumeDataChunk &a, const VolumeDataChunk &b)
struct PendingUploadRequest
{
std::shared_ptr<Request> m_request;
MetadataPage* m_lockedMetadataPage;
std::string url;
std::string contentDispositionName;
std::vector<std::pair<std::string, std::string>> metaMap;
std::shared_ptr<std::vector<uint8_t>> data;
std::function<void(const Request & request, const Error & error)> completedCallback;
uint32_t attempts;
std::shared_ptr<Request> request;
PendingUploadRequest()
: request()
, attempts(0)
{
}
PendingUploadRequest() : m_request(), m_lockedMetadataPage()
PendingUploadRequest(IOManager &ioManager, const std::string &url, const std::string contentDispositionName, std::vector<std::pair<std::string, std::string>> &metaMap, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request & request, const Error & error)> completedCallback)
: url(url)
, contentDispositionName(contentDispositionName)
, metaMap(metaMap)
, data(data)
, completedCallback(completedCallback)
, attempts(0)
{
startNewUpload(ioManager);
}
PendingUploadRequest(std::shared_ptr<Request> request, MetadataPage* lockedMetadataPage) : m_request(request), m_lockedMetadataPage(lockedMetadataPage) {}
void startNewUpload(IOManager &ioManager)
{
request = ioManager.uploadBinary(url, contentDispositionName, metaMap, data, completedCallback);
attempts++;
}
};
struct UploadError
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment