Commit 181271db authored by Jørgen Lind's avatar Jørgen Lind
Browse files

prepareReadChunkData and readChunk

parent 79c8d420
......@@ -16,7 +16,8 @@ set(SOURCE_FILES
VDS/VolumeDataPageImpl.cpp
VDS/DimensionGroup.cpp
VDS/ParseVDSJson.cpp
VDS/MetadataManager.cpp)
VDS/MetadataManager.cpp
VDS/Base64.cpp)
set (PRIVATE_HEADER_FILES
IO/File.h
......@@ -38,7 +39,8 @@ set (PRIVATE_HEADER_FILES
VDS/Bitmask.h
VDS/ParseVDSJson.h
VDS/MetadataManager.h
VDS/IntrusiveList.h)
VDS/IntrusiveList.h
VDS/Base64.h)
set (EXPORTED_HEADER_FILES
OpenVDS/OpenVDS.h
......
......@@ -25,6 +25,8 @@ ObjectRequester::~ObjectRequester()
{}
TransferHandler::~TransferHandler()
{}
void TransferHandler::handleMetadata(const std::string& key, const std::string& header)
{}
IOManager::~IOManager()
{}
IOManager* IOManager::createIOManager(const OpenOptions& options, Error &error)
......
......@@ -19,7 +19,6 @@
#define IOMANAGER_H
#include <memory>
#include <OpenVDS/OpenVDS.h>
namespace OpenVDS
......@@ -28,6 +27,7 @@ namespace OpenVDS
{
public:
virtual ~TransferHandler();
virtual void handleMetadata(const std::string &key, const std::string &header);
virtual void handleData(std::vector<uint8_t> &&data) = 0;
virtual void handleError(Error &error) = 0;
};
......@@ -42,11 +42,17 @@ namespace OpenVDS
virtual void cancel() = 0;
};
struct IORange
{
size_t start = 0;
size_t end = 0;
};
class IOManager
{
public:
virtual ~IOManager();
virtual std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler) = 0;
virtual std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) = 0;
static IOManager *createIOManager(const OpenOptions &options, Error &error);
};
......
......@@ -35,6 +35,11 @@ namespace OpenVDS
static std::mutex initialize_sdk_mutex;
static Aws::SDKOptions initialize_sdk_options;
static std::string convertAwsString(const Aws::String &s)
{
return std::string(s.begin(), s.end());
}
static void initializeAWSSDK()
{
std::unique_lock<std::mutex> lock(initialize_sdk_mutex);
......@@ -96,18 +101,23 @@ namespace OpenVDS
}
Aws::S3::Model::GetObjectResult result = const_cast<Aws::S3::Model::GetObjectOutcome &>(getObjectOutcome).GetResultWithOwnership();
for (auto it : result.GetMetadata())
{
objReq->m_handler->handleMetadata(convertAwsString(it.first), convertAwsString(it.second));
}
auto &retrieved_object = result.GetBody();
auto content_length = result.GetContentLength();
std::vector<uint8_t> data;
if (content_length > 0)
{
std::vector<uint8_t> data;
data.resize(content_length);
retrieved_object.read((char *)&data[0], content_length);
objReq->m_handler->handleData(std::move(data));
}
}
ObjectRequesterAWS::ObjectRequesterAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferHandler>& handler)
ObjectRequesterAWS::ObjectRequesterAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferHandler>& handler, const IORange &range)
: m_handler(handler)
, m_context(std::make_shared<AsyncCallerContext>(this))
, m_done(false)
......@@ -115,7 +125,12 @@ namespace OpenVDS
Aws::S3::Model::GetObjectRequest object_request;
object_request.SetBucket(bucket.c_str());
object_request.SetKey(id.c_str());
if (range.end)
{
char rangeHeaderBuffer[100];
snprintf(rangeHeaderBuffer, sizeof(rangeHeaderBuffer), "bytes=%d-%d", range.start, range.end);
object_request.SetRange(rangeHeaderBuffer);
}
using namespace std::placeholders;
auto bounded_callback = std::bind(&callback, _1, _2, _3, _4, m_context);
client.GetObjectAsync(object_request, bounded_callback);
......@@ -177,9 +192,9 @@ namespace OpenVDS
deinitizlieAWSSDK();
}
std::shared_ptr<ObjectRequester> IOManagerAWS::requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler)
std::shared_ptr<ObjectRequester> IOManagerAWS::requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range)
{
std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName;
return std::make_shared<ObjectRequesterAWS>(*m_s3Client.get(), m_bucket, id, handler);
return std::make_shared<ObjectRequesterAWS>(*m_s3Client.get(), m_bucket, id, handler, range);
}
}
......@@ -29,7 +29,7 @@ namespace OpenVDS
class ObjectRequesterAWS : public ObjectRequester
{
public:
ObjectRequesterAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, const std::shared_ptr<TransferHandler> &handler);
ObjectRequesterAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, const std::shared_ptr<TransferHandler> &handler, const IORange &range);
~ObjectRequesterAWS();
void waitForFinish() override;
......@@ -50,7 +50,7 @@ namespace OpenVDS
IOManagerAWS(const AWSOpenOptions &openOptions, Error &error);
~IOManagerAWS();
std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler) override;
std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler, const IORange &range = IORange()) override;
private:
std::string m_region;
std::string m_bucket;
......
......@@ -91,6 +91,28 @@ enum class CompressionMethod
WaveletNormalizeBlockLossless,
Max
};
class CompressionInfo
{
CompressionMethod compressionMethod;
int adaptiveLevel;
public:
CompressionInfo() : compressionMethod(CompressionMethod::None), adaptiveLevel(0)
{
}
CompressionInfo(CompressionMethod compressionMethod, int adaptiveLevel) : compressionMethod(compressionMethod), adaptiveLevel(adaptiveLevel)
{
}
CompressionMethod GetCompressionMethod() const
{
return compressionMethod;
}
int GetAdaptiveLevel() const
{
return adaptiveLevel;
}
};
enum Dimensionality
{
......
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#include "Base64.h"
#include <cctype>
namespace OpenVDS
{
class Base64Table
{
static const char alphabet[65];
static unsigned char table[256];
static Base64Table instance;
Base64Table()
{
memset(table, -1, sizeof(table));
for(int i = 0; i < sizeof(alphabet) - 1; i++)
{
table[alphabet[i]] = i;
}
}
public:
static int decode(char a) { return table[(unsigned char)a]; }
static char encode(unsigned char u) { return alphabet[u]; }
};
Base64Table
Base64Table::instance;
const char
Base64Table::alphabet[65] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
unsigned char
Base64Table::table[256];
bool Base64Decode(const char *data, int len, std::vector<unsigned char> &result)
{
bool error = false;
int
decode = 0;
// skip leading whitespace
while(len && isspace(*data)) len--, data++;
if(len == 0) { error = true; }
result.reserve(result.size() + len / 4 * 3);
while(len != 0 && !isspace(*data))
{
int a = Base64Table::decode(*data++); len--; if(a == -1) { error = true; break; }
if(len == 0 || isspace(*data)) { error = true; break; }
int b = Base64Table::decode(*data++); len--; if(b == -1) { error = true; break; }
result.push_back((a << 2) | (b >> 4));
if(len == 0 || isspace(*data)) break; if(*data == '=') { data++; len--; if(len==0 || *data++ != '=') { error = true; break; } len--; break; }
int c = Base64Table::decode(*data++); len--; if(c == -1) { error = true; break; }
result.push_back(((b & 0xf) << 4) | (c >> 2));
if(len == 0 || isspace(*data)) break; if(*data == '=') { data++; len--; break; }
int d = Base64Table::decode(*data++); len--; if(d == -1) { error = true; break; }
result.push_back(((c & 0x3) << 6) | d);
}
// skip trailing whitespace
while(len && isspace(*data)) len--, data++;
if(len != 0) { error = true; }
return !error;
}
}
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#ifndef BASE64_H
#define BASE64_H
#include <vector>
namespace OpenVDS
{
bool Base64Decode(const char* data, int len, std::vector<unsigned char>& result);
}
#endif
\ No newline at end of file
......@@ -32,6 +32,13 @@
namespace OpenVDS
{
enum AdaptiveMode
{
AdaptiveMode_BestQuality,
AdaptiveMode_Tolerance,
AdaptiveMode_Ratio
};
struct MetadataStatus
{
enum
......
......@@ -26,77 +26,10 @@
#include "Bitmask.h"
namespace OpenVDS
{
class Base64Table
{
static const char alphabet[65];
static unsigned char table[256];
static Base64Table instance;
Base64Table()
{
memset(table, -1, sizeof(table));
for(int i = 0; i < sizeof(alphabet) - 1; i++)
{
table[alphabet[i]] = i;
}
}
public:
static int decode(char a) { return table[(unsigned char)a]; }
static char encode(unsigned char u) { return alphabet[u]; }
};
Base64Table
Base64Table::instance;
#include "Base64.h"
const char
Base64Table::alphabet[65] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
unsigned char
Base64Table::table[256];
bool
Base64Decode(const char *data, int len, std::vector<unsigned char> &result)
namespace OpenVDS
{
bool error = false;
int
decode = 0;
// skip leading whitespace
while(len && isspace(*data)) len--, data++;
if(len == 0) { error = true; }
result.reserve(result.size() + len / 4 * 3);
while(len != 0 && !isspace(*data))
{
int a = Base64Table::decode(*data++); len--; if(a == -1) { error = true; break; }
if(len == 0 || isspace(*data)) { error = true; break; }
int b = Base64Table::decode(*data++); len--; if(b == -1) { error = true; break; }
result.push_back((a << 2) | (b >> 4));
if(len == 0 || isspace(*data)) break; if(*data == '=') { data++; len--; if(len==0 || *data++ != '=') { error = true; break; } len--; break; }
int c = Base64Table::decode(*data++); len--; if(c == -1) { error = true; break; }
result.push_back(((b & 0xf) << 4) | (c >> 2));
if(len == 0 || isspace(*data)) break; if(*data == '=') { data++; len--; break; }
int d = Base64Table::decode(*data++); len--; if(d == -1) { error = true; break; }
result.push_back(((c & 0x3) << 6) | d);
}
// skip trailing whitespace
while(len && isspace(*data)) len--, data++;
if(len != 0) { error = true; }
return !error;
}
static enum VolumeDataLayoutDescriptor::BrickSize convertToBrickSize(Json::Value const &jsonBrickSize)
{
......
......@@ -15,14 +15,163 @@
** limitations under the License.
****************************************************************************/
#define _USE_MATH_DEFINES
#include "VolumeDataAccessManagerImpl.h"
#include <OpenVDSHandle.h>
#include "VolumeDataPageAccessorImpl.h"
#include <cmath>
#include <algorithm>
#include <inttypes.h>
namespace OpenVDS
{
struct ParsedMetadata
{
ParsedMetadata()
: m_chunkHash(0)
, m_chunkSize(0)
{}
uint64_t m_chunkHash;
int32_t m_chunkSize;
std::vector<uint8_t> m_adaptiveLevels;
};
static ParsedMetadata parseMetadata(int metadataByteSize, unsigned char const *metadata)
{
ParsedMetadata parsedMetadata;
if (metadataByteSize == 4 + 24)
{
parsedMetadata.m_chunkSize = *reinterpret_cast<int32_t const *>(metadata);
parsedMetadata.m_chunkHash = *reinterpret_cast<uint64_t const *>(metadata + 4);
parsedMetadata.m_adaptiveLevels.resize(MetadataStatus::WAVELET_ADAPTIVE_LEVELS);
memcpy(parsedMetadata.m_adaptiveLevels.data(), metadata + 4 + 8, MetadataStatus::WAVELET_ADAPTIVE_LEVELS);
}
else if (metadataByteSize == 8)
{
parsedMetadata.m_chunkHash = *reinterpret_cast<uint64_t const *>(metadata);
}
else
{
fprintf(stderr, "%s%s\n", std::string(" Unsupported chunkMetadataByteSize: ").c_str(), std::to_string(metadataByteSize).c_str());
}
return parsedMetadata;
}
static bool isConstantChunkHash(uint64_t chunkHash)
{
const uint64_t unknownHash = 0;
const uint64_t noValueHash = ~0ULL;
const uint64_t constantHash = 0x01010101;
if (chunkHash == unknownHash || chunkHash == noValueHash || (chunkHash >> 32) == constantHash)
{
return true;
}
return false;
}
static int getEffectiveAdaptiveLoadLevel(float effectiveCompressionTolerance, float compressionTolerance)
{
// assert(effectiveCompressionTolerance >= adaptiveToleranceMin);
// assert(compressionTolerance >= adaptiveToleranceMin);
int adaptiveLoadLevel = (int)(log(effectiveCompressionTolerance / compressionTolerance) / M_LN2);
return std::max(0, adaptiveLoadLevel);
}
static int getEffectiveAdaptiveLevel(AdaptiveMode adaptiveMode, float desiredTolerance, float desiredRatio, float remoteTolerance, int64_t const adaptiveLevelSizes[MetadataStatus::WAVELET_ADAPTIVE_LEVELS], int64_t uncompressedSize)
{
if (adaptiveMode == AdaptiveMode_BestQuality)
{
return -1;
}
else if (adaptiveMode == AdaptiveMode_Ratio && desiredRatio <= 1.0f)
{
return 0;
}
int level = 0;
if (adaptiveMode == AdaptiveMode_Tolerance)
{
level = getEffectiveAdaptiveLoadLevel(desiredTolerance, remoteTolerance);
}
else if (adaptiveMode == AdaptiveMode_Ratio)
{
// Matches HueVolumeDataStoreVersion4_c::GetEffectiveAdaptiveLevel
while (level + 1 < MetadataStatus::WAVELET_ADAPTIVE_LEVELS)
{
if (adaptiveLevelSizes[level + 1] == 0 || ((float)uncompressedSize / (float)adaptiveLevelSizes[level + 1]) > desiredRatio)
{
break;
}
level++;
}
}
else
{
assert(0 && "Unknown compression mode");
}
return level;
}
static int waveletAdaptiveLevelsMetadataDecode(uint64_t totalSize, int targetLevel, uint8_t const *levels)
{
assert(targetLevel >= -1 && targetLevel < MetadataStatus::WAVELET_ADAPTIVE_LEVELS);
int remainingSize = (int)totalSize;
for (int level = 0; level <= targetLevel; level++)
{
if (levels[level] == 0)
{
break;
}
remainingSize = (int)((uint64_t)remainingSize * levels[level] / 255);
}
return remainingSize;
}
static IORange calculateRangeHeaderImpl(const ParsedMetadata& parsedMetadata, const MetadataStatus &metadataStatus, int * const adaptiveLevel)
{
if (isConstantChunkHash(parsedMetadata.m_chunkHash))
{
*adaptiveLevel = -1;
}
else if (parsedMetadata.m_adaptiveLevels.empty())
{
*adaptiveLevel = -1;
}
else
{
*adaptiveLevel = getEffectiveAdaptiveLevel(AdaptiveMode_BestQuality, 0.01f, 1.0f , metadataStatus.m_compressionTolerance, metadataStatus.m_adaptiveLevelSizes, metadataStatus.m_uncompressedSize);
int range = waveletAdaptiveLevelsMetadataDecode(parsedMetadata.m_chunkSize, *adaptiveLevel, parsedMetadata.m_adaptiveLevels.data());
if (range && range != parsedMetadata.m_chunkSize)
{
return {0 , size_t(range - 1)};
}
}
return IORange();
}
static VolumeDataLayer *getVolumeDataLayer(VolumeDataLayout const *layout, DimensionsND dimension, int channel, int lod, bool isAllowFailure)
{
if(!layout)
......@@ -62,6 +211,7 @@ static VolumeDataLayer *getVolumeDataLayer(VolumeDataLayout const *layout, Dimen
VolumeDataAccessManagerImpl::VolumeDataAccessManagerImpl(VDSHandle* handle)
: m_layout(handle->volumeDataLayout.get())
, m_ioManager(handle->ioManager.get())
, m_layerMetadataContainer(&handle->layerMetadataContainer)
{
}
......@@ -267,4 +417,151 @@ int64_t VolumeDataAccessManagerImpl::prefetchVolumeChunk(VolumeDataLayout const*
{
return int64_t(0);
}
static MetadataManager *getMetadataMangerForLayer(LayerMetadataContainer *container, const std::string &layer)
{
std::unique_lock<std::mutex> lock(container->mutex);
MetadataManager *metadataManager = nullptr;
auto metadataManagerIterator = container->managers.find(layer);
if(metadataManagerIterator != container->managers.end())
{
metadataManager = metadataManagerIterator->second.get();
}
return metadataManager;
}
static std::string makeURLForChunk(const std::string &layerUrl, uint64_t chunk)
{
char url[1000];
snprintf(url, sizeof(url), "%s/%" PRIu64, layerUrl.c_str(), (long long)chunk);
return std::string(url);
}
bool VolumeDataAccessManagerImpl::prepareReadChunkData(const VolumeDataChunk &chunk, std::vector<uint8_t> &blob, int32_t (&pitch)[Dimensionality_Max], bool verbose, Error &error)
{
blob.clear();
// This can probably be improved by looking up the data directly in the cache and not requesting it if it's valid,
// similar to the VolumeSamples code
for(auto &p : pitch)
p = 0;
int32_t channel = chunk.layer->getChannelIndex();
const char *channelName = channel > 0 ? chunk.layer->getLayout()->getChannelName(chunk.layer->getChannelIndex()) : "";
int32_t lod = chunk.layer->getLod();
const char *dimensions_string = DimensionGroupUtil::getDimensionsGroupString(DimensionGroupUtil::getDimensionsNDFromDimensionGroup(chunk.layer->getChunkDimensionGroup()));
char layerURL[1000