Commit c4a571d1 authored by Vadzim Kulyba (EPAM)
Browse files

feat(azure): add azure provider impl WITSML parser/dag

parent be0649aa
Pipeline #80338 skipped with stages
# Azure provider layer, built on top of the image passed in as BASE_IMAGE.
ARG BASE_IMAGE
FROM ${BASE_IMAGE}
# key=value form; the whitespace-separated `ENV key value` syntax is legacy.
ENV CLOUD_PROVIDER=azure
COPY build/providers/azure/requirements.txt ./
COPY build/providers/azure/osdu_api.ini ./
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt
# OSDU API client configuration for the Azure provider.
# DATA_PARTITION_ID and API_BASE_URL are not defined in this file; they are
# expected to be supplied by the loader (presumably from the environment).
[environment]
data_partition_id=%(DATA_PARTITION_ID)s
storage_url=%(API_BASE_URL)s/api/storage/v2
search_url=%(API_BASE_URL)s/api/search/v2
legal_url=%(API_BASE_URL)s/api/legal/v1
data_workflow_url=%(API_BASE_URL)s/api/data-workflow/v1
file_dms_url=%(API_BASE_URL)s/api/filedms/v2
dataset_url=%(API_BASE_URL)s/api/dataset/v1
entitlements_url=%(API_BASE_URL)s/api/entitlements/v1
schema_url=%(API_BASE_URL)s/api/schema-service/v1
ingestion_workflow_url=%(API_BASE_URL)s/api/workflow/v1
partition_url=%(API_BASE_URL)s/api/partition/v1
use_service_principal=False
[provider]
name=azure
azure-storage-blob
azure-keyvault-secrets
azure-identity
msal
\ No newline at end of file
......@@ -5,5 +5,5 @@ osdu-api~=0.12.0
--extra-index-url=https://community.opengroup.org/api/v4/projects/668/packages/pypi/simple/
osdu-airflow~=0.12.0
-extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
--extra-index-url=https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion~=0.12.0
......@@ -14,13 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import dataclasses
import enum
import json
import logging
import os
from copy import deepcopy
from typing import List
import json
import dataclasses
import requests
import tenacity
from osdu_api.auth.authorization import TokenRefresher
......@@ -30,7 +31,8 @@ from osdu_api.configuration.config_manager import DefaultConfigManager
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.handle_file import FileHandler
from osdu_ingestion.libs.utils import remove_trailing_colon
from energistics.libs.energistics_parsers import parse_witsml_file, FileInfo
from energistics.libs.energistics_parsers import FileInfo, parse_witsml_file
RETRIES = 3
TIMEOUT = 1
......@@ -234,26 +236,42 @@ class EnergisticsManifestCreator:
if execution_context["Context"].get("dataset_id"):
dataset_dms_client = DatasetDmsClient(DefaultConfigManager(), self.payload_context.data_partition_id)
bearer_token = None if dataset_dms_client.use_service_principal else self.token_refresher.refresh_token()
retrieval_instructions_response = dataset_dms_client.get_retrieval_instructions(record_id=execution_context["Context"]["dataset_id"], bearer_token=bearer_token)
# logger.info(f"Dataset retrieval response code: {retrieval_instructions_response.status_code}")
print(f"Dataset retrieval response code: {retrieval_instructions_response.status_code}")
if os.environ["CLOUD_PROVIDER"] == "azure":
retrieval_instructions_response = dataset_dms_client.retrieval_instructions(
record_id=execution_context["Context"]["dataset_id"],
bearer_token=bearer_token)
instruction_list_key = "datasets"
else:
retrieval_instructions_response = dataset_dms_client.get_retrieval_instructions(
record_id=execution_context["Context"]["dataset_id"],
bearer_token=bearer_token)
instruction_list_key = "delivery"
logger.info(f"Dataset retrieval response code: {retrieval_instructions_response.status_code}")
retrieval_instructions = None
if retrieval_instructions_response.status_code == 200:
retrieval_instructions = json.loads(retrieval_instructions_response.content)
# logger.info(f"Dataset retrieval response body: {retrieval_instructions_response.content}")
print(f"Dataset retrieval response body: {retrieval_instructions_response.content}")
if len(retrieval_instructions["delivery"]) > 0:
download_signed_url = retrieval_instructions["delivery"][0]["retrievalProperties"]["signedUrl"]
logger.info(f"Dataset retrieval response body: {retrieval_instructions_response.content}")
if len(retrieval_instructions[instruction_list_key]) > 0:
download_signed_url = retrieval_instructions[instruction_list_key][0]["retrievalProperties"]["signedUrl"]
storage_client = RecordClient(DefaultConfigManager(), self.payload_context.data_partition_id)
record_response = storage_client.get_specific_record(recordId=retrieval_instructions["delivery"][0]["datasetRegistryId"], version="", bearer_token=bearer_token)
record_response = storage_client.get_specific_record(
recordId=retrieval_instructions[instruction_list_key][0]["datasetRegistryId"],
version="",
bearer_token=bearer_token)
record = json.loads(record_response.content)
file_source = record["data"]["DatasetProperties"]["FileSourceInfo"]["FileSource"]
print(f"Record's FileSource: {file_source}")
logger.info(f"Record's FileSource: {file_source}")
else:
preload_file_path = execution_context["Context"]["preload_file_path"]
file_source = self.file_handler.upload_file(preload_file_path)
logger.info(f"Record's FileSource: {file_source}")
file_record = self._create_file_meta_record(
file_source=file_source,
preload_file_path=preload_file_path,
......@@ -262,8 +280,11 @@ class EnergisticsManifestCreator:
kind=kind,
)
record_id = self.file_handler.save_file_record(file_record)
logger.info(f"Uploaded file record id: {record_id}")
download_signed_url = self.file_handler._get_download_signed_url(self.file_handler.request_headers,
record_id).signed_url
file_content = get_file_content(download_signed_url)
parsed_entities = self._generate_manifest_entities(
file_content,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment