Commit 5c8be559 authored by Yan Sushchynski (EPAM)

Anthos bootstrap

parent 08d6cfde
Merge requests: !56 "Update to master", !46 "Anthos bootstrap"
Pipeline #105135 failed
@@ -31,6 +31,7 @@ variables:
   OSDU_GCP_SEISMIC_STORE_SERVICE_ENDPOINT: https://seismic-store-attcrcktoa-uc.a.run.app/api/v3
   SEGY_VDS_DOCKER_IMAGE: "community.opengroup.org:5555/osdu/platform/domain-data-mgmt-services/seismic/open-vds/openvds-ingestion:latest"
   OSDU_GCP_OUTPUT_DAG_DIR: bootstrapped_dag
+  OSDU_ANTHOS_OUTPUT_DAG_DIR: bootstraped_dag_anthos
   OSDU_GCP_SEISMIC_STORE_SERVICE_COMMUNITY_ENDPOINT: https://community.gcp.gnrg-osdu.projects.epam.com/api/seismic-store/v3
   OSDU_GCP_SEISMIC_STORE_SERVICE_PRE_SHIP_ENDPOINT: https://preship.osdu.club/api/seismic-store/v3
## ANTHOS: DAG FILE RENDERING
```bash
# Install requirements
pip install Jinja2==2.10.1 markupsafe==2.0.1

# Set up variables (export them so bootstrap.sh can read them)
export ANTHOS_DEPLOYMENTS_SUBDIR="<path_to_folder>"    # e.g. deployments/scripts/anthos (folder containing render_dag_file.py)
export ANTHOS_DAG_DIRECTORY="<path_to_folder>"         # e.g. src/dags (folder containing segy_to_vds_ssdms_conversion_dag.py)
export OUTPUT_FILE="<path_to_output_file>"             # optional; set it to avoid overwriting the original DAG file

chmod +x bootstrap.sh
./bootstrap.sh
```
Workflow Payload Example:
```json
{
  "executionContext": {
    "Payload": {
      "AppKey": "<some-app-key>",
      "data-partition-id": "<data-partition-id>"
    },
    "vds_url": "sd://<tenant>/<subproject>/<path>",
    "segy_url": "sd://<tenant>/<subproject>/<path>/<filename>",
    "persistent_id": "<string>",
    "id_token": "<id_token>"
  }
}
```
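For illustration, a payload like the one above could be submitted to the OSDU Workflow service that triggers this DAG. This is only a sketch: the Workflow service base URL, the workflow name, the token handling, and every placeholder value are assumptions that must be adapted to your deployment.

```python
# Hypothetical trigger call for the conversion workflow; the endpoint and credentials are assumptions.
import requests

WORKFLOW_SERVICE_URL = "https://<workflow-service-host>/api/workflow/v1"  # assumed base URL
WORKFLOW_NAME = "Segy_to_vds_conversion_sdms"  # DAG name set by render_dag_file.py below

payload = {
    "executionContext": {
        "Payload": {
            "AppKey": "<some-app-key>",
            "data-partition-id": "<data-partition-id>",
        },
        "vds_url": "sd://<tenant>/<subproject>/<path>",
        "segy_url": "sd://<tenant>/<subproject>/<path>/<filename>",
        "persistent_id": "<string>",
        "id_token": "<id_token>",
    }
}

response = requests.post(
    f"{WORKFLOW_SERVICE_URL}/workflow/{WORKFLOW_NAME}/workflowRun",
    json=payload,
    headers={
        "Authorization": "Bearer <access_token>",
        "data-partition-id": "<data-partition-id>",
        "Content-Type": "application/json",
    },
)
response.raise_for_status()
print(response.json())
```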
#!/usr/bin/env sh
#
# File: deployments/scripts/anthos/bootstrap.sh
# Purpose: Initialize the VDS parser DAGs for Anthos (render the DAG template).
# Required environment variables:
#   ANTHOS_DEPLOYMENTS_SUBDIR - folder containing render_dag_file.py
#   ANTHOS_DAG_DIRECTORY      - folder containing segy_to_vds_ssdms_conversion_dag.py
#   OUTPUT_FILE               - optional path for the rendered DAG; if unset, the original file is overwritten
# Usage:
#   bootstrap.sh
###############################
##      ARGUMENT INPUT       ##
###############################
usage() { echo "Usage: bootstrap.sh"; }
printf "\n"
echo "------------------------------------";
echo "BOOTSTRAP -- START";
echo "------------------------------------";
printf "\n"
echo "SETTING POPULATION TO THE DAG FILE";
echo "------------------------------------";
if [ -z "${OUTPUT_FILE}" ]; then
  echo "THE ORIGINAL DAG FILE WILL BE OVERWRITTEN"
  python3 "${ANTHOS_DEPLOYMENTS_SUBDIR}/render_dag_file.py" -f "${ANTHOS_DAG_DIRECTORY}/segy_to_vds_ssdms_conversion_dag.py"
else
  echo "RENDERED OUTPUT WILL BE SAVED INTO ${OUTPUT_FILE}"
  python3 "${ANTHOS_DEPLOYMENTS_SUBDIR}/render_dag_file.py" -f "${ANTHOS_DAG_DIRECTORY}/segy_to_vds_ssdms_conversion_dag.py" -o "${OUTPUT_FILE}"
fi
if [ $? -ne 0 ]; then
  printf "\n"
  echo "------------------------------------";
  echo "BOOTSTRAP -- ERROR";
  echo "------------------------------------";
  exit 1
else
  printf "\n"
  echo "------------------------------------";
  echo "BOOTSTRAP -- FINISH";
  echo "------------------------------------";
  exit 0
fi
# deployments/scripts/anthos/render_dag_file.py
import argparse
import os

from jinja2 import Environment, FileSystemLoader


class DAGFileRenderer:

    def __init__(self, file_path):
        self.file_path = file_path
        self.docker_image = "{{ var.value.anthos__image__segy_to_vds_converter }}"
        # As of 25 Feb 2022 there is no "namespace" entry in template_fields
        # (https://github.com/apache/airflow/blob/2.2.0/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L165),
        # so this value has to be hardcoded for now.
        self.namespace = "{{ var.value.anthos__parsers_k8s_namespace }}"
        self.sd_svc_url = "{{ var.value.core__service__seismic__url }}"
        self.dag_name = "Segy_to_vds_conversion_sdms"

    def _get_resources(self) -> str:
        return """{
            "request_memory": airflow.models.Variable.get("anthos__vds_ingestion_request_memory", default_var="2Gi"),
            "request_cpu": airflow.models.Variable.get("anthos__vds_ingestion_request_cpu", default_var="200m"),
            "limit_memory": airflow.models.Variable.get("anthos__vds_ingestion_limit_memory", default_var="8Gi"),
            "limit_cpu": airflow.models.Variable.get("anthos__vds_ingestion_limit_cpu", default_var="1000m")
        }"""

    def _prepare_operator_kwargs(self) -> str:
        return f"""{{
            "resources": {self._get_resources()},
            "startup_timeout_seconds": 300
        }}"""

    def _prepare_extra_env_vars(self) -> str:
        return """{
            "S3_ENDPOINT_OVERRIDE": "{{ var.value.core__service__minio__url }}"
        }"""

    def _render_template(self, file) -> str:
        # Templates are resolved from GCP_DAG_DIRECTORY (default: src/dags); only the
        # basename of the opened file is used to locate the template.
        env = Environment(
            loader=FileSystemLoader(searchpath=os.getenv("GCP_DAG_DIRECTORY", default="src/dags")),
            variable_start_string='{|', variable_end_string='|}',
        )
        template = env.get_template(os.path.basename(file.name))
        params = {
            "DAG_NAME": self.dag_name,
            "DOCKER_IMAGE": self.docker_image,
            "K8S_NAMESPACE": self.namespace,
            "K8S_POD_KWARGS": self._prepare_operator_kwargs(),
            "EXTRA_ENV_VARS": self._prepare_extra_env_vars(),
            "SEISMIC_STORE_URL": self.sd_svc_url,
        }
        return template.render(**params)

    def render(self, output_path=None):
        print(f"Start {self.file_path} file rendering")
        output_path = output_path or self.file_path
        with open(self.file_path, "r") as f:
            rendered_file_data = self._render_template(f)
        with open(output_path, "w") as f:
            f.write(rendered_file_data)
        print(f"Finish {self.file_path} file rendering")
        print(f"Rendered output file: {output_path}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Render a python DAG file, populating the required environment settings and parameters.")
    parser.add_argument("-f", type=str, required=True,
                        help="The path to the python DAG file to render.")
    parser.add_argument("-o", type=str,
                        help="The path to the rendered DAG file (defaults to overwriting the input file).",
                        default=None)
    arguments = parser.parse_args()
    file_path = arguments.f
    output_path = arguments.o
    DAGFileRenderer(file_path).render(output_path=output_path)
Jinja2==2.10.1
markupsafe==2.0.1
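The two pinned requirements above are the renderer's dependencies (deployments/scripts/anthos/requirements.txt); markupsafe is held at 2.0.1, presumably because Jinja2 2.10.1 still depends on APIs removed in later markupsafe releases. The renderer's key design choice is the non-default `{|` / `|}` delimiters: bootstrap-time placeholders are substituted while Airflow's own `{{ var.value... }}` macros pass through untouched and are resolved at DAG-parse time. A minimal, self-contained sketch of that behaviour (the template string here is illustrative, not the real DAG file):

```python
from jinja2 import Environment

TEMPLATE = (
    'dag_name = "{| DAG_NAME |}"\n'
    'docker_image = "{{ var.value.anthos__image__segy_to_vds_converter }}"\n'
)

# With '{|' / '|}' as the variable delimiters, the Airflow macro in '{{ ... }}'
# is plain text to Jinja2 and survives rendering unchanged.
env = Environment(variable_start_string="{|", variable_end_string="|}")
print(env.from_string(TEMPLATE).render(DAG_NAME="Segy_to_vds_conversion_sdms"))
# dag_name = "Segy_to_vds_conversion_sdms"
# docker_image = "{{ var.value.anthos__image__segy_to_vds_converter }}"
```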
@@ -16,6 +16,15 @@ osdu_gcp_bootstrap_dag_pre_ship:
   variables:
     <<: *osdu_gcp_common_bootstrap_variables
 
+osdu_anthos_bootstrap_dag:
+  stage: bootstrap_dag
+  variables:
+    ANTHOS_DAG_DIRECTORY: $CI_PROJECT_DIR/src/dags
+    ANTHOS_DEPLOYMENTS_SUBDIR: deployments/scripts/anthos
+    OSDU_ANTHOS_BOOTSTRAP_REQUIREMENTS: deployments/scripts/anthos/requirements.txt
+    OSDU_ANTHOS_BOOTSTRAP_SCRIPT: deployments/scripts/anthos/bootstrap.sh
+    OUTPUT_FILE: ${OSDU_ANTHOS_OUTPUT_DAG_DIR}/segy_to_vds_ssdms_conversion_dag.py
+
 osdu_gcp_publish_dag:
   stage: publish_dag
   variables:
@@ -28,6 +37,12 @@ osdu_gcp_deploy_composer_community:
     DAG_FOLDER: $OSDU_GCP_OUTPUT_DAG_DIR
     OSDU_GCP_DAG_FOLDER: segy_to_vds
 
+osdu_anthos_publish_dag:
+  stage: publish_dag
+  variables:
+    BOOTSTRAPPED_DAG_PATH: ${OSDU_ANTHOS_OUTPUT_DAG_DIR}/segy_to_vds_ssdms_conversion_dag.py
+    DAG_FILE: segy_to_vds_ssdms_conversion_dag.py
+
 osdu_gcp_deploy_composer_pre_ship:
   stage: deploy
   variables:
@@ -44,6 +44,11 @@ K8S_POD_KWARGS = {| K8S_POD_KWARGS |}
 if not K8S_POD_KWARGS:
     K8S_POD_KWARGS = {}
 seismic_store_url = "{| SEISMIC_STORE_URL |}"
+env_vars = {
+    "SD_SVC_URL": seismic_store_url,
+    "SD_SVC_API_KEY": "test",
+}
+env_vars.update({| EXTRA_ENV_VARS|default('{}') |})
 
 dag = DAG(
     dag_name,
@@ -61,10 +66,7 @@ with dag:
     segy_to_vds = KubernetesPodSegyToOpenVDSOperator(
         task_id='segy_to_vds_ssdms_conversion',
         name='segy_vds_conversion',
-        env_vars={
-            "SD_SVC_URL": seismic_store_url,
-            "SD_SVC_API_KEY": "test"
-        },
+        env_vars=env_vars,
         cmds=['SEGYImport'],
         namespace=k8s_namespace,
         image=docker_image,
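For reference, this is roughly what the env_vars section of the Anthos DAG looks like after bootstrap.sh has run render_dag_file.py over the template. It is a sketch derived from the renderer above, not the literal rendered file:

```python
# Sketch of the rendered result (assumed, derived from render_dag_file.py above):
# the '{| ... |}' placeholders are gone, while the Airflow Variable macros remain
# and are resolved when the scheduler parses the DAG.
seismic_store_url = "{{ var.value.core__service__seismic__url }}"

env_vars = {
    "SD_SVC_URL": seismic_store_url,
    "SD_SVC_API_KEY": "test",
}
# EXTRA_ENV_VARS rendered from _prepare_extra_env_vars(): adds the MinIO S3 endpoint override
env_vars.update({
    "S3_ENDPOINT_OVERRIDE": "{{ var.value.core__service__minio__url }}"
})
```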