Commit 7c52b187 authored by Stephen Nimmo's avatar Stephen Nimmo Committed by Dmitry Kniazev
Browse files

Refactored moving logic to controller, implemented full rest service

parent b815ca90
......@@ -75,4 +75,20 @@ The embedded Swagger API documentation is also available on the following link:
http://localhost:8080/api/streaming/v1/swagger-ui/
Now you can use either Postman or [stream.helpers](/scripts) to test the API.
\ No newline at end of file
Now you can use either Postman or [stream.helpers](/scripts) to test the API.
# Starting Local Minikube
Install Docs: https://minikube.sigs.k8s.io/docs/start
```
export KUBECONFIG=~/kubeconfig/minikube/config
minikube start
```
Initialize the osdu-streams namespace
```
kubectl create namespace osdu-streams
```
\ No newline at end of file
......@@ -405,7 +405,7 @@ components:
type: object
properties:
id:
type: integer
type: string
state:
type: string
enum:
......
package org.opengroup.osdu.streaming.api;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
@ControllerAdvice
public class StreamAdminServiceExceptionHandler extends ResponseEntityExceptionHandler {

    /**
     * Maps any {@link StreamAdminException} raised by the streaming admin
     * endpoints onto an HTTP 400 response whose body is the exception message.
     */
    @ExceptionHandler(StreamAdminException.class)
    protected ResponseEntity<Object> handleConflict(Exception ex, WebRequest request) {
        // Delegate to the Spring base class so the standard response plumbing applies.
        return handleExceptionInternal(ex, ex.getMessage(), new HttpHeaders(), HttpStatus.BAD_REQUEST, request);
    }
}
......@@ -14,17 +14,32 @@
package org.opengroup.osdu.streaming.api;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException;
import org.opengroup.osdu.core.common.model.storage.UpsertRecords;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.StreamApi;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.opengroup.osdu.streaming.service.StreamingAdminService;
import org.opengroup.osdu.streaming.model.StreamStatus;
import org.opengroup.osdu.streaming.model.StreamStatusWorkers;
import org.opengroup.osdu.streaming.service.DeploymentAdminService;
import org.opengroup.osdu.streaming.service.StreamDeploymentStatus;
import org.opengroup.osdu.streaming.service.TopicAdminService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.annotation.RequestScope;
import java.util.Collections;
import java.util.Optional;
/**
* Streams Administration Controller implements the auto-generated interface
* based on OpenAPI 3.0
......@@ -41,28 +56,117 @@ import org.springframework.web.context.annotation.RequestScope;
@RequestScope
public class StreamingAdminControllerImpl implements StreamApi {
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
private final ObjectMapper objectMapper;
private final DpsHeaders headers;
private final IStorageFactory storageFactory;
private final TopicAdminService topicAdminService;
private final DeploymentAdminService deploymentAdminService;
private final IStorageService storageService;
@Autowired
private StreamingAdminService streamingAdminService;
/**
 * All collaborators are constructor-injected.
 *
 * @param objectMapper           Jackson mapper used for Record/StreamRecord conversion
 * @param headers                request headers forwarded to the storage service
 * @param storageFactory         factory used to build the per-request storage service
 * @param topicAdminService      manages the Kafka topics backing a stream
 * @param deploymentAdminService manages the stream's runtime deployment
 */
public StreamingAdminControllerImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
this.objectMapper = objectMapper;
this.headers = headers;
this.storageFactory = storageFactory;
this.topicAdminService = topicAdminService;
this.deploymentAdminService = deploymentAdminService;
// Bind the storage service to this request's headers once, up front.
this.storageService = storageFactory.create(headers);
}
@Override
public ResponseEntity<String> createNewStream(String dataPartitionId, StreamRecord streamRecord) {
logger.debug("Creating a new stream: " + streamRecord.getData().getName());
String id = null;
id = this.streamingAdminService.createNewStream(streamRecord);
if (id != null) {
return new ResponseEntity<String>(id, HttpStatus.CREATED);
public ResponseEntity<StreamRecord> getStreamById(String dataPartitionId, String id) {
return ResponseEntity.ok(this.fetchStreamRecord(id));
}
@Override
public ResponseEntity<StreamStatus> getStreamStatusById(String dataPartitionId, String id) {
    StreamRecord streamRecord = this.fetchStreamRecord(id);
    Optional<StreamDeploymentStatus> deploymentStatus =
            this.deploymentAdminService.findStreamDeploymentStatus(streamRecord);
    if (!deploymentStatus.isPresent()) {
        // No deployment exists for this stream: report 404 rather than an error.
        return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
    }
    StreamStatus streamStatus = new StreamStatus();
    streamStatus.command(StreamStatus.CommandEnum.INFO);
    streamStatus.workers(Collections.singletonList(this.convert(deploymentStatus.get())));
    return ResponseEntity.ok(streamStatus);
}
@Override
public ResponseEntity<String> createNewStream(String dataPartitionId, StreamRecord streamRecord) {
    try {
        // Persist the stream record first so a durable definition exists
        // before any messaging/deployment resources are provisioned.
        Record record = this.convert(streamRecord);
        UpsertRecords upsertRecords = this.storageService.upsertRecord(record);
        this.topicAdminService.createTopics(streamRecord);
        this.deploymentAdminService.createStreamDeployment(streamRecord);
        // 201 Created is the correct status for successful resource creation
        // and matches the previous implementation of this endpoint.
        return ResponseEntity.status(HttpStatus.CREATED).body(upsertRecords.getRecordIds().get(0));
    } catch (StorageException e) {
        throw new StreamAdminException("Unable to persist StreamRecord", e);
    }
}
@Override
public ResponseEntity<StreamRecord> getStreamById(String dataPartitionId, String id) {
StreamRecord streamRecord = this.streamingAdminService.getStream(id);
return new ResponseEntity<StreamRecord>(streamRecord, HttpStatus.OK);
public ResponseEntity<StreamStatus> startStreamById(String dataPartitionId, String id) {
StreamRecord streamRecord = this.fetchStreamRecord(id);
StreamDeploymentStatus streamDeploymentStatus = this.deploymentAdminService.startStreamDeployment(streamRecord);
StreamStatus streamStatus = new StreamStatus();
streamStatus.command(StreamStatus.CommandEnum.INFO);
streamStatus.workers(Collections.singletonList(this.convert(streamDeploymentStatus)));
return ResponseEntity.ok(streamStatus);
}
@Override
public ResponseEntity<StreamStatus> stopStreamById(String dataPartitionId, String id) {
    // Resolve the stored definition, then ask the deployment service to stop it.
    StreamRecord record = this.fetchStreamRecord(id);
    StreamDeploymentStatus deploymentStatus = this.deploymentAdminService.stopStreamDeployment(record);
    // Report the resulting worker state back as an INFO status payload.
    StreamStatus status = new StreamStatus();
    status.command(StreamStatus.CommandEnum.INFO);
    status.workers(Collections.singletonList(this.convert(deploymentStatus)));
    return ResponseEntity.ok(status);
}
@Override
public ResponseEntity<Void> deleteStreamById(String dataPartitionId, String id) {
    // Tear down the stream's runtime resources: its deployment first, then its topics.
    StreamRecord record = this.fetchStreamRecord(id);
    this.deploymentAdminService.deleteStreamDeployment(record);
    this.topicAdminService.cleanupTopics(record);
    return ResponseEntity.ok().build();
}
/**
 * Reads the raw storage record for the given id and maps it onto the
 * StreamRecord model.
 *
 * @throws StreamAdminException when the record cannot be read from storage
 */
private StreamRecord fetchStreamRecord(String streamRecordId) {
    try {
        return this.convert(this.storageService.getRecord(streamRecordId));
    } catch (StorageException e) {
        throw new StreamAdminException("Unable to locate StreamRecord", e);
    }
}
/**
 * Converts a StreamRecord into a storage Record via a JSON round trip.
 *
 * @throws StreamAdminException when Jackson cannot (de)serialize the value
 */
private Record convert(StreamRecord streamRecord) {
    try {
        return objectMapper.readValue(objectMapper.writeValueAsString(streamRecord), Record.class);
    } catch (JsonProcessingException e) {
        // Preserve the underlying Jackson failure as the cause so the real
        // serialization error is not lost (the (String, Throwable) ctor is
        // already used elsewhere in this class).
        throw new StreamAdminException("Unable to serialize StreamRecord: " + streamRecord.toString(), e);
    }
}
/**
 * Converts a storage Record into a StreamRecord via a JSON round trip.
 *
 * @throws StreamAdminException when Jackson cannot (de)serialize the value
 */
private StreamRecord convert(Record record) {
    try {
        return objectMapper.readValue(objectMapper.writeValueAsString(record), StreamRecord.class);
    } catch (JsonProcessingException e) {
        // Preserve the underlying Jackson failure as the cause so the real
        // serialization error is not lost.
        throw new StreamAdminException("Unable to serialize Record: " + record.toString(), e);
    }
}
/**
 * Maps an internal deployment status onto the API worker-status model.
 */
private StreamStatusWorkers convert(StreamDeploymentStatus deploymentStatus) {
    StreamStatusWorkers worker = new StreamStatusWorkers();
    //TODO Failed? (every non-RUNNING state is currently reported as STOPPED)
    if (deploymentStatus.getStatus() == StreamDeploymentStatus.Status.RUNNING) {
        worker.state(StreamStatusWorkers.StateEnum.RUNNING);
    } else {
        worker.state(StreamStatusWorkers.StateEnum.STOPPED);
    }
    worker.id(deploymentStatus.getDeploymentName());
    return worker;
}
}
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.service;
import org.opengroup.osdu.streaming.model.StreamRecord;
/**
 * Administrative operations over stream definitions.
 */
public interface StreamingAdminService {
/**
 * Retrieves a stream definition from storage.
 *
 * @param streamRecordId storage id of the stream record
 * @return the stream record, or null when it cannot be retrieved
 *         (the default implementation logs errors and returns null)
 */
StreamRecord getStream(String streamRecordId);
/**
 * Persists a new stream definition and provisions its topics and deployment.
 *
 * @param streamRecord the stream definition to create
 * @return the id of the persisted record, or null when persistence failed
 */
String createNewStream(StreamRecord streamRecord);
}
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException;
import org.opengroup.osdu.core.common.model.storage.UpsertRecords;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.model.StreamDatasetDatasetProperties.StreamTypeEnum;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
/**
 * Streaming Admin Service
 *
 * Default implementation backed by the OSDU storage service for persistence,
 * delegating Kafka topic and deployment provisioning to the
 * {@link TopicAdminService} and {@link DeploymentAdminService}.
 *
 * @author Dmitry Kniazev
 */
@Service
@RequestScope
public class StreamingAdminServiceImpl implements StreamingAdminService {

    private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);

    // Collaborators are injected once per request; final prevents reassignment.
    private final ObjectMapper objectMapper;
    private final DpsHeaders headers;
    private final IStorageFactory storageFactory;
    private final TopicAdminService topicAdminService;
    private final DeploymentAdminService deploymentAdminService;

    public StreamingAdminServiceImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
            TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
        this.objectMapper = objectMapper;
        this.headers = headers;
        this.storageFactory = storageFactory;
        this.topicAdminService = topicAdminService;
        this.deploymentAdminService = deploymentAdminService;
    }

    /**
     * Fetches a stream record from storage by id.
     *
     * @param streamRecordId storage id of the record
     * @return the stream record, or {@code null} when the record does not
     *         exist or an error occurred (errors are logged, not rethrown)
     */
    @Override
    public StreamRecord getStream(String streamRecordId) {
        try {
            logger.debug("Creating a storage service with headers: {}", headers.getHeaders());
            IStorageService storageService = this.storageFactory.create(headers);
            Record record = storageService.getRecord(streamRecordId);
            if (record == null) {
                return null;
            }
            // Temporary implementation: map Record -> StreamRecord via a JSON
            // round trip. TODO: replace with a proper mapping.
            String jsonRepr = objectMapper.writeValueAsString(record);
            logger.debug("Retrieved stream record: {}", jsonRepr);
            return objectMapper.readValue(jsonRepr, StreamRecord.class);
        } catch (StorageException e) {
            // TODO: throw exceptions to the controller so that the proper HTTP
            // codes can be assigned for the API responses.
            logger.error("Got exception: {}\nFull HTTP Response:{}", e.getMessage(), e.getHttpResponse());
            return null;
        } catch (JsonProcessingException e) {
            logger.error("Got exception: {}\nLocation{}", e.getMessage(), e.getLocation());
            return null;
        }
    }

    // TODO: Change API interface to StreamRecord and implement this method.
    // The problem is that we don't know how to assign all the required attributes
    // for the OSDU records, like ACL, tags, meta, legal, etc. Therefore it might be
    // easier to expect all of these to be provided by the client, so that we can
    // pass it to the storage svc without any changes...
    /**
     * Persists a new stream record and provisions its topics and deployment.
     *
     * @param streamRecord the stream definition supplied by the client
     * @return the id of the upserted record, or {@code null} when persistence
     *         failed (errors are logged, not rethrown)
     */
    @Override
    public String createNewStream(StreamRecord streamRecord) {
        UpsertRecords upRec = null;
        try {
            logger.debug("Creating a storage service with headers: {}", headers.getHeaders());
            IStorageService storageService = this.storageFactory.create(headers);
            // Convert StreamRecord to Record via a JSON round trip (temporary).
            String jsonRepr = objectMapper.writeValueAsString(streamRecord);
            logger.debug("Extracted record: \n{}", jsonRepr);
            Record rec = objectMapper.readValue(jsonRepr, Record.class);
            // Try inserting the new record into storage.
            upRec = storageService.upsertRecord(rec);
            // Source streams need their sink topics created up front.
            StreamTypeEnum streamType = streamRecord.getData().getDatasetProperties().getStreamType();
            logger.debug("Stream Type: {}", streamType);
            if (streamType == StreamTypeEnum.SOURCE) {
                this.topicAdminService.createTopics(streamRecord.getData().getDatasetProperties()
                        .getStreamDefinition().getSinkBindings().stream().toArray(String[]::new));
            }
            // Create the stream deployment.
            this.deploymentAdminService.createStreamDeployment(streamRecord);
        } catch (StorageException e) {
            logger.error("Got exception from storage service: {}\nFull HTTP Response:{}", e.getMessage(), e.getHttpResponse());
        } catch (JsonProcessingException e) {
            logger.error("Got exception: {}\nLocation{}", e.getMessage(), e.getLocation());
        }
        // Preserve the original contract: report the new id only when the
        // upsert demonstrably succeeded, otherwise null.
        if (upRec != null && upRec.getRecordCount() > 0) {
            return upRec.getRecordIds().get(0);
        }
        return null;
    }
}
......@@ -3,16 +3,12 @@ package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.TopicDescription;
import org.opengroup.osdu.streaming.model.StreamRecord;
import java.util.Optional;
import java.util.List;
/**
 * Administration of the Kafka topics backing stream definitions.
 */
public interface TopicAdminService {
// Looks up the topic associated with the given stream record, if present.
Optional<TopicDescription> getTopic(StreamRecord streamRecord);
// Creates (or returns already-existing) topics for every binding of the stream.
List<TopicDescription> createTopics(StreamRecord streamRecord);
// Creates a single topic derived from the stream record's kind.
TopicDescription createTopic(StreamRecord streamRecord);
// Creates each named topic; already-existing topics are tolerated by the impl.
void createTopics(String[] topicList);
// Deletes the topic derived from the stream record's kind.
void deleteTopic(StreamRecord streamRecord);
// Deletes every topic bound to the stream definition.
void cleanupTopics(StreamRecord streamRecord);
}
package org.opengroup.osdu.streaming.service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamDatasetDatasetProperties;
import org.opengroup.osdu.streaming.model.StreamDefinition;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Service
public class TopicAdminServiceImpl implements TopicAdminService {
private static final Logger logger = LoggerFactory.getLogger(TopicAdminService.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TopicAdminService.class);
private AdminClient adminClient;
private final AdminClient adminClient;
/**
 * @param adminClient Kafka admin client through which all topic
 *                    create/describe/delete calls are made
 */
public TopicAdminServiceImpl(AdminClient adminClient) {
this.adminClient = adminClient;
}
@Override
public Optional<TopicDescription> getTopic(StreamRecord streamRecord) {
public List<TopicDescription> createTopics(StreamRecord streamRecord) {
List<String> topicNames = this.extractTopicNames(streamRecord);
return topicNames.stream()
.map(topicName -> createTopic(topicName))
.collect(Collectors.toList());
}
@Override
public void cleanupTopics(StreamRecord streamRecord) {
    // Delete every topic bound to this stream. extractTopicNames already
    // accounts for the stream type, so the previously computed (and unused)
    // streamType local has been removed.
    this.extractTopicNames(streamRecord).forEach(this::deleteTopic);
}
/**
 * Resolves the topic names a stream is bound to. Which bindings count as
 * topics depends on the stream's role: sources write to their sink bindings,
 * sinks read from their source bindings, processors use both.
 */
private List<String> extractTopicNames(StreamRecord streamRecord) {
    StreamDatasetDatasetProperties.StreamTypeEnum type =
            streamRecord.getData().getDatasetProperties().getStreamType();
    StreamDefinition definition =
            streamRecord.getData().getDatasetProperties().getStreamDefinition();
    switch (type) {
        case SOURCE:
            return definition.getSinkBindings();
        case SINK:
            return definition.getSourceBindings();
        case PROCESSOR:
            return Stream.concat(
                    definition.getSourceBindings().stream(),
                    definition.getSinkBindings().stream())
                    .collect(Collectors.toList());
        default:
            throw new IllegalStateException("Unexpected value: " + type);
    }
}
/**
 * Creates the named topic if it does not exist, returning its description.
 *
 * @throws StreamAdminException when creation fails or the topic is still not
 *                              visible afterwards
 */
private TopicDescription createTopic(String topicName) {
    Optional<TopicDescription> existing = this.findTopic(topicName);
    if (existing.isPresent()) {
        return existing.get();
    }
    try {
        // Block until the broker acknowledges creation; without .all().get()
        // (which the previous implementation used) the immediate describe
        // below can race the asynchronous create and miss the new topic.
        this.adminClient.createTopics(Collections.singletonList(this.createNewTopicObject(topicName))).all().get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new StreamAdminException(e);
    } catch (ExecutionException e) {
        throw new StreamAdminException(e);
    }
    return this.findTopic(topicName)
            .orElseThrow(() -> new StreamAdminException("Topic not visible after creation: " + topicName));
}
private Optional<TopicDescription> findTopic(String topicName) {
try {
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient
.describeTopics(Collections.singletonList(streamRecord.getKind())).values();
return kafkaFutureMap.containsKey(streamRecord.getKind())
? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get())
: Optional.empty();
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = this.adminClient.describeTopics(Collections.singletonList(topicName)).values();
if (kafkaFutureMap.containsKey(topicName)) {
return Optional.of(kafkaFutureMap.get(topicName).get());
} else {
return Optional.empty();
}
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
return Optional.empty();
......@@ -50,50 +90,15 @@ public class TopicAdminServiceImpl implements TopicAdminService {
}
}
@Override
public TopicDescription createTopic(StreamRecord streamRecord) {
logger.debug("Creating Topic for StreamRecord: {}", streamRecord);
Optional<TopicDescription> optional = this.getTopic(streamRecord);
if (optional.isPresent()) {
return optional.get();
}
try {
NewTopic newTopic = TopicBuilder.name(streamRecord.getKind())
// TODO: How to setup partitions and replicas
// .partitions(10)
// .replicas(3)
.build();
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
return getTopic(streamRecord).get();
} catch (InterruptedException | ExecutionException e) {
throw new StreamAdminException(e);
}
/**
 * Deletes the named topic, but only if it currently exists.
 */
private void deleteTopic(String topicName) {
    if (this.findTopic(topicName).isPresent()) {
        this.adminClient.deleteTopics(Collections.singletonList(topicName));
    }
}
@Override
public void deleteTopic(StreamRecord streamRecord) {
adminClient.deleteTopics(Collections.singletonList(streamRecord.getKind()));
/**
 * Builds a NewTopic creation request for the given topic name, relying on
 * broker-side defaults for partition count and replication factor.
 */
private NewTopic createNewTopicObject(String topicName) {
//TODO How to configure partitions and repl factors
return TopicBuilder.name(topicName).build();
}
@Override
public void createTopics(String[] topicList) {
logger.debug("Creating Topic(s): {}", topicList);
List<NewTopic> newTopics = new ArrayList<NewTopic>();
try {
for (String topicName : topicList) {
NewTopic newTopic = TopicBuilder.name(topicName).build();
newTopics.add(newTopic);
}
CreateTopicsResult topics = adminClient.createTopics(newTopics);
logger.debug("Creating Topic(s): {}", topics.all().get());
} catch (InterruptedException | ExecutionException e) {
if (e.getCause().getClass().equals(TopicExistsException.class)) {
logger.warn(e.getMessage());
} else {
logger.debug("Got exception with cause|class: {}|{}", e.getCause(), e.getClass());