Commit 3d0e0f7b authored by Siarhei Khaletski (EPAM)
GONRG-1719: updated variables to support ENV approach

parent 7f1c24f6
1 merge request: !24 Ingestion updates
Pipeline #29045 passed
Showing 57 additions and 62 deletions
@@ -62,37 +62,32 @@ cloud platforms. This Python SDK is located in a separate [os-python-sdk] folder
## Required Variables

### Common naming convention
Some variables are defined using Airflow Variables.
A variable should have a prefix that defines where the variable is used:
- - **core.** - used in the common part of DAGs;
- - **gcp.**, **azure.**, **ibm.**, **aws.** - used in cloud-specific modules of DAGs;
- - **sdk.** - passed to the Python SDK.
+ - **core__** - used in the common part of DAGs;
+ - **gcp__**, **azure__**, **ibm__**, **aws__** - used in cloud-specific modules of DAGs;
+ - **sdk__** - passed to the Python SDK.

If a variable defines a URL to an internal service, it should have a suffix that shows the completeness of the URL:
- - **.url** - the variable should contain the full path to the service API endpoint;
- - **.host** - the variable should contain just the service host value; the full path to the service API endpoint is constructed inside operators.
+ - **__url** - the variable should contain the full path to the service API endpoint;
+ - **__host** - the variable should contain just the service host value; the full path to the service API endpoint is constructed inside operators.

### Internal Services
|Variable |Value Description|
|---|---|
- | core.service.storage.url | Storage Service API endpoint to save records |
- | core.service.search.url | Search Service API endpoint for search queries |
- | core.service.workflow.host | Workflow Service host |
- | core.service.workflow.url | (Deprecated) Workflow Service API endpoint to update status |
- | core.service.file.host | File Service host |
- | core.service.schema.url | Schema Service API endpoint to get a schema by ID |
+ | core__service__storage__url | Storage Service API endpoint to save records |
+ | core__service__search__url | Search Service API endpoint for search queries |
+ | core__service__workflow__host | Workflow Service host |
+ | core__service__workflow__url | (Deprecated) Workflow Service API endpoint to update status |
+ | core__service__file__host | File Service host |
+ | core__service__schema__url | Schema Service API endpoint to get a schema by ID |

### Configuration
|Variable |Value Description|
|---|---|
- | core.dataload_config_path | Path to the dataload.ini file. Used in R2 manifest ingestion |
+ | core__config__dataload_config_path | Path to the dataload.ini file. Used in R2 manifest ingestion |

- ### OSDU Python SDK
- |Variable |Value Description|
- |---|---|
- | core.provider | Needed to properly initialize the OSDU SDK and cloud-specific modules |
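The move from dots to double underscores is what enables the ENV approach named in the commit message: Airflow can resolve a Variable from an environment variable called `AIRFLOW_VAR_<KEY>` (key uppercased), and dots are not valid in environment-variable names in most shells. A minimal sketch of both conventions at work (the endpoint values and the `/v1/workflow/status` path are illustrative, not taken from this repository):

```python
import os

from airflow.models import Variable

# Supply a Variable purely through the environment: recent Airflow versions
# check AIRFLOW_VAR_<KEY-uppercased> before the metadata database.
# The value below is made up for the example.
os.environ["AIRFLOW_VAR_CORE__SERVICE__STORAGE__URL"] = "http://storage/api/storage/v2/records"

# __url variables hold the complete endpoint.
storage_url = Variable.get("core__service__storage__url")

# __host variables hold only the host; operators append the path themselves.
workflow_host = Variable.get("core__service__workflow__host", "http://workflow")
status_endpoint = f"{workflow_host}/v1/workflow/status"  # hypothetical path
```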
## Testing
@@ -59,19 +59,19 @@ class FileDownloadUrlResponse:

class FileHandler(HeadersMixin):
    """Class to perform operations using OSDU File Service."""

-     def __init__(self, file_service_url: str, token_refresher: TokenRefresher, context: Context,
+     def __init__(self, file_service_host: str, token_refresher: TokenRefresher, context: Context,
                 blob_storage_client: BlobStorageClient = None):
        """File handler.

-         :param file_service_url: Base OSDU File service url
-         :type file_service_url: str
+         :param file_service_host: Base OSDU File service host
+         :type file_service_host: str
        :param token_refresher: Object to refresh tokens
        :type token_refresher: TokenRefresher
        :param context: The tenant context data
        :type context: Context
        """
        super().__init__(context)
-         self._file_service_url = file_service_url
+         self._file_service_host = file_service_host
        self.token_refresher = token_refresher
        self._blob_storage_client = blob_storage_client or blob_storage.get_client()
@@ -148,7 +148,7 @@ class FileHandler(HeadersMixin):
        :rtype: FileUploadUrlResponse
        """
        logger.debug("Getting upload signed url.")
-         endpoint = f"{self._file_service_url}/v1/files/uploadURL"
+         endpoint = f"{self._file_service_host}/v1/files/uploadURL"
        response = self._send_get_request(headers, endpoint).json()
        logger.debug("Signed url got.")
        upload_url_response = FileUploadUrlResponse(file_id=response["FileID"],
@@ -168,7 +168,7 @@ class FileHandler(HeadersMixin):
        :rtype: FileDownloadUrlResponse
        """
        logger.debug("Getting download signed url.")
-         endpoint = f"{self._file_service_url}/v1/files/{record_id}/downloadURL"
+         endpoint = f"{self._file_service_host}/v1/files/{record_id}/downloadURL"
        response = self._send_get_request(headers, endpoint).json()
        logger.debug("Signed url got.")
        download_url_response = self._handle_download_url_response(response)
@@ -202,7 +202,7 @@ class FileHandler(HeadersMixin):
        """
        logger.debug("Getting file location.")
        request_body = json.dumps({"FileID": file_id})
-         endpoint = f"{self._file_service_url}/getFileLocation"
+         endpoint = f"{self._file_service_host}/getFileLocation"
        response = self._send_post_request(headers, endpoint, request_body)
        logger.debug("File location got.")
        return response.json()["Location"]
@@ -269,7 +269,7 @@ class FileHandler(HeadersMixin):
            f"surrogate_name_{file_record['data']['DatasetProperties']['FileSourceInfo']['PreloadFilePath'].split('/')[-1]}"
        logger.info(f"Generated name: {file_record['data']['Name']}")
        logger.info("Sending file record metadata to File service")
-         endpoint = f"{self._file_service_url}/v1/files/metadata"
+         endpoint = f"{self._file_service_host}/v1/files/metadata"
        response = self._send_post_request(self.request_headers, endpoint, json.dumps(file_record))
        return response.json()["id"]
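After the rename, `FileHandler` keeps only the host and derives every endpoint from it on demand. The four paths touched by this commit, collected into one sketch (the host and record id values are illustrative):

```python
# Endpoint construction pattern used by FileHandler after this change.
host = "http://file-service"                     # illustrative host
record_id = "opendes:dataset--File.Generic:123"  # illustrative record id

endpoints = {
    "upload_url": f"{host}/v1/files/uploadURL",
    "download_url": f"{host}/v1/files/{record_id}/downloadURL",
    "file_location": f"{host}/getFileLocation",
    "metadata": f"{host}/v1/files/metadata",
}
```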
@@ -50,7 +50,7 @@ class AirflowTokenRefresher(TokenRefresher):
        :rtype: str
        """
        self._credentials.refresh_token()
-         self.airflow_variables.set("core.access_token", self._credentials.access_token)
+         self.airflow_variables.set("core__auth__access_token", self._credentials.access_token)
        self._access_token = self._credentials.access_token
        return self._access_token
@@ -63,7 +63,7 @@ class AirflowTokenRefresher(TokenRefresher):
        """
        if not self._access_token:
            try:
-                 self._access_token = self.airflow_variables.get("core.access_token")
+                 self._access_token = self.airflow_variables.get("core__auth__access_token")
            except KeyError:
                self.refresh_token()
        return self._access_token
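Taken together, the two hunks above implement a read-through cache with the `core__auth__access_token` Variable as the cache slot. A standalone sketch of the same pattern, assuming a credentials object that exposes `refresh_token()` and `access_token` (names are illustrative; the real class wraps this in `AirflowTokenRefresher`):

```python
from airflow.models import Variable

ACCESS_TOKEN_KEY = "core__auth__access_token"  # Variable acting as the cache slot

def get_access_token(credentials) -> str:
    """Return the cached token; refresh and re-cache it on a miss."""
    try:
        # Hit: a previous run already stored a token.
        return Variable.get(ACCESS_TOKEN_KEY)
    except KeyError:
        # Miss: the Variable is unset, so refresh and store the new token.
        credentials.refresh_token()
        Variable.set(ACCESS_TOKEN_KEY, credentials.access_token)
        return credentials.access_token
```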
@@ -28,7 +28,7 @@ from operators.process_manifest_r3 import ProcessManifestOperatorR3
from operators.update_status import UpdateStatusOperator
from operators.validate_manifest_schema import ValidateManifestSchemaOperator

- BATCH_NUMBER = int(Variable.get("core.batch_count", "3"))
+ BATCH_NUMBER = int(Variable.get("core__ingestion__batch_count", "3"))
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
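`Variable.get` takes a default as its second argument, so the DAG still parses when the Variable is unset; under the ENV approach the same value could instead come from `AIRFLOW_VAR_CORE__INGESTION__BATCH_COUNT`. Note that Variables are strings, hence the explicit cast:

```python
from airflow.models import Variable

# Falls back to "3" when neither the Variable nor the corresponding
# AIRFLOW_VAR_ environment variable is defined; int() converts the string.
BATCH_NUMBER = int(Variable.get("core__ingestion__batch_count", "3"))
```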
@@ -84,7 +84,7 @@ class UpdateStatusOperator(BaseOperator):
        status = self.status.value
        status_updater = UpdateStatus(
            workflow_name="",
-             workflow_url=Variable.get("core.service.workflow.url"),
+             workflow_url=Variable.get("core__service__workflow__url"),
            workflow_id=workflow_id,
            run_id="",
            status=status,
@@ -39,7 +39,7 @@ class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
                 *args, **kwargs):
        """Init base operator and obtain base urls from Airflow Variables."""
        super().__init__(*args, **kwargs)
-         self.search_url = Variable.get('core.service.search.url')
+         self.search_url = Variable.get('core__service__search__url')
        self.previous_task_id = previous_task_id

    def execute(self, context: dict):
@@ -34,7 +34,7 @@ from libs.refresh_token import AirflowTokenRefresher
from osdu_api.libs.auth.authorization import authorize

config = configparser.RawConfigParser()
- config.read(Variable.get("core.dataload_config_path"))
+ config.read(Variable.get("core__config__dataload_config_path"))
DEFAULT_TENANT = config.get("DEFAULTS", "tenant")
DEFAULT_SOURCE = config.get("DEFAULTS", "authority")
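The two `config.get` calls above only require a `DEFAULTS` section in dataload.ini. A self-contained illustration (the section and keys come from the code; the values are made up):

```python
import configparser

# Emulate the dataload.ini read; in the DAG the path comes from the
# core__config__dataload_config_path Variable instead of an inline string.
config = configparser.RawConfigParser()
config.read_string("""
[DEFAULTS]
tenant = opendes
authority = osdu
""")

DEFAULT_TENANT = config.get("DEFAULTS", "tenant")     # "opendes"
DEFAULT_SOURCE = config.get("DEFAULTS", "authority")  # "osdu"
```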
@@ -317,7 +317,7 @@ def send_request(headers, request_data):
    for retry in range(retries):
        try:
            # send batch request for creating records
-             response = requests.put(Variable.get('core.service.storage.url'), json.dumps(request_data),
+             response = requests.put(Variable.get('core__service__storage__url'), json.dumps(request_data),
                                    headers=headers)
            if response.status_code in DATA_LOAD_OK_RESPONSE_CODES:
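For context, `send_request` wraps the Storage PUT above in a bounded retry loop. A trimmed, standalone version of that loop (the OK-status list and retry count are simplified stand-ins for the module's real constants):

```python
import json

import requests

DATA_LOAD_OK_RESPONSE_CODES = [200, 201]  # illustrative subset

def send_request(storage_url: str, headers: dict, request_data: dict, retries: int = 3):
    """Retry the batch record PUT until Storage replies with an OK status."""
    response = None
    for _ in range(retries):
        response = requests.put(storage_url, json.dumps(request_data), headers=headers)
        if response.status_code in DATA_LOAD_OK_RESPONSE_CODES:
            return response
    # Retries exhausted: surface the last HTTP error to the caller.
    response.raise_for_status()
```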
@@ -57,11 +57,11 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
        super().__init__(*args, **kwargs)
        self.previous_task_id = previous_task_id
        self.batch_number = batch_number
-         self.schema_service_url = Variable.get('core.service.schema.url')
-         self.search_service_url = Variable.get('core.service.search.url')
-         self.storage_url = Variable.get('core.service.storage.url')
-         self.file_service_url = Variable.get('core.service.file.host')
-         self.batch_count = int(Variable.get("core.batch_count", "3"))
+         self.schema_service_url = Variable.get('core__service__schema__url')
+         self.search_service_url = Variable.get('core__service__search__url')
+         self.storage_url = Variable.get('core__service__storage__url')
+         self.file_service_host = Variable.get('core__service__file__host')
+         self.batch_count = int(Variable.get("core__ingestion__batch_count", "3"))

    def _get_manifest_files_range(self, manifests: List[dict]) -> Tuple[int, int]:
        """
@@ -120,7 +120,7 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
        execution_context = context["dag_run"].conf["execution_context"]
        payload_context = Context.populate(execution_context)
        token_refresher = AirflowTokenRefresher()
-         file_handler = FileHandler(self.file_service_url, token_refresher, payload_context)
+         file_handler = FileHandler(self.file_service_host, token_refresher, payload_context)
        file_source_validator = FileSourceValidator()
        search_client = SearchClient(self.search_service_url, token_refresher, payload_context)
        source_file_checker = SourceFileChecker()
@@ -45,6 +45,6 @@ class SearchRecordIdOperator(BaseOperator):
        """
        payload_context = Context.populate(context["dag_run"].conf)
        record_ids = context["ti"].xcom_pull(key="record_ids")
-         ids_searcher = SearchId(Variable.get("core.service.search.url"), record_ids, AirflowTokenRefresher(),
+         ids_searcher = SearchId(Variable.get("core__service__search__url"), record_ids, AirflowTokenRefresher(),
                                payload_context)
        ids_searcher.check_records_searchable()
@@ -86,7 +86,7 @@ class UpdateStatusOperator(BaseOperator):
        status = self.status.value
        status_updater = UpdateStatus(
            workflow_name=workflow_name,
-             workflow_url=Variable.get("core.service.workflow.host"),
+             workflow_url=Variable.get("core__service__workflow__host"),
            workflow_id="",
            run_id=run_id,
            status=status,
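This commit touches two `UpdateStatusOperator` variants, mirroring the README tables: the R2 operator (earlier hunk) still reads the deprecated full-endpoint Variable, while this R3 operator reads the host-only one and leaves path construction to the operator. Side by side:

```python
from airflow.models import Variable

# R2 operator: full endpoint, marked (Deprecated) in the README table.
legacy_workflow_url = Variable.get("core__service__workflow__url")

# R3 operator: host only; the final path is constructed inside the operator.
workflow_host = Variable.get("core__service__workflow__host")
```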
@@ -41,7 +41,7 @@ class ValidateManifestSchemaOperator(BaseOperator, ReceivingContextMixin):
        """Init base operator and obtain base urls from Airflow Variables."""
        super().__init__(*args, **kwargs)
        self.previous_task_id = previous_task_id
-         self.schema_service_url = Variable.get('core.service.schema.url')
+         self.schema_service_url = Variable.get('core__service__schema__url')

    def execute(self, context: dict):
        """Execute manifest validation then process it.
@@ -38,13 +38,13 @@ from libs.handle_file import FileDownloadUrlResponse, FileUploadUrlResponse, FileHandler

class TestFileHandler:

-     BASE_TEST_URL = "http://file_service_url"
+     BASE_TEST_HOST = "http://file_service_url"
    PARTITION_ID = "test_partition_id"

    @pytest.fixture()
    def file_handler(self, monkeypatch):
        context = Context(data_partition_id=self.PARTITION_ID, app_key="")
-         file_handler = FileHandler(self.BASE_TEST_URL,
+         file_handler = FileHandler(self.BASE_TEST_HOST,
                                   AirflowTokenRefresher(get_test_credentials()),
                                   context)
        monkeypatch.setattr(
@@ -61,7 +61,7 @@ class TestFileHandler:
    @responses.activate
    def test_get_file_staging_location(self, file_handler: FileHandler):
        test_staging_location = "gs://staging/test/file_id"
-         responses.add(responses.POST, f"{self.BASE_TEST_URL}/getFileLocation",
+         responses.add(responses.POST, f"{self.BASE_TEST_HOST}/getFileLocation",
                      json={"Location": test_staging_location}, status=http.HTTPStatus.OK)
        assert test_staging_location == file_handler.get_file_staging_location("/test/file_id")
@@ -76,7 +76,7 @@ class TestFileHandler:
    @responses.activate
    def test_get_file_staging_location_error(self, file_handler: FileHandler,
                                             http_status: str, reason: str):
-         responses.add(responses.POST, f"{self.BASE_TEST_URL}/getFileLocation",
+         responses.add(responses.POST, f"{self.BASE_TEST_HOST}/getFileLocation",
                      status=http_status, body=reason)
        with pytest.raises((tenacity.RetryError, requests.exceptions.HTTPError)):
@@ -91,7 +91,7 @@ class TestFileHandler:
            "unsignedUrl": test_permanent_location,
            "kind": "test_kind"
        }
-         responses.add(responses.GET, f"{self.BASE_TEST_URL}/v1/files/{test_record_id}/downloadURL",
+         responses.add(responses.GET, f"{self.BASE_TEST_HOST}/v1/files/{test_record_id}/downloadURL",
                      json=json_response, status=http.HTTPStatus.OK)
        assert test_permanent_location == file_handler.get_file_permanent_location(test_record_id)
@@ -106,7 +106,7 @@ class TestFileHandler:
    def test_get_file_permanent_location_error(self, file_handler: FileHandler,
                                               http_status: str, reason: str):
        test_record_id = "test_record_id"
-         responses.add(responses.GET, f"{self.BASE_TEST_URL}/v1/files/{test_record_id}/downloadURL",
+         responses.add(responses.GET, f"{self.BASE_TEST_HOST}/v1/files/{test_record_id}/downloadURL",
                      status=http_status, body=reason)
        with pytest.raises((tenacity.RetryError, requests.exceptions.HTTPError)):
@@ -124,7 +124,7 @@ class TestFileHandler:
            file_record, unused_wpc_record, unused_wp_record = json.load(cf)

        test_record_id = "test_record_id"
-         responses.add(responses.POST, f"{self.BASE_TEST_URL}/v1/files/metadata",
+         responses.add(responses.POST, f"{self.BASE_TEST_HOST}/v1/files/metadata",
                      json={"id": test_record_id}, status=http.HTTPStatus.OK)
        assert test_record_id == file_handler.save_file_record(file_record)
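All of the tests above follow one stubbing pattern: register the exact endpoint the handler derives from `BASE_TEST_HOST` with the `responses` library, then assert against the canned payload. A minimal self-contained example of that pattern (no OSDU imports needed):

```python
import http

import requests
import responses

BASE_TEST_HOST = "http://file_service_url"

@responses.activate
def test_metadata_endpoint_is_stubbed():
    # Register the endpoint that FileHandler would derive from the host.
    responses.add(responses.POST, f"{BASE_TEST_HOST}/v1/files/metadata",
                  json={"id": "test_record_id"}, status=http.HTTPStatus.OK)
    # Any POST to that URL now returns the canned payload instead of real I/O.
    response = requests.post(f"{BASE_TEST_HOST}/v1/files/metadata", json={})
    assert response.json()["id"] == "test_record_id"
```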
@@ -47,7 +47,7 @@ class TestAirflowTokenRefresher:
        Check that the access token is stored in Airflow Variables after refreshing it.
        """
        token_refresher.refresh_token()
-         assert token_refresher.airflow_variables.get("core.access_token") == access_token
+         assert token_refresher.airflow_variables.get("core__auth__access_token") == access_token

    @pytest.mark.parametrize(
        "access_token",
@@ -41,16 +41,16 @@ sed -i 's/load_examples = True/load_examples = False/' /usr/local/airflow/airflow.cfg
# turn on all dags
sed -i 's/dags_are_paused_at_creation = True/dags_are_paused_at_creation = False/' /usr/local/airflow/airflow.cfg

- airflow variables -s core.service.storage.url $STORAGE_URL
- airflow variables -s core.provider gcp
- airflow variables -s core.service.workflow.host $WORKFLOW_URL
- airflow variables -s core.service.file.host $LOCALHOST
- airflow variables -s core.service.workflow.url $UPDATE_STATUS_URL
- airflow variables -s core.service.search.url $SEARCH_URL
- airflow variables -s core.service.schema.url $LOCALHOST
- airflow variables -s core.dataload_config_path $DATALOAD_CONFIG_PATH
- airflow variables -s core.access_token test
- airflow variables -s core.batch_count 3
+ airflow variables -s core__service__storage__url $STORAGE_URL
+ airflow variables -s core__provider gcp
+ airflow variables -s core__service__workflow__host $WORKFLOW_URL
+ airflow variables -s core__service__file__host $LOCALHOST
+ airflow variables -s core__service__workflow__url $UPDATE_STATUS_URL
+ airflow variables -s core__service__search__url $SEARCH_URL
+ airflow variables -s core__service__schema__url $LOCALHOST
+ airflow variables -s core__config__dataload_config_path $DATALOAD_CONFIG_PATH
+ airflow variables -s core__auth__access_token test
+ airflow variables -s core__ingestion__batch_count 3
airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
airflow connections -a --conn_id google_cloud_storage --conn_uri $LOCALHOST