Commit acf32fb0 authored by Jørgen Lind's avatar Jørgen Lind
Browse files

Merge branch Segyimportcloud with refs/heads/master into refs/merge-requests/62/train

parents 1e2b5ebd 4ac979e2
Pipeline #794 passed with stages
in 8 minutes and 15 seconds
......@@ -29,8 +29,8 @@ namespace OpenVDS
class TransferDownloadHandler
{
public:
virtual ~TransferDownloadHandler();
virtual void HandleMetadata(const std::string &key, const std::string &header);
OPENVDS_EXPORT virtual ~TransferDownloadHandler();
OPENVDS_EXPORT virtual void HandleMetadata(const std::string &key, const std::string &header);
virtual void HandleData(std::vector<uint8_t> &&data) = 0;
virtual void Completed(const Request &request, const Error &error) = 0;
};
......@@ -52,14 +52,21 @@ namespace OpenVDS
struct IORange
{
size_t start;
size_t end;
int64_t start;
int64_t end;
};
// Result of an IOManager::Head() query: metadata about a stored object
// obtained without downloading its contents.
struct HeadInfo
{
int64_t contentLength; // object size as reported by the backend (presumably in bytes — confirm per backend)
};
class IOManager
{
public:
virtual ~IOManager();
virtual HeadInfo Head(const std::string &objectName, Error &error, const IORange& range = IORange()) = 0;
virtual std::shared_ptr<Request> Download(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange &range = IORange()) = 0;
virtual std::shared_ptr<Request> Upload(const std::string objectName, const std::string &contentDispostionFilename, const std::string &contentType, const std::vector<std::pair<std::string, std::string>> &metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request &request, const Error &error)> completedCallback = nullptr) = 0;
std::shared_ptr<Request> UploadBinary(const std::string objectName, const std::string &contentDispositionFilename, const std::vector<std::pair<std::string, std::string>> &metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request &request, const Error &error)> completedCallback = nullptr)
......@@ -71,7 +78,7 @@ namespace OpenVDS
return Upload(objectName, "", "application/json", std::vector<std::pair<std::string, std::string>>(), data, completedCallback);
}
OPENVDS_EXPORT
static IOManager *CreateIOManager(const OpenOptions &options, Error &error);
};
......
......@@ -25,6 +25,7 @@
#include <aws/s3/model/BucketLocationConstraint.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/core/utils/logging/DefaultLogSystem.h>
......@@ -283,7 +284,7 @@ namespace OpenVDS
return;
}
if (m_objectId[m_objectId.size() -1] == '/')
if (m_objectId.size() && m_objectId[m_objectId.size() -1] == '/')
m_objectId.resize(m_objectId.size() - 1);
initializeAWSSDK();
......@@ -348,6 +349,30 @@ namespace OpenVDS
deinitializeAWSSDK();
}
// Synchronously issues an S3 HeadObject request for `objectName` (resolved
// relative to this manager's object id) and returns the reported content
// length.
//
// objectName  Name below m_objectId; an empty name addresses m_objectId itself.
// error       Receives the HTTP response code and AWS message on failure;
//             left untouched on success.
// range       When range.end is non-zero, forwarded as a "bytes=start-end"
//             Range header.
//
// Returns a value-initialised HeadInfo on failure.
HeadInfo IOManagerAWS::Head(const std::string &objectName, Error &error, const IORange& range)
{
  std::string key = m_objectId;
  if (!objectName.empty())
  {
    key += "/";
    key += objectName;
  }

  Aws::S3::Model::HeadObjectRequest headRequest;
  headRequest.SetBucket(convertStdString(m_bucket));
  headRequest.SetKey(convertStdString(key));
  if (range.end != 0)
  {
    const std::string rangeHeader = fmt::format("bytes={}-{}", range.start, range.end);
    headRequest.SetRange(convertStdString(rangeHeader));
  }

  auto outcome = m_s3Client.get()->HeadObject(headRequest);
  if (outcome.IsSuccess())
  {
    auto headResult = outcome.GetResultWithOwnership();
    HeadInfo info;
    info.contentLength = headResult.GetContentLength();
    return info;
  }

  error.code = int(outcome.GetError().GetResponseCode());
  error.string = convertAwsString(outcome.GetError().GetMessage());
  return {};
}
std::shared_ptr<Request> IOManagerAWS::Download(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange &range)
{
std::string id = objectName.empty()? m_objectId : m_objectId + "/" + objectName;
......
......@@ -98,6 +98,7 @@ namespace OpenVDS
IOManagerAWS(const AWSOpenOptions &openOptions, Error &error);
~IOManagerAWS() override;
HeadInfo Head(const std::string &objectName, Error &error, const IORange& range = IORange()) override;
std::shared_ptr<Request> Download(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange& range = IORange()) override;
std::shared_ptr<Request> Upload(const std::string objectName, const std::string& contentDispostionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request & request, const Error & error)> completedCallback = nullptr) override;
private:
......
......@@ -307,6 +307,36 @@ IOManagerAzure::~IOManagerAzure()
{
}
// Fetches the attributes of the block blob `objectName` in this manager's
// container and returns its size.
//
// objectName  Blob name inside m_container.
// error       On a storage_exception, receives the extended error code
//             (parsed as a decimal integer, or -1 when it is not numeric)
//             and the extended error message. Left untouched on success.
// range       Currently ignored by this backend.
//
// Returns a value-initialised HeadInfo on failure.
HeadInfo IOManagerAzure::Head(const std::string &objectName, Error &error, const IORange& range)
{
  // Per-request options derived from the manager-wide settings; they only take
  // effect if they are actually handed to the request below.
  azure::storage::blob_request_options local_options;
  local_options.set_parallelism_factor(m_options.parallelism_factor()); //example: (4)
  local_options.set_maximum_execution_time(m_options.maximum_execution_time()); //example: (std::chrono::milliseconds(10000));

  auto blob = m_container.get_block_blob_reference(convertToUtilString(objectName));
  HeadInfo ret;
  try
  {
    blob.download_attributes(azure::storage::access_condition(), local_options, azure::storage::operation_context());
    ret.contentLength = blob.properties().size();
  }
  catch (azure::storage::storage_exception & e)
  {
    std::string code = convertFromUtilString(e.result().extended_error().code());
    // strtol leaves endptr at the start of the string when no digits were
    // consumed; that is how a non-numeric error code is detected.
    char *endptr = nullptr;
    int codeint = int(strtol(code.c_str(), &endptr, 10));
    if (endptr == code.c_str())
      codeint = -1;
    error.code = codeint;
    error.string = convertFromUtilString(e.result().extended_error().message());
    return {};
  }
  // Success path: the previous unconditional assert(false) here aborted every
  // successful call in builds with assertions enabled.
  return ret;
}
std::shared_ptr<Request> IOManagerAzure::Download(const std::string requestName, std::shared_ptr<TransferDownloadHandler> handler, const IORange& range)
{
std::shared_ptr<DownloadRequestAzure> azureRequest = std::make_shared<DownloadRequestAzure>(requestName, handler);
......
......@@ -89,6 +89,7 @@ namespace OpenVDS
IOManagerAzure(const AzureOpenOptions& openOptions, Error& error);
~IOManagerAzure() override;
HeadInfo Head(const std::string &objectName, Error &error, const IORange& range = IORange());
std::shared_ptr<Request> Download(const std::string requestName, std::shared_ptr<TransferDownloadHandler> handler, const IORange& range = IORange()) override;
std::shared_ptr<Request> Upload(const std::string requestName, const std::string& contentDispostionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request & request, const Error & error)> completedCallback = nullptr) override;
private:
......
#include "IOManagerInMemory.h"
#include <fmt/format.h>
namespace OpenVDS
{
......@@ -57,6 +59,22 @@ IOManagerInMemory::~IOManagerInMemory()
}
// Looks up `objectName` in the in-memory store while holding m_mutex and
// reports the size of its stored data.
//
// error  Set to code -2 with a "not found" message when the object is
//        missing; left untouched otherwise.
// range  Not used by this backend.
//
// Returns contentLength == 0 when the object does not exist.
HeadInfo IOManagerInMemory::Head(const std::string &objectName, Error &error, const IORange& range)
{
  std::lock_guard<std::mutex> guard(m_mutex);

  HeadInfo info;
  const auto found = m_data.find(objectName);
  if (found != m_data.end())
  {
    info.contentLength = found->second.data.size();
    return info;
  }

  error.code = -2;
  error.string = fmt::format("Object {} not found\n", objectName);
  info.contentLength = 0;
  return info;
}
std::shared_ptr<OpenVDS::Request> IOManagerInMemory::Download(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange &range)
{
auto request = std::make_shared<InMemoryRequest>(objectName);
......
......@@ -41,6 +41,7 @@ namespace OpenVDS
IOManagerInMemory(const InMemoryOpenOptions &openOptions, Error &error);
~IOManagerInMemory() override;
HeadInfo Head(const std::string &objectName, Error &error, const IORange& range = IORange());
std::shared_ptr<Request> Download(const std::string objectName, std::shared_ptr<TransferDownloadHandler> handler, const IORange& range = IORange()) override;
std::shared_ptr<Request> Upload(const std::string objectName, const std::string& contentDispostionFilename, const std::string& contentType, const std::vector<std::pair<std::string, std::string>>& metadataHeader, std::shared_ptr<std::vector<uint8_t>> data, std::function<void(const Request & request, const Error & error)> completedCallback = nullptr) override;
private:
......
......@@ -4,8 +4,9 @@ add_executable(SEGYImport
SEGY.h
SEGY.cpp
SEGYFileInfo.h
SEGYFileInfo.cpp
SEGYFileInfo.cpp
SEGYImport.cpp
DataProvider.h
)
add_executable(SEGYExport
......
/****************************************************************************
** Copyright 2020 The Open Group
** Copyright 2020 Bluware, Inc.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
****************************************************************************/
#ifndef DATA_PROVIDER_H
#define DATA_PROVIDER_H
#include <OpenVDS/OpenVDS.h>
#include "IO/IOManager.h"
#include "IO/File.h"
#include <map>
#include <memory>
#include <mutex>
struct SynchronousDataTransfer : public OpenVDS::TransferDownloadHandler
{
void HandleMetadata(const std::string &key, const std::string &header) override
{
}
void HandleData(std::vector<uint8_t> &&data) override
{
this->data = std::move(data);
}
void Completed(const OpenVDS::Request &request, const OpenVDS::Error &error) override
{
this->error = error;
}
std::vector<uint8_t> data;
OpenVDS::Error error;
};
struct DataProvider
{
DataProvider(OpenVDS::File *file)
: m_file(file)
, m_ioManager(nullptr)
{
}
DataProvider(OpenVDS::IOManager *ioManager, const std::string &objectName)
: m_file(nullptr)
, m_ioManager(ioManager)
, m_objectName(objectName)
{
if (m_ioManager)
{
OpenVDS::Error error;
auto info = m_ioManager->Head(objectName, error);
if (error.code == 0)
{
m_size = info.contentLength;
}
}
}
bool Read(void* data, int64_t offset, int32_t length, OpenVDS::Error& error) const
{
if (m_file)
{
return m_file->Read(data, offset, length, error);
}
else if (m_ioManager)
{
auto dataTransfer = std::make_shared<SynchronousDataTransfer>();
auto request = m_ioManager->Download(m_objectName, dataTransfer, { offset, offset + length});
request->WaitForFinish();
if (dataTransfer->error.code)
{
error = dataTransfer->error;
return false;
}
memcpy(data, dataTransfer->data.data(), std::min(size_t(length), dataTransfer->data.size()));
return true;
}
error.code = -1;
error.string = "Not implemented";
return false;
}
bool Size(OpenVDS::Error &error) const
{
if (m_file)
return m_file->Size(error);
if (m_ioManager)
return m_size;
error.code = -1;
error.string = "Not implemented";
return 0;
}
std::unique_ptr<OpenVDS::File> m_file;
std::unique_ptr<OpenVDS::IOManager> m_ioManager;
const std::string m_objectName;
int64_t m_size;
};
// A window [pos, pos + size) onto the data behind a DataProvider.
//
// For file-backed providers the window is a FileView created by the file; for
// IOManager-backed providers the bytes are downloaded synchronously into
// m_data. The view carries an intrusive reference count (ref/deref) that the
// owner is expected to manage; it starts at 1.
//
// The view owns a reference on its FileView, so it is non-copyable.
struct DataView
{
  // Creates the view. On failure `error` is filled; the object is still
  // constructed and keeps reporting the requested pos/size, so that an owner
  // which indexes views by (Pos(), Size()) can always find it again.
  DataView(DataProvider &dataProvider, int64_t pos, int64_t size, bool isPopulate, OpenVDS::Error &error)
    : m_fileView(nullptr)
    , m_pos(pos)
    , m_size(size)
    , m_ref(1)
  {
    if (dataProvider.m_file)
    {
      m_fileView = dataProvider.m_file->CreateFileView(pos, size, isPopulate, error);
    }
    else if (dataProvider.m_ioManager)
    {
      auto dataTransfer = std::make_shared<SynchronousDataTransfer>();
      auto request = dataProvider.m_ioManager->Download(dataProvider.m_objectName, dataTransfer, { pos, pos + size});
      request->WaitForFinish();
      if (dataTransfer->error.code)
      {
        error = dataTransfer->error;
        return;
      }
      m_data = std::move(dataTransfer->data);
    }
    else
    {
      error.code = 2;
      error.string = "Missing data provider";
    }
  }

  DataView(const DataView &) = delete;
  DataView &operator=(const DataView &) = delete;

  // Drops the reference held on the FileView, if any.
  ~DataView()
  {
    if (m_fileView)
      OpenVDS::FileView::RemoveReference(m_fileView);
  }

  // Start of the viewed bytes (the FileView's pointer, or the downloaded buffer).
  const void * Pointer() const
  {
    if (m_fileView)
      return m_fileView->Pointer();
    return m_data.data();
  }

  // Position of the view: the FileView's when present, else the requested one.
  int64_t Pos() const
  {
    if (m_fileView)
      return m_fileView->Pos();
    return m_pos;
  }

  // Size of the view: the FileView's when present, else the requested one.
  int64_t Size() const
  {
    if (m_fileView)
      return m_fileView->Size();
    return m_size;
  }

  // Adds one reference. Not synchronised; the caller must serialise access.
  void ref()
  {
    m_ref++;
  }

  // Drops one reference; returns true when the last reference is gone.
  bool deref()
  {
    m_ref--;
    return m_ref == 0;
  }

  OpenVDS::FileView *m_fileView;
  std::vector<uint8_t> m_data;
  int64_t m_pos;
  int64_t m_size;
  int m_ref;
};
class DataViewManager
{
public:
DataViewManager(DataProvider &dataProvider)
: m_dataProvider(dataProvider)
{}
std::shared_ptr<DataView> acquireDataView(int64_t pos, int64_t size, bool isPopulate, OpenVDS::Error& error)
{
std::unique_lock<std::mutex> lock(m_mutex);
DataView* dataView = nullptr;
auto key = DataViewMap::key_type(pos, size);
auto it = m_dataViewMap.lower_bound(key);
if (it == m_dataViewMap.end() || it->first != key)
{
auto dataView = new DataView(m_dataProvider, pos, size, isPopulate, error);
it = m_dataViewMap.insert(it, {key, dataView});
}
else
{
it->second->ref();
}
auto ptr = std::shared_ptr<DataView>(it->second, [this](DataView* dataView) { if (dataView) this->releaseDataView(dataView); });
return ptr;
}
private:
typedef std::map<std::pair<int64_t, int64_t>, DataView *> DataViewMap;
std::mutex m_mutex;
DataViewMap m_dataViewMap;
DataProvider &m_dataProvider;
void releaseDataView(DataView *dataView)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (dataView->deref())
{
auto it = m_dataViewMap.find(DataViewMap::key_type(dataView->Pos(), dataView->Size()));
assert(it != m_dataViewMap.end());
delete it->second;
m_dataViewMap.erase(it);
}
}
};
#endif //DATA_PROVIDER_H
......@@ -19,6 +19,7 @@
#include "IO/File.h"
#include "SEGYFileInfo.h"
#include "DataProvider.h"
#include <iostream>
#include <algorithm>
......@@ -100,7 +101,7 @@ SEGYFileInfo::StaticGetUniqueID()
}
bool
SEGYFileInfo::Scan(OpenVDS::File const &file, HeaderField const &primaryKeyHeaderField, HeaderField const &secondaryKeyHeaderField, SEGYBinInfoHeaderFields const &binInfoHeaderFields)
SEGYFileInfo::Scan(DataProvider &dataProvider, HeaderField const &primaryKeyHeaderField, HeaderField const &secondaryKeyHeaderField, SEGYBinInfoHeaderFields const &binInfoHeaderFields)
{
char textualFileHeader[TextualFileHeaderSize];
char binaryFileHeader[BinaryFileHeaderSize];
......@@ -111,8 +112,8 @@ SEGYFileInfo::Scan(OpenVDS::File const &file, HeaderField const &primaryKeyHeade
OpenVDS::Error error;
file.Read(textualFileHeader, 0, TextualFileHeaderSize, error) &&
file.Read(binaryFileHeader, TextualFileHeaderSize, BinaryFileHeaderSize, error);
dataProvider.Read(textualFileHeader, 0, TextualFileHeaderSize, error) &&
dataProvider.Read(binaryFileHeader, TextualFileHeaderSize, BinaryFileHeaderSize, error);
if(error.code != 0)
{
......@@ -121,7 +122,7 @@ SEGYFileInfo::Scan(OpenVDS::File const &file, HeaderField const &primaryKeyHeade
m_dataSampleFormatCode = BinaryHeader::DataSampleFormatCode(ReadFieldFromHeader(binaryFileHeader, BinaryHeader::DataSampleFormatCodeHeaderField, m_headerEndianness));
int64_t fileSize = file.Size(error);
int64_t fileSize = dataProvider.Size(error);
if(error.code != 0)
{
......@@ -134,7 +135,7 @@ SEGYFileInfo::Scan(OpenVDS::File const &file, HeaderField const &primaryKeyHeade
}
// Read first trace header
file.Read(traceHeader, TextualFileHeaderSize + BinaryFileHeaderSize, TraceHeaderSize, error);
dataProvider.Read(traceHeader, TextualFileHeaderSize + BinaryFileHeaderSize, TraceHeaderSize, error);
if(error.code != 0)
{
......@@ -188,7 +189,7 @@ SEGYFileInfo::Scan(OpenVDS::File const &file, HeaderField const &primaryKeyHeade
while(segmentInfo.m_traceStop != lastTrace)
{
file.Read(traceHeader, TextualFileHeaderSize + BinaryFileHeaderSize + trace * TraceByteSize(), TraceHeaderSize, error);
dataProvider.Read(traceHeader, TextualFileHeaderSize + BinaryFileHeaderSize + trace * TraceByteSize(), TraceHeaderSize, error);
if(error.code != 0)
{
......
......@@ -16,6 +16,7 @@
****************************************************************************/
#include "SEGY.h"
#include "DataProvider.h"
#include <cstdint>
#include <vector>
......@@ -96,5 +97,5 @@ struct SEGYFileInfo
int TraceByteSize() const;
bool Scan(OpenVDS::File const &file, SEGY::HeaderField const &primaryKeyHeaderField, SEGY::HeaderField const &secondaryKeyHeaderField = SEGY::HeaderField(), SEGYBinInfoHeaderFields const &binInfoHeaderFields = SEGYBinInfoHeaderFields::StandardHeaderFields());
bool Scan(DataProvider &dataprovider, SEGY::HeaderField const &primaryKeyHeaderField, SEGY::HeaderField const &secondaryKeyHeaderField = SEGY::HeaderField(), SEGYBinInfoHeaderFields const &binInfoHeaderFields = SEGYBinInfoHeaderFields::StandardHeaderFields());
};
......@@ -18,6 +18,7 @@
#include "SEGYFileInfo.h"
#include "IO/File.h"
#include "VDS/Hash.h"
#include "DataProvider.h"
#include <OpenVDS/OpenVDS.h>
#include <OpenVDS/KnownMetadata.h>
......@@ -41,6 +42,22 @@
#include <chrono>
// Opens `filename` and wraps it in a file-backed DataProvider.
// If the open fails, `error` carries the reason and the returned provider
// holds no file.
static DataProvider createDataProviderFromFile(const std::string &filename, OpenVDS::Error &error)
{
  std::unique_ptr<OpenVDS::File> file(new OpenVDS::File());
  const bool opened = file->Open(filename, false, false, false, error);
  // Ownership moves into the provider only when the file was opened.
  OpenVDS::File *handedOver = opened ? file.release() : nullptr;
  return DataProvider(handedOver);
}
// Creates an IOManager from `openoptions` and wraps the object `objectId` in
// an IOManager-backed DataProvider.
// If creating the IOManager reports an error, the returned provider holds no
// IOManager and an empty object name.
static DataProvider createDataProviderFromOpenOptions(const OpenVDS::OpenOptions &openoptions, const std::string &objectId, OpenVDS::Error &error)
{
  std::unique_ptr<OpenVDS::IOManager> ioManager(OpenVDS::IOManager::CreateIOManager(openoptions, error));
  if (error.code == 0)
    return DataProvider(ioManager.release(), objectId);
  return DataProvider(static_cast<OpenVDS::IOManager *>(nullptr), "");
}
Json::Value
SerializeSEGYBinInfo(SEGYBinInfo const& binInfo)
{
......@@ -377,7 +394,7 @@ findRepresentativeSegment(SEGYFileInfo const& fileInfo)
}
bool
analyzeSegment(OpenVDS::File const& file, SEGYFileInfo const& fileInfo, SEGYSegmentInfo const& segmentInfo, float valueRangePercentile, OpenVDS::FloatRange& valueRange, OpenVDS::Error& error)
analyzeSegment(DataProvider &dataProvider, SEGYFileInfo const& fileInfo, SEGYSegmentInfo const& segmentInfo, float valueRangePercentile, OpenVDS::FloatRange& valueRange, OpenVDS::Error& error)
{
int traceByteSize = fileInfo.TraceByteSize();
......@@ -387,7 +404,7 @@ analyzeSegment(OpenVDS::File const& file, SEGYFileInfo const& fileInfo, SEGYSegm
std::unique_ptr<char[]> buffer(new char[(segmentInfo.m_traceStop - segmentInfo.m_traceStart) * traceByteSize]);
file.Read(buffer.get(), offset, traceCount * traceByteSize, error);
dataProvider.Read(buffer.get(), offset, traceCount * traceByteSize, error);
if (error.code != 0)
{
......@@ -482,14 +499,14 @@ analyzeSegment(OpenVDS::File const& file, SEGYFileInfo const& fileInfo, SEGYSegm
}
bool
createSEGYHeadersMetadata(OpenVDS::File const& file, OpenVDS::MetadataContainer& metadataContainer, OpenVDS::Error& error)
createSEGYHeadersMetadata(DataProvider &dataProvider, OpenVDS::MetadataContainer& metadataContainer, OpenVDS::Error& error)
{
std::vector<uint8_t> textHeader(SEGY::TextualFileHeaderSize);
std::vector<uint8_t> binaryHeader(SEGY::BinaryFileHeaderSize);
// Read headers
bool success = file.Read(textHeader.data(), 0, SEGY::TextualFileHeaderSize, error) &&
file.Read(binaryHeader.data(), SEGY::TextualFileHeaderSize, SEGY::BinaryFileHeaderSize, error);
bool success = dataProvider.Read(textHeader.data(), 0, SEGY::TextualFileHeaderSize, error) &&
dataProvider.Read(binaryHeader.data(), SEGY::TextualFileHeaderSize, SEGY::BinaryFileHeaderSize, error);
if (!success) return false;
......@@ -708,55 +725,6 @@ createChannelDescriptors(SEGYFileInfo const& fileInfo, OpenVDS::FloatRange const
return channelDescriptors;
}
class FileViewManager
{
typedef std::map<std::pair<int64_t, int64_t>, OpenVDS::FileView*> FileViewMap;
std::mutex m_mutex;
FileViewMap m_fileViewMap;
OpenVDS::File& m_file;
void releaseFileView(OpenVDS::FileView* fileView)
{
std::unique_lock<std::mutex> lock(m_mutex);
auto it = m_fileViewMap.find(FileViewMap::key_type(fileView->Pos(), fileView->Size()));
assert(it != m_fileViewMap.end());
if (OpenVDS::FileView::RemoveReference(fileView))