Commit 97fa5938 authored by Stephen Nimmo's avatar Stephen Nimmo
Browse files

Patching as per multiple comments on the pull request, added the spring exception handler

parent 889d5e03
Pipeline #77577 failed with stage
in 1 minute and 39 seconds
package org.opengroup.osdu.streaming.api;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
@ControllerAdvice
public class StreamAdminServiceExceptionHandler extends ResponseEntityExceptionHandler {
@ExceptionHandler(value = { StreamAdminException.class })
protected ResponseEntity<Object> handleConflict(Exception ex, WebRequest request) {
return handleExceptionInternal(ex, ex.getMessage(), new HttpHeaders(), HttpStatus.BAD_REQUEST, request);
}
}
......@@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException;
import org.opengroup.osdu.core.common.model.storage.UpsertRecords;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.StreamApi;
......@@ -98,13 +99,13 @@ public class StreamingAdminControllerImpl implements StreamApi {
public ResponseEntity<String> createNewStream(String dataPartitionId, StreamRecord streamRecord) {
try {
Record record = this.convert(streamRecord);
this.storageService.upsertRecord(record);
UpsertRecords upsertRecords = this.storageService.upsertRecord(record);
this.topicAdminService.createTopics(streamRecord);
this.deploymentAdminService.createStreamDeployment(streamRecord);
return ResponseEntity.ok(upsertRecords.getRecordIds().get(0));
} catch (StorageException e) {
throw new StreamAdminException("Unable to persist StreamRecord", e);
}
this.topicAdminService.createTopics(streamRecord);
this.deploymentAdminService.createStreamDeployment(streamRecord);
return ResponseEntity.ok(dataPartitionId);
}
@Override
......
......@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Service
public class TopicAdminServiceImpl implements TopicAdminService {
......@@ -44,17 +45,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
public void cleanupTopics(StreamRecord streamRecord) {
StreamDatasetDatasetProperties.StreamTypeEnum streamType = streamRecord.getData().getDatasetProperties().getStreamType();
List<String> strings = this.extractTopicNames(streamRecord);
switch (streamType) {
case SOURCE:
strings.forEach(this::deleteTopic);
break;
case SINK:
strings.forEach(this::deleteTopic);
case PROCESSOR:
strings.forEach(this::deleteTopic);
default:
throw new IllegalStateException("Unexpected value: " + streamType);
}
strings.forEach(this::deleteTopic);
}
private List<String> extractTopicNames(StreamRecord streamRecord) {
......@@ -66,7 +57,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
case SINK:
return streamDefinition.getSinkBindings();
case PROCESSOR:
return Collections.emptyList();
return Stream.concat(streamDefinition.getSourceBindings().stream(), streamDefinition.getSinkBindings().stream()).collect(Collectors.toList());
default:
throw new IllegalStateException("Unexpected value: " + streamType);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment