Commit 2b5ab72f authored by Aalekh Jain's avatar Aalekh Jain
Browse files

Updated `WorkflowEngineServiceImpl.java`

parent e2cb1d76
......@@ -6,24 +6,26 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.model.*;
import org.opengroup.osdu.workflow.model.AirflowGetDAGRunStatus;
import org.opengroup.osdu.workflow.model.TriggerWorkflowResponse;
import org.opengroup.osdu.workflow.model.WorkflowEngineRequest;
import org.opengroup.osdu.workflow.model.WorkflowStatusType;
import org.opengroup.osdu.workflow.provider.azure.config.AirflowConfigResolver;
import org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig;
import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareConfig;
import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareStore;
import org.opengroup.osdu.workflow.provider.azure.utils.ThreadLocalUtils;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowEngineService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
......@@ -33,7 +35,6 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import org.springframework.stereotype.Service;
@Service
@Primary
......@@ -79,17 +80,16 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
public void createWorkflow(
final WorkflowEngineRequest rq, final Map<String, Object> registrationInstruction) {
String dagContent = (String) registrationInstruction.get(KEY_DAG_CONTENT);
if(workflowEngineConfig.getIgnoreDagContent()) {
if (workflowEngineConfig.getIgnoreDagContent()) {
LOGGER.info("Ignoring input DAG content: {}", dagContent);
dagContent = "";
}
if(dagContent != null && !dagContent.isEmpty()) {
if(!StringUtils.isEmpty(dpsHeaders.getPartitionId())) {
if (dagContent != null && !dagContent.isEmpty()) {
if (!rq.isSystemWorkflow()) {
fileShareStore.writeToFileShare(dpsHeaders.getPartitionId(), fileShareConfig.getShareName(),
fileShareConfig.getDagsFolder(), getFileNameFromWorkflow(rq.getWorkflowName()),
dagContent);
}
else{
} else {
fileShareStore.writeToFileShare(fileShareConfig.getShareName(),
fileShareConfig.getDagsFolder(), getFileNameFromWorkflow(rq.getWorkflowName()),
dagContent);
......@@ -109,7 +109,7 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
try {
String deleteDAGEndpoint = String.format("api/experimental/dags/%s", workflowName);
callAirflowApi(getAirflowConfig(ThreadLocalUtils.getSystemDagFlag()), deleteDAGEndpoint, HttpMethod.DELETE, null,
callAirflowApi(getAirflowConfig(rq.isSystemWorkflow()), deleteDAGEndpoint, HttpMethod.DELETE, null,
String.format(AIRFLOW_DELETE_DAG_ERROR_MESSAGE, workflowName));
} catch (AppException e) {
if (e.getError().getCode() != 404) {
......@@ -120,11 +120,10 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
String fileName = getFileNameFromWorkflow(workflowName);
LOGGER.info("Deleting DAG file {} from file share", fileName);
try {
if(!StringUtils.isEmpty(dpsHeaders.getPartitionId())) {
if (!rq.isSystemWorkflow()) {
fileShareStore.deleteFromFileShare(dpsHeaders.getPartitionId(),
fileShareConfig.getShareName(), fileShareConfig.getDagsFolder(), fileName);
}
else{
} else {
fileShareStore.deleteFromFileShare(fileShareConfig.getShareName(), fileShareConfig.getDagsFolder(), fileName);
}
} catch (final ShareStorageException e) {
......@@ -262,7 +261,7 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
}
private AirflowConfig getAirflowConfig(Boolean isSystemDAG) {
if(isSystemDAG) {
if (isSystemDAG) {
if(workflowEngineConfig.getIsDPAirflowUsedForSystemDAG()) {
return airflowConfigResolver.getAirflowConfig(dpsHeaders.getPartitionId());
} else {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment