Skip to content
Snippets Groups Projects

Draft: Gc update dags

Closed Siarhei Poliak [EPAM / GCP] requested to merge gc_update_dags into master
9 files
+ 197
275
Compare changes
  • Side-by-side
  • Inline
Files
9
from datetime import timedelta
from json import dumps
import os
from kubernetes.client import models as k8s_models
import airflow
@@ -36,7 +35,7 @@ user_id = "{{ dag_run.conf['execution_context'].get('userId') }}"
# Constants
# Name handed to DAG(...) as its first positional argument further down.
DAG_NAME = "csv_ingestion"
# Templated reference to the `image__csv_parser` variable; the placeholder is
# kept verbatim here (presumably rendered by Airflow at run time — confirm).
DOCKER_IMAGE = "{{ var.value.image__csv_parser }}"
# NOTE(review): NAMESPACE is assigned twice in a row, so only the second value
# ("composer-user-workloads") takes effect. This looks like the old and new
# sides of a diff shown together — confirm which one the real file contains.
NAMESPACE = "default"
NAMESPACE = "composer-user-workloads"
# Short name for the csv parser; its use is outside the visible lines.
CSV_PARSER = "csv-parser"
# Values to pass to csv parser
@@ -51,45 +50,20 @@ params = {
# Get environment variables
# TODO: put env vars here from application.properties
# Environment for the csv parser: one templated URL per core service, the
# default data service ("file"), and OpenID client settings copied from this
# process's own environment (each is None when the variable is unset).
env_vars = dict(
    storage_service_endpoint="{{ var.value.core__service__storage__url }}",
    schema_service_endpoint="{{ var.value.core__service__schema__url }}",
    search_service_endpoint="{{ var.value.core__service__search__url }}",
    partition_service_endpoint="{{ var.value.core__service__partition__url }}",
    unit_service_endpoint="{{ var.value.core__service__unit__url }}",
    file_service_endpoint="{{ var.value.core__service__file__url }}",
    dataset_service_endpoint="{{ var.value.core__service__dataset__url }}",
    workflow_service_endpoint="{{ var.value.core__service__workflow__url }}",
    data_service_to_use="file",
    OPENID_PROVIDER_CLIENT_ID=os.getenv("KEYCLOAK_CLIENT_ID"),
    OPENID_PROVIDER_CLIENT_SECRET=os.getenv("KEYCLOAK_CLIENT_SECRET"),
    OPENID_PROVIDER_URL=os.getenv("CSV_PARSER_KEYCLOAK_AUTH_URL"),
)
# Environment for the csv parser: the Spring profile ("gcp"), one templated
# URL per core service, and the default data service ("file").
env_vars = {
    "SPRING_PROFILES_ACTIVE": "gcp",
    "storage_service_endpoint": "{{ var.value.core__service__storage__url }}",
    "schema_service_endpoint": "{{ var.value.core__service__schema__url }}",
    "search_service_endpoint": "{{ var.value.core__service__search__url }}",
    "partition_service_endpoint": "{{ var.value.core__service__partition__url }}",
    "unit_service_endpoint": "{{ var.value.core__service__unit__url }}",
    "file_service_endpoint": "{{ var.value.core__service__file__url }}",
    "dataset_service_endpoint": "{{ var.value.core__service__dataset__url }}",
    "workflow_service_endpoint": "{{ var.value.core__service__workflow__url }}",
    "data_service_to_use": "file",
}
# Replace the "file" default only when a truthy data service name was given.
if data_service_to_use:
    env_vars["data_service_to_use"] = data_service_to_use
# Keyword arguments bundled under `operator_kwargs` (presumably passed to the
# pod operator that runs the csv parser — confirm against the DAG body).
# NOTE(review): this span opens `operator_kwargs = {` twice, repeats the
# requests "cpu" entry, and closes the V1ResourceRequirements call twice with
# two different tails (annotations + `anthos` profile vs. `config_file`). It
# looks like the removed and added sides of a diff shown together — confirm
# which lines belong to the current file before relying on it.
operator_kwargs = { "container_resources": k8s_models.V1ResourceRequirements(
operator_kwargs = {'container_resources': k8s_models.V1ResourceRequirements(
# Limits and requests are read from Airflow Variables, falling back to the
# `default_var` values given here.
limits={
"memory": airflow.models.Variable.get("csv__ingestion__limit__memory", default_var="1Gi"),
"cpu": airflow.models.Variable.get("csv__ingestion__limit__cpu", default_var="1000m")
},
requests={
"memory": airflow.models.Variable.get("csv__ingestion__request__memory", default_var="1Gi"),
"cpu": airflow.models.Variable.get("csv__ingestion__request__cpu", default_var="200m"),
"cpu": airflow.models.Variable.get("csv__ingestion__request__cpu", default_var="200m")
}
),
"annotations": {
"sidecar.istio.io/inject": "false"
},
"startup_timeout_seconds": 300,
# The parser jar is launched through `sh -c`; its single-quoted JSON argument
# is filled from dag_run.conf templates, including the caller's auth token.
"cmds": [
"sh",
"-c",
'java -Djava.security.egd=file:/dev/./urandom ' \
' -Dspring.profiles.active=anthos ' \
'-jar /app/csv-parser-gc.jar \'{"id": "{{ dag_run.conf[\'execution_context\'][\'id\'] }}", "authorization": "{{ dag_run.conf[\'authToken\'] }}", "dataPartitionId": "{{ dag_run.conf[\'execution_context\'][\'dataPartitionId\'] }}", "steps": ["LOAD_FROM_CSV", "TYPE_COERCION", "ID", "ACL", "LEGAL", "KIND", "META", "TAGS", "UNIT", "CRS", "RELATIONSHIP", "STORE_TO_OSDU"], "dataServiceName": "{{ dag_run.conf[\'execution_context\'].get(\'data_service_to_use\', \'file\') }}"}\''
]
}
), 'config_file': '/home/airflow/composer_kube_config', 'startup_timeout_seconds': 300, 'cmds': ['sh', '-c', 'java -Djava.security.egd=file:/dev/./urandom -jar /app/csv-parser-gc.jar \'{"id": "{{ dag_run.conf[\'execution_context\'][\'id\'] }}", "authorization": "{{ dag_run.conf[\'authToken\'] }}", "dataPartitionId": "{{ dag_run.conf[\'execution_context\'][\'dataPartitionId\'] }}", "steps": ["LOAD_FROM_CSV", "TYPE_COERCION", "ID", "ACL", "LEGAL", "KIND", "META", "TAGS", "UNIT", "CRS", "RELATIONSHIP", "STORE_TO_OSDU"], "dataServiceName": "{{ dag_run.conf[\'execution_context\'].get(\'data_service_to_use\', \'file\') }}"}\'']}
with DAG(
DAG_NAME,
@@ -116,7 +90,7 @@ with DAG(
)
update_status_running >> csv_parser >> update_status_finished # pylint: disable=pointless-statement
#@ Version: 0.25.0
#@ Branch: v0.25.0
#@ Commit: 8e1511df
#@ SHA-256 checksum: 12a9c0d83fc72f14baaa5ce76be0dc6ec9a76c0e072466328dbd117b3a41a353
#@ Version: 0.26.0
#@ Branch: v0.26.0
#@ Commit: ee1937ca
#@ SHA-256 checksum: 81fa9a81a137106349ae8d566ae0851bdc55ddde0cfe3c16eb11b9c4912b891d
Loading