Issue with two parallel processes writing to the same VDS source
I am trying to implement a workflow in which multiple parallel processes write to separate chunks of the same VDS source. Unfortunately, I run into errors when I subsequently read the modified VDS source.

Here is the code example that I am running:
```python
from multiprocessing import Process

import numpy as np
import openvds


def unlock_dataset(sd_path):
    """Implementation not disclosed."""
    return


def write_zero_pages(accessor):
    # Initialize every chunk of the channel with zeros.
    chunks_count = accessor.getChunkCount()
    for c in range(chunks_count):
        page = accessor.createPage(c)
        buf = np.array(page.getWritableBuffer(), copy=False)
        buf[:, :, :] = np.zeros(buf.shape, dtype=float)
        page.release()
    accessor.commit()


def create_vds(
    path,
    connection_string,
    shape=None,
    databrick_size=openvds.VolumeDataLayoutDescriptor.BrickSize.BrickSize_128,
    access_mode=openvds.IVolumeDataAccessManager.AccessMode.AccessMode_Create,
    components=openvds.VolumeDataChannelDescriptor.Components.Components_1,
    format=openvds.VolumeDataChannelDescriptor.Format.Format_R32,
    create_and_write_pages=True,
):
    layout_descriptor = openvds.VolumeDataLayoutDescriptor(
        brickSize=databrick_size,
        lodLevels=openvds.VolumeDataLayoutDescriptor.LODLevels.LODLevels_1,
        brickSize2DMultiplier=4,
        options=openvds.VolumeDataLayoutDescriptor.Options.Options_None,
        negativeMargin=0,
        positiveMargin=0,
        fullResolutionDimension=0,
    )
    metadata_container = openvds.MetadataContainer()
    axis_descriptors = []
    for i, size in enumerate(shape):
        axis_descriptors.append(
            openvds.VolumeDataAxisDescriptor(
                size,
                f"X{i}",
                "unitless",
                -1000.0,
                1000.0,
            )
        )
    channel_descriptors = [
        openvds.VolumeDataChannelDescriptor(
            format=format,
            components=components,
            name="Channel0",
            unit="unitless",
            valueRangeMin=0.0,
            valueRangeMax=1000.0,
        )
    ]
    vds = openvds.create(
        path,
        connection_string,
        layout_descriptor,
        axis_descriptors,
        channel_descriptors,
        metadata_container,
    )
    access_manager = openvds.getAccessManager(vds)
    accessor = access_manager.createVolumeDataPageAccessor(
        dimensionsND=openvds.DimensionsND.Dimensions_012,
        accessMode=access_mode,
        lod=0,
        channel=0,
        maxPages=8,
        chunkMetadataPageSize=1024,
    )
    chunks_count = accessor.getChunkCount()
    if create_and_write_pages:
        write_zero_pages(accessor)
    openvds.close(vds)
    return chunks_count


def writing_process(path, connection_string, chunks_range, number):
    vds = openvds.open(path, connection_string)
    manager = openvds.getAccessManager(vds)
    accessor = manager.createVolumeDataPageAccessor(
        dimensionsND=openvds.DimensionsND.Dimensions_012,
        lod=0,
        channel=0,
        maxPages=8,
        accessMode=openvds.IVolumeDataAccessManager.AccessMode.AccessMode_ReadWrite,
        chunkMetadataPageSize=1024,
    )
    # Fill every chunk in this process's range with the process number.
    for c in range(chunks_range[0], chunks_range[1]):
        page = accessor.createPage(c)
        buf = np.array(page.getWritableBuffer(), copy=False)
        buf[:, :, :] = float(number)
        page.release()
    accessor.commit()
    # openvds.close(vds)


def get_data(path, connection_string):
    with openvds.open(path, connection_string) as vds_source:
        layout = openvds.getLayout(vds_source)
        axis_descriptors = [
            layout.getAxisDescriptor(dim) for dim in range(layout.getDimensionality())
        ]
        begin_slice = (0, 0, 0, 0, 0, 0)
        end_slice = (
            int(axis_descriptors[0].numSamples),
            int(axis_descriptors[1].numSamples),
            int(axis_descriptors[2].numSamples),
            1,
            1,
            1,
        )
        access_manager = openvds.VolumeDataAccessManager(vds_source)
        req = access_manager.requestVolumeSubset(
            begin_slice,  # start slice
            end_slice,  # end slice
            format=openvds.VolumeDataChannelDescriptor.Format.Format_R32,
            lod=0,
            replacementNoValue=0.0,
            channel=0,
        )
        if req.data is None:
            err_code, err_msg = access_manager.getCurrentDownloadError()
            print(err_code)
            print(err_msg)
            raise RuntimeError("requestVolumeSubset failed!")
        dims = (
            end_slice[2] - begin_slice[2],
            end_slice[1] - begin_slice[1],
            end_slice[0] - begin_slice[0],
        )
        return req.data.reshape(*dims)


if __name__ == "__main__":
    path = "sd://osdu/example/dataset-4"
    connection_string = "sd_authority_url=https://example.com/api/seismic-store/v3;sd_api_key=xxx;auth_token_url=https://example.com/oauth2/token;sdtoken=SDTOKEN;client_id=CLIENTID;refresh_token=REFRESH_TOKEN;scopes=openid email;LogLevel=100"
    number_of_processes = 2
    processes = []
    chunks_ranges = []
    chunks_count = create_vds(path, connection_string, shape=(512, 512, 512))
    # Split the chunk index range evenly; the last process takes the remainder.
    a = chunks_count // number_of_processes
    r = chunks_count % number_of_processes
    for i in range(number_of_processes):
        if i == number_of_processes - 1:
            chunks_ranges.append((i * a, (i + 1) * a + r))
        else:
            chunks_ranges.append((i * a, (i + 1) * a))
    print(chunks_ranges)
    unlock_dataset(path)
    for i, chunks_range in enumerate(chunks_ranges):
        p = Process(
            target=writing_process,
            args=(
                path,
                connection_string,
                chunks_range,
                i,
            ),
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print("finished")
    vds = openvds.open(path, connection_string)
    openvds.close(vds)
    data = get_data(path, connection_string)
```
Here is the output:

```
-- sdapi 3.14.0 - Fri Feb 4 13:40:40 2022 -- Write Block Dimensions_012LOD0/ChunkMetadata/0 --- 0.750 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:41 2022 -- Write Block Dimensions_012LOD0/ChunkMetadata/0 --- 0.716 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:41 2022 -- Write Block LayerStatus --- 0.730 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:41 2022 -- Write Block LayerStatus --- 0.737 s
finished
-- sdapi 3.14.0 - Fri Feb 4 13:40:43 2022 -- Open Dataset sd://osdu/mergingtests4/dataset-4 in ReadOnly mode --- 1.722 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:45 2022 -- Get Block Size --- 1.320 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:45 2022 -- Read Block VolumeDataLayout --- 0.599 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:46 2022 -- Get Block Size --- 0.590 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:46 2022 -- Read Block LayerStatus --- 0.596 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:47 2022 -- Close Dataset sd://osdu/mergingtests4/dataset-4 --- 0.412 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:48 2022 -- Open Dataset sd://osdu/mergingtests4/dataset-4 in ReadOnly mode --- 1.272 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:49 2022 -- Get Block Size --- 0.592 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:49 2022 -- Read Block VolumeDataLayout --- 0.575 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:50 2022 -- Get Block Size --- 0.643 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:50 2022 -- Read Block LayerStatus --- 0.579 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:51 2022 -- Get Block Size --- 0.582 s
-- sdapi 3.14.0 - Fri Feb 4 13:40:52 2022 -- Read Block Dimensions_012LOD0/ChunkMetadata/0 --- 0.600 s
-1
Missing data for chunk: Dimensions_012LOD0/0
-- sdapi 3.14.0 - Fri Feb 4 13:40:52 2022 -- Close Dataset sd://osdu/mergingtests4/dataset-4 --- 0.355 s
Traceback (most recent call last):
  File "simple.py", line 208, in <module>
    data = get_data(path, connection_string)
  File "simple.py", line 159, in get_data
    raise RuntimeError("requestVolumeSubset failed!")
RuntimeError: requestVolumeSubset failed!
```
I executed the code against both seismic store and S3 with the same result. It looks like the data gets corrupted when multiple processes write to the same VDS source: the log shows both writer processes writing the same Dimensions_012LOD0/ChunkMetadata/0 block, and the subsequent read then fails with "Missing data for chunk: Dimensions_012LOD0/0".
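As a control experiment, the same chunk ranges can be written sequentially from a single process, reusing only the functions defined in the example above (a minimal sketch; `path`, `connection_string`, and `chunks_ranges` are as defined in `__main__`):

```python
# Sketch of a control experiment: write the same chunk ranges one after
# another in the current process instead of in parallel. If this read
# succeeds while the parallel version fails, the concurrent writes/commits
# are the trigger rather than the write logic itself.
for i, chunks_range in enumerate(chunks_ranges):
    writing_process(path, connection_string, chunks_range, i)

data = get_data(path, connection_string)
```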
Could I get some guidance on this problem? Is my implementation wrong, or is something wrong inside OpenVDS?
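To narrow it down further, the writer processes can also be kept separate but serialized, so that the two commits never overlap (again a sketch, using only the calls from the example above):

```python
# Sketch of a diagnostic variant: still one Process per chunk range, but each
# process is joined before the next one starts, so no two commits overlap.
for i, chunks_range in enumerate(chunks_ranges):
    p = Process(target=writing_process, args=(path, connection_string, chunks_range, i))
    p.start()
    p.join()
```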
I am using the Python openvds library 2.2.0 and sdapi 3.14.0.