osdu_client.py 9.19 KB
Newer Older
Greg Harris's avatar
Greg Harris committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from typing import Dict, List, Union
import httpx
import io
from knack.log import get_logger
from ..service.record_mapper import Record, WellLogRecord
from pandas import DataFrame, read_parquet


logger = get_logger(__name__)


class DataLoaderWebResponseError(Exception):
    """
    Common class for web response errors.

    The values used to build the error text are also kept on the instance as
    ``http_code``, ``url`` and ``message`` so that callers can inspect them
    without parsing the exception string.
    """

    def __init__(self, http_code: Union[int, str], url: str, message: str = ""):
        """
        Create a new instance of a DataLoaderWebResponseError

        :param Union[int, str] http_code: The HTTP response code. It is only
            formatted into the message, so an integer status code or its string
            form are both accepted.
        :param str url: The URL that was called.
        :param str message: An optional message.
        """
        self.http_code = http_code
        self.url = url
        self.message = message
        super().__init__(f"HTTP error: {http_code}. Calling: '{url}'. {message}")


class OsduClient:
29
    def __init__(self, base_url: str, access_token: str, data_partition_id: str, csp_base_url_prefix: str, csp_prefix_wellbore_service: str, csp_prefix_search_service: str) -> None:
Greg Harris's avatar
Greg Harris committed
30
31
32
33
34
35
36
37
38
39
        """
        Create a new instance of a OsduClient

        :param str base_url: The base URL of the OSDU instance.
        :param str access_token: The access token required to access the OSDU instance.
        :param str data_partition_id: The data partition ID
        """
        self._access_token = access_token
        self._base_url = base_url
        self._data_partition_id = data_partition_id
40
        self._csp_base_url_prefix = csp_base_url_prefix
41
42
43
        self._csp_prefix_wellbore_service = csp_prefix_wellbore_service
        self._csp_prefix_search_service = csp_prefix_search_service

Greg Harris's avatar
Greg Harris committed
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
    def _create_headers(self) -> Dict[str, str]:
        """
        Create a new set of auth headers for OSDU

        :return: the header for a standard request
        :rtype: str
        """
        return {
            "Authorization": f"Bearer {self._access_token}",
            "data-partition-id": self._data_partition_id,
        }

    def create_wellbore(self, record: Record) -> List[any]:
        """
        Make a post request to the OSDU wellbore service ``wellbores`` endpoint with the given record.

        :param Record record: The wellbore record to be uploaded
        :return: the record ids found in the response, or an empty list if the response has none
        :rtype: List[any]
        """
        wellbores_url = f'{self._get_base_url("wellbore")}/api/os-wellbore-ddms/ddms/v3/wellbores'
        return self._post_data_with_id_response(wellbores_url, record)
Greg Harris's avatar
Greg Harris committed
65
66
67
68
69
70
71
72
73
74
75

    def post_welllog(self, record: Union[Record, WellLogRecord]) -> List[any]:
        """
        Make a post request to the OSDU wellbore service ``welllogs`` endpoint with the given record.

        The same request is used for creating and for updating a well log. Per the
        original author's notes, a record carrying an ``id`` attribute updates the
        well log with that ID, and a record without one creates a new well log; that
        distinction is made by the service, not by this client.

        :param Union[Record, WellLogRecord] record: The well log record to be uploaded.
        :return: the record ids found in the response, or an empty list if the response has none
        :rtype: List[any]
        """
        welllogs_url = f'{self._get_base_url("wellbore")}/api/os-wellbore-ddms/ddms/v3/welllogs'
        return self._post_data_with_id_response(welllogs_url, record)
Greg Harris's avatar
Greg Harris committed
77
78
79
80
81
82
83
84
85

    def get_wellbore_record(self, wellbore_id: str) -> Record:
        """
        Make a get request to OSDU to retrieve an existing wellbore record.

        :param str wellbore_id: The id of the wellbore to fetch
        :return: the wellbore record built from the JSON response
        :rtype: Record
        """
        get_record_url = f'{self._get_base_url("wellbore")}/api/os-wellbore-ddms/ddms/v3/wellbores/{wellbore_id}'
        response_json = self._send_request_json_response("GET", get_record_url, None, None)
        return Record(response_json)

    def get_welllog_record(self, welllog_id: str) -> WellLogRecord:
        """
        Make a get request to OSDU to retrieve an existing welllog record.

        :param str welllog_id: The id of the welllog to fetch
        :return: the welllog record built from the JSON response
        :rtype: WellLogRecord
        """
        get_record_url = f'{self._get_base_url("wellbore")}/api/os-wellbore-ddms/ddms/v3/welllogs/{welllog_id}'
        response_json = self._send_request_json_response("GET", get_record_url, None, None)
        return WellLogRecord(response_json)

    def add_welllog_data(self, data: DataFrame, welllog_id: str) -> None:
        """
        Make a post request to OSDU to add data to an existing well log.

        The data frame is serialised with ``DataFrame.to_parquet()`` and sent as the
        request body with an ``application/x-parquet`` content type.

        :param DataFrame data: The welllog data to add
        :param str welllog_id: The id of the well log that receives the data
        """
        url = f'{self._get_base_url("wellbore")}/api/os-wellbore-ddms/ddms/v3/welllogs/{welllog_id}/data'

        # Standard headers plus the content type of the parquet body.
        parquet_headers = {**self._create_headers(), "content-type": "application/x-parquet"}

        self._send_request_json_response("POST", url, data.to_parquet(), None, headers=parquet_headers)

    def get_welllog_data(self, welllog_id: str, curves: List[str] = None) -> DataFrame:
        """
        Get welllog data for all or a specified set of curves.

        :param str welllog_id: The id of the well log
        :param List[str] curves: The welllog curves, None or empty returns all curves
        :return: The welllog data read from the parquet response body, or None if the
            request failed with a DataLoaderWebResponseError (a warning is logged)
        :rtype: DataFrame
        """
        if curves is None or len(curves) == 0:
            # No filter: the service is asked for every curve.
            curves_query = ""
        else:
            curves_query = f"&curves={','.join(curves)}"

        get_welllog_data_url = f'{self._get_base_url("wellbore")}/api/os-wellbore-ddms/ddms/v3/welllogs/{welllog_id}/data?describe=false{curves_query}'

        try:
            res = self._send_request("GET", get_welllog_data_url, None, None)
        except DataLoaderWebResponseError as ex:
            logger.warning(f"No curve data found. Error: {str(ex)}")
            return None
        else:
            parquet_buffer = io.BytesIO(res.content)
            return read_parquet(parquet_buffer)

    def post_log_recognition(self, mnemonic: str, unit: str) -> any:
        """
        Make a request to OSDU to recognize a particular combination of mnemonic and unit.

        :param str mnemonic: The mnemonic, sent as ``label``
        :param str unit: The unit, sent as ``log_unit``
        :return: The parsed JSON response; per the original notes, a dictionary that
            contains the curve family
        :rtype: any
        """
        recognize_family_url = f'{self._get_base_url("wellbore")}/api/os-wellbore-ddms/log-recognition/family'
        family_request = dict(label=mnemonic, log_unit=unit)
        return self._send_request_json_response("POST", recognize_family_url, None, family_request)

    def search_for_wellbore(self, wellbore_name: str) -> List[str]:
        """
        Search an OSDU instance to find all wellbores with the specified name and return their ids.
        :param str wellbore_name: The well name
        :return: A List that contains the matching wellbore ids
        :rtype: List[str]
        """
165
166
        #url = f"{self._base_url}/api/search/v2/query"
        url = f'{self._get_base_url("search")}/api/search/v2/query'
Greg Harris's avatar
Greg Harris committed
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182

        payload = {
            "kind": "*:*:master-data--Wellbore:*",
            "query": f'data.FacilityName:"{wellbore_name}"',
            "returnedFields": ["id"],
            "limit": 1000
        }

        result = self._send_request_json_response("POST", url, None, payload)
        if result is None or result.get("totalCount") is None or result.get("totalCount") < 1:
            return []
        else:
            return [r["id"] for r in result.get("results")]

    def _post_data_with_id_response(self, path: str, record: Record) -> List[any]:
        payload = [record.get_raw_data()]
183
184
        url = f"{path}"
        #url = f"{self._base_url}{path}"
Greg Harris's avatar
Greg Harris committed
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208

        parsed_result = self._send_request_json_response("POST", url, None, payload)

        if "recordIds" in parsed_result:
            return parsed_result["recordIds"]
        else:
            return []

    def _send_request_json_response(self, method: str, url: str, content: any, json: Dict[str, any], headers=None) -> any:
        """
        Send a request through ``_send_request`` and return the response body parsed as JSON.

        :param str method: The HTTP method
        :param str url: The URL to call
        :param any content: The raw request body, or None
        :param Dict[str, any] json: The JSON payload, or None
        :param headers: The headers to send; passed through unchanged
        :return: the result of calling ``json()`` on the response
        :rtype: any
        """
        return self._send_request(method, url, content, json, headers).json()

    def _send_request(self, method: str, url: str, content: any, json: Dict[str, any], headers=None) -> any:
        """
        Send a single HTTP request and return the response.

        A new ``httpx.Client`` is opened for the call and closed when it finishes.

        :param str method: The HTTP method
        :param str url: The URL to call
        :param any content: The raw request body, or None
        :param Dict[str, any] json: The JSON payload, or None
        :param headers: The headers to send; when None, the standard headers from
            ``_create_headers`` are used
        :return: the response object returned by the client
        :rtype: any
        :raises DataLoaderWebResponseError: if the response status code is 300 or higher
        """
        logger.debug(f"{method}: {url}")

        if headers is None:
            headers = self._create_headers()

        with httpx.Client() as client:
            res = client.request(method, url, json=json, content=content, headers=headers)

            if res.status_code < 300:
                return res

            # Anything from 300 upwards, redirects included, is reported as an error.
            raise DataLoaderWebResponseError(res.status_code, url, res.text)
209
210
211
212

    def _get_base_url( self,service_name: str) -> str:
        if self._csp_base_url_prefix == "yes":
            if service_name == "wellbore":
213
                 return  f"{self._base_url}/{self._csp_prefix_wellbore_service}"
214
            if service_name == "search":
215
                 return f"{self._base_url}/{self._csp_prefix_search_service}"
216
217
        else:
            return self._base_url