Commit b28f45e0 authored by Mark Hewitt's avatar Mark Hewitt
Browse files

dataload ingest - support sequence file for ordered loading (ref. standard reference-data)

parent b5985575
Pipeline #73887 passed with stages
in 2 minutes and 25 seconds
......@@ -28,13 +28,18 @@ For more information, specify the `-h` flag:
Change Log
==========
0.0.20
------
- dataload support sequence file for ordered loading (ref. standard reference-data)
0.0.19
------
- schema add --overwrite-existing option
- merge dataload checkrefs code (wip)
- user friendly output mode
- updated dataload helper text
- improved dataload helper text
0.0.18
------
......
......@@ -12,4 +12,4 @@
""" OSDU command line environment"""
__VERSION__ = "0.0.19"
__VERSION__ = "0.0.20"
......@@ -44,7 +44,7 @@ VERIFY_BATCH_SIZE = 200
@click.option(
"-p",
"--path",
help="Path to a manifest file or files to ingest.",
help="Path to a sequence file, manifest file or folder with manifest files to ingest.",
type=click.Path(exists=True, file_okay=True, dir_okay=True, readable=True, resolve_path=True),
required=True,
)
......@@ -118,7 +118,14 @@ def ingest(
logger.debug("Files list: %s", files)
runids = _ingest_files(
state.config, manifest_files, files, runid_log, batch_size, wait, skip_existing, simulate
state.config,
manifest_files,
files,
runid_log,
batch_size,
wait,
skip_existing,
simulate,
)
print(runids)
return runids
......@@ -139,21 +146,44 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
for filepath in manifest_files:
if filepath.endswith(".json"):
with open(filepath) as file:
manifest = json.load(file)
# Note this code currently assumes only one of MasterData, ReferenceData or Data exists!
json_string = file.read()
# for reference data do replacements (acl, legal happens later)
json_string = json_string.replace(
"{{NAMESPACE}}", config.get("core", CONFIG_DATA_PARTITION_ID)
)
manifest = json.loads(json_string)
if not manifest:
logger.error("Error with file %s. File is empty.", filepath)
else:
logger.info("Processing %s.", filepath)
if isinstance(manifest, list):
_ingest_json_as_sequence_file(
config,
files,
runid_log,
batch_size,
skip_existing,
simulate,
runids,
manifest,
)
else:
# Note this code currently assumes only one of MasterData, ReferenceData or Data exists!
if "ReferenceData" in manifest and len(manifest["ReferenceData"]) > 0:
_update_legal_and_acl_tags_all(config, manifest["ReferenceData"])
if batch_size is None and not skip_existing:
_create_and_submit(config, manifest, runids, runid_log_handle, simulate)
_create_and_submit(
config, manifest, runids, runid_log_handle, simulate
)
else:
data_objects += manifest["ReferenceData"]
file_batch_size = (
len(data_objects) if skip_existing and not batch_size else batch_size
len(data_objects)
if skip_existing and not batch_size
else batch_size
)
data_objects = _process_batch(
config,
......@@ -168,11 +198,15 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
elif "MasterData" in manifest and len(manifest["MasterData"]) > 0:
_update_legal_and_acl_tags_all(config, manifest["MasterData"])
if batch_size is None and not skip_existing:
_create_and_submit(config, manifest, runids, runid_log_handle, simulate)
_create_and_submit(
config, manifest, runids, runid_log_handle, simulate
)
else:
data_objects += manifest["MasterData"]
file_batch_size = (
len(data_objects) if skip_existing and not batch_size else batch_size
len(data_objects)
if skip_existing and not batch_size
else batch_size
)
data_objects = _process_batch(
config,
......@@ -185,8 +219,20 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
simulate,
)
elif "Data" in manifest:
_update_work_products_metadata(config, manifest["Data"], files, simulate)
_create_and_submit(config, manifest, runids, runid_log_handle, simulate)
_update_work_products_metadata(
config, manifest["Data"], files, simulate
)
_create_and_submit(
config, manifest, runids, runid_log_handle, simulate
)
else:
logger.error(
"No manifest ReferenceData, MasterData or Data section found in %s.",
filepath,
)
else:
logger.warning("Skipping %s - no .json extension.", filepath)
finally:
if runid_log_handle is not None:
runid_log_handle.close()
......@@ -197,6 +243,30 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
return runids
def _ingest_json_as_sequence_file(
    config, files, runid_log, batch_size, skip_existing, simulate, runids, sequence_file
):
    """Ingest the manifests named in *sequence_file* strictly in order.

    Each entry of *sequence_file* must be a dict carrying a "FileName" key
    naming a manifest to load. Every manifest is submitted through
    _ingest_files with wait=True so one entry completes before the next is
    sent (ordered loading for reference data). Run ids from each entry are
    appended to *runids*. If any entry is malformed, an error is logged and
    nothing is ingested.
    """
    logger.info(
        "Processing as sequence file. Will wait for each entry to complete before submitting new."
    )
    # Validate the entire sequence up front before starting any ingestion.
    is_well_formed = all(
        isinstance(item, dict) and "FileName" in item for item in sequence_file
    )
    if not is_well_formed:
        logger.error("Invalid sequence file.")
        return

    for item in sequence_file:
        # wait=True enforces sequential, ordered execution of the entries.
        entry_run_ids = _ingest_files(
            config,
            [item["FileName"]],
            files,
            runid_log,
            batch_size,
            True,
            skip_existing,
            simulate,
        )
        runids.extend(entry_run_ids)
def _process_batch(
config, batch_size, data_type, data_objects, runids, runid_log_handle, skip_existing, simulate
):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment