Commit 80909aae authored by Shrikant Garg

Merge branch 'merge-ibm-helm' into 'master'

Merge ibm helm to master

See merge request !125
parents fd6b7617 3262995c
@@ -14,6 +14,7 @@
# limitations under the License.
variables:
  IBM_HELM_DAG_PATH: devops/ibm/ibm-manifest-dag
  #AWS variables
  AWS_DEPLOY_TARGET: EKS
  # AWS skip stages
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
apiVersion: v2
name: ibm-manifest-dag
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"
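Before CI packages and pushes this chart, it can be sanity-checked locally; a minimal sketch, assuming Helm 3 on the PATH and the chart path from the IBM_HELM_DAG_PATH variable above:

# Lint the chart and render its templates locally before pushing:
helm lint devops/ibm/ibm-manifest-dag
helm template osdu-manifest-dag devops/ibm/ibm-manifest-dag

The chart's files/ directory holds copy-dags.sh, the script the setup Job runs inside the cluster: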
#!/bin/bash
# Variables kept for reference; the copies below use the paths directly.
MANIFEST_DAG="/opt/airflow/dags/osdu-ingest-r3.py"
TUTORIAL_DAG="/opt/airflow/dags/tutorial.py"

# Make the DAGs folder owned by the Airflow user (UID 50000) before copying.
chown -R 50000:root /opt/airflow/dags
chmod 775 /opt/airflow/dags

# Diagnostics: disk space, working directory, and the mounted script sources.
df -h
pwd
ls -ltr
echo "Printing Source Dags"
ls -ltr /tmp/scripts

# Copy the DAG files from the mounted ConfigMap into the Airflow DAGs folder,
# then re-apply ownership and permissions so the scheduler can read them.
cp /tmp/scripts/osdu-ingest-r3.py /opt/airflow/dags/.
cp /tmp/scripts/tutorial.py /opt/airflow/dags/.
chown -R 50000:root /opt/airflow/dags
chmod 775 /opt/airflow/dags
echo "printing target dags"
ls -ltr /opt/airflow/dags
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG for R3 ingestion."""
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from osdu_airflow.backward_compatibility.default_args import update_default_args
from osdu_airflow.operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from osdu_airflow.operators.process_manifest_r3 import ProcessManifestOperatorR3
from osdu_airflow.operators.update_status import UpdateStatusOperator
from osdu_airflow.operators.validate_manifest_schema import ValidateManifestSchemaOperator
from osdu_ingestion.libs.exceptions import NotOSDUSchemaFormatError
BATCH_NUMBER = int(Variable.get("core__ingestion__batch_count", "3"))
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"
default_args = {
    "start_date": airflow.utils.dates.days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
workflow_name = "Osdu_ingest"
def is_batch(**context):
    """
    :param context: Dag context
    :return: SubDag to be executed next depending on Manifest type
    """
    manifest = context["dag_run"].conf["execution_context"].get("manifest")
    if isinstance(manifest, dict):
        subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
    elif isinstance(manifest, list):
        subdag = PROCESS_BATCH_MANIFEST_FILE
    else:
        raise NotOSDUSchemaFormatError(f"Manifest must be either 'dict' or 'list'. "
                                       f"Got {manifest}.")
    return subdag
with DAG(
    workflow_name,
    default_args=default_args,
    description="R3 manifest processing with providing integrity",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=180)
) as dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )
    branch_is_batch_op = BranchPythonOperator(
        task_id="check_payload_type",
        python_callable=is_batch,
        trigger_rule="none_failed_or_skipped"
    )
    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )
    validate_schema_operator = ValidateManifestSchemaOperator(
        task_id="validate_manifest_schema_task",
        trigger_rule="none_failed_or_skipped"
    )
    ensure_integrity_op = EnsureManifestIntegrityOperator(
        task_id=ENSURE_INTEGRITY_TASK,
        previous_task_id=validate_schema_operator.task_id,
        trigger_rule="none_failed_or_skipped"
    )
    process_single_manifest_file = ProcessManifestOperatorR3(
        task_id=PROCESS_SINGLE_MANIFEST_FILE,
        previous_task_id=ensure_integrity_op.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    # Dummy operator as entry point into parallel task of batch upload
    batch_upload = DummyOperator(
        task_id=PROCESS_BATCH_MANIFEST_FILE
    )
    for batch in range(0, BATCH_NUMBER):
        batch_upload >> ProcessManifestOperatorR3(
            task_id=f"process_manifest_task_{batch + 1}",
            previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped"
        ) >> update_status_finished_op

update_status_running_op >> branch_is_batch_op  # pylint: disable=pointless-statement
branch_is_batch_op >> batch_upload  # pylint: disable=pointless-statement
branch_is_batch_op >> validate_schema_operator >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op  # pylint: disable=pointless-statement
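is_batch() routes on the type of execution_context.manifest in the dag_run conf: a dict selects the single-manifest chain, a list the batch branch. A hypothetical manual trigger showing that payload shape (the manifest body is illustrative only, not a complete OSDU manifest):

# Trigger the DAG by hand with a single (dict) manifest payload;
# a JSON list under "manifest" would route to the batch branch instead.
airflow dags trigger Osdu_ingest \
  --conf '{"execution_context": {"manifest": {"kind": "osdu:wks:Manifest:1.0.0"}}}'

The second file copied by the setup job, tutorial.py, is the stock Airflow tutorial DAG: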
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    'tutorial',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
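For a run date (ds) of 2021-01-01, each of the five Jinja loop iterations in templated_command renders to a pair of echo commands; macros.ds_add(ds, 7) shifts the date forward seven days:

# First of the five rendered iterations for ds = 2021-01-01:
echo "2021-01-01"
echo "2021-01-08"

The chart's templates follow: first the ConfigMap that packs these scripts, then the setup Job that runs copy-dags.sh.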
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: osdu-manifest-dag-config
data:
{{ (.Files.Glob "files/*").AsConfig | indent 2 }}
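.Files.Glob "files/*" turns every file under the chart's files/ directory (copy-dags.sh and the two DAG files) into a data key of this ConfigMap. The rendered result can be previewed with, for example (the template's filename inside the chart is an assumption):

# Preview just the rendered ConfigMap (template filename assumed):
helm template devops/ibm/ibm-manifest-dag --show-only templates/configmap.yaml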
kind: Job
apiVersion: batch/v1
metadata:
  name: osdu-manifest-dag-setup-job
  labels:
    app: osdu-manifest-dag-setup-job
    job-name: osdu-manifest-dag-setup-job
spec:
  activeDeadlineSeconds: 900
  parallelism: 1
  completions: 1
  backoffLimit: 6
  selector:
    matchLabels:
  template:
    metadata:
      name: osdu-manifest-dag-setup-job
      creationTimestamp: null
      labels:
        app: osdu-manifest-dag-setup-job
        job-name: osdu-manifest-dag-setup-job
    spec:
      restartPolicy: OnFailure
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      securityContext:
        runAsUser: 0
      containers:
        - name: deploy-dags
          image: >-
            alpine/k8s:1.21.12
          command:
            - /bin/sh
            - '-c'
            - |
              # Execute scripts "chown -R 50000:root /opt/airflow/dags; chmod 775 /opt/airflow/dags"
              /tmp/scripts/copy-dags.sh
          resources:
            requests:
              cpu: 5m
              memory: 5Mi
          volumeMounts:
            - name: scripts-pvc
              mountPath: /tmp/scripts
            - name: dags-pvc
              mountPath: /opt/airflow/dags
              subPath: dags
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      volumes:
        - name: scripts-pvc
          projected:
            defaultMode: 0755
            sources:
              - configMap:
                  name: osdu-manifest-dag-config
        - name: dags-pvc
          persistentVolumeClaim:
            claimName: dags-pvc
      dnsPolicy: ClusterFirst
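Once the chart is installed, this one-shot job can be verified; a minimal check, borrowing the osdu-airflow namespace used by the deploy stage below (the job's actual namespace may differ):

# Confirm the DAG-copy job completed and inspect the script's output:
kubectl -n osdu-airflow get job osdu-manifest-dag-setup-job
kubectl -n osdu-airflow logs job/osdu-manifest-dag-setup-job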
# Default values for osdu-manifest.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
#persistentvolume:
#  enabled: false
#storageclass:
#  file_storage: ibmc-file-gold
image:
  repository: nginx
  pullPolicy: IfNotPresent
ibm-helm-charts-master:
  stage: build
  tags: ["osdu-small"]
  image:
    name: alpine/helm:3.6.3
    entrypoint: [""]
  before_script:
    - cp src/osdu_dags/osdu-ingest-r3.py devops/ibm/ibm-manifest-dag/files/.
    - helm plugin install https://github.com/chartmuseum/helm-push.git
  script:
    - helm cm-push ${IBM_HELM_DAG_PATH} ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/helm/stable --username gitlab-ci-token --password $CI_JOB_TOKEN
  rules:
    - if: '$CI_COMMIT_BRANCH == "master"'
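helm cm-push publishes the packaged chart to this project's GitLab Helm package registry (the stable channel). Consumers could then pull it with something like the following, where GITLAB_HOST, PROJECT_ID, and the credentials are placeholders:

# Add the project's GitLab Helm registry and install the published chart:
helm repo add osdu-dags https://GITLAB_HOST/api/v4/projects/PROJECT_ID/packages/helm/stable \
  --username <user> --password <token>
helm install osdu-manifest-dag osdu-dags/ibm-manifest-dag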
ibm_deploy:
  image: openshift/origin-cli
  tags: ['osdu-medium']
@@ -34,9 +48,9 @@ ibm_bootstrap:
    - oc project $IBM_OPENSHIFT_PROJECT
    - airflowWebPod=`oc get pods -n osdu-airflow | grep airflow-web* | awk '{ print $1 }'`
    - echo $airflowWebPod
ibm_integration_tests:
  image:
    name: postman/newman:alpine
    entrypoint: [""]
  stage: end_to_end_postman_tests