Commit eb6ca742 authored by Matt Wise's avatar Matt Wise
Browse files

Merge remote-tracking branch 'origin/master' into dev

parents f0fe2fbb 5fbdf217
Pipeline #30386 failed with stages
in 43 minutes and 4 seconds
......@@ -78,18 +78,20 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
String workflowName = rq.getWorkflowName();
LOGGER.info("Deleting DAG {} in Airflow", workflowName);
try {
String deleteDAGEndpoint = String.format("api/experimental/dags/%s", workflowName);
callAirflowApi(deleteDAGEndpoint, HttpMethod.DELETE, null,
String.format(AIRFLOW_DELETE_DAG_ERROR_MESSAGE, workflowName));
} catch (AppException e) {
if (e.getError().getCode() != 404) {
throw e;
}
}
if (rq.isDeployedThroughWorkflowService()) {
// Deleting only if dag is deployed through workflow service.
// Figure out how to only remove the metadata but not the DAG.
// Because in repeated delete create fashion the dag will not appear for a while
try {
String deleteDAGEndpoint = String.format("api/experimental/dags/%s", workflowName);
callAirflowApi(deleteDAGEndpoint, HttpMethod.DELETE, null,
String.format(AIRFLOW_DELETE_DAG_ERROR_MESSAGE, workflowName));
} catch (AppException e) {
if (e.getError().getCode() != 404) {
throw e;
}
}
String fileName = getFileNameFromWorkflow(workflowName);
LOGGER.info("Deleting DAG file {} from file share", fileName);
try {
......
......@@ -310,27 +310,27 @@ public class WorkflowEngineServiceImplTest {
}
@Test
public void testDeleteWorkflowShouldShouldNotCallFileShareForDAGsNotDeployedThroughWorkflowService() {
when(airflowConfig.getUrl()).thenReturn(AIRFLOW_URL);
when(airflowConfig.getAppKey()).thenReturn(AIRFLOW_APP_KEY);
when(restClient.resource(eq(AIRFLOW_DAG_URL))).thenReturn(webResource);
when(webResource.type(eq(MediaType.APPLICATION_JSON))).thenReturn(webResourceBuilder);
when(webResourceBuilder.header(eq(HEADER_AUTHORIZATION_NAME), eq(HEADER_AUTHORIZATION_VALUE)))
.thenReturn(webResourceBuilder);
when(webResourceBuilder.method(eq("DELETE"), eq(ClientResponse.class), eq(null)))
.thenReturn(clientResponse);
when(clientResponse.getStatus()).thenReturn(SUCCESS_STATUS_CODE);
public void testDeleteWorkflowShouldShouldNotCallFileShareAndAirflowForDAGsNotDeployedThroughWorkflowService() {
// when(airflowConfig.getUrl()).thenReturn(AIRFLOW_URL);
// when(airflowConfig.getAppKey()).thenReturn(AIRFLOW_APP_KEY);
// when(restClient.resource(eq(AIRFLOW_DAG_URL))).thenReturn(webResource);
// when(webResource.type(eq(MediaType.APPLICATION_JSON))).thenReturn(webResourceBuilder);
// when(webResourceBuilder.header(eq(HEADER_AUTHORIZATION_NAME), eq(HEADER_AUTHORIZATION_VALUE)))
// .thenReturn(webResourceBuilder);
// when(webResourceBuilder.method(eq("DELETE"), eq(ClientResponse.class), eq(null)))
// .thenReturn(clientResponse);
// when(clientResponse.getStatus()).thenReturn(SUCCESS_STATUS_CODE);
workflowEngineService.deleteWorkflow(workflowEngineRequest(null, false));
verify(dagsFileShareStore, times(0)).deleteFile(eq(FILE_NAME));
verify(restClient).resource(eq(AIRFLOW_DAG_URL));
verify(webResource).type(eq(MediaType.APPLICATION_JSON));
verify(webResourceBuilder).header(eq(HEADER_AUTHORIZATION_NAME), eq(HEADER_AUTHORIZATION_VALUE));
verify(webResourceBuilder).method(eq("DELETE"), eq(ClientResponse.class), eq(null));
verify(clientResponse).getStatus();
verify(airflowConfig).getUrl();
verify(airflowConfig).getAppKey();
verify(restClient, times(0)).resource(eq(AIRFLOW_DAG_URL));
verify(webResource, times(0)).type(eq(MediaType.APPLICATION_JSON));
verify(webResourceBuilder, times(0)).header(eq(HEADER_AUTHORIZATION_NAME), eq(HEADER_AUTHORIZATION_VALUE));
verify(webResourceBuilder, times(0)).method(eq("DELETE"), eq(ClientResponse.class), eq(null));
verify(clientResponse, times(0)).getStatus();
verify(airflowConfig, times(0)).getUrl();
verify(airflowConfig, times(0)).getAppKey();
}
@Test
......
......@@ -167,9 +167,6 @@ public class WorkflowRunRepository implements IWorkflowRunRepository {
QueryResult<WorkflowRunDoc> results = db.query(new QueryBuilder(
eq("workflowName", workflowName)).
build(), WorkflowRunDoc.class);
if(results.getDocs().isEmpty()) {
throw new AppException(HttpStatus.SC_NOT_FOUND, "NOT_FOUND", String.format("WorkflowRun %s does not exists", workflowName));
}
List<WorkflowRun> workflowRunList = results.getDocs().stream().map(wrd -> wrd.getWorkflowRun()).collect(Collectors.toList());
return workflowRunList;
}
......
......@@ -10,6 +10,7 @@ import java.util.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.workflow.exception.WorkflowNotFoundException;
import org.opengroup.osdu.workflow.exception.WorkflowRunCompletedException;
import org.opengroup.osdu.workflow.logging.AuditLogger;
import org.opengroup.osdu.workflow.model.*;
......@@ -91,7 +92,10 @@ public class WorkflowRunServiceImpl implements IWorkflowRunService {
@Override
public List<WorkflowRun> getAllRunInstancesOfWorkflow(String workflowName,
Map<String, Object> params) {
Map<String, Object> params)
throws WorkflowNotFoundException {
// Calling getWorkflow will throw WorkflowNotFoundException
workflowMetadataRepository.getWorkflow(workflowName);
return workflowRunRepository.getAllRunInstancesOfWorkflow(workflowName, params);
}
......
......@@ -457,7 +457,7 @@ class WorkflowRunServiceTest {
void testDeleteWorkflowRunsByWorkflowIdWithActiveWorkflowRuns() throws Exception {
final WorkflowRun finishedWorkflowRun = OBJECT_MAPPER.readValue(FINISHED_WORKFLOW_RUN,
WorkflowRun.class);
final WorkflowRun submittedWorkflowRun = OBJECT_MAPPER.readValue(SUBMITTED_WORKFLOW_RUN,
final WorkflowRun submittedWorkflowRun = OBJECT_MAPPER.readValue(SUBMITTED_WORKFLOW_RUN,
WorkflowRun.class);
final WorkflowRun runningWorkflowRun = OBJECT_MAPPER.readValue(RUNNING_WORKFLOW_RUN,
WorkflowRun.class);
......@@ -495,6 +495,34 @@ class WorkflowRunServiceTest {
verify(workflowRunRepository, times(0)).deleteWorkflowRuns(eq(WORKFLOW_NAME), any(List.class));
}
@Test
void testGetAllRunInstancesOfWorkflowForExistentWorkflowName() throws Exception {
final WorkflowRun finishedWorkflowRun = OBJECT_MAPPER.readValue(FINISHED_WORKFLOW_RUN,
WorkflowRun.class);
final WorkflowRun submittedWorkflowRun = OBJECT_MAPPER.readValue(SUBMITTED_WORKFLOW_RUN,
WorkflowRun.class);
final WorkflowRun runningWorkflowRun = OBJECT_MAPPER.readValue(RUNNING_WORKFLOW_RUN,
WorkflowRun.class);
final List<WorkflowRun> workflowRuns =
Arrays.asList(finishedWorkflowRun, submittedWorkflowRun, runningWorkflowRun);
final Map<String, Object> config = new HashMap<>();
final WorkflowMetadata workflowMetadata = mock(WorkflowMetadata.class);
when(workflowMetadataRepository.getWorkflow(eq(WORKFLOW_NAME))).thenReturn(workflowMetadata);
when(workflowRunRepository.getAllRunInstancesOfWorkflow(eq(WORKFLOW_NAME), eq(config))).thenReturn(workflowRuns);
Assertions.assertEquals(workflowRuns, workflowRunService.getAllRunInstancesOfWorkflow(WORKFLOW_NAME, config));
verify(workflowRunRepository).getAllRunInstancesOfWorkflow(eq(WORKFLOW_NAME), eq(config));
verify(workflowMetadataRepository).getWorkflow(eq(WORKFLOW_NAME));
}
@Test
void testGetAllRunInstancesOfWorkflowForNonExistentWorkflowName() {
when(workflowMetadataRepository.getWorkflow(WORKFLOW_NAME)).thenThrow(WorkflowNotFoundException.class);
Assertions.assertThrows(WorkflowNotFoundException.class, () -> {
workflowRunService.getAllRunInstancesOfWorkflow(WORKFLOW_NAME, new HashMap<>());
});
verify(workflowMetadataRepository).getWorkflow(eq(WORKFLOW_NAME));
}
private Map<String, Object> createWorkflowPayload(final String runId,
final TriggerWorkflowRequest request) {
final Map<String, Object> payload = new HashMap<>();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment