Commit 7915ff03 authored by Stephen Nimmo's avatar Stephen Nimmo
Browse files

Added implementation and updated definition of the stream record to handle...

Added implementation and updated definition of the stream record to handle extension properties associated with the StreamDeployment such as "ExtensionProperties.StreamDeployment.Image"
parent 92965cde
Pipeline #74418 failed with stage
in 50 seconds
test:
stage: test
image: openjdk:11
image: openjdk:8
script: ./mvnw -ntp test
tags: ['docker-runner']
\ No newline at end of file
......@@ -350,6 +350,20 @@ components:
$ref: "#/components/schemas/StreamDefinition"
ExtensionProperties:
type: object
properties:
StreamDeployment:
type: object
properties:
Image:
type: string
example: "quay.io/org/image:tag"
Env:
type: object
additionalProperties:
type: string
example:
ENV1: "ENV VALUE 1"
ENV2: "ENV VALUE 2"
StreamDefinition:
type: object
required:
......
......@@ -17,7 +17,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<!-- Dependency Versions -->
......
......@@ -4,14 +4,14 @@ import org.opengroup.osdu.streaming.model.StreamRecord;
public interface DeploymentAdminService {
StreamRecord createStream(StreamRecord streamRecord);
StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord);
StreamRecord getStreamInfo(String id);
StreamDeploymentStatus getStreamDeploymentStatus(String id);
void startStream(StreamRecord streamRecord);
StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord);
void stopStream(StreamRecord streamRecord);
StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord);
void deleteStream(StreamRecord streamRecord);
void deleteStreamDeployment(StreamRecord streamRecord);
}
package org.opengroup.osdu.streaming.service;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.*;
import io.kubernetes.client.util.PatchUtils;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
import java.util.Map;
import java.util.Optional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Service
@RequestScope
public class DeploymentAdminServiceImpl implements DeploymentAdminService {
private static final String PATCH_REPLICAS = "[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}]";
private CoreV1Api coreV1Api;
private AppsV1Api appsV1Api;
private String namespace;
......@@ -30,12 +35,13 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
}
@Override
public StreamRecord createStream(StreamRecord streamRecord) {
public StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord) {
String deploymentName = this.getDeploymentName(streamRecord);
if (this.findExistingDeployment(namespace, deploymentName).isPresent()){
if (this.findExistingDeployment(namespace, deploymentName).isPresent()) {
throw new StreamAdminException("Unable to create ");
}
V1Deployment deployment = new V1DeploymentBuilder()
V1DeploymentBuilder v1DeploymentBuilder = new V1DeploymentBuilder();
v1DeploymentBuilder
.withNewMetadata()
.withName(String.format("%s-%s", deploymentName, this.deploymentSuffix))
.withNamespace(this.namespace)
......@@ -51,43 +57,106 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
.endMetadata()
.withNewSpec()
.withContainers(new V1ContainerBuilder()
.withImage("TEST")
.withImage(this.getImage(streamRecord))
.withEnv(this.getEnvVar(streamRecord))
.build())
.endSpec()
.endTemplate()
.endSpec()
.build();
.endSpec();
V1Deployment v1Deployment = v1DeploymentBuilder.build();
try {
V1Deployment namespacedDeployment = appsV1Api.createNamespacedDeployment(namespace, deployment, null, null, null);
V1Deployment namespacedDeployment = appsV1Api.createNamespacedDeployment(namespace, v1Deployment, null, null, null);
return new StreamDeploymentStatus(namespacedDeployment.getMetadata().getName(), namespacedDeployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now());
} catch (ApiException e) {
throw new StreamAdminException(e);
}
return streamRecord;
}
public String getDeploymentName(StreamRecord streamRecord) {
private String getDeploymentName(StreamRecord streamRecord) {
return streamRecord.getKind();
}
private String getImage(StreamRecord streamRecord) {
String image = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage();
if (Objects.isNull(image)) {
throw new StreamAdminException("Unable to deploy: ExtensionProperties.StreamDeployment.Image is null");
}
return image;
}
private List<V1EnvVar> getEnvVar(StreamRecord streamRecord) {
Map<String, String> envMap = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getEnv();
if (envMap.isEmpty()) {
return Collections.emptyList();
}
return envMap.entrySet().stream().map(entry ->
new V1EnvVarBuilder()
.withName(entry.getKey())
.withValue(entry.getValue())
.build()
).collect(Collectors.toList());
}
@Override
public StreamRecord getStreamInfo(String id) {
return null;
public StreamDeploymentStatus getStreamDeploymentStatus(String id) {
Optional<V1Deployment> optional = findExistingDeployment(this.namespace, id);
return optional.isPresent() ? new StreamDeploymentStatus(
optional.get().getMetadata().getName(),
optional.get().getMetadata().getNamespace(),
optional.get().getSpec().getReplicas() > 0 ? StreamDeploymentStatus.Status.RUNNING : StreamDeploymentStatus.Status.STOPPED,
LocalDateTime.now()) : null;
}
@Override
public void startStream(StreamRecord streamRecord) {
//TODO Check if it's deployed
//TODO
public StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord) {
if (this.findExistingDeployment(this.namespace, this.getDeploymentName(streamRecord)).isEmpty()) {
throw new StreamAdminException("Unable to start StreamDeployment - Does not exist. ");
}
V1Deployment v1Deployment = this.patchReplicas(this.getDeploymentName(streamRecord), 1);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.RUNNING, LocalDateTime.now());
}
@Override
public void stopStream(StreamRecord streamRecord) {
public StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord) {
if (this.findExistingDeployment(this.namespace, this.getDeploymentName(streamRecord)).isEmpty()) {
throw new StreamAdminException("Unable to stop StreamDeployment - Does not exist. ");
}
V1Deployment v1Deployment = this.patchReplicas(this.getDeploymentName(streamRecord), 0);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now());
}
private V1Deployment patchReplicas(String deploymentName, int replicas) {
String jsonPatchStr = String.format(PATCH_REPLICAS, 1);
try {
V1Deployment v1Deployment =
PatchUtils.patch(
V1Deployment.class,
() ->
appsV1Api.patchNamespacedDeploymentCall(
deploymentName,
this.namespace,
new V1Patch(jsonPatchStr),
null,
null,
null, // field-manager is optional
null,
null),
V1Patch.PATCH_FORMAT_JSON_PATCH,
appsV1Api.getApiClient());
return v1Deployment;
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
@Override
public void deleteStream(StreamRecord streamRecord) {
public void deleteStreamDeployment(StreamRecord streamRecord) {
try {
appsV1Api.deleteNamespacedDeployment(this.getDeploymentName(streamRecord), this.namespace, null, null, null, null, null, null);
} catch (ApiException e) {
throw new StreamAdminException(e);
}
}
private Optional<V1Deployment> findExistingDeployment(String namespace, String name) {
......
package org.opengroup.osdu.streaming.service;
import java.time.LocalDateTime;
import java.util.Objects;
public class StreamDeploymentStatus {
public enum Status {
RUNNING, STOPPED
}
private String deploymentName;
private String namespace;
private Status status;
private LocalDateTime dateTime;
public StreamDeploymentStatus(String deploymentName, String namespace, Status status, LocalDateTime dateTime) {
this.deploymentName = deploymentName;
this.namespace = namespace;
this.status = status;
this.dateTime = dateTime;
}
public String getDeploymentName() {
return deploymentName;
}
public String getNamespace() {
return namespace;
}
public Status getStatus() {
return status;
}
public LocalDateTime getDateTime() {
return dateTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamDeploymentStatus that = (StreamDeploymentStatus) o;
return Objects.equals(deploymentName, that.deploymentName) && Objects.equals(namespace, that.namespace) && status == that.status && Objects.equals(dateTime, that.dateTime);
}
@Override
public int hashCode() {
return Objects.hash(deploymentName, namespace, status, dateTime);
}
@Override
public String toString() {
return "StreamDeploymentStatus{" +
"deploymentName='" + deploymentName + '\'' +
", namespace='" + namespace + '\'' +
", status=" + status +
", dateTime=" + dateTime +
'}';
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment