Commit b0702565 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Add Global State object

This can be used to keep track of global counters.
parent 990c2c2d
......@@ -161,6 +161,8 @@ PyGlobal::initModule(py::module& m)
case OpenOptions::ConnectionType::Http : conn = std::string("Http" ); break;
case OpenOptions::ConnectionType::VDSFile : conn = std::string("VDSFile" ); break;
case OpenOptions::ConnectionType::InMemory : conn = std::string("InMemory" ); break;
case OpenOptions::ConnectionType::Other : conn = std::string("Other" ); break;
case OpenOptions::ConnectionType::ConnectionTypeCount : conn = std::string("ConnectionTypeCount"); break;
}
return std::string("OpenOptions(connectionType='" + conn + "')");
});
......
......@@ -91,6 +91,7 @@ set (PRIVATE_HEADER_FILES
VDS/ThreadPool.h
VDS/Env.h
VDS/ConnectionStringParser.h
VDS/GlobalStateImpl.h
)
set (EXPORTED_HEADER_FILES
......@@ -112,6 +113,7 @@ set (EXPORTED_HEADER_FILES
OpenVDS/CoordinateTransformer.h
OpenVDS/VolumeIndexer.h
OpenVDS/SimplexNoiseKernel.h
OpenVDS/GlobalState.h
)
if (MSVC)
......
......@@ -46,6 +46,10 @@ Request::Request(const std::string& objectName)
Request::~Request()
{}
IOManager::IOManager(OpenOptions::ConnectionType connectionType)
: m_connectionType(connectionType)
{
}
IOManager::~IOManager()
{}
IOManager* IOManager::CreateIOManager(const OpenOptions& options, Error &error)
......
......@@ -61,6 +61,7 @@ namespace OpenVDS
class IOManager
{
public:
IOManager(OpenOptions::ConnectionType connectionType);
virtual ~IOManager();
virtual std::shared_ptr<Request> ReadObjectInfo(const std::string &objectName, std::shared_ptr<TransferDownloadHandler> handler) = 0;
virtual std::shared_ptr<Request> ReadObject(const std::string &objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange &range = IORange()) = 0;
......@@ -74,11 +75,16 @@ namespace OpenVDS
return WriteObject(objectName, "", "application/json", std::vector<std::pair<std::string, std::string>>(), data, completedCallback);
}
OpenOptions::ConnectionType connectionType() const { return m_connectionType; }
OPENVDS_EXPORT
static IOManager *CreateIOManager(const OpenOptions &options, Error &error);
OPENVDS_EXPORT
static IOManager *CreateIOManager(const StringWrapper& url, const StringWrapper& connectionString, Error& error);
protected:
OpenOptions::ConnectionType m_connectionType;
};
}
......
......@@ -383,7 +383,8 @@ namespace OpenVDS
}
IOManagerAWS::IOManagerAWS(const AWSOpenOptions& openOptions, Error &error)
: m_region(openOptions.region)
: IOManager(OpenOptions::AWS)
, m_region(openOptions.region)
, m_bucket(openOptions.bucket)
, m_objectId(openOptions.key)
{
......
......@@ -347,7 +347,8 @@ void UploadRequestAzure::Cancel()
}
IOManagerAzure::IOManagerAzure(const AzureOpenOptions& openOptions, Error& error)
: m_connStr(openOptions.connectionString)
: IOManager(OpenOptions::Azure)
, m_connStr(openOptions.connectionString)
, m_containerStr(openOptions.container)
, m_prefix(openOptions.blob)
{
......
......@@ -22,7 +22,8 @@
namespace OpenVDS
{
IOManagerAzurePresigned::IOManagerAzurePresigned(const std::string& base, const std::string& suffix, Error &error)
: m_curlHandler(error)
: IOManager(OpenOptions::AzurePresigned)
, m_curlHandler(error)
, m_base(base)
, m_suffix(suffix)
{
......
......@@ -42,7 +42,8 @@ namespace OpenVDS
static const std::string GOOGLEAPIS = "https://storage.googleapis.com";
IOManagerGoogle::IOManagerGoogle(const GoogleOpenOptions& openOptions, Error &error)
: m_curlHandler(error)
: IOManager(OpenOptions::GoogleStorage)
, m_curlHandler(error)
, m_bucket(openOptions.bucket)
, m_pathPrefix(openOptions.pathPrefix)
{
......
......@@ -22,7 +22,8 @@
namespace OpenVDS
{
IOManagerHttp::IOManagerHttp(const HttpOpenOptions &openOptions, Error &error)
: m_curlHandler(error)
: IOManager(OpenOptions::Http)
, m_curlHandler(error)
{
const std::string &url = openOptions.url;
if (url.empty())
......
......@@ -49,7 +49,8 @@ public:
};
IOManagerInMemory::IOManagerInMemory(const InMemoryOpenOptions &openOptions, Error &error)
: m_threadPool(std::thread::hardware_concurrency())
: IOManager(OpenOptions::InMemory)
, m_threadPool(std::thread::hardware_concurrency())
{
}
......@@ -171,7 +172,8 @@ class IOManagerProxy : public OpenVDS::IOManager
{
public:
IOManagerProxy(OpenVDS::IOManager *backend)
: backend(backend)
: IOManager(backend->connectionType())
, backend(backend)
{}
std::shared_ptr<OpenVDS::Request> ReadObjectInfo(const std::string &objectName, std::shared_ptr<OpenVDS::TransferDownloadHandler> handler) override
......
......@@ -34,6 +34,7 @@
#include "VDS/ConnectionStringParser.h"
#include "VDS/VolumeDataStoreIOManager.h"
#include "VDS/VolumeDataStoreVDSFile.h"
#include "VDS/GlobalStateImpl.h"
#include "IO/IOManager.h"
......@@ -608,4 +609,10 @@ void Close(VDS *vds)
delete vds;
}
GlobalState *GetGlobalState()
{
static GlobalState *globalState = new GlobalStateImpl();
return globalState;
}
}
/****************************************************************************
** Copyright 2020 The Open Group
** Copyright 2020 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#ifndef OPENVDS_GLOBAL_STATE_H
#define OPENVDS_GLOBAL_STATE_H
#include <OpenVDS/OpenVDS.h>
#include <stdint.h>
namespace OpenVDS
{
/// <summary>
/// Object that contains global runtime data
/// </summary>
class GlobalState
{
public:
/// <summary>
/// Get the global amount of downloaded bytes from a cloud vendor.
/// </summary>
/// <param name="connectionType"> The counter to be retireved. </param>
/// <returns> Global number of bytes downloaded from the connection. This does not include any http header data. </returns>
virtual uint64_t GetBytesDownloaded(OpenOptions::ConnectionType connectionType) = 0;
/// <summary>
/// Get the global count of downloaded chunks.
/// </summary>
/// <param name="connectionType"> The counter to be retireved. </param>
/// <returns>Number of chunks downloaded.</returns>
virtual uint64_t GetChunksDownloaded(OpenOptions::ConnectionType connectionType) = 0;
/// <summary>
/// Get the global amount of decompressed bytes. This amount might
/// be smaller than the amount of downloaded bytes because of a small header
/// pr chunk. It can also be larger for non compressed data sets since chunks
/// can be cached.
/// </summary>
/// <param name="connectionType"></param>
/// <returns>Amount of decompressed bytes served the process.</returns>
virtual uint64_t GetBytesDecompressed(OpenOptions::ConnectionType connectionType) = 0;
/// <summary>
/// Get the global count of decompressed chunks.
/// </summary>
/// <param name="connectionType"> The counter to be retireved. </param>
/// <returns>Number of chunks decompressed.</returns>
virtual uint64_t GetChunksDecompressed(OpenOptions::ConnectionType connectionType) = 0;
};
}
#endif
\ No newline at end of file
......@@ -34,6 +34,7 @@ namespace OpenVDS
class VolumeDataLayoutDescriptor;
class VolumeDataAxisDescriptor;
class VolumeDataChannelDescriptor;
class GlobalState;
class IOManager;
struct OpenOptions
......@@ -46,7 +47,9 @@ struct OpenOptions
GoogleStorage,
Http,
VDSFile,
InMemory
InMemory,
Other,
ConnectionTypeCount
};
ConnectionType connectionType;
......@@ -416,6 +419,14 @@ OPENVDS_EXPORT VolumeDataAccessManager *GetAccessManager(VDSHandle handle);
/// </param>
OPENVDS_EXPORT void Close(VDSHandle handle);
/// <summary>
/// Get the GlobalState interface
/// </summary>
/// <returns>
/// A pointer to the GlobalState interface
/// </returns>
OPENVDS_EXPORT GlobalState *GetGlobalState();
}
#endif //OPENVDS_H
/****************************************************************************
** Copyright 2020 The Open Group
** Copyright 2020 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#ifndef OPENVDS_GLOBAL_STATE_IMPL_H
#define OPENVDS_GLOBAL_STATE_IMPL_H
#include <OpenVDS/GlobalState.h>
#include <atomic>
namespace OpenVDS
{
class GlobalStateImpl : public GlobalState
{
public:
GlobalStateImpl()
{
for (auto &i : downloaded)
i = 0;
for (auto &i : downloadedChunks)
i = 0;
for (auto &i : decompressed)
i = 0;
for (auto &i : decompressedChunks)
i = 0;
}
std::atomic<uint64_t> downloaded[OpenOptions::ConnectionTypeCount];
std::atomic<uint64_t> downloadedChunks[OpenOptions::ConnectionTypeCount];
std::atomic<uint64_t> decompressed[OpenOptions::ConnectionTypeCount];
std::atomic<uint64_t> decompressedChunks[OpenOptions::ConnectionTypeCount];
uint64_t GetBytesDownloaded(OpenOptions::ConnectionType connectionType) override
{
return downloaded[connectionType];
}
uint64_t GetChunksDownloaded(OpenOptions::ConnectionType connectionType) override
{
return downloadedChunks[connectionType];
}
uint64_t GetBytesDecompressed(OpenOptions::ConnectionType connectionType) override
{
return decompressed[connectionType];
}
uint64_t GetChunksDecompressed(OpenOptions::ConnectionType connectionType) override
{
return decompressedChunks[connectionType];
}
};
class GlobalStateVds
{
public:
GlobalStateVds(std::atomic<uint64_t> &downloaded, std::atomic<uint64_t> &downloadedChunks, std::atomic<uint64_t> &decompressed, std::atomic<uint64_t> &decompressedChunks)
: downloaded(downloaded)
, downloadedChunks(downloadedChunks)
, decompressed(decompressed)
, decompressedChunks(decompressedChunks)
{}
void addDownload(uint64_t download)
{
downloaded += download;
downloadedChunks++;
}
void addDecompressed(uint64_t decompress)
{
decompressed += decompress;
decompressedChunks++;
}
private:
std::atomic<uint64_t> &downloaded;
std::atomic<uint64_t> &downloadedChunks;
std::atomic<uint64_t> &decompressed;
std::atomic<uint64_t> &decompressedChunks;
};
}
#endif
\ No newline at end of file
......@@ -30,6 +30,7 @@
#include "VDS/VolumeDataAccessManagerImpl.h"
#include "VDS/VolumeDataRequestProcessor.h"
#include "VDS/VolumeDataStore.h"
#include "VDS/GlobalStateImpl.h"
#include <vector>
#include <map>
......@@ -85,6 +86,8 @@ struct VDS
accessManager;
std::unique_ptr<VolumeDataStore>
volumeDataStore;
GlobalStateImpl * globalState;
};
void CreateVolumeDataLayout(VDS &handle);
......
......@@ -303,7 +303,7 @@ bool VolumeDataPageAccessorImpl::ReadPreparedPaged(VolumeDataPage* page)
std::vector<uint8_t> page_data;
DataBlock dataBlock;
if (!VolumeDataStore::DeserializeVolumeData(volumeDataChunk, serialized_data, metadata, compressionInfo.GetCompressionMethod(), compressionInfo.GetAdaptiveLevel(), m_layer->GetFormat(), dataBlock, page_data, error))
if (!m_accessManager->GetVolumeDataStore()->DeserializeVolumeData(volumeDataChunk, serialized_data, metadata, compressionInfo.GetCompressionMethod(), compressionInfo.GetAdaptiveLevel(), m_layer->GetFormat(), dataBlock, page_data, error))
{
pageListMutexLock.lock();
pageImpl->SetError(error);
......
......@@ -26,6 +26,7 @@
#include "ParsedMetadata.h"
#include <OpenVDS/ValueConversion.h>
#include <VDS/VDS.h>
#include <VDS/GlobalStateImpl.h>
#include <fmt/format.h>
......@@ -36,6 +37,16 @@
namespace OpenVDS
{
VolumeDataStore::VolumeDataStore(OpenOptions::ConnectionType connectionType)
: m_globalStateVds(static_cast<GlobalStateImpl *>(GetGlobalState())->downloaded[connectionType],
static_cast<GlobalStateImpl *>(GetGlobalState())->downloadedChunks[connectionType],
static_cast<GlobalStateImpl *>(GetGlobalState())->decompressed[connectionType],
static_cast<GlobalStateImpl *>(GetGlobalState())->decompressedChunks[connectionType])
{
}
static bool CompressionMethodIsWavelet(CompressionMethod compressionMethod)
{
return compressionMethod == CompressionMethod::Wavelet ||
......@@ -479,7 +490,7 @@ bool VolumeDataStore::CreateConstantValueDataBlock(VolumeDataChunk const &volume
return true;
}
bool VolumeDataStore::DeserializeVolumeData(const VolumeDataChunk &volumeDataChunk, const std::vector<uint8_t>& serializedData, const std::vector<uint8_t>& metadata, CompressionMethod compressionMethod, int32_t adaptiveLevel, VolumeDataChannelDescriptor::Format loadFormat, DataBlock &dataBlock, std::vector<uint8_t>& target, Error& error)
bool VolumeDataStore::DeserializeVolumeData(const VolumeDataChunk& volumeDataChunk, const std::vector<uint8_t>& serializedData, const std::vector<uint8_t>& metadata, CompressionMethod compressionMethod, int32_t adaptiveLevel, VolumeDataChannelDescriptor::Format loadFormat, DataBlock& dataBlock, std::vector<uint8_t>& target, Error& error)
{
uint64_t volumeDataHashValue = VolumeDataHash::UNKNOWN;
......@@ -534,7 +545,9 @@ bool VolumeDataStore::DeserializeVolumeData(const VolumeDataChunk &volumeDataChu
}
}
return OpenVDS::DeserializeVolumeData(serializedData, loadFormat, compressionMethod, deserializeValueRange, volumeDataLayer->GetIntegerScale(), volumeDataLayer->GetIntegerOffset(), volumeDataLayer->IsUseNoValue(), volumeDataLayer->GetNoValue(), adaptiveLevel, dataBlock, target, error);
bool ret = OpenVDS::DeserializeVolumeData(serializedData, loadFormat, compressionMethod, deserializeValueRange, volumeDataLayer->GetIntegerScale(), volumeDataLayer->GetIntegerOffset(), volumeDataLayer->IsUseNoValue(), volumeDataLayer->GetNoValue(), adaptiveLevel, dataBlock, target, error);
m_globalStateVds.addDecompressed(target.size());
return ret;
}
uint64_t
......
......@@ -27,6 +27,7 @@
#include "DataBlock.h"
#include "VolumeDataHash.h"
#include "ParsedMetadata.h"
#include "GlobalStateImpl.h"
#include <vector>
......@@ -36,7 +37,7 @@ namespace OpenVDS
class VolumeDataStore
{
public:
VolumeDataStore() {};
VolumeDataStore(OpenOptions::ConnectionType connectionType);
virtual ~VolumeDataStore() {};
virtual CompressionInfo
......@@ -51,11 +52,15 @@ public:
virtual bool AddLayer(VolumeDataLayer* volumeDataLayer) = 0;
virtual bool RemoveLayer(VolumeDataLayer* volumeDataLayer) = 0;
bool DeserializeVolumeData(const VolumeDataChunk &volumeDataChunk, const std::vector<uint8_t>& serializedData, const std::vector<uint8_t>& metadata, CompressionMethod compressionMethod, int32_t adaptiveLevel, VolumeDataChannelDescriptor::Format loadFormat, DataBlock &dataBlock, std::vector<uint8_t>& target, Error& error);
static bool Verify(const VolumeDataChunk& volumeDataChunk, const std::vector<uint8_t>& serializedData, CompressionMethod compressionMethod, bool isFullyRead);
static bool DeserializeVolumeData(const VolumeDataChunk &volumeDataChunk, const std::vector<uint8_t>& serializedData, const std::vector<uint8_t>& metadata, CompressionMethod compressionMethod, int32_t adaptiveLevel, VolumeDataChannelDescriptor::Format loadFormat, DataBlock &dataBlock, std::vector<uint8_t>& target, Error& error);
static bool CreateConstantValueDataBlock(VolumeDataChunk const &volumeDataChunk, VolumeDataChannelDescriptor::Format format, float noValue, VolumeDataChannelDescriptor::Components components, VolumeDataHash const &constantValueVolumeDataHash, DataBlock &dataBlock, std::vector<uint8_t> &buffer, Error &error);
static uint64_t
SerializeVolumeData(const VolumeDataChunk& chunk, const DataBlock &dataBlock, const std::vector<uint8_t>& chunkData, CompressionMethod compressionMethod, std::vector<uint8_t>& destinationBuffer);
protected:
GlobalStateVds m_globalStateVds;
};
}
......
......@@ -226,7 +226,8 @@ static int WaveletAdaptiveLevelsMetadataDecode(uint64_t totalSize, int targetLev
}
VolumeDataStoreIOManager:: VolumeDataStoreIOManager(VDS &vds, IOManager *ioManager)
: m_vds(vds)
: VolumeDataStore(ioManager->connectionType())
, m_vds(vds)
, m_ioManager(ioManager)
, m_warnedAboutMissingMetadataTag(false)
{
......@@ -509,6 +510,7 @@ bool VolumeDataStoreIOManager::ReadChunk(const VolumeDataChunk &chunk, std::vect
if (transferHandler->m_data.size())
{
m_globalStateVds.addDownload(transferHandler->m_data.size());
if (moveData)
serializedData = std::move(transferHandler->m_data);
else
......
......@@ -21,6 +21,7 @@
#include "VDS.h"
#include "ParseVDSJson.h"
#include <OpenVDS/OpenVDS.h>
#include <fmt/format.h>
......@@ -106,6 +107,7 @@ bool VolumeDataStoreVDSFile::ReadChunk(const VolumeDataChunk& chunk, std::vector
if(buffer)
{
m_globalStateVds.addDownload(buffer->Size());
auto bufferData = reinterpret_cast<const uint8_t *>(buffer->Data());
serializedData.assign(bufferData, bufferData + buffer->Size());
m_dataStore->ReleaseBuffer(buffer);
......@@ -420,7 +422,8 @@ void VolumeDataStoreVDSFile::SetMetadataStatus(std::string const &layerName, Met
}
VolumeDataStoreVDSFile::VolumeDataStoreVDSFile(VDS &vds, const std::string &vdsFileName, Mode mode, Error &error)
: m_vds(vds)
: VolumeDataStore(OpenOptions::VDSFile)
, m_vds(vds)
, m_isVDSObjectFilePresent(false)
, m_isVolumeDataLayoutFilePresent(false)
, m_dataStore(HueBulkDataStore::Open(vdsFileName.c_str()), &HueBulkDataStore::Close)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment