From 4888372f5de2a409433ef9eaf85a98271276d492 Mon Sep 17 00:00:00 2001 From: "morten.ofstad" Date: Wed, 10 Mar 2021 17:53:16 +0100 Subject: [PATCH] Implemented accumulation of uncompressed and adaptive sizes when writing data. --- src/OpenVDS/VDS/MetadataManager.cpp | 25 ++++++- src/OpenVDS/VDS/MetadataManager.h | 3 +- src/OpenVDS/VDS/VolumeDataStoreIOManager.cpp | 69 ++++++++++++++++++-- src/OpenVDS/VDS/VolumeDataStoreVDSFile.cpp | 25 +++++++ src/OpenVDS/VDS/WaveletTypes.h | 46 +++++++++++++ 5 files changed, 160 insertions(+), 8 deletions(-) diff --git a/src/OpenVDS/VDS/MetadataManager.cpp b/src/OpenVDS/VDS/MetadataManager.cpp index c7f9acae..02c02a47 100644 --- a/src/OpenVDS/VDS/MetadataManager.cpp +++ b/src/OpenVDS/VDS/MetadataManager.cpp @@ -19,6 +19,7 @@ #include #include "VolumeDataStoreIOManager.h" +#include "WaveletTypes.h" #include #include #include @@ -202,7 +203,7 @@ uint8_t const *MetadataManager::GetPageEntry(MetadataPage *page, int entryIndex) return &page->m_data[entryIndex * m_metadataStatus.m_chunkMetadataByteSize]; } -void MetadataManager::SetPageEntry(MetadataPage *page, int entryIndex, uint8_t const *metadata, int metadataLength) +void MetadataManager::SetPageEntry(MetadataPage *page, int entryIndex, uint8_t const *metadata, int metadataLength, uint8_t *oldMetadata) { std::unique_lock<std::mutex> lock(m_mutex); @@ -217,6 +218,10 @@ void MetadataManager::SetPageEntry(MetadataPage *page, int entryIndex, uint8_t c m_dirtyPageList.splice(m_dirtyPageList.end(), m_pageList, pageMapIterator->second); } + if(oldMetadata) + { + memcpy(oldMetadata, &page->m_data[entryIndex * m_metadataStatus.m_chunkMetadataByteSize], metadataLength); + } //std::copy(metadata, metadata + metadataLength, &page->m_data[entryIndex * m_metadataStatus.m_chunkMetadataByteSize]); memcpy(&page->m_data[entryIndex * m_metadataStatus.m_chunkMetadataByteSize], metadata, metadataLength); } @@ -252,4 +257,20 @@ void MetadataManager::UnlockPage(MetadataPage *page) LimitPages(); } -} +void 
MetadataManager::UpdateMetadataStatus(int64_t uncompressedSize, int serializedSize, bool subtract, const uint8_t (&targetLevels)[WAVELET_ADAPTIVE_LEVELS]) +{ + std::unique_lock<std::mutex> lock(m_mutex); + + if(subtract) + { + m_metadataStatus.m_uncompressedSize -= uncompressedSize; + } + else + { + m_metadataStatus.m_uncompressedSize += uncompressedSize; + } + + Wavelet_AccumulateAdaptiveLevelSizes(serializedSize, m_metadataStatus.m_adaptiveLevelSizes, subtract, targetLevels); +} + +} diff --git a/src/OpenVDS/VDS/MetadataManager.h b/src/OpenVDS/VDS/MetadataManager.h index 648634f7..a068cabe 100644 --- a/src/OpenVDS/VDS/MetadataManager.h +++ b/src/OpenVDS/VDS/MetadataManager.h @@ -127,11 +127,12 @@ namespace OpenVDS void UploadDirtyPages(VolumeDataStoreIOManager* accessManager); uint8_t const *GetPageEntry(MetadataPage *page, int entry) const; - void SetPageEntry(MetadataPage *page, int entryIndex, uint8_t const *metadata, int metadataLength); + void SetPageEntry(MetadataPage *page, int entryIndex, uint8_t const *metadata, int metadataLength, uint8_t *oldMetadata = nullptr); void UnlockPage(MetadataPage *page); MetadataStatus const &GetMetadataStatus() const { return m_metadataStatus; } + void UpdateMetadataStatus(int64_t uncompressedSize, int serializedSize, bool subtract, const uint8_t (&targetLevels)[WAVELET_ADAPTIVE_LEVELS]); }; } diff --git a/src/OpenVDS/VDS/VolumeDataStoreIOManager.cpp b/src/OpenVDS/VDS/VolumeDataStoreIOManager.cpp index 08602ce4..12d3f178 100644 --- a/src/OpenVDS/VDS/VolumeDataStoreIOManager.cpp +++ b/src/OpenVDS/VDS/VolumeDataStoreIOManager.cpp @@ -17,6 +17,8 @@ #define _USE_MATH_DEFINES +#include + #include "VolumeDataStoreIOManager.h" #include "VDS.h" @@ -267,9 +269,18 @@ VolumeDataStoreIOManager::AddLayer(VolumeDataLayer* volumeDataLayer, int chunkMe { assert(chunkMetadataPageSize > 0); MetadataStatus metadataStatus = {}; + + int + chunkMetadataByteSize = int(sizeof(VDSChunkMetadata)); + + 
if(CompressionMethod_IsWavelet(volumeDataLayer->GetEffectiveCompressionMethod())) + { + chunkMetadataByteSize = int(sizeof(uint32_t) + sizeof(VDSWaveletAdaptiveLevelsChunkMetadata)); + } + metadataStatus.m_chunkIndexCount = (int)volumeDataLayer->GetTotalChunkCount(); metadataStatus.m_chunkMetadataPageSize = chunkMetadataPageSize; - metadataStatus.m_chunkMetadataByteSize = sizeof(int64_t); + metadataStatus.m_chunkMetadataByteSize = chunkMetadataByteSize; metadataStatus.m_compressionMethod = volumeDataLayer->GetEffectiveCompressionMethod(); metadataStatus.m_compressionTolerance = volumeDataLayer->GetEffectiveCompressionTolerance(); @@ -641,6 +652,7 @@ CompressionInfo VolumeDataStoreIOManager::GetEffectiveAdaptiveLevel(VolumeDataLa float compressionTolerance = 0.0f; + compressionTolerance = 0.0f; int adaptiveLevel = (waveletAdaptiveMode == WaveletAdaptiveMode::BestQuality) ? -1 : 0; @@ -681,9 +693,28 @@ bool VolumeDataStoreIOManager::WriteChunk(const VolumeDataChunk& chunk, const st std::shared_ptr<std::vector<uint8_t>> to_write = std::make_shared<std::vector<uint8_t>>(serializedData); auto metadataManager = GetMetadataMangerForLayer(layerName); + MetadataStatus metadataStatus = metadataManager->GetMetadataStatus(); + + int pageIndex = (int)(chunk.index / metadataStatus.m_chunkMetadataPageSize); + int entryIndex = (int)(chunk.index % metadataStatus.m_chunkMetadataPageSize); - int pageIndex = (int)(chunk.index / metadataManager->GetMetadataStatus().m_chunkMetadataPageSize); - int entryIndex = (int)(chunk.index % metadataManager->GetMetadataStatus().m_chunkMetadataPageSize); + std::vector<uint8_t> indexEntry(metadataStatus.m_chunkMetadataByteSize); + + // If adaptive wavelet, we store the size of the serialized chunk in front of the chunk metadata + if(metadataStatus.m_chunkMetadataByteSize == sizeof(uint32_t) + sizeof(VDSWaveletAdaptiveLevelsChunkMetadata)) + { + uint32_t + serializedSize = (uint32_t)serializedData.size(); + indexEntry[0] = (serializedSize >> 0) & 0xff; + indexEntry[1] = (serializedSize >> 8) & 0xff; + 
indexEntry[2] = (serializedSize >> 16) & 0xff; + indexEntry[3] = (serializedSize >> 24) & 0xff; + std::copy(metadata.begin(), metadata.end(), indexEntry.begin() + 4); + } + else + { + indexEntry = metadata; + } bool initiateTransfer; @@ -697,7 +728,7 @@ bool VolumeDataStoreIOManager::WriteChunk(const VolumeDataChunk& chunk, const st int64_t jobId = CreateUploadJobId(); - auto completedCallback = [this, metadata, metadataManager, lockedMetadataPage, entryIndex, jobId](const Request &request, const Error &error) + auto completedCallback = [this, chunk, indexEntry, metadataManager, lockedMetadataPage, entryIndex, jobId](const Request &request, const Error &error) { std::unique_lock<std::mutex> lock(m_mutex); @@ -716,7 +747,35 @@ bool VolumeDataStoreIOManager::WriteChunk(const VolumeDataChunk& chunk, const st } else { - metadataManager->SetPageEntry(lockedMetadataPage, entryIndex, metadata.data(), (int)metadata.size()); + //Update MetadataStatus + MetadataStatus metadataStatus = metadataManager->GetMetadataStatus(); + + std::vector<uint8_t> oldIndexEntry(metadataStatus.m_chunkMetadataByteSize); + metadataManager->SetPageEntry(lockedMetadataPage, entryIndex, indexEntry.data(), (int)indexEntry.size(), oldIndexEntry.data()); + + if(metadataStatus.m_chunkMetadataByteSize == sizeof(uint32_t) + sizeof(VDSWaveletAdaptiveLevelsChunkMetadata)) + { + int size[DataBlock::Dimensionality_Max]; + chunk.layer->GetChunkVoxelSize(chunk.index, size); + int64_t uncompressedSize = GetByteSize(size, chunk.layer->GetFormat(), chunk.layer->GetComponents()); + + // If adaptive wavelet, we store the size of the serialized chunk in front of the chunk metadata + uint32_t const &newSize = *reinterpret_cast<uint32_t const *>(indexEntry.data()); + VDSWaveletAdaptiveLevelsChunkMetadata const &newMetadata = *reinterpret_cast<VDSWaveletAdaptiveLevelsChunkMetadata const *>(indexEntry.data() + sizeof(uint32_t)); + uint32_t const &oldSize = *reinterpret_cast<uint32_t const *>(oldIndexEntry.data()); + VDSWaveletAdaptiveLevelsChunkMetadata const &oldMetadata = *reinterpret_cast<VDSWaveletAdaptiveLevelsChunkMetadata const *>(oldIndexEntry.data() + 
sizeof(uint32_t)); + + if (newMetadata.m_hash != VolumeDataHash::UNKNOWN && !VolumeDataHash(newMetadata.m_hash).IsConstant()) + { + metadataManager->UpdateMetadataStatus(uncompressedSize, newSize, false, newMetadata.m_levels); + } + + if (oldMetadata.m_hash != VolumeDataHash::UNKNOWN && !VolumeDataHash(oldMetadata.m_hash).IsConstant()) + { + metadataManager->UpdateMetadataStatus(uncompressedSize, oldSize, true, oldMetadata.m_levels); + } + } + } m_vds.volumeDataLayout->ChangePendingWriteRequestCount(-1); diff --git a/src/OpenVDS/VDS/VolumeDataStoreVDSFile.cpp b/src/OpenVDS/VDS/VolumeDataStoreVDSFile.cpp index 4ba84493..5c407acd 100644 --- a/src/OpenVDS/VDS/VolumeDataStoreVDSFile.cpp +++ b/src/OpenVDS/VDS/VolumeDataStoreVDSFile.cpp @@ -211,6 +211,31 @@ bool VolumeDataStoreVDSFile::WriteChunk(const VolumeDataChunk& chunk, const std: layerFile->layerMetadata.m_validChunkCount--; } + if(layerFile->layerChunksWaveletAdaptive) + { + int size[DataBlock::Dimensionality_Max]; + chunk.layer->GetChunkVoxelSize(chunk.index, size); + int64_t uncompressedSize = GetByteSize(size, chunk.layer->GetFormat(), chunk.layer->GetComponents()); + + if (newMetadata.m_hash != VolumeDataHash::UNKNOWN && !VolumeDataHash(newMetadata.m_hash).IsConstant()) + { + if(layerFile->layerChunksWaveletAdaptive) + { + layerFile->layerMetadata.m_uncompressedSize += uncompressedSize; + Wavelet_AccumulateAdaptiveLevelSizes(indexEntry.m_length, layerFile->layerMetadata.m_adaptiveLevelSizes, false, newMetadata.m_levels); + } + } + + if (oldMetadata.m_hash != VolumeDataHash::UNKNOWN && !VolumeDataHash(oldMetadata.m_hash).IsConstant()) + { + if(layerFile->layerChunksWaveletAdaptive) + { + layerFile->layerMetadata.m_uncompressedSize -= uncompressedSize; + Wavelet_AccumulateAdaptiveLevelSizes(oldSize, layerFile->layerMetadata.m_adaptiveLevelSizes, true, oldMetadata.m_levels); + } + } + } + assert(layerFile->layerMetadata.m_validChunkCount <= layerFile->fileInterface->GetChunkCount()); return true; } diff --git 
a/src/OpenVDS/VDS/WaveletTypes.h b/src/OpenVDS/VDS/WaveletTypes.h index 8c189b01..8a02acf8 100644 --- a/src/OpenVDS/VDS/WaveletTypes.h +++ b/src/OpenVDS/VDS/WaveletTypes.h @@ -73,6 +73,52 @@ inline int Wavelet_GetEffectiveWaveletAdaptiveLoadLevel(float desiredRatio, int6 return level; } +inline void Wavelet_AccumulateAdaptiveLevelSizes(int32_t totalSize, int64_t (&adaptiveLevelSizes)[WAVELET_ADAPTIVE_LEVELS], bool subtract, const uint8_t (&targetLevels)[WAVELET_ADAPTIVE_LEVELS]) +{ + int32_t remainingSize = totalSize; + + for(int level = 0; level < WAVELET_ADAPTIVE_LEVELS; level++) + { + if(targetLevels[level] == 0) + { + break; + } + assert(remainingSize >= 0); + + remainingSize = (int32_t)((uint64_t)remainingSize * targetLevels[level] / 255); + + if(!subtract) + { + adaptiveLevelSizes[level] += remainingSize; + } + else + { + adaptiveLevelSizes[level] -= remainingSize; + } + assert(adaptiveLevelSizes[level] >= 0); + } +} + +inline void Wavelet_EncodeAdaptiveLevelsMetadata(int32_t totalSize, int const (&adaptiveLevels)[WAVELET_ADAPTIVE_LEVELS], uint8_t (&targetLevels)[WAVELET_ADAPTIVE_LEVELS]) +{ + int32_t remainingSize = totalSize; + + for(int level = 0; level < WAVELET_ADAPTIVE_LEVELS; level++) + { + assert(adaptiveLevels[level] <= remainingSize); + + if(remainingSize == 0) + { + targetLevels[level] = 0; + continue; + } + + targetLevels[level] = (uint8_t)(((uint64_t)adaptiveLevels[level] * 255 + (remainingSize - 1)) / remainingSize); + + remainingSize = (int)((uint64_t)remainingSize * targetLevels[level] / 255); + } +} + inline int Wavelet_DecodeAdaptiveLevelsMetadata(uint64_t totalSize, int targetLevel, uint8_t const *levels) { assert(targetLevel >= -1 && targetLevel < WAVELET_ADAPTIVE_LEVELS); -- GitLab