Commit c87a67f1 authored by Dmitry Kniazev's avatar Dmitry Kniazev
Browse files

Merge branch '11-implement-deploymentadminservice' into 'develop'

DeploymentAdminService implementation completed based on existing requirements.

See merge request !11
parents 69587dbd 93aea7a9
Pipeline #74822 failed with stage
in 1 minute and 15 seconds
services:
- docker:19.03.12-dind
variables:
DOCKER_HOST: "tcp://docker:2375"
DOCKER_DRIVER: overlay2
DOCKER_TLS_CERTDIR: ""
test: test:
stage: test stage: test
image: openjdk:11 image: openjdk:8
script: ./mvnw -ntp test script: ./mvnw -ntp test
\ No newline at end of file tags: ['docker-runner']
\ No newline at end of file
...@@ -350,6 +350,20 @@ components: ...@@ -350,6 +350,20 @@ components:
$ref: "#/components/schemas/StreamDefinition" $ref: "#/components/schemas/StreamDefinition"
ExtensionProperties: ExtensionProperties:
type: object type: object
properties:
StreamDeployment:
type: object
properties:
Image:
type: string
example: "quay.io/org/image:tag"
Env:
type: object
additionalProperties:
type: string
example:
ENV1: "ENV VALUE 1"
ENV2: "ENV VALUE 2"
StreamDefinition: StreamDefinition:
type: object type: object
required: required:
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version> <java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target> <maven.compiler.target>${java.version}</maven.compiler.target>
<!-- Dependency Versions --> <!-- Dependency Versions -->
...@@ -45,6 +45,11 @@ ...@@ -45,6 +45,11 @@
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>12.0.1</version>
</dependency>
<!-- Test Dependencies --> <!-- Test Dependencies -->
<dependency> <dependency>
......
package org.opengroup.osdu.streaming.service;
import org.opengroup.osdu.streaming.model.StreamRecord;
import java.util.Optional;
public interface DeploymentAdminService {
Optional<StreamDeploymentStatus> findStreamDeploymentStatus(StreamRecord streamRecord);
StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord);
StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord);
StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord);
void deleteStreamDeployment(StreamRecord streamRecord);
void deleteAllStreamDeployments();
}
package org.opengroup.osdu.streaming.service;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.models.*;
import io.kubernetes.client.util.PatchUtils;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Service
@RequestScope
public class DeploymentAdminServiceImpl implements DeploymentAdminService {
private static final Logger LOGGER = LoggerFactory.getLogger(DeploymentAdminServiceImpl.class);
private static final String PATCH_REPLICAS = "[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}]";
private static final String DEPLOYMENT_NAME_SUFFIX = "-deployment";
private static final String CONTAINER_NAME_SUFFIX = "-container";
private static final String SELECTOR_MATCH_LABEL_KEY = "run";
private static final String LABEL_TYPE_NAME = "osdu-streams-type";
private static final String LABEL_TYPE_VALUE = "osdu-streams-deployment";
private static final String LABEL_SELECTOR = String.format("%s=%s", LABEL_TYPE_NAME, LABEL_TYPE_VALUE);
private AppsV1Api appsV1Api;
private String namespace;
public DeploymentAdminServiceImpl(@Value("${deployment.namespace}") String namespace, AppsV1Api appsV1Api) {
this.namespace = namespace;
this.appsV1Api = appsV1Api;
}
@Override
public Optional<StreamDeploymentStatus> findStreamDeploymentStatus(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
Optional<V1Deployment> optional = findExistingDeployment(deploymentName);
return optional.isPresent() ? Optional.of(new StreamDeploymentStatus(
optional.get().getMetadata().getName(),
optional.get().getMetadata().getNamespace(),
optional.get().getSpec().getReplicas() > 0 ? StreamDeploymentStatus.Status.RUNNING : StreamDeploymentStatus.Status.STOPPED,
LocalDateTime.now())) : Optional.empty();
}
@Override
public StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String normalizedName = this.getNormalizedName(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
String containerName = this.getContainerName(streamRecord);
if (this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to create StreamDeployment. A deployment already exists for %s.", deploymentName));
}
V1DeploymentBuilder v1DeploymentBuilder = new V1DeploymentBuilder();
v1DeploymentBuilder
.withNewMetadata()
.withName(deploymentName)
.withNamespace(this.namespace)
.withLabels(Collections.singletonMap(LABEL_TYPE_NAME, LABEL_TYPE_VALUE))
.endMetadata()
.withNewSpec()
.withReplicas(0)
.withNewSelector()
.withMatchLabels(Collections.singletonMap(SELECTOR_MATCH_LABEL_KEY, normalizedName))
.endSelector()
.withNewTemplate()
.withNewMetadata()
.withLabels(Collections.singletonMap(SELECTOR_MATCH_LABEL_KEY, normalizedName))
.endMetadata()
.withNewSpec()
.withContainers(new V1ContainerBuilder()
.withName(containerName)
.withImage(this.getImage(streamRecord))
.withEnv(this.getEnvVar(streamRecord))
.build())
.endSpec()
.endTemplate()
.endSpec();
V1Deployment v1Deployment = v1DeploymentBuilder.build();
try {
V1Deployment namespacedDeployment = appsV1Api.createNamespacedDeployment(namespace, v1Deployment, null, null, null);
return new StreamDeploymentStatus(namespacedDeployment.getMetadata().getName(), namespacedDeployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now());
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
@Override
public StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
if (!this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to find StreamDeployment for %s", deploymentName));
}
V1Deployment v1Deployment = this.patchReplicas(deploymentName, 1);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.RUNNING, LocalDateTime.now());
}
@Override
public StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
if (!this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to find StreamDeployment for %s", deploymentName));
}
V1Deployment v1Deployment = this.patchReplicas(deploymentName, 0);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now());
}
private V1Deployment patchReplicas(String deploymentName, int replicas) {
String patchString = String.format(PATCH_REPLICAS, replicas);
try {
return PatchUtils.patch(
V1Deployment.class,
() ->
appsV1Api.patchNamespacedDeploymentCall(
deploymentName,
this.namespace,
new V1Patch(patchString),
null,
null,
null, // field-manager is optional
null,
null),
V1Patch.PATCH_FORMAT_JSON_PATCH,
appsV1Api.getApiClient());
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
@Override
public void deleteStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
try {
String deploymentName = this.getDeploymentName(streamRecord);
appsV1Api.deleteNamespacedDeployment(deploymentName, this.namespace, null, null, null, null, null, null);
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
public void deleteAllStreamDeployments() {
try {
appsV1Api.deleteCollectionNamespacedDeployment(this.namespace, null, null, null, null, null, LABEL_SELECTOR, null, null, null, null, null, null, null);
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
private Optional<V1Deployment> findExistingDeployment(String deploymentName) {
try {
V1DeploymentList v1DeploymentList = appsV1Api.listNamespacedDeployment(this.namespace, null, null, null, null, LABEL_SELECTOR, null, null, null, null, null);
return v1DeploymentList.getItems().stream().filter(v1Deployment -> v1Deployment.getMetadata().getName().equals(deploymentName)).findFirst();
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
private void validateStreamRecord(@NotNull StreamRecord streamRecord) {
if (streamRecord.getId() == null) {
throw new StreamAdminException("StreamRecord.id is null");
}
if (streamRecord.getKind() == null) {
throw new StreamAdminException("StreamRecord.kind is null");
}
if (streamRecord.getData() == null) {
throw new StreamAdminException("StreamRecord.data is null");
}
if (streamRecord.getData().getDatasetProperties() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties.streamDeployment is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties.streamDeployment.image is null");
}
}
private String getNormalizedName(StreamRecord streamRecord) {
return streamRecord.getId().substring(streamRecord.getId().lastIndexOf(':') + 1);
}
private String getDeploymentName(StreamRecord streamRecord) {
return String.format("%s%s", this.getNormalizedName(streamRecord), DEPLOYMENT_NAME_SUFFIX);
}
private String getContainerName(StreamRecord streamRecord) {
return String.format("%s%s", this.getNormalizedName(streamRecord), CONTAINER_NAME_SUFFIX);
}
private String getImage(StreamRecord streamRecord) {
String image = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage();
if (Objects.isNull(image)) {
throw new StreamAdminException("Unable to deploy: ExtensionProperties.StreamDeployment.Image is null");
}
return image;
}
private List<V1EnvVar> getEnvVar(StreamRecord streamRecord) {
Map<String, String> envMap = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getEnv();
if (Objects.isNull(envMap) || envMap.isEmpty()) {
return Collections.emptyList();
}
return envMap.entrySet().stream().map(entry ->
new V1EnvVarBuilder()
.withName(entry.getKey())
.withValue(entry.getValue())
.build()
).collect(Collectors.toList());
}
}
package org.opengroup.osdu.streaming.service;
import java.time.LocalDateTime;
import java.util.Objects;
public class StreamDeploymentStatus {
public enum Status {
RUNNING, STOPPED
}
private String deploymentName;
private String namespace;
//TODO This isn't really the status, but rather a reflection of the replicas count.
private Status status;
private LocalDateTime dateTime;
public StreamDeploymentStatus(String deploymentName, String namespace, Status status, LocalDateTime dateTime) {
this.deploymentName = deploymentName;
this.namespace = namespace;
this.status = status;
this.dateTime = dateTime;
}
public String getDeploymentName() {
return deploymentName;
}
public String getNamespace() {
return namespace;
}
public Status getStatus() {
return status;
}
public LocalDateTime getDateTime() {
return dateTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamDeploymentStatus that = (StreamDeploymentStatus) o;
return Objects.equals(deploymentName, that.deploymentName) && Objects.equals(namespace, that.namespace) && status == that.status && Objects.equals(dateTime, that.dateTime);
}
@Override
public int hashCode() {
return Objects.hash(deploymentName, namespace, status, dateTime);
}
@Override
public String toString() {
return "StreamDeploymentStatus{" +
"deploymentName='" + deploymentName + '\'' +
", namespace='" + namespace + '\'' +
", status=" + status +
", dateTime=" + dateTime +
'}';
}
}
...@@ -25,7 +25,6 @@ import org.opengroup.osdu.core.common.storage.IStorageService; ...@@ -25,7 +25,6 @@ import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope; import org.springframework.web.context.annotation.RequestScope;
...@@ -41,17 +40,20 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { ...@@ -41,17 +40,20 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);
@Autowired
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
@Autowired
private DpsHeaders headers; private DpsHeaders headers;
@Autowired
private IStorageFactory storageFactory; private IStorageFactory storageFactory;
@Autowired
private TopicAdminService topicAdminService; private TopicAdminService topicAdminService;
private DeploymentAdminService deploymentAdminService;
public StreamingAdminServiceImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
this.objectMapper = objectMapper;
this.headers = headers;
this.storageFactory = storageFactory;
this.topicAdminService = topicAdminService;
this.deploymentAdminService = deploymentAdminService;
}
@Override @Override
public StreamRecord getStream(String streamRecordId) { public StreamRecord getStream(String streamRecordId) {
...@@ -108,6 +110,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { ...@@ -108,6 +110,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
// create topic // create topic
this.topicAdminService.createTopic(streamRecord); this.topicAdminService.createTopic(streamRecord);
this.deploymentAdminService.createStreamDeployment(streamRecord);
} catch (StorageException e) { } catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
......
...@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory; ...@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
...@@ -31,7 +31,7 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -31,7 +31,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
@Override @Override
public Optional<TopicDescription> getTopic(StreamRecord streamRecord) { public Optional<TopicDescription> getTopic(StreamRecord streamRecord) {
try { try {
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(List.of(streamRecord.getKind())).values(); Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(Collections.singletonList(streamRecord.getKind())).values();
return kafkaFutureMap.containsKey(streamRecord.getKind()) ? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get()) : Optional.empty(); return kafkaFutureMap.containsKey(streamRecord.getKind()) ? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get()) : Optional.empty();
} catch (ExecutionException e) { } catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) { if (e.getCause() instanceof UnknownTopicOrPartitionException) {
...@@ -56,7 +56,7 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -56,7 +56,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
//.partitions(10) //.partitions(10)
//.replicas(3) //.replicas(3)
.build(); .build();
adminClient.createTopics(List.of(newTopic)).all().get(); adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
return getTopic(streamRecord).get(); return getTopic(streamRecord).get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
throw new StreamAdminException(e); throw new StreamAdminException(e);
...@@ -65,7 +65,7 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -65,7 +65,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
@Override @Override
public void deleteTopic(StreamRecord streamRecord) { public void deleteTopic(StreamRecord streamRecord) {
adminClient.deleteTopics(List.of(streamRecord.getKind())); adminClient.deleteTopics(Collections.singletonList(streamRecord.getKind()));
} }
} }
...@@ -10,7 +10,6 @@ import org.springframework.kafka.core.KafkaAdmin; ...@@ -10,7 +10,6 @@ import org.springframework.kafka.core.KafkaAdmin;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
@Configuration @Configuration
public class KafkaAdminConfiguration { public class KafkaAdminConfiguration {
...@@ -27,10 +26,11 @@ public class KafkaAdminConfiguration { ...@@ -27,10 +26,11 @@ public class KafkaAdminConfiguration {
} }
@Bean @Bean
public AdminClient adminClient() throws ExecutionException, InterruptedException { public AdminClient adminClient() {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configurationMap = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configurationMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
AdminClient adminClient = KafkaAdminClient.create(configs); configurationMap.put(AdminClientConfig.CLIENT_ID_CONFIG, "stream_admin_service");
AdminClient adminClient = KafkaAdminClient.create(configurationMap);
return adminClient; return adminClient;
} }
......
package org.opengroup.osdu.streaming.util;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.util.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
public class KubernetesAdminConfiguration {
public KubernetesAdminConfiguration() throws IOException {
ApiClient client = Config.defaultClient();
io.kubernetes.client.openapi.Configuration.setDefaultApiClient(client);
client.setDebugging(true);
}
@Bean
public CoreV1Api coreV1Api() {
return new CoreV1Api();
}