Commit f3f7978d authored by Chad Leong's avatar Chad Leong
Browse files

Merge branch '21-bugfix-python-version' into 'main'

Add backwards compatibility for Python 3.8

Closes #21

See merge request osdu/platform/data-flow/data-loading/wellbore-ddms-las-loader!24
parents c8c521b5 aa275f49
Pipeline #76026 passed with stage
in 1 minute and 4 seconds
from typing import List
from knack.log import get_logger
from lasloader.osdu_client import OsduClient
from lasloader.configuration import Configuration
......@@ -14,7 +15,7 @@ def download_las(url: str,
config_path: str,
welllog_id: str,
outfile: str,
curves: list[str] = None):
curves: List[str] = None):
"""
Retrieve welllog data from an OSDU instance and save to a LAS format file
:param str url: The base URL of the OSDU instance
......@@ -22,7 +23,7 @@ def download_las(url: str,
:param str config_path: Path to the las metadata file
:param str welllog_id: The well bore id of the record to retrieve
:param str outfile: The path of the output file
:param list[str] curves: The curves to retrieve, use None to get all curves
:param List[str] curves: The curves to retrieve, use None to get all curves
"""
config = Configuration(LocalFileLoader(), config_path)
client = OsduClient(url, token, config.data_partition_id)
......
import json
from typing import List
from knack.log import get_logger
from lasloader.osdu_client import OsduClient
from lasloader.configuration import Configuration
......@@ -55,14 +56,14 @@ def welllog_data(url: str,
token: str,
config_path: str,
welllog_id: str,
curves: list[str] = None):
curves: List[str] = None):
"""
Retrieve and print welllog data from an OSDU instance
:param str url: The base URL of the OSDU instance
:param str token: a valid bearer token that is used to authenticate against the OSDU instance
:param str config_path: Path to the las metadata file
:param str welllog_id: The welllog id of the record to retrieve
:param list[str] curves: The curves to retrieve, use None to get all curves
:param List[str] curves: The curves to retrieve, use None to get all curves
"""
config = Configuration(LocalFileLoader(), config_path)
client = OsduClient(url, token, config.data_partition_id)
......
from typing import Dict, List
from lasloader.file_loader import IFileLoader, JsonLoader
......@@ -45,20 +46,20 @@ class Configuration:
return self._config.get("wellbore_id")
@property
def legal_tags(self) -> list[str]:
def legal_tags(self) -> List[str]:
"""
Gets the legal_tag.
return: the legal_tag
rtype: list[str]
rtype: List[str]
"""
return self._none_propagating_get(self._config.get("legal"), "legaltags")
@property
def legal_relevant_data_countries(self) -> list[str]:
def legal_relevant_data_countries(self) -> List[str]:
"""
Gets the legal relevant data countries.
return: the legal relevant data countries
rtype: list[str]
rtype: List[str]
"""
return self._none_propagating_get(self._config.get("legal"), "otherRelevantDataCountries")
......@@ -72,11 +73,11 @@ class Configuration:
return self._none_propagating_get(self._config.get("legal"), "status")
@property
def data_default_viewers(self) -> list[str]:
def data_default_viewers(self) -> List[str]:
"""
Gets the default data viewers.
return: the default data viewers.
rtype: list[str]
rtype: List[str]
"""
return self._none_propagating_get(self._none_propagating_get(self._config.get("data"), "default"), 'viewers')
......@@ -85,9 +86,9 @@ class Configuration:
"""
Gets the default data owners.
return: the default data owners.
rtype: list[str]
rtype: List[str]
"""
return self._none_propagating_get(self._none_propagating_get(self._config.get("data"), "default"), 'owners')
def _none_propagating_get(self, d: dict, key: str) -> any:
def _none_propagating_get(self, d: Dict, key: str) -> any:
return None if d is None else d.get(key)
import os
from pathlib import Path
from typing import Dict, List
import lasio
from lasio.las import LASFile
from knack.log import get_logger
......@@ -59,7 +60,7 @@ class JsonLoader:
"""
self._file_loader = file_loader
def load(self, path: str) -> dict[str, any]:
def load(self, path: str) -> Dict[str, any]:
"""
Load and parse a json file
:param path str : The path and filename of the file to load.
......@@ -85,7 +86,7 @@ class LasParser:
if not well_name or well_name == " ":
raise FileValidationError
def validate_las_file_against_record(self, las: LASFile, record_well_name: str, rec_curve_mnemonics: list[str]):
def validate_las_file_against_record(self, las: LASFile, record_well_name: str, rec_curve_mnemonics: List[str]):
"""
More extensive validation of a LAS file if it is to be used to write data to an existing record (as
opposed to creating new wellbore and well log records).
......@@ -94,7 +95,7 @@ class LasParser:
:param las LASFile: The LASFile object to be validated.
:param record_well_name str: Well name associated with the well log record that is to have data written to it.
:param rec_curve_mnemonics list[str]: List of available curves in well log record.
:param rec_curve_mnemonics List[str]: List of available curves in well log record.
"""
self.validate_las_file(las)
......@@ -130,19 +131,19 @@ class LasParser:
class FileUtilities:
@staticmethod
def get_filenames(path: Path, suffix: str = '') -> list[Path]:
def get_filenames(path: Path, suffix: str = '') -> List[Path]:
"""
Get the paths of the files in a path and with a suffix.
If Path itself is a file returns a single item list that contains Path.
If Path is a directory return a list of all the paths of all the files in the directory with suffix.
If Path itself is a file returns a single item List that contains Path.
If Path is a directory return a List of all the paths of all the files in the directory with suffix.
:param Path path: A director or file path
:return: A list of paths.
:rtype: list[Path]
:return: A List of paths.
:rtype: List[Path]
"""
if path.is_file():
return [path]
elif path.is_dir():
return list(path.glob(f'**/*{suffix}'))
return List(path.glob(f'**/*{suffix}'))
else:
logger.error(
"Invalid input paths. Please confirm you have provided the required inputs correctly."
......
from typing import Union
from typing import Dict, List, Union
import httpx
import io
from knack.log import get_logger
......@@ -38,7 +38,7 @@ class OsduClient:
self._base_url = base_url
self._data_partition_id = data_partition_id
def _create_headers(self) -> dict[str, str]:
def _create_headers(self) -> Dict[str, str]:
"""
Create a new set of auth headers for OSDU
......@@ -50,7 +50,7 @@ class OsduClient:
"data-partition-id": self._data_partition_id,
}
def create_wellbore(self, wellbore_record: Union[Record, WellboreRecord]) -> list[any]:
def create_wellbore(self, wellbore_record: Union[Record, WellboreRecord]) -> List[any]:
"""
Make a post request to OSDU to create a new wellbore record.
......@@ -61,7 +61,7 @@ class OsduClient:
record = wellbore_record.to_record() if isinstance(wellbore_record, WellboreRecord) else wellbore_record
return self._post_data_with_id_response("/api/os-wellbore-ddms/ddms/v3/wellbores", record)
def post_welllog(self, welllog_record: Union[Record, WellLogRecord]) -> list[any]:
def post_welllog(self, welllog_record: Union[Record, WellLogRecord]) -> List[any]:
"""
Make a post request to OSDU to create a new or update an existing well log record.
If the `welllog_record` contains an `id` attribute, the well log with that ID shall be updated.
......@@ -69,7 +69,7 @@ class OsduClient:
:param Union[Record, WellLogRecord] welllog_record: The well log record to be uploaded.
:return: The ID of the created/updated well log
:rtype: list[any]
:rtype: List[any]
"""
record = welllog_record.to_record() if isinstance(welllog_record, WellLogRecord) else welllog_record
return self._post_data_with_id_response("/api/os-wellbore-ddms/ddms/v3/welllogs", record)
......@@ -113,11 +113,11 @@ class OsduClient:
payload = data.to_parquet()
self._send_request_json_response("POST", url, payload, None, headers=headers)
def get_welllog_data(self, welllog_id: str, curves: list[str] = None) -> DataFrame:
def get_welllog_data(self, welllog_id: str, curves: List[str] = None) -> DataFrame:
"""
Get welllog data for all or a specified set of curves.
:param str welllog_id: The id of the well log
:param list[str] curves: The welllog curves, None or empty returns all curves
:param List[str] curves: The welllog curves, None or empty returns all curves
:return: The welllog data
:rtype: DataFrame
"""
......@@ -146,12 +146,12 @@ class OsduClient:
payload = {"label": mnemonic, "log_unit": unit}
return self._send_request_json_response("POST", recognize_family_url, None, payload)
def search_for_wellbore(self, wellbore_name: str) -> list[str]:
def search_for_wellbore(self, wellbore_name: str) -> List[str]:
"""
Search an OSDU instance to find all wellbores with the specified name and return their ids.
:param str wellbore_name: The well name
:return: A list that contains the matching wellbore ids
:rtype: list[str]
:return: A List that contains the matching wellbore ids
:rtype: List[str]
"""
url = f"{self._base_url}/api/search/v2/query"
......@@ -168,7 +168,7 @@ class OsduClient:
else:
return [r["id"] for r in result.get("results")]
def _post_data_with_id_response(self, path: str, record: Record) -> list[any]:
def _post_data_with_id_response(self, path: str, record: Record) -> List[any]:
payload = [vars(record)]
url = f"{self._base_url}{path}"
......@@ -179,11 +179,11 @@ class OsduClient:
else:
return []
def _send_request_json_response(self, method: str, url: str, content: any, json: dict[str, any], headers=None) -> any:
def _send_request_json_response(self, method: str, url: str, content: any, json: Dict[str, any], headers=None) -> any:
res = self._send_request(method, url, content, json, headers)
return res.json()
def _send_request(self, method: str, url: str, content: any, json: dict[str, any], headers=None) -> any:
def _send_request(self, method: str, url: str, content: any, json: Dict[str, any], headers=None) -> any:
logger.debug(f"{method}: {url}")
headers = self._create_headers() if headers is None else headers
......
from dataclasses import dataclass
from typing import Union, Optional
from typing import Dict, List, Union, Optional
import urllib
import lasio
from knack.log import get_logger
......@@ -14,16 +14,16 @@ logger = get_logger(__name__)
@dataclass
class Record:
kind: str
acl: dict[str, list[str]]
legal: dict[str, Union[list[str], str]]
data: dict[str, any]
acl: Dict[str, List[str]]
legal: Dict[str, Union[List[str], str]]
data: Dict[str, any]
id: Optional[str] = None
class WellboreRecord(Record):
_raw_data: dict[str, any]
_raw_data: Dict[str, any]
def __init__(self, data: dict[str, any]) -> None:
def __init__(self, data: Dict[str, any]) -> None:
self.kind = data.get("kind", None)
self.data = data.get("data", {})
self.acl = data.get("acl", {})
......@@ -32,11 +32,11 @@ class WellboreRecord(Record):
self._raw_data = data
def get_raw_data(self) -> dict[str, any]:
def get_raw_data(self) -> Dict[str, any]:
"""
Get the wellbore data used to construct this object
:returns: The wellbore data
:rtype: dict[str, any]
:rtype: Dict[str, any]
"""
return self._raw_data
......@@ -50,9 +50,9 @@ class WellboreRecord(Record):
class WellLogRecord(Record):
_raw_data: dict[str, any]
_raw_data: Dict[str, any]
def __init__(self, data: dict[str, any]) -> None:
def __init__(self, data: Dict[str, any]) -> None:
self.kind = data.get("kind", None)
self.data = data.get("data", {})
self.acl = data.get("acl", {})
......@@ -61,20 +61,20 @@ class WellLogRecord(Record):
self._raw_data = data
def get_curveids(self) -> list[str]:
def get_curveids(self) -> List[str]:
"""
Get the curve ids
:returns: The list of curve ids
:rtype: list[str]
:returns: The List of curve ids
:rtype: List[str]
"""
curves = self.data['Curves'] if self.data and 'Curves' in self.data else []
return [c['CurveID'] for c in curves if 'CurveID' in c]
def get_raw_data(self) -> dict[str, any]:
def get_raw_data(self) -> Dict[str, any]:
"""
Get the well log data used to construct this object
:returns: The well log data
:rtype: dict[str, any]
:rtype: Dict[str, any]
"""
return self._raw_data
......@@ -90,13 +90,13 @@ class WellLogRecord(Record):
class AttributeBuilder:
# Common
def build_acl(self, config: Configuration) -> dict[str, list[str]]:
def build_acl(self, config: Configuration) -> Dict[str, List[str]]:
"""
Get the 'acl' (access control list) component of a record.
Get the 'acl' (access control List) component of a record.
:param config Configuration: Record config object containing config variables.
:returns: The 'acl' component of a record.
:rtype: dict[str, list[str]]
:rtype: Dict[str, List[str]]
"""
if None in [config.data_default_viewers, config.data_default_owners]:
raise ValueError("Please ensure a data default viewers and owners have been provided in the config file.")
......@@ -106,17 +106,17 @@ class AttributeBuilder:
"owners": config.data_default_owners
}
def build_legal(self, config: Configuration) -> dict[str, Union[list[str], str]]:
def build_legal(self, config: Configuration) -> Dict[str, Union[List[str], str]]:
"""
Get the 'legal' component of a record.
:param config Configuration: Record config object containing config variables.
:returns: The 'legal' component of a record.
:rtype: dict[str, Union[list[str], str]]
:rtype: Dict[str, Union[List[str], str]]
"""
if None in [config.legal_tags, config.legal_relevant_data_countries, config.legal_status]:
raise ValueError(
"Please ensure a legal tag, list of relevant data countries and legal status has been provided in the config file.")
"Please ensure a legal tag, List of relevant data countries and legal status has been provided in the config file.")
return {
"legaltags": config.legal_tags,
......@@ -143,14 +143,14 @@ class AttributeBuilder:
# If the LAS data object does not have a 'UWI' attribute return None
return None
def _build_name_aliases(self, uwi: str, config: Configuration) -> list[dict[str, str]]:
def _build_name_aliases(self, uwi: str, config: Configuration) -> List[Dict[str, str]]:
"""
Determine what the NameAliases attribute should be, depending on the existence of a UWI.
:param uwi str: The UWI from the LASFile.
:param config Configuration: Record config object containing config variables.
:returns: The 'name aliases' component of a wellbore record's data attribute.
:rtype: list[dict[str, str]]
:rtype: List[Dict[str, str]]
"""
if uwi is not None:
if config.data_partition_id is None:
......@@ -164,13 +164,13 @@ class AttributeBuilder:
else:
return []
def build_wellbore_data(self, las: lasio.LASFile, config: Configuration) -> dict[str, any]:
def build_wellbore_data(self, las: lasio.LASFile, config: Configuration) -> Dict[str, any]:
"""
Get the 'data' component of a Wellbore record.
:param las LASFile: The LASFile object from which to build the 'data' component.
:returns: The 'data' component of a Wellbore record.
:rtype: dict[str, any]
:rtype: Dict[str, any]
"""
well_name = las.well.WELL.value
uwi = self._get_uwi(las)
......@@ -182,14 +182,14 @@ class AttributeBuilder:
}
# Well Log
def _build_curves(self, las: lasio.LASFile, data_partition_id: str) -> list[dict[str, str]]:
def _build_curves(self, las: lasio.LASFile, data_partition_id: str) -> List[Dict[str, str]]:
"""
Get the curves from a LAS data object.
:param las LASFile: The LASFile object from which to extract the curves.
:param str data_partition_id: The datapartition ID
:returns: The curves for the well log.
:rtype: list[dict[str, str]]
:rtype: List[Dict[str, str]]
"""
if data_partition_id is None:
raise ValueError("Please ensure a data partition ID has been provided in the config file.")
......@@ -208,18 +208,18 @@ class AttributeBuilder:
}
)
except AttributeError:
# If the LAS data object does not have a 'curves' attribute return empty list
# If the LAS data object does not have a 'curves' attribute return empty List
return []
return curves
def build_well_log_data(self, las: lasio.LASFile, config: Configuration, wellbore_id: str) -> dict[str, any]:
def build_well_log_data(self, las: lasio.LASFile, config: Configuration, wellbore_id: str) -> Dict[str, any]:
"""
Get the 'data' component of a Well Log record.
:param las LASFile: The LASFile object from which to build the 'data' component.
:param wellbore_id str: The wellbore_id object, often returned by OSDU.
:returns: The 'data' component of a Well Log record.
:rtype: dict[str, any]
:rtype: Dict[str, any]
"""
if None in [wellbore_id]:
......
import json
from typing import Tuple
from typing import Dict, List, Tuple
import urllib
from knack.log import get_logger
from lasio.las import LASFile
......@@ -15,11 +15,13 @@ class LasLoaderConflictError(Exception):
"""
Exception class for data conflict errors that occur in the service layer
"""
def __init__(self, message: str, ids):
def __init__(self, message: str, ids: List[str]):
"""
Create a new instance of a LasLoaderConflictError
:param str message: An error message.
:param List[str] ids: List of ids
"""
self._ids = ids
super().__init__(message)
......@@ -52,14 +54,14 @@ class WellLogService:
welllog_record.data["Curves"] = self.recognize_curves_family(welllog_record.data.get("Curves"), data_partition_id)
return welllog_record
def recognize_curves_family(self, curves: list[dict[str, any]], data_partition_id: str) -> dict[str, any]:
def recognize_curves_family(self, curves: List[Dict[str, any]], data_partition_id: str) -> Dict[str, any]:
"""
For a set of curves call OSDU to get the curve family and
return an enriched set of curves. This makes no changes to data in OSDU.
:param list[dict[str, any]] curves: The curves to be enriched
:param List[Dict[str, any]] curves: The curves to be enriched
:param str data_partition_id: The data partition Id.
:return: the updated curves
:rtype: dict[str, any]
:rtype: Dict[str, any]
"""
if curves is None:
logger.warning("No curve data to recognize")
......@@ -146,11 +148,11 @@ class WellLogService:
data = las_data.df().reset_index()
self._client.add_welllog_data(data, welllog_id)
def download_and_construct_las(self, welllog_id: str, curves: list[str] = None) -> LASFile:
def download_and_construct_las(self, welllog_id: str, curves: List[str] = None) -> LASFile:
"""
Download wellbore and log data and convert to a LAS file.
:param str welllog_id: The welllog_id
:param list[str] curves: The Curves to get, or None for all curves
:param List[str] curves: The Curves to get, or None for all curves
:return: A new instance of the LAS file object
:rtype: LASFile
"""
......@@ -192,12 +194,12 @@ class WellLogService:
las_parser = LasParser(LocalFileLoader())
las_parser.validate_las_file_against_record(las_data, welllog_well_name, welllog_curve_ids)
def _get_data_ingest_validation_variables(self, welllog_id: str) -> Tuple[str, list[str]]:
def _get_data_ingest_validation_variables(self, welllog_id: str) -> Tuple[str, List[str]]:
"""
Get the associated well name and curve IDs of the existing well log for validating against LAS file.
:param str welllog_id: ID of the well log record to validate a LAS file against.
:returns: Well name associated with well log AND curve IDs of the well log.
:rtype: Tuple[str, list[str]]
:rtype: Tuple[str, List[str]]
"""
welllog_record = self._client.get_welllog_record(welllog_id)
wellbore_record = self._client.get_wellbore_record(welllog_record.data['WellboreID'])
......@@ -259,8 +261,9 @@ class WellBoreService:
def _get_wellbore_by_name(self, wellbore_name: str) -> str:
"""
Get the first element from a list or return None if the list is None or empty
:param list array: An OSDU client wrapper
Search for existing wellbores by name and return appropriate value for
the three cases - 1 result, >1 results, 0 results.
:param str wellbore_name: The wellbore name to search for
:return: The id of the matching wellbore
:rtype: str
"""
......@@ -277,20 +280,20 @@ class WellBoreService:
else:
return self._safe_get_first_record(wellbore_ids)
def _safe_get_first_record(self, array: list[any]) -> any:
def _safe_get_first_record(self, array: List[any]) -> any:
"""
Get the first element from a list or return None if the list is None or empty
:param list array: An OSDU client wrapper
:return: The first element of the list or None
:param List array: An OSDU client wrapper
:return: The first element of the List or None
:rtype: any
"""
return array[0] if array is not None and len(array) > 0 else None
def search_for_wellbore(self, wellbore_name: str) -> list[str]:
def search_for_wellbore(self, wellbore_name: str) -> List[str]:
"""
Get the first element from a list or return None if the list is None or empty
:param list array: An OSDU client wrapper
:return: The first element of the list or None
:rtype: list[str]
Search for existing wellbores with the provided `wellbore_name`
:param str wellbore_name: The wellbore name to search for
:return: List of wellbore IDs that have the provided wellbore name
:rtype: List[str]
"""
return self._client.search_for_wellbore(wellbore_name)
from typing import List
from pandas.core.frame import DataFrame
import pytest
from unittest.mock import Mock, ANY
......@@ -267,7 +268,7 @@ class TestWellLogService:
class TestWellBoreService:
@pytest.mark.parametrize("no_recognize,existing_wellbore_ids", [(True, None), (False, None), (True, ["well_bore_id"])])
def test_file_ingest(self, no_recognize: bool, existing_wellbore_ids: list[str]):
def test_file_ingest(self, no_recognize: bool, existing_wellbore_ids: List[str]):
# Assemble
mock_client = Mock(spec=OsduClient)
mock_well_log_service = Mock(spec=WellLogService)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment