Commit a51a52eb authored by Aalekh Jain's avatar Aalekh Jain
Browse files

Updated `WorkflowEngineServiceImpl.java` to use system workflow instead of threadlocals

parent c4055214
......@@ -18,7 +18,6 @@ import org.opengroup.osdu.workflow.provider.azure.config.AirflowConfigResolver;
import org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig;
import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareConfig;
import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareStore;
import org.opengroup.osdu.workflow.provider.azure.utils.ThreadLocalUtils;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowEngineService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -157,9 +156,9 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
private ClientResponse triggerWorkflowUsingController(AirflowConfig airflowConfig,
final String runId, final String workflowId,
String workflowName,
Map<String, Object> inputData) {
Map<String, Object> inputData, boolean isSystemWorkflow) {
String triggerDAGEndpoint = String
.format("api/experimental/dags/%s/dag_runs", getAirflowConfig(ThreadLocalUtils.getSystemDagFlag()).getControllerDagId());
.format("api/experimental/dags/%s/dag_runs", getAirflowConfig(isSystemWorkflow).getControllerDagId());
JSONObject requestBody = new JSONObject();
String parentRunId = "PARENT_" + runId;
......@@ -185,10 +184,10 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
String workflowId = rq.getWorkflowId();
LOGGER.info("Submitting ingestion with Airflow with dagName: {}", workflowName);
ClientResponse response = null;
AirflowConfig airflowConfig = getAirflowConfig(ThreadLocalUtils.getSystemDagFlag());
AirflowConfig airflowConfig = getAirflowConfig(rq.isSystemWorkflow());
if (airflowConfig.isDagRunAbstractionEnabled()) {
response = triggerWorkflowUsingController(airflowConfig, runId, workflowId,
workflowName, inputData);
workflowName, inputData, rq.isSystemWorkflow());
} else {
response = triggerWorkflowBase(airflowConfig, runId, workflowId, workflowName, inputData);
}
......@@ -234,7 +233,7 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
executionDate);
String getDAGRunStatusEndpoint = String.format("api/experimental/dags/%s/dag_runs/%s",
workflowName, executionDate);
ClientResponse response = callAirflowApi(getAirflowConfig(ThreadLocalUtils.getSystemDagFlag()),
ClientResponse response = callAirflowApi(getAirflowConfig(rq.isSystemWorkflow()),
getDAGRunStatusEndpoint, HttpMethod.GET, null,
String.format(AIRFLOW_WORKFLOW_RUN_NOT_FOUND, workflowName, executionDate));
try {
......@@ -260,8 +259,8 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
return workflowName + FILE_NAME_PREFIX;
}
private AirflowConfig getAirflowConfig(Boolean isSystemDAG) {
if (isSystemDAG) {
private AirflowConfig getAirflowConfig(Boolean isSystemWorkflow) {
if (isSystemWorkflow) {
if(workflowEngineConfig.getIsDPAirflowUsedForSystemDAG()) {
return airflowConfigResolver.getAirflowConfig(dpsHeaders.getPartitionId());
} else {
......
package org.opengroup.osdu.workflow.provider.azure.utils;
public class ThreadLocalUtils {
private static final ThreadLocal<Boolean> isSystemDag =
ThreadLocal.withInitial(() -> false);
public static Boolean getSystemDagFlag() {
return isSystemDag.get();
}
public static void setSystemDagFlag(Boolean isSystemDagInput) {
isSystemDag.set(isSystemDagInput);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment