Commit c8c521b5 authored by Chad Leong's avatar Chad Leong
Browse files

Merge branch '23-search-for-ingested-wellbore-id-to-avoid-duplicate-wellbore-record-id' into 'main'

Resolve "Search for ingested wellbore ID to avoid duplicate wellbore record ID"

Closes #23

See merge request osdu/platform/data-flow/data-loading/wellbore-ddms-las-loader!23
parents a44f2e17 4e9b8150
Pipeline #74107 passed with stage
in 1 minute and 3 seconds
......@@ -48,10 +48,14 @@ class LasCommandLoader(CLICommandsLoader):
group.command('wellbore', 'wellbore')
group.command('welllog', 'welllog')
group.command('curves', 'welllog_data')
group.command('wellbore_search', 'wellbore_search')
with CommandGroup(self, 'update', 'lasloader.commands.update#{}') as group:
group.command('welllog', 'welllog')
with CommandGroup(self, 'search', 'lasloader.commands.search#{}') as group:
group.command('wellbore', 'wellbore_search')
with CommandGroup(self, 'download', 'lasloader.commands.download#{}') as group:
group.command('welllog', 'download_las')
......@@ -66,103 +70,88 @@ class LasCommandLoader(CLICommandsLoader):
# 'fileload'
with ArgumentsContext(self, 'fileload') as arg_context:
arg_context.argument('input_path', type=str, options_list=('-p', '--path'),
help='Path to a file or folder containing LAS file(s) to upload.')
self._register_input_path_argument(arg_context)
with ArgumentsContext(self, 'fileload convert') as arg_context:
arg_context.argument('wellbore_id', type=str, options_list=('--wellbore_id'),
help='The wellbore id to use when printing to file.')
self._register_wellbore_id_flag(arg_context)
# 'ingest'
with ArgumentsContext(self, 'ingest') as arg_context:
arg_context.argument('input_path', type=str, options_list=('-p', '--path'),
help='Path to a file or folder containing LAS file(s) to upload.')
with ArgumentsContext(self, 'ingest') as arg_context:
arg_context.argument('url', type=str, options_list=('-u', '--url'),
help='The base url of the OSDU instance.')
with ArgumentsContext(self, 'ingest') as arg_context:
arg_context.argument('token', type=str, options_list=('-t', '--token'),
help='A valid bearer token used to authenticate with the OSDU instance.')
self._register_input_path_argument(arg_context)
self._register_url_and_token_arguments(arg_context)
with ArgumentsContext(self, 'ingest wellbore') as arg_context:
arg_context.argument('no_recognize', options_list=('--norecognize'), action='store_true',
help='If specified LASCLI won\'t attempt to recognize the curve families.')
with ArgumentsContext(self, 'ingest data') as arg_context:
arg_context.argument('welllog_id', options_list=('--welllog_id'),
help='The welllog id of the record into which to write the data.')
self._register_welllog_id_argument(arg_context)
# 'list'
with ArgumentsContext(self, 'list') as arg_context:
arg_context.argument('url', type=str, options_list=('-u', '--url'),
help='Url for the OSDU instance.')
with ArgumentsContext(self, 'list') as arg_context:
arg_context.argument('token', type=str, options_list=('-t', '--token'),
help='A valid bearer token used to authenticate with the OSDU instance.')
self._register_url_and_token_arguments(arg_context)
with ArgumentsContext(self, 'list wellbore') as arg_context:
arg_context.argument('wellbore_id', type=str, options_list=('--wellbore_id'),
help='The wellbore id of the record to retrieve.')
with ArgumentsContext(self, 'list welllog') as arg_context:
arg_context.argument('welllog_id', type=str, options_list=('--welllog_id'),
help='The welllog id of the record to retrieve.')
self._register_wellbore_id_flag(arg_context)
with ArgumentsContext(self, 'list welllog') as arg_context:
self._register_welllog_id_argument(arg_context)
arg_context.argument('curves', options_list=('--curveids'), action='store_true',
help='Show only the curve ids.')
with ArgumentsContext(self, 'list curves') as arg_context:
arg_context.argument('welllog_id', type=str, options_list=('--welllog_id'),
help='The welllog id of the record to retrieve.')
with ArgumentsContext(self, 'list curves') as arg_context:
arg_context.argument('curves', type=str, options_list=('--curves'), nargs='*',
help='The list of curves to retrieve (space separated list), returns all curves if not specified.')
self._register_welllog_id_argument(arg_context)
self._register_curves_argument(arg_context)
# 'update'
with ArgumentsContext(self, 'update') as arg_context:
arg_context.argument('url', type=str, options_list=('-u', '--url'),
help='Url for the OSDU instance.')
with ArgumentsContext(self, 'update') as arg_context:
arg_context.argument('token', type=str, options_list=('-t', '--token'),
help='A valid bearer token used to authenticate with the OSDU instance.')
with ArgumentsContext(self, 'update welllog') as arg_context:
arg_context.argument('welllog_id', type=str, options_list=('--welllog_id'),
help='The welllog ID of the record to updated.')
self._register_url_and_token_arguments(arg_context)
with ArgumentsContext(self, 'update welllog') as arg_context:
self._register_welllog_id_argument(arg_context)
arg_context.argument('recognize', options_list=('--recognize'), action='store_true',
help='If specified or set to True LASCLI will attempt to recognize and update the curve families.')
# 'download'
with ArgumentsContext(self, 'download') as arg_context:
arg_context.argument('url', type=str, options_list=('-u', '--url'),
help='Url for the OSDU instance.')
with ArgumentsContext(self, 'download') as arg_context:
arg_context.argument('token', type=str, options_list=('-t', '--token'),
help='A valid bearer token used to authenticate with the OSDU instance.')
with ArgumentsContext(self, 'download') as arg_context:
arg_context.argument('welllog_id', type=str, options_list=('--welllog_id'),
help='The welllog id of the record to retrieve.')
# 'Search'
with ArgumentsContext(self, 'search') as arg_context:
self._register_url_and_token_arguments(arg_context)
with ArgumentsContext(self, 'download') as arg_context:
arg_context.argument('curves', type=str, options_list=('--curves'), nargs='*',
help='The list of curves to retrieve (space separated list), returns all curves if not specified.')
with ArgumentsContext(self, 'search wellbore') as arg_context:
arg_context.argument('wellbore_name', type=str, options_list=('--name'),
help='The facility name of wellbore ids to retrieve.')
# 'download'
with ArgumentsContext(self, 'download') as arg_context:
self._register_url_and_token_arguments(arg_context)
self._register_welllog_id_argument(arg_context)
self._register_curves_argument(arg_context)
arg_context.argument('outfile', type=str, options_list=('--out'),
help='The output file path')
super(LasCommandLoader, self).load_arguments(command)
def _register_input_path_argument(self, arg_context):
arg_context.argument('input_path', type=str, options_list=('-p', '--path'),
help='Path to a file or folder containing one or more LAS file(s).')
def _register_curves_argument(self, arg_context):
arg_context.argument('curves', type=str, options_list=('--curves'), nargs='*',
help='The list of curves to retrieve (space separated list), returns all curves if not specified.')
def _register_wellbore_id_flag(self, arg_context):
arg_context.argument('wellbore_id', type=str, options_list=('--wellbore_id'),
help='The wellbore id of the record.')
def _register_welllog_id_argument(self, arg_context):
arg_context.argument('welllog_id', type=str, options_list=('--welllog_id'),
help='The welllog id of the record.')
def _register_url_and_token_arguments(self, arg_context):
arg_context.argument('url', type=str, options_list=('-u', '--url'),
help='The base url of the OSDU instance.')
arg_context.argument('token', type=str, options_list=('-t', '--token'),
help='A valid bearer token used to authenticate with the OSDU instance.')
def main():
"""Main entry point for LAS LOADER"""
......
from pathlib import Path
from knack.log import get_logger
from lasloader.file_loader import LasParser, LocalFileLoader
from lasloader.file_loader import LasParser, LocalFileLoader, FileUtilities
from lasloader.record_mapper import LasToRecordMapper
from lasloader.configuration import Configuration
from lasloader.json_writer import JsonToFile
......@@ -12,38 +12,41 @@ logger = get_logger(__name__)
def printlas(input_path):
"""
Print a LAS file header
:param str input_path: Path and filename of a LAS file
:param str input_path: Path and filename of a LAS file or folder or LAS files
"""
las_parser = LasParser(LocalFileLoader())
las_data = las_parser.load_las_file(input_path)
print(las_data.header)
logger.info(f"LAS path: {input_path}")
for file_path in FileUtilities.get_filenames(Path(input_path), '.las'):
logger.warning(f"LAS file: {file_path}")
las_data = las_parser.load_las_file(file_path)
print(las_data.header)
def convert(input_path: str, config_path: str, wellbore_id: str):
"""
Convert a LAS file to Wellbore and Well Log and write to json files.
:param str input_path: Path and filename of a LAS file
:param str input_path: Path and filename of a LAS file or folder containing LAS files
:param str config_path: Path to the LAS metadata file
:param str wellbore_id: The wellbore id
"""
las_parser = LasParser(LocalFileLoader())
las_data = las_parser.load_las_file(input_path)
config = Configuration(LocalFileLoader(), config_path)
mapper = LasToRecordMapper(las_data, config)
logger.info("Mapping LAS file to wellbore and well log records.")
wellbore_record = mapper.map_to_wellbore_record()
welllog_record = mapper.map_to_well_log_record(wellbore_id)
logger.info(f"LAS path: {input_path}")
for file_path in FileUtilities.get_filenames(Path(input_path), '.las'):
logger.warning(f"LAS file: {file_path}")
las_data = las_parser.load_las_file(file_path)
mapper = LasToRecordMapper(las_data, config)
wellbore_record = mapper.map_to_wellbore_record()
welllog_record = mapper.map_to_well_log_record(wellbore_id)
path = Path(input_path)
writer = JsonToFile()
path = Path(file_path)
writer = JsonToFile()
logger.info("Writing wellbore and well log records to JSON files.")
writer.write(vars(wellbore_record), path.with_suffix('.wellbore.json'))
logger.warning(f"Wellbore record file created: {path.with_suffix('.wellbore.json')}")
writer.write(vars(welllog_record), path.with_suffix('.welllog.json'))
logger.warning(f"Well log record file created: {path.with_suffix('.welllog.json')}")
writer.write(vars(wellbore_record), path.with_suffix('.wellbore.json'))
logger.warning(f"Wellbore record file created: {path.with_suffix('.wellbore.json')}")
writer.write(vars(welllog_record), path.with_suffix('.welllog.json'))
logger.warning(f"Well log record file created: {path.with_suffix('.welllog.json')}")
import json
from pathlib import Path
from ntpath import basename
from knack.log import get_logger
......@@ -5,7 +6,7 @@ from lasloader.file_loader import LasParser, LocalFileLoader, FileUtilities
from lasloader.record_mapper import LasToRecordMapper
from lasloader.osdu_client import OsduClient
from lasloader.configuration import Configuration
from lasloader.well_service import WellBoreService, WellLogService
from lasloader.well_service import WellBoreService, WellLogService, LasLoaderConflictError
logger = get_logger(__name__)
......@@ -39,15 +40,20 @@ def wellbore(
las_data = las_parser.load_las_file(file_path)
las_mapper = LasToRecordMapper(las_data, config)
service.file_ingest(las_mapper, config.data_partition_id, no_recognize)
logger.warning(f"Ingest complete: {basename(file_path)}")
except LasLoaderConflictError as e:
logger.error(f"Ingest failed: {basename(file_path)} (see summary for details)")
ids = json.dumps(e.ids, indent=4, sort_keys=True)
logger.debug(f"{basename(file_path)} ({str(e)} Matching wellbore ids {ids})")
failed_ingests.append(f"{basename(file_path)} ({str(e)} Perform a search to list the conflicted wellbore ids.)")
except Exception as e:
failed_ingests.append(f"{basename(file_path)} ({str(e)})")
logger.error(f"Ingest failed: {basename(file_path)} (see summary for details)")
if failed_ingests != []:
logger.error("SUMMARY - files not ingested: " + ", ".join(failed_ingests))
logger.error("SUMMARY - files not ingested: ")
for message in failed_ingests:
logger.error(f" {message}")
def welllog_data(
......
......@@ -61,7 +61,7 @@ def welllog_data(url: str,
:param str url: The base URL of the OSDU instance
:param str token: a valid bearer token that is used to authenticate against the OSDU instance
:param str config_path: Path to the las metadata file
:param str wellbore_id: The well bore id of the record to retrieve
:param str welllog_id: The welllog id of the record to retrieve
:param list[str] curves: The curves to retrieve, use None to get all curves
"""
config = Configuration(LocalFileLoader(), config_path)
......
import json
from knack.log import get_logger
from lasloader.osdu_client import OsduClient
from lasloader.configuration import Configuration
from lasloader.file_loader import LocalFileLoader
from lasloader.well_service import WellBoreService, WellLogService
logger = get_logger(__name__)
def wellbore_search(url: str,
token: str,
config_path: str,
wellbore_name: str):
"""
Retrieve and print the ids of wellbores that match the specified name
:param str url: The base URL of the OSDU instance
:param str token: a valid bearer token that is used to authenticate against the OSDU instance
:param str config_path: Path to the las metadata file
:param str wellbore_name: The well bore name to search for
"""
config = Configuration(LocalFileLoader(), config_path)
client = OsduClient(url, token, config.data_partition_id)
service = WellBoreService(client, WellLogService(client))
data = service.search_for_wellbore(wellbore_name)
if data is None or len(data) < 1:
logger.warning("No records found")
else:
print("Matching Wellbore ids:")
print(json.dumps(data, indent=4, sort_keys=True))
......@@ -146,6 +146,28 @@ class OsduClient:
payload = {"label": mnemonic, "log_unit": unit}
return self._send_request_json_response("POST", recognize_family_url, None, payload)
def search_for_wellbore(self, wellbore_name: str) -> list[str]:
"""
Search an OSDU instance to find all wellbores with the specified name and return their ids.
:param str wellbore_name: The well name
:return: A list that contains the matching wellbore ids
:rtype: list[str]
"""
url = f"{self._base_url}/api/search/v2/query"
payload = {
"kind": "*:*:master-data--Wellbore:*",
"query": f'data.FacilityName:"{wellbore_name}"',
"returnedFields": ["id"],
"limit": 1000
}
result = self._send_request_json_response("POST", url, None, payload)
if result is None or result.get("totalCount") is None or result.get("totalCount") < 1:
return []
else:
return [r["id"] for r in result.get("results")]
def _post_data_with_id_response(self, path: str, record: Record) -> list[any]:
payload = [vars(record)]
url = f"{self._base_url}{path}"
......
......@@ -11,6 +11,27 @@ from lasloader.record_mapper import LasToRecordMapper, Record, MappingUtilities,
logger = get_logger(__name__)
class LasLoaderConflictError(Exception):
"""
Exception class for data conflict errors that occur in the service layer
"""
def __init__(self, message: str, ids):
"""
Create a new instance of a LasLoaderConflictError
:param str message: An error message.
"""
self._ids = ids
super().__init__(message)
@property
def ids(self):
"""
Get the ids of the conflicted objects
"""
return self._ids
class WellLogService:
def __init__(self, client: OsduClient):
"""
......@@ -206,10 +227,15 @@ class WellBoreService:
wellbore_record = mapper.map_to_wellbore_record()
ids = self._client.create_wellbore(wellbore_record)
logger.warning(f"New wellbore IDs: {ids}")
wellbore_id = self._get_wellbore_by_name(wellbore_record.data.get("FacilityName"))
if wellbore_id is None:
ids = self._client.create_wellbore(wellbore_record)
logger.warning(f"New wellbore IDs: {ids}")
wellbore_id = self._safe_get_first_record(ids)
else:
logger.warning(f"Adding new welllog data to the existing well bore with id: {wellbore_id}")
wellbore_id = self._safe_get_first_record(ids)
logger.info(json.dumps(self._client.get_wellbore_record(wellbore_id).get_raw_data(), indent=4, sort_keys=True))
welllog_record = mapper.map_to_well_log_record(wellbore_id)
......@@ -231,6 +257,26 @@ class WellBoreService:
welllog_data = mapper.extract_log_data()
self._client.add_welllog_data(welllog_data, welllog_id)
def _get_wellbore_by_name(self, wellbore_name: str) -> str:
"""
Get the first element from a list or return None if the list is None or empty
:param list array: An OSDU client wrapper
:return: The id of the matching wellbore
:rtype: str
"""
if wellbore_name is None:
return None
wellbore_ids = self.search_for_wellbore(wellbore_name)
if wellbore_ids is None or len(wellbore_ids) < 1:
return None
elif len(wellbore_ids) > 1:
message = f"More than one matching wellbore found for '{wellbore_name}'."
raise LasLoaderConflictError(message, wellbore_ids)
else:
return self._safe_get_first_record(wellbore_ids)
def _safe_get_first_record(self, array: list[any]) -> any:
"""
Get the first element from a list or return None if the list is None or empty
......@@ -239,3 +285,12 @@ class WellBoreService:
:rtype: any
"""
return array[0] if array is not None and len(array) > 0 else None
def search_for_wellbore(self, wellbore_name: str) -> list[str]:
"""
Get the first element from a list or return None if the list is None or empty
:param list array: An OSDU client wrapper
:return: The first element of the list or None
:rtype: list[str]
"""
return self._client.search_for_wellbore(wellbore_name)
......@@ -7,6 +7,7 @@ from lasloader.record_mapper import Record
class TestOsduClient:
_expected_ids = [123, 234, 345]
_expected_ids_str = [{"id": "321"}, {"id": "432"}, {"id": "543"}]
_access_token = "token"
_data_partition_id = "AnId"
......@@ -20,7 +21,7 @@ class TestOsduClient:
if "data-partition-id" not in request.headers or self._data_partition_id != request.headers["data-partition-id"]:
return httpx.Response(500, text="No data-partition-id header")
return httpx.Response(200, json={"recordIds": self._expected_ids})
return httpx.Response(200, json={"recordIds": self._expected_ids, "totalCount": 3, "results": self._expected_ids_str})
def match_and_mock_get(self, request: any):
if request.method != "GET":
......@@ -290,3 +291,18 @@ class TestOsduClient:
result = client.post_log_recognition("mnemonic", "unit")
assert result is not None
def test_search_for_wellbore(self, respx_mock):
# Assemble
base_url = "http://test.bp.com"
url = f"{base_url}/api/search/v2/query"
respx_mock.post(url).mock(side_effect=self.match_and_mock_post)
client = OsduClient(base_url, self._access_token, self._data_partition_id)
# Act
result = client.search_for_wellbore("Wellbore Name")
assert result == ["321", "432", "543"]
from pandas.core.frame import DataFrame
import pytest
from unittest.mock import Mock, ANY
from lasloader.well_service import WellBoreService, WellLogService
from lasloader.well_service import WellBoreService, WellLogService, LasLoaderConflictError
from lasloader.osdu_client import OsduClient, LasLoaderWebResponseError
from lasloader.record_mapper import LasToRecordMapper, Record, WellLogRecord, WellboreRecord
......@@ -266,14 +266,14 @@ class TestWellLogService:
class TestWellBoreService:
@pytest.mark.parametrize("no_recognize", [True, False])
def test_recognize_log_family(self, no_recognize: bool):
@pytest.mark.parametrize("no_recognize,existing_wellbore_ids", [(True, None), (False, None), (True, ["well_bore_id"])])
def test_file_ingest(self, no_recognize: bool, existing_wellbore_ids: list[str]):
# Assemble
mock_client = Mock(spec=OsduClient)
mock_well_log_service = Mock(spec=WellLogService)
mock_mapper = Mock(spec=LasToRecordMapper)
well_bore_record = WellboreRecord({"Kind": "well_bore_kind", "acl": {}, "legal": {}, "data": {}})
well_bore_record = WellboreRecord({"Kind": "well_bore_kind", "acl": {}, "legal": {}, "data": {"FacilityName": "WellBoreName"}})
well_log_record = Record("well_log_kind", {}, {}, {})
well_log_record_recognized = Record("well_log_rec_kind", {"blah": "blah"}, {}, {})
well_log_record_returned = WellLogRecord({"kind": "returned_welllog_record"})
......@@ -287,6 +287,7 @@ class TestWellBoreService:
mock_mapper.map_to_well_log_record.return_value = well_log_record
mock_mapper.extract_log_data.return_value = well_log_data
mock_client.search_for_wellbore.return_value = existing_wellbore_ids
mock_client.create_wellbore.return_value = well_bore_ids
mock_client.get_wellbore_record.return_value = well_bore_record
mock_client.post_welllog.return_value = well_log_ids
......@@ -300,18 +301,91 @@ class TestWellBoreService:
subject.file_ingest(mock_mapper, data_partition_id, no_recognize)
# Assert
mock_mapper.map_to_wellbore_record.called_once()
mock_client.create_wellbore.called_once_with(well_bore_record)
mock_client.get_wellbore_record.called_once_with(well_bore_ids[0])
mock_mapper.map_to_well_log_record.called_once_with(well_bore_ids[0])
mock_client.search_for_wellbore.assert_called_once_with("WellBoreName")
mock_mapper.map_to_wellbore_record.assert_called_once()
if existing_wellbore_ids is None:
mock_client.create_wellbore.assert_called_once_with(well_bore_record)
else:
mock_client.create_wellbore.assert_not_called()
mock_client.get_wellbore_record.assert_called_once_with(well_bore_ids[0])
mock_mapper.map_to_well_log_record.assert_called_once_with(well_bore_ids[0])
if no_recognize:
mock_well_log_service.recognize_log_family.assert_not_called()
mock_client.post_welllog.called_once_with(well_log_record)
assert not mock_well_log_service.recognize_log_family.called
mock_client.post_welllog.assert_called_once_with(well_log_record)
else:
mock_well_log_service.recognize_log_family.called_once_with(well_log_record, data_partition_id)
mock_client.post_welllog.called_once_with(well_log_record_recognized)
mock_well_log_service.recognize_log_family.assert_called_once_with(well_log_record, data_partition_id)
mock_client.post_welllog.assert_called_once_with(well_log_record_recognized)
mock_client.get_welllog_record.assert_called_once_with(well_log_ids[0])
mock_mapper.extract_log_data.assert_called_once()
mock_client.add_welllog_data.assert_called_once_with(well_log_data, well_log_ids[0])
def test_file_ingest_raises_on_well_bore_conflict(self):
# Assemble
mock_client = Mock(spec=OsduClient)
mock_well_log_service = Mock(spec=WellLogService)
mock_mapper = Mock(spec=LasToRecordMapper)
mock_client.get_welllog_record.called_once_with(well_log_ids[0])
mock_mapper.extract_log_data.called_once()
mock_client.add_welllog_data.called_once_with(well_log_data, well_log_ids[0])
well_bore_record = WellboreRecord({"Kind": "well_bore_kind", "acl": {}, "legal": {}, "data": {"FacilityName": "WellBoreName"}})
data_partition_id = "dp_id"
mock_mapper.map_to_wellbore_record.return_value = well_bore_record
mock_client.search_for_wellbore.return_value = ["id1", "id2"]
subject = WellBoreService(mock_client, mock_well_log_service)
# Act
with pytest.raises(LasLoaderConflictError):
subject.file_ingest(mock_mapper, data_partition_id, False)
# Assert
mock_client.search_for_wellbore.assert_called_once_with("WellBoreName")
mock_mapper.map_to_wellbore_record.assert_called_once()
mock_client.create_wellbore.assert_not_called()
mock_client.post_welllog.assert_not_called()
mock_client.add_welllog_data.assert_not_called()
def test_get_wellbore_by_name_raises_on_conflict(self):
# Assemble
mock_client = Mock(spec=OsduClient)
mock_well_log_service = Mock(spec=WellLogService)
mock_client.search_for_wellbore.return_value = ["id1", "id2"]
subject = WellBoreService(mock_client, mock_well_log_service)
# Act
with pytest.raises(LasLoaderConflictError):
subject._get_wellbore_by_name("WellBoreName")
# Assert
mock_client.search_for_wellbore.assert_called_once_with("WellBoreName")
@pytest.mark.parametrize("existing_wellbore_ids,wellbore_name",
[(None, "WellBoreName"), (["well_bore_id"], "WellBoreName"), (None, None)])
def test_get_wellbore_by_name(self, existing_wellbore_ids, wellbore_name):
# Assemble
mock_client = Mock(spec=OsduClient)
mock_well_log_service = Mock(spec=WellLogService)
mock_client.search_for_wellbore.return_value = existing_wellbore_ids
subject = WellBoreService(mock_client, mock_well_log_service)