Commit b13dbf81 authored by Aalekh Jain's avatar Aalekh Jain
Browse files

Merge branch 'master' into SystemDags-Core

parents 6f85c6e3 8029e58a
......@@ -3,12 +3,12 @@ variables:
OSDU_GCP_SERVICE: workflow
OSDU_GCP_VENDOR: gcp
OSDU_GCP_APPLICATION_NAME: os-workflow
OSDU_GCP_ENV_VARS: OSDU_ENTITLEMENTS_URL=$OSDU_GCP_ENTITLEMENTS_V2_URL,PARTITION_API=$OSDU_GCP_PARTITION_API,GOOGLE_AUDIENCES=$GOOGLE_AUDIENCE,OSDU_ENTITLEMENTS_APPKEY=$OSDU_GCP_ENTITLEMENTS_APPKEY,GCP_AIRFLOW_URL=$OSDU_GCP_AIRFLOW_URL,OSDU_AIRFLOW_URL=$OSDU_GCP_AIRFLOW_URL --vpc-connector=$OSDU_GCP_VPC_CONNECTOR
OSDU_GCP_ENV_VARS: OSDU_ENTITLEMENTS_URL=$OSDU_GCP_ENTITLEMENTS_V2_URL,PARTITION_API=$OSDU_GCP_PARTITION_API,GOOGLE_AUDIENCES=$GOOGLE_AUDIENCE,OSDU_ENTITLEMENTS_APPKEY=$OSDU_GCP_ENTITLEMENTS_APPKEY,GCP_AIRFLOW_URL=$OSDU_GCP_AIRFLOW_URL,OSDU_AIRFLOW_URL=$OSDU_GCP_AIRFLOW_URL,WORKFLOW_ADMIN_ACCOUNT=$OSDU_GCP_WORKFLOW_ADMIN_ACCOUNT,SHARED_TENANT_NAME=$TENANT_NAME --vpc-connector=$OSDU_GCP_VPC_CONNECTOR
OSDU_GCP_TEST_SUBDIR: testing/$OSDU_GCP_SERVICE-test-$OSDU_GCP_VENDOR
OSDU_GCP_HELM_PACKAGE_CHARTS: "devops/gcp/deploy devops/gcp/configmap"
OSDU_GCP_HELM_CONFIG_SERVICE: workflow-config
OSDU_GCP_HELM_DEPLOYMENT_SERVICE: workflow-deploy
OSDU_GCP_HELM_CONFIG_SERVICE_VARS: "--set data.google_audiences=$GOOGLE_AUDIENCE --set data.osdu_entitlements_appkey=workflow-service --set data.gcp_airflow_url=$OSDU_GCP_AIRFLOW_URL --set data.osdu_airflow_url=$OSDU_GCP_AIRFLOW_URL"
OSDU_GCP_HELM_CONFIG_SERVICE_VARS: "--set data.google_audiences=$GOOGLE_AUDIENCE --set data.osdu_entitlements_appkey=workflow-service --set data.gcp_airflow_url=$OSDU_GCP_AIRFLOW_URL --set data.osdu_airflow_url=$OSDU_GCP_AIRFLOW_URL --set data.workflow_admin_account=$OSDU_GCP_WORKFLOW_ADMIN_ACCOUNT --set data.shared_tenant_name=$TENANT_NAME"
OSDU_GCP_HELM_DEPLOYMENT_SERVICE_VARS: "--set data.image=$CI_REGISTRY_IMAGE/osdu-gcp:$CI_COMMIT_SHORT_SHA --set data.serviceAccountName=workload-identity-workflow"
# --- end of osdu gcp specific variables ---
......@@ -53,11 +53,11 @@ include:
- project: "osdu/platform/ci-cd-pipelines"
ref: "master"
file: "cloud-providers/osdu-gcp-cloudrun.yml"
file: "cloud-providers/osdu-gcp-gke.yml"
- project: "osdu/platform/ci-cd-pipelines"
ref: "master"
file: "cloud-providers/ibm.yml"
file: "cloud-providers/ibm-java-git.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "cloud-providers/aws.yml"
......
This diff is collapsed.
......@@ -54,10 +54,14 @@ spec:
- containerPort: 80
readinessProbe:
httpGet:
path: /api/workflow/swagger-ui.html
port: 80
initialDelaySeconds: 100
timeoutSeconds: 30
path: /actuator/health
port: 8081
livenessProbe:
httpGet:
path: /actuator/health
port: 8081
initialDelaySeconds: 250
periodSeconds: 10
volumeMounts:
- name: azure-keyvault
mountPath: "/mnt/azure-keyvault"
......
......@@ -13,4 +13,6 @@ data:
OSDU_ENTITLEMENTS_APPKEY: "{{ .Values.data.osdu_entitlements_appkey}}"
GCP_AIRFLOW_URL: "{{ .Values.data.gcp_airflow_url }}"
OSDU_AIRFLOW_URL: "{{ .Values.data.gcp_airflow_url }}"
WORKFLOW_ADMIN_ACCOUNT: "{{ .Values.data.workflow_admin_account }}"
SHARED_TENANT_NAME: "{{ .Values.data.shared_tenant_name }}"
......@@ -9,6 +9,8 @@ data:
osdu_entitlements_url: "http://entitlements/api/entitlements/v2/"
osdu_entitlements_appkey: ""
gcp_airflow_url: ""
workflow_admin_account: ""
shared_tenant_name: ""
conf:
configmap: "workflow-config"
......
......@@ -4,9 +4,9 @@
data:
requests_cpu: "0.25"
requests_memory: "256M"
requests_memory: "1G"
limits_cpu: "1"
limits_memory: "1G"
limits_memory: "2G"
serviceAccountName: ""
image: ""
......
......@@ -83,3 +83,8 @@ osdu.azure.airflow.ignoreDagContent=${ignore_dagContent}
# Azure Event Grid Configuration
azure.eventGrid.enabled=${event_grid_enabled:true}
azure.eventGrid.topicName=${event_grid_topic:statuschangedtopic}
# Configuration for health checks
management.server.port=8081
management.health.azure-key-vault.enabled=false
management.health.elasticsearch.enabled=false
......@@ -437,8 +437,14 @@ public class WorkflowEngineServiceImplTest {
private WorkflowEngineRequest workflowEngineRequest(String workflowEngineExecutionDate,
boolean isDeployedThroughWorkflowService) {
return new WorkflowEngineRequest(RUN_ID, WORKFLOW_ID, WORKFLOW_NAME, EXECUTION_TIMESTAMP,
workflowEngineExecutionDate, isDeployedThroughWorkflowService);
return WorkflowEngineRequest.builder()
.runId(RUN_ID)
.workflowId(WORKFLOW_ID)
.workflowName(WORKFLOW_NAME)
.executionTimeStamp(EXECUTION_TIMESTAMP)
.workflowEngineExecutionDate(workflowEngineExecutionDate)
.isDeployedThroughWorkflowService(isDeployedThroughWorkflowService)
.build();
}
private Map<String, Object> registrationInstructions(String dagContent) {
......
......@@ -27,6 +27,7 @@ In order to run the service locally, you will need to have the following environ
| `LOG_PREFIX` | `workflow` | Logging prefix | no | - |
| `osdu.entitlements.url` | ex `https://entitlements.com/entitlements/v1` | Entitlements API endpoint | no | output of infrastructure deployment |
| `osdu.entitlements.app-key` | ex `test` | Entitlements app key | no | - |
| `WORKFLOW_ADMIN_ACCOUNT` | ex `admin@domain.iam.gserviceaccount.com` | Admin account permitted to access the root endpoints | yes | - |
| `gcp.airflow.url` | ex `https://********-tp.appspot.com` | Airflow endpoint | yes | - |
| `GOOGLE_AUDIENCES` | ex `*****.apps.googleusercontent.com` | Client ID for getting access to cloud resources | yes | https://console.cloud.google.com/apis/credentials |
| `GOOGLE_APPLICATION_CREDENTIALS` | ex `/path/to/directory/service-key.json` | Service account credentials, you only need this if running locally | yes | https://console.cloud.google.com/iam-admin/serviceaccounts |
......@@ -34,6 +35,7 @@ In order to run the service locally, you will need to have the following environ
| `OSDU_AIRFLOW_USERNAME` | ex `******` | Username for access Apache Airflow | yes | - |
| `OSDU_AIRFLOW_PASSWORD` | ex `******` | Password for access Apache Airflow | yes | - |
| `PARTITION_API` | ex `http://localhost:8081/api/partition/v1` | Partition service endpoint | no | - |
| `SHARED_TENANT_NAME` | ex `common` or `opendes` | The name of the shared tenant | yes | - |
| `STATUS_CHANGED_MESSAGING_ENABLED` | `true` OR `false` | Allows to configure message publishing about schemas changes to Pub/Sub | no | - |
| `STATUS_CHANGED_TOPIC_NAME` | ex `status-changed` | Allows to subscribe a specific Pub/Sub topic | no | - |
......
/*
Copyright 2021 Google LLC
Copyright 2021 EPAM Systems, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.opengroup.osdu.workflow.provider.gcp.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * GCP-specific settings bound from the Spring environment via relaxed binding
 * (no prefix), e.g. GOOGLE_AUDIENCES and WORKFLOW_ADMIN_ACCOUNT.
 */
@Getter
@Setter
@Configuration
@ConfigurationProperties
public class GcpPropertiesConfiguration {
// Client ID(s) accepted as the audience when validating Google-issued tokens.
private String googleAudiences;
// Service account permitted to call the admin/root endpoints
// (presumably bound from WORKFLOW_ADMIN_ACCOUNT — see the README table).
private String workflowAdminAccount;
}
......@@ -30,5 +30,7 @@ import org.springframework.context.annotation.Configuration;
public class WorkflowPropertiesConfiguration {
// Datastore kind for workflow-run records (presumably — TODO confirm against usage).
private String workflowRunKind;
// Datastore kind for tenant-scoped workflow metadata (used when not a system workflow).
private String workflowKind;
// Datastore kind for system workflows stored in the shared tenant.
private String systemWorkflowKind;
// Datastore kind for workflow status records (presumably — TODO confirm against usage).
private String workflowStatusKind;
// Name of the shared tenant that hosts system workflows (SHARED_TENANT_NAME).
private String sharedTenantName;
}
/*
Copyright 2021 Google LLC
Copyright 2021 EPAM Systems, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.opengroup.osdu.workflow.provider.gcp.repository;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.cloud.datastore.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.tomcat.util.json.JSONParser;
import org.apache.tomcat.util.json.ParseException;
import org.opengroup.osdu.core.common.exception.BadRequestException;
import org.opengroup.osdu.core.common.exception.NotFoundException;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.legal.PersistenceException;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.multitenancy.IDatastoreFactory;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.exception.IntegrationException;
import org.opengroup.osdu.workflow.exception.RuntimeException;
import org.opengroup.osdu.workflow.model.WorkflowMetadata;
import org.opengroup.osdu.workflow.provider.gcp.config.WorkflowPropertiesConfiguration;
import org.opengroup.osdu.workflow.provider.gcp.service.GoogleIapHelper;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
 * Shared Datastore-backed repository logic for workflow metadata, used by both
 * the tenant-scoped and the system (shared-tenant) workflow repositories.
 *
 * <p>Workflow records are stored as Datastore entities keyed by a random UUID,
 * under a kind selected by {@code isSystemWorkflow} (see {@link #getKind}).
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class GcpWorkflowCommonMetadataRepository {

  private static final String KEY_DAG_NAME = "dagName";
  private static final String DAG_NAME = "DagName";
  private static final String WORKFLOW_NAME = "WorkflowName";
  private static final String DESCRIPTION = "Description";
  private static final String CREATED_BY = "CreatedBy";
  private static final String CREATION_TIME_STAMP = "CreationTimestamp";
  private static final String VERSION = "Version";

  // Cache of Datastore clients keyed by tenant name. This @Service is a shared
  // singleton, so the cache must be thread-safe: the synchronized wrapper makes
  // computeIfAbsent atomic, unlike the racy check-then-put on a plain HashMap.
  private final Map<String, Datastore> tenantRepositories =
      Collections.synchronizedMap(new HashMap<>());

  private final AirflowConfig airflowConfig;
  private final WorkflowPropertiesConfiguration workflowConfig;
  private final GoogleIapHelper googleIapHelper;
  private final TenantInfo tenantInfo;
  private final IDatastoreFactory datastoreFactory;
  private final ITenantFactory tenantFactory;

  /**
   * Validates that the workflow's DAG is known to Airflow, then persists the
   * workflow metadata in Datastore.
   *
   * @param workflowMetadata metadata to save; its workflowId is set on success
   *     and its registrationInstructions are cleared before return
   * @param isSystemWorkflow true to store under the shared-tenant (system) kind
   * @return the same metadata instance, updated in place
   * @throws BadRequestException if the DAG does not exist in Airflow
   * @throws IntegrationException if the Airflow HTTP request fails
   * @throws RuntimeException on I/O or JSON parsing failures
   * @throws AppException (409) if a workflow with the same name already exists
   */
  public WorkflowMetadata createWorkflow(WorkflowMetadata workflowMetadata, boolean isSystemWorkflow) {
    log.info("Saving workflow : {}", workflowMetadata);
    String dagName = getDagName(workflowMetadata);
    validateDagName(dagName);
    saveWorkflowMetadata(workflowMetadata, dagName, isSystemWorkflow);
    resetWorkflowMetadata(workflowMetadata);
    log.info("Fetch saved workflow : {}. DAG name {}", workflowMetadata, dagName);
    return workflowMetadata;
  }

  /**
   * Queries Airflow's experimental latest_runs endpoint (through IAP) and fails
   * with BadRequestException when no DAG with the given name is listed.
   */
  private void validateDagName(String dagName) {
    try {
      String airflowUrl = this.airflowConfig.getUrl();
      String iapClientId = this.googleIapHelper.getIapClientId(airflowUrl);
      String webServerUrl = String.format("%s/api/experimental/latest_runs", airflowUrl);
      HttpRequest httpRequest = this.googleIapHelper.buildIapGetRequest(webServerUrl, iapClientId);
      HttpResponse response = httpRequest.execute();
      String content = IOUtils.toString(response.getContent(), UTF_8);
      if (Objects.nonNull(content)) {
        JSONParser parser = new JSONParser(content);
        LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>> dagList =
            (LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>>) parser.parse();
        List<LinkedHashMap<String, String>> items = dagList.get("items");
        // dagName is a (effectively final) parameter, so the lambda can capture
        // it directly — no defensive local copy needed.
        if (items.stream().noneMatch(c -> c.get("dag_id").equals(dagName))) {
          throw new BadRequestException(
              String.format("DAG with name %s doesn't exist.", dagName));
        }
      }
    } catch (HttpResponseException e) {
      throw new IntegrationException("Airflow request fail", e);
    } catch (IOException | ParseException e) {
      throw new RuntimeException(e.getMessage(), e);
    }
  }

  /**
   * Fetches a single workflow by name.
   *
   * @throws NotFoundException if no entity with that workflow name exists
   */
  public WorkflowMetadata getWorkflow(String workflowName, boolean isSystemWorkflow) {
    log.info("Get details for workflow. Workflow name : {}", workflowName);
    Datastore ds = getDatastore(isSystemWorkflow);
    EntityQuery.Builder queryBuilder = getBaseQueryBuilder(workflowName, isSystemWorkflow);
    QueryResults<Entity> tasks = ds.run(queryBuilder.build());
    if (tasks.hasNext()) {
      return convertEntityToWorkflowMetadata(tasks.next());
    }
    throw new NotFoundException(
        String.format("Workflow entity for workflow name: %s not found.", workflowName));
  }

  /**
   * Deletes the workflow with the given name, if present (no-op otherwise).
   *
   * @throws PersistenceException if Datastore rejects the operation
   */
  public void deleteWorkflow(String workflowName, boolean isSystemWorkflow) {
    log.info("Delete workflow. Workflow name : {}", workflowName);
    Transaction txn = null;
    Datastore ds = getDatastore(isSystemWorkflow);
    try {
      EntityQuery.Builder queryBuilder = getBaseQueryBuilder(workflowName, isSystemWorkflow);
      QueryResults<Entity> tasks = ds.run(queryBuilder.build());
      if (tasks.hasNext()) {
        Key key = tasks.next().getKey();
        txn = ds.newTransaction();
        txn.delete(key);
        txn.commit();
      }
    } catch (DatastoreException ex) {
      throw new PersistenceException(ex.getCode(), ex.getMessage(), ex.getReason());
    } finally {
      // Roll back if the commit never happened (e.g. delete threw mid-transaction).
      if (txn != null && txn.isActive()) {
        txn.rollback();
      }
    }
  }

  /**
   * Lists all workflows of the selected kind, optionally filtered by name prefix.
   *
   * @param prefix workflow-name prefix to match; null means no filtering
   */
  public List<WorkflowMetadata> getAllWorkflowForTenant(final String prefix, boolean isSystemWorkflow) {
    log.info("Get all workflows. Prefix {}", prefix);
    Datastore ds = getDatastore(isSystemWorkflow);
    List<WorkflowMetadata> responseList = new ArrayList<>();
    EntityQuery.Builder queryBuilder = Query.newEntityQueryBuilder()
        .setKind(getKind(isSystemWorkflow));
    QueryResults<Entity> tasks = ds.run(queryBuilder.build());
    while (tasks.hasNext()) {
      responseList.add(convertEntityToWorkflowMetadata(tasks.next()));
    }
    // null prefix keeps everything; otherwise keep only matching workflow names.
    return responseList.stream()
        .filter(md -> prefix == null || md.getWorkflowName().startsWith(prefix))
        .collect(Collectors.toList());
  }

  /**
   * Persists the metadata as a new entity; rejects duplicates by workflow name.
   * On success, the generated entity key is written back as the workflowId.
   */
  private void saveWorkflowMetadata(WorkflowMetadata workflowMetadata, String dagName, boolean isSystemWorkflow) {
    Datastore ds = getDatastore(isSystemWorkflow);
    Transaction txn = ds.newTransaction();
    String workflowName = workflowMetadata.getWorkflowName();
    try {
      EntityQuery.Builder queryBuilder = getBaseQueryBuilder(workflowName, isSystemWorkflow);
      QueryResults<Entity> tasks = ds.run(queryBuilder.build());
      if (!tasks.hasNext()) {
        Entity entity = convertWorkflowMetadataToEntity(workflowMetadata, dagName, isSystemWorkflow);
        txn.put(entity);
        txn.commit();
        workflowMetadata.setWorkflowId(entity.getKey().getName());
      } else {
        throw new AppException(HttpStatus.CONFLICT.value(), "Conflict", String.format("Workflow with name %s already exists.", workflowName));
      }
    } catch (DatastoreException ex) {
      throw new PersistenceException(ex.getCode(), ex.getMessage(), ex.getReason());
    } finally {
      if (txn.isActive()) {
        txn.rollback();
      }
    }
  }

  /** Clears registration instructions so they are not echoed back to the caller. */
  private void resetWorkflowMetadata(WorkflowMetadata workflowMetadata) {
    workflowMetadata.setRegistrationInstructions(null);
  }

  /** Builds a Datastore entity from the metadata, substituting defaults for null fields. */
  private Entity convertWorkflowMetadataToEntity(WorkflowMetadata workflowMetadata, String dagName, boolean isSystemWorkflow) {
    Datastore ds = getDatastore(isSystemWorkflow);
    Key taskKey = ds.newKeyFactory()
        .setKind(getKind(isSystemWorkflow))
        .newKey(UUID.randomUUID().toString());
    return Entity.newBuilder(taskKey)
        .set(DAG_NAME, dagName == null ? "" : dagName)
        .set(WORKFLOW_NAME, workflowMetadata.getWorkflowName())
        .set(DESCRIPTION, workflowMetadata.getDescription() == null ?
            "" : workflowMetadata.getDescription())
        .set(CREATED_BY, workflowMetadata.getCreatedBy() == null ?
            "" : workflowMetadata.getCreatedBy())
        .set(CREATION_TIME_STAMP, workflowMetadata.getCreationTimestamp() == null ?
            0 : workflowMetadata.getCreationTimestamp())
        .set(VERSION, workflowMetadata.getVersion() == null ? 0 : workflowMetadata.getVersion())
        .build();
  }

  /** Resolves the DAG name: explicit registration instruction, else the workflow name. */
  private String getDagName(WorkflowMetadata workflowMetadata) {
    Map<String, Object> instructions = workflowMetadata.getRegistrationInstructions();
    return instructions != null && instructions.get(KEY_DAG_NAME) != null ?
        instructions.get(KEY_DAG_NAME).toString() :
        workflowMetadata.getWorkflowName();
  }

  /** Selects the Datastore kind: system kind for system workflows, tenant kind otherwise. */
  private String getKind(boolean isSystemWorkflow) {
    return isSystemWorkflow ?
        this.workflowConfig.getSystemWorkflowKind() :
        this.workflowConfig.getWorkflowKind();
  }

  /** Maps a Datastore entity back to WorkflowMetadata (entity key becomes workflowId). */
  private WorkflowMetadata convertEntityToWorkflowMetadata(Entity entity) {
    return WorkflowMetadata.builder()
        .workflowId(entity.getKey().getName())
        .workflowName(entity.getString(WORKFLOW_NAME))
        .createdBy(entity.getString(CREATED_BY))
        .creationTimestamp(entity.getLong(CREATION_TIME_STAMP))
        .description(entity.getString(DESCRIPTION))
        .version(entity.getLong(VERSION))
        .registrationInstructions(Collections.singletonMap(KEY_DAG_NAME, entity.getString(DAG_NAME)))
        .build();
  }

  /** Builds the base query matching entities by workflow name for the selected kind. */
  private EntityQuery.Builder getBaseQueryBuilder(String workflowName, boolean isSystemWorkflow) {
    return Query.newEntityQueryBuilder()
        .setKind(getKind(isSystemWorkflow))
        .setFilter(StructuredQuery.PropertyFilter.eq(WORKFLOW_NAME, workflowName));
  }

  /**
   * Returns (and lazily caches) a Datastore client for the effective tenant:
   * system workflows always use the shared tenant, others the request tenant.
   */
  private Datastore getDatastore(boolean isSystemWorkflow) {
    String tenantName = isSystemWorkflow ?
        this.workflowConfig.getSharedTenantName() :
        this.tenantInfo.getName();
    log.debug("tenantName: " + tenantName);
    // computeIfAbsent on the synchronized map is atomic, avoiding duplicate
    // client creation under concurrent first access for the same tenant.
    return this.tenantRepositories.computeIfAbsent(tenantName, name ->
        this.datastoreFactory.getDatastore(this.tenantFactory.getTenantInfo(name)));
  }
}
......@@ -17,247 +17,37 @@
package org.opengroup.osdu.workflow.provider.gcp.repository;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.EntityQuery;
import com.google.cloud.datastore.EntityQuery.Builder;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.QueryResults;
import com.google.cloud.datastore.StructuredQuery.PropertyFilter;
import com.google.cloud.datastore.Transaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.tomcat.util.json.JSONParser;
import org.apache.tomcat.util.json.ParseException;
import org.opengroup.osdu.core.common.exception.BadRequestException;
import org.opengroup.osdu.core.common.exception.NotFoundException;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.legal.PersistenceException;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.multitenancy.IDatastoreFactory;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.exception.IntegrationException;
import org.opengroup.osdu.workflow.exception.RuntimeException;
import org.opengroup.osdu.workflow.model.WorkflowMetadata;
import org.opengroup.osdu.workflow.provider.gcp.config.WorkflowPropertiesConfiguration;
import org.opengroup.osdu.workflow.provider.gcp.service.GoogleIapHelper;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowMetadataRepository;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.List;
@Service
@Slf4j
@RequiredArgsConstructor
public class GcpWorkflowMetadataRepository implements IWorkflowMetadataRepository {
private static final String KEY_DAG_NAME = "dagName";
private static final String DAG_NAME = "DagName";
private static final String WORKFLOW_NAME = "WorkflowName";
private static final String DESCRIPTION = "Description";
private static final String CREATED_BY = "CreatedBy";
private static final String CREATION_TIME_STAMP = "CreationTimestamp";
private static final String VERSION = "Version";
private Map<String, Datastore> tenantRepositories = new HashMap<>();
private final AirflowConfig airflowConfig;
private final WorkflowPropertiesConfiguration workflowConfig;
private final GoogleIapHelper googleIapHelper;
private final TenantInfo tenantInfo;
private final IDatastoreFactory datastoreFactory;
private final ITenantFactory tenantFactory;
private final GcpWorkflowCommonMetadataRepository commonMetadataRepository;
@Override
public WorkflowMetadata createWorkflow(WorkflowMetadata workflowMetadata) {
log.info("Saving workflow : {}", workflowMetadata);
String dagName = null;
Map<String, Object> instructions = workflowMetadata.getRegistrationInstructions();
if (Objects.nonNull(instructions)) {
dagName = (String) instructions.get(KEY_DAG_NAME);
}
if (Objects.isNull(dagName)) {
dagName = workflowMetadata.getWorkflowName();
}
// validate DAG name
try {
String airflowUrl = this.airflowConfig.getUrl();
String iapClientId = this.googleIapHelper.getIapClientId(airflowUrl);
String webServerUrl = String.format("%s/api/experimental/latest_runs", airflowUrl);
HttpRequest httpRequest = this.googleIapHelper.buildIapGetRequest(webServerUrl, iapClientId);
HttpResponse response = httpRequest.execute();
String content = IOUtils.toString(response.getContent(), UTF_8);
if (Objects.nonNull(content)) {
JSONParser parser = new JSONParser(content);
LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>> dagList =
(LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>>) parser.parse();
List<LinkedHashMap<String, String>> items = dagList.get("items");
String finalDagName = dagName;
if (items.stream().noneMatch(c -> c.get("dag_id").equals(finalDagName))) {
throw new BadRequestException(
String.format("DAG with name %s doesn't exist.", dagName));
}
}
} catch (HttpResponseException e) {
throw new IntegrationException("Airflow request fail", e);
} catch (IOException | ParseException e) {