Commit 468b99ed authored by Mark Hewitt's avatar Mark Hewitt
Browse files

Merge branch 'feature/ingest-improvements' into 'main'

- dataload ingest added options for passing legal tags and acl

See merge request !14
parents 630eb9e7 0d5e909d
Pipeline #93698 passed with stages
in 3 minutes and 9 seconds
...@@ -28,6 +28,12 @@ For more information, specify the `-h` flag: ...@@ -28,6 +28,12 @@ For more information, specify the `-h` flag:
Change Log Change Log
========== ==========
0.0.32
------
- dataload ingest added options for passing legal tags and acl
- correct CRS Converter Service naming
0.0.31 0.0.31
------ ------
......
...@@ -12,4 +12,4 @@ ...@@ -12,4 +12,4 @@
""" OSDU command line environment""" """ OSDU command line environment"""
__VERSION__ = "0.0.31" __VERSION__ = "0.0.32"
...@@ -17,6 +17,6 @@ CRS_CATALOG_SERVICE_NAME = "CRS Catalog service" ...@@ -17,6 +17,6 @@ CRS_CATALOG_SERVICE_NAME = "CRS Catalog service"
CRS_CATALOG_STATUS_PATH = "../_ah/readiness_check" CRS_CATALOG_STATUS_PATH = "../_ah/readiness_check"
CRS_CATALOG_SWAGGER_PATH = "../swagger-ui.html" CRS_CATALOG_SWAGGER_PATH = "../swagger-ui.html"
CRS_CONVERTER_SERVICE_NAME = "CRS Catalog service" CRS_CONVERTER_SERVICE_NAME = "CRS Converter service"
CRS_CONVERTER_STATUS_PATH = "../_ah/readiness_check" CRS_CONVERTER_STATUS_PATH = "../_ah/readiness_check"
CRS_CONVERTER_SWAGGER_PATH = "../swagger-ui.html" CRS_CONVERTER_SWAGGER_PATH = "../swagger-ui.html"
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
import json import json
import os import os
import re
from typing import List
import click import click
import requests import requests
...@@ -44,7 +46,7 @@ VERIFY_BATCH_SIZE = 200 ...@@ -44,7 +46,7 @@ VERIFY_BATCH_SIZE = 200
@click.option( @click.option(
"-p", "-p",
"--path", "--path",
help="Path to a a sequence file, manifest file or folder with manifest files to ingest.", help="Path to a sequence file, manifest file or folder with manifest files to ingest.",
type=click.Path(exists=True, file_okay=True, dir_okay=True, readable=True, resolve_path=True), type=click.Path(exists=True, file_okay=True, dir_okay=True, readable=True, resolve_path=True),
required=True, required=True,
) )
...@@ -52,8 +54,8 @@ VERIFY_BATCH_SIZE = 200 ...@@ -52,8 +54,8 @@ VERIFY_BATCH_SIZE = 200
@click.option( @click.option(
"-b", "-b",
"--batch", "--batch",
help="Batch size (per file). If not specified no batching is performed.", help="Batch size (per file). If not specified manifests are uploaded as is.",
is_flag=False, is_flag=True,
flag_value=200, flag_value=200,
type=int, type=int,
default=None, default=None,
...@@ -76,6 +78,46 @@ VERIFY_BATCH_SIZE = 200 ...@@ -76,6 +78,46 @@ VERIFY_BATCH_SIZE = 200
show_default=True, show_default=True,
) )
@click.option("--simulate", help="Simulate ingestion only.", is_flag=True, show_default=True) @click.option("--simulate", help="Simulate ingestion only.", is_flag=True, show_default=True)
# @click.option(
# "-a",
# "--authority",
# help="Schema authority to use when generating manifest files.",
# default="osdu",
# show_default=True,
# required=False,
# )
# @click.option(
# "-d",
# "--data-partition",
# help="A data partition name to use when generating manifest files. If not specified the default value is used.",
# )
@click.option(
"-l",
"--legal-tags",
help="Rewrite existing legal tags. Specify either a comma separated list of values or without a value to use the default value from config.", # noqa: E501 pylint: disable=C0301
is_flag=False,
flag_value="",
default=None,
show_default=True,
)
@click.option(
"-aclo",
"--acl-owners",
help="Rewrite existing acl owners. Specify either a comma separated list of values or without a value to use the default value from config.", # noqa: E501 pylint: disable=C0301
is_flag=False,
flag_value="",
default=None,
show_default=True,
)
@click.option(
"-aclv",
"--acl-viewers",
help="Rewrite existing acl viewers. Specify either a comma separated list of values or without a value to use the default value from config.", # noqa: E501 pylint: disable=C0301
is_flag=False,
flag_value="",
default=None,
show_default=True,
)
@handle_cli_exceptions @handle_cli_exceptions
@command_with_output(None) @command_with_output(None)
def _click_command( def _click_command(
...@@ -87,13 +129,32 @@ def _click_command( ...@@ -87,13 +129,32 @@ def _click_command(
wait: bool = False, wait: bool = False,
skip_existing: str = False, skip_existing: str = False,
simulate: bool = False, simulate: bool = False,
# authority: str = None,
# data_partition: str = None,
legal_tags: str = None,
acl_owners: str = None,
acl_viewers: str = None,
): ):
"""Ingest manifest files into OSDU. """Ingest manifest files into OSDU.
This command will take existing manifest files and load them into OSDU via the workflow This command will take existing manifest files and load them into OSDU via the workflow
service and Airflow. 'legal' and 'acl' tags will be updated based upon the current service and Airflow. 'legal' and 'acl' tags will be updated based upon the current
configuration""" configuration"""
return ingest(state, path, files, batch, runid_log, wait, skip_existing, simulate) return ingest(
state,
path,
files,
batch,
runid_log,
wait,
skip_existing,
simulate,
# authority,
# data_partition,
None if legal_tags is None else [] if legal_tags == "" else legal_tags.split(","),
None if acl_owners is None else [] if acl_owners == "" else acl_owners.split(","),
None if acl_viewers is None else [] if acl_viewers == "" else acl_viewers.split(","),
)
def ingest( def ingest(
...@@ -105,6 +166,11 @@ def ingest( ...@@ -105,6 +166,11 @@ def ingest(
wait: bool = False, wait: bool = False,
skip_existing: bool = False, skip_existing: bool = False,
simulate: bool = False, simulate: bool = False,
# authority: str = None,
# data_partition: str = None,
legal_tags: List[str] = None,
acl_owners: List[str] = None,
acl_viewers: List[str] = None,
) -> dict: ) -> dict:
"""Ingest manifest files into OSDU """Ingest manifest files into OSDU
...@@ -126,13 +192,26 @@ def ingest( ...@@ -126,13 +192,26 @@ def ingest(
wait, wait,
skip_existing, skip_existing,
simulate, simulate,
legal_tags,
acl_owners,
acl_viewers,
) )
print(runids) print(runids)
return runids return runids
def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
config: CLIConfig, manifest_files, files, runid_log, batch_size, wait, skip_existing, simulate config: CLIConfig,
manifest_files,
files,
runid_log,
batch_size,
wait,
skip_existing,
simulate,
legal_tags,
acl_owners,
acl_viewers,
): ):
logger.debug("Files list: %s", manifest_files) logger.debug("Files list: %s", manifest_files)
runids = [] runids = []
...@@ -151,14 +230,14 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks ...@@ -151,14 +230,14 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
json_string = json_string.replace( json_string = json_string.replace(
"{{NAMESPACE}}", config.get("core", CONFIG_DATA_PARTITION_ID) "{{NAMESPACE}}", config.get("core", CONFIG_DATA_PARTITION_ID)
) )
manifest = json.loads(json_string) json_data = json.loads(json_string)
if not manifest: if not json_data:
logger.error("Error with file %s. File is empty.", filepath) logger.error("Error with file %s. File is empty.", filepath)
else: else:
logger.info("Processing %s.", filepath) logger.info("Processing %s.", filepath)
if isinstance(manifest, list): if isinstance(json_data, list):
_ingest_json_as_sequence_file( _ingest_json_as_sequence_file(
config, config,
files, files,
...@@ -167,19 +246,28 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks ...@@ -167,19 +246,28 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
skip_existing, skip_existing,
simulate, simulate,
runids, runids,
manifest, json_data,
legal_tags,
acl_owners,
acl_viewers,
) )
else: else:
# Note this code currently assumes only one of MasterData, ReferenceData or Data exists! # Note this code currently assumes only one of MasterData, ReferenceData or Data exists!
if "ReferenceData" in manifest and len(manifest["ReferenceData"]) > 0: if "ReferenceData" in json_data and len(json_data["ReferenceData"]) > 0:
_update_legal_and_acl_tags_all(config, manifest["ReferenceData"]) _update_legal_and_acl_tags_all(
config,
json_data["ReferenceData"],
legal_tags,
acl_owners,
acl_viewers,
)
if batch_size is None and not skip_existing: if batch_size is None and not skip_existing:
_create_and_submit( _create_and_submit(
config, manifest, runids, runid_log_handle, simulate config, json_data, runids, runid_log_handle, simulate
) )
else: else:
data_objects += manifest["ReferenceData"] data_objects += json_data["ReferenceData"]
file_batch_size = ( file_batch_size = (
len(data_objects) len(data_objects)
if skip_existing and not batch_size if skip_existing and not batch_size
...@@ -195,14 +283,20 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks ...@@ -195,14 +283,20 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
skip_existing, skip_existing,
simulate, simulate,
) )
elif "MasterData" in manifest and len(manifest["MasterData"]) > 0: elif "MasterData" in json_data and len(json_data["MasterData"]) > 0:
_update_legal_and_acl_tags_all(config, manifest["MasterData"]) _update_legal_and_acl_tags_all(
config,
json_data["MasterData"],
legal_tags,
acl_owners,
acl_viewers,
)
if batch_size is None and not skip_existing: if batch_size is None and not skip_existing:
_create_and_submit( _create_and_submit(
config, manifest, runids, runid_log_handle, simulate config, json_data, runids, runid_log_handle, simulate
) )
else: else:
data_objects += manifest["MasterData"] data_objects += json_data["MasterData"]
file_batch_size = ( file_batch_size = (
len(data_objects) len(data_objects)
if skip_existing and not batch_size if skip_existing and not batch_size
...@@ -218,12 +312,18 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks ...@@ -218,12 +312,18 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
skip_existing, skip_existing,
simulate, simulate,
) )
elif "Data" in manifest: elif "Data" in json_data:
_update_work_products_metadata( _update_work_products_metadata(
config, manifest["Data"], files, simulate config,
json_data["Data"],
files,
simulate,
legal_tags,
acl_owners,
acl_viewers,
) )
_create_and_submit( _create_and_submit(
config, manifest, runids, runid_log_handle, simulate config, json_data, runids, runid_log_handle, simulate
) )
else: else:
logger.error( logger.error(
...@@ -244,7 +344,17 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks ...@@ -244,7 +344,17 @@ def _ingest_files( # noqa:C901 pylint: disable=R0912,too-many-nested-blocks
def _ingest_json_as_sequence_file( def _ingest_json_as_sequence_file(
config, files, runid_log, batch_size, skip_existing, simulate, runids, sequence_file config,
files,
runid_log,
batch_size,
skip_existing,
simulate,
runids,
sequence_file,
legal_tags,
acl_owners,
acl_viewers,
): ):
logger.info( logger.info(
"Processing as sequence file. Will wait for each entry to complete before submitting new." "Processing as sequence file. Will wait for each entry to complete before submitting new."
...@@ -261,6 +371,9 @@ def _ingest_json_as_sequence_file( ...@@ -261,6 +371,9 @@ def _ingest_json_as_sequence_file(
True, True,
skip_existing, skip_existing,
simulate, simulate,
legal_tags,
acl_owners,
acl_viewers,
) )
runids.extend(_sequence_run_ids) runids.extend(_sequence_run_ids)
else: else:
...@@ -352,21 +465,79 @@ def _upload_file(config: CLIConfig, filepath): ...@@ -352,21 +465,79 @@ def _upload_file(config: CLIConfig, filepath):
if response.status_code not in [200, 201]: if response.status_code not in [200, 201]:
raise CliError(f"({response.status_code}) {response.text[:250]}") raise CliError(f"({response.status_code}) {response.text[:250]}")
# Need to figure out how metadata is handled wrt. airflow, fileid (returned vs. manifest)
# file_metadata = _populate_file_metadata_req_body(file_source)
# _update_legal_and_acl_tags(config, file_metadata)
# print(file_metadata)
# update_metadata_response = connection.cli_post_returning_json(
# CONFIG_FILE_URL,
# "files/metadata",
# json.dumps(file_metadata),
# [200, 201],
# )
# generated_file_id = update_metadata_response.get("id")
# logger.info(
# f"{filepath} is uploaded with file id {generated_file_id} with file source {file_source}"
# )
# # Get record version
# file_record_version_response = connection.cli_get_returning_json(
# CONFIG_STORAGE_URL, "records/versions/" + generated_file_id
# )
# file_record_version = file_record_version_response.get("versions")[0]
# metadata = {
# "file_id": generated_file_id,
# "file_source": file_source,
# "file_record_version": str(file_record_version),
# }
# print(metadata)
# generated_file_id = upload_metadata_response_json.get("id") # generated_file_id = upload_metadata_response_json.get("id")
# logger.info("%s is uploaded with file id %s with file source %s", filepath, generated_file_id, file_source) logger.info("%s is uploaded with file source %s", filepath, file_source)
# return generated_file_id, file_source # return generated_file_id, file_source
return file_source return file_source
raise CliError(f"No upload location returned: {initiate_upload_response_json}") raise CliError(f"No upload location returned: {initiate_upload_response_json}")
def _update_work_products_metadata(config: CLIConfig, data, files, simulate): def _populate_file_metadata_req_body(file_source):
return {
"kind": "osdu:wks:dataset--File.Generic:1.0.0",
"acl": {
"viewers": [],
"owners": [],
},
"legal": {
"legaltags": [],
"otherRelevantDataCountries": ["US"],
"status": "compliant",
},
"data": {"DatasetProperties": {"FileSourceInfo": {"FileSource": file_source}}},
}
def _update_work_products_metadata(
config: CLIConfig,
data,
files,
simulate,
legal_tags: List[str],
acl_owners: List[str],
acl_viewers: List[str],
): # pylint: disable=R1702
if "WorkProduct" in data: if "WorkProduct" in data:
_update_legal_and_acl_tags(config, data["WorkProduct"]) _update_legal_and_acl_tags(config, data["WorkProduct"], legal_tags, acl_owners, acl_viewers)
if "WorkProductComponents" in data: if "WorkProductComponents" in data:
_update_legal_and_acl_tags_all(config, data["WorkProductComponents"]) _update_legal_and_acl_tags_all(
config, data["WorkProductComponents"], legal_tags, acl_owners, acl_viewers
)
if "Datasets" in data: if "Datasets" in data:
_update_legal_and_acl_tags_all(config, data["Datasets"]) _update_legal_and_acl_tags_all(
config, data["Datasets"], legal_tags, acl_owners, acl_viewers
)
# if files is specified then upload any needed data. # if files is specified then upload any needed data.
if files: if files:
...@@ -375,16 +546,17 @@ def _update_work_products_metadata(config: CLIConfig, data, files, simulate): ...@@ -375,16 +546,17 @@ def _update_work_products_metadata(config: CLIConfig, data, files, simulate):
dataset.get("data", {}).get("DatasetProperties", {}).get("FileSourceInfo") dataset.get("data", {}).get("DatasetProperties", {}).get("FileSourceInfo")
) )
# only process if FileSource isn't already specified # only process if FileSource isn't already specified
if file_source_info and not file_source_info.get("FileSource"):
if not simulate: if file_source_info:
file_source_info["FileSource"] = _upload_file( file_path = os.path.join(files, file_source_info["Name"])
config, os.path.join(files, file_source_info["Name"]) if os.path.exists(file_path):
if not simulate:
file_source_info["FileSource"] = _upload_file(config, file_path)
else:
logger.info(
"Local file '%s' not found - skipping.",
file_path,
) )
else:
logger.info(
"FileSource already specified for '%s' - skipping.",
file_source_info["Name"],
)
# TO DO: Here we scan by name from filemap # TO DO: Here we scan by name from filemap
# with open(file_location_map) as file: # with open(file_location_map) as file:
...@@ -408,13 +580,59 @@ def _update_work_products_metadata(config: CLIConfig, data, files, simulate): ...@@ -408,13 +580,59 @@ def _update_work_products_metadata(config: CLIConfig, data, files, simulate):
# logger.debug(f"data to upload workproduct \n {data}") # logger.debug(f"data to upload workproduct \n {data}")
def _update_legal_and_acl_tags_all(config: CLIConfig, data): def _update_legal_and_acl_tags_all(
for _datu in data: config: CLIConfig,
_update_legal_and_acl_tags(config, _datu) data,
legal_tags: List[str] = None,
acl_owners: List[str] = None,
acl_viewers: List[str] = None,
):
for _data in data:
_update_legal_and_acl_tags(config, _data, legal_tags, acl_owners, acl_viewers)
def _update_legal_and_acl_tags( # noqa: C901 pylint: disable=R0912
config: CLIConfig,
data,
legal_tags: List[str] = None,
acl_owners: List[str] = None,
acl_viewers: List[str] = None,
):
# Update legal tags if needed
if legal_tags is not None:
if len(legal_tags) == 0:
legal_tags = [config.get("core", CONFIG_LEGAL_TAG)]
data["legal"]["legaltags"] = legal_tags
else:
for legal_tag in data["legal"]["legaltags"]:
if re.search("^{{.*}}$", legal_tag):
raise CliError(
f"Found a legal tag placeholder {legal_tag}. Use the -l option to replace these."
)
# Update legal country
data["legal"]["otherRelevantDataCountries"] = ["US"]
def _update_legal_and_acl_tags(config: CLIConfig, datu): # Update acl owners tags if needed
datu["legal"]["legaltags"] = [config.get("core", CONFIG_LEGAL_TAG)] if acl_owners is not None:
datu["legal"]["otherRelevantDataCountries"] = ["US"] if len(acl_owners) == 0:
datu["acl"]["viewers"] = [config.get("core", CONFIG_ACL_VIEWER)] acl_owners = [config.get("core", CONFIG_ACL_OWNER)]
datu["acl"]["owners"] = [config.get("core", CONFIG_ACL_OWNER)] data["acl"]["owners"] = acl_owners
else:
for acl_owner in data["acl"]["owners"]:
if re.search("^{{.*}}$", acl_owner):
raise CliError(
f"Found an acl owner placeholder {acl_owner}. Use the -aclo option to replace these."
)
# Update acl viewers if needed
if acl_viewers is not None:
if len(acl_viewers) == 0:
acl_viewers = [config.get("core", CONFIG_ACL_VIEWER)]
data["acl"]["viewers"] = acl_viewers
else:
for acl_viewer in data["acl"]["viewers"]:
if re.search("^{{.*}}$", acl_viewer):
raise CliError(
f"Found an acl viewer placeholder {acl_viewer}. Use the -aclv option to replace these."
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment