Commit af275f94 authored by Stephen Nimmo's avatar Stephen Nimmo
Browse files

Refactored to move coordination control up to the controller level....

Refactored to move coordination control up to the controller level. Implemented the full set of controller methods.
parent b815ca90
Pipeline #77440 failed with stage
in 1 minute and 18 seconds
......@@ -76,3 +76,19 @@ The embedded Swagger API documentation is also available on the following link:
http://localhost:8080/api/streaming/v1/swagger-ui/
Now you can use either Postman or [stream.helpers](/scripts) to test the API.
# Starting Local Minikube
Install Docs: https://minikube.sigs.k8s.io/docs/start
```
export KUBECONFIG=~/kubeconfig/minikube/config
minikube start
```
Initialize the osdu-streams namespace
```
kubectl create namespace osdu-streams
```
\ No newline at end of file
......@@ -405,7 +405,7 @@ components:
type: object
properties:
id:
type: integer
type: string
state:
type: string
enum:
......
......@@ -14,17 +14,31 @@
package org.opengroup.osdu.streaming.api;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException;
import org.opengroup.osdu.core.common.model.storage.UpsertRecords;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.StreamApi;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.opengroup.osdu.streaming.model.StreamStatus;
import org.opengroup.osdu.streaming.model.StreamStatusWorkers;
import org.opengroup.osdu.streaming.service.DeploymentAdminService;
import org.opengroup.osdu.streaming.service.StreamDeploymentStatus;
import org.opengroup.osdu.streaming.service.StreamingAdminService;
import org.opengroup.osdu.streaming.service.TopicAdminService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.annotation.RequestScope;
import java.util.Collections;
import java.util.Optional;
/**
* Streams Administration Controller implements the auto-generated interface
* based on OpenAPI 3.0
......@@ -41,28 +55,117 @@ import org.springframework.web.context.annotation.RequestScope;
// NOTE(review): this span is unresolved diff residue — both the removed `logger` constant and the
// added `LOGGER` constant are present, and the @Autowired StreamingAdminService field belongs to the
// removed implementation. As shown it will not compile; resolve to the new side before merging.
@RequestScope
public class StreamingAdminControllerImpl implements StreamApi {
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
// Collaborators are constructor-injected; storageService is derived once per request from the
// factory plus the request-scoped DpsHeaders (class is @RequestScope).
private final ObjectMapper objectMapper;
private final DpsHeaders headers;
private final IStorageFactory storageFactory;
private final TopicAdminService topicAdminService;
private final DeploymentAdminService deploymentAdminService;
private final IStorageService storageService;
// Old-side residue: field injection of the now-removed service.
@Autowired
private StreamingAdminService streamingAdminService;
public StreamingAdminControllerImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
this.objectMapper = objectMapper;
this.headers = headers;
this.storageFactory = storageFactory;
this.topicAdminService = topicAdminService;
this.deploymentAdminService = deploymentAdminService;
this.storageService = storageFactory.create(headers);
}
// NOTE(review): unresolved diff residue — the old createNewStream body (delegating to
// StreamingAdminService) is interleaved with the new getStreamById/getStreamStatusById methods,
// and two alternative error returns (500 vs 404) are both present. Will not compile as-is.
@Override
public ResponseEntity<String> createNewStream(String dataPartitionId, StreamRecord streamRecord) {
// Old implementation: delegate creation to the service and return the new id with 201 Created.
logger.debug("Creating a new stream: " + streamRecord.getData().getName());
String id = null;
id = this.streamingAdminService.createNewStream(streamRecord);
if (id != null) {
return new ResponseEntity<String>(id, HttpStatus.CREATED);
// New implementation: load the stream record from storage and return it.
public ResponseEntity<StreamRecord> getStreamById(String dataPartitionId, String id) {
return ResponseEntity.ok(this.fetchStreamRecord(id));
}
@Override
// Reports the deployment status of a stream as a StreamStatus with a single worker entry.
public ResponseEntity<StreamStatus> getStreamStatusById(String dataPartitionId, String id) {
StreamRecord streamRecord = this.fetchStreamRecord(id);
Optional<StreamDeploymentStatus> streamDeploymentStatusOptional = this.deploymentAdminService.findStreamDeploymentStatus(streamRecord);
if (streamDeploymentStatusOptional.isPresent()) {
StreamDeploymentStatus streamDeploymentStatus = streamDeploymentStatusOptional.get();
StreamStatus streamStatus = new StreamStatus();
streamStatus.command(StreamStatus.CommandEnum.INFO);
streamStatus.workers(Collections.singletonList(this.convert(streamDeploymentStatus)));
return ResponseEntity.ok(streamStatus);
} else {
// Old branch returned 500; new branch returns 404 when no deployment is found.
return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
}
}
/**
 * Creates a new stream: persists the StreamRecord, provisions its topics, and creates its
 * deployment.
 *
 * @param dataPartitionId the partition the request is scoped to
 * @param streamRecord    the stream definition to create
 * @return 201 Created with the id of the persisted record (matching the previous controller
 *         behavior and the API contract of returning the stream id), or a null body if storage
 *         reported no upserted ids
 * @throws StreamAdminException when the record cannot be persisted
 */
@Override
public ResponseEntity<String> createNewStream(String dataPartitionId, StreamRecord streamRecord) {
    UpsertRecords upsertRecords;
    try {
        Record record = this.convert(streamRecord);
        upsertRecords = this.storageService.upsertRecord(record);
    } catch (StorageException e) {
        throw new StreamAdminException("Unable to persist StreamRecord", e);
    }
    this.topicAdminService.createTopics(streamRecord);
    this.deploymentAdminService.createStreamDeployment(streamRecord);
    // Bug fix: previously returned the dataPartitionId with 200 OK; a create should report the
    // new record's id with 201 Created (as the removed implementation did).
    String recordId = (upsertRecords != null && upsertRecords.getRecordCount() > 0)
            ? upsertRecords.getRecordIds().get(0)
            : null;
    return ResponseEntity.status(HttpStatus.CREATED).body(recordId);
}
// NOTE(review): unresolved diff residue — the removed getStreamById body is interleaved with the
// added startStreamById method (the @Override on the first line belongs to the old method).
@Override
// Old implementation: fetch via the now-removed StreamingAdminService.
public ResponseEntity<StreamRecord> getStreamById(String dataPartitionId, String id) {
StreamRecord streamRecord = this.streamingAdminService.getStream(id);
return new ResponseEntity<StreamRecord>(streamRecord, HttpStatus.OK);
// New method: start the stream's deployment and report the resulting worker state.
public ResponseEntity<StreamStatus> startStreamById(String dataPartitionId, String id) {
StreamRecord streamRecord = this.fetchStreamRecord(id);
StreamDeploymentStatus streamDeploymentStatus = this.deploymentAdminService.startStreamDeployment(streamRecord);
StreamStatus streamStatus = new StreamStatus();
streamStatus.command(StreamStatus.CommandEnum.INFO);
streamStatus.workers(Collections.singletonList(this.convert(streamDeploymentStatus)));
return ResponseEntity.ok(streamStatus);
}
/**
 * Stops the deployment backing the given stream and returns its status as a single-worker
 * StreamStatus (command INFO).
 *
 * @param dataPartitionId the partition the request is scoped to
 * @param id              storage id of the stream record
 * @return 200 OK with the post-stop status of the stream's worker
 */
@Override
public ResponseEntity<StreamStatus> stopStreamById(String dataPartitionId, String id) {
    // Resolve the record, stop its deployment, then translate the deployment status.
    StreamDeploymentStatus deploymentStatus =
            this.deploymentAdminService.stopStreamDeployment(this.fetchStreamRecord(id));
    StreamStatus status = new StreamStatus();
    status.command(StreamStatus.CommandEnum.INFO);
    status.workers(Collections.singletonList(this.convert(deploymentStatus)));
    return ResponseEntity.ok(status);
}
/**
 * Deletes the runtime artifacts of a stream: its deployment first, then its topics.
 *
 * @param dataPartitionId the partition the request is scoped to
 * @param id              storage id of the stream record
 * @return 200 OK with an empty body
 */
@Override
public ResponseEntity<Void> deleteStreamById(String dataPartitionId, String id) {
    StreamRecord record = this.fetchStreamRecord(id);
    this.deploymentAdminService.deleteStreamDeployment(record);
    this.topicAdminService.cleanupTopics(record);
    // NOTE(review): the StreamRecord itself is not removed from storage here — confirm intended.
    return ResponseEntity.ok().build();
}
/**
 * Loads the raw storage Record for the given id and maps it onto the StreamRecord model.
 *
 * @param streamRecordId storage id of the stream record
 * @return the mapped stream record
 * @throws StreamAdminException when the storage lookup fails
 */
private StreamRecord fetchStreamRecord(String streamRecordId) {
    try {
        return this.convert(this.storageService.getRecord(streamRecordId));
    } catch (StorageException e) {
        throw new StreamAdminException("Unable to locate StreamRecord", e);
    }
}
/**
 * Maps a StreamRecord onto the generic storage Record by round-tripping through JSON.
 *
 * @param streamRecord the stream record to convert
 * @return the equivalent storage Record
 * @throws StreamAdminException if serialization fails
 */
private Record convert(StreamRecord streamRecord) {
    try {
        return objectMapper.readValue(objectMapper.writeValueAsString(streamRecord), Record.class);
    } catch (JsonProcessingException e) {
        // Bug fix: propagate the cause so the underlying Jackson failure is diagnosable
        // (previously the exception was thrown without it).
        throw new StreamAdminException("Unable to serialize StreamRecord: " + streamRecord.toString(), e);
    }
}
/**
 * Maps a generic storage Record onto the StreamRecord model by round-tripping through JSON.
 *
 * @param record the storage record to convert
 * @return the equivalent StreamRecord
 * @throws StreamAdminException if serialization fails
 */
private StreamRecord convert(Record record) {
    try {
        return objectMapper.readValue(objectMapper.writeValueAsString(record), StreamRecord.class);
    } catch (JsonProcessingException e) {
        // Bug fix: propagate the cause so the underlying Jackson failure is diagnosable
        // (previously the exception was thrown without it).
        throw new StreamAdminException("Unable to serialize Record: " + record.toString(), e);
    }
}
// Translates a deployment status into a StreamStatusWorkers entry for the API response.
// The worker id is the Kubernetes deployment name; any status other than RUNNING — including
// failures, per the TODO below — is reported as STOPPED.
private StreamStatusWorkers convert(StreamDeploymentStatus streamDeploymentStatus) {
StreamStatusWorkers streamStatusWorkers = new StreamStatusWorkers();
//TODO Failed? — NOTE(review): a failed deployment currently maps to STOPPED; confirm whether the
// API's state enum should distinguish failure.
streamStatusWorkers.state(streamDeploymentStatus.getStatus() == StreamDeploymentStatus.Status.RUNNING ? StreamStatusWorkers.StateEnum.RUNNING : StreamStatusWorkers.StateEnum.STOPPED);
streamStatusWorkers.id(streamDeploymentStatus.getDeploymentName());
return streamStatusWorkers;
}
}
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.service;
import org.opengroup.osdu.streaming.model.StreamRecord;
// Administration operations for stream records.
// NOTE(review): legacy interface — this commit moves the coordination it describes up into the
// controller; see the implementation for the null-on-failure contracts noted below.
public interface StreamingAdminService {
// Returns the stream record for the given storage id, or null when the lookup or mapping fails
// (the implementation logs and swallows storage/JSON errors).
StreamRecord getStream(String streamRecordId);
// Persists the record, provisions topics and a deployment, and returns the new record id,
// or null when persistence fails.
String createNewStream(StreamRecord streamRecord);
}
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException;
import org.opengroup.osdu.core.common.model.storage.UpsertRecords;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.model.StreamDatasetDatasetProperties.StreamTypeEnum;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
/**
* Streaming Admin Service
*
* @author Dmitry Kniazev
*/
@Service
@RequestScope
public class StreamingAdminServiceImpl implements StreamingAdminService {

    private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);

    // Collaborators are constructor-injected; final by default since nothing reassigns them.
    private final ObjectMapper objectMapper;
    private final DpsHeaders headers;
    private final IStorageFactory storageFactory;
    private final TopicAdminService topicAdminService;
    private final DeploymentAdminService deploymentAdminService;

    public StreamingAdminServiceImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
            TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
        this.objectMapper = objectMapper;
        this.headers = headers;
        this.storageFactory = storageFactory;
        this.topicAdminService = topicAdminService;
        this.deploymentAdminService = deploymentAdminService;
    }

    /**
     * Fetches a StreamRecord by its storage record id.
     *
     * @param streamRecordId storage id of the stream record
     * @return the mapped stream record, or {@code null} when the record is missing or a
     *         storage/JSON error occurs (errors are logged and swallowed — see TODO below)
     */
    @Override
    public StreamRecord getStream(String streamRecordId) {
        StreamRecord streamRecord = null;
        try {
            logger.debug("Creating a storage service with headers: {}", headers.getHeaders());
            IStorageService storageService = this.storageFactory.create(headers);
            Record record = storageService.getRecord(streamRecordId);
            // Temporary implementation: map Record -> StreamRecord by round-tripping through JSON.
            // TODO: replace object marshalling/unmarshalling with the proper solution
            if (record != null) {
                String json = objectMapper.writeValueAsString(record);
                logger.debug("Retrieved stream record: {}", json);
                streamRecord = objectMapper.readValue(json, StreamRecord.class);
            }
            // TODO: throw exceptions to the controller so that the proper HTTP codes can be
            // assigned for the API responses
        } catch (StorageException e) {
            // Idiom fix: parameterized logging instead of string concatenation.
            logger.error("Got exception: {}\nFull HTTP Response: {}", e.getMessage(), e.getHttpResponse());
        } catch (JsonProcessingException e) {
            logger.error("Got exception: {}\nLocation: {}", e.getMessage(), e.getLocation());
        }
        return streamRecord;
    }

    // TODO: Change API interface to StreamRecord and implement this method.
    // The problem is that we don't know how to assign all the required attributes
    // for the OSDU records, like ACL, tags, meta, legal, etc. Therefore it might be
    // easier to expect all of these to be provided by the client, so that we can
    // pass it to the storage svc without any changes...
    /**
     * Persists a new stream record, creates topics for SOURCE streams, and creates the stream
     * deployment.
     *
     * @param streamRecord the stream definition to create
     * @return the id of the upserted record, or {@code null} when persistence fails
     */
    @Override
    public String createNewStream(StreamRecord streamRecord) {
        UpsertRecords upsertResult = null;
        try {
            logger.debug("Creating a storage service with headers: {}", headers.getHeaders());
            IStorageService storageService = this.storageFactory.create(headers);
            // Convert StreamRecord -> Record via a JSON round-trip (same approach as getStream).
            String json = objectMapper.writeValueAsString(streamRecord);
            logger.debug("Extracted record: \n{}", json);
            Record record = objectMapper.readValue(json, Record.class);
            upsertResult = storageService.upsertRecord(record);
            StreamTypeEnum streamType = streamRecord.getData().getDatasetProperties().getStreamType();
            logger.debug("Stream Type: {}", streamType);
            // Only SOURCE streams create their sink-binding topics here.
            if (streamType == StreamTypeEnum.SOURCE) {
                this.topicAdminService.createTopics(streamRecord.getData().getDatasetProperties()
                        .getStreamDefinition().getSinkBindings().stream().toArray(String[]::new));
            }
            this.deploymentAdminService.createStreamDeployment(streamRecord);
        } catch (StorageException e) {
            logger.error("Got exception from storage service: {}\nFull HTTP Response: {}",
                    e.getMessage(), e.getHttpResponse());
        } catch (JsonProcessingException e) {
            logger.error("Got exception: {}\nLocation: {}", e.getMessage(), e.getLocation());
        }
        if (upsertResult != null && upsertResult.getRecordCount() > 0) {
            return upsertResult.getRecordIds().get(0);
        }
        return null;
    }
}
......@@ -3,16 +3,12 @@ package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.TopicDescription;
import org.opengroup.osdu.streaming.model.StreamRecord;
import java.util.Optional;
import java.util.List;
// Kafka topic administration for stream records.
// NOTE(review): diff residue — this view shows the old StreamRecord-based single-topic methods
// and the new bulk variants side by side; only one set should survive the merge.
public interface TopicAdminService {
// Old API: look up the topic named after the record's kind.
Optional<TopicDescription> getTopic(StreamRecord streamRecord);
// New API: create (or fetch, if existing) every topic named in the stream's bindings.
List<TopicDescription> createTopics(StreamRecord streamRecord);
// Old API: create the single topic for the record's kind.
TopicDescription createTopic(StreamRecord streamRecord);
// Old API: bulk-create topics by explicit name.
void createTopics(String[] topicList);
// Old API: delete the topic named after the record's kind.
void deleteTopic(StreamRecord streamRecord);
// New API: delete every topic named in the stream's bindings.
void cleanupTopics(StreamRecord streamRecord);
}
package org.opengroup.osdu.streaming.service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamDatasetDatasetProperties;
import org.opengroup.osdu.streaming.model.StreamDefinition;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
// NOTE(review): diff residue — both logger constants and both adminClient field declarations
// (mutable old, final new) are present; duplicates will not compile. Also note both loggers are
// keyed on TopicAdminService.class rather than TopicAdminServiceImpl.class — confirm intended.
@Service
public class TopicAdminServiceImpl implements TopicAdminService {
private static final Logger logger = LoggerFactory.getLogger(TopicAdminService.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TopicAdminService.class);
private AdminClient adminClient;
private final AdminClient adminClient;
public TopicAdminServiceImpl(AdminClient adminClient) {
this.adminClient = adminClient;
}
// NOTE(review): diff residue — the removed getTopic(StreamRecord) body runs straight into the
// added createTopics(StreamRecord) method without its closing brace; will not compile as-is.
@Override
// Old method: describe the topic named after the record's kind; empty when unknown.
public Optional<TopicDescription> getTopic(StreamRecord streamRecord) {
try {
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient
.describeTopics(Collections.singletonList(streamRecord.getKind())).values();
return kafkaFutureMap.containsKey(streamRecord.getKind())
? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get())
: Optional.empty();
} catch (ExecutionException e) {
// UnknownTopicOrPartition means "does not exist" rather than a hard failure.
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
return Optional.empty();
}
throw new StreamAdminException(e);
} catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not re-set before rethrowing — consider
// Thread.currentThread().interrupt().
throw new StreamAdminException(e);
}
// New method: create (or fetch) one topic per binding name extracted from the record.
public List<TopicDescription> createTopics(StreamRecord streamRecord) {
List<String> topicNames = this.extractTopicNames(streamRecord);
return topicNames.stream()
.map(topicName -> createTopic(topicName))
.collect(Collectors.toList());
}
// NOTE(review): diff residue — the removed createTopic(StreamRecord) body is interleaved with the
// added cleanupTopics(StreamRecord); will not compile as-is.
@Override
public TopicDescription createTopic(StreamRecord streamRecord) {
logger.debug("Creating Topic for StreamRecord: {}", streamRecord);
Optional<TopicDescription> optional = this.getTopic(streamRecord);
if (optional.isPresent()) {
return optional.get();
// New method: delete every topic named in the stream's bindings.
public void cleanupTopics(StreamRecord streamRecord) {
StreamDatasetDatasetProperties.StreamTypeEnum streamType = streamRecord.getData().getDatasetProperties().getStreamType();
List<String> strings = this.extractTopicNames(streamRecord);
switch (streamType) {
case SOURCE:
strings.forEach(this::deleteTopic);
break;
// NOTE(review): SINK and PROCESSOR are missing break statements, so both fall through into
// default and throw IllegalStateException after deleting — looks like a bug; confirm.
case SINK:
strings.forEach(this::deleteTopic);
case PROCESSOR:
strings.forEach(this::deleteTopic);
default:
throw new IllegalStateException("Unexpected value: " + streamType);
}
try {
NewTopic newTopic = TopicBuilder.name(streamRecord.getKind())
// TODO: How to setup partitions and replicas
// .partitions(10)
// .replicas(3)
.build();
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
return getTopic(streamRecord).get();
} catch (InterruptedException | ExecutionException e) {
throw new StreamAdminException(e);
}
/**
 * Derives the topic names a stream needs from its dataset properties: source bindings for SOURCE
 * streams, sink bindings for SINK streams, nothing for PROCESSOR streams.
 *
 * @param streamRecord the stream whose bindings are read
 * @return the binding names for the stream's type (possibly empty)
 * @throws IllegalStateException for any unrecognized stream type
 */
private List<String> extractTopicNames(StreamRecord streamRecord) {
    StreamDatasetDatasetProperties datasetProperties = streamRecord.getData().getDatasetProperties();
    switch (datasetProperties.getStreamType()) {
        case SOURCE:
            return datasetProperties.getStreamDefinition().getSourceBindings();
        case SINK:
            return datasetProperties.getStreamDefinition().getSinkBindings();
        case PROCESSOR:
            return Collections.emptyList();
        default:
            throw new IllegalStateException("Unexpected value: " + datasetProperties.getStreamType());
    }
}
// NOTE(review): diff residue — the removed deleteTopic(StreamRecord) and createTopics(String[])
// bodies are interleaved with the added createTopic(String) and findTopic(String) helpers; braces
// do not pair up and it will not compile as-is.
@Override
public void deleteTopic(StreamRecord streamRecord) {
adminClient.deleteTopics(Collections.singletonList(streamRecord.getKind()));
// New helper: idempotent create — return the existing description if the topic already exists.
private TopicDescription createTopic(String topicName) {
Optional<TopicDescription> topicDescription = this.findTopic(topicName);
if (topicDescription.isPresent()) {
return topicDescription.get();
}
// NOTE(review): createTopics is not awaited (no .all().get()) before the follow-up lookup —
// the immediate findTopic may race with topic creation; confirm.
this.adminClient.createTopics(Collections.singletonList(this.createNewTopicObject(topicName)));
return this.findTopic(topicName).get();
}
@Override
public void createTopics(String[] topicList) {
logger.debug("Creating Topic(s): {}", topicList);
List<NewTopic> newTopics = new ArrayList<NewTopic>();
// New helper: describe a topic by name; empty when the broker does not know it.
private Optional<TopicDescription> findTopic(String topicName) {
try {
for (String topicName : topicList) {
NewTopic newTopic = TopicBuilder.name(topicName).build();
newTopics.add(newTopic);
}
CreateTopicsResult topics = adminClient.createTopics(newTopics);
logger.debug("Creating Topic(s): {}", topics.all().get());
} catch (InterruptedException | ExecutionException e) {
// Old behavior: an already-existing topic is only a warning, not an error.
if (e.getCause().getClass().equals(TopicExistsException.class)) {
logger.warn(e.getMessage());
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = this.adminClient.describeTopics(Collections.singletonList(topicName)).values();
if (kafkaFutureMap.containsKey(topicName)) {
return Optional.of(kafkaFutureMap.get(topicName).get());
} else {
logger.debug("Got exception with cause|class: {}|{}", e.getCause(), e.getClass());
return Optional.empty();
}
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
return Optional.empty();
}
throw new StreamAdminException(e);
} catch (InterruptedException e) {
// NOTE(review): interrupt flag not re-set before rethrowing — consider
// Thread.currentThread().interrupt().
throw new StreamAdminException(e);
}
}
/**
 * Deletes the named topic, but only if the broker currently knows it; unknown topics are a no-op.
 *
 * @param topicName name of the topic to delete
 */
private void deleteTopic(String topicName) {
    Optional<TopicDescription> existing = this.findTopic(topicName);
    if (existing.isPresent()) {
        this.adminClient.deleteTopics(Collections.singletonList(topicName));
    }
}
/**
 * Builds a NewTopic request for the given name with broker-default settings.
 *
 * @param topicName name for the new topic
 * @return the NewTopic descriptor
 */
private NewTopic createNewTopicObject(String topicName) {
    // TODO: decide how partitions and replication factors should be configured.
    NewTopic topic = TopicBuilder.name(topicName).build();
    return topic;
}
}
......@@ -3,17 +3,21 @@ package org.opengroup.osdu.streaming.util;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
import org.springframework.beans.factory.annotation.Value;