Commit 93aea7a9 authored by Stephen Nimmo's avatar Stephen Nimmo Committed by Dmitry Kniazev
Browse files

DeploymentAdminService implementation completed based on existing requirements.

parent 69587dbd
services:
- docker:19.03.12-dind
variables:
DOCKER_HOST: "tcp://docker:2375"
DOCKER_DRIVER: overlay2
DOCKER_TLS_CERTDIR: ""
test:
stage: test
image: openjdk:11
script: ./mvnw -ntp test
\ No newline at end of file
image: openjdk:8
script: ./mvnw -ntp test
tags: ['docker-runner']
\ No newline at end of file
......@@ -350,6 +350,20 @@ components:
$ref: "#/components/schemas/StreamDefinition"
ExtensionProperties:
type: object
properties:
StreamDeployment:
type: object
properties:
Image:
type: string
example: "quay.io/org/image:tag"
Env:
type: object
additionalProperties:
type: string
example:
ENV1: "ENV VALUE 1"
ENV2: "ENV VALUE 2"
StreamDefinition:
type: object
required:
......
......@@ -17,7 +17,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<!-- Dependency Versions -->
......@@ -45,6 +45,11 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>12.0.1</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
......
package org.opengroup.osdu.streaming.service;
import org.opengroup.osdu.streaming.model.StreamRecord;
import java.util.Optional;
public interface DeploymentAdminService {
Optional<StreamDeploymentStatus> findStreamDeploymentStatus(StreamRecord streamRecord);
StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord);
StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord);
StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord);
void deleteStreamDeployment(StreamRecord streamRecord);
void deleteAllStreamDeployments();
}
package org.opengroup.osdu.streaming.service;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.models.*;
import io.kubernetes.client.util.PatchUtils;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Service
@RequestScope
public class DeploymentAdminServiceImpl implements DeploymentAdminService {
private static final Logger LOGGER = LoggerFactory.getLogger(DeploymentAdminServiceImpl.class);
private static final String PATCH_REPLICAS = "[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}]";
private static final String DEPLOYMENT_NAME_SUFFIX = "-deployment";
private static final String CONTAINER_NAME_SUFFIX = "-container";
private static final String SELECTOR_MATCH_LABEL_KEY = "run";
private static final String LABEL_TYPE_NAME = "osdu-streams-type";
private static final String LABEL_TYPE_VALUE = "osdu-streams-deployment";
private static final String LABEL_SELECTOR = String.format("%s=%s", LABEL_TYPE_NAME, LABEL_TYPE_VALUE);
private AppsV1Api appsV1Api;
private String namespace;
public DeploymentAdminServiceImpl(@Value("${deployment.namespace}") String namespace, AppsV1Api appsV1Api) {
this.namespace = namespace;
this.appsV1Api = appsV1Api;
}
@Override
public Optional<StreamDeploymentStatus> findStreamDeploymentStatus(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
Optional<V1Deployment> optional = findExistingDeployment(deploymentName);
return optional.isPresent() ? Optional.of(new StreamDeploymentStatus(
optional.get().getMetadata().getName(),
optional.get().getMetadata().getNamespace(),
optional.get().getSpec().getReplicas() > 0 ? StreamDeploymentStatus.Status.RUNNING : StreamDeploymentStatus.Status.STOPPED,
LocalDateTime.now())) : Optional.empty();
}
@Override
public StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String normalizedName = this.getNormalizedName(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
String containerName = this.getContainerName(streamRecord);
if (this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to create StreamDeployment. A deployment already exists for %s.", deploymentName));
}
V1DeploymentBuilder v1DeploymentBuilder = new V1DeploymentBuilder();
v1DeploymentBuilder
.withNewMetadata()
.withName(deploymentName)
.withNamespace(this.namespace)
.withLabels(Collections.singletonMap(LABEL_TYPE_NAME, LABEL_TYPE_VALUE))
.endMetadata()
.withNewSpec()
.withReplicas(0)
.withNewSelector()
.withMatchLabels(Collections.singletonMap(SELECTOR_MATCH_LABEL_KEY, normalizedName))
.endSelector()
.withNewTemplate()
.withNewMetadata()
.withLabels(Collections.singletonMap(SELECTOR_MATCH_LABEL_KEY, normalizedName))
.endMetadata()
.withNewSpec()
.withContainers(new V1ContainerBuilder()
.withName(containerName)
.withImage(this.getImage(streamRecord))
.withEnv(this.getEnvVar(streamRecord))
.build())
.endSpec()
.endTemplate()
.endSpec();
V1Deployment v1Deployment = v1DeploymentBuilder.build();
try {
V1Deployment namespacedDeployment = appsV1Api.createNamespacedDeployment(namespace, v1Deployment, null, null, null);
return new StreamDeploymentStatus(namespacedDeployment.getMetadata().getName(), namespacedDeployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now());
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
@Override
public StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
if (!this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to find StreamDeployment for %s", deploymentName));
}
V1Deployment v1Deployment = this.patchReplicas(deploymentName, 1);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.RUNNING, LocalDateTime.now());
}
@Override
public StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
if (!this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to find StreamDeployment for %s", deploymentName));
}
V1Deployment v1Deployment = this.patchReplicas(deploymentName, 0);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now());
}
private V1Deployment patchReplicas(String deploymentName, int replicas) {
String patchString = String.format(PATCH_REPLICAS, replicas);
try {
return PatchUtils.patch(
V1Deployment.class,
() ->
appsV1Api.patchNamespacedDeploymentCall(
deploymentName,
this.namespace,
new V1Patch(patchString),
null,
null,
null, // field-manager is optional
null,
null),
V1Patch.PATCH_FORMAT_JSON_PATCH,
appsV1Api.getApiClient());
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
@Override
public void deleteStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
try {
String deploymentName = this.getDeploymentName(streamRecord);
appsV1Api.deleteNamespacedDeployment(deploymentName, this.namespace, null, null, null, null, null, null);
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
public void deleteAllStreamDeployments() {
try {
appsV1Api.deleteCollectionNamespacedDeployment(this.namespace, null, null, null, null, null, LABEL_SELECTOR, null, null, null, null, null, null, null);
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
private Optional<V1Deployment> findExistingDeployment(String deploymentName) {
try {
V1DeploymentList v1DeploymentList = appsV1Api.listNamespacedDeployment(this.namespace, null, null, null, null, LABEL_SELECTOR, null, null, null, null, null);
return v1DeploymentList.getItems().stream().filter(v1Deployment -> v1Deployment.getMetadata().getName().equals(deploymentName)).findFirst();
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
private void validateStreamRecord(@NotNull StreamRecord streamRecord) {
if (streamRecord.getId() == null) {
throw new StreamAdminException("StreamRecord.id is null");
}
if (streamRecord.getKind() == null) {
throw new StreamAdminException("StreamRecord.kind is null");
}
if (streamRecord.getData() == null) {
throw new StreamAdminException("StreamRecord.data is null");
}
if (streamRecord.getData().getDatasetProperties() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties.streamDeployment is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties.streamDeployment.image is null");
}
}
private String getNormalizedName(StreamRecord streamRecord) {
return streamRecord.getId().substring(streamRecord.getId().lastIndexOf(':') + 1);
}
private String getDeploymentName(StreamRecord streamRecord) {
return String.format("%s%s", this.getNormalizedName(streamRecord), DEPLOYMENT_NAME_SUFFIX);
}
private String getContainerName(StreamRecord streamRecord) {
return String.format("%s%s", this.getNormalizedName(streamRecord), CONTAINER_NAME_SUFFIX);
}
private String getImage(StreamRecord streamRecord) {
String image = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage();
if (Objects.isNull(image)) {
throw new StreamAdminException("Unable to deploy: ExtensionProperties.StreamDeployment.Image is null");
}
return image;
}
private List<V1EnvVar> getEnvVar(StreamRecord streamRecord) {
Map<String, String> envMap = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getEnv();
if (Objects.isNull(envMap) || envMap.isEmpty()) {
return Collections.emptyList();
}
return envMap.entrySet().stream().map(entry ->
new V1EnvVarBuilder()
.withName(entry.getKey())
.withValue(entry.getValue())
.build()
).collect(Collectors.toList());
}
}
package org.opengroup.osdu.streaming.service;
import java.time.LocalDateTime;
import java.util.Objects;
public class StreamDeploymentStatus {
public enum Status {
RUNNING, STOPPED
}
private String deploymentName;
private String namespace;
//TODO This isn't really the status, but rather a reflection of the replicas count.
private Status status;
private LocalDateTime dateTime;
public StreamDeploymentStatus(String deploymentName, String namespace, Status status, LocalDateTime dateTime) {
this.deploymentName = deploymentName;
this.namespace = namespace;
this.status = status;
this.dateTime = dateTime;
}
public String getDeploymentName() {
return deploymentName;
}
public String getNamespace() {
return namespace;
}
public Status getStatus() {
return status;
}
public LocalDateTime getDateTime() {
return dateTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamDeploymentStatus that = (StreamDeploymentStatus) o;
return Objects.equals(deploymentName, that.deploymentName) && Objects.equals(namespace, that.namespace) && status == that.status && Objects.equals(dateTime, that.dateTime);
}
@Override
public int hashCode() {
return Objects.hash(deploymentName, namespace, status, dateTime);
}
@Override
public String toString() {
return "StreamDeploymentStatus{" +
"deploymentName='" + deploymentName + '\'' +
", namespace='" + namespace + '\'' +
", status=" + status +
", dateTime=" + dateTime +
'}';
}
}
......@@ -25,7 +25,6 @@ import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
......@@ -41,17 +40,20 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private DpsHeaders headers;
@Autowired
private IStorageFactory storageFactory;
@Autowired
private TopicAdminService topicAdminService;
private DeploymentAdminService deploymentAdminService;
public StreamingAdminServiceImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
this.objectMapper = objectMapper;
this.headers = headers;
this.storageFactory = storageFactory;
this.topicAdminService = topicAdminService;
this.deploymentAdminService = deploymentAdminService;
}
@Override
public StreamRecord getStream(String streamRecordId) {
......@@ -108,6 +110,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
// create topic
this.topicAdminService.createTopic(streamRecord);
this.deploymentAdminService.createStreamDeployment(streamRecord);
} catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) {
......
......@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
......@@ -31,7 +31,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
@Override
public Optional<TopicDescription> getTopic(StreamRecord streamRecord) {
try {
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(List.of(streamRecord.getKind())).values();
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(Collections.singletonList(streamRecord.getKind())).values();
return kafkaFutureMap.containsKey(streamRecord.getKind()) ? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get()) : Optional.empty();
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
......@@ -56,7 +56,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
//.partitions(10)
//.replicas(3)
.build();
adminClient.createTopics(List.of(newTopic)).all().get();
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
return getTopic(streamRecord).get();
} catch (InterruptedException | ExecutionException e) {
throw new StreamAdminException(e);
......@@ -65,7 +65,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
@Override
public void deleteTopic(StreamRecord streamRecord) {
adminClient.deleteTopics(List.of(streamRecord.getKind()));
adminClient.deleteTopics(Collections.singletonList(streamRecord.getKind()));
}
}
......@@ -10,7 +10,6 @@ import org.springframework.kafka.core.KafkaAdmin;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@Configuration
public class KafkaAdminConfiguration {
......@@ -27,10 +26,11 @@ public class KafkaAdminConfiguration {
}
@Bean
public AdminClient adminClient() throws ExecutionException, InterruptedException {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
AdminClient adminClient = KafkaAdminClient.create(configs);
public AdminClient adminClient() {
Map<String, Object> configurationMap = new HashMap<>();
configurationMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configurationMap.put(AdminClientConfig.CLIENT_ID_CONFIG, "stream_admin_service");
AdminClient adminClient = KafkaAdminClient.create(configurationMap);
return adminClient;
}
......
package org.opengroup.osdu.streaming.util;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.util.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
public class KubernetesAdminConfiguration {
public KubernetesAdminConfiguration() throws IOException {
ApiClient client = Config.defaultClient();
io.kubernetes.client.openapi.Configuration.setDefaultApiClient(client);
client.setDebugging(true);
}
@Bean
public CoreV1Api coreV1Api() {
return new CoreV1Api();
}
@Bean
public AppsV1Api appsV1Api() {
return new AppsV1Api();
}
}
......@@ -7,4 +7,7 @@ spring.main.allow-bean-definition-overriding=true
logging.level.root=INFO
osdu.storage.api=https://blah:1234/api/storage/v2
\ No newline at end of file
osdu.storage.api=https://blah:1234/api/storage/v2
deployment.namespace=osdu-streams
\ No newline at end of file
https://keda.sh/docs/1.4/concepts/scaling-deployments/#leverage-the-container-lifecycle
# The KEDA scaled object - REF: https://keda.sh/docs/1.4/scalers/apache-kafka/
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: kafka-scaledobject
namespace: default
spec:
scaleTargetRef:
deploymentName: azure-functions-deployment
pollingInterval: 30
triggers:
- type: kafka
metadata:
bootstrapServers: localhost:9092
consumerGroup: my-group # Make sure that this consumer group name is the same one as the one that is consuming topics
topic: test-topic
# Optional
lagThreshold: "50"
\ No newline at end of file
apiVersion: apps/v1
kind: Deployment
metadata:
name: streams-${deploymentName}-deployment
namespace: osdu-streams
spec:
replicas: 1
selector:
matchLabels:
run: ${deploymentName}
template:
metadata:
labels:
run: ${deploymentName}
spec:
containers:
- image: ${image}
ports:
- containerPort: ${containerPort}
env:
- name: ENV1
value: "env1_value"