Commit 6c5422b1 authored by Vadzim Kulyba

feat: upgrade base azure pipeline and added m13 support

parent 3f27b52b
1 merge request: !74 feat: upgrade base azure pipeline and added m13 support
Showing changed files with 200 additions and 982 deletions
@@ -41,6 +41,7 @@ stages:
   - review
   - linters
   - build
+  - test
   - containerize
   - bootstrap_dag
   - publish_dag
@@ -60,6 +61,9 @@ include:
   - project: "osdu/platform/ci-cd-pipelines"
     file: "cloud-providers/osdu-gcp-dag.yaml"
+  - project: "osdu/platform/ci-cd-pipelines"
+    file: "cloud-providers/azure_dag.yml"
   - local: "/devops/azure/override-stages.yml"
   - local: "/devops/gcp/pipeline/override-stages.yml"
   - local: "/devops/ibm/ibm-stages.yml"
FROM community.opengroup.org:5555/osdu/platform/deployment-and-operations/base-containers-azure/alpine-python3:0.0.2
ARG DATA_PARTITION=opendes
ENV BUILD_PACKAGES \
py-cryptography
ENV PYTHONUNBUFFERED=1 \
PATH="/home/osdu/.local/bin:${PATH}" \
DATA_PARTITION=${DATA_PARTITION} \
DAG_DIRECTORY=../airflow/workflow-svc-v2 \
DAG_TASK_IMAGE=${DAG_TASK_IMAGE} \
AZURE_DEPLOYMENTS_SUBDIR=. \
AZURE_DEPLOYMENTS_SCRIPTS_SUBDIR=.
USER root
RUN set -x && \
    echo "**** install packages ****" && \
    apk update && \
    apk add --no-cache ${BUILD_PACKAGES} && \
    rm -rf /var/cache/apk/*
# Install Requirements
ADD deployments/scripts/azure/requirements.txt ./requirements.txt
RUN pip install -r requirements.txt
# Add Scripts
ADD deployments/scripts/azure/*.py scripts/
ADD deployments/scripts/azure/bootstrap.sh scripts/
RUN chmod +x scripts/bootstrap.sh
# Add DAGs
ADD airflow airflow
# Execute as non root user.
USER osdu
WORKDIR /home/osdu/scripts
ENTRYPOINT ["/home/osdu/scripts/bootstrap.sh"]
# These packages need to be installed in the Airflow environment.
--extra-index-url=https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api==0.14.0
--extra-index-url=https://community.opengroup.org/api/v4/projects/668/packages/pypi/simple/
osdu-airflow==0.14.0
--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion==0.14.0
# Copyright © Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import timedelta
from json import dumps
import airflow
from airflow import DAG
from airflow import configuration as conf
from osdu_airflow.backward_compatibility.default_args import \
update_default_args
from osdu_airflow.operators.process_manifest_r3 import \
ProcessManifestOperatorR3
from osdu_airflow.operators.segy_open_vds_conversion import \
KubernetesPodSegyToOpenVDSOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator
# default args for airflow
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
"retry_delay": timedelta(seconds=30),
"trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
# Get values from dag run configuration
record_id = "{{ dag_run.conf['execution_context']['id'] }}"
authorization = "{{ dag_run.conf['authToken'] }}".replace("Bearer ", "")
svc_token = "{{ dag_run.conf['execution_context']['id_token'] or dag_run.conf['authToken'] }}"
# Constants
DAG_NAME = "{| DAG_NAME |}"
docker_image = "{| DOCKER_IMAGE |}"
NAMESPACE = "{| NAMESPACE |}"
SEGY_CONVERTER = "segy-to-vds"
K8S_POD_KWARGS = {| K8S_POD_OPERATOR_KWARGS |}
if not K8S_POD_KWARGS:
K8S_POD_KWARGS = {}
kube_config_path = os.path.join(os.environ['AIRFLOW_HOME'], 'kube_config.yaml')
persistent_id = "{{ dag_run.conf['execution_context']['persistent_id'] }}"
vds_url = "{{ dag_run.conf['execution_context']['vds_url'] }}"
segy_url = "{{ dag_run.conf['execution_context']['segy_url'] }}"
env_vars = {
"SD_SVC_URL": "https://{{ var.value.azure_dns_host }}/seistore-svc/api/v3",
"SD_SVC_TOKEN": svc_token,
"SD_SVC_API_KEY": "NA",
}
svc_connection_string = f'sdauthorityurl={env_vars["SD_SVC_URL"]};sdapikey={env_vars["SD_SVC_API_KEY"]};sdtoken={svc_token}'
dag = DAG(
DAG_NAME,
default_args=default_args,
description="Airflow DAG for transformation from SEGY to OpenVDS",
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60)
)
with dag:
update_status_running = UpdateStatusOperator(
task_id="update_status_running",
)
segy_to_vds = KubernetesPodSegyToOpenVDSOperator(
task_id=SEGY_CONVERTER,
dag=dag,
name=DAG_NAME,
env_vars=env_vars,
cmds=['SEGYImport'],
        arguments=[
            '--persistentID', persistent_id,
            '--url', vds_url,
            '--url-connection', svc_connection_string,
            '--input-connection', svc_connection_string,
            segy_url
        ],
namespace=NAMESPACE,
image=docker_image,
is_delete_operator_pod=True,
trigger_rule="none_failed_or_skipped",
**K8S_POD_KWARGS
)
process_single_manifest_file = ProcessManifestOperatorR3(
task_id="process_single_manifest_file_task",
previous_task_id=segy_to_vds.task_id,
trigger_rule="none_failed_or_skipped"
)
update_status_finished = UpdateStatusOperator(
task_id="update_status_finished",
trigger_rule="all_done"
)
update_status_running >> segy_to_vds >> process_single_manifest_file >> update_status_finished # pylint: disable=pointless-statement
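The `{| ... |}` tokens above are not Airflow macros; they are Jinja2 placeholders that the bootstrap tooling fills in before the file is shipped, using custom `{|`/`|}` delimiters so that Airflow's own `{{ ... }}` expressions survive rendering (see output_dag_folder.py further down). A minimal sketch of that rendering step, with hypothetical parameter values:

```python
# Minimal sketch of the {| ... |} rendering step; the parameter values here
# are hypothetical examples, not the pipeline's real configuration.
from jinja2 import Environment, FileSystemLoader

env = Environment(
    loader=FileSystemLoader(searchpath="airflow/workflow-svc-v2"),
    # Custom delimiters leave Airflow's {{ ... }} macros untouched.
    variable_start_string="{|",
    variable_end_string="|}",
)
template = env.get_template("openvds_azure.py")
rendered = template.render(
    DAG_NAME="segy-to-vds-conversion-abc12",          # hypothetical
    DOCKER_IMAGE="msosdu.azurecr.io/vds_dag:latest",  # hypothetical
    NAMESPACE="airflow",
    K8S_POD_OPERATOR_KWARGS="{}",                     # renders an empty kwargs dict
)
print(rendered[:300])
```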
# Copyright © Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import timedelta
from json import dumps
import airflow
from airflow import DAG
from airflow import configuration as conf
from osdu_airflow.backward_compatibility.default_args import \
update_default_args
from osdu_airflow.operators.process_manifest_r3 import \
ProcessManifestOperatorR3
from osdu_airflow.operators.segy_open_vds_conversion import \
KubernetesPodSegyToOpenVDSOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator
# default args for airflow
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
"retry_delay": timedelta(seconds=30),
"trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
# Get values from dag run configuration
record_id = "{{ dag_run.conf['execution_context']['id'] }}"
authorization = "{{ dag_run.conf['authToken'] }}".replace("Bearer ", "")
svc_token = "{{ dag_run.conf['execution_context']['id_token'] or dag_run.conf['authToken'] }}"
# Constants
DAG_NAME = "{| DAG_NAME |}"
docker_image = "{| DOCKER_IMAGE |}"
NAMESPACE = "{| NAMESPACE |}"
SEGY_CONVERTER = "segy-to-vds"
K8S_POD_KWARGS = {| K8S_POD_OPERATOR_KWARGS |}
if not K8S_POD_KWARGS:
K8S_POD_KWARGS = {}
seismic_store_url = "{| SEISMIC_STORE_URL |}"
env_vars = {
"SD_SVC_URL": seismic_store_url,
"SD_SVC_API_KEY": "NA",
}
dag = DAG(
DAG_NAME,
default_args=default_args,
description="Airflow DAG for transformation from SEGY to OpenVDS",
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60)
)
with dag:
update_status_running = UpdateStatusOperator(
task_id="update_status_running",
)
segy_to_vds = KubernetesPodSegyToOpenVDSOperator(
task_id=SEGY_CONVERTER,
dag=dag,
name=DAG_NAME,
env_vars=env_vars,
cmds=['SEGYImport'],
namespace=NAMESPACE,
image=docker_image,
is_delete_operator_pod=True,
trigger_rule="none_failed_or_skipped",
**K8S_POD_KWARGS
)
process_single_manifest_file = ProcessManifestOperatorR3(
task_id="process_single_manifest_file_task",
previous_task_id=segy_to_vds.task_id,
trigger_rule="none_failed_or_skipped"
)
update_status_finished = UpdateStatusOperator(
task_id="update_status_finished",
trigger_rule="all_done"
)
update_status_running >> segy_to_vds >> process_single_manifest_file >> update_status_finished # pylint: disable=pointless-statement
FROM mcr.microsoft.com/mirror/docker/library/python:3.8-slim
ARG DATA_PARTITION=opendes
ENV BUILD_PACKAGES \
python3-cryptography
ENV PYTHONUNBUFFERED=1 \
PATH="/home/osdu/.local/bin:${PATH}" \
DATA_PARTITION=${DATA_PARTITION} \
DAG_DIRECTORY=../airflow/workflow-svc-v2 \
DAG_TASK_IMAGE=${DAG_TASK_IMAGE} \
AZURE_DEPLOYMENTS_SUBDIR=. \
AZURE_DEPLOYMENTS_SCRIPTS_SUBDIR=.
USER root
RUN set -x && \
    echo "**** install packages ****" && \
    apt-get update && \
    apt-get install -y -o APT::Keep-Downloaded-Packages=false ${BUILD_PACKAGES} && \
    rm -rf /var/cache/apt/archives/*
# Install Requirements
ADD deployments/scripts/azure/requirements.txt ./requirements.txt
RUN pip install -r requirements.txt
# Install busybox (available from the default Debian repositories; the image
# previously wrote Alpine-style entries to /etc/apt/repositories, a file apt never reads)
RUN \
    apt-get update && \
    apt-get install -y busybox && \
    rm -rf /var/cache/apt/archives/*
# Add Scripts
WORKDIR /home/osdu
ADD deployments/scripts/azure/*.py scripts/
ADD deployments/scripts/azure/bootstrap.sh scripts/
RUN chmod +x scripts/bootstrap.sh
# Add DAGs
ADD airflow airflow
# Execute as non root user.
RUN addgroup --system osdu && adduser --system osdu --ingroup osdu
USER osdu
WORKDIR /home/osdu/scripts
ENTRYPOINT ["/home/osdu/scripts/bootstrap.sh"]
## DAG Loading
The following is the manual process for loading a DAG.
```bash
# Setup Variables
UNIQUE="<your_osdu_unique>" # ie: demo
AZURE_DNS_NAME="<your_osdu_fqdn>" # ie: osdu-$UNIQUE.contoso.com
DATA_PARTITION="<your_partition>" # ie: opendes
ACR_REGISTRY="<your_acr_fqdn>" # ie: msosdu.azurecr.io
DAG_TASK_IMAGE="$ACR_REGISTRY/vds_dag"
DAG_LOAD_IMAGE="$ACR_REGISTRY/segy-to-vds-conversion-dag"
TAG="latest"
# This logs your local Azure CLI in using the configured service principal.
az login --service-principal -u $ARM_CLIENT_ID -p $ARM_CLIENT_SECRET --tenant $ARM_TENANT_ID
GROUP=$(az group list --query "[?contains(name, 'cr${UNIQUE}')].name" -otsv)
ENV_VAULT=$(az keyvault list --resource-group $GROUP --query "[].name" -otsv)
cat > .env << EOF
DAG_TASK_IMAGE=$DAG_TASK_IMAGE:$TAG
SHARED_TENANT=$DATA_PARTITION
AZURE_TENANT_ID=$AZURE_TENANT_ID
AZURE_DNS_NAME=$AZURE_DNS_NAME
AZURE_AD_APP_RESOURCE_ID=$(az keyvault secret show --id https://${ENV_VAULT}.vault.azure.net/secrets/aad-client-id --query value -otsv)
AZURE_CLIENT_ID=$(az keyvault secret show --id https://${ENV_VAULT}.vault.azure.net/secrets/app-dev-sp-username --query value -otsv)
AZURE_CLIENT_SECRET=$(az keyvault secret show --id https://${ENV_VAULT}.vault.azure.net/secrets/app-dev-sp-password --query value -otsv)
EOF
(cd ../../.. && docker build -f deployments/scripts/azure/Dockerfile -t $DAG_LOAD_IMAGE:$TAG .)
docker run --env-file .env $DAG_LOAD_IMAGE:$TAG
```
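Once the bootstrap container has registered the workflow, a conversion run can be triggered through the Workflow service. The following is a hedged sketch (not part of this repo), assuming the standard OSDU Workflow v1 trigger endpoint and reusing the `execution_context` keys the DAG reads; all identifiers are hypothetical:

```python
# Hedged sketch of triggering the registered DAG via the OSDU Workflow
# service; the endpoint shape and all identifiers are assumptions to adapt.
import os
import requests

workflow_url = f"https://{os.environ['AZURE_DNS_NAME']}/api/workflow/v1/workflow"
workflow_name = "segy-to-vds-conversion-abc12"  # hypothetical registered name
headers = {
    "Content-Type": "application/json",
    "Authorization": os.environ["BEARER_TOKEN"],       # e.g. from Token.py below
    "data-partition-id": os.environ["DATA_PARTITION"],
}
body = {
    "executionContext": {
        "id": "opendes:dataset--FileCollection.SEGY:example",  # hypothetical
        "persistent_id": "example-vds",                        # hypothetical
        "vds_url": "sd://opendes/demo/example",                # hypothetical
        "segy_url": "sd://opendes/demo/example.segy",          # hypothetical
    }
}
response = requests.post(f"{workflow_url}/{workflow_name}/workflowRun",
                         headers=headers, json=body)
print(response.status_code, response.json())
```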
import os
import msal
class AzureToken(object):
def get_azure_id_token(self):
tenant_id = os.getenv('AZURE_TENANT_ID')
resource_id = os.getenv('AZURE_AD_APP_RESOURCE_ID')
client_id = os.getenv('AZURE_CLIENT_ID')
client_secret = os.getenv('AZURE_CLIENT_SECRET')
        if tenant_id is None or tenant_id == "":
            print('Please pass tenant Id to generate token')
            exit(1)
        if resource_id is None or resource_id == "":
            print('Please pass resource Id to generate token')
            exit(1)
        if client_id is None or client_id == "":
            print('Please pass client Id to generate token')
            exit(1)
        if client_secret is None or client_secret == "":
            print('Please pass client secret to generate token')
            exit(1)
try:
authority_host_uri = 'https://login.microsoftonline.com'
authority_uri = authority_host_uri + '/' + tenant_id
scopes = [resource_id + '/.default']
app = msal.ConfidentialClientApplication(client_id=client_id, authority=authority_uri, client_credential=client_secret)
result = app.acquire_token_for_client(scopes=scopes)
token = 'Bearer ' + result.get('access_token')
print(token)
return token
except Exception as e:
print(e)
if __name__ == '__main__':
AzureToken().get_azure_id_token()
#!/usr/bin/env sh
#
# Purpose: Initialize the DAGS.
# Usage:
# bootstrap.sh
###############################
## ARGUMENT INPUT ##
###############################
usage() { echo "Usage: bootstrap.sh"; }
printf "\n"
echo "------------------------------------";
echo "BOOTSTRAP -- START";
echo "------------------------------------";
export NAMESPACE=airflow
if [ -z "$AZURE_DNS_NAME" ]; then
  echo 'ERROR: AZURE_DNS_NAME not provided'
  usage;
  exit 1
else
  export AZURE_DNS_NAME=$AZURE_DNS_NAME
  export WORKFLOW_URL=https://${AZURE_DNS_NAME}/api/workflow/v1/workflow
fi
if [ -z "$DATA_PARTITION" ]; then
  SHARED_TENANT="opendes"
else
  SHARED_TENANT=$DATA_PARTITION
fi
export SHARED_TENANT
if [ -z "$DAG_TASK_IMAGE" ]; then
  DAG_TASK_IMAGE=${AZURE_REGISTRY}.azurecr.io/${CI_PROJECT_NAME}-${CI_COMMIT_REF_SLUG}:latest
fi
export DAG_TASK_IMAGE # actual conversion image
# registration image: DAG_LOAD_IMAGE
echo "AZURE_DNS_NAME: $AZURE_DNS_NAME"
echo "WORKFLOW_URL: $WORKFLOW_URL"
echo "SHARED_TENANT: $SHARED_TENANT"
echo "DAG_TASK_IMAGE: $DAG_TASK_IMAGE"
printf "\n"
echo "Execute Python Script Token.py";
echo "------------------------------------";
BEARER_TOKEN=$(python $AZURE_DEPLOYMENTS_SUBDIR/Token.py)
export BEARER_TOKEN
echo "$(echo $BEARER_TOKEN | cut -c -20)***********"
printf "\n"
echo "Execute Python Script register_dag.py";
echo "------------------------------------";
python $AZURE_DEPLOYMENTS_SUBDIR/register_dag.py -f $DAG_DIRECTORY/openvds_azure.py
if [ $? -ne 0 ]; then
printf "\n"
echo "------------------------------------";
echo "BOOTSTRAP -- ERROR";
echo "------------------------------------";
exit 1
else
printf "\n"
echo "------------------------------------";
echo "BOOTSTRAP -- FINISH";
echo "------------------------------------";
exit 0
fi
@@ -1,11 +1,10 @@
 FROM mcr.microsoft.com/mirror/docker/library/python:3.8-slim
-ARG DATA_PARTITION=opendes
 ENV BUILD_PACKAGES \
     python3-cryptography
 ENV PYTHONUNBUFFERED=1 \
     PATH="/home/osdu/.local/bin:${PATH}" \
-    DATA_PARTITION=${DATA_PARTITION} \
-    DAG_DIRECTORY=../airflow/workflow-svc-v2 \
-    DAG_TASK_IMAGE=${DAG_TASK_IMAGE} \
+    DAG_DIRECTORY=/home/osdu/airflow \
+    NAMESPACE=${NAMESPACE} \
+    BUILD_VERSION=${BUILD_VERSION} \
     AZURE_DEPLOYMENTS_SUBDIR=. \
     AZURE_DEPLOYMENTS_SCRIPTS_SUBDIR=.
@@ -37,15 +35,22 @@ RUN \
 # Add Scripts
 WORKDIR /home/osdu
 ADD deployments/scripts/azure/*.py scripts/
-ADD deployments/scripts/azure/bootstrap.sh scripts/
-RUN chmod +x scripts/bootstrap.sh
+ADD deployments/scripts/azure/prepare_dags.sh scripts/
+RUN chmod +x scripts/prepare_dags.sh
 # Add DAGs
-ADD airflow airflow
+ADD src/dags airflow
+# Add output_dags folder
+RUN mkdir -p /home/osdu/output_dags
 # Execute as non root user.
 RUN addgroup --system osdu && adduser --system osdu --ingroup osdu
+RUN chgrp osdu /home/osdu/output_dags
+RUN chmod g+w /home/osdu/output_dags
 USER osdu
-WORKDIR /home/osdu/scripts
-ENTRYPOINT ["/home/osdu/scripts/bootstrap.sh"]
+WORKDIR /home/osdu
+ENTRYPOINT ["scripts/prepare_dags.sh"]
\ No newline at end of file
+import argparse
 import json
 import os
-import argparse
 import shutil
 from jinja2 import Environment, FileSystemLoader
 
 
 class CreateDag:
     def __init__(self, file_path):
-        self.dag_folder_path = os.environ.get('VDS_DAG_FOLDER_PATH', '../')
-        self.docker_tag = os.environ.get("DOCKER_TAG")
+        self.dag_folder_path = os.environ.get(
+            'VDS_DAG_FOLDER_PATH', '/home/osdu/airflow')
         self.dns_host_name = "{{ var.value.azure_dns_host }}"
         self.namespace = os.environ.get('NAMESPACE')
         self.file_path = file_path
         self.service_name = "segy-to-vds-conversion"
         self.build_version = os.environ.get('BUILD_VERSION', 'latest')
         self.dag_name = f"{self.service_name}-{self.build_version}"
         self.dag_name = self.dag_name[:50]
-        self.env_vars = {
-            "SD_SVC_URL": f"https://{self.dns_host_name}/seistore-svc/api/v3",
-            "SD_SVC_TOKEN": "TOKEN_PLACEHOLDER",
-            "SD_SVC_API_KEY": "NA",
-        }
+        self.env_vars = {}
 
-        self.enable_azure_aad_vars = os.environ.get('AZURE_AAD_VARIABLES', 'true')
+        self.enable_azure_aad_vars = os.environ.get(
+            'AZURE_AAD_VARIABLES', 'true')
         if self.enable_azure_aad_vars == 'true':
             self.env_vars["AZURE_TENANT_ID"] = "{{ var.value.azure_tenant_id }}"
             self.env_vars["AZURE_CLIENT_ID"] = "{{ var.value.azure_client_id }}"
             self.env_vars["AZURE_CLIENT_SECRET"] = "{{ var.value.azure_client_secret }}"
             self.env_vars["aad_client_id"] = "{{ var.value.aad_client_id }}"
 
         self.kubernetes_pod_operator_options = {
             "labels": {
                 "aadpodidbinding": "osdu-identity"
             },
             "annotations": {
                 "sidecar.istio.io/inject": "false"
             }
         }
 
         print(f"file_path: {self.file_path}")
-        print(f"openvds_azure.py exists?: {os.path.exists(self.file_path)}")
         print(f"service_name: {self.service_name}")
         print(f"build_version: {self.build_version}")
         print(f"dag_name: {self.dag_name}")
         print(f"folder: {self.dag_folder_path}")
-        print(f"docker_tag: {self.docker_tag}")
 
     def _render_template(self, file) -> str:
         env = Environment(
-            loader=FileSystemLoader(searchpath=os.getenv("VDS_DAG_FOLDER_PATH", default="airflow/workflow-svc-v2")),
+            loader=FileSystemLoader(searchpath=os.getenv(
+                "VDS_DAG_FOLDER_PATH", default="/home/osdu/airflow")),
             variable_start_string='{|', variable_end_string='|}'
         )
         template = env.get_template(os.path.basename(file.name))
 
         params = {
             "DAG_NAME": self.dag_name,
-            "DOCKER_IMAGE": "{{ var.value.dag_image_acr }}/segy-to-vds-conversion:latest",
-            "DNS_HOST": self.dns_host_name,
-            "NAMESPACE": self.namespace,
-            "K8S_POD_OPERATOR_KWARGS": json.dumps(self.kubernetes_pod_operator_options, indent=2)
+            "DOCKER_IMAGE": "{{ var.value.openvds_ingestion_dag }}",
+            "K8S_NAMESPACE": self.namespace,
+            "K8S_POD_KWARGS": json.dumps(self.kubernetes_pod_operator_options, indent=2),
+            "EXTRA_ENV_VARS": json.dumps(self.env_vars, indent=4),
+            "SEISMIC_STORE_URL": f"https://{self.dns_host_name}/seistore-svc/api/v3",
         }
 
         return template.render(**params)
 
     def create(self):
         self.read_dag_and_replace_placeholders()
 
     def read_dag_and_replace_placeholders(self):
         with open(self.file_path, 'r', encoding='utf-8') as f:
             data = self._render_template(f)
         current_directory = os.getcwd()
         final_directory = os.path.join(current_directory, r'output_dags/temp_folder')
         if not os.path.exists(final_directory):
             os.makedirs(final_directory)
         new_file = open("./output_dags/temp_folder/" + self.dag_name + ".py", "w")
         new_file.write(data)
         new_file.close()
         self.create_workflow_request_body()
         self.copy_vds_converter_folder()
         self.prepare_zip_folder()
 
     def prepare_zip_folder(self):
         shutil.make_archive('./output_dags/dags/vds_converter', 'zip', './output_dags/temp_folder')
         shutil.rmtree('./output_dags/temp_folder')
 
     def copy_vds_converter_folder(self):
         shutil.copytree(self.dag_folder_path, './output_dags/temp_folder/osdu_vds_converter')
 
     def create_workflow_request_body(self):
         file_name = os.environ.get("WORKFLOW_REQUEST_BODY_FILE", "workflow_request_body.json")
         new_file = open("./output_dags/" + file_name, "w")
         data = '[{"workflowName":"DAG_NAME_PLACEHOLDER","description":"VDSConverter","registrationInstructions":{"dagName":"DAG_NAME_PLACEHOLDER"}}]'
         data = data.replace("DAG_NAME_PLACEHOLDER", self.dag_name)
         new_file.write(data)
         new_file.close()
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Register python dag file to the workflow service.")
     parser.add_argument('-f', type=str,
                         help='The path to the load python dag file.',
                         default=None)
     arguments = parser.parse_args()
     if arguments.f is not None:
         file_path = arguments.f
         CreateDag(file_path).create()
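Besides the zipped DAG under `output_dags/dags/`, `create_workflow_request_body` writes the registration payload consumed downstream (presumably by the shared azure_dag.yml pipeline). For a hypothetical `BUILD_VERSION` of `abc12`, the emitted `workflow_request_body.json` (written on one line; pretty-printed here) would be:

```json
[
  {
    "workflowName": "segy-to-vds-conversion-abc12",
    "description": "VDSConverter",
    "registrationInstructions": {
      "dagName": "segy-to-vds-conversion-abc12"
    }
  }
]
```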
#!/usr/bin/env sh
#
# Purpose: Create Dags
# Usage:
# prepare_dags.sh
###############################
## ARGUMENT INPUT ##
###############################
usage() { echo "Usage: prepare_dags.sh"; }
printf "\n"
echo "------------------------------------";
echo "Creating Dags -- START ";
echo "------------------------------------";
if [ -z "$NAMESPACE" ]; then
  NAMESPACE=airflow
fi
export NAMESPACE
if [ -z "$BUILD_VERSION" ]; then
  BUILD_VERSION=$(echo ${CI_COMMIT_SHA} | cut -c -5)
fi
export BUILD_VERSION
echo "NAMESPACE: $NAMESPACE"
echo "BUILD_VERSION: $BUILD_VERSION"
printf "\n"
echo "Execute Python Script output_dag_folder.py";
echo "------------------------------------";
python $AZURE_DEPLOYMENTS_SUBDIR/scripts/output_dag_folder.py -f $DAG_DIRECTORY/segy_to_vds_ssdms_conversion_dag.py
-msal
+markupsafe==2.0.1
 Jinja2==2.10.1
-markupsafe==2.0.1
\ No newline at end of file
 ---
 .azure_variables:
   variables:
@@ -9,256 +8,49 @@
     DATA_PARTITION_ID: opendes
     SHARED_TENANT: opendes
     WORKFLOW_URL: "https://${AZURE_DNS_NAME}/api/workflow/v1/"
-azure_build_dag:
-  artifacts:
-    expire_in: "2 days"
-    paths:
-      - output_dags
-  before_script:
-    - "az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID"
-    - "az aks get-credentials -g $AZURE_UNIQUE-rg -n $AZURE_UNIQUE-aks"
-    - "docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY"
-  extends:
-    - .azure_variables
-  image: danielscholl/azure-build-image
-  only:
-    variables:
-      - "$AZURE == '1'"
-  script:
-    - |
-      cat > .env << EOF
-      DAG_TASK_IMAGE=${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
-      SHARED_TENANT=$SHARED_TENANT
-      AZURE_TENANT_ID=$AZURE_TENANT_ID
-      AZURE_DNS_NAME=$AZURE_DNS_NAME
-      AZURE_AD_APP_RESOURCE_ID=$AZURE_AD_APP_RESOURCE_ID
-      AZURE_CLIENT_ID=$AZURE_CLIENT_ID
-      AZURE_CLIENT_SECRET=$AZURE_CLIENT_SECRET
-      DOCKER_TAG=$CI_COMMIT_REF_SLUG
-      BUILD_VERSION=$CI_COMMIT_SHA
-      EOF
-      docker build -t $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE --file deployments/scripts/azure/DAG.Dockerfile .
-      docker build -t $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE --file deployments/scripts/azure/Dockerfile .
-      docker push $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE
-      docker push $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
-      az acr login -n $AZURE_REGISTRY
-      docker tag $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
-      docker tag $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
-      docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
-      docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
-      if [ "$(docker ps -a | grep docker_generate_dags)" ]; then
-        docker stop docker_generate_dags
-        docker rm docker_generate_dags
-      fi
-      docker run --name "docker_generate_dags" --env-file .env $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
-  stage: containerize
-  tags:
-    - osdu-medium
-  variables:
-    DAG_LOAD_IMAGE: "${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}"
-    DAG_TASK_IMAGE: "${CI_PROJECT_NAME}-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}"
-azure_register_dag:
-  artifacts:
-    expire_in: "2 days"
-    paths:
-      - output_dags
-  extends:
-    - .azure_variables
-  image: "python:3.8"
-  needs:
-    - azure_build_dag
-  only:
-    variables:
-      - "$AZURE == '1'"
-  script:
-    - |
-      cat > requirements.txt << EOF
-      msal
-      Jinja2==2.10.1
-      markupsafe==2.0.1
-      EOF
-    - |
-      export VDS_DAG_FOLDER_PATH="airflow/workflow-svc-v2"
-      export DOCKER_TAG=${CI_COMMIT_REF_SLUG}
-      export BUILD_VERSION=$(echo ${CI_COMMIT_SHA} | cut -c -5)
-    - |
-      # Python script for generating the Bearer Token
-      cat > Token.py << EOF
-      import os
-      import msal
-      class AzureToken(object):
-          def get_azure_id_token(self):
-              tenant_id = os.getenv('AZURE_TENANT_ID')
-              resource_id = os.getenv('AZURE_AD_APP_RESOURCE_ID')
-              client_id = os.getenv('AZURE_CLIENT_ID')
-              client_secret = os.getenv('AZURE_CLIENT_SECRET')
-              if tenant_id is None:
-                  print('Please pass tenant Id to generate token')
-                  exit(1)
-              if resource_id is None:
-                  print('Please pass resource Id to generate token')
-                  exit(1)
-              if client_id is None:
-                  print('Please pass client Id to generate token')
-                  exit(1)
-              if client_secret is None:
-                  print('Please pass client secret to generate token')
-                  exit(1)
-              try:
-                  authority_host_uri = 'https://login.microsoftonline.com'
-                  authority_uri = authority_host_uri + '/' + tenant_id
-                  scope = [resource_id + '/.default']
-                  app = msal.ConfidentialClientApplication(client_id=client_id, authority=authority_uri, client_credential=client_secret)
-                  result = app.acquire_token_for_client(scopes=scope)
-                  token = 'Bearer ' + result.get('access_token')
-                  print(token)
-                  return token
-              except Exception as e:
-                  print(e)
-      if __name__ == '__main__':
-          AzureToken().get_azure_id_token()
-      EOF
-    - |
-      # Python script for registering the DAG by calling Workflow service API
-      cat > registration_dag.py << EOF
-      import json
-      import requests
-      import os
-      class RegisterDag:
-          def __init__(self):
-              self.token = os.environ.get('BEARER_TOKEN')
-              self.data_partition_id = os.environ.get('SHARED_TENANT')
-              self.workflow_service_url = os.environ.get('WORKFLOW_URL') + "workflow"
-          def register(self):
-              self.register_dag()
-          def register_dag(self):
-              # To register a DAG, the workflow name must match the pattern ^[a-zA-Z0-9._-]{1,64}$
-              dag_name = '$DAG_NAME'
-              formatted_dag_name = dag_name[:63]
-              data = json.dumps(
-                  {
-                      "workflowName": formatted_dag_name,
-                      "description": formatted_dag_name,
-                      "registrationInstructions":
-                      {
-                          "dagName": formatted_dag_name
-                      }
-                  })
-              headers = {
-                  'Content-Type': 'application/json',
-                  'Authorization': self.token,
-                  'data-partition-id': self.data_partition_id
-              }
-              print(f"Attempting to register DAG with name {formatted_dag_name}")
-              response = requests.post(self.workflow_service_url, headers=headers, data=data)
-              if response.status_code == 200:
-                  workflow_id = response.json().get('workflowId')
-                  print("DAG registered with workflowId: {0}".format(workflow_id))
-              elif response.status_code == 409:
-                  workflow_id = response.json().get('workflowId')
-                  print("DAG is already registered with workflowId: {0}".format(workflow_id))
-              else:
-                  print(f"Error while registering DAG due to: {response.json().get('message')}")
-                  print("Error: {0}".format(response.raise_for_status()))
-      if __name__ == "__main__":
-          RegisterDag().register()
-      EOF
-    - "pip install -r ./requirements.txt"
-    - "export BEARER_TOKEN=$(python ./Token.py)"
-    - "echo \"$(echo $BEARER_TOKEN | cut -c -20)***********\""
-    - "python ./registration_dag.py"
-    - "python deployments/scripts/azure/output_dag_folder.py -f airflow/workflow-svc-v2/openvds_azure.py"
-    - "cd output_dags/dags"
-  stage: deploy
-  tags:
-    - osdu-medium
-  variables:
-    DAG_NAME: "${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}"
-azure_copy_dag:
-  artifacts:
-    expire_in: "2 days"
-    paths:
-      - output_dags
-  before_script:
-    - "az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID"
-    - "az aks get-credentials -g $AZURE_UNIQUE-rg -n $AZURE_UNIQUE-aks"
-  extends:
-    - .azure_variables
-  image: danielscholl/azure-build-image
-  needs:
-    - azure_register_dag
-  only:
-    variables:
-      - "$AZURE == '1'"
-  script:
+azure_standalone_tests:
+  stage: test
+  needs: []
+  variables:
+    AZURE_SKIP_STANDALONE_TESTS: "true"
+azure_build_dag:
+  needs: []
+  before_script:
     - |
-      # Installing the Azcopy utility
-
-      apk add --update coreutils && rm -rf /var/cache/apk/*
-      mkdir -p tmp
-      cd tmp
-      wget -O azcopy_v10.tar.gz https://aka.ms/downloadazcopy-v10-linux && tar -xf azcopy_v10.tar.gz --strip-components=1
-      cp ./azcopy /usr/bin/
-      cd ..
-    - "EXPIRE=$(date -u -d \"59 minutes\" '+%Y-%m-%dT%H:%M:%SZ')"
-    - "START=$(date -u -d \"-1 minute\" '+%Y-%m-%dT%H:%M:%SZ')"
-    - "accountKey=$(kubectl get secret airflow -n osdu -o jsonpath='{.data.azurestorageaccountkey}' | base64 -d)"
-    - "accountName=$(kubectl get secret airflow -n osdu -o jsonpath='{.data.azurestorageaccountname}' | base64 -d)"
-    - "AZURE_STORAGE_SAS_TOKEN=$(az storage account generate-sas --account-name $accountName --account-key $accountKey --start $START --expiry $EXPIRE --https-only --resource-types sco --services f --permissions cwdlur -o tsv)"
-    - "cd output_dags/dags"
+      # Generating environment file to be passed while running the docker container
+      cat > .env << EOF
+      CI_REGISTRY_USER=${CI_REGISTRY_USER}
+      CI_REGISTRY_PASSWORD=${CI_REGISTRY_PASSWORD}
+      CI_REGISTRY=${CI_REGISTRY}
+      AZURE_DNS_NAME=$AZURE_DNS_NAME
+      AZURE_REGISTRY=${AZURE_REGISTRY}
+      AZURE_PRINCIPAL_ID=${AZURE_PRINCIPAL_ID}
+      AZURE_PRINCIPAL_SECRET=${AZURE_PRINCIPAL_SECRET}
+      AZURE_TENANT_ID=${AZURE_TENANT_ID}
+      CI_PROJECT_NAME=${CI_PROJECT_NAME}
+      CI_COMMIT_REF_SLUG=${CI_COMMIT_REF_SLUG}
+      CI_COMMIT_SHA=${CI_COMMIT_SHA}
+      CI_REGISTRY_IMAGE=${CI_REGISTRY_IMAGE}
+      AZURE_DEPLOYMENTS_SCRIPTS_SUBDIR=${AZURE_DEPLOYMENTS_SCRIPTS_SUBDIR}
+      DAG_TASK_IMAGE=${DAG_TASK_IMAGE}
+      EOF
+azure_dag_postman_tests:
+  before_script:
     - |
-      if [ "$AZURE_DEPLOY_PACKAGED_DAG" == "true" ]; then
-        echo "Packaged Dags are enabled"
-        if [ -d "./dags/" ]; then
-          echo "Uploading to: ${accountName}"
-          # Copying the zipped dag inside the dags folder
-          azcopy cp "./dags/*.zip" "https://${accountName}.file.core.windows.net/airflowdags/dags?${AZURE_STORAGE_SAS_TOKEN}"
-          azcopy cp "./dags/*.zip" "https://${accountName}.file.core.windows.net/airflow2dags/dags?${AZURE_STORAGE_SAS_TOKEN}"
-        fi
-      else
-        echo "Packaged Dags are disabled"
-        if [ -d "./dags/" ]; then
-          # Copying all the contents inside the dags folder
-          azcopy cp "./dags/*" "https://${accountName}.file.core.windows.net/airflowdags/dags?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true
-          azcopy cp "./dags/*" "https://${accountName}.file.core.windows.net/airflow2dags/dags?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true
-          cd dags
-          # Now syncing only the folders which are part of source to remove the deleted files
-          for directory in *; do
-            if [ -d "$directory" ]; then
-              azcopy sync "./$directory/" "https://${accountName}.file.core.windows.net/airflowdags/dags/$directory?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true --delete-destination=true
-              azcopy sync "./$directory/" "https://${accountName}.file.core.windows.net/airflow2dags/dags/$directory?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true --delete-destination=true
-            fi
-          done
-          cd ..
-        fi
-      fi
-  stage: bootstrap
-  tags:
-    - osdu-medium
-  variables:
-    AZURE_DEPLOY_PACKAGED_DAG: "true"
+      # Generating environment file to be passed while running the postman tests
+      cat > python_env_vars.txt << EOF
+      TENANT_ID os.environ.get('AZURE_TENANT_ID')
+      CLIENT_ID os.environ.get('CLIENT_ID')
+      CLIENT_SECRET os.environ.get('CLIENT_SECRET')
+      HOSTNAME os.environ.get('AZURE_DNS_NAME')
+      refresh_token os.environ.get('AZURE_REFRESH_TOKEN')
+      WORKFLOW_HOST os.environ.get('AZURE_DNS_NAME')
+      segy_to_vds_ssdms_DAG_name os.environ.get('DAG_NAME')
+      EOF
+    - |
+      export BUILD_VERSION=$(echo ${CI_COMMIT_SHA} | cut -c -5)
+      export DAG_NAME="segy-to-vds-conversion-${BUILD_VERSION}"
\ No newline at end of file
import os
import msal
import logging
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
def get_id_token():
tenant_id = os.getenv('AZURE_TENANT_ID')
resource_id = os.getenv('AZURE_AD_APP_RESOURCE_ID')
client_id = os.getenv('INTEGRATION_TESTER')
client_secret = os.getenv('AZURE_TESTER_SERVICEPRINCIPAL_SECRET')
authority_host_uri = 'https://login.microsoftonline.com'
authority_uri = authority_host_uri + '/' + tenant_id
scopes = [resource_id + '/.default']
try:
app = msal.ConfidentialClientApplication(client_id=client_id, authority=authority_uri, client_credential=client_secret)
result = app.acquire_token_for_client(scopes=scopes)
print(result.get('access_token'))
return result.get('access_token')
except Exception as e:
print(e)
def get_invalid_token():
'''
This is dummy jwt
{
"sub": "dummy@dummy.com",
"iss": "dummy@dummy.com",
"aud": "dummy.dummy.com",
"iat": 1556137273,
"exp": 1556223673,
"provider": "dummy.com",
"client": "dummy.com",
"userid": "dummytester.com",
"email": "dummytester.com",
"authz": "",
"lastname": "dummy",
"firstname": "dummy",
"country": "",
"company": "",
"jobtitle": "",
"subid": "dummyid",
"idp": "dummy",
"hd": "dummy.com",
"desid": "dummyid",
"contact_email": "dummy@dummy.com"
}
'''
return "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJkdW1teUBkdW1teS5jb20iLCJpc3MiOiJkdW1teUBkdW1teS5jb20iLCJhdWQiOiJkdW1teS5kdW1teS5jb20iLCJpYXQiOjE1NTYxMzcyNzMsImV4cCI6MTU1NjIzMDk3OSwicHJvdmlkZXIiOiJkdW1teS5jb20iLCJjbGllbnQiOiJkdW1teS5jb20iLCJ1c2VyaWQiOiJkdW1teXRlc3Rlci5jb20iLCJlbWFpbCI6ImR1bW15dGVzdGVyLmNvbSIsImF1dGh6IjoiIiwibGFzdG5hbWUiOiJkdW1teSIsImZpcnN0bmFtZSI6ImR1bW15IiwiY291bnRyeSI6IiIsImNvbXBhbnkiOiIiLCJqb2J0aXRsZSI6IiIsInN1YmlkIjoiZHVtbXlpZCIsImlkcCI6ImR1bW15IiwiaGQiOiJkdW1teS5jb20iLCJkZXNpZCI6ImR1bW15aWQiLCJjb250YWN0X2VtYWlsIjoiZHVtbXlAZHVtbXkuY29tIiwianRpIjoiNGEyMWYyYzItZjU5Yy00NWZhLTk0MTAtNDNkNDdhMTg4ODgwIn0.nkiyKtfXXxAlC60iDjXuB2EAGDfZiVglP-CyU1T4etc"
if __name__ == '__main__':
get_id_token()
# Copyright © Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import timedelta
from json import dumps
from osdu_airflow.backward_compatibility.default_args import \
update_default_args
import airflow
from airflow import DAG
from airflow import configuration as conf
from airflow.contrib.operators.kubernetes_pod_operator import \
KubernetesPodOperator
# default args for airflow
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(0),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'schedule_interval': None
}
# Get values from dag run configuration
authorization = "{{ dag_run.conf['authToken'] }}".replace("Bearer ", "")
svc_token = "{{ dag_run.conf['execution_context']['id_token'] or dag_run.conf['authToken'] }}"
# Constants
DAG_INIT = "dag-init"
DAG_NAME = "{| DAG_NAME |}"
NAMESPACE = "{| NAMESPACE |}"
docker_image = "{| DOCKER_IMAGE |}"
SEGY_CONVERTER = "segy-to-vds"
K8S_POD_KWARGS = {| K8S_POD_OPERATOR_KWARGS |}
if not K8S_POD_KWARGS:
K8S_POD_KWARGS = {}
persistent_id = "{{ dag_run.conf['execution_context']['persistent_id'] }}"
vds_url = "{{ dag_run.conf['execution_context']['vds_url'] }}"
segy_url = "{{ dag_run.conf['execution_context']['segy_url'] }}"
# Get environment variables
# The AWS implementation so far assumed the API base URL comes from the OS environment.
# It could be updated to use an Airflow Variable ({{ var.value.api_base_url }}) configured in the Airflow UI.
dag = DAG(DAG_NAME, default_args=default_args)
kube_config_path = os.path.join(os.environ['AIRFLOW_HOME'], 'kube_config.yaml')
env_vars = {
"SD_SVC_URL": "https://{{ var.value.azure_dns_host }}/seistore-svc/api/v3",
"SD_SVC_TOKEN": svc_token,
"SD_SVC_API_KEY": "NA",
}
svc_connection_string = f'sdauthorityurl={env_vars["SD_SVC_URL"]};sdapikey={env_vars["SD_SVC_API_KEY"]};sdtoken={svc_token}'
segy_to_vds = KubernetesPodOperator(
task_id=SEGY_CONVERTER,
dag=dag,
name=DAG_NAME,
env_vars=env_vars,
cmds=['SEGYImport'],
    arguments=[
        '--persistentID', persistent_id,
        '--url', vds_url,
        '--url-connection', svc_connection_string,
        '--input-connection', svc_connection_string,
        segy_url
    ],
namespace=NAMESPACE,
image=docker_image,
is_delete_operator_pod=True,
**K8S_POD_KWARGS
)
segy_to_vds