Commit b31b40d1 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Serialize data without compression and Deflate

parent 4376ec82
......@@ -61,10 +61,10 @@ bool initializeDataBlock(VolumeDataChannelDescriptor::Format format, VolumeDataC
return false;
}
dataBlock.allocatedSize[0] = (format == VolumeDataChannelDescriptor::Format_1Bit) ? ((dataBlock.size[0] * components) + 7) / 8 : dataBlock.size[0];
dataBlock.allocatedSize[1] = dataBlock.size[1];
dataBlock.allocatedSize[2] = dataBlock.size[2];
dataBlock.allocatedSize[3] = dataBlock.size[3];
dataBlock.allocatedSize[0] = (format == VolumeDataChannelDescriptor::Format_1Bit) ? ((dataBlock.size[0] * components) + 7) / 8 : getAllocatedByteSizeForSize(dataBlock.size[0]);
dataBlock.allocatedSize[1] = getAllocatedByteSizeForSize(dataBlock.size[1]);
dataBlock.allocatedSize[2] = getAllocatedByteSizeForSize(dataBlock.size[2]);
dataBlock.allocatedSize[3] = getAllocatedByteSizeForSize(dataBlock.size[3]);
dataBlock.pitch[0] = 1;
for (int i = 1; i < DataStoreDimensionality_Max; i++)
{
......
......@@ -126,16 +126,21 @@ static uint32_t getByteSize(const int32_t (&size)[DataStoreDimensionality_Max],
return byteSize;
}
static uint32_t getByteSize(const DataBlock &block)
inline uint32_t getByteSize(const DataBlock &block)
{
return getByteSize(block.size, block.format, block.components);
}
static uint32_t getAllocatedByteSize(const DataBlock &block)
inline uint32_t getAllocatedByteSize(const DataBlock &block)
{
return getByteSize(block.allocatedSize, block.format, block.components, false);
}
inline int32_t getAllocatedByteSizeForSize(const int32_t size)
{
return (size + 7) & -8;
}
}
#endif //DATABLOCK_H
\ No newline at end of file
......@@ -241,13 +241,13 @@ public:
template<typename T1, typename T2, bool isUseNoValue>
class QuantizingValueConverterWithNoValue
{
QuantizedTypesToFloatConverter<T2, isUseNoValue> m_quantizedTypesToFloatConverter;
float m_integerOffset;
float m_reciprocalScale;
T2 m_noValue;
T1 m_replacementNoValue;
QuantizedTypesToFloatConverter<T2, isUseNoValue> m_quantizedTypesToFloatConverter;
public:
QuantizingValueConverterWithNoValue()
......
......@@ -23,6 +23,7 @@
#include "ValueConversion.h"
#include "VolumeSampler.h"
#include "ParseVDSJson.h"
#include "VolumeDataStore.h"
#include <cmath>
#include <algorithm>
......@@ -479,14 +480,24 @@ static int64_t createUploadJobId()
return --id;
}
int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk& chunk, std::shared_ptr<std::vector<uint8_t>> data)
int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk &chunk, const DataBlock &dataBlock, const std::vector<uint8_t> &data)
{
Error error;
std::string layerName = getLayerName(*chunk.layer);
std::string url = createUrlForChunk(layerName, chunk.chunkIndex);
std::shared_ptr<std::vector<uint8_t>> to_write = std::make_shared<std::vector<uint8_t>>();
uint64_t hash;
if (!VolumeDataStore::serializeVolumeData(chunk, dataBlock, data, CompressionMethod::Zip, *to_write, hash, error))
{
std::unique_lock<std::mutex> lock(m_mutex);
m_uploadErrors.emplace_back(new UploadError(error, url));
return 0;
}
auto metadataManager = getMetadataMangerForLayer(m_handle.layerMetadataContainer, layerName);
MetadataPage* lockedMetadataPage = nullptr;
std::string url = createUrlForChunk(layerName, chunk.chunkIndex);
int64_t jobId = createUploadJobId();
......@@ -504,7 +515,7 @@ int64_t VolumeDataAccessManagerImpl::requestWriteChunk(const VolumeDataChunk& ch
// add new pending upload request
std::unique_lock<std::mutex> lock(m_mutex);
m_pendingUploadRequests[jobId] = PendingUploadRequest(m_ioManager->uploadObject(url, data, completedCallback), lockedMetadataPage);
m_pendingUploadRequests[jobId] = PendingUploadRequest(m_ioManager->uploadObject(url, to_write, completedCallback), lockedMetadataPage);
return jobId;
}
......
......@@ -207,8 +207,8 @@ public:
bool readChunk(const VolumeDataChunk& chunk, std::vector<uint8_t>& serializedData, std::vector<uint8_t>& metadata, CompressionInfo& compressionInfo, Error& error);
void pageTransferCompleted(MetadataPage* page);
int64_t requestWriteChunk(const VolumeDataChunk &chunk, std::shared_ptr<std::vector<uint8_t>> data);
int64_t requestWriteChunk(const VolumeDataChunk &chunk, const DataBlock &dataBlock, const std::vector<uint8_t> &data);
IOManager *getIoManager() const { return m_ioManager; }
void flushUploadQueue() override;
......
......@@ -173,7 +173,7 @@ VolumeDataPage* VolumeDataPageAccessorImpl::createPage(int64_t chunk)
}
pageMutexLocker.lock();
page->setBufferData(std::move(page_data), pitch);
page->setBufferData(dataBlock, pitch, std::move(page_data));
page->makeDirty();
m_pageReadCondition.notify_all();
......@@ -290,7 +290,7 @@ VolumeDataPage* VolumeDataPageAccessorImpl::readPage(int64_t chunk)
}
pageMutexLocker.lock();
page->setBufferData(std::move(page_data), pitch);
page->setBufferData(dataBlock, pitch, std::move(page_data));
m_pagesRead++;
m_pageReadCondition.notify_all();
......@@ -395,9 +395,9 @@ void VolumeDataPageAccessorImpl::limitPageListSize(int maxPages, std::unique_loc
}
}
int64_t VolumeDataPageAccessorImpl::requestWritePage(int64_t chunk, std::shared_ptr<std::vector<uint8_t>> data)
int64_t VolumeDataPageAccessorImpl::requestWritePage(int64_t chunk, const DataBlock& dataBlock, const std::vector<uint8_t>& data)
{
return m_accessManager->requestWriteChunk({ m_layer, chunk }, data);
return m_accessManager->requestWriteChunk({ m_layer, chunk }, dataBlock, data);
}
/////////////////////////////////////////////////////////////////////////////
// Commit
......
......@@ -31,6 +31,7 @@ namespace OpenVDS
class VolumeDataPageImpl;
class VolumeDataLayer;
class VolumeDataAccessManagerImpl;
struct DataBlock;
class VolumeDataPageAccessorImpl : public VolumeDataPageAccessor
{
......@@ -80,7 +81,7 @@ public:
VolumeDataPage *createPage(int64_t chunk) override;
VolumeDataPage *readPage(int64_t chunk) override;
int64_t requestWritePage(int64_t chunk, std::shared_ptr<std::vector<uint8_t>> data);
int64_t requestWritePage(int64_t chunk, const DataBlock &dataBlock, const std::vector<uint8_t> &data);
void commit() override;
......
......@@ -162,29 +162,19 @@ void VolumeDataPageImpl::makeDirty()
m_isDirty = true;
}
void VolumeDataPageImpl::setBufferData(std::vector<uint8_t>&& blob, const int(&pitch)[Dimensionality_Max])
void VolumeDataPageImpl::setBufferData(const DataBlock &dataBlock, int32_t (&pitch)[Dimensionality_Max], std::vector<uint8_t>&& blob)
{
//assert(m_volumeDataPageAccessor->m_pageListMutex.isLockedByCurrentThread());
m_dataBlock = dataBlock;
static_assert(sizeof(pitch) == sizeof(m_pitch), "Pitch of different size");
memcpy(m_pitch, pitch, sizeof(m_pitch));
m_blob = std::move(blob);
for(int32_t iDimension = 0; iDimension < Dimensionality_Max; iDimension++)
{
m_pitch[iDimension] = pitch[iDimension];
}
}
void VolumeDataPageImpl::writeBack(VolumeDataLayer* volumeDataLayer, std::unique_lock<std::mutex>& pageListMutexLock)
{
assert(m_isDirty);
Error error;
std::shared_ptr<std::vector<uint8_t>> to_write = std::make_shared<std::vector<uint8_t>>();
if (!VolumeDataStore::serialize({volumeDataLayer, m_chunk}, m_blob, CompressionMethod::None, *to_write, error))
{
m_volumeDataPageAccessor->getManager()->addUploadError(error, volumeDataLayer, m_chunk);
return;
}
m_volumeDataPageAccessor->requestWritePage(m_chunk, to_write);
m_volumeDataPageAccessor->requestWritePage(m_chunk, m_dataBlock, m_blob);
m_isDirty = false;
}
......
......@@ -20,6 +20,8 @@
#include <OpenVDS/VolumeDataAccess.h>
#include "DataBlock.h"
#include <mutex>
#include <vector>
......@@ -29,11 +31,13 @@ class VolumeDataLayer;
class VolumeDataPageAccessorImpl;
class VolumeDataPageImpl : public VolumeDataPage
{
private:
public:
VolumeDataPageAccessorImpl * m_volumeDataPageAccessor;
int64_t m_chunk;
DataBlock m_dataBlock;
int32_t m_pitch[Dimensionality_Max];
std::vector<uint8_t> m_blob;
int32_t m_pins;
......@@ -42,8 +46,6 @@ private:
bool m_isDirty;
bool m_requestPrepared;
int32_t m_pitch[Dimensionality_Max];
int32_t m_writtenMin[Dimensionality_Max];
int32_t m_writtenMax[Dimensionality_Max];
......@@ -67,7 +69,7 @@ public:
bool isWritten();
void makeDirty();
void setBufferData(std::vector<uint8_t> &&blob, const int (&pitch)[Dimensionality_Max]);
void setBufferData(const DataBlock& dataBlock, int32_t(&pitch)[Dimensionality_Max], std::vector<uint8_t>&& blob);
void writeBack(VolumeDataLayer *volumeDataLayer, std::unique_lock<std::mutex> &pageListMutexLock);
void * getBufferInternal(int (&anPitch)[Dimensionality_Max], bool isReadWrite);
bool isCopyMarginNeeded(VolumeDataPageImpl *targetPage);
......
......@@ -127,6 +127,125 @@ static void copyLinearBufferIntoDataBlock(const void *sourceBuffer, const DataBl
}
}
static bool copyDataBlockIntoLinearBuffer(const DataBlock &dataBlock, const void *sourceBuffer, void *targetBuffer, int32_t bufferSize)
{
int32_t size[DataStoreDimensionality_Max];
memcpy(size, dataBlock.size, sizeof(size));
int32_t allocatedSize[DataStoreDimensionality_Max];
memcpy(allocatedSize, dataBlock.allocatedSize, sizeof(allocatedSize));
int32_t elementSize = int32_t(getElementSize(dataBlock));
if(dataBlock.format == VolumeDataChannelDescriptor::Format_1Bit)
{
size[0] = ((size[0] * dataBlock.components) + 7) / 8;
}
// Check if first row is constant
bool isConstant = true;
switch(dataBlock.format)
{
default:
fprintf(stderr, "Illegal format\n");
case VolumeDataChannelDescriptor::Format_1Bit:
isConstant = (reinterpret_cast<const uint8_t*>(sourceBuffer)[0] == 0x00 || reinterpret_cast<const uint8_t*>(sourceBuffer)[0] == 0xff);
// Fall through
case VolumeDataChannelDescriptor::Format_U8:
for(int32_t iX = 1; isConstant && iX < size[0]; iX++)
{
isConstant = reinterpret_cast<const uint8_t *>(sourceBuffer)[0] == reinterpret_cast<const uint8_t *>(sourceBuffer)[iX];
}
break;
case VolumeDataChannelDescriptor::Format_U16:
for(int32_t iX = 1; isConstant && iX < size[0]; iX++)
{
isConstant = reinterpret_cast<const uint16_t *>(sourceBuffer)[0] == reinterpret_cast<const uint16_t *>(sourceBuffer)[iX];
}
break;
case VolumeDataChannelDescriptor::Format_R32:
case VolumeDataChannelDescriptor::Format_U32:
for(int32_t iX = 1; isConstant && iX < size[0]; iX++)
{
isConstant = reinterpret_cast<const uint32_t *>(sourceBuffer)[0] == reinterpret_cast<const uint32_t *>(sourceBuffer)[iX];
}
break;
case VolumeDataChannelDescriptor::Format_U64:
case VolumeDataChannelDescriptor::Format_R64:
for(int32_t iX = 1; isConstant && iX < size[0]; iX++)
{
isConstant = reinterpret_cast<const uint64_t *>(sourceBuffer)[0] == reinterpret_cast<const uint64_t *>(sourceBuffer)[iX];
}
break;
}
assert(bufferSize >= size[0] * size[1] * size[2] * elementSize);
for(int32_t iZ = 0; iZ < size[2]; iZ++)
{
for(int32_t iY = 0; iY < size[1]; iY++)
{
uint8_t *puSource = (uint8_t *)sourceBuffer;
uint8_t *puTarget = (uint8_t *)targetBuffer;
puSource += (iZ * allocatedSize[1] + iY) * allocatedSize[0] * elementSize;
puTarget += (iZ * size[1] + iY) * size[0] * elementSize;
if(isConstant)
{
isConstant = (memcmp(sourceBuffer, puSource, size[0] * elementSize) == 0);
}
memcpy(puTarget, puSource, size[0] * elementSize);
}
}
return isConstant;
}
template<typename T>
static uint64_t getConstantValueVolumeDataHash(T value, const Range<float> &valueRange, float integerScale, float integerOffset, bool isUseNoValue, float noValue)
{
if (isUseNoValue)
{
if(value == convertNoValue<T>(noValue))
{
return VolumeDataHash::NOVALUE;
}
else
{
QuantizingValueConverterWithNoValue<float, T, true> converter(valueRange.min, valueRange.max, integerScale, integerOffset, noValue, noValue, false);
return VolumeDataHash(converter.convertValue(value)).calculateHash();
}
}
else
{
QuantizingValueConverterWithNoValue<float, T, false> converter(valueRange.min, valueRange.max, integerScale, integerOffset, noValue, noValue, false);
return VolumeDataHash(converter.convertValue(value)).calculateHash();
}
}
static uint64_t getConstantValueVolumeDataHash(const DataBlock &dataBlock, const uint8_t *sourceBuffer, const Range<float> &valueRange, float integerScale, float integerOffset, bool isUseNoValue, float noValue)
{
switch(dataBlock.format)
{
case VolumeDataChannelDescriptor::Format_1Bit:
assert(reinterpret_cast<const uint8_t *>(sourceBuffer)[0] == 0x00 || reinterpret_cast<const uint8_t *>(sourceBuffer)[0] == 0xff);
return VolumeDataHash((float)(reinterpret_cast<const uint8_t *>(sourceBuffer)[0] != 0)).calculateHash();
case VolumeDataChannelDescriptor::Format_U8: return getConstantValueVolumeDataHash(reinterpret_cast<const uint8_t *>(sourceBuffer)[0], valueRange, integerScale, integerOffset, isUseNoValue, noValue);
case VolumeDataChannelDescriptor::Format_U16: return getConstantValueVolumeDataHash(reinterpret_cast<const uint16_t *>(sourceBuffer)[0], valueRange, integerScale, integerOffset, isUseNoValue, noValue);
case VolumeDataChannelDescriptor::Format_R32: return getConstantValueVolumeDataHash(reinterpret_cast<const float *>(sourceBuffer)[0], valueRange, integerScale, integerOffset, isUseNoValue, noValue);
case VolumeDataChannelDescriptor::Format_U32: return getConstantValueVolumeDataHash(reinterpret_cast<const uint32_t *>(sourceBuffer)[0], valueRange, integerScale, integerOffset, isUseNoValue, noValue);
case VolumeDataChannelDescriptor::Format_R64: return getConstantValueVolumeDataHash(reinterpret_cast<const double *>(sourceBuffer)[0], valueRange, integerScale, integerOffset, isUseNoValue, noValue);
case VolumeDataChannelDescriptor::Format_U64: return getConstantValueVolumeDataHash(reinterpret_cast<const uint64_t *>(sourceBuffer)[0], valueRange, integerScale, integerOffset, isUseNoValue, noValue);
default:
assert(0 && "Unknown format");
return VolumeDataHash::UNKNOWN;
}
}
bool deserializeVolumeData(const std::vector<uint8_t> &serializedData, VolumeDataChannelDescriptor::Format format, CompressionMethod compressionMethod, const FloatRange &valueRange, float integerScale, float integerOffset, bool isUseNoValue, float noValue, int32_t adaptiveLevel, DataBlock &dataBlock, std::vector<uint8_t> &destination, Error &error)
{
if(compressionMethodIsWavelet(compressionMethod))
......@@ -412,44 +531,73 @@ bool VolumeDataStore::deserializeVolumeData(const VolumeDataChunk &volumeDataChu
return true;
}
bool VolumeDataStore::serialize(const VolumeDataChunk& chunk, const std::vector<uint8_t>& chunkData, CompressionMethod compressionMethod, std::vector<uint8_t>& destinationBuffer, Error& error)
bool VolumeDataStore::serializeVolumeData(const VolumeDataChunk& chunk, const DataBlock& dataBlock, const std::vector<uint8_t>& chunkData, CompressionMethod compressionMethod, std::vector<uint8_t>& destinationBuffer, uint64_t& outputHash, Error& error)
{
destinationBuffer = chunkData;
// DataBlockDescriptor dataBlockDescriptor;
//
// if(!dataBlockDescriptor->isValid())
// {
// error.code = -1;
// error.string = "Failed to decode DataBlockDescriptor";
// return false;
// }
//
// if (!initializeDataBlock(*dataBlockDescriptor, dataBlock, error))
// return false;
//
// void * source = dataBlockDescriptor + 1;
//
// int32_t byteSize = getByteSize(*dataBlockDescriptor);
// std::unique_ptr<uint8_t[]>buffer(new uint8_t[byteSize]);
//
// int32_t decompressedSize = rle_Decompress((uint8_t *)buffer.get(), byteSize, (uint8_t *)source);
// assert(decompressedSize == byteSize);
//
// int allocatedSize = getAllocatedByteSize(dataBlock);
// destination.resize(allocatedSize);
// copyLinearBufferIntoDataBlock(buffer.get(), dataBlock, destination);
// switch(compressionMethod)
// {
// case CompressionMethod::None:
// destinationBuffer = layerData;
// return true;
// case CompressionMethod::Zip:
// return true;
// default:
// error.code = -20;
// error.string = "Not implemented compresssion algorithm";
// return false;
// }
DataBlockDescriptor dataBlockHeader;
dataBlockHeader.components = dataBlock.components;
dataBlockHeader.dimensionality = dataBlock.dimensionality;
dataBlockHeader.format = dataBlock.format;
dataBlockHeader.sizeX = dataBlock.size[0];
dataBlockHeader.sizeY = dataBlock.size[1];
dataBlockHeader.sizeZ = dataBlock.size[2];
switch (compressionMethod)
{
case CompressionMethod::None:
{
int32_t byteSize = getByteSize(dataBlock) + sizeof(DataBlockDescriptor);
destinationBuffer.resize(byteSize);
void *targetBuffer = destinationBuffer.data();
memcpy(targetBuffer, &dataBlockHeader, sizeof(dataBlockHeader));
targetBuffer = ((uint8_t *)targetBuffer) + sizeof(dataBlockHeader);
bool isConstant = copyDataBlockIntoLinearBuffer(dataBlock, chunkData.data(), targetBuffer, byteSize - sizeof(dataBlockHeader));
if(isConstant)
{
auto& layer = *chunk.layer;
outputHash = getConstantValueVolumeDataHash(dataBlock, (const uint8_t *) targetBuffer, layer.getValueRange(), layer.getIntegerScale(), layer.getIntegerOffset(), layer.isUseNoValue(), layer.getNoValue());
return true;
}
break;
}
case CompressionMethod::Zip:
{
uint32_t tmpbuffersize = getByteSize(dataBlock);
std::unique_ptr<uint8_t[]> tmpdata(new uint8_t[tmpbuffersize]);
bool isConstant = copyDataBlockIntoLinearBuffer(dataBlock, chunkData.data(), tmpdata.get(), tmpbuffersize);
if (isConstant)
{
auto& layer = *chunk.layer;
outputHash = getConstantValueVolumeDataHash(dataBlock, tmpdata.get(), layer.getValueRange(), layer.getIntegerScale(), layer.getIntegerOffset(), layer.isUseNoValue(), layer.getNoValue());
return true;
}
unsigned long compressedMaxSize = compressBound(tmpbuffersize);
destinationBuffer.resize(compressedMaxSize + sizeof(dataBlockHeader));
void *targetBuffer = destinationBuffer.data();
memcpy(targetBuffer, &dataBlockHeader, sizeof(dataBlockHeader));
targetBuffer = ((uint8_t *)targetBuffer) + sizeof(dataBlockHeader);
unsigned long compressedSize = 0;
int status = compress((uint8_t *)targetBuffer, &compressedSize, tmpdata.get(), tmpbuffersize);
destinationBuffer.resize(compressedSize + sizeof(dataBlockHeader));
if (status != Z_OK)
{
error.code = -100;
error.string = "zlib compression failed";
return false;
}
break;
}
default:
error.code = -2;
error.string = "Invalid compression method specified when serializing a VolumeDataChunk";
return false;
}
outputHash = VolumeDataHash::UNKNOWN;
return true;
}
......
......@@ -36,7 +36,7 @@ struct VolumeDataStore
static bool verify(const VolumeDataChunk& volumeDataChunk, const std::vector<uint8_t>& serializedData, CompressionMethod compressionMethod, bool isFullyRead);
static bool deserializeVolumeData(const VolumeDataChunk& chunk, const std::vector<uint8_t>& serializedData, const std::vector<uint8_t>& metadata, CompressionMethod compressionMethod, int32_t adaptiveLevel, VolumeDataChannelDescriptor::Format loadFormat, DataBlock &dataBlock, std::vector<uint8_t>& target, Error& error);
static bool createConstantValueDataBlock(VolumeDataChunk const &volumeDataChunk, VolumeDataChannelDescriptor::Format format, float noValue, VolumeDataChannelDescriptor::Components components, VolumeDataHash const &constantValueVolumeDataHash, DataBlock &dataBlock, std::vector<uint8_t> &buffer, Error &error);
static bool serialize(const VolumeDataChunk &chunk, const std::vector<uint8_t> &chunkData, CompressionMethod compressionMethod, std::vector<uint8_t> &destinationBuffer, Error &error);
static bool serializeVolumeData(const VolumeDataChunk& chunk, const DataBlock &dataBlock, const std::vector<uint8_t>& chunkData, CompressionMethod compressionMethod, std::vector<uint8_t>& destinationBuffer, uint64_t &outputHash, Error& error);
};
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment