well_service.py 12.9 KB
Newer Older
1
import json
import urllib
# `import urllib` alone does not load the `parse` submodule; import it explicitly
# so that `urllib.parse.quote` is always available.
import urllib.parse
from typing import Any, Dict, List, Optional, Tuple

from knack.log import get_logger
from lasio.las import LASFile
from lasloader.file_loader import FileValidationError, LasParser, LocalFileLoader
from lasloader.osdu_client import OsduClient, LasLoaderWebResponseError
from lasloader.record_mapper import LasToRecordMapper, Record, MappingUtilities, MapWellLogToLas
9
10
11
12
13


# Module-level logger shared by the service classes in this module.
logger = get_logger(__name__)


14
15
16
17
class LasLoaderConflictError(Exception):
    """
    Exception class for data conflict errors that occur in the service layer
    """

    def __init__(self, message: str, ids: List[str]):
        """
        Create a new instance of a LasLoaderConflictError

        :param str message: An error message.
        :param List[str] ids: List of ids of the conflicting objects
        """
        self._ids = ids
        super().__init__(message)

    @property
    def ids(self) -> List[str]:
        """
        Get the ids of the conflicted objects

        :return: The list of ids that was passed to the constructor
        :rtype: List[str]
        """
        return self._ids


37
38
39
class WellLogService:
    """
    Service-layer operations on well log records, performed through an OSDU client wrapper.
    """

    def __init__(self, client: OsduClient):
        """
        Construct a new instance of WellLogService

        :param OsduClient client: An OSDU client wrapper
        """
        self._client = client

    def recognize_log_family(self, welllog_record: Record, data_partition_id: str) -> Record:
        """
        For all the curves in the welllog record call OSDU to get the curve family and
        return an enriched welllog record. This makes no changes to data in OSDU.

        The record passed in is modified in place (its "Curves" entry is replaced) and
        the same object is returned.

        :param Record welllog_record: The welllog record to be enriched
        :param str data_partition_id: The data partition Id.
        :return: the updated welllog record
        :rtype: Record
        """
        welllog_record.data["Curves"] = self.recognize_curves_family(welllog_record.data.get("Curves"), data_partition_id)
        return welllog_record

    def recognize_curves_family(
        self, curves: Optional[List[Dict[str, Any]]], data_partition_id: str
    ) -> Optional[List[Dict[str, Any]]]:
        """
        For a set of curves call OSDU to get the curve family and
        return an enriched set of curves. This makes no changes to data in OSDU.

        Each curve dictionary is updated in place with a "LogCurveFamilyID" entry and
        the same list is returned.

        :param List[Dict[str, Any]] curves: The curves to be enriched, or None
        :param str data_partition_id: The data partition Id.
        :return: the updated curves, or None if no curves were given
        :rtype: Optional[List[Dict[str, Any]]]
        :raises KeyError: If a curve has no "Mnemonic" or "CurveUnit" entry
        """
        if curves is None:
            logger.warning("No curve data to recognize")
            return None

        for curve in curves:
            mnemonic = curve["Mnemonic"]
            osdu_unit = curve["CurveUnit"]
            unit = MappingUtilities.convert_osdu_unit_to_raw_unit(osdu_unit)

            # Recognition is only attempted when the unit conversion produced a value.
            curve["LogCurveFamilyID"] = self.recognize_curve_family(mnemonic, unit, data_partition_id) if unit is not None else None

        return curves

    def recognize_curve_family(self, mnemonic: str, unit: str, data_partition_id: str) -> Optional[str]:
        """
        For a mnemonic and unit call OSDU to get the curve family and return the family id.

        :param str mnemonic: The mnemonic
        :param str unit: The unit
        :param str data_partition_id: The data partition Id.
        :return: The curve family id, or None if the recognition call failed or the
                 response did not contain a family
        :rtype: Optional[str]
        """
        # f-string (rather than concatenation) so a None mnemonic cannot raise TypeError here.
        logger.info(f"Recognizing log family curve: {mnemonic}")

        try:
            res = self._client.post_log_recognition(mnemonic, unit)
        except LasLoaderWebResponseError as ex:
            logger.warning(f"Could not identify log family curve: {mnemonic}. Error: {str(ex)}")
            return None

        family = res.get("family") if res is not None else None
        if family is None:
            logger.warning(f"Family not given in response from OSDU. log family curve: {mnemonic}.")
            return None

        # Spaces become dashes, then everything else unsafe is percent-encoded
        # (safe="" means "/" is encoded as well).
        family_id = urllib.parse.quote(family.replace(" ", "-"), safe="")

        return f"{data_partition_id}:reference-data--LogCurveFamily:{family_id}:"

    def update_log_family(self, welllog_id: str, data_partition_id: str) -> None:
        """
        Update the recognized curve families for an existing well log record.

        :param str welllog_id: ID of the well log record to be updated.
        :param str data_partition_id: The data partition ID.
        """
        logger.info("Retrieving existing well log record.")
        welllog_record = self._client.get_welllog_record(welllog_id)

        logger.info("Beginning recognition of curve families.")
        enriched_welllog_record = self.recognize_log_family(welllog_record.to_record(), data_partition_id)

        logger.info("Updating existing well log record with recognized curve families.")
        welllog_ids = self._client.post_welllog(enriched_welllog_record)

        # Print out updated record in logging for user to see.
        # The first returned id is used; None is used when no ids were returned.
        # NOTE(review): a None id is still passed on to the client, as before - confirm
        # whether an empty response should be reported explicitly instead.
        updated_welllog_id = welllog_ids[0] if welllog_ids is not None and len(welllog_ids) > 0 else None
        updated_welllog_record = self._client.get_welllog_record(updated_welllog_id)
        logger.info(json.dumps(updated_welllog_record.get_raw_data(), indent=4, sort_keys=True))

    def ingest_welllog_data(self, input_path: str, welllog_id: str) -> None:
        """
        Write data from a LAS file to an existing well log. First the LAS file is validated against the
        existing well log record, then the data is extracted and written to the existing well log record.

        If validation fails for any reason the error is logged and no data is written.

        :param str input_path: Path and filename of a LAS file containing data to be written to existing well log.
        :param str welllog_id: ID of well log to be updated.
        """
        las_parser = LasParser(LocalFileLoader())
        las_data = las_parser.load_las_file(input_path)

        logger.info("Beginning LAS file validation.")
        try:
            self._validate_welllog_data_ingest_file(las_data, welllog_id)
        except FileValidationError as e:
            logger.error(f"Data not ingested - LAS file validation failed: {str(e)}")
            return
        except Exception as e:
            # Any other failure while validating (e.g. retrieving the records) also
            # aborts the ingest; it is logged rather than propagated.
            logger.error(f"Data not ingested: {str(e)}")
            return
        logger.info("LAS file validation completed.")

        logger.info("Extracting data from LAS file and writing to existing Well Log.")
        data = las_data.df().reset_index()
        self._client.add_welllog_data(data, welllog_id)

    def download_and_construct_las(self, welllog_id: str, curves: Optional[List[str]] = None) -> LASFile:
        """
        Download wellbore and log data and convert to a LAS file.

        :param str welllog_id: The welllog_id
        :param List[str] curves: The Curves to get, or None for all curves
        :return: A new instance of the LAS file object
        :rtype: LASFile
        """
        logger.warning(f"Getting welllog ID {welllog_id}")
        welllog = self._client.get_welllog_record(welllog_id)
        wellbore_id = welllog.data.get("WellboreID")

        if wellbore_id is None:
            logger.error("The welllog records contained no wellbore Id, cannot get wellbore")
            # Fall back to an empty record so the LAS file can still be built.
            wellbore = Record(None, {}, {}, {})
        else:
            logger.warning(f"Getting wellbore ID {wellbore_id}")
            wellbore = self._client.get_wellbore_record(wellbore_id)

        logger.warning(f"Getting curve data for welllog ID {welllog_id}")

        if curves:
            logger.warning(f"Curves: {curves}")

        welllog_data = self._client.get_welllog_data(welllog_id, curves)

        mapper = MapWellLogToLas(wellbore, welllog, welllog_data)

        return mapper.build_las_file()

    def _validate_welllog_data_ingest_file(self, las_data: LASFile, welllog_id: str) -> None:
        """
        Validate a LAS File against an existing well log record.
        This should be done before attempting to write data from a LAS file to an existing record.

        :param LASFile las_data: LASFile object to be validated.
        :param str welllog_id: ID of the well log record to write data to.
        """
        logger.info("Retrieving well log record and associated information.")
        welllog_well_name, welllog_curve_ids = self._get_data_ingest_validation_variables(welllog_id)

        logger.info("Validating LAS file well name and curves against existing well log record.")
        las_parser = LasParser(LocalFileLoader())
        las_parser.validate_las_file_against_record(las_data, welllog_well_name, welllog_curve_ids)

    def _get_data_ingest_validation_variables(self, welllog_id: str) -> Tuple[str, List[str]]:
        """
        Get the associated well name and curve IDs of the existing well log for validating against LAS file.

        :param str welllog_id: ID of the well log record to validate a LAS file against.
        :returns: Well name associated with well log AND curve IDs of the well log.
        :rtype: Tuple[str, List[str]]
        :raises KeyError: If the well log has no 'WellboreID' or the wellbore has no 'FacilityName'
        """
        welllog_record = self._client.get_welllog_record(welllog_id)
        wellbore_record = self._client.get_wellbore_record(welllog_record.data['WellboreID'])

        welllog_well_name = wellbore_record.data['FacilityName']
        welllog_curve_ids = welllog_record.get_curveids()

        return welllog_well_name, welllog_curve_ids
Gregory Harris's avatar
Gregory Harris committed
210
211


212
213
214
class WellBoreService:
    """
    Service-layer operations on wellbore records, performed through an OSDU client wrapper.
    """

    def __init__(self, client: OsduClient, well_log_service: WellLogService):
        """
        Construct a new instance of WellBoreService

        :param OsduClient client: An OSDU client wrapper
        :param WellLogService well_log_service: An instance of WellLogService
        """
        self._client = client
        self._well_log_service = well_log_service

    def file_ingest(self, mapper: LasToRecordMapper, data_partition_id: str, no_recognize: bool) -> None:
        """
        Ingest a single las file into new wellbore and welllog records.

        If a single wellbore with the mapped "FacilityName" already exists it is reused,
        otherwise a new wellbore is created. A new welllog record is then posted and the
        log data extracted by the mapper is added to it.

        :param LasToRecordMapper mapper: The Las data mapper
        :param str data_partition_id: The data partition Id.
        :param bool no_recognize: If true don't attempt to recognize the curves, otherwise recognize the curves
        :raises LasLoaderConflictError: If more than one existing wellbore matches the wellbore name
        """
        wellbore_record = mapper.map_to_wellbore_record()

        wellbore_id = self._get_wellbore_by_name(wellbore_record.data.get("FacilityName"))

        if wellbore_id is None:
            ids = self._client.create_wellbore(wellbore_record)
            logger.warning(f"New wellbore IDs: {ids}")
            wellbore_id = self._safe_get_first_record(ids)
        else:
            logger.warning(f"Adding new welllog data to the existing well bore with id: {wellbore_id}")

        logger.info(json.dumps(self._client.get_wellbore_record(wellbore_id).get_raw_data(), indent=4, sort_keys=True))

        welllog_record = mapper.map_to_well_log_record(wellbore_id)

        # enrich welllog
        if no_recognize:
            enriched_welllog_record = welllog_record
        else:
            logger.warning("Recognizing the curve families")
            enriched_welllog_record = self._well_log_service.recognize_log_family(welllog_record, data_partition_id)

        welllog_ids = self._client.post_welllog(enriched_welllog_record)
        logger.warning(f"New welllog IDs: {welllog_ids}")

        # Read the stored record back so the user can see what was created.
        welllog_id = self._safe_get_first_record(welllog_ids)
        welllog_record = self._client.get_welllog_record(welllog_id)
        logger.info(json.dumps(welllog_record.get_raw_data(), indent=4, sort_keys=True))

        welllog_data = mapper.extract_log_data()
        self._client.add_welllog_data(welllog_data, welllog_id)

    def _get_wellbore_by_name(self, wellbore_name: str) -> Optional[str]:
        """
        Search for existing wellbores by name and return appropriate value for
        the three cases - 1 result, >1 result, 0 results.

        :param str wellbore_name: The wellbore name to search for
        :return: The id of the matching wellbore, or None if the name is None or nothing matches
        :rtype: Optional[str]
        :raises LasLoaderConflictError: If more than one wellbore matches the name
        """
        if wellbore_name is None:
            return None

        wellbore_ids = self.search_for_wellbore(wellbore_name)

        if wellbore_ids is None or len(wellbore_ids) < 1:
            return None

        if len(wellbore_ids) > 1:
            message = f"More than one matching wellbore found for '{wellbore_name}'."
            raise LasLoaderConflictError(message, wellbore_ids)

        return self._safe_get_first_record(wellbore_ids)

    def _safe_get_first_record(self, array: List[Any]) -> Any:
        """
        Get the first element from a list or return None if the list is None or empty

        :param List array: The list to take the first element from (may be None)
        :return: The first element of the List or None
        :rtype: Any
        """
        return array[0] if array is not None and len(array) > 0 else None

    def search_for_wellbore(self, wellbore_name: str) -> List[str]:
        """
        Search for existing wellbores with the provided `wellbore_name`

        :param str wellbore_name: The wellbore name to search for
        :return: List of wellbore IDs that have the provided wellbore name
        :rtype: List[str]
        """
        return self._client.search_for_wellbore(wellbore_name)