Commit 4a1f756f authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Merge branch feature/jorgen.lind/waveletadaptiveconnectionstring with...

Merge branch feature/jorgen.lind/waveletadaptiveconnectionstring with refs/heads/master into refs/merge-requests/446/train
parents 205ddf07 fcd747bc
Pipeline #58274 passed with stages
in 16 minutes and 21 seconds
......@@ -25,8 +25,20 @@ of the SAS url and prepend ``https``.
The ``connection`` string will provide ``protocol`` or backend specific data for
the connection to the VDS. The string is a key/value string where the key and
value is separated with a ``=``. Each key/value pair is separated with a ``;``. It
is allowed for the value to contain extra ``=``, but the key can not.
value is separated with a ``=``. Each key/value pair is separated with a ``;``.
It is allowed for the value to contain extra ``=``, but the key can not. Having
duplicate keys is allowed, and the value of the latter specified key that will
be used.
There are some globally recognized keys for the connection string. They are:
- WaveletAdaptiveTolerance
- WaveletAdaptiveRatio
- WaveletAdaptiveMode
``WaveletAdaptiveMode`` should normally not be used since specifying either
``WaveletAdaptiveTolerance`` or ``WaveletAdaptiveRatio`` will set the correct
``WaveletAdaptiveMode``. However, it is possible to set ``WaveletAdaptiveMode`` to
``BestQuality`` to get the default behavior for a ``connection`` string.
The recognised keys for ``s3`` connection string are
......
......@@ -14,6 +14,7 @@ set(SOURCE_FILES
IO/IOManagerGoogle.cpp
IO/IOManagerHttp.cpp
IO/IOManagerDms.cpp
IO/IORefreshToken.cpp
BulkDataStore/HueBulkDataStore.cpp
BulkDataStore/ExtentAllocator.cpp
BulkDataStore/VDSObjectParser.cpp
......@@ -43,8 +44,8 @@ set(SOURCE_FILES
VDS/VolumeDataRequestProcessor.cpp
VDS/VolumeIndexer.cpp
VDS/Env.cpp
${COMMON_DIR}/Base64/Base64.cpp
"IO/IORefreshToken.cpp")
VDS/StringToDouble.cpp
${COMMON_DIR}/Base64/Base64.cpp)
set (PRIVATE_HEADER_FILES
IO/File.h
......@@ -60,6 +61,7 @@ set (PRIVATE_HEADER_FILES
IO/IOManagerHttp.h
IO/IOManagerRequestImpl.h
IO/IOManagerTransformer.h
IO/IORefreshToken.h
VDS/VDS.h
VDS/VolumeDataPartition.h
VDS/VolumeDataChannelMapping.h
......@@ -96,9 +98,9 @@ set (PRIVATE_HEADER_FILES
VDS/Env.h
VDS/ConnectionStringParser.h
VDS/GlobalStateImpl.h
VDS/StringToDouble.h
${COMMON_DIR}/Base64/Base64.h
${COMMON_DIR}/ThreadPool/ThreadPool.h
"IO/IORefreshToken.h")
${COMMON_DIR}/ThreadPool/ThreadPool.h)
file(GLOB EXPORTED_HEADER_FILES OpenVDS/*.h)
......
......@@ -38,6 +38,7 @@
#include "VDS/VolumeDataStoreVDSFile.h"
#include "VDS/GlobalStateImpl.h"
#include "VDS/WaveletTypes.h"
#include "VDS/StringToDouble.h"
#include "IO/IOManager.h"
#include "IO/IOManagerTransformer.h"
......@@ -49,30 +50,27 @@ namespace OpenVDS
static std::function<IOManager* (IOManager*)> iomanagerTransformer;
static bool isProtocol(const StringWrapper &str, const StringWrapper &literal)
static bool isProtocol(const std::string &str, const std::string &literal)
{
if (str.size < literal.size)
if (str.size() < literal.size())
return false;
std::string protocol(str.data, str.data + literal.size);
std::string protocol(str.data(), str.data() + literal.size());
std::transform(protocol.begin(), protocol.end(), protocol.begin(), asciitolower);
return memcmp(protocol.data(), literal.data, literal.size) == 0;
return memcmp(protocol.data(), literal.data(), literal.size()) == 0;
}
static StringWrapper removeProtocol(const StringWrapper& str, const StringWrapper &literal)
static std::string removeProtocol(const std::string &str, const std::string &literal)
{
StringWrapper ret(str);
ret.data += literal.size;
ret.size -= literal.size;
return ret;
return std::string(str.begin() + literal.size(), str.end());
}
static std::string urlDecode(const StringWrapper& url)
static std::string urlDecode(const std::string & url)
{
const char *input = url.data;
const char *input = url.data();
std::vector<char> output;
output.reserve(url.size);
output.reserve(url.size());
for(int i = 0; i < (int)url.size; i++)
for(int i = 0; i < (int)url.size(); i++)
{
if(input[i] == '+')
{
......@@ -81,8 +79,8 @@ static std::string urlDecode(const StringWrapper& url)
else if(input[i] == '%')
{
char temp[5] = "0x";
if(i + 1 < (int)url.size) temp[2] = input[++i];
if(i + 1 < (int)url.size) temp[3] = input[++i];
if(i + 1 < (int)url.size()) temp[2] = input[++i];
if(i + 1 < (int)url.size()) temp[3] = input[++i];
output.push_back(atoi(temp));
}
else
......@@ -109,24 +107,24 @@ static bool isTrue(const std::string& str)
return false;
}
static std::unique_ptr<OpenOptions> createS3OpenOptions(const StringWrapper &url, const StringWrapper &connectionString, Error &error)
static std::unique_ptr<OpenOptions> createS3OpenOptions(const std::string &url, const std::string &connectionString, Error &error)
{
std::unique_ptr<AWSOpenOptions> openOptions(new AWSOpenOptions());
auto connectionStringMap = ParseConnectionString(connectionString.data, connectionString.size, error);
auto connectionStringMap = ParseConnectionString(connectionString, error);
if (error.code)
{
return nullptr;
}
if (url.size < 1)
if (url.size() < 1)
{
error.code = -1;
error.string = "S3 url is missing bucket";
}
auto end = url.data + url.size;
auto bucket_end = std::find(url.data, end, '/');
openOptions->bucket = std::string(url.data, bucket_end);
auto end = url.data() + url.size();
auto bucket_end = std::find(url.data(), end, '/');
openOptions->bucket = std::string(url.data(), bucket_end);
auto urlKeyBegin = bucket_end + 1;
if (urlKeyBegin < end)
......@@ -203,17 +201,17 @@ static std::unique_ptr<OpenOptions> createS3OpenOptions(const StringWrapper &url
return openOptions;
}
static std::unique_ptr<OpenOptions> createAzureOpenOptions(const StringWrapper &url, const StringWrapper &connectionString, Error &error)
static std::unique_ptr<OpenOptions> createAzureOpenOptions(const std::string &url, const std::string &connectionString, Error &error)
{
std::unique_ptr<AzureOpenOptions> openOptions(new AzureOpenOptions());
if (url.size < 1)
if (url.size() < 1)
{
error.code = -1;
error.string = "Azure url is missing container";
}
auto end = url.data + url.size;
auto container_end = std::find(url.data, end, '/');
openOptions->container = std::string(url.data, container_end);
auto end = url.data() + url.size();
auto container_end = std::find(url.data(), end, '/');
openOptions->container = std::string(url.data(), container_end);
auto urlBlobBegin = container_end + 1;
if (urlBlobBegin < end)
......@@ -221,7 +219,7 @@ static std::unique_ptr<OpenOptions> createAzureOpenOptions(const StringWrapper &
openOptions->blob = std::string(urlBlobBegin, end);
}
auto connectionStringMap = ParseConnectionString(connectionString.data, connectionString.size, error);
auto connectionStringMap = ParseConnectionString(connectionString, error);
auto bearer_it = connectionStringMap.find("bearertoken");
if (bearer_it != connectionStringMap.end())
{
......@@ -245,20 +243,20 @@ static std::unique_ptr<OpenOptions> createAzureOpenOptions(const StringWrapper &
}
else
{
openOptions->connectionString = std::string(connectionString.data, connectionString.data + connectionString.size);
openOptions->connectionString = connectionString;
}
return openOptions;
}
static std::unique_ptr<OpenOptions> createAzureSASOpenOptions(const StringWrapper &url, const StringWrapper &connectionString, Error& error)
static std::unique_ptr<OpenOptions> createAzureSASOpenOptions(const std::string &url, const std::string &connectionString, Error& error)
{
std::unique_ptr<AzurePresignedOpenOptions> openOptions(new AzurePresignedOpenOptions());
const char http[] = "https://";
openOptions->baseUrl.reserve(sizeof(http) - 1 + url.size);
openOptions->baseUrl.reserve(sizeof(http) - 1 + url.size());
openOptions->baseUrl.insert(0, http, sizeof(http) - 1);
openOptions->baseUrl.append(url.data, url.data + url.size);
openOptions->baseUrl.append(url.data(), url.data() + url.size());
auto connectionStringMap = ParseConnectionString(connectionString.data, connectionString.size, error);
auto connectionStringMap = ParseConnectionString(connectionString, error);
for (auto& connectionPair : connectionStringMap)
{
if (connectionPair.first == "suffix")
......@@ -278,23 +276,23 @@ static std::unique_ptr<OpenOptions> createAzureSASOpenOptions(const StringWrappe
const std::set<std::string> TrueWords = { "true", "yes", "on" };
const std::set<std::string> FalseWords = { "false", "no", "off" };
static std::unique_ptr<OpenOptions> createGoogleOpenOptions(const StringWrapper& url, const StringWrapper& connectionString, Error& error)
static std::unique_ptr<OpenOptions> createGoogleOpenOptions(const std::string & url, const std::string & connectionString, Error& error)
{
std::unique_ptr<GoogleOpenOptions> openOptions(new GoogleOpenOptions());
auto connectionStringMap = ParseConnectionString(connectionString.data, connectionString.size, error);
auto connectionStringMap = ParseConnectionString(connectionString, error);
if (error.code)
{
return nullptr;
}
if (url.size < 1)
if (url.size() < 1)
{
error.code = -1;
error.string = "GS url is missing bucket";
}
auto end = url.data + url.size;
auto bucket_end = std::find(url.data, end, '/');
openOptions->bucket = std::string(url.data, bucket_end);
auto end = url.data() + url.size();
auto bucket_end = std::find(url.data(), end, '/');
openOptions->bucket = std::string(url.data(), bucket_end);
auto urlKeyBegin = bucket_end + 1;
if (urlKeyBegin < end)
......@@ -356,23 +354,23 @@ static std::unique_ptr<OpenOptions> createGoogleOpenOptions(const StringWrapper&
return openOptions;
}
static std::unique_ptr<OpenOptions> createDMSOpenOptions(const StringWrapper& url, const StringWrapper& connectionString, Error& error)
static std::unique_ptr<OpenOptions> createDMSOpenOptions(const std::string & url, const std::string & connectionString, Error& error)
{
std::unique_ptr<DMSOpenOptions> openOptions(new DMSOpenOptions());
openOptions->logLevel = 0;
auto connectionStringMap = ParseConnectionString(connectionString.data, connectionString.size, error);
auto connectionStringMap = ParseConnectionString(connectionString, error);
if (error.code)
{
return nullptr;
}
if (url.size < 1)
if (url.size() < 1)
{
error.code = -1;
error.string = "SD url is missing bucket";
}
openOptions->datasetPath = std::string(url.data, url.data + url.size);
openOptions->datasetPath = url;
for (auto& connectionPair : connectionStringMap)
{
......@@ -402,28 +400,28 @@ static std::unique_ptr<OpenOptions> createDMSOpenOptions(const StringWrapper& ur
return openOptions;
}
static std::unique_ptr<OpenOptions> createHttpOpenOptions(const StringWrapper& url, const StringWrapper& connectionString, Error& error)
static std::unique_ptr<OpenOptions> createHttpOpenOptions(const std::string & url, const std::string & connectionString, Error& error)
{
std::unique_ptr<HttpOpenOptions> openOptions(new HttpOpenOptions(std::string(url.data, url.data + url.size)));
std::unique_ptr<HttpOpenOptions> openOptions(new HttpOpenOptions(url));
return openOptions;
}
static std::unique_ptr<OpenOptions> createVDSFileOpenOptions(const StringWrapper& url, const StringWrapper& connectionString, Error& error)
static std::unique_ptr<OpenOptions> createVDSFileOpenOptions(const std::string & url, const std::string & connectionString, Error& error)
{
std::unique_ptr<VDSFileOpenOptions> openOptions(new VDSFileOpenOptions());
openOptions->fileName = urlDecode(url);
return openOptions;
}
static std::unique_ptr<OpenOptions> createInMemoryOpenOptions(const StringWrapper& url, const StringWrapper& connectionString, Error& error)
static std::unique_ptr<OpenOptions> createInMemoryOpenOptions(const std::string & url, const std::string & connectionString, Error& error)
{
std::unique_ptr<InMemoryOpenOptions> openOptions(new InMemoryOpenOptions());
openOptions->name = urlDecode(url);
return openOptions;
}
typedef StringWrapper (*UrlTransformer)(const StringWrapper &url, const StringWrapper &transformer);
typedef std::unique_ptr<OpenOptions>(*OpenOptionsCreator)(const StringWrapper& url, const StringWrapper& connectionString, Error& error);
typedef std::string (*UrlTransformer)(const std::string &url, const std::string &transformer);
typedef std::unique_ptr<OpenOptions>(*OpenOptionsCreator)(const std::string &url, const std::string &connectionString, Error& error);
struct UrlToOpenOptions
{
std::string protocol;
......@@ -444,10 +442,59 @@ static const std::vector<UrlToOpenOptions> urlToOpenOptions = {
{std::string("inmemory://"), &removeProtocol, &createInMemoryOpenOptions}
};
OpenOptions* CreateOpenOptions(StringWrapper url, StringWrapper connectionString, Error& error)
static bool GetWaveletAdaptiveInfo(std::string& connectionString, WaveletAdaptiveMode &mode, float &tolerance, float &ratio, Error &error)
{
std::vector<std::string> waveletAdaptiveKeys;
waveletAdaptiveKeys.emplace_back("WaveletAdaptiveMode");
waveletAdaptiveKeys.emplace_back("WaveletAdaptiveTolerance");
waveletAdaptiveKeys.emplace_back("WaveletAdaptiveRatio");
auto waveletAdaptivePair = RemoveKeyValue(waveletAdaptiveKeys, connectionString, error);
if (error.code || waveletAdaptivePair.first < 0)
return false;
assert(waveletAdaptivePair.first < waveletAdaptiveKeys.size());
if (waveletAdaptivePair.first == 0)
{
std::string value = waveletAdaptivePair.second;
std::transform(value.begin(), value.end(), value.begin(), asciitolower);
if (value != "bestquality" && value != "best_quality")
{
error.code = -1;
error.string = fmt::format("Connection string parameter WaveletAdaptiveMode is specified with illigal value {}. The only value valid for this property is BestQuality, the other modes will be set by specifying WavletAdaptiveTolerance and WaveletAdaptiveRatio properties.", waveletAdaptivePair.second);
return false;
}
mode = WaveletAdaptiveMode::BestQuality;
}
if (waveletAdaptivePair.first == 1)
{
tolerance = float(StringToDouble(waveletAdaptivePair.second, error));
if (error.code)
error.string = fmt::format("Connection string parameter WaveletAdaptiveRatio: {}", error.string);
mode = WaveletAdaptiveMode::Tolerance;
}
else if (waveletAdaptivePair.first == 2)
{
ratio = float(StringToDouble(waveletAdaptivePair.second, error));
if (error.code)
error.string = fmt::format("Connection string parameter WaveletAdaptiveRatio: {}", error.string);
mode = WaveletAdaptiveMode::Ratio;
}
return true;
}
OpenOptions* CreateOpenOptions(StringWrapper urlWrapper, StringWrapper connectionStringWrapper, Error& error)
{
error = Error();
std::unique_ptr<OpenOptions> openOptions;
std::string url(urlWrapper.data, urlWrapper.size);
std::string connectionString(connectionStringWrapper.data, connectionStringWrapper.size);
WaveletAdaptiveMode adaptiveMode;
float adaptiveTolerance;
float adaptiveRatio;
bool adaptiveDataSet = GetWaveletAdaptiveInfo(connectionString, adaptiveMode, adaptiveTolerance, adaptiveRatio, error);
if (error.code)
return nullptr;
for (auto& urlToOpenOption : urlToOpenOptions)
{
......@@ -465,7 +512,16 @@ OpenOptions* CreateOpenOptions(StringWrapper url, StringWrapper connectionString
if (!openOptions)
{
openOptions.reset(new VDSFileOpenOptions(std::string(url.data, url.size)));
openOptions.reset(new VDSFileOpenOptions(url));
}
if (openOptions && adaptiveDataSet)
{
openOptions->waveletAdaptiveMode = adaptiveMode;
if (adaptiveMode == WaveletAdaptiveMode::Ratio)
openOptions->waveletAdaptiveRatio = adaptiveRatio;
if (adaptiveMode == WaveletAdaptiveMode::Tolerance)
openOptions->waveletAdaptiveTolerance = adaptiveTolerance;
}
return openOptions.release();
......@@ -473,9 +529,10 @@ OpenOptions* CreateOpenOptions(StringWrapper url, StringWrapper connectionString
bool IsSupportedProtocol(StringWrapper url)
{
std::string u(url.data, url.size);
for (auto& urlToOpenOpiton : urlToOpenOptions)
{
if (isProtocol(url, urlToOpenOpiton.protocol))
if (isProtocol(u, urlToOpenOpiton.protocol))
return true;
}
return false;
......
......@@ -35,7 +35,8 @@ inline char asciitolower(char in) {
return in;
}
inline std::string trim(const char *start, const char *end)
template<typename IT>
inline std::string trim(IT start, IT end)
{
while (std::isspace(*start) && start < end)
start++;
......@@ -45,49 +46,103 @@ inline std::string trim(const char *start, const char *end)
return std::string(start, end + 1);
}
inline std::map<std::string, std::string> ParseConnectionString(const char* connectionString, size_t connectionStringSize, Error &error)
template<typename It, typename Callback>
inline void TokenizeConnectionString(It begin, It end, Callback callback, Error& error)
{
std::map<std::string, std::string> ret;
auto it = connectionString;
auto end = connectionString + connectionStringSize;
const char* name_begin = nullptr;
const char* name_end = nullptr;
auto it = begin;
while (it < end)
{
auto keyValueEnd = std::find(it, end, ';');
auto equals = std::find(it, keyValueEnd, '=');
name_begin = it;
name_end = equals;
auto name_begin = it;
auto name_end = equals;
it = equals + 1;
std::string name = trim(name_begin, name_end);
if (name.size() && equals == keyValueEnd)
if (name.size() && equals >= keyValueEnd)
{
error.code = - 1;
error.string = fmt::format("Missing required = sign for name {} in connection string. Every key has to have a = sign next to it.", name);
return ret;
return;
}
if (name.empty() && it < keyValueEnd)
{
error.code = - 1;
error.string = fmt::format("Empty name in connection string. Name must consist of more than empty spaces.");
return ret;
return;
}
if (it > keyValueEnd)
continue;
std::string value = trim(it, keyValueEnd);
std::transform(name.begin(), name.end(), name.begin(), asciitolower);
ret.emplace(std::move(name), std::move(value));
it = keyValueEnd + 1;
std::string value = it == keyValueEnd ? "" : trim(it, keyValueEnd);
It newKeyValueEnd = keyValueEnd;
int endAdjust = 0;
callback(name_begin, keyValueEnd, std::move(name), std::move(value), &newKeyValueEnd, &endAdjust);
if (keyValueEnd != newKeyValueEnd)
{
keyValueEnd = newKeyValueEnd;
end += endAdjust;
it = keyValueEnd;
}
else
{
it = keyValueEnd < end ? keyValueEnd + 1 : keyValueEnd;
}
}
return;
}
inline std::pair<int, std::string> RemoveKeyValue(const std::vector<std::string> & matchKeys, std::string &connectionString, Error &error)
{
using It = decltype(connectionString.begin());
std::pair<int, std::string> ret = std::make_pair(-1, std::string());
std::vector<std::string> lowercaseMatchKeys = matchKeys;
for (auto& lowercaseMatchKey : lowercaseMatchKeys)
{
std::transform(lowercaseMatchKey.begin(), lowercaseMatchKey.end(), lowercaseMatchKey.begin(), asciitolower);
}
TokenizeConnectionString(connectionString.begin(), connectionString.end(),
[&lowercaseMatchKeys, &connectionString, &ret](It keyValueBegin, It keyValueEnd, std::string&& key, std::string&& value, It *newKeyValueEnd, int *endAdjust)
{
auto it = std::find(lowercaseMatchKeys.begin(), lowercaseMatchKeys.end(), key);
if (it != lowercaseMatchKeys.end())
{
It eraseBegin = keyValueBegin;
It eraseEnd = keyValueEnd;
if (keyValueBegin != connectionString.begin())
eraseBegin--;
else if (keyValueEnd != connectionString.end())
eraseEnd++;
connectionString.erase(eraseBegin, eraseEnd);
ret = std::make_pair(int(it - lowercaseMatchKeys.begin()), std::move(value));
*endAdjust = int(eraseBegin - eraseEnd);
*newKeyValueEnd = eraseEnd + *endAdjust;
}
}
, error);
return ret;
}
inline std::map<std::string, std::string> ParseConnectionString(const char* connectionString, size_t connectionStringSize, Error &error)
{
std::map<std::string, std::string> ret;
auto end = connectionString + connectionStringSize;
using It = const char*;
TokenizeConnectionString(connectionString, end,
[&ret](It keyValueBegin, It keyValueEnd, std::string&& key, std::string&& value, It *newKeyValueEnd, int *endAdjust)
{
ret.emplace(std::move(key), std::move(value));
}, error);
return ret;
}
inline std::map<std::string, std::string> ParseConnectionString(const std::string connectionString, Error& error)
{
return ParseConnectionString(connectionString.data(), connectionString.size(), error);
}
}
#endif //CONNECTIONSTRINGPARSER_H
#include "StringToDouble.h"
#include <sstream>
#include <locale.h>
#include <stdint.h>
#include <fmt/format.h>
#ifdef _WIN32
typedef _locale_t locale_t;
#define strtod_l _strtod_l
#define strtof_l _strtof_l
#define strtoll _strtoi64
#define strtoull _strtoui64
#endif
namespace OpenVDS
{
static locale_t &
GetCLocale()
{
#ifdef _WIN32
static locale_t hCLocale = _create_locale(LC_CTYPE, "C");
#else
static locale_t hCLocale = newlocale(LC_CTYPE_MASK, "C", NULL);
#endif
return hCLocale;
}
double StringToDouble(const std::string& number, Error& error)
{
char* endptr;
double ret = strtod_l(number.c_str(), &endptr, GetCLocale());
if (endptr != (number.data() + number.size()))
{
error.code = -1;
error.string = fmt::format("Failed to parse number {}.", number);
}
return ret;
}
}
#ifndef OPENVDS_STRING_TO_DOUBLE_H
#define OPENVDS_STRING_TO_DOUBLE_H
#include <string>
#include <OpenVDS/OpenVDS.h>
namespace OpenVDS
{
double StringToDouble(const std::string& number, Error& error);
}
#endif
\ No newline at end of file
......@@ -18,6 +18,12 @@
#include <VDS/ConnectionStringParser.h>
#include <gtest/gtest.h>
std::vector<std::string> mkStringVec(const std::string& str)
{
std::vector<std::string> ret;
ret.emplace_back(str);
return ret;
}