Commit a5c00e38 authored by Daniel Scholl

Added Charts and pipelines

parent d5af8f6f
# Copyright © Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: v2
name: osdu-airflow
appVersion: "latest"
description: Installs Airflow and the required components for OSDU on Azure
version: 0.1.0
type: application
dependencies:
- name: airflow
repository: https://kubernetes-charts.storage.googleapis.com/
version: 7.5.0
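# The values below appear to be the chart defaults consumed by the templates in this
# commit (e.g. .Values.airflowLogin.name and .Values.airflow.externalDatabase.* in the
# post-install job). Keys under the top-level "airflow" map are passed through to the
# wrapped community airflow chart (dependency version 7.5.0 above).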
appinsightstatsd:
aadpodidbinding: "osdu-identity"
airflowLogin:
name: admin
airflow:
airflow:
image:
repository: apache/airflow
tag: 1.10.12-python3.6
pullPolicy: IfNotPresent
pullSecret: ""
config:
AIRFLOW__SCHEDULER__STATSD_ON: "True"
AIRFLOW__SCHEDULER__STATSD_HOST: "appinsights-statsd"
AIRFLOW__SCHEDULER__STATSD_PORT: 8125
AIRFLOW__SCHEDULER__STATSD_PREFIX: "osdu_airflow"
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "False"
## Enable only for debugging purposes
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "False"
AIRFLOW__WEBSERVER__AUTHENTICATE: "True"
AIRFLOW__WEBSERVER__AUTH_BACKEND: "airflow.contrib.auth.backends.password_auth"
AIRFLOW__API__AUTH_BACKEND: "airflow.contrib.auth.backends.password_auth"
AIRFLOW__CORE__REMOTE_LOGGING: "True"
AIRFLOW__CORE__REMOTE_LOG_CONN_ID: "az_log"
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: "wasb-airflowlog"
AIRFLOW__CORE__LOGGING_CONFIG_CLASS: "log_config.DEFAULT_LOGGING_CONFIG"
AIRFLOW__CORE__LOG_FILENAME_TEMPLATE: "{{ run_id }}/{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log"
AIRFLOW__CELERY__SSL_ACTIVE: "True"
AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX: "True"
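# Remote task logs are written to the "wasb-airflowlog" blob container through the
# "az_log" connection, and log_config.DEFAULT_LOGGING_CONFIG is loaded from the
# airflow-remote-log-config ConfigMap mounted at /opt/airflow/config further below.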
extraEnv:
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: airflow
key: airflow-fernet-key
- name: AIRFLOW_CONN_AZ_LOG
valueFrom:
secretKeyRef:
name: airflow
key: airflow-remote-log-connection
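# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables, so the
# airflow-remote-log-connection secret is expected to hold the full connection string for
# the az_log connection referenced by AIRFLOW__CORE__REMOTE_LOG_CONN_ID above.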
extraConfigmapMounts:
- name: remote-log-config
mountPath: /opt/airflow/config
configMap: airflow-remote-log-config
readOnly: true
extraPipPackages: [
"flask-bcrypt",
"apache-airflow[statsd]",
"apache-airflow[kubernetes]",
"apache-airflow-backport-providers-microsoft-azure"
]
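# flask-bcrypt backs the password_auth backend (and the post-install user script),
# apache-airflow[statsd] emits the scheduler metrics sent to appinsights-statsd, and the
# Azure backport providers supply the WASB hook used for remote logging; the kubernetes
# extra is presumably for Kubernetes-based operators.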
extraVolumeMounts:
# - name: airflow-kubernetes-config
# mountPath: "/home/airflow/.kube"
# readOnly: true
- name: azure-keyvault
mountPath: "/mnt/azure-keyvault"
readOnly: true
extraVolumes:
# - name: airflow-kubernetes-config
# secret:
# secretName: airflow-kubernetes-config
# items:
# - key: airflow-kubernetes-config
# path: config
- name: azure-keyvault
csi:
driver: secrets-store.csi.k8s.io
readOnly: true
volumeAttributes:
secretProviderClass: azure-keyvault
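# The Secrets Store CSI driver projects Key Vault secrets described by the azure-keyvault
# SecretProviderClass into the pods at /mnt/azure-keyvault, presumably authenticating
# through the osdu-identity pod identity bound via the aadpodidbinding labels.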
dags:
installRequirements: true
persistence:
enabled: true
existingClaim: airflowdagpvc
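# airflowdagpvc is the ReadOnlyMany claim defined later in this commit; it binds to an
# Azure File share (airflowdags) so every Airflow pod sees the same DAG folder.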
scheduler:
podLabels:
aadpodidbinding: "osdu-identity"
variables: |
{}
web:
podLabels:
aadpodidbinding: "osdu-identity"
baseUrl: "http://localhost/airflow"
ingress:
enabled: true
web:
annotations:
kubernetes.io/ingress.class: azure/application-gateway
cert-manager.io/cluster-issuer: letsencrypt
cert-manager.io/acme-challenge-type: http01
path: "/airflow"
host: osdu-weisun.msft-osdu-test.org
livenessPath: "/airflow/health"
tls:
enabled: true
secretName: osdu-certificate
precedingPaths:
- path: "/airflow/*"
serviceName: airflow-web
servicePort: 8080
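# The web UI is published at /airflow on the Application Gateway ingress with a
# cert-manager (Let's Encrypt) certificate stored in osdu-certificate; precedingPaths
# routes /airflow/* straight to the airflow-web service on port 8080, and
# ENABLE_PROXY_FIX above accounts for running behind the reverse proxy.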
workers:
podLabels:
aadpodidbinding: "osdu-identity"
flower:
enabled: false
postgresql:
enabled: false
externalDatabase:
type: postgres
user: osdu_admin@osdu-mvp-weisr-7heu-pg
passwordSecret: "postgres"
passwordSecretKey: "postgres-password"
host: "osdu-mvp-weisr-7heu-pg.postgres.database.azure.com"
port: 5432
properties: "?sslmode=require"
database: airflow
redis:
enabled: false
externalRedis:
host: "osdu-mvp-weisr-7heu-cache.redis.cache.windows.net"
port: 6380
passwordSecret: "redis"
passwordSecretKey: "redis-password"
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-log-processor
namespace: osdu
labels:
app: airflow-log-processor
spec:
selector:
matchLabels:
app: airflow-log-processor
template:
metadata:
labels:
app: airflow-log-processor
aadpodidbinding: "osdu-identity"
spec:
containers:
- name: airflow-log-processor
image: binroon/airflow-logs-to-workspace
imagePullPolicy: Always
env:
- name: AzureFunctionsJobHost__functions__0
value: queueTrigger
- name: FUNCTIONS_WORKER_RUNTIME
value: dotnet
- name: AzureLogWorkspaceLogName
value: airflow_dag
- name: AzureLogWorkspaceCustomerId
valueFrom:
secretKeyRef:
name: central-logging
key: workspace-id
- name: AzureLogWorkspaceSharedKey
valueFrom:
secretKeyRef:
name: central-logging
key: workspace-key
- name: AzureWebJobsStorage
valueFrom:
secretKeyRef:
name: airflow
key: airflow-storage-connection
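# This Azure Functions-based container (queueTrigger) drains Airflow log events from the
# storage queue reachable through AzureWebJobsStorage and posts them to the Log Analytics
# workspace identified by the central-logging secret, under the airflow_dag custom log.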
---
apiVersion: keda.k8s.io/v1alpha1
kind: TriggerAuthentication
metadata:
name: azure-queue-auth
namespace: osdu
spec:
podIdentity:
provider: azure
---
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: azure-queue-scaledobject
namespace: osdu
spec:
scaleTargetRef:
deploymentName: airflow-log-processor
triggers:
- type: azure-queue
metadata:
# Required
queueName: airflowlogqueue
# Optional
queueLength: "5" # default 5
authenticationRef:
name: azure-queue-auth # authenticationRef requires either podIdentity or an explicit connection parameter
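# KEDA scales the log-processor deployment on the depth of airflowlogqueue (target of 5
# messages per replica), authenticating against the queue through the pod-identity-based
# TriggerAuthentication defined above.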
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-appinsight-statsd-config
data:
appinsightsconfig.js: |
{
backends: ["appinsights-statsd"],
aiInstrumentationKey: process.env.APPLICATION_INSIGHTS_INSTRUMENTATION_KEY,
aiPrefix: "osdu_airflow",
aiRoleName: "airflow",
aiRoleInstance: process.env.HOSTNAME,
aiTrackStatsDMetrics: true,
log: {
backend: "syslog",
level: ""
},
debug: false
}
\ No newline at end of file
apiVersion: apps/v1
kind: Deployment
metadata:
name: appinsights-statsd
labels:
app: appinsights-statsd
spec:
replicas: 1
selector:
matchLabels:
app: appinsights-statsd
template:
metadata:
labels:
app: appinsights-statsd
aadpodidbinding: "osdu-identity"
spec:
containers:
- name: appinsights-statsd
image: binroon/appinsights-statsd:latest
imagePullPolicy: Always
command: ["node"]
args: ["/usr/src/app/statsd/stats.js", "/usr/src/app/statsd/backends/config/appinsightsconfig.js"]
volumeMounts:
- name: config-volume
mountPath: /usr/src/app/statsd/backends/config
env:
- name: APPLICATION_INSIGHTS_INSTRUMENTATION_KEY
valueFrom:
secretKeyRef:
name: central-logging
key: appinsights
ports:
- containerPort: 8125
protocol: UDP
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 250m
memory: 256Mi
volumes:
- name: config-volume
configMap:
name: airflow-appinsight-statsd-config
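# The statsd daemon loads appinsightsconfig.js from the mounted ConfigMap and listens on
# UDP 8125, matching AIRFLOW__SCHEDULER__STATSD_HOST/PORT; the Application Insights
# instrumentation key is injected from the central-logging secret.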
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: appinsights-statsd
spec:
type: ClusterIP
ports:
- port: 8125
protocol: UDP
selector:
app: appinsights-statsd
\ No newline at end of file
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflowdagpv
labels:
usage: airflow-dag
spec:
capacity:
storage: 5Gi
accessModes:
- ReadOnlyMany
azureFile:
secretName: airflow
shareName: airflowdags
volumeMode: Filesystem
mountOptions:
- dir_mode=0777
- file_mode=0777
- uid=1000
- gid=1000
- mfsymlinks
- nobrl
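# Statically provisioned Azure File volume: the airflow secret supplies the storage
# account credentials for the airflowdags share, and the mount options open up
# permissions (0777, uid/gid 1000) so the Airflow containers can read the DAGs.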
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflowdagpvc
# Set this annotation to NOT let Kubernetes automatically create
# a persistent volume for this volume claim.
annotations:
volume.beta.kubernetes.io/storage-class: ""
spec:
accessModes:
- ReadOnlyMany
resources:
requests:
storage: 5Gi
selector:
matchLabels:
usage: airflow-dag
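# The empty storage-class annotation disables dynamic provisioning, so the claim binds to
# the pre-created PV above via the usage: airflow-dag selector; this is the existingClaim
# referenced by dags.persistence in the chart values.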
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: post-install-job-config
data:
requirements.txt: |
flask-bcrypt
psycopg2-binary
create_default_user.py: |
import os
import time
import sys
from flask_bcrypt import generate_password_hash
import psycopg2
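# Bootstrap script: wait for the external Airflow metadata database and its "users"
# table to exist, then insert a default admin account whose password is hashed with
# bcrypt, matching what the password_auth web backend expects.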
PY3 = sys.version_info[0] == 3
def main():
server = os.environ.get('DATABASE_HOST','')
database = os.environ.get('DATABASE_DB','')
database_login_name = os.environ.get('DATABASE_USER','')
database_login_pass = os.environ.get('DATABASE_USER_PASS','')
airflow_admin = os.environ.get('AIRFLOW_ADMIN','')
airflow_pass = os.environ.get('AIRFLOW_ADMIN_PASS','')
retry = 6
retry_window = 10
if not airflow_admin or not airflow_pass or not server or not database or not database_login_name or not database_login_pass:
print('missing required environment variables')
sys.exit(2)
while retry>0:
time.sleep(retry_window)
if check(server,database,database_login_name,database_login_pass,"database") and check(server,database,database_login_name,database_login_pass,"table"):
create_user(server,database,database_login_name,database_login_pass,airflow_admin,airflow_pass)
break
else:
retry -= 1
print(f'wait {retry_window}s for retry')
return
def create_user(server,database,db_login_name,db_login_pass,airflow_admin,airflow_pass):
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(server, db_login_name, database, db_login_pass, 'require')
password_hash = generate_password_hash(airflow_pass,12)
if PY3:
password_hash = str(password_hash,'utf-8')
conn = None
try:
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
cursor.execute("insert into users(username,email,password,superuser) values(%s,%s,%s,%s) on CONFLICT(username) DO NOTHING;",(airflow_admin,'',password_hash,True))
conn.commit()
cursor.close()
print(f'default user {airflow_admin} was created')
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def check(server,database,db_login_name,db_login_pass,check_type="database"):
conn_string = None
connection = None
query_string = None
object_exists = False
if check_type =="database":
conn_string = "host={0} user={1} dbname=postgres password={2} sslmode={3}".format(server, db_login_name, db_login_pass, 'require')
query_string = f"select exists(select * from pg_database where datname='{database}')"
else:
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(server, db_login_name, database, db_login_pass, 'require')
query_string = "select exists(select * from information_schema.tables where table_name='users')"
try:
connection = psycopg2.connect(conn_string)
except (Exception,psycopg2.DatabaseError) as error:
print(error)
sys.exit(1)
if connection is not None:
connection.autocommit = True
cur = connection.cursor()
cur.execute(query_string)
object_exists = cur.fetchone()[0]
connection.close()
return object_exists
if __name__ == "__main__":
main()
apiVersion: batch/v1
kind: Job
metadata:
name: "{{ .Release.Name }}-setup-default-user"
labels:
app.kubernetes.io/managed-by: {{ .Release.Service | quote }}
app.kubernetes.io/instance: {{ .Release.Name | quote }}
app.kubernetes.io/version: {{ .Chart.AppVersion }}
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "-5"
"helm.sh/hook-delete-policy": hook-succeeded
spec:
template:
metadata:
name: "{{ .Release.Name }}"
labels:
app.kubernetes.io/managed-by: {{ .Release.Service | quote }}
app.kubernetes.io/instance: {{ .Release.Name | quote }}
helm.sh/chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
aadpodidbinding: "osdu-identity"
spec:
restartPolicy: Never
containers:
- name: post-install-job
image: python:3.6.12-slim-buster
command: ['sh','-c','pip install --user -r /post-install-scripts/requirements.txt && python /post-install-scripts/create_default_user.py']
volumeMounts:
- name: config-volume
mountPath: /post-install-scripts
- name: azure-keyvault
mountPath: "/mnt/azure-keyvault"
readOnly: true
env:
- name: DATABASE_USER
value: {{ .Values.airflow.externalDatabase.user }}
- name: DATABASE_HOST
value: {{ .Values.airflow.externalDatabase.host }}
- name: DATABASE_DB
value: {{ .Values.airflow.externalDatabase.database }}
- name: DATABASE_USER_PASS
valueFrom:
secretKeyRef:
name: postgres
key: postgres-password
- name: AIRFLOW_ADMIN
value: {{ .Values.airflowLogin.name }}
- name: AIRFLOW_ADMIN_PASS
valueFrom:
secretKeyRef:
name: airflow
key: airflow-admin-password
volumes:
- name: config-volume
configMap:
name: post-install-job-config
- name: azure-keyvault
csi:
driver: secrets-store.csi.k8s.io
readOnly: true
volumeAttributes:
secretProviderClass: azure-keyvault
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-remote-log-config
# namespace: ${airflow_namespace}
data:
__init__.py: ""
log_config.py: |
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
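# This module appears to mirror Airflow 1.10.x's stock DEFAULT_LOGGING_CONFIG; it is
# referenced by AIRFLOW__CORE__LOGGING_CONFIG_CLASS (log_config.DEFAULT_LOGGING_CONFIG)
# and picked up from the /opt/airflow/config mount provided by this ConfigMap.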
import os
from typing import Dict, Any
import six
from airflow import AirflowException
from airflow.configuration import conf
from airflow.utils.file import mkdirs
# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'LOG_FORMAT')
COLORED_LOG_FORMAT = conf.get('core', 'COLORED_LOG_FORMAT')
COLORED_LOG = conf.getboolean('core', 'COLORED_CONSOLE_LOG')
COLORED_FORMATTER_CLASS = conf.get('core', 'COLORED_FORMATTER_CLASS')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
FORMATTER_CLASS_KEY = '()' if six.PY2 else 'class'
DEFAULT_LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'airflow': {
'format': LOG_FORMAT
},
'airflow_coloured': {
'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
},
},
'handlers': {
'console': {
'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
'formatter': 'airflow_coloured',
'stream': 'sys.stdout'
},
'task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
'processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
}
},
'loggers': {
'airflow.processor': {
'handlers': ['processor'],
'level': LOG_LEVEL,
'propagate': False,
},