osdu_client.py 7.76 KB
Newer Older
Greg Harris's avatar
Greg Harris committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
from typing import Dict, List, Union
import httpx
import io
from knack.log import get_logger
from ..service.record_mapper import Record, WellLogRecord
from pandas import DataFrame, read_parquet


logger = get_logger(__name__)


class DataLoaderWebResponseError(Exception):
    """
    Common class for web response errors
    """

    def __init__(self, http_code: str, url: str, message: str = ""):
        """
        Create a new instance of a DataLoaderWebResponseError

        :param str http_code: The HTTP response code.
        :param str url: The URL that was called.
        :param str message: An optional message.
        """
        super().__init__(f"HTTP error: {http_code}. Calling: '{url}'. {message}")


class OsduClient:
    def __init__(self, base_url: str, access_token: str, data_partition_id: str) -> None:
        """
        Create a new instance of a OsduClient

        :param str base_url: The base URL of the OSDU instance.
        :param str access_token: The access token required to access the OSDU instance.
        :param str data_partition_id: The data partition ID
        """
        self._access_token = access_token
        self._base_url = base_url
        self._data_partition_id = data_partition_id

    def _create_headers(self) -> Dict[str, str]:
        """
        Create a new set of auth headers for OSDU

        :return: the header for a standard request
        :rtype: str
        """
        return {
            "Authorization": f"Bearer {self._access_token}",
            "data-partition-id": self._data_partition_id,
        }

    def create_wellbore(self, record: Record) -> List[any]:
        """
        Make a post request to OSDU to create a new wellbore record.

        :param Record record: The wellbore record to be uploaded
        :return: the id of the new well bore
        :rtype: str
        """
        return self._post_data_with_id_response("/api/os-wellbore-ddms/ddms/v3/wellbores", record)

    def post_welllog(self, record: Union[Record, WellLogRecord]) -> List[any]:
        """
        Make a post request to OSDU to create a new or update an existing well log record.
        If the `welllog_record` contains an `id` attribute, the well log with that ID shall be updated.
        If the `welllog_record` does not contain an `id` attribute, a new record will be created.

        :param Union[Record, WellLogRecord] welllog_record: The well log record to be uploaded.
        :return: The ID of the created/updated well log
        :rtype: List[any]
        """
        return self._post_data_with_id_response("/api/os-wellbore-ddms/ddms/v3/welllogs", record)

    def get_wellbore_record(self, wellbore_id: str) -> Record:
        """
        Make a get request to OSDU to retreive an existing wellbore record.

        :param str wellbore_id: The wellbore id to be uploaded
        :return: the wellbore record
        :rtype: Record
        """
        get_record_url = f"{self._base_url}/api/os-wellbore-ddms/ddms/v3/wellbores/{wellbore_id}"
        return Record(self._send_request_json_response("GET", get_record_url, None, None))

    def get_welllog_record(self, welllog_id: str) -> WellLogRecord:
        """
        Make a get request to OSDU to retreive an existing welllog record.

        :param str welllog_id: The welllog id to be uploaded
        :return: the welllog record
        :rtype: WellLogRecord
        """
        get_record_url = f"{self._base_url}/api/os-wellbore-ddms/ddms/v3/welllogs/{welllog_id}"
        return WellLogRecord(self._send_request_json_response("GET", get_record_url, None, None))

    def add_welllog_data(self, data: DataFrame, welllog_id: str) -> None:
        """
        Make a post request to OSDU to add a new data to an existing well log.

        :param DataFrame data: The welllog data to add
        :param str welllog_id: The id of the new well log
        """

        url = f"{self._base_url}/api/os-wellbore-ddms/ddms/v3/welllogs/{welllog_id}/data"

        # Adding content-type to standard headers
        headers = self._create_headers()
        headers.update({"content-type": "application/x-parquet"})

        payload = data.to_parquet()
        self._send_request_json_response("POST", url, payload, None, headers=headers)

    def get_welllog_data(self, welllog_id: str, curves: List[str] = None) -> DataFrame:
        """
        Get welllog data for all or a specified set of curves.
        :param str welllog_id: The id of the well log
        :param List[str] curves: The welllog curves, None or empty returns all curves
        :return: The welllog data
        :rtype: DataFrame
        """
        curves_query = ""
        if curves is not None and len(curves) > 0:
            curves_query = f"&curves={','.join(curves)}"

        get_welllog_data_url = f"{self._base_url}/api/os-wellbore-ddms/ddms/v3/welllogs/{welllog_id}/data?describe=false{curves_query}"

        try:
            res = self._send_request("GET", get_welllog_data_url, None, None)
        except DataLoaderWebResponseError as ex:
            logger.warning(f"No curve data found. Error: {str(ex)}")
            return None
        return read_parquet(io.BytesIO(res.content))

    def post_log_recognition(self, mnemonic: str, unit: str) -> any:
        """
        Make a request to OSDU to recognize a particular combination of mnemonic and unit.
        :param str mnemonic: The mnemonic
        :param str unit: The unit
        :return: A dictionary that contains the curve family
        :rtype: any
        """
        recognize_family_url = f"{self._base_url}/api/os-wellbore-ddms/log-recognition/family"
        payload = {"label": mnemonic, "log_unit": unit}
        return self._send_request_json_response("POST", recognize_family_url, None, payload)

    def search_for_wellbore(self, wellbore_name: str) -> List[str]:
        """
        Search an OSDU instance to find all wellbores with the specified name and return their ids.
        :param str wellbore_name: The well name
        :return: A List that contains the matching wellbore ids
        :rtype: List[str]
        """
        url = f"{self._base_url}/api/search/v2/query"

        payload = {
            "kind": "*:*:master-data--Wellbore:*",
            "query": f'data.FacilityName:"{wellbore_name}"',
            "returnedFields": ["id"],
            "limit": 1000
        }

        result = self._send_request_json_response("POST", url, None, payload)
        if result is None or result.get("totalCount") is None or result.get("totalCount") < 1:
            return []
        else:
            return [r["id"] for r in result.get("results")]

    def _post_data_with_id_response(self, path: str, record: Record) -> List[any]:
        payload = [record.get_raw_data()]
        url = f"{self._base_url}{path}"

        parsed_result = self._send_request_json_response("POST", url, None, payload)

        if "recordIds" in parsed_result:
            return parsed_result["recordIds"]
        else:
            return []

    def _send_request_json_response(self, method: str, url: str, content: any, json: Dict[str, any], headers=None) -> any:
        res = self._send_request(method, url, content, json, headers)
        return res.json()

    def _send_request(self, method: str, url: str, content: any, json: Dict[str, any], headers=None) -> any:
        logger.debug(f"{method}: {url}")

        headers = self._create_headers() if headers is None else headers

        with httpx.Client() as client:
            res = client.request(method, url, json=json, content=content, headers=headers)

            if res.status_code >= 300:
                raise DataLoaderWebResponseError(res.status_code, url, res.text)

            return res