Open Subsurface Data Universe Software / Platform / Data Flow / Data Ingestion / Segy to Zgy Conversion

Commit fc2b3592 authored Feb 16, 2022 by Sumra Zafar

    Update

Parent: 9df64ac1
Pipeline #109915 failed with stages in 8 minutes and 52 seconds
Changes: 4 files
.gitlab-ci.yml (view file @ fc2b3592)

@@ -44,7 +44,7 @@ include:
     ref: "master"
     file: "cloud-providers/osdu-gcp-dag.yaml"
-  - local: "/devops/azure/override-stages_bk.yml"
+  - local: "/devops/azure/override-stages.yml"
   - local: "/devops/gcp/override-stages.yml"
airflow/workflow-svc-v2/segy_to_zgy_ingestion_dag.py (view file @ fc2b3592)

@@ -25,14 +25,10 @@ default_args = update_default_args(default_args)

 # Get values from dag run configuration
-authorization = "{{ dag_run.conf['authToken'] }}"
-sd_svc_token = "{{ dag_run.conf['execution_context']['id_token'] or dag_run.conf['authToken'] }}"
-dataPartitionId = "{{ dag_run.conf['execution_context']['data_partition_id'] }}"
-run_id = "{{ dag_run.conf['runId'] }}"
-sd_svc_api_key = "{{ dag_run.conf['execution_context']['sd_svc_api_key'] }}"
-storage_svc_api_key = "{{ dag_run.conf['execution_context']['storage_svc_api_key'] }}"
+svc_token = "{{ dag_run.conf['execution_context']['id_token'] or dag_run.conf['authToken'] }}"
 filecollection_segy_id = "{{ dag_run.conf['execution_context']['filecollection_segy_id'] }}"
 work_product_id = "{{ dag_run.conf['execution_context']['work_product_id'] }}"
+dataPartitionId = "{{ dag_run.conf['execution_context']['data_partition_id'] }}"

 # Constants
 DAG_NAME = "{| DAG_NAME |}"  # Replace it by the workflowName passed on workflow service API.

@@ -40,27 +36,22 @@ DOCKER_IMAGE = "{| DOCKER_IMAGE |}"  # Replace it by the segy-to-zgy container
 SEGY_CONVERTER = "segy-to-zgy"
 DAG_INIT = "dag-init"
 NAMESPACE = "{| NAMESPACE |}"
-K8S_POD_KWARGS = {| K8S_POD_KWARGS |}
+K8S_POD_KWARGS = {| K8S_POD_OPERATOR_KWARGS |}
 if not K8S_POD_KWARGS:
     K8S_POD_KWARGS = {}

 # Values to pass to csv parser
 params = ["--osdu", filecollection_segy_id, work_product_id]

-# Get environment variables
-env_vars = {
-    "STORAGE_SVC_URL": "{| STORAGE_SVC_URL |}",
-    "SD_SVC_URL": "{| SD_SVC_URL |}",
-    "SD_SVC_TOKEN": sd_svc_token,
-    "STORAGE_SVC_TOKEN": authorization,
-    "STORAGE_SVC_API_KEY": storage_svc_api_key,
-    "SD_SVC_API_KEY": sd_svc_api_key,
-    "OSDU_DATAPARTITIONID": dataPartitionId,
-    "SD_READ_CACHE_PAGE_SIZE": "4195024",
-    "SD_READ_CACHE_MAX_PAGES": "256",
-    "SEGYTOZGY_VERBOSITY": "3",
-    "SEGYTOZGY_GENERATE_INDEX": "1"
-}
+# TODO: put env vars here from application.properties
+env_vars = {| ENV_VARS or {} |}
+env_vars["OSDU_DATAPARTITIONID"] = dataPartitionId
+env_vars["SD_SVC_TOKEN"] = svc_token
+env_vars["STORAGE_SVC_TOKEN"] = svc_token

 with DAG(DAG_NAME, default_args=default_args) as dag:
     update_status_running = UpdateStatusOperator(
...
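For orientation: the DAG above is a template. The {| ... |} placeholders are filled in when the DAG file is rendered (presumably by the output_dag_folder.py step in the Azure pipeline below), while the {{ dag_run.conf[...] }} strings are resolved by Airflow at run time. The rendered values end up driving the converter pod. A minimal sketch of that wiring, assuming a KubernetesPodOperator and task name, since the rest of the DAG is truncated above:

    # Sketch only: the operator import path and task wiring are assumptions,
    # not the truncated file's actual code.
    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator,
    )

    with DAG(DAG_NAME, default_args=default_args) as dag:
        segy_to_zgy = KubernetesPodOperator(
            task_id=SEGY_CONVERTER,   # "segy-to-zgy"
            name=SEGY_CONVERTER,
            namespace=NAMESPACE,      # rendered from {| NAMESPACE |}
            image=DOCKER_IMAGE,       # rendered from {| DOCKER_IMAGE |}
            arguments=params,         # ["--osdu", filecollection_segy_id, work_product_id]
            env_vars=env_vars,        # SD_SVC_TOKEN / STORAGE_SVC_TOKEN come from svc_token
            **K8S_POD_KWARGS,         # rendered from {| K8S_POD_OPERATOR_KWARGS |}
        )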
devops/azure/override-stages.yml (view file @ fc2b3592)

-# Override the containerize stage so that we can build the dag load container as well.
-azure_containerize:
+---
+.azure_variables:
+  variables:
+    AZURE_AD_APP_RESOURCE_ID: $AZURE_APP_ID
+    AZURE_AD_TENANT_ID: $AZURE_TENANT_ID
+    AZURE_CLIENT_ID: $AZURE_PRINCIPAL_ID
+    AZURE_CLIENT_SECRET: $AZURE_PRINCIPAL_SECRET
+    DATA_PARTITION_ID: opendes
+    SHARED_TENANT: opendes
+    WORKFLOW_URL: "https://${AZURE_DNS_NAME}/api/workflow/v1/"
+
+azure_build_dag:
+  artifacts:
+    expire_in: "2 days"
+    paths:
+      - output_dags
+  before_script:
+    - "az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID"
+    - "az aks get-credentials -g $AZURE_UNIQUE-rg -n $AZURE_UNIQUE-aks"
+    - "docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY"
+  extends:
+    - .azure_variables
+  image: danielscholl/azure-build-image
+  tags: ['osdu-medium']
+  only:
+    variables:
+      - "$AZURE == 'true'"
+  script:
+    - |
+      cat > .env << EOF
+      DAG_TASK_IMAGE=${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
+      SHARED_TENANT=$SHARED_TENANT
+      AZURE_TENANT_ID=$AZURE_TENANT_ID
+      AZURE_DNS_NAME=$AZURE_DNS_NAME
+      AZURE_AD_APP_RESOURCE_ID=$AZURE_AD_APP_RESOURCE_ID
+      AZURE_CLIENT_ID=$AZURE_CLIENT_ID
+      AZURE_CLIENT_SECRET=$AZURE_CLIENT_SECRET
+      BUILD_VERSION=$CI_COMMIT_SHA
+      EOF
+      docker build -t $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE --file Dockerfile .
+      docker build -t $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE --file deployments/scripts/azure/Dockerfile .
+      docker push $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE
+      docker push $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
+      az acr login -n $AZURE_REGISTRY
+      docker tag $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
+      docker tag $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
+      docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
+      docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
+      if [ "$(docker ps -a | grep docker_generate_dags)" ]; then
+        docker stop docker_generate_dags
+        docker rm docker_generate_dags
+      fi
+      docker run --name "docker_generate_dags" --env-file .env $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
+  stage: containerize
-  variables:
-    DAG_TASK_IMAGE: ${CI_PROJECT_NAME}-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}
-    DAG_LOAD_IMAGE: ${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}
-  before_script:
-    - echo -n $CI_JOB_TOKEN | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
-    - az --version
-    - az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID
-  script:
-    # Build Docker Image
-    - docker build -t $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE --file Dockerfile .
-    - docker build -t $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE --file deployments/scripts/azure/Dockerfile .
-    - docker push $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE
-    - docker push $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
-    # Azure Container Registry
-    - az acr login -n $AZURE_REGISTRY
-    - docker tag $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
-    - docker tag $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
-    - docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
-    - docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
-  only:
-    variables:
-      - $AZURE == 'true'
-  tags:
-    - osdu-medium
+  variables:
+    DAG_LOAD_IMAGE: "${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}"
+    DAG_TASK_IMAGE: "${CI_PROJECT_NAME}-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}"
+
+azure_register_dag:
+  artifacts:
+    expire_in: "2 days"
+    paths:
+      - output_dags
+  extends:
+    - .azure_variables
+  image: "python:3.8"
+  needs:
+    - azure_build_dag
+  only:
+    variables:
+      - "$AZURE == 'true'"
+  script:
+    - |
+      cat > requirements.txt << EOF
+      msal
+      Jinja2==2.10.1
+      EOF
+    - |
+      export ZGY_DAG_FOLDER_PATH="airflow/workflow-svc-v2"
+      export DOCKER_TAG=${CI_COMMIT_REF_SLUG}
+      export BUILD_VERSION=$(echo ${CI_COMMIT_SHA} | cut -c -5)
+      export NAMESPACE="airflow"
+    - |
+      # Python script for generating the Bearer Token
+      cat > Token.py << EOF
+      import os
+      import msal
+      class AzureToken(object):
+          def get_azure_id_token(self):
+              tenant_id = os.getenv('AZURE_TENANT_ID')
+              resource_id = os.getenv('AZURE_AD_APP_RESOURCE_ID')
+              client_id = os.getenv('AZURE_CLIENT_ID')
+              client_secret = os.getenv('AZURE_CLIENT_SECRET')
+              if tenant_id is None:
+                  print('Please pass tenant Id to generate token')
+                  exit(1)
+              if resource_id is None:
+                  print('Please pass resource Id to generate token')
+                  exit(1)
+              if client_id is None:
+                  print('Please pass client Id to generate token')
+                  exit(1)
+              if client_secret is None:
+                  print('Please pass client secret to generate token')
+                  exit(1)
+              try:
+                  authority_host_uri = 'https://login.microsoftonline.com'
+                  authority_uri = authority_host_uri + '/' + tenant_id
+                  scope = [resource_id + '/.default']
+                  app = msal.ConfidentialClientApplication(client_id=client_id, authority=authority_uri, client_credential=client_secret)
+                  result = app.acquire_token_for_client(scopes=scope)
+                  token = 'Bearer ' + result.get('access_token')
+                  print(token)
+                  return token
+              except Exception as e:
+                  print(e)
+      if __name__ == '__main__':
+          AzureToken().get_azure_id_token()
+      EOF
+    - |
+      # Python script for registering the DAG by calling Workflow service API
+      cat > registeration_dag.py << EOF
+      import json
+      import requests
+      import os
+      class RegisterDag:
+          def __init__(self):
+              self.token = os.environ.get('BEARER_TOKEN')
+              self.data_partition_id = os.environ.get('SHARED_TENANT')
+              self.workflow_service_url = os.environ.get('WORKFLOW_URL') + "workflow"
+          def register(self):
+              self.register_dag()
+          def register_dag(self):
+              data = '{"workflowName":"$DAG_NAME","description":"$DAG_NAME","registrationInstructions":{"dagName":"$DAG_NAME"}}'
+              headers = {
+                  'Content-Type': 'application/json',
+                  'Authorization': self.token,
+                  'data-partition-id': self.data_partition_id
+              }
+              response = requests.post(self.workflow_service_url, headers=headers, data=data)
+              if response.status_code == 200:
+                  workflow_id = response.json().get('workflowId')
+                  print("DAG registered with workflowId: {0}".format(workflow_id))
+              elif response.status_code == 409:
+                  workflow_id = response.json().get('workflowId')
+                  print("DAG is already registered with workflowId: {0}".format(workflow_id))
+              else:
+                  print("Error while registering DAG {0}".format(response.raise_for_status()))
+      if __name__ == "__main__":
+          RegisterDag().register()
+      EOF
+    - "pip install -r ./requirements.txt"
+    - "export BEARER_TOKEN=$(python ./Token.py)"
+    - "echo \"$(echo $BEARER_TOKEN | cut -c -20)***********\""
+    - "python ./registeration_dag.py"
+    - "python deployments/scripts/azure/output_dag_folder.py -f airflow/workflow-svc-v2/segy_to_zgy_ingestion_dag.py"
+    - "cd output_dags/dags"
+  stage: deploy
+  tags:
+    - osdu-medium
+  variables:
+    DAG_NAME: "${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}"

-azure_deploy:
-  needs: ["azure_containerize"]
-  tags: ['osdu-medium']
-  variables:
-    DAG_TASK_IMAGE: ${CI_PROJECT_NAME}-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}
-    DAG_LOAD_IMAGE: ${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}
-  before_script:
-    - az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID
-    - az aks get-credentials -g $AZURE_UNIQUE-rg -n $AZURE_UNIQUE-aks
-  script:
-    - |
-      cat > .env << EOF
-      DAG_TASK_IMAGE=${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
-      SHARED_TENANT=$SHARED_TENANT
-      AZURE_TENANT_ID=$AZURE_TENANT_ID
-      AZURE_DNS_NAME=$AZURE_DNS_NAME
-      AZURE_AD_APP_RESOURCE_ID=$AZURE_AD_APP_RESOURCE_ID
-      AZURE_CLIENT_ID=$AZURE_CLIENT_ID
-      AZURE_CLIENT_SECRET=$AZURE_CLIENT_SECRET
-      EOF
-      docker run --env-file .env $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
-  only:
-    variables:
-      - $AZURE == 'true'
+
+azure_copy_dag:
+  artifacts:
+    expire_in: "2 days"
+    paths:
+      - output_dags
+  before_script:
+    - "az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID"
+    - "az aks get-credentials -g $AZURE_UNIQUE-rg -n $AZURE_UNIQUE-aks"
+  extends:
+    - .azure_variables
+  image: danielscholl/azure-build-image
+  needs:
+    - azure_register_dag
+  only:
+    variables:
+      - "$AZURE == 'true'"
+  script:
+    - |
+      # Installing the Azcopy utility
+      apk add --update coreutils && rm -rf /var/cache/apk/*
+      mkdir -p tmp
+      cd tmp
+      wget -O azcopy_v10.tar.gz https://aka.ms/downloadazcopy-v10-linux && tar -xf azcopy_v10.tar.gz --strip-components=1
+      cp ./azcopy /usr/bin/
+      cd ..
+    - "EXPIRE=$(date -u -d \"59 minutes\" '+%Y-%m-%dT%H:%M:%SZ')"
+    - "START=$(date -u -d \"-1 minute\" '+%Y-%m-%dT%H:%M:%SZ')"
+    - "accountKey=$(kubectl get secret airflow -n osdu -o jsonpath='{.data.azurestorageaccountkey}' | base64 -d)"
+    - "accountName=$(kubectl get secret airflow -n osdu -o jsonpath='{.data.azurestorageaccountname}' | base64 -d)"
+    - "AZURE_STORAGE_SAS_TOKEN=$(az storage account generate-sas --account-name $accountName --account-key $accountKey --start $START --expiry $EXPIRE --https-only --resource-types sco --services f --permissions cwdlur -o tsv)"
+    - "cd output_dags"
+    - |
+      if [ "$AZURE_DEPLOY_PACKAGED_DAG" == "true" ]; then
+        echo "Packaged Dags are enabled"
+        if [ -d "./dags/" ]; then
+          echo "Uploading to: ${accountName}"
+          # Copying the zipped dag inside the dags folder
+          azcopy cp "./dags/*.zip" "https://${accountName}.file.core.windows.net/airflowdags/dags?${AZURE_STORAGE_SAS_TOKEN}"
+          azcopy cp "./dags/*.zip" "https://${accountName}.file.core.windows.net/airflow2dags/dags?${AZURE_STORAGE_SAS_TOKEN}"
+        fi
+      else
+        echo "Packaged Dags are disabled"
+        if [ -d "./dags/" ]; then
+          # Copying all the contents inside the dags folder
+          azcopy cp "./dags/*" "https://${accountName}.file.core.windows.net/airflowdags/dags?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true
+          azcopy cp "./dags/*" "https://${accountName}.file.core.windows.net/airflow2dags/dags?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true
+          cd dags
+          # Now syncing only the folders which are part of source to remove the deleted files
+          for directory in *; do
+            if [ -d "$directory" ]; then
+              azcopy sync "./$directory/" "https://${accountName}.file.core.windows.net/airflowdags/dags/$directory?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true --delete-destination=true
+              azcopy sync "./$directory/" "https://${accountName}.file.core.windows.net/airflow2dags/dags/$directory?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true --delete-destination=true
+            fi
+          done
+          cd ..
+        fi
+      fi
+  stage: bootstrap
+  tags:
+    - osdu-medium
+  variables:
+    AZURE_DEPLOY_PACKAGED_DAG: "true"
\ No newline at end of file
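Once registeration_dag.py has registered the workflow, runs are started through the same Workflow service, and the executionContext of the trigger call is what the DAG reads back as dag_run.conf['execution_context']. A minimal sketch of such a trigger, assuming the standard OSDU workflowRun endpoint shape; the record IDs below are placeholders, not values from this commit:

    # Sketch only: endpoint path, response field names, and the record IDs
    # are assumptions based on the OSDU Workflow API, not this repo's code.
    import os
    import requests

    workflow_url = os.environ["WORKFLOW_URL"]   # e.g. https://<host>/api/workflow/v1/
    dag_name = os.environ["DAG_NAME"]           # the registered workflowName
    bearer_token = os.environ["BEARER_TOKEN"]   # "Bearer <token>", as produced by Token.py

    payload = {
        "executionContext": {
            # These keys match what segy_to_zgy_ingestion_dag.py reads from
            # dag_run.conf['execution_context'].
            "id_token": bearer_token.replace("Bearer ", "", 1),
            "data_partition_id": os.environ["SHARED_TENANT"],
            "filecollection_segy_id": "opendes:dataset--FileCollection.SEGY:example",  # placeholder
            "work_product_id": "opendes:work-product--WorkProduct:example",            # placeholder
        }
    }
    response = requests.post(
        f"{workflow_url}workflow/{dag_name}/workflowRun",
        headers={
            "Content-Type": "application/json",
            "Authorization": bearer_token,
            "data-partition-id": os.environ["SHARED_TENANT"],
        },
        json=payload,
    )
    response.raise_for_status()
    print("Triggered run:", response.json().get("runId"))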
devops/azure/override-stages_bk.yml deleted 100644 → 0 (view file @ 9df64ac1)

---
.azure_variables:
  variables:
    AZURE_AD_APP_RESOURCE_ID: $AZURE_APP_ID
    AZURE_AD_TENANT_ID: $AZURE_TENANT_ID
    AZURE_CLIENT_ID: $AZURE_PRINCIPAL_ID
    AZURE_CLIENT_SECRET: $AZURE_PRINCIPAL_SECRET
    DATA_PARTITION_ID: opendes
    SHARED_TENANT: opendes
    WORKFLOW_URL: "https://${AZURE_DNS_NAME}/api/workflow/v1/"

azure_build_dag:
  artifacts:
    expire_in: "2 days"
    paths:
      - output_dags
  before_script:
    - "az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID"
    - "az aks get-credentials -g $AZURE_UNIQUE-rg -n $AZURE_UNIQUE-aks"
    - "docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY"
  extends:
    - .azure_variables
  image: danielscholl/azure-build-image
  only:
    variables:
      - "$AZURE == 'true'"
  script:
    - |
      cat > .env << EOF
      DAG_TASK_IMAGE=${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
      SHARED_TENANT=$SHARED_TENANT
      AZURE_TENANT_ID=$AZURE_TENANT_ID
      AZURE_DNS_NAME=$AZURE_DNS_NAME
      AZURE_AD_APP_RESOURCE_ID=$AZURE_AD_APP_RESOURCE_ID
      AZURE_CLIENT_ID=$AZURE_CLIENT_ID
      AZURE_CLIENT_SECRET=$AZURE_CLIENT_SECRET
      BUILD_VERSION=$CI_COMMIT_SHA
      EOF
      docker build -t $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE --file Dockerfile .
      docker build -t $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE --file deployments/scripts/azure/Dockerfile .
      docker push $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE
      docker push $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
      az acr login -n $AZURE_REGISTRY
      docker tag $CI_REGISTRY_IMAGE/$DAG_TASK_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
      docker tag $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
      docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_TASK_IMAGE
      docker push ${AZURE_REGISTRY}.azurecr.io/$DAG_LOAD_IMAGE
      if [ "$(docker ps -a | grep docker_generate_dags)" ]; then
        docker stop docker_generate_dags
        docker rm docker_generate_dags
      fi
      docker run --name "docker_generate_dags" --env-file .env $CI_REGISTRY_IMAGE/$DAG_LOAD_IMAGE
  stage: containerize
  tags:
    - osdu-medium
  variables:
    DAG_LOAD_IMAGE: "${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}"
    DAG_TASK_IMAGE: "${CI_PROJECT_NAME}-${CI_COMMIT_REF_SLUG}:${CI_COMMIT_SHA}"

azure_register_dag:
  artifacts:
    expire_in: "2 days"
    paths:
      - output_dags
  extends:
    - .azure_variables
  image: "python:3.8"
  needs:
    - azure_build_dag
  only:
    variables:
      - "$AZURE == 'true'"
  script:
    - |
      cat > requirements.txt << EOF
      msal
      Jinja2==2.10.1
      EOF
    - |
      export ZGY_DAG_FOLDER_PATH="airflow/workflow-svc-v2"
      export DOCKER_TAG=${CI_COMMIT_REF_SLUG}
      export BUILD_VERSION=$(echo ${CI_COMMIT_SHA} | cut -c -5)
    - |
      # Python script for generating the Bearer Token
      cat > Token.py << EOF
      import os
      import msal
      class AzureToken(object):
          def get_azure_id_token(self):
              tenant_id = os.getenv('AZURE_TENANT_ID')
              resource_id = os.getenv('AZURE_AD_APP_RESOURCE_ID')
              client_id = os.getenv('AZURE_CLIENT_ID')
              client_secret = os.getenv('AZURE_CLIENT_SECRET')
              if tenant_id is None:
                  print('Please pass tenant Id to generate token')
                  exit(1)
              if resource_id is None:
                  print('Please pass resource Id to generate token')
                  exit(1)
              if client_id is None:
                  print('Please pass client Id to generate token')
                  exit(1)
              if client_secret is None:
                  print('Please pass client secret to generate token')
                  exit(1)
              try:
                  authority_host_uri = 'https://login.microsoftonline.com'
                  authority_uri = authority_host_uri + '/' + tenant_id
                  scope = [resource_id + '/.default']
                  app = msal.ConfidentialClientApplication(client_id=client_id, authority=authority_uri, client_credential=client_secret)
                  result = app.acquire_token_for_client(scopes=scope)
                  token = 'Bearer ' + result.get('access_token')
                  print(token)
                  return token
              except Exception as e:
                  print(e)
      if __name__ == '__main__':
          AzureToken().get_azure_id_token()
      EOF
    - |
      # Python script for registering the DAG by calling Workflow service API
      cat > registeration_dag.py << EOF
      import json
      import requests
      import os
      class RegisterDag:
          def __init__(self):
              self.token = os.environ.get('BEARER_TOKEN')
              self.data_partition_id = os.environ.get('SHARED_TENANT')
              self.workflow_service_url = os.environ.get('WORKFLOW_URL') + "workflow"
          def register(self):
              self.register_dag()
          def register_dag(self):
              data = '{"workflowName":"$DAG_NAME","description":"$DAG_NAME","registrationInstructions":{"dagName":"$DAG_NAME"}}'
              headers = {
                  'Content-Type': 'application/json',
                  'Authorization': self.token,
                  'data-partition-id': self.data_partition_id
              }
              response = requests.post(self.workflow_service_url, headers=headers, data=data)
              if response.status_code == 200:
                  workflow_id = response.json().get('workflowId')
                  print("DAG registered with workflowId: {0}".format(workflow_id))
              elif response.status_code == 409:
                  workflow_id = response.json().get('workflowId')
                  print("DAG is already registered with workflowId: {0}".format(workflow_id))
              else:
                  print("Error while registering DAG {0}".format(response.raise_for_status()))
      if __name__ == "__main__":
          RegisterDag().register()
      EOF
    - "pip install -r ./requirements.txt"
    - "export BEARER_TOKEN=$(python ./Token.py)"
    - "echo \"$(echo $BEARER_TOKEN | cut -c -20)***********\""
    - "python ./registeration_dag.py"
    - "python deployments/scripts/azure/output_dag_folder.py -f airflow/workflow-svc-v2/segy_to_zgy_ingestion_dag.py"
    - "cd output_dags/dags"
  stage: deploy
  tags:
    - osdu-medium
  variables:
    DAG_NAME: "${CI_PROJECT_NAME}-dag-${CI_COMMIT_REF_SLUG}"

azure_copy_dag:
  artifacts:
    expire_in: "2 days"
    paths:
      - output_dags
  before_script:
    - "az login --service-principal -u $AZURE_PRINCIPAL_ID -p $AZURE_PRINCIPAL_SECRET --tenant $AZURE_TENANT_ID"
    - "az aks get-credentials -g $AZURE_UNIQUE-rg -n $AZURE_UNIQUE-aks"
  extends:
    - .azure_variables
  image: danielscholl/azure-build-image
  needs:
    - azure_register_dag
  only:
    variables:
      - "$AZURE == 'true'"
  script:
    - |
      # Installing the Azcopy utility
      apk add --update coreutils && rm -rf /var/cache/apk/*
      mkdir -p tmp
      cd tmp
      wget -O azcopy_v10.tar.gz https://aka.ms/downloadazcopy-v10-linux && tar -xf azcopy_v10.tar.gz --strip-components=1
      cp ./azcopy /usr/bin/
      cd ..
    - "EXPIRE=$(date -u -d \"59 minutes\" '+%Y-%m-%dT%H:%M:%SZ')"
    - "START=$(date -u -d \"-1 minute\" '+%Y-%m-%dT%H:%M:%SZ')"
    - "accountKey=$(kubectl get secret airflow -n osdu -o jsonpath='{.data.azurestorageaccountkey}' | base64 -d)"
    - "accountName=$(kubectl get secret airflow -n osdu -o jsonpath='{.data.azurestorageaccountname}' | base64 -d)"
    - "AZURE_STORAGE_SAS_TOKEN=$(az storage account generate-sas --account-name $accountName --account-key $accountKey --start $START --expiry $EXPIRE --https-only --resource-types sco --services f --permissions cwdlur -o tsv)"
    - "cd output_dags"
    - |
      if [ "$AZURE_DEPLOY_PACKAGED_DAG" == "true" ]; then
        echo "Packaged Dags are enabled"
        if [ -d "./dags/" ]; then
          echo "Uploading to: ${accountName}"
          # Copying the zipped dag inside the dags folder
          azcopy cp "./dags/*.zip" "https://${accountName}.file.core.windows.net/airflowdags/dags?${AZURE_STORAGE_SAS_TOKEN}"
          azcopy cp "./dags/*.zip" "https://${accountName}.file.core.windows.net/airflow2dags/dags?${AZURE_STORAGE_SAS_TOKEN}"
        fi
      else
        echo "Packaged Dags are disabled"
        if [ -d "./dags/" ]; then
          # Copying all the contents inside the dags folder
          azcopy cp "./dags/*" "https://${accountName}.file.core.windows.net/airflowdags/dags?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true
          azcopy cp "./dags/*" "https://${accountName}.file.core.windows.net/airflow2dags/dags?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true
          cd dags
          # Now syncing only the folders which are part of source to remove the deleted files
          for directory in *; do
            if [ -d "$directory" ]; then
              azcopy sync "./$directory/" "https://${accountName}.file.core.windows.net/airflowdags/dags/$directory?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true --delete-destination=true
              azcopy sync "./$directory/" "https://${accountName}.file.core.windows.net/airflow2dags/dags/$directory?${AZURE_STORAGE_SAS_TOKEN}" --recursive=true --delete-destination=true
            fi
          done
          cd ..
        fi
      fi
  stage: bootstrap
  tags:
    - osdu-medium
  variables:
    AZURE_DEPLOY_PACKAGED_DAG: "true"
\ No newline at end of file