Commit ed2f960d authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM)
Browse files

GONRG-3759: Add bootstrap

parent 160ea44d
Pipeline #80921 passed with stages
in 1 minute and 11 seconds
.vscode/*
.idea/*
env/**
venv/**
env/**
energistics/src/witsml_parser/osdu/*
!energistics/src/witsml_parser/osdu/.gitkeep
## GCP: DAG FILE RENDERING
```bash
# Install requirements
pip install Jinja2==2.10.1
# Setup Variables
export WITSML_IMAGE_NAME="<image_tag>"
export K8S_NAMESPACE="<your_k8s_namespace>" # ie: default
export K8S_NODE_POOLS=<node_pools> # i.e. pool-1,pool-2 the value will be split by comma
# The following three variables are added to the environment of the running WITSML container (env_vars parameter of KubernetesPodOperator)
export OSDU_GCP_BASE_URL=<host_name> # i.e "com.odesprod.com"
export DATA_PARTITION=<data-partition-id> # i.e "osdu"
export SA_FILE_PATH=<path-to-sa-file> # i.e. "/path/to/sa-file"
export GCP_DEPLOYMENTS_SUBDIR="<path_to_folder>" # i.e. path to the GCP deployment scripts folder (the one containing render_dag_file.py)
export GCP_DAG_DIRECTORY="<path_to_folder>" # i.e. energistics/src/dags/energistics (the folder containing witsml_parser_dag.py)
export OUTPUT_FILE="<path_to_output_file>" # optional: set it to avoid overwriting the original DAG file
chmod +x bootstrap.sh
./bootstrap.sh
```
#!/usr/bin/env sh
#
# Purpose: Initialize WITSML DAG.
# Usage:
# bootstrap.sh
###############################
## ARGUMENT INPUT ##
###############################
usage() { echo "Usage: bootstrap.sh"; }

# Report a missing mandatory variable and remember that validation failed.
#   $1 - name of the missing environment variable
report_missing() {
  echo "ERROR: $1 not provided"
  MISSING_REQUIRED=1
}

MISSING_REQUIRED=0

printf "\n"
echo "------------------------------------";
echo "BOOTSTRAP -- START";
echo "------------------------------------";

if [ -z "$K8S_NAMESPACE" ]; then
  report_missing "K8S_NAMESPACE"
else
  # render_dag_file.py reads the namespace from NAMESPACE.
  export NAMESPACE="$K8S_NAMESPACE"
fi

# The node pool is optional: fall back to "default" with a warning.
if [ -z "$K8S_NODE_POOLS" ]; then
  echo 'WARNING: K8S_NODE_POOLS not provided. Will be used default node pool'
  export K8S_NODE_POOLS="default"
else
  export K8S_NODE_POOLS="$K8S_NODE_POOLS"
fi

if [ -z "$WITSML_IMAGE_NAME" ]; then
  report_missing "WITSML_IMAGE_NAME"
else
  export WITSML_IMAGE_NAME="$WITSML_IMAGE_NAME"
fi

if [ -z "$OSDU_GCP_BASE_URL" ]; then
  report_missing "OSDU_GCP_BASE_URL"
else
  export OSDU_GCP_BASE_URL="$OSDU_GCP_BASE_URL"
fi

if [ -z "$DATA_PARTITION" ]; then
  report_missing "DATA_PARTITION"
else
  export DATA_PARTITION="$DATA_PARTITION"
fi

if [ -z "$SA_FILE_PATH" ]; then
  report_missing "SA_FILE_PATH"
else
  export SA_FILE_PATH="${SA_FILE_PATH}"
fi

# Stop before rendering: continuing with missing settings would write a DAG
# file populated with empty values. All problems are reported first so the
# caller can fix them in one pass.
if [ "$MISSING_REQUIRED" -ne 0 ]; then
  usage;
  printf "\n"
  echo "------------------------------------";
  echo "BOOTSTRAP -- ERROR";
  echo "------------------------------------";
  exit 1
fi
printf "\n"
echo "SETTING POPULATION TO THE DAG FILE";
echo "------------------------------------";

# Render the DAG template. Paths are quoted so that directories containing
# spaces work, and the renderer's exit status is captured explicitly.
if [ -z "$OUTPUT_FILE" ]; then
  echo "THE ORIGINAL DAG FILE WILL BE OVERWRITTEN"
  python3 "$GCP_DEPLOYMENTS_SUBDIR/render_dag_file.py" \
    -f "$GCP_DAG_DIRECTORY/witsml_parser_dag.py"
  RENDER_STATUS=$?
else
  echo "RENDERED OUTPUT WILL BE SAVED INTO ${OUTPUT_FILE}"
  python3 "$GCP_DEPLOYMENTS_SUBDIR/render_dag_file.py" \
    -f "$GCP_DAG_DIRECTORY/witsml_parser_dag.py" -o "${OUTPUT_FILE}"
  RENDER_STATUS=$?
fi

# Any non-zero status is a failure (e.g. 2 from argparse, 127 when python3 is
# not installed), not only the status 1 of an uncaught Python exception.
if [ "$RENDER_STATUS" -ne 0 ]; then
  printf "\n"
  echo "------------------------------------";
  echo "BOOTSTRAP -- ERROR";
  echo "------------------------------------";
  exit 1
else
  printf "\n"
  echo "------------------------------------";
  echo "BOOTSTRAP -- FINISH";
  echo "------------------------------------";
  exit 0
fi
import argparse
import os
from jinja2 import Environment, FileSystemLoader
from typing import Dict, List
class DAGFileRenderer:
    """Populate the WITSML parser DAG template with deployment settings.

    The settings are read from environment variables (image name, namespace,
    node pools, container environment and pod resources) and substituted into
    the placeholders of the DAG file with Jinja2.
    """

    def __init__(self, file_path):
        """Remember the DAG file to render and read the basic settings.

        :param file_path: path to the DAG template file
        """
        self.file_path = file_path
        self.docker_image = os.getenv("WITSML_IMAGE_NAME")
        self.namespace = os.getenv("NAMESPACE")
        self.dag_name = "Energistics_xml_ingest"

    def _get_node_pools(self) -> List:
        """Return the node pool names listed in K8S_NODE_POOLS.

        The value is split by comma; surrounding whitespace and empty entries
        (e.g. from a trailing comma) are dropped. When the variable is unset
        or yields no names, the "default" pool is used, which is the same
        fallback bootstrap.sh applies.
        """
        raw_pools = os.getenv("K8S_NODE_POOLS") or ""
        pools = [name.strip() for name in raw_pools.split(",") if name.strip()]
        return pools or ["default"]

    def _prepare_env_vars(self) -> Dict:
        """Return the environment variables passed to the WITSML container."""
        return {
            "SA_FILE_PATH": os.getenv("SA_FILE_PATH"),
            "OSDU_GCP_BASE_URL": os.getenv("OSDU_GCP_BASE_URL"),
            "DATA_PARTITION": os.getenv("DATA_PARTITION"),
            "CLOUD_PROVIDER": "gcp"
        }

    def _get_resources(self):
        """Return pod resource requests and limits, overridable via env vars."""
        return {
            "request_memory": os.getenv("GCP_WITSML_INGESTION_REQUEST_MEMORY", default="1Gi"),
            "request_cpu": os.getenv("GCP_WITSML_INGESTION_REQUEST_CPU", default="200m"),
            "limit_memory": os.getenv("GCP_WITSML_INGESTION_LIMIT_MEMORY", default="8Gi"),
            "limit_cpu": os.getenv("GCP_WITSML_INGESTION_LIMIT_CPU", default="1000m")
        }

    def _prepare_operator_kwargs(self) -> Dict:
        """Return extra keyword arguments rendered into the pod operator call.

        :raises ValueError: if GCP_WITSML_INGESTION_STARTUP_TIMEOUT is set to
            something that is not an integer
        """
        # Environment values are always strings; convert so that the rendered
        # value has the same type (int) whether or not the variable is set.
        startup_timeout = int(
            os.getenv("GCP_WITSML_INGESTION_STARTUP_TIMEOUT", default="300")
        )
        return {
            "affinity": {
                "nodeAffinity": {
                    "requiredDuringSchedulingIgnoredDuringExecution": {
                        "nodeSelectorTerms": [{
                            "matchExpressions": [{
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                "values": self._get_node_pools()
                            }]
                        }]
                    }
                }
            },
            "resources": self._get_resources(),
            "startup_timeout_seconds": startup_timeout
        }

    def _render_template(self, file) -> str:
        """Render the template behind ``file`` and return the resulting text.

        :param file: opened DAG file; only its ``name`` attribute is used
        :return: the rendered file content
        """
        # Look in the directory of the given file first so that the rendered
        # template is the very file the caller asked for; GCP_DAG_DIRECTORY is
        # kept as a secondary search location.
        search_paths = [
            os.path.dirname(os.path.abspath(file.name)),
            os.getenv("GCP_DAG_DIRECTORY", default="src/dags"),
        ]
        # Custom variable delimiters leave the Airflow "{{ ... }}" expressions
        # of the DAG file untouched.
        env = Environment(
            loader=FileSystemLoader(searchpath=search_paths),
            variable_start_string='{|', variable_end_string='|}',
        )
        template = env.get_template(os.path.basename(file.name))
        params = {
            "DAG_NAME": self.dag_name,
            "DOCKER_IMAGE": self.docker_image,
            "K8S_NAMESPACE": self.namespace,
            "K8S_POD_KWARGS": self._prepare_operator_kwargs(),
            "ENV_VARS": self._prepare_env_vars(),
        }
        return template.render(**params)

    def render(self, output_path=None):
        """Render the DAG file and write the result.

        :param output_path: where to write the rendered file; when omitted the
            original file is overwritten
        """
        print(f"Start {self.file_path} file rendering")
        output_path = output_path or self.file_path
        with open(self.file_path, "r", encoding="utf-8") as f:
            rendered_file_data = self._render_template(f)
        # Jinja2 loads templates as UTF-8 by default; write with the same
        # encoding instead of the platform's locale default.
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(rendered_file_data)
        print(f"Finish {self.file_path} file rendering")
        print(f"Rendered output file: {output_path}")
if __name__ == "__main__":
    # Command-line entry point: render the DAG file given with -f and write
    # the result to -o (or back to the input file when -o is omitted).
    parser = argparse.ArgumentParser(
        description="Render python dag file. Populate required environment settins and parameters.")
    # The input file is mandatory; without it rendering would fail later with
    # an opaque TypeError when trying to open None.
    parser.add_argument("-f", type=str,
                        help="The path to the load python dag file.",
                        required=True)
    parser.add_argument("-o", type=str,
                        help="The path to the rendered dag file.",
                        default=None)
    arguments = parser.parse_args()
    file_path = arguments.f
    output_path = arguments.o
    DAGFileRenderer(file_path).render(output_path=output_path)
Jinja2==2.10.1
\ No newline at end of file
# Settings for the Kubernetes pod operator that runs the WITSML parser:
# container image, node affinity, container environment and pod resources.
energistics:
energistics_witsml_parser_k8s_operator:
# Placeholder; replace with the WITSML parser image tag.
image: "IMAGE"
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: cloud.google.com/gke-nodepool
operator: In
values:
- pool-1
# Values are templated "var.value.*" expressions
# (presumably Airflow Variables resolved at run time; verify).
env_vars:
SA_FILE_PATH: "{{ var.value.core__config__sa_file_path }}"
OSDU_GCP_BASE_URL: "{{ var.value.core__config__osdu_gcp_base_url }}"
DATA_PARTITION: "{{ var.value.core__config__data_partition }}"
CLOUD_PROVIDER: gcp
resources:
request_memory: "1Gi"
request_cpu: 200m
# NOTE(review): render_dag_file.py defaults the memory limit to 8Gi
# (GCP_WITSML_INGESTION_LIMIT_MEMORY); confirm which value is intended.
limit_memory: 1Gi
limit_cpu: 1000m
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Energistics XML ingest."""
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from osdu_airflow.backward_compatibility.default_args import update_default_args
from osdu_airflow.operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from osdu_airflow.operators.process_manifest_r3 import ProcessManifestOperatorR3
from osdu_airflow.operators.validate_manifest_schema import ValidateManifestSchemaOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator
# The five values below are template placeholders. render_dag_file.py fills
# them with DAG_NAME, DOCKER_IMAGE, K8S_NAMESPACE, ENV_VARS and K8S_POD_KWARGS
# before the file is deployed; the file is not valid Python until rendered.
dag_name = "{| DAG_NAME|default('Energistics_xml_ingest') |}"
docker_image = "{| DOCKER_IMAGE |}"
k8s_namespace = "{| K8S_NAMESPACE |}"
# Rendered as Python dict literals; both fall back to an empty dict.
env_vars = {| ENV_VARS|default('{}') |}
K8S_POD_KWARGS = {| K8S_POD_KWARGS|default('{}') |}
# Task id constants. Only PROCESS_SINGLE_MANIFEST_FILE and
# ENSURE_INTEGRITY_TASK are referenced by the tasks defined in this file.
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"
# Defaults applied to every task of the DAG.
default_args = {
    "name": dag_name,
    "owner": "energistics",
    "start_date": airflow.utils.dates.days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(minutes=50),
    "trigger_rule": "none_failed",
}

default_args = update_default_args(default_args)

# The DAG id comes from the rendered dag_name instead of a second hard-coded
# copy of the same string, so the id and default_args["name"] cannot diverge.
# The DAG is only triggered manually or externally (no schedule).
dag = DAG(
    dag_name,
    default_args=default_args,
    description="witsml parser dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60)
)
# Task definitions: parse the WITSML file in a Kubernetes pod, then validate,
# check integrity of and process the manifest the pod produced. The ingestion
# status is updated before the first and after the last step.
with dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )

    # "all_done": the final status is reported even when an upstream task failed.
    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        trigger_rule="all_done"
    )

    process_energistics_op = KubernetesPodOperator(
        # Use the namespace rendered into k8s_namespace; fall back to the
        # previously hard-coded "default" when nothing was rendered.
        namespace=k8s_namespace or "default",
        task_id="witsml_parser_task",
        name="witsml_parser_task",
        do_xcom_push=True,
        image=docker_image,
        env_vars=env_vars,
        # The parser writes its result to /airflow/xcom/return.json, the file
        # the operator reads back when do_xcom_push is enabled.
        cmds=[
            "sh",
            "-c",
            "mkdir -p /airflow/xcom/; "
            "python main.py"
            " --context '{{ dag_run.conf['execution_context'] | tojson }}'"
            " --file_service {{ var.value.core__service__file__host }}"
            " --out /airflow/xcom/return.json",
        ],
        is_delete_operator_pod=True,
        image_pull_policy="Always",
        get_logs=True,
        # K8S_POD_KWARGS as rendered by render_dag_file.py already contains
        # startup_timeout_seconds; passing it as a separate keyword as well
        # raises "got multiple values for keyword argument". Merge instead:
        # 300 is the default, a rendered value overrides it.
        **{"startup_timeout_seconds": 300, **K8S_POD_KWARGS}
    )

    validate_schema_operator = ValidateManifestSchemaOperator(
        task_id="validate_manifest_schema_task",
        previous_task_id=process_energistics_op.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    ensure_integrity_op = EnsureManifestIntegrityOperator(
        task_id=ENSURE_INTEGRITY_TASK,
        previous_task_id=validate_schema_operator.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    process_single_manifest_file = ProcessManifestOperatorR3(
        task_id=PROCESS_SINGLE_MANIFEST_FILE,
        previous_task_id=ensure_integrity_op.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    # Linear pipeline; each step consumes the output of the previous one.
    update_status_running_op >> \
        process_energistics_op >> \
        validate_schema_operator >> \
        ensure_integrity_op >> \
        process_single_manifest_file >> \
        update_status_finished_op
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment