Commit 74ebe022 authored by Siarhei Khaletski (EPAM)

Merge branch 'feature/GONRG-1269_Refactor_and_unit_tests' into 'integration-master'

GONRG-1269 "Feature/ refactor and unit tests"

See merge request go3-nrg/platform/data-flow/ingestion/ingestion-dags!18
parents 3b4a1c74 68f80897
Showing 1218 additions and 522 deletions
@@ -34,3 +34,8 @@
# will remove it later
**/schema_registration/
# ignore coverage.py
htmlcov/*
.coverage
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
@dataclasses.dataclass
class Context(object):
"""
Store data-partition-id and AppKey passed via Payload field of dagrun.conf.
"""
data_partition_id: str
app_key: str
@classmethod
def populate(cls, ctx: dict) -> 'Context':
ctx_payload = ctx.pop('Payload')
ctx_obj = cls(
app_key=ctx_payload['AppKey'],
data_partition_id=ctx_payload['data-partition-id']
)
return ctx_obj
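
For illustration, a minimal usage sketch of Context.populate (the payload values below are hypothetical):

# Hypothetical dagrun.conf, as passed to /submitWithManifest.
conf = {
    "Payload": {"AppKey": "test-app", "data-partition-id": "opendes"},
    "WorkflowID": "foo"
}
ctx = Context.populate(conf)
assert ctx.data_partition_id == "opendes" and ctx.app_key == "test-app"
# Note: populate() pops "Payload", so it is removed from conf as a side effect.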
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from libs.context import Context
class HeadersMixin(object):
"""
Mixin for creating request headers to OSDU services using context.
"""
def __init__(self, context: Context):
self.context = context
@property
def request_headers(self) -> dict:
headers = {
'Content-type': 'application/json',
'data-partition-id': self.context.data_partition_id,
'AppKey': self.context.app_key
}
return headers
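
A minimal sketch of the headers this mixin builds, assuming the hypothetical context values above:

ctx = Context(data_partition_id="opendes", app_key="test-app")
headers = HeadersMixin(ctx).request_headers
# {'Content-type': 'application/json',
#  'data-partition-id': 'opendes',
#  'AppKey': 'test-app'}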
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import json
import logging
import uuid
from typing import List
import requests
import tenacity
from libs.context import Context
from libs.exceptions import EmptyManifestError, NotOSDUShemaFormatError
from libs.mixins import HeadersMixin
from libs.refresh_token import AirflowTokenRefresher, refresh_token
logger = logging.getLogger()
RETRIES = 3
TIMEOUT = 1
class ManifestProcessor(HeadersMixin):
"""Class to process WP, Master and Reference data"""
RECORD_TEMPLATE = {
"legal": {},
"acl": {},
"kind": "",
"id": "",
"data": {
}
}
def __init__(self, storage_url: str, dagrun_conf: dict, context: Context):
super().__init__(context)
self.storage_url = storage_url
self.data_object = copy.deepcopy(dagrun_conf)
self.context = context
@staticmethod
def _get_kind_name(kind: str) -> str:
"""
osdu:osdu:Well:1.0.0 -> Well
"""
kind_name = kind.split(":")[2]
return kind_name
def generate_id(self, manifest_fragment: dict) -> str:
"""
Generate an id to use in the Storage service.
"""
group_type = manifest_fragment.get("groupType", "doc")
kind = manifest_fragment.get("kind")
kind_name = self._get_kind_name(kind)
_id = f"{self.context.data_partition_id}:{group_type}_{kind_name}:{str(uuid.uuid4())}"
return _id
def populate_manifest_storage_record(self, manifest: dict) -> dict:
"""
Create a record from a manifest fragment to store it in the Storage service.
"""
record = copy.deepcopy(self.RECORD_TEMPLATE)
record["id"] = manifest["id"] if manifest.get("id") else self.generate_id(manifest)
record["kind"] = manifest.pop("kind")
record["legal"] = manifest.pop("legal")
record["acl"] = manifest.pop("acl")
record["data"] = manifest
return record
def _validate_storage_response(self, response_dict: dict):
if not (
isinstance(response_dict, dict) and
isinstance(response_dict.get("recordIds"), list)
):
raise ValueError(f"Invalid answer from Storage server: {response_dict}")
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@refresh_token(AirflowTokenRefresher())
def save_record(self, headers: dict, request_data: List[dict]) -> requests.Response:
"""
Send request to record storage API.
"""
request_data = json.dumps(request_data)
logger.info("Send to Storage service")
logger.info(f"{request_data}")
response = requests.put(self.storage_url, request_data, headers=headers)
if response.ok:
response_dict = response.json()
self._validate_storage_response(response_dict)
logger.info(f"Response: {response_dict}")
logger.info(",".join(map(str, response_dict["recordIds"])))
else:
reason = response.text[:250]
logger.error(f"Request error.")
logger.error(f"Response status: {response.status_code}. "
f"Response content: {reason}.")
return response
def process_work_product(self, manifest: dict) -> List[dict]:
"""
Process WP.
"""
wp = manifest["WorkProduct"]
records = [self.populate_manifest_storage_record(wp)]
return records
def process_work_product_components(self, manifest: dict) -> List[dict]:
"""
Process list of WPCs.
"""
records = []
for wpc in manifest["WorkProductComponents"]:
record = self.populate_manifest_storage_record(wpc)
records.append(record)
return records
def process_work_product_files(self, manifest: dict) -> List[dict]:
"""
Process list of files.
"""
records = []
for file in manifest["Files"]:
record = self.populate_manifest_storage_record(file)
records.append(record)
return records
def process_work_product_manifest(self, manifest: dict) -> List[dict]:
file_records = self.process_work_product_files(manifest)
wpc_records = self.process_work_product_components(manifest)
wp_records = self.process_work_product(manifest)
records = file_records + wpc_records + wp_records
return records
def create_manifest_records(self) -> List[dict]:
"""
Process every record in the manifest field.
Return a list of records ready to be saved into the Storage service.
"""
manifest_records = []
manifests = self.data_object["manifest"]
for manifest in manifests:
if "WorkProduct" in manifest:
wp_records = self.process_work_product_manifest(manifest)
manifest_records.extend(wp_records)
else:
record = self.populate_manifest_storage_record(manifest)
manifest_records.append(record)
return manifest_records
def process_manifest(self) -> List[str]:
"""
Process manifests and save them into Storage service.
Returns recordIds of saved records.
"""
if "manifest" in self.data_object:
manifest_records = self.create_manifest_records()
else:
raise EmptyManifestError
response = self.save_record(self.request_headers, request_data=manifest_records)
record_ids = response.json()["recordIds"]
return record_ids
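
An end-to-end usage sketch; the storage URL and the manifest content here are placeholders, not values from this repository:

conf = {
    "Payload": {"AppKey": "", "data-partition-id": "opendes"},
    "manifest": [{
        "kind": "opendes:osdu:Wellbore:0.3.0",
        "groupType": "master-data",
        "legal": {"legaltags": ["opendes-demo-legaltag"]},
        "acl": {"owners": [], "viewers": []}
    }]
}
context = Context.populate(conf)
processor = ManifestProcessor(
    storage_url="https://storage.example.com/api/storage/v2/records",  # placeholder
    dagrun_conf=conf,
    context=context
)
record_ids = processor.process_manifest()  # PUTs the records, returns their ids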
@@ -13,13 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import json
import logging
import os
import sys
from typing import Callable, Union
from abc import ABC, abstractmethod
from functools import partial
from http import HTTPStatus
from urllib.parse import urlparse
@@ -30,13 +29,7 @@ from google.oauth2 import service_account
from libs.exceptions import RefreshSATokenError, SAFilePathError
from tenacity import retry, stop_after_attempt
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger = logging.getLogger()
RETRIES = 3
@@ -68,7 +61,7 @@ class TokenRefresher(ABC):
class AirflowTokenRefresher(TokenRefresher):
DEFAULT_ACCESS_SCOPES = ['openid', 'email', 'profile']
def __init__(self, access_scopes: list=None):
def __init__(self, access_scopes: list = None):
from airflow.models import Variable
self.airflow_variables = Variable
self._access_token = None
@@ -86,7 +79,7 @@ class AirflowTokenRefresher(TokenRefresher):
@staticmethod
@retry(stop=stop_after_attempt(RETRIES))
def get_sa_file_content_from_google_storage(bucket_name: str, source_blob_name: str) -> str:
def get_sa_info_from_google_storage(bucket_name: str, source_blob_name: str) -> dict:
"""
Get sa_file content from Google Storage.
"""
@@ -94,9 +87,15 @@ class AirflowTokenRefresher(TokenRefresher):
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
logger.info("Got SA_file.")
return blob.download_as_string()
sa_info = json.loads(blob.download_as_string())
return sa_info
def get_sa_file_info(self) -> dict:
@staticmethod
def get_sa_info_from_file(path: str) -> dict:
with open(path) as f:
return json.load(f)
def get_sa_info(self) -> dict:
"""
Get the file path from the SA_FILE_PATH environment variable.
This path can be a GCS object URI or a local file path.
@@ -107,28 +106,30 @@
if parsed_path.scheme == "gs":
bucket_name = parsed_path.netloc
source_blob_name = parsed_path.path[1:] # delete the first slash
sa_file_content = self.get_sa_file_content_from_google_storage(bucket_name,
source_blob_name)
sa_file_info = json.loads(sa_file_content)
sa_info = self.get_sa_info_from_google_storage(bucket_name, source_blob_name)
elif not parsed_path.scheme and os.path.isfile(parsed_path.path):
with open(parsed_path.path) as f:
sa_file_info = json.load(f)
sa_info = self.get_sa_info_from_file(parsed_path.path)
else:
raise SAFilePathError
return sa_file_info
logger.error("SA file path error.")
raise SAFilePathError(f"Got path {os.environ.get('SA_FILE_PATH', None)}")
return sa_info
@retry(stop=stop_after_attempt(RETRIES))
def get_access_token_using_sa_file(self) -> str:
"""
Get new access token using SA info.
"""
sa_file_content = self.get_sa_file_info()
def _get_credentials_from_sa_info(self, sa_info: dict) -> service_account.Credentials:
try:
credentials = service_account.Credentials.from_service_account_info(
sa_file_content, scopes=self.access_scopes)
sa_info, scopes=self.access_scopes)
except ValueError as e:
logger.error("SA file has bad format.")
raise e
return credentials
def get_access_token_using_sa_file(self) -> str:
"""
Get new access token using SA info.
"""
sa_info = self.get_sa_info()
credentials = self._get_credentials_from_sa_info(sa_info)
logger.info("Refresh token.")
credentials.refresh(Request())
@@ -171,9 +172,9 @@ def make_callable_request(obj: Union[object, None], request_function: Callable,
Create send_request_with_auth function.
"""
if obj: # if wrapped function is an object's method
callable_request = lambda: request_function(obj, headers, *args, **kwargs)
callable_request = partial(request_function, obj, headers, *args, **kwargs)
else:
callable_request = lambda: request_function(headers, *args, **kwargs)
callable_request = partial(request_function, headers, *args, **kwargs)
return callable_request
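
The refactor replaces the lambdas with functools.partial. Besides readability, one practical difference is binding time: partial captures the current argument objects eagerly, while a lambda re-resolves the names when called, so rebinding a captured name later would change what the lambda sends. A small illustration:

from functools import partial

def send(headers):
    return headers["Authorization"]

headers = {"Authorization": "Bearer old"}
late = lambda: send(headers)    # looks `headers` up at call time
bound = partial(send, headers)  # captures the current dict now

headers = {"Authorization": "Bearer new"}  # rebind the name
late()   # -> 'Bearer new'
bound()  # -> 'Bearer old'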
@@ -199,7 +200,7 @@ def _validate_token_refresher_type(token_refresher: TokenRefresher):
def send_request_with_auth_header(token_refresher: TokenRefresher, *args,
**kwargs) -> requests.Response:
**kwargs) -> requests.Response:
"""
Send request with authorization token. If response status is in HTTPStatus.UNAUTHORIZED or
HTTPStatus.FORBIDDEN, then refresh token and send request once again.
@@ -224,7 +225,11 @@ def send_request_with_auth_header(token_refresher: TokenRefresher, *args,
headers,
*args, **kwargs)
response = send_request_with_auth()
response.raise_for_status()
try:
response.raise_for_status()
except requests.HTTPError as e:
logger.error(f"{response.text}")
raise e
return response
@@ -238,6 +243,7 @@ def refresh_token(token_refresher: TokenRefresher) -> Callable:
Or method:
request_method(self, header: dict, *args, **kwargs) -> requests.Response
"""
_validate_token_refresher_type(token_refresher)
def refresh_token_wrapper(request_function: Callable) -> Callable:
@@ -245,17 +251,17 @@ def refresh_token(token_refresher: TokenRefresher) -> Callable:
if is_method:
def _wrapper(obj: object, headers: dict, *args, **kwargs) -> requests.Response:
return send_request_with_auth_header(token_refresher,
request_function=request_function,
obj=obj,
headers=headers,
*args,
**kwargs)
request_function=request_function,
obj=obj,
headers=headers,
*args,
**kwargs)
else:
def _wrapper(headers: dict, *args, **kwargs) -> requests.Response:
return send_request_with_auth_header(token_refresher,
request_function=request_function,
headers=headers,
*args, **kwargs)
request_function=request_function,
headers=headers,
*args, **kwargs)
return _wrapper
return refresh_token_wrapper
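
A minimal sketch of decorating a plain request function (the URL and record id are placeholders):

@refresh_token(AirflowTokenRefresher())
def get_record(headers: dict, record_id: str) -> requests.Response:
    # The wrapper supplies the authorization token; on 401/403 it refreshes
    # the token and re-sends the request once.
    return requests.get(f"https://storage.example.com/records/{record_id}",  # placeholder
                        headers=headers)

response = get_record({"Content-type": "application/json"}, "opendes:doc:123")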
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import requests
import tenacity
from libs.context import Context
from libs.exceptions import RecordsNotSearchableError
from libs.mixins import HeadersMixin
from libs.refresh_token import AirflowTokenRefresher, refresh_token
logger = logging.getLogger()
RETRIES = 5
WAIT = 5
TIMEOUT = 10
class SearchId(HeadersMixin):
def __init__(self, search_url: str, record_ids: list, context: Context):
super().__init__(context)
if not record_ids:
logger.error("There are no record ids")
raise ValueError("There are no record id")
self.record_ids = record_ids
self.search_url = search_url
self.expected_total_count = len(record_ids)
self._create_request_body()
def _create_search_query(self) -> str:
"""
Create a search query for the Search service from the record ids that need to be found.
"""
record_ids = " OR ".join(f"\"{id_}\"" for id_ in self.record_ids)
logger.info(f"Search query {record_ids}")
query = f"id:({record_ids})"
return query
def _create_request_body(self):
"""
Create request body to send to Search service.
"""
query = self._create_search_query()
request_body = {
"kind": "*:*:*:*",
"query": query
}
self.request_body = json.dumps(request_body)
def _is_record_searchable(self, response: requests.Response) -> bool:
"""
Check if search service returns expected totalCount of records.
"""
logger.info(response.text)
data = response.json()
total_count = data.get('totalCount')
logger.info(f"Got total count {total_count}")
if total_count is None:
raise ValueError(f"Got no totalCount field in Search service response. "
f"Response is {data}.")
return total_count == self.expected_total_count
@tenacity.retry(
wait=tenacity.wait_exponential(WAIT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@refresh_token(AirflowTokenRefresher())
def search_files(self, headers: dict) -> requests.Response:
"""
Send request with recordIds to Search service.
"""
if self.request_body:
response = requests.post(self.search_url, self.request_body, headers=headers)
if not self._is_record_searchable(response):
logger.error("Expected amount (%s) of records not found." %
self.expected_total_count,
)
raise RecordsNotSearchableError
return response
def check_records_searchable(self):
"""
Check if every record in self.record_ids is searchable.
"""
headers = self.request_headers
self.search_files(headers)
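
A usage sketch with a placeholder Search endpoint:

context = Context(data_partition_id="opendes", app_key="")
searcher = SearchId(
    search_url="https://search.example.com/api/search/v2/query",  # placeholder
    record_ids=["opendes:master-data/Wellbore:350112350400"],
    context=context
)
searcher.check_records_searchable()  # retries with exponential wait; raises if counts differ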
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import requests
from libs.context import Context
from libs.mixins import HeadersMixin
from libs.refresh_token import AirflowTokenRefresher, refresh_token
logger = logging.getLogger()
class UpdateStatus(HeadersMixin):
def __init__(
self,
workflow_id: str,
workflow_url: str,
status: str,
context: Context,
) -> None:
super().__init__(context)
self.workflow_url = workflow_url
self.workflow_id = workflow_id
self.context = context
self.status = status
@refresh_token(AirflowTokenRefresher())
def update_status_request(self, headers: dict) -> requests.Response:
request_body = {
"WorkflowID": self.workflow_id,
"Status": self.status
}
request_body = json.dumps(request_body)
logger.info(f" Sending request '{request_body}'")
response = requests.post(self.workflow_url, request_body, headers=headers)
return response
def update_workflow_status(self):
"""
Update the status of the current workflow (WorkflowID).
"""
headers = self.request_headers
self.update_status_request(headers)
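
A usage sketch with a placeholder workflow endpoint:

context = Context(data_partition_id="opendes", app_key="")
UpdateStatus(
    workflow_id="foo",
    workflow_url="https://workflow.example.com/api/workflow/v1/updateStatus",  # placeholder
    status="finished",
    context=context
).update_workflow_status()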
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import logging
import jsonschema
import requests
import tenacity
from libs.context import Context
from libs.exceptions import EmptyManifestError, NotOSDUShemaFormatError
from libs.mixins import HeadersMixin
from libs.refresh_token import AirflowTokenRefresher, refresh_token
logger = logging.getLogger()
RETRIES = 3
TIMEOUT = 1
class OSDURefResolver(jsonschema.RefResolver):
def __init__(self, schema_service: str, *args, **kwargs):
super(OSDURefResolver, self).__init__(*args, **kwargs)
self.schema_service = schema_service
def resolve_fragment(self, document: dict, fragment: str) -> dict:
"""
Extend the base resolve_fragment method. If a nested schema has a 'definitions' field
and there is a schema under this 'definitions', jsonschema attempts to use the id field
of this double-nested schema as the URI to fetch this schema later. So it makes sense
to replace this id with the correct one.
"""
document = super().resolve_fragment(document, fragment)
fragment_parts = fragment.split("/") # /definitions/<OsduID> -> [..., <OsduID>]
if len(fragment_parts) > 1:
osdu_id = fragment_parts[-1]
url = f"{self.schema_service}/{osdu_id}"
document["$id"] = url
return document
class SchemaValidator(HeadersMixin):
"""Class to validate schema of Manifests."""
def __init__(self, schema_service: str, dagrun_conf: dict, context: Context):
super().__init__(context)
self.schema_service = schema_service
self.data_object = copy.deepcopy(dagrun_conf)
self.context = context
self.resolver_handlers = {
"osdu": self.get_schema_request,
"https": self.get_schema_request,
self.context.data_partition_id: self.get_schema_request
}
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@refresh_token(AirflowTokenRefresher())
def _get_schema_from_schema_service(self, headers: dict, uri: str) -> requests.Response:
"""
Request to Schema service to retrieve schema.
"""
response = requests.get(uri, headers=headers, timeout=60)
return response
def get_schema_request(self, uri: str) -> dict:
"""
Get a schema from the Schema service. Set the $id field to the URL the schema was fetched from.
"""
if uri.startswith("osdu") or uri.startswith(self.context.data_partition_id):
uri = f"{self.schema_service}/{uri}"
response = self._get_schema_from_schema_service(self.request_headers, uri).json()
response["$id"] = uri
return response
def get_schema(self, kind: str) -> dict:
manifest_schema_uri = f"{self.schema_service}/{kind}"
try:
response = self.get_schema_request(manifest_schema_uri)
except Exception as e:
logger.error(f"Error on getting schema of kind '{kind}'")
raise e
return response
def _validate_schema(self, manifest: dict, schema: dict = None):
"""
Validate schema. If argument schema is not defined, then use schema service to retrieve
corresponding schema.
"""
if not schema:
schema = self.get_schema(manifest["kind"])
logger.info(f"Validating kind {manifest['kind']}")
resolver = OSDURefResolver(schema_service=self.schema_service,
base_uri=schema.get("$id", ""), referrer=schema,
handlers=self.resolver_handlers, cache_remote=True)
validator = jsonschema.Draft7Validator(schema=schema, resolver=resolver)
validator.validate(manifest)
def validate_work_product(self, work_product: dict):
"""
Validate WP manifest. Raise error if manifest is not valid.
"""
for key, value in work_product.items():
if key != "WorkProduct":
for component in value:
self._validate_schema(component)
else:
self._validate_schema(value)
def validate_manifest(self):
"""
Validate manifest. Raise error if manifest is not valid.
"""
if "manifest" not in self.data_object:
raise EmptyManifestError
for manifest in self.data_object["manifest"]:
if isinstance(manifest, dict) and manifest.get("kind"):
self._validate_schema(manifest)
elif manifest.get("WorkProductComponents"):
self.validate_work_product(manifest)
else:
raise NotOSDUShemaFormatError(f"Not valid schema {manifest}")
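
A usage sketch, assuming a placeholder Schema service URL:

conf = {
    "Payload": {"AppKey": "", "data-partition-id": "opendes"},
    "manifest": [
        {"kind": "opendes:osdu:Wellbore:0.3.0", "groupType": "master-data"}
    ]
}
context = Context.populate(conf)
validator = SchemaValidator(
    schema_service="https://schema.example.com/api/schema-service/v1/schema",  # placeholder
    dagrun_conf=conf,
    context=context
)
validator.validate_manifest()  # raises EmptyManifestError / NotOSDUShemaFormatError on bad input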
@@ -40,13 +40,11 @@ dag = DAG(
update_status_running_op = UpdateStatusOperator(
task_id="update_status_running_task",
status=UpdateStatusOperator.RUNNING_STATUS,
dag=dag
)
update_status_finished_op = UpdateStatusOperator(
task_id="update_status_finished_task",
status=UpdateStatusOperator.FINISHED_STATUS,
dag=dag,
trigger_rule="all_done",
)
…
@@ -26,6 +26,7 @@ from typing import Tuple
from urllib.error import HTTPError
import requests
import tenacity
from airflow.models import BaseOperator, Variable
from libs.refresh_token import AirflowTokenRefresher, refresh_token
@@ -261,6 +262,11 @@ def create_workproduct_request_data(loaded_conf: dict, product_type: str, wp, wp
return data_objects_list
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@refresh_token(AirflowTokenRefresher())
def send_request(headers, request_data):
"""
…
@@ -13,291 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import dataclasses
import json
import logging
import sys
import uuid
from typing import List
import jsonschema
import requests
import tenacity
from airflow.utils import apply_defaults
from airflow.models import BaseOperator, Variable
from libs.exceptions import EmptyManifestError, NotOSDUShemaFormatError
from libs.refresh_token import AirflowTokenRefresher, refresh_token
from libs.context import Context
from libs.process_manifest_r3 import ManifestProcessor
from libs.validate_schema import SchemaValidator
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger = logging.getLogger()
RETRIES = 3
TIMEOUT = 1
@dataclasses.dataclass
class Context(object):
"""
Store data-partition-id and AppKey passed via Payload field of dagrun.conf.
Remove Payload from dagrun.conf.
"""
data_partition_id: str
app_key: str
@classmethod
def populate(cls, ctx: dict) -> 'Context':
ctx_payload = ctx.pop('Payload')
app_key = ctx_payload['AppKey']
data_partition_id = ctx_payload['data-partition-id']
ctx_obj = cls(app_key=app_key, data_partition_id=data_partition_id)
return ctx_obj
class OSDURefResolver(jsonschema.RefResolver):
def __init__(self, schema_service: str, *args, **kwargs):
super(OSDURefResolver, self).__init__(*args, **kwargs)
self.schema_service = schema_service
def resolve_fragment(self, document: dict, fragment: str) -> dict:
"""
Extend the base resolve_fragment method. If a nested schema has a 'definitions' field
and there is a schema under this 'definitions', jsonschema attempts to use the id field
of this double-nested schema as the URI to fetch this schema later. So it makes sense
to replace this id with the correct one.
"""
document = super().resolve_fragment(document, fragment)
fragment_parts = fragment.split("/") # /definitions/<OsduID> -> [..., <OsduID>]
if len(fragment_parts) > 1:
osdu_id = fragment_parts[-1]
url = f"{self.schema_service}/{osdu_id}"
document["$id"] = url
return document
class SchemaValidator(object):
"""Class to validate schema of Manifests."""
def __init__(self, schema_service: str, dagrun_conf: dict, context: Context):
self.schema_service = schema_service
self.data_object = copy.deepcopy(dagrun_conf)
self.context = context
self.resolver_handlers = {"osdu": self.get_schema_request,
"https": self.get_schema_request,
self.context.data_partition_id: self.get_schema_request}
self.create_request_headers()
def create_request_headers(self):
self.request_headers = {
'Content-type': 'application/json',
'data-partition-id': self.context.data_partition_id,
'AppKey': self.context.app_key,
}
@refresh_token(AirflowTokenRefresher())
def _get_schema_request(self, headers: dict, uri: str) -> requests.Response:
"""
Request to Schema service to retrieve schema.
"""
response = requests.get(uri, headers=headers, timeout=60)
return response
def get_schema_request(self, uri: str) -> dict:
"""
Get a schema from the Schema service. Set the $id field to the URL the schema was fetched from.
"""
if uri.startswith("osdu") or uri.startswith(self.context.data_partition_id):
uri = f"{self.schema_service}/{uri}"
response = self._get_schema_request(self.request_headers, uri).json()
response["$id"] = uri
return response
def get_schema(self, kind: str) -> dict:
manifest_schema_uri = f"{self.schema_service}/{kind}"
try:
response = self.get_schema_request(manifest_schema_uri)
except Exception as e:
logger.error(f"Error on getting schema of kind '{kind}'")
raise e
return response
def _validate_schema(self, manifest: dict, schema: dict = None):
"""
Validate schema. If argument schema is not defined, then use schema service to retrieve
corresponding schema.
"""
if not schema:
schema = self.get_schema(manifest["kind"])
logger.info(f"Validating kind {manifest['kind']}")
resolver = OSDURefResolver(schema_service=self.schema_service,
base_uri=schema.get("$id", ""), referrer=schema,
handlers=self.resolver_handlers, cache_remote=True)
validator = jsonschema.Draft7Validator(schema=schema, resolver=resolver)
validator.validate(manifest)
def validate_work_product(self, work_product: dict):
"""
Validate WP manifest. Raise error if manifest is not valid.
"""
for key, value in work_product.items():
if key != "WorkProduct":
for component in value:
self._validate_schema(component)
else:
self._validate_schema(value)
def validate_manifest(self):
"""
Validate manifest. Raise error if manifest is not valid.
"""
for manifest in self.data_object["manifest"]:
if isinstance(manifest, dict) and manifest.get("kind"):
self._validate_schema(manifest)
elif manifest.get("WorkProductComponents"):
self.validate_work_product(manifest)
else:
raise NotOSDUShemaFormatError(f"Not valid schema {manifest}")
class ManifestProcessor(object):
"""Class to process WP, Master and Reference data"""
RECORD_TEMPLATE = {
"legal": {},
"acl": {},
"kind": "",
"id": "",
"data": {
}
}
def __init__(self, storage_url, dagrun_conf, context):
self.storage_url = storage_url
self.data_object = copy.deepcopy(dagrun_conf)
self.context = context
@staticmethod
def _get_kind_name(kind: str) -> str:
"""
osdu:osdu:Well:1.0.0 -> Well
"""
kind_name = kind.split(":")[2]
return kind_name
def generate_id(self, manifest_fragment: dict) -> str:
"""
Generate an id to use in the Storage service.
"""
group_type = manifest_fragment.get("groupType", "doc")
kind = manifest_fragment.get("kind")
kind_name = self._get_kind_name(kind)
_id = f"{self.context.data_partition_id}:{group_type}_{kind_name}:{str(uuid.uuid4())}"
return _id
@property
def request_headers(self) -> dict:
headers = {
'Content-type': 'application/json',
'data-partition-id': self.context.data_partition_id,
'AppKey': self.context.app_key
}
return headers
def populate_manifest_storage_record(self, manifest: dict) -> dict:
"""
Create a record from a manifest fragment to store it in the Storage service
"""
record = copy.deepcopy(self.RECORD_TEMPLATE)
record["id"] = self.generate_id(manifest)
record["kind"] = manifest.pop("kind")
record["legal"] = manifest.pop("legal")
record["acl"] = manifest.pop("acl")
record["data"] = manifest
return record
@tenacity.retry(tenacity.wait_fixed(TIMEOUT),
tenacity.stop_after_attempt(RETRIES))
@refresh_token(AirflowTokenRefresher())
def save_record(self, headers: dict, request_data: List[dict]) -> requests.Response:
"""
Send request to record storage API.
"""
response = requests.put(self.storage_url, json.dumps(request_data), headers=headers)
if response.ok:
logger.info(",".join(map(str, response.json()["recordIds"])))
else:
reason = response.text[:250]
logger.error(f"Request error.")
logger.error(f"Response status: {response.status_code}. "
f"Response content: {reason}.")
return response
def process_work_product(self, manifest: dict) -> List[dict]:
"""
Process WP.
"""
wp = manifest["WorkProduct"]
records = [self.populate_manifest_storage_record(wp)]
return records
def process_work_product_components(self, manifest: dict) -> List[dict]:
"""
Process list of WPCs.
"""
records = []
for wpc in manifest["WorkProductComponents"]:
record = self.populate_manifest_storage_record(wpc)
records.append(record)
return records
def process_work_product_files(self, manifest: dict) -> List[dict]:
"""
Process list of files.
"""
records = []
for file in manifest["Files"]:
record = self.populate_manifest_storage_record(file)
records.append(record)
return records
def process_work_product_manifest(self, manifest: dict) -> List[dict]:
file_records = self.process_work_product_files(manifest)
wpc_records = self.process_work_product_components(manifest)
wp_records = self.process_work_product(manifest)
records = file_records + wpc_records + wp_records
return records
def create_manifest_records(self) -> List[dict]:
manifest_records = []
manifests = self.data_object["manifest"]
for manifest in manifests:
if "WorkProduct" in manifest:
wp_records = self.process_work_product_manifest(manifest)
manifest_records.extend(wp_records)
else:
record = self.populate_manifest_storage_record(manifest)
manifest_records.append(record)
return manifest_records
def process_manifest(self) -> List[str]:
if "manifest" in self.data_object:
manifest_records = self.create_manifest_records()
else:
raise EmptyManifestError
response = self.save_record(self.request_headers, request_data=manifest_records)
record_ids = response.json()["recordIds"]
return record_ids
class ProcessManifestOperatorR3(BaseOperator):
ui_color = '#dad5ff'
ui_fgcolor = '#000000'
def pre_execute(self, context: dict):
@apply_defaults
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.schema_service_url = Variable.get('schema_service_url')
self.storage_url = Variable.get('storage_url')
…
@@ -13,26 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import sys
from typing import Tuple
import tenacity
from airflow.models import BaseOperator, Variable
from airflow.utils.decorators import apply_defaults
from hooks import search_http_hook, workflow_hook
from libs.exceptions import RecordsNotSearchableError
from libs.refresh_token import AirflowTokenRefresher, refresh_token
from libs.context import Context
from libs.search_record_ids import SearchId
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger = logging.getLogger()
class SearchRecordIdOperator(BaseOperator):
@@ -47,82 +34,12 @@ class SearchRecordIdOperator(BaseOperator):
RUNNING_STATUS = "running"
FAILED_STATUS = "failed"
@apply_defaults
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.workflow_hook = workflow_hook
self.search_hook = search_http_hook
# these will be set at the beginning of the execute method
self.request_body = None
self.expected_total_count = None
def get_headers(self, **kwargs) -> dict:
data_conf = kwargs['dag_run'].conf
# for /submitWithManifest authorization and partition-id are inside Payload field
if "Payload" in data_conf:
partition_id = data_conf["Payload"]["data-partition-id"]
else:
partition_id = data_conf["data-partition-id"]
headers = {
'Content-type': 'application/json',
'data-partition-id': partition_id,
'Authorization': "",
}
return headers
@staticmethod
def _create_search_query(record_ids) -> Tuple[str, int]:
expected_total_count = len(record_ids)
record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids)
logger.info(f"Search query {record_ids}")
query = f"id:({record_ids})"
return query, expected_total_count
def _create_request_body(self, **kwargs):
record_ids = kwargs["ti"].xcom_pull(key="record_ids", )
if record_ids:
query, expected_total_count = self._create_search_query(record_ids)
else:
logger.error("There are no record ids")
sys.exit(2)
request_body = {
"kind": "*:*:*:*",
"query": query
}
return request_body, expected_total_count
def _is_record_searchable(self, resp) -> bool:
"""
Check if search service returns expected totalCount of records.
"""
logger.info(resp.text)
data = resp.json()
return data.get("totalCount") == self.expected_total_count
@refresh_token(AirflowTokenRefresher())
def search_files(self, headers, **kwargs):
if self.request_body:
response = self.search_hook.run(
endpoint=Variable.get("search_query_ep"),
headers=headers,
data=json.dumps(self.request_body),
extra_options={"check_response": False}
)
if not self._is_record_searchable(response):
logger.error("Expected amount (%s) of records not found." %
self.expected_total_count
)
raise RecordsNotSearchableError
return response
else:
logger.error("There is an error in header or in request body")
sys.exit(2)
def execute(self, context):
def execute(self, context: dict):
"""Execute update workflow status.
If the status is assumed to be FINISHED, we check whether the processed files are searchable.
If they are, update the status to FINISHED, else FAILED.
"""
self.request_body, self.expected_total_count = self._create_request_body(**context)
headers = self.get_headers(**context)
self.search_files(headers, **context)
payload_context = Context.populate(context["dag_run"].conf)
record_ids = context["ti"].xcom_pull(key="record_ids", )
ids_searcher = SearchId(Variable.get("search_url"), record_ids, payload_context, )
ids_searcher.check_records_searchable()
@@ -14,105 +14,36 @@
# limitations under the License.
import copy
import enum
import json
import logging
import sys
from functools import partial
import tenacity
from airflow.models import BaseOperator, Variable
from airflow.utils.decorators import apply_defaults
from hooks.http_hooks import search_http_hook, workflow_hook
from libs.context import Context
from libs.exceptions import PipelineFailedError
from libs.refresh_token import AirflowTokenRefresher, refresh_token
from libs.update_status import UpdateStatus
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger = logging.getLogger()
class UpdateStatusOperator(BaseOperator):
ui_color = '#10ECAA'
ui_fgcolor = '#000000'
FINISHED_STATUS = "finished"
RUNNING_STATUS = "running"
FAILED_STATUS = "failed"
class prev_ti_state(enum.Enum):
NONE = enum.auto()
SUCCESS = enum.auto()
FAILED = enum.auto()
NONE = "running"
SUCCESS = "finished"
FAILED = "failed"
@apply_defaults
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.workflow_hook = workflow_hook
self.search_hook = search_http_hook
@staticmethod
def _file_searched(resp, expected_total_count):
"""Check if search service returns totalCount.
The method is used as a callback
def get_previous_ti_statuses(self, context: dict):
"""
Get status of previous tasks' executions.
Return corresponding enum value.
"""
data = resp.json()
return data.get("totalCount") == expected_total_count
def get_headers(self, **kwargs):
data_conf = kwargs['dag_run'].conf
# for /submitWithManifest authorization and partition-id are inside Payload field
if "Payload" in data_conf:
partition_id = data_conf["Payload"]["data-partition-id"]
else:
partition_id = data_conf["data-partition-id"]
headers = {
'Content-type': 'application/json',
'data-partition-id': partition_id,
}
return headers
def search_files(self, **kwargs):
def create_query(record_ids):
expected_total_count = len(record_ids)
record_ids = " OR ".join(f"\"{id_}\"" for id_ in record_ids)
logger.info(f"Search query {record_ids}")
query = f"id:({record_ids})"
return query, expected_total_count
record_ids = kwargs["ti"].xcom_pull(key="record_ids", task_ids='create_records')
if record_ids:
query, expected_total_count = create_query(record_ids)
else:
logger.error("There are no record ids")
sys.exit(2)
headers = self.get_headers(**kwargs)
request_body = {
"kind": "*:*:*:*",
"query": query
}
retry_opts = {
"wait": tenacity.wait_exponential(multiplier=5),
"stop": tenacity.stop_after_attempt(5),
"retry": tenacity.retry_if_not_result(
partial(self._file_searched, expected_total_count=expected_total_count)
)
}
self.search_hook.run_with_advanced_retry(
endpoint=Variable.get("search_query_ep"),
headers=headers,
data=json.dumps(request_body),
_retry_args=retry_opts
)
def previous_ti_statuses(self, context):
dagrun = context['ti'].get_dagrun()
failed_ti, success_ti = dagrun.get_task_instances(
state='failed'), dagrun.get_task_instances(state='success')
failed_ti = dagrun.get_task_instances(state='failed')
success_ti = dagrun.get_task_instances(state='success')
if not failed_ti and not success_ti: # There is no prev task so it can't have been failed
logger.info("There are no tasks before this one. So it has status RUNNING")
return self.prev_ti_state.NONE
@@ -122,39 +53,31 @@ class UpdateStatusOperator(BaseOperator):
logger.info("There are successed tasks before this one. So it has status SUCCESSED")
return self.prev_ti_state.SUCCESS
def pre_execute(self, context):
prev_tis = self.previous_ti_statuses(context)
if prev_tis is self.prev_ti_state.NONE:
self.status = self.RUNNING_STATUS
elif prev_tis is self.prev_ti_state.FAILED:
self.status = self.FAILED_STATUS
elif prev_tis is self.prev_ti_state.SUCCESS:
self.status = self.FINISHED_STATUS
def pre_execute(self, context: dict):
self.status = self.get_previous_ti_statuses(context)
def execute(self, context):
def execute(self, context: dict):
"""Execute update workflow status.
If the status is assumed to be FINISHED, we check whether the processed files are searchable.
If they are, update the status to FINISHED, else FAILED.
"""
headers = self.get_headers(**context)
self.update_status_request(headers, self.status, **context)
if self.status == self.FAILED_STATUS:
conf = copy.deepcopy(context["dag_run"].conf)
logger.info(f"Got conf {conf}.")
if "Payload" in conf:
payload_context = Context.populate(conf)
else:
payload_context = Context(data_partition_id=conf["data-partition-id"],
app_key=conf.get("AppKey", ""))
workflow_id = conf["WorkflowID"]
status = self.status.value
status_updater = UpdateStatus(
workflow_url=Variable.get("update_status_url"),
workflow_id=workflow_id,
status=status,
context=payload_context
)
status_updater.update_workflow_status()
if self.status is self.prev_ti_state.FAILED:
raise PipelineFailedError("Dag failed")
@refresh_token(AirflowTokenRefresher())
def update_status_request(self, headers, status, **kwargs):
data_conf = kwargs['dag_run'].conf
logger.info(f"Got dataconf {data_conf}")
workflow_id = data_conf["WorkflowID"]
request_body = {
"WorkflowID": workflow_id,
"Status": status
}
logger.info(f" Sending request '{status}'")
response = self.workflow_hook.run(
endpoint=Variable.get("update_status_ep"),
data=json.dumps(request_body),
headers=headers,
extra_options={"check_response": False}
)
return response
@@ -13,9 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
from flask import Flask, json, request, url_for
# Set up base logger
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s] %(message)s"))
logger = logging.getLogger("Dataload")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
OSDU_INGEST_SUCCES_FIFO = "/tmp/osdu_ingest_success"
OSDU_INGEST_FAILED_FIFO = "/tmp/osdu_ingest_failed"
@@ -27,7 +38,7 @@ def index():
@app.route('/st', methods=['POST', 'GET', "PUT"])
def storage():
print(request.json)
logger.info(request.json)
with open("/tmp/osdu_ingest_result", "w") as f:
json.dump(request.get_json(), f)
f.close()
@@ -36,11 +47,11 @@ def storage():
@app.route('/wf/us', methods=['POST', 'GET', "PUT"])
def workflow():
print(request)
print(request.headers)
print(request.json)
content = request.get_json(force=True)
print(content)
logger.info(request)
logger.info(request.headers)
logger.info(request.json)
content = request.json
logger.info(content)
if content.get("Status") == "finished":
pass
if content.get("Status") == "failed":
…
{
"Payload": {
"authorization": "Bearer test",
"data-partition-id": "opendes",
"AppKey": "",
"kind_version": "3.0.0"
},
"$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
"$filename": "load_Wellbore.1.0.0_350112350400.json",
"WorkflowID": "foo"
}
{
"Payload": {
"authorization": "Bearer test",
"data-partition-id": "opendes",
"AppKey": "",
"kind_version": "3.0.0"
},
"$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
"$filename": "load_Wellbore.1.0.0_350112350400.json",
"manifest": [
{
}
],
"WorkflowID": "foo"
}
@@ -9,7 +9,7 @@
"$filename": "load_Wellbore.1.0.0_350112350400.json",
"manifest": [
{
"id": "srn:opendes:master-data/Wellbore:350112350400",
"id": "opendes:master-data/Wellbore:350112350400",
"kind": "opendes:osdu:Wellbore:0.3.0",
"groupType": "master-data",
"version": 1,
…
[
{
"legal": {
"legaltags": [
"opendes-demo-legaltag"
],
"otherRelevantDataCountries": [
"srn:opendes:master-data/GeoPoliticalEntity:USA:"
],
"status": "srn:opendes:reference-data/LegalStatus:public:1111"
},
"acl": {
"owners": [
"data.default.viewers@opendes.osdu-gcp.go3-nrg.projects.epam.com"
],
"viewers": [
"data.default.owners@opendes.osdu-gcp.go3-nrg.projects.epam.com"
]
},
"kind": "opendes:osdu:Wellbore:0.3.0",
"id": "opendes:master-data/Wellbore:350112350400",
"data": {
"id": "opendes:master-data/Wellbore:350112350400",
"groupType": "master-data",
"version": 1,
"resourceHostRegionIDs": [
"srn:opendes:reference-data/OSDURegion:US-EAST:"
],
"resourceObjectCreationDateTime": "2020-10-16T11:14:45-05:00",
"resourceVersionCreationDateTime": "2020-10-16T11:14:45-05:00",
"resourceSecurityClassification": "srn:opendes:reference-data/ResourceSecurityClassification:public:",
"source": "srn:opendes:master-data/Organisation:Oklahoma Corporation Commission:",
"existenceKind": "srn:opendes:reference-data/ExistenceKind:Active:",
"licenseState": "srn:opendes:reference-data/LicenseState:Unlicensed:",
"data": {
"FacilityTypeID": "srn:opendes:reference-data/FacilityType:Wellbore:",
"FacilityOperator": [
{
"FacilityOperatorOrganisationID": "srn:opendes:master-data/Organisation:CONTINENTAL RESOURCES INC:"
}
],
"DataSourceOrganisationID": "srn:opendes:master-data/Organisation:Oklahoma Corporation Commission:",
"SpatialLocation": [
{
"Coordinates": [
{
"x": -98.580887,
"y": 35.6381829999999
}
],
"SpatialGeometryTypeID": "srn:opendes:reference-data/SpatialGeometryType:Point:",
"VerticalCRSID": "srn:opendes:reference-data/VerticalCRS:MSL:",
"HorizontalCRSID": "srn:opendes:reference-data/HorizontalCRS:NAD27:",
"HeightAboveGroundLevelUOMID": "srn:opendes:reference-data/UnitOfMeasure:ft[US]:"
}
],
"OperatingEnvironmentID": "srn:opendes:reference-data/OperatingEnvironment:onshore:",
"FacilityName": "IRETA 1-4-9XH",
"FacilityNameAlias": [
{
"AliasName": " IRETA 1-4-9XH",
"AliasNameTypeID": "srn:opendes:reference-data/AliasNameType:Name:"
},
{
"AliasName": "350112350400",
"AliasNameTypeID": "srn:opendes:reference-data/AliasNameType:UWBI:"
}
],
"FacilityEvent": [
{
"FacilityEventTypeID": "srn:opendes:reference-data/FacilityEventType:SPUD:",
"EffectiveDateTime": "2015-03-11T00:00:00-05:00"
},
{
"FacilityEventTypeID": "srn:opendes:reference-data/FacilityEventType:DRILLING FINISH:",
"EffectiveDateTime": "2015-05-18T00:00:00-06:00"
}
],
"WellID": "srn:opendes:master-data/Well:3501123504:",
"SequenceNumber": 1,
"VerticalMeasurements": [
{
"VerticalMeasurementID": "TD_1",
"VerticalMeasurement": 0,
"VerticalMeasurementTypeID": "srn:opendes:reference-data/VerticalMeasurementType:Total Depth:",
"VerticalMeasurementPathID": "srn:opendes:reference-data/VerticalMeasurementPath:Measured Depth:",
"VerticalMeasurementUnitOfMeasureID": "srn:opendes:reference-data/UnitOfMeasure:ft[US]:",
"VerticalReferenceID": "Drill Floor"
},
{
"VerticalMeasurementID": "TD_2",
"VerticalMeasurement": 0,
"VerticalMeasurementTypeID": "srn:opendes:reference-data/VerticalMeasurementType:Total Depth:",
"VerticalMeasurementPathID": "srn:opendes:reference-data/VerticalMeasurementPath:True Vertical Depth:",
"VerticalMeasurementUnitOfMeasureID": "srn:opendes:reference-data/UnitOfMeasure:ft[US]:",
"VerticalReferenceID": "Drill Floor"
},
{
"VerticalMeasurementID": "Elev_1",
"VerticalMeasurement": 1636,
"VerticalMeasurementTypeID": "srn:opendes:reference-data/VerticalMeasurementType:Drill Floor:",
"VerticalMeasurementPathID": "srn:opendes:reference-data/VerticalMeasurementPath:Elevation:",
"VerticalMeasurementUnitOfMeasureID": "srn:opendes:reference-data/UnitOfMeasure:ft[US]:",
"VerticalCRSID": "srn:opendes:reference-data/VerticalCRS:MSL:"
},
{
"VerticalMeasurementID": "Elev_2",
"VerticalMeasurement": 1606,
"VerticalMeasurementTypeID": "srn:opendes:reference-data/VerticalMeasurementType:Ground Level:",
"VerticalMeasurementPathID": "srn:opendes:reference-data/VerticalMeasurementPath:Elevation:",
"VerticalMeasurementUnitOfMeasureID": "srn:opendes:reference-data/UnitOfMeasure:ft[US]:",
"VerticalCRSID": "srn:opendes:reference-data/VerticalCRS:MSL:"
}
],
"TrajectoryTypeID": "srn:opendes:reference-data/WellboreTrajectoryType:Horizontal:",
"DefaultVerticalMeasurementID": "",
"GeographicBottomHoleLocation": {
"Coordinates": [
{
"x": -98.580887,
"y": 35.6381829999999
}
]
}
}
}
}
]
{
"x-osdu-license": "Copyright 2020, The Open Group \\nLicensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.",
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "A hole in the ground extending from a point at the earth's surface to the maximum point of penetration.",
"additionalProperties": false,
"title": "Wellbore",
"type": "object",
"definitions": {
"opendes:osdu:AbstractMetaItem:1.0.0": {
"x-osdu-license": "Copyright 2020, The Open Group \\nLicensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.",
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "A meta data item, which allows the association of named properties or property values to a Unit/Measurement/CRS/Azimuth/Time context.",
"title": "Frame of Reference Meta Data Item",
"type": "object",
"properties": {
"persistableReference": {
"description": "The persistable reference string uniquely identifying the CRS or Unit.",
"title": "Persistable Reference",
"type": "string",
"example": "{\"scaleOffset\":{\"scale\":0.3048006096012192,\"offset\":0.0},\"symbol\":\"ftUS\",\"baseMeasurement\":{\"ancestry\":\"Length\",\"type\":\"UM\"},\"type\":\"USO\"}"
},
"kind": {
"description": "The kind of reference, unit, measurement, CRS or azimuth reference.",
"title": "Reference Kind",
"type": "string",
"enum": [
"CRS",
"Unit",
"Measurement",
"AzimuthReference",
"DateTime"
]
},
"propertyNames": {
"description": "The list of property names, to which this meta data item provides Unit/CRS context to. Data structures, which come in a single frame of reference, can register the property name, others require a full path like \"data.structureA.propertyB\" to define a unique context.",
"title": "Property Names",
"type": "array",
"items": {
"type": "string"
},
"example": [
"elevationFromMsl",
"totalDepthMdDriller",
"wellHeadProjected"
]
},
"name": {
"description": "The name of the CRS or the symbol/name of the unit.",
"title": "Name or Symbol",
"type": "string",
"example": [
"NAD27 * OGP-Usa Conus / North Dakota South [32021,15851]",
"ft"
]
},
"propertyValues": {
"description": "The list of property values, to which this meta data item provides Unit/CRS context to. Typically a unit symbol is a value to a data structure; this symbol is then registered in this propertyValues array and the persistableReference provides the absolute reference.",
"title": "Property Values",
"type": "array",
"items": {
"type": "string"
},
"example": [
"F",
"ftUS",
"deg"
]
},
"uncertainty": {
"description": "The uncertainty of the values measured given the unit or CRS unit.",
"title": "Uncertainty",
"type": "number"
}
},
"required": [
"kind",
"persistableReference"
],
"$id": "https://schema.osdu.opengroup.org/json/abstract/AbstractMetaItem.1.0.0.json"
},
"opendes:osdu:AbstractLegalTags:1.0.0": {
"x-osdu-license": "Copyright 2020, The Open Group \\nLicensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.",
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Legal meta data like legal tags, relevant other countries, legal status.",
"additionalProperties": false,
"title": "Legal Meta Data",
"type": "object",
"properties": {
"legaltags": {
"description": "The list of legal tags.",
"title": "Legal Tags",
"type": "array",
"items": {
"type": "string"
}
},
"otherRelevantDataCountries": {
"description": "The list of other relevant data countries.",
"title": "Other Relevant Data Countries",
"type": "array",
"items": {
"type": "string"
}
},
"status": {
"description": "The legal status.",
"title": "Legal Status",
"type": "string"
}
},
"required": [
"legaltags",
"otherRelevantDataCountries"
],
"$id": "https://schema.osdu.opengroup.org/json/abstract/AbstractLegalTags.1.0.0.json"
},
"opendes:osdu:AbstractAccessControlList:1.0.0": {
"x-osdu-license": "Copyright 2020, The Open Group \\nLicensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.",
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "The access control tags associated with this entity.",
"additionalProperties": false,
"title": "Access Control List",
"type": "object",
"properties": {
"viewers": {
"description": "The list of viewers to which this data record is accessible/visible/discoverable.",
"title": "List of Viewers",
"type": "array",
"items": {
"type": "string"
}
},
"owners": {
"description": "The list of owners of this data record.",
"title": "List of Owners",
"type": "array",
"items": {
"type": "string"
}
}
},
"required": [
"owners",
"viewers"
],
"$id": "https://schema.osdu.opengroup.org/json/abstract/AbstractAccessControlList.1.0.0.json"
},
"opendes:osdu:AbstractLegalParentList:1.0.0": {
"x-osdu-license": "Copyright 2020, The Open Group \\nLicensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.",
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "A list of entity IDs in the data ecosystem, which act as legal parents to the current entity.",
"title": "Parent List",
"type": "object",
"properties": {
"parents": {
"description": "An array of none, one or many entity references in the data ecosystem, which identify the source of data in the legal sense. Example: the 'parents' will be queried when e.g. the subscription of source data services is terminated; access to the derivatives is also terminated.",
"title": "Parents",
"type": "array",
"items": {
"type": "string"
},
"example": []
}
},
"$id": "https://schema.osdu.opengroup.org/json/abstract/AbstractLegalParentList.1.0.0.json"
},
"opendes:osdu:AbstractCoordinates:1.0.0": {
"x-osdu-license": "Copyright 2020, The Open Group \\nLicensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.",
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "A geographic position on the surface of the earth.",
"title": "AbstractCoordinates",
"type": "object",
"properties": {
"x": {
"description": "x is Easting or Longitude.",
"type": "number"
},
"y": {
"description": "y is Northing or Latitude.",
"type": "number"
}
},
"$id": "https://schema.osdu.opengroup.org/json/abstract/AbstractCoordinates.1.0.0.json"
}
},
"properties": {
"groupType": {
"const": "master-data",
"description": "The OSDU GroupType assigned to this resource object.",
"title": "Group Type"
},
"ancestry": {
"description": "The links to data, which constitute the inputs.",
"title": "Ancestry",
"$ref": "#/definitions/opendes:osdu:AbstractLegalParentList:1.0.0"
},
"resourceCurationStatus": {
"pattern": "^srn:opendes:reference-data\/ResourceCurationStatus:[^:]+:[0-9]*$",
"description": "Describes the current Curation status.",
"title": "Resource Curation Status",
"type": "string"
},
"licenseState": {
"pattern": "^srn:opendes:reference-data\/LicenseState:[^:]+:[0-9]*$",
"description": "Indicates what kind of ownership Company has over data.",
"title": "License State",
"type": "string"
},
"data": {
"allOf": [
{
"type": "object",
"properties": {
"VerticalMeasurements": {
"description": "List of all depths and elevations pertaining to the wellbore, like, plug back measured depth, total measured depth, KB elevation",
"type": "array"
},
"PrimaryMaterialID": {
"pattern": "^srn:opendes:reference-data\/MaterialType:[^:]+:[0-9]*$",
"description": "The primary material injected/produced from the wellbore.",
"type": "string"
},
"SequenceNumber": {
"description": "A number that indicates the order in which wellbores were drilled.",
"type": "integer"
},
"TargetFormation": {
"pattern": "^srn:opendes:reference-data\/GeologicalFormation:[^:]+:[0-9]*$",
"description": "The Formation of interest for which the Wellbore is drilled to interact with. The Wellbore may terminate in a lower formation if the requirement is to drill through the entirety of the target formation, therefore this is not necessarily the Formation at TD.",
"type": "string"
},
"KickOffWellbore": {
"pattern": "^srn:opendes:master-data\/Wellbore:[^:]+:[0-9]*$",
"description": "This is a pointer to the parent wellbore. The wellbore that starts from top has no parent.",
"type": "string"
},
"DefaultVerticalMeasurementID": {
"description": "The default datum reference point, or zero depth point, used to determine other points vertically in a wellbore. References an entry in the Vertical Measurements array of this wellbore.",
"type": "string"
},
"WellID": {
"pattern": "^srn:opendes:master-data\/Well:[^:]+:[0-9]*$",
"type": "string"
},
"TrajectoryTypeID": {
"pattern": "^srn:opendes:reference-data\/WellboreTrajectoryType:[^:]+:[0-9]*$",
"description": "Describes the predominant shapes the wellbore path can follow if deviated from vertical. Sample Values: Horizontal, Vertical, Directional.",
"type": "string"
},
"DefinitiveTrajectoryID": {
"pattern": "^srn:opendes:work-product-component\/WellboreTrajectory:[^:]+:[0-9]+$",
"description": "SRN of Wellbore Trajectory which is considered the authoritative or preferred version.",
"type": "string"
}
}
},
{
"type": "object",
"properties": {
"ExtensionProperties": {
"type": "object",
"properties": {}
}
}
}
]
},
"kind": {
"pattern": "^[A-Za-z0-9-_]+:[A-Za-z0-9-_]+:[A-Za-z0-9-_]+:[0-9]+.[0-9]+.[0-9]+$",
"description": "The schema identification for the OSDU resource object following the pattern <Namespace>:<Source>:<Type>:<VersionMajor>.<VersionMinor>.<VersionPatch>. The versioning scheme follows the semantic versioning, https://semver.org/.",
"title": "Entity Kind",
"type": "string",
"example": "namespace:osdu:Wellbore:2.7.112"
},
"acl": {
"description": "The access control tags associated with this entity.",
"title": "Access Control List",
"$ref": "#/definitions/opendes:osdu:AbstractAccessControlList:1.0.0"
},
"source": {
"pattern": "^srn:opendes:master-data\/Organisation:[^:]+:[0-9]*$",
"description": "Where did the data resource originate? This could be many kinds of entities, such as company, agency, team or individual.",
"title": "Data Source",
"type": "string"
},
"version": {
"format": "int64",
"description": "The version number of this OSDU resource; set by the framework.",
"title": "Version Number",
"type": "integer",
"example": 1831253916104085
},
"resourceVersionCreationDateTime": {
"format": "date-time",
"description": "Timestamp of the time when the current version of this resource entered the OSDU.",
"title": "Resource Version Creation DateTime",
"type": "string"
},
"resourceHomeRegionID": {
"pattern": "^srn:opendes:reference-data\/OSDURegion:[^:]+:[0-9]*$",
"description": "The name of the home [cloud environment] region for this OSDU resource object.",
"title": "Resource Home Region ID",
"type": "string"
},
"resourceObjectCreationDateTime": {
"format": "date-time",
"description": "Timestamp of the time at which Version 1 of this OSDU resource object was originated.",
"title": "Resource Object Creation DateTime",
"type": "string"
},
"resourceSecurityClassification": {
"pattern": "^srn:opendes:reference-data\/ResourceSecurityClassification:[^:]+:[0-9]*$",
"description": "Classifies the security level of the resource.",
"title": "Resource Security Classification",
"type": "string"
},
"resourceHostRegionIDs": {
"description": "The name of the host [cloud environment] region(s) for this OSDU resource object.",
"title": "Resource Host Region ID",
"type": "array",
"items": {
"pattern": "^srn:opendes:reference-data\/OSDURegion:[^:]+:[0-9]*$",
"type": "string"
}
},
"resourceLifecycleStatus": {
"pattern": "^srn:opendes:reference-data\/ResourceLifecycleStatus:[^:]+:[0-9]*$",
"description": "Describes the current Resource Lifecycle status.",
"title": "Resource Lifecycle Status",
"type": "string"
},
"legal": {
"description": "The entity's legal tags and compliance status.",
"title": "Legal Tags",
"$ref": "#/definitions/opendes:osdu:AbstractLegalTags:1.0.0"
},
"id": {
"pattern": "^opendes:master-data/Wellbore:[^:]+$",
"description": "The SRN which identifies this OSDU resource object without version.",
"title": "Entity ID",
"type": "string",
"example": "srn:opendes:master-data/Wellbore:2adac27b-5d84-5bcd-89f2-93ee709c06d9"
},
"existenceKind": {
"pattern": "^srn:opendes:reference-data\/ExistenceKind:[^:]+:[0-9]*$",
"description": "Where does this data resource sit in the cradle-to-grave span of its existence?",
"title": "Existence Kind",
"type": "string"
},
"persistableReferences": {
"description": "The meta data section linking the 'unitKey', 'crsKey' to self-contained definitions.",
"title": "Frame of Reference Meta Data",
"type": "array",
"items": {
"$ref": "#/definitions/opendes:osdu:AbstractMetaItem:1.0.0"
}
}
},
"required": [
"id",
"kind",
"groupType",
"version",
"legal",
"resourceObjectCreationDateTime",
"resourceVersionCreationDateTime"
],
"$id": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json"
}
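For reference, here is a minimal sketch of how a record could be checked against the Wellbore schema above using the jsonschema package. The snippet is illustrative only and not part of the DAG code; the schema file name (Wellbore.1.0.0.json) and the skeleton record are assumptions for the example.

# Illustrative sketch, not part of the DAG code: validate a record against
# the Wellbore schema above with the `jsonschema` package. The schema file
# name and the skeleton record below are assumptions.
import json

import jsonschema

with open("Wellbore.1.0.0.json") as schema_file:
    wellbore_schema = json.load(schema_file)

# A skeleton record carrying only the fields the schema marks as required.
record = {
    "id": "srn:opendes:master-data/Wellbore:2adac27b-5d84-5bcd-89f2-93ee709c06d9",
    "kind": "opendes:osdu:Wellbore:1.0.0",
    "groupType": "master-data",
    "version": 1831253916104085,
    "legal": {},  # must conform to AbstractLegalTags, defined earlier in the file
    "resourceObjectCreationDateTime": "2020-12-16T11:46:20.163Z",
    "resourceVersionCreationDateTime": "2020-12-16T11:46:20.163Z",
}

try:
    jsonschema.validate(instance=record, schema=wellbore_schema)
    print("Record conforms to the Wellbore schema")
except jsonschema.ValidationError as error:
    print(f"Record is invalid: {error.message}")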