Commit 9a919c4c authored by Jørgen Lind's avatar Jørgen Lind
Browse files

networking interface

parent 9160f5a2
......@@ -4,6 +4,8 @@ set(SOURCE_FILES
IO/Linux_File.cpp
IO/Win_File.cpp
IO/S3_Downloader.cpp
IO/IOManager.cpp
IO/IOManagerAWS.cpp
VDS/VolumeDataPartition.cpp
VDS/VolumeDataLayer.cpp
VDS/VolumeDataLayout.cpp
......@@ -12,11 +14,14 @@ set(SOURCE_FILES
VDS/VolumeDataPageAccessorImpl.cpp
VDS/VolumeDataPageImpl.cpp
VDS/DimensionGroup.cpp
VDS/ParseVDSJson.cpp)
VDS/ParseVDSJson.cpp
VDS/MetaDataManager.cpp)
set (PRIVATE_HEADER_FILES
IO/File.h
IO/S3_Downloader.h
IO/IOManager.h
IO/IOManagerAWS.h
VDS/VolumeDataPartition.h
VDS/VolumeDataLayer.h
VDS/VolumeDataLayout.h
......@@ -29,7 +34,8 @@ set (PRIVATE_HEADER_FILES
VDS/Hash.h
Math/Vector.h
VDS/Bitmask.h
VDS/ParseVDSJson.h)
VDS/ParseVDSJson.h
VDS/MetaDataManager.h)
set (EXPORTED_HEADER_FILES
OpenVDS/OpenVDS.h
......
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#include "IOManager.h"
#include "IOManagerAWS.h"
namespace OpenVDS
{
ObjectRequester::~ObjectRequester()
{}
TransferHandler::~TransferHandler()
{}
IOManager::~IOManager()
{}
IOManager* IOManager::createIOManager(const OpenOptions& options, Error &error)
{
switch(options.connectionType)
{
case OpenOptions::AWS:
return new IOManagerAWS(static_cast<const AWSOpenOptions &>(options), error);
default:
error.code = -1;
error.string = "Unknwon type for OpenOptions";
return nullptr;
}
}
}
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#ifndef IOMANAGER_H
#define IOMANAGER_H
#include <memory>
#include <OpenVDS/OpenVDS.h>
namespace OpenVDS
{
class TransferHandler
{
public:
virtual ~TransferHandler();
virtual void handleData(std::vector<uint8_t> &&data) = 0;
virtual void handleError(Error &error) = 0;
};
class ObjectRequester
{
public:
virtual ~ObjectRequester();
virtual void waitForFinish() = 0;
virtual bool isDone() const = 0;
virtual bool isSuccess(Error &error) const = 0;
virtual void cancel() = 0;
};
class IOManager
{
public:
virtual ~IOManager();
virtual std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler) = 0;
static IOManager *createIOManager(const OpenOptions &options, Error &error);
};
}
#endif
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#include "IOManagerAWS.h"
#include <aws/core/Aws.h>
#include <aws/s3/model/Bucket.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/GetBucketLocationRequest.h>
#include <aws/s3/model/GetBucketLocationResult.h>
#include <aws/s3/model/BucketLocationConstraint.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <mutex>
#include <functional>
namespace OpenVDS
{
static int initialize_sdk = 0;
static std::mutex initialize_sdk_mutex;
static Aws::SDKOptions initialize_sdk_options;
static void initializeAWSSDK()
{
std::unique_lock<std::mutex> lock(initialize_sdk_mutex);
initialize_sdk++;
if (initialize_sdk == 1)
{
Aws::InitAPI(initialize_sdk_options);
}
}
static void deinitizlieAWSSDK()
{
std::unique_lock<std::mutex> lock(initialize_sdk_mutex);
initialize_sdk--;
if (!initialize_sdk)
{
Aws::ShutdownAPI(initialize_sdk_options);
}
}
struct AsyncCallerContext
{
AsyncCallerContext(ObjectRequesterAWS *back)
: back(back)
{}
ObjectRequesterAWS *back;
std::mutex mutex;
};
struct NotifyAll
{
NotifyAll(std::condition_variable &to_notify)
: to_notify(to_notify)
{}
~NotifyAll()
{
to_notify.notify_all();
}
std::condition_variable &to_notify;
};
static void callback(const Aws::S3::S3Client *client, const Aws::S3::Model::GetObjectRequest& objreq, const Aws::S3::Model::GetObjectOutcome &getObjectOutcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext>&awsContext, std::shared_ptr<AsyncCallerContext> context)
{
std::unique_lock<std::mutex> lock(context->mutex);
auto or = context->back;
if (!or)
return;
NotifyAll notify(or->m_waitForFinish);
or->m_done = true;
if (!getObjectOutcome.IsSuccess())
{
auto s3error = getObjectOutcome.GetError();
or->m_error.code = int(s3error.GetResponseCode());
or->m_error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str();
or->m_handler->handleError(or->m_error);
return;
}
Aws::S3::Model::GetObjectResult result = const_cast<Aws::S3::Model::GetObjectOutcome &>(getObjectOutcome).GetResultWithOwnership();
auto &retrieved_object = result.GetBody();
auto content_length = result.GetContentLength();
if (content_length > 0)
{
std::vector<uint8_t> data;
data.resize(content_length);
retrieved_object.read((char *)&data[0], content_length);
or->m_handler->handleData(std::move(data));
}
}
ObjectRequesterAWS::ObjectRequesterAWS(Aws::S3::S3Client& client, const std::string& bucket, const std::string& id, const std::shared_ptr<TransferHandler>& handler)
: m_handler(handler)
, m_context(std::make_shared<AsyncCallerContext>(this))
, m_done(false)
{
Aws::S3::Model::GetObjectRequest object_request;
object_request.SetBucket(bucket.c_str());
object_request.SetKey(id.c_str());
using namespace std::placeholders;
auto bounded_callback = std::bind(&callback, _1, _2, _3, _4, m_context);
client.GetObjectAsync(object_request, bounded_callback);
}
ObjectRequesterAWS::~ObjectRequesterAWS()
{
cancel();
}
void ObjectRequesterAWS::waitForFinish()
{
std::unique_lock<std::mutex> lock(m_context->mutex);
if (m_done)
return;
m_waitForFinish.wait(lock);
}
bool ObjectRequesterAWS::isDone() const
{
std::unique_lock<std::mutex> lock(m_context->mutex);
return m_done;
}
bool ObjectRequesterAWS::isSuccess(Error& error) const
{
std::unique_lock<std::mutex> lock(m_context->mutex);
error = m_error;
return m_error.code == 0;
}
void ObjectRequesterAWS::cancel()
{
std::unique_lock<std::mutex> lock(m_context->mutex);
m_context->back = nullptr;
}
IOManagerAWS::IOManagerAWS(const AWSOpenOptions& openOptions, Error &error)
: m_region(openOptions.region)
, m_bucket(openOptions.bucket)
, m_objectId(openOptions.key)
{
if (m_region.empty() || m_bucket.empty())
{
error.code = -1;
error.string = "AWS Config error. Empty bucket or region";
return;
}
if (m_objectId[m_objectId.size() -1] == '/')
m_objectId.resize(m_objectId.size() - 1);
initializeAWSSDK();
Aws::Client::ClientConfiguration config;
config.region = m_region.c_str();
m_s3Client.reset(new Aws::S3::S3Client(config));
}
IOManagerAWS::~IOManagerAWS()
{
deinitizlieAWSSDK();
}
std::shared_ptr<ObjectRequester> IOManagerAWS::requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler)
{
std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName;
return std::make_shared<ObjectRequesterAWS>(*m_s3Client.get(), m_bucket, id, handler);
}
}
/****************************************************************************
** Copyright 2019 The Open Group
** Copyright 2019 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#ifndef IOMANAGERAWS_H
#define IOMANAGERAWS_H
#include "IOManager.h"
#include <vector>
#include <string>
#include <aws/s3/S3Client.h>
namespace OpenVDS
{
struct AsyncCallerContext;
class ObjectRequesterAWS : public ObjectRequester
{
public:
ObjectRequesterAWS(Aws::S3::S3Client &client, const std::string &bucket, const std::string &id, const std::shared_ptr<TransferHandler> &handler);
~ObjectRequesterAWS();
void waitForFinish() override;
bool isDone() const override;
bool isSuccess(Error &error) const override;
void cancel() override;
std::shared_ptr<TransferHandler> m_handler;
std::shared_ptr<AsyncCallerContext> m_context;
Error m_error;
bool m_done;
std::condition_variable m_waitForFinish;
};
class IOManagerAWS : public IOManager
{
public:
IOManagerAWS(const AWSOpenOptions &openOptions, Error &error);
~IOManagerAWS();
std::shared_ptr<ObjectRequester> requestObject(const std::string objectName, std::shared_ptr<TransferHandler> handler) override;
private:
std::string m_region;
std::string m_bucket;
std::string m_objectId;
std::unique_ptr<Aws::S3::S3Client> m_s3Client;
};
}
#endif //IOMANAGERAWS_H
......@@ -19,6 +19,8 @@
#include "OpenVDS/openvds_export.h"
#include <vector>
#include "OpenVDSHandle.h"
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/Bucket.h>
......@@ -164,45 +166,5 @@ void test_function()
}
}
bool DownloadJson(const std::string &region, const std::string& bucket, const std::string &key, std::string &json, Error &error)
{
if (bucket.empty() || key.empty())
{
error.code = -1;
error.string = "AWS Config error. Empty bucket or key";
return false;
}
initializeAWSSDK();
Aws::Client::ClientConfiguration config;
config.region = region.c_str();
Aws::S3::S3Client s3_client(config);
Aws::S3::Model::GetObjectRequest object_request;
object_request.SetBucket(bucket.c_str());
object_request.SetKey(key.c_str());
auto get_object_outcome = s3_client.GetObject(object_request);
if (!get_object_outcome.IsSuccess())
{
auto s3error = get_object_outcome.GetError();
error.code = int(s3error.GetResponseCode());
error.string = (s3error.GetExceptionName() + " : " + s3error.GetMessage()).c_str();
return false;
}
auto result = get_object_outcome.GetResultWithOwnership();
auto& retrieved_object = result.GetBody();
auto content_length = result.GetContentLength();
if (content_length > 0)
{
json.resize(content_length);
retrieved_object.read(&json[0], content_length);
}
return true;
}
bool downloadChunk(VDSHandle &handle, std::vector<uint8_t> &blob, int32_t (&pitch)[Dimensionality_Max], Error &error)
{
assert(false && "TODO: IMPLEMENT");
return false;
}
}
}
......@@ -27,9 +27,5 @@ namespace OpenVDS
namespace S3
{
OPENVDS_EXPORT void test_function();
bool DownloadJson(const std::string &region, const std::string& bucket, const std::string &key, std::string &json, Error &error);
bool downloadChunk(VDSHandle &handle, std::vector<uint8_t> &blob, int32_t (&pitch)[Dimensionality_Max]);
}
}
......@@ -33,18 +33,13 @@ namespace OpenVDS
{
VDSHandle *open(const OpenOptions &options, Error &error)
{
std::unique_ptr<VDSHandle> ret(new VDSHandle());
error = Error();
std::unique_ptr<VDSHandle> ret(new VDSHandle(options, error));
if (error.code)
return nullptr;
switch(options.connectionType)
if (!downloadAndParseVDSJson(*ret.get(), error))
{
case OpenOptions::AWS:
{
if (!downloadAndParseVDSJson(static_cast<const AWSOpenOptions &>(options), *ret.get(), error))
{
return nullptr;
}
}
default:
return nullptr;
}
return ret.release();
......@@ -52,20 +47,14 @@ VDSHandle *open(const OpenOptions &options, Error &error)
VDSHandle* create(const OpenOptions& options, VolumeDataLayoutDescriptor const &layoutDescriptor, std::vector<VolumeDataAxisDescriptor> const &axisDescriptors, std::vector<VolumeDataChannelDescriptor> const &channelDescriptors, MetadataContainer const &metadataContainer, Error &error)
{
std::unique_ptr<VDSHandle> ret(new VDSHandle());
error = Error();
std::unique_ptr<VDSHandle> ret(new VDSHandle(options, error));
if (error.code)
return nullptr;
switch(options.connectionType)
{
case OpenOptions::AWS:
{
if (!serializeAndUploadVDSJson(static_cast<const AWSOpenOptions &>(options), *ret.get(), error))
{
return nullptr;
}
}
default:
if (!serializeAndUploadVDSJson(*ret.get(), error))
return nullptr;
}
return ret.release();
}
......
......@@ -91,6 +91,18 @@ enum class CompressionMethod
WaveletNormalizeBlockLossless,
Max
};
enum Dimensionality
{
Dimensionality_1 = 1,
Dimensionality_2,
Dimensionality_3,
Dimensionality_4,
Dimensionality_5,
Dimensionality_6,
Dimensionality_Max = Dimensionality_6
};
} /* namespace OpenVDS */
#endif //VOLUMEDATA_H_INCLUDED
......@@ -27,6 +27,7 @@
#include "VDS/VolumeDataLayer.h"
#include "VDS/VolumeDataLayout.h"
#include "IO/IOManager.h"
#include <vector>
#include <memory>
......@@ -37,8 +38,10 @@ namespace OpenVDS
struct VDSHandle
{
std::string url;
VDSHandle(const OpenOptions &openOptions, Error &error)
: ioManager(IOManager::createIOManager(openOptions, error))
{
}
VolumeDataLayoutDescriptor
layoutDescriptor;
......@@ -60,6 +63,8 @@ struct VDSHandle
volumeDataLayout;
std::vector<VolumeDataPageAccessor *>
pageAccessors;
std::unique_ptr<IOManager>
ioManager;
};
}
......
......@@ -1358,6 +1358,51 @@ const char * getDimensionGroupName(DimensionGroup dimensionGroup)
assert(dimensionGroup >= 0 && dimensionGroup < DimensionGroup_Max);
return apzDimensionNames[dimensionGroup];
}
const char* getDimensionsGroupString(DimensionsND dimensions)
{
switch(dimensions)
{
case DimensionsND::Group012: return "012";
case DimensionsND::Group013: return "013";
case DimensionsND::Group014: return "014";
case DimensionsND::Group015: return "015";
case DimensionsND::Group023: return "023";
case DimensionsND::Group024: return "024";
case DimensionsND::Group025: return "025";
case DimensionsND::Group034: return "034";
case DimensionsND::Group035: return "035";
case DimensionsND::Group045: return "045";
case DimensionsND::Group123: return "123";
case DimensionsND::Group124: return "124";
case DimensionsND::Group125: return "125";
case DimensionsND::Group134: return "134";
case DimensionsND::Group135: return "135";
case DimensionsND::Group145: return "145";
case DimensionsND::Group234: return "234";
case DimensionsND::Group235: return "235";
case DimensionsND::Group245: return "245";
case DimensionsND::Group345: return "345";
case DimensionsND::Group01: return "01";
case DimensionsND::Group02: return "02";
case DimensionsND::Group03: return "03";
case DimensionsND::Group04: return "04";
case DimensionsND::Group05: return "05";
case DimensionsND::Group12: return "12";
case DimensionsND::Group13: return "13";
case DimensionsND::Group14: return "14";
case DimensionsND::Group15: return "15";
case DimensionsND::Group23: return "23";
case DimensionsND::Group24: return "24";
case DimensionsND::Group25: return "25";
case DimensionsND::Group34: return "34";
case DimensionsND::Group35: return "35";
case DimensionsND::Group45: return "45";
default:
;
}
return "";
}
bool isRemappingPossible(DimensionGroup dimensionGroupA, DimensionGroup dimensionGroupB)
{
......
......@@ -106,16 +106,6 @@ enum DimensionGroup
DimensionGroup_6D_Max = DimensionGroup_012345 + 1
};
enum Dimensionality
{
Dimensionality_1 = 1,
Dimensionality_2,
Dimensionality_3,
Dimensionality_4,
Dimensionality_5,
Dimensionality_6,
Dimensionality_Max = Dimensionality_6
};
using IndexArray = int32_t[Dimensionality_Max];