Issue with two parallel processes writing to same VDS source.

I was trying to implement workflow of multiple parallel processes which are writing to separate chunks of the same VDS source. Unfortunately I encountered some issues in the moment of reading modified VDS source.

Here is code example that I am running

from multiprocessing import Process
import openvds
import numpy as np


def unlock_dataset(sd_path):
    "Disclosed implementation"
    return


def write_zero_pages(accessor):
    chunks_count = accessor.getChunkCount()
    for c in range(chunks_count):
        page = accessor.createPage(c)
        buf = np.array(page.getWritableBuffer(), copy=False)
        buf[:, :, :] = np.zeros(buf.shape, dtype=float)
        page.release()
    accessor.commit()


def create_vds(
    path,
    connection_string,
    shape=None,
    databrick_size=openvds.VolumeDataLayoutDescriptor.BrickSize.BrickSize_128,
    access_mode=openvds.IVolumeDataAccessManager.AccessMode.AccessMode_Create,
    components=openvds.VolumeDataChannelDescriptor.Components.Components_1,
    format=openvds.VolumeDataChannelDescriptor.Format.Format_R32,
    create_and_write_pages=True,
):
    layout_descriptor = openvds.VolumeDataLayoutDescriptor(
        brickSize=databrick_size,
        lodLevels=openvds.VolumeDataLayoutDescriptor.LODLevels.LODLevels_1,
        brickSize2DMultiplier=4,
        options=openvds.VolumeDataLayoutDescriptor.Options.Options_None,
        negativeMargin=0,
        positiveMargin=0,
        fullResolutionDimension=0,
    )
    metadata_container = openvds.MetadataContainer()
    axis_descriptors = []

    for i, size in enumerate(shape):
        axis_descriptors.append(
            openvds.VolumeDataAxisDescriptor(
                size,
                f"X{i}",
                "unitless",
                -1000.0,
                1000.0,
            )
        )

    channel_descriptors = [
        openvds.VolumeDataChannelDescriptor(
            format=format,
            components=components,
            name=f"Channel0",
            unit="unitless",
            valueRangeMin=0.0,
            valueRangeMax=1000.0,
        )
    ]
    vds = openvds.create(
        path,
        connection_string,
        layout_descriptor,
        axis_descriptors,
        channel_descriptors,
        metadata_container,
    )
    access_manager = openvds.getAccessManager(vds)
    accessor = access_manager.createVolumeDataPageAccessor(
        dimensionsND=openvds.DimensionsND.Dimensions_012,
        accessMode=access_mode,
        lod=0,
        channel=0,
        maxPages=8,
        chunkMetadataPageSize=1024,
    )
    chunks_count = accessor.getChunkCount()

    if create_and_write_pages:
        write_zero_pages(accessor)

    openvds.close(vds)

    return chunks_count


def writing_process(path, connection_string, chunks_range, number):
    vds = openvds.open(path, connection_string)

    manager = openvds.getAccessManager(vds)
    accessor = manager.createVolumeDataPageAccessor(
        dimensionsND=openvds.DimensionsND.Dimensions_012,
        lod=0,
        channel=0,
        maxPages=8,
        accessMode=openvds.IVolumeDataAccessManager.AccessMode.AccessMode_ReadWrite,
        chunkMetadataPageSize=1024,
    )

    for c in range(chunks_range[0], chunks_range[1]):
        page = accessor.createPage(c)
        buf = np.array(page.getWritableBuffer(), copy=False)
        buf[:, :, :] = np.reshape(np.array([float(number)] * buf.size), buf.shape)
        page.release()
        accessor.commit()

    # openvds.close(vds)


def get_data(path, connection_string):
    with openvds.open(path, connection_string) as vds_source:
        layout = openvds.getLayout(vds_source)

        axis_descriptors = [
            layout.getAxisDescriptor(dim) for dim in range(layout.getDimensionality())
        ]

        begin_slice = [0, 0, 0, 0, 0, 0]

        end_slice = (
            int(axis_descriptors[0].numSamples),
            int(axis_descriptors[1].numSamples),
            int(axis_descriptors[2].numSamples),
            1,
            1,
            1,
        )

        accessManager = openvds.VolumeDataAccessManager(vds_source)
        req = accessManager.requestVolumeSubset(
            begin_slice,  # start slice
            end_slice,  # end slice
            format=openvds.VolumeDataChannelDescriptor.Format.Format_R32,
            lod=0,
            replacementNoValue=0.0,
            channel=0,
        )

        if req.data is None:
            err_code, err_msg = accessManager.getCurrentDownloadError()
            print(err_code)
            print(err_msg)
            raise RuntimeError("requestVolumeSubset failed!")

        dims = (
            end_slice[2] - begin_slice[2],
            end_slice[1] - begin_slice[1],
            end_slice[0] - begin_slice[0],
        )
        return req.data.reshape(*dims)


if __name__ == "__main__":
    path = "sd://osdu/example/dataset-4"
    connection_string = "sd_authority_url=https://example.com/api/seismic-store/v3;sd_api_key=xxx;auth_token_url=https://example.com/oauth2/token;sdtoken=SDTOKEN;client_id=CLIENTID;refresh_token=REFRESH_TOKEN;scopes=openid email;LogLevel=100"
    numer_of_processes = 2
    processes = []
    chunks_ranges = []
    chunks_count = create_vds(path, connection_string, shape=(512, 512, 512))
    a = chunks_count // numer_of_processes
    r = chunks_count % numer_of_processes

    for i in range(numer_of_processes):
        if i == numer_of_processes - 1:
            chunks_ranges.append((i * a, (i + 1) * a + r))
        else:
            chunks_ranges.append((i * a, (i + 1) * a))

    print(chunks_ranges)
    unlock_dataset(path)

    for i, chunks_range in enumerate(chunks_ranges):
        p = Process(
            target=writing_process,
            args=(
                path,
                connection_string,
                chunks_range,
                i,
            ),
        )
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("finished")
    vds = openvds.open(path, connection_string)
    openvds.close(vds)

    data = get_data(path, connection_string)

code.py

Here is output

-- sdapi 3.14.0 - Fri Feb  4 13:40:40 2022 -- Write Block Dimensions_012LOD0/ChunkMetadata/0 --- 0.750 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:41 2022 -- Write Block Dimensions_012LOD0/ChunkMetadata/0 --- 0.716 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:41 2022 -- Write Block LayerStatus --- 0.730 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:41 2022 -- Write Block LayerStatus --- 0.737 s
finished
-- sdapi 3.14.0 - Fri Feb  4 13:40:43 2022 -- Open Dataset sd://osdu/mergingtests4/dataset-4 in ReadOnly mode --- 1.722 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:45 2022 -- Get Block Size --- 1.320 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:45 2022 -- Read Block VolumeDataLayout --- 0.599 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:46 2022 -- Get Block Size --- 0.590 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:46 2022 -- Read Block LayerStatus --- 0.596 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:47 2022 -- Close Dataset sd://osdu/mergingtests4/dataset-4 --- 0.412 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:48 2022 -- Open Dataset sd://osdu/mergingtests4/dataset-4 in ReadOnly mode --- 1.272 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:49 2022 -- Get Block Size --- 0.592 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:49 2022 -- Read Block VolumeDataLayout --- 0.575 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:50 2022 -- Get Block Size --- 0.643 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:50 2022 -- Read Block LayerStatus --- 0.579 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:51 2022 -- Get Block Size --- 0.582 s
-- sdapi 3.14.0 - Fri Feb  4 13:40:52 2022 -- Read Block Dimensions_012LOD0/ChunkMetadata/0 --- 0.600 s
-1
Missing data for chunk: Dimensions_012LOD0/0
-- sdapi 3.14.0 - Fri Feb  4 13:40:52 2022 -- Close Dataset sd://osdu/mergingtests4/dataset-4 --- 0.355 s
Traceback (most recent call last):
  File "simple.py", line 208, in <module>
    data = get_data(path, connection_string)
  File "simple.py", line 159, in get_data
    raise RuntimeError("requestVolumeSubset failed!")
RuntimeError: requestVolumeSubset failed!

logs_from_sd_path.log

I executed code using seismic store and s3 with the same result. Looks like data are getting corupted when multiple processes writing to the same VDS source.

Can I receive some guidance on this problem? Is my implementation bad or there is something wrong inside OpenVDS?

I am using python lib openvds 2.2.0 and sd api 3.14.0

Edited Feb 04, 2022 by Michał Murawski
Assignee Loading
Time tracking Loading