Commit 386aa0bc authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM)
Browse files

GONRG-5382: Refactor WITSML parser DAG

parent 7290fa3a
Pipeline #129772 failed
0.13.0
\ No newline at end of file
0.17.0
\ No newline at end of file
......@@ -29,7 +29,6 @@ ENV OSDU_CORE_PROVIDERS_DIR="${WITSML_PARSER_DIR}/osdu/ingestion-dags/src/dags/p
COPY ./${BUILD_DIR}/requirements.txt ./
COPY ./${WITSML_PARSER_DIR}/*.py ./
COPY ./${ENERGISTICS_LIBS} ./energistics
COPY ./${BUILD_DIR}/providers/gcp/osdu_api.ini ./
# TODO: Configure Package Registry for Python SDK
RUN pip install --upgrade pip && \
......@@ -55,7 +54,6 @@ ENV CLOUD_PROVIDER $PROVIDER_NAME
COPY "build/providers/${PROVIDER_NAME}/requirements.txt" ./
RUN pip install -r "requirements.txt"
COPY build/providers/gcp/osdu_api.ini ./
## azure
FROM base as azure
......@@ -72,5 +70,4 @@ FROM base as anthos
ENV CLOUD_PROVIDER anthos
COPY build/providers/anthos/requirements.txt ./
COPY build/providers/anthos/osdu_api.ini ./
RUN pip install -r requirements.txt
# Copyright 2021 Google LLC
# Copyright 2021 EPAM
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[environment]
data_partition_id=%(OSDU_ANTHOS_DATA_PARTITION)s
storage_url=%(OSDU_ANTHOS_STORAGE_URL)s
search_url=stub
legal_url=stub
data_workflow_url=stub
file_dms_url=stub
dataset_url=%(OSDU_ANTHOS_DATASET_URL)s
entitlements_url=stub
schema_url=stub
ingestion_workflow_url=stub
partition_url=stub
use_service_principal=False
[provider]
name=anthos
\ No newline at end of file
[environment]
data_partition_id=%(DATA_PARTITION)s
storage_url=%(OSDU_GCP_BASE_URL)s/api/storage/v2
search_url=%(OSDU_GCP_BASE_URL)s/api/search/v2
legal_url=%(OSDU_GCP_BASE_URL)s/api/legal/v1
data_workflow_url=%(OSDU_GCP_BASE_URL)s/api/data-workflow/v1
file_dms_url=%(OSDU_GCP_BASE_URL)s/api/filedms/v2
dataset_url=%(OSDU_GCP_BASE_URL)s/api/dataset/v1
entitlements_url=%(OSDU_GCP_BASE_URL)s/api/entitlements/v1
schema_url=%(OSDU_GCP_BASE_URL)s/api/schema-service/v1
ingestion_workflow_url=%(OSDU_GCP_BASE_URL)s/api/workflow/v1
partition_url=%(OSDU_GCP_BASE_URL)s/api/partition/v1
use_service_principal=False
[provider]
name=gcp
......@@ -6,7 +6,7 @@ tenacity
dataclasses==0.8;python_version<"3.7"
--extra-index-url=https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api~=0.14.0.dev
osdu-api~=0.17.0.dev
--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion~=0.13.0
......@@ -15,9 +15,8 @@ class DAGFileRenderer:
def _prepare_env_vars(self) -> Dict:
return """{
"CLOUD_PROVIDER": "anthos",
"OSDU_ANTHOS_STORAGE_URL": "{{ var.value.core__service__storage__url }}",
"OSDU_ANTHOS_DATASET_URL": "{{ var.value.core__service__dataset__url }}",
"OSDU_ANTHOS_DATA_PARTITION": "{{ var.value.core__config__data_partition }}",
"STORAGE_SERVICE_URL": "{{ var.value.core__service__storage__url }}",
"DATASET_SERVICE_URL": "{{ var.value.core__service__dataset__url }}",
"KEYCLOAK_AUTH_URL": os.getenv("KEYCLOAK_AUTH_URL"),
"KEYCLOAK_CLIENT_ID": os.getenv("KEYCLOAK_CLIENT_ID"),
"KEYCLOAK_CLIENT_SECRET": os.getenv("KEYCLOAK_CLIENT_SECRET"),
......
......@@ -17,7 +17,9 @@ class DAGFileRenderer:
"SA_FILE_PATH": "{{ var.value.gcp__config__sa_file_path }}",
"OSDU_GCP_BASE_URL": "{{ var.value.gcp__osdu_gcp_base_url }}",
"DATA_PARTITION": "{{ var.value.gcp__data_partition_id }}",
"CLOUD_PROVIDER": "gcp"
"CLOUD_PROVIDER": "gcp",
"STORAGE_SERVICE_URL": "{{ var.value.core__service__storage__url }}",
"DATASET_SERVICE_URL": "{{ var.value.core__service__dataset__url }}",
}
def _prepare_operator_kwargs(self) -> Dict:
......
## Examples of submitting ingestion using the Workflow service:
1. External Storage and OSDU File Service (`file_name` and `preload file path` are required in this approach)
```
curl --location --request POST 'https://<workflow-url>/v1/workflow/Energistics_xml_ingest/workflowRun' \
--header 'data-partition-id: osdu' \
--header 'Authorization: <token>
--header 'Content-Type: application/json' \
--data-raw '{
"executionContext": {
"Payload": {
"AppKey": "test-app",
"data-partition-id": <string>
},
"Context": {
"acl": {
"viewers": [
<string>
],
"owners": [
<string>
]
},
"legal": {
"legaltags": [
<string>
],
"otherRelevantDataCountries": [
<string>
],
"status": "compliant"
},
"kind": <string>,
"file_name": <string>,
"preload_file_path": <string>,
"version": 5
}
}
}'
```
### Sequence diagram for witsml_parser_task
```mermaid
sequenceDiagram
participant EnergisticsManifestCreator
participant FileHandler
participant OSDU File Service
participant OSDU Storage
participant Blob Storage
Note over FileHandler: osdu-ingestion-lib
Note over Blob Storage: External Cloud Storage
Note over OSDU File Service, OSDU Storage: OSDU Data Platform
EnergisticsManifestCreator->>+FileHandler: upload_file call
FileHandler->>+Blob Storage: gets file by preload_file_path
Blob Storage-->>-FileHandler: returns file and content type
FileHandler->>+OSDU File Service: [GET] /v2/files/uploadURL
OSDU File Service-->>-FileHandler: returns FileID, SignedURL, FileSource
FileHandler->>+OSDU Storage: [PUT] uploads file by SignedURL
OSDU Storage-->>-FileHandler: returns OK response
FileHandler-->>-EnergisticsManifestCreator: returns file_source (osdu storage path)
EnergisticsManifestCreator->>+EnergisticsManifestCreator: _create_file_meta_record [preload_file_path, file_source]
EnergisticsManifestCreator->>+FileHandler: save_file_record call (file_meta_record)
FileHandler->>+OSDU File Service: [POST] /v2/files/metadata (Sending file record metadata to File service)
OSDU File Service-->>-FileHandler: returns OK response and file record data
FileHandler-->>-EnergisticsManifestCreator: returns file record metadata id
EnergisticsManifestCreator->>+FileHandler: _get_download_signed_url call for file record metadata id
FileHandler->>+OSDU File Service: [GET] /v2/files/{record_id}/downloadURL
OSDU File Service-->>-FileHandler: returns OK response and file download data
FileHandler-->>-EnergisticsManifestCreator: returns SignedURL for downloading
EnergisticsManifestCreator->>+OSDU Storage: [GET] gets file by SignedURL
OSDU Storage-->>-EnergisticsManifestCreator: returns source file
EnergisticsManifestCreator->>+EnergisticsManifestCreator: gets file (.xml) content
EnergisticsManifestCreator->>+EnergisticsManifestCreator: generates manifest from file_content and file_source
```
2. If you registered a dataset, specify `dataset_id` instead of `file_name` and `prelaod_file_path`.
1. If you registered a dataset, specify `dataset_id` instead of `file_name` and `prelaod_file_path`.
```
curl --location --request POST 'https://<workflow-url>/v1/workflow/Energistics_xml_ingest/workflowRun' \
--header 'data-partition-id: osdu' \
......
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
# Copyright 2021 Energistics
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import jsonschema
import pytest
from energistics.libs import create_energistics_manifest
from data_file_paths import TRAJECTORY_XML_FILE_PATH, MANIFEST_SCHEMA_FILE_PATH
from osdu_ingestion.libs.refresh_token import BaseTokenRefresher
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.handle_file import FileHandler
MOCK_EXECUTION_CONTEXT = {
"Context": {
"acl": {
"viewers": [
"data.default.viewers@test.osdu-gcp.go3-nrg.projects.epam.com"
],
"owners": [
"data.default.owners@test.osdu-gcp.go3-nrg.projects.epam.com"
]
},
"legal": {
"legaltags": [
"test-demo-legaltag"
],
"otherRelevantDataCountries": [
"US"
],
"status": "compliant"
},
"kind": "test:wks:dataset--File.Generic:1.0.0",
"file_name": "trace_trajectory",
"preload_file_path": "gs://energistics-test-data/energistics_witsml_data_Trajectory.xml",
"version": 5
}
}
class MockSignedUrl:
@property
def signed_url(self):
return "https://test/test-host/test-area/test-location-some-hash/test-another-cash?test-other-data"
class TestCreateEnergisticsManifest:
@pytest.fixture(autouse=True)
def cloud_provider(self):
os.environ["CLOUD_PROVIDER"] = "gcp"
@pytest.fixture
def mock_get_file_content(self, monkeypatch, file_path):
with open(file_path) as f:
file_content = f.read()
monkeypatch.setattr(create_energistics_manifest, "get_file_content", lambda *args, kwargs: file_content)
return file_content
@pytest.fixture
def mock_file_handler(self, monkeypatch):
token_refresher = BaseTokenRefresher(creds="test")
context = Context("test", "test")
monkeypatch.setattr(FileHandler, "save_file_record", lambda *args, **kwargs: "test")
monkeypatch.setattr(FileHandler, "upload_file",
lambda *args, **kwargs: "/test-location-some-hash/test-another-cash")
monkeypatch.setattr(FileHandler, "_get_download_signed_url", lambda *args, **kwargs: MockSignedUrl())
file_handler = FileHandler(
file_service_host="",
token_refresher=token_refresher,
context=context,
blob_storage_client="test"
)
return file_handler
@pytest.mark.parametrize(
"xml_file",
[
pytest.param(
TRAJECTORY_XML_FILE_PATH,
id="Trajectory XML"),
]
)
def test_create_manifest_from_xml(self, monkeypatch, mock_file_handler, xml_file):
"""
Test if a Manifest is created from XML and passes validation against GenericManifestSchema
:param monkeypatch:
:param mock_file_handler:
:param xml_file:
:return:
"""
def get_file_content(file):
with open(file) as f:
return f.read()
with open(MANIFEST_SCHEMA_FILE_PATH) as f:
schema = json.load(f)
monkeypatch.setattr(create_energistics_manifest, "get_file_content",
lambda *args, **kwargs: get_file_content(xml_file))
token_refresher = BaseTokenRefresher(creds="test")
context = Context("test", "test")
manifest_creator = create_energistics_manifest.EnergisticsManifestCreator(mock_file_handler, token_refresher, context)
manifest = manifest_creator.create_manifest(MOCK_EXECUTION_CONTEXT)
jsonschema.validate(manifest, schema)
......@@ -32,7 +32,14 @@ class TestGenerateEnergisticsManifest(unittest.TestCase):
context: Context = Context(app_key='', data_partition_id='odesprod')
token_refresher = MagicMock()
file_handler: FileHandler = MagicMock()
self.creator: EnergisticsManifestCreator = EnergisticsManifestCreator(file_handler, token_refresher, context)
storage_client = MagicMock()
dataset_client = MagicMock()
self.creator: EnergisticsManifestCreator = EnergisticsManifestCreator(
file_handler,
dataset_client,
storage_client,
context
)
def test__generate_manifest_entities_well(self):
file_content = get_file_content(WELL_XML_FILE_PATH)
......
......@@ -19,15 +19,12 @@ import enum
import json
import logging
import os
from copy import deepcopy
from typing import List
import requests
import tenacity
from osdu_api.auth.authorization import TokenRefresher
from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient
from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.configuration.config_manager import DefaultConfigManager
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.handle_file import FileHandler
from osdu_ingestion.libs.utils import remove_trailing_colon
......@@ -47,58 +44,6 @@ RETRY_OPTS = {
"stop": tenacity.stop_after_attempt(5)
}
# TODO: Think about how a file meta record must look.
FILE_RECORD_TEMPLATE = {
"kind": "osdu:wks:dataset--File.Generic:1.0.0",
"version": 1,
"acl": {},
"legal": {},
"tags": {
"NameOfKey": "String value"
},
"createTime": "2020-12-16T11:46:20.163Z",
"createUser": "some-user@some-company-cloud.com",
"modifyTime": "2020-12-16T11:52:24.477Z",
"modifyUser": "some-user@some-company-cloud.com",
"ancestry": {
"parents": []
},
"meta": [],
"data": {
"ResourceHomeRegionID": "namespace:reference-data--OSDURegion:SomeUniqueOSDURegionID:",
"ResourceHostRegionIDs": [
"namespace:reference-data--OSDURegion:SomeUniqueOSDURegionID:"
],
"ResourceCurationStatus": "namespace:reference-data--ResourceCurationStatus:CREATED:",
"ResourceLifecycleStatus": "namespace:reference-data--ResourceLifecycleStatus:LOADING:",
"ResourceSecurityClassification": "namespace:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Source": "Example Data Source",
"ExistenceKind": "namespace:reference-data--ExistenceKind:Prototype:",
"Name": "energistics",
"Description": "As originally delivered by ACME.com.",
"TotalSize": "13245217273",
"EncodingFormatTypeID": "namespace:reference-data--EncodingFormatType:text%2Fcsv:",
"SchemaFormatTypeID": "namespace:reference-data--SchemaFormatType:CWLS%20LAS3:",
"Endian": "BIG",
"DatasetProperties": {
"FileSourceInfo": {
"FileSource": "",
"PreloadFilePath": "",
"PreloadFileCreateUser": "",
"PreloadFileCreateDate": "2019-12-16T11:46:20.163Z",
"PreloadFileModifyUser": "",
"PreloadFileModifyDate": "2019-12-20T17:20:05.356Z",
"Name": "energistics",
"FileSize": "95463",
"EncodingFormatTypeID": "namespace:reference-data--EncodingFormatType:application%2Fgeo%2Bjson:"
}
},
"Checksum": "",
"WITSML_Version": "WITSML 1.1.4",
"ExtensionProperties": {}
}
}
class FileType(enum.Enum):
"""Energistics File type enum."""
......@@ -134,41 +79,17 @@ class EnergisticsManifestCreator:
def __init__(
self,
file_handler: FileHandler,
token_refresher: TokenRefresher,
dataset_dms_client: DatasetDmsClient,
storage_client: RecordClient,
payload_context: Context
) -> None:
self.file_handler = file_handler
self.token_refresher = token_refresher
self.dataset_dms_client = dataset_dms_client
self.storage_client = storage_client
self.payload_context = payload_context
self.bearer_token = None
@staticmethod
def _create_file_meta_record(
file_source: str,
preload_file_path: str,
acl: dict,
legal: dict,
kind: str,
):
"""
Create a meta record of a file for File Service.
:param file_source: File source
:param preload_file_path: Preload file path
:param acl: Acl
:param legal: Legal
:param file_name: File name
:return: File record
"""
# TODO: For Energistics team: Define your own strategy to create file meta records.
file_record = deepcopy(FILE_RECORD_TEMPLATE)
file_record["legal"] = legal
file_record["acl"] = acl
file_record["kind"] = kind
file_record["data"]["DatasetProperties"]["FileSourceInfo"]["FileSource"] = file_source
file_record["data"]["DatasetProperties"]["FileSourceInfo"]["PreloadFilePath"] = preload_file_path
return file_record
def _generate_manifest_entities(
self,
file_content: str,
......@@ -239,18 +160,11 @@ class EnergisticsManifestCreator:
:param record_id: str
:return: file_source: str
"""
storage_client = RecordClient(DefaultConfigManager(), self.payload_context.data_partition_id)
if storage_client.use_service_principal:
self.bearer_token = None
else:
if self.bearer_token is None:
self.bearer_token = self.token_refresher.refresh_token()
record_response = storage_client.get_specific_record(
record_response = self.storage_client.get_specific_record(
recordId=record_id,
version="",
bearer_token=self.bearer_token)
version=""
)
record = json.loads(record_response.content)
file_source = record["data"]["DatasetProperties"]["FileSourceInfo"]["FileSource"]
......@@ -269,22 +183,16 @@ class EnergisticsManifestCreator:
"""
acl = execution_context["Context"]["acl"]
legal = execution_context["Context"]["legal"]
kind = execution_context["Context"]["kind"]
download_signed_url = ""
if execution_context["Context"].get("dataset_id"):
dataset_dms_client = DatasetDmsClient(DefaultConfigManager(), self.payload_context.data_partition_id)
self.bearer_token = None if dataset_dms_client.use_service_principal else self.token_refresher.refresh_token()
if os.environ["CLOUD_PROVIDER"] == "azure":
retrieval_instructions_response = dataset_dms_client.retrieval_instructions(
record_id=execution_context["Context"]["dataset_id"],
bearer_token=self.bearer_token)
retrieval_instructions_response = self.dataset_dms_client.retrieval_instructions(
record_id=execution_context["Context"]["dataset_id"])
instruction_list_key = "datasets"
else:
retrieval_instructions_response = dataset_dms_client.get_retrieval_instructions(
record_id=execution_context["Context"]["dataset_id"],
bearer_token=self.bearer_token)
retrieval_instructions_response = self.dataset_dms_client.get_retrieval_instructions(
record_id=execution_context["Context"]["dataset_id"])
instruction_list_key = "delivery"
logger.info(f"Dataset retrieval response code: {retrieval_instructions_response.status_code}")
......@@ -309,23 +217,7 @@ class EnergisticsManifestCreator:
record_id,
).signed_url
else:
preload_file_path = execution_context["Context"]["preload_file_path"]
file_source = self.file_handler.upload_file(preload_file_path)
logger.info(f"Record's FileSource: {file_source}")
file_record = self._create_file_meta_record(
file_source=file_source,
preload_file_path=preload_file_path,
acl=acl,
legal=legal,
kind=kind,
)
record_id = self.file_handler.save_file_record(file_record)
logger.info(f"Uploaded file record id: {record_id}")
download_signed_url = self.file_handler._get_download_signed_url(self.file_handler.request_headers,
record_id).signed_url
raise ValueError("Execution context doesn't contain either 'record_id' or 'dataset_id' field.")
file_content = get_file_content(download_signed_url)
parsed_entities = self._generate_manifest_entities(
......
......@@ -18,8 +18,12 @@
import argparse
import json
import logging
import os
from energistics.libs.create_energistics_manifest import EnergisticsManifestCreator
from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient
from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.configuration.config_manager import DefaultConfigManager
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.handle_file import FileHandler
from osdu_ingestion.libs.refresh_token import BaseTokenRefresher
......@@ -38,8 +42,27 @@ def main(execution_context: dict, file_service_url: str) -> dict:
"""
payload_context = Context.populate(execution_context)
token_refresher = BaseTokenRefresher()
storage_client = RecordClient(
storage_url=os.environ.get("STORAGE_SERVICE_URL"),
config_manager=DefaultConfigManager(),
provider=os.environ.get("CLOUD_PROVIDER"),
data_partition_id=payload_context.data_partition_id,
token_refresher=token_refresher
)
dataset_client = DatasetDmsClient(
dataset_url=os.environ.get("DATASET_SERVICE_URL"),
config_manager=DefaultConfigManager(),
provider=os.environ.get("CLOUD_PROVIDER"),
data_partition_id=payload_context.data_partition_id,
token_refresher=token_refresher
)
file_handler = FileHandler(file_service_url, token_refresher, payload_context)
manifest_creator = EnergisticsManifestCreator(file_handler, token_refresher, payload_context)
manifest_creator = EnergisticsManifestCreator(
file_handler=file_handler,
dataset_dms_client=dataset_client,
storage_client=storage_client,
payload_context=payload_context
)
manifest = manifest_creator.create_manifest(execution_context)
return {"manifest": manifest}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment