Commit 7e33abc6 authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM)
Browse files

GONRG-3119: Python SDK usage

parent f3fc8463
Pipeline #76112 passed with stages
in 1 minute and 43 seconds
......@@ -40,7 +40,6 @@ class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
*args, **kwargs):
"""Init base operator and obtain base urls from Airflow Variables."""
super().__init__(*args, **kwargs)
self.search_url = Variable.get('core__service__search__url')
self.whitelist_ref_patterns = Variable.get('core__config__reference_patterns_whitelist', default_var=None)
self.previous_task_id = previous_task_id
self._show_skipped_ids = Variable.get(
......@@ -58,7 +57,6 @@ class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
file_source_validator = FileSourceValidator()
manifest_integrity = ManifestIntegrity(
self.search_url,
token_refresher,
file_source_validator,
payload_context,
......
......@@ -30,7 +30,7 @@ from urllib.error import HTTPError
import requests
import tenacity
from airflow.models import BaseOperator, Variable
from osdu_ingestion.libs.auth.authorization import authorize
from osdu_api.auth.authorization import authorize
from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher
config = configparser.RawConfigParser()
......
......@@ -33,7 +33,6 @@ from osdu_ingestion.libs.handle_file import FileHandler
from osdu_ingestion.libs.process_manifest_r3 import ManifestProcessor
from osdu_ingestion.libs.processors.single_manifest_processor import SingleManifestProcessor
from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher
from osdu_ingestion.libs.search_client import SearchClient
from osdu_ingestion.libs.source_file_check import SourceFileChecker
from osdu_ingestion.libs.types import ManifestType
from osdu_ingestion.libs.validation.validate_file_source import FileSourceValidator
......@@ -62,8 +61,6 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
super().__init__(*args, **kwargs)
self.previous_task_id = previous_task_id
self.batch_number = batch_number
self.schema_service_url = Variable.get('core__service__schema__url')
self.search_service_url = Variable.get('core__service__search__url')
self.storage_url = Variable.get('core__service__storage__url')
self.file_service_host = Variable.get('core__service__file__host')
self.batch_count = int(Variable.get("core__ingestion__batch_count", "3"))
......@@ -137,25 +134,21 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
token_refresher = AirflowTokenRefresher()
file_handler = FileHandler(self.file_service_host, token_refresher, payload_context)
file_source_validator = FileSourceValidator()
search_client = SearchClient(self.search_service_url, token_refresher, payload_context)
source_file_checker = SourceFileChecker()
referential_integrity_validator = ManifestIntegrity(
self.search_service_url,
token_refresher,
file_source_validator,
payload_context
)
manifest_processor = ManifestProcessor(
storage_url=self.storage_url,
file_handler=file_handler,
token_refresher=token_refresher,
context=payload_context,
source_file_checker=source_file_checker,
)
validator = SchemaValidator(
self.schema_service_url,
token_refresher,
payload_context,
data_types_with_surrogate_ids=DATA_TYPES_WITH_SURROGATE_KEYS,
......
......@@ -40,10 +40,8 @@ class ValidateManifestSchemaOperator(BaseOperator, ReceivingContextMixin):
@apply_defaults
def __init__(self, previous_task_id: str = None, *args, **kwargs):
"""Init base operator and obtain base urls from Airflow Variables."""
super().__init__(*args, **kwargs)
self.previous_task_id = previous_task_id
self.schema_service_url = Variable.get('core__service__schema__url')
self._show_skipped_ids = Variable.get(
'core__config__show_skipped_ids', default_var=False
)
......@@ -63,7 +61,6 @@ class ValidateManifestSchemaOperator(BaseOperator, ReceivingContextMixin):
token_refresher = AirflowTokenRefresher()
schema_validator = SchemaValidator(
self.schema_service_url,
token_refresher,
payload_context,
surrogate_key_fields_paths=SURROGATE_KEYS_PATHS,
......
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[environment]
data_partition_id=opendes
storage_url=https://blah/api/storage/v2
partition_url=https://blah/api/storage/v2
search_url=https://blah/api/search/v2
legal_url=https://blah/api/legal/v1
data_workflow_url=https://blah/api/data-workflow/v1
entitlements_url=https://blah/api/entitlements/v1
file_dms_url=https://blah/api/filedms/v2
dataset_url=https://blah/api/dataset-registry/v1
schema_url=https://blah/api/schema-service/v1
ingestion_workflow_url=stub
use_service_principal=no
[provider]
name=provider_test
#!/bin/sh
export CLOUD_PROVIDER="provider_test"
export OSDU_API_CONFIG_INI="`dirname $0`/osdu_api.ini"
airflow db init
......
......@@ -6,7 +6,7 @@ requests==2.25.1
tenacity==6.2.0
--extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api~=0.12.0
osdu-api~=0.13.0.dev241
--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion~=0.12.0
osdu-ingestion~=0.13.0.dev68
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment