Commit 5643c9c0 authored by Stephen Nimmo's avatar Stephen Nimmo
Browse files

Tests working locally against minikube

parent 7915ff03
Pipeline #74482 failed with stage
in 2 minutes and 54 seconds
...@@ -48,7 +48,7 @@ ...@@ -48,7 +48,7 @@
<dependency> <dependency>
<groupId>io.kubernetes</groupId> <groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId> <artifactId>client-java</artifactId>
<version>10.0.0</version> <version>12.0.1</version>
</dependency> </dependency>
<!-- Test Dependencies --> <!-- Test Dependencies -->
...@@ -105,14 +105,6 @@ ...@@ -105,14 +105,6 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
......
...@@ -2,11 +2,13 @@ package org.opengroup.osdu.streaming.service; ...@@ -2,11 +2,13 @@ package org.opengroup.osdu.streaming.service;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
import java.util.Optional;
public interface DeploymentAdminService { public interface DeploymentAdminService {
StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord); Optional<StreamDeploymentStatus> findStreamDeploymentStatus(StreamRecord streamRecord);
StreamDeploymentStatus getStreamDeploymentStatus(String id); StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord);
StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord); StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord);
......
...@@ -3,7 +3,6 @@ package org.opengroup.osdu.streaming.service; ...@@ -3,7 +3,6 @@ package org.opengroup.osdu.streaming.service;
import io.kubernetes.client.custom.V1Patch; import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.AppsV1Api; import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.*; import io.kubernetes.client.openapi.models.*;
import io.kubernetes.client.util.PatchUtils; import io.kubernetes.client.util.PatchUtils;
import org.opengroup.osdu.streaming.exception.StreamAdminException; import org.opengroup.osdu.streaming.exception.StreamAdminException;
...@@ -12,6 +11,7 @@ import org.springframework.beans.factory.annotation.Value; ...@@ -12,6 +11,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope; import org.springframework.web.context.annotation.RequestScope;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -22,41 +22,56 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService { ...@@ -22,41 +22,56 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
private static final String PATCH_REPLICAS = "[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}]"; private static final String PATCH_REPLICAS = "[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}]";
private CoreV1Api coreV1Api;
private AppsV1Api appsV1Api; private AppsV1Api appsV1Api;
private String namespace; private String namespace;
private String deploymentSuffix = "deployment"; private static final String DEPLOYMENT_NAME_SUFFIX = "-deployment";
private String selectorMatchLabelKey = "run"; private static final String CONTAINER_NAME_SUFFIX = "-container";
private static final String SELECTOR_MATCH_LABEL_KEY = "run";
public DeploymentAdminServiceImpl(CoreV1Api coreV1Api, AppsV1Api appsV1Api, @Value("${deployment.namespace}") String namespace) { public DeploymentAdminServiceImpl(@Value("${deployment.namespace}") String namespace, AppsV1Api appsV1Api) {
this.coreV1Api = coreV1Api;
this.appsV1Api = appsV1Api;
this.namespace = namespace; this.namespace = namespace;
this.appsV1Api = appsV1Api;
}
@Override
public Optional<StreamDeploymentStatus> findStreamDeploymentStatus(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord);
Optional<V1Deployment> optional = findExistingDeployment(deploymentName);
return optional.isPresent() ? Optional.of(new StreamDeploymentStatus(
optional.get().getMetadata().getName(),
optional.get().getMetadata().getNamespace(),
optional.get().getSpec().getReplicas() > 0 ? StreamDeploymentStatus.Status.RUNNING : StreamDeploymentStatus.Status.STOPPED,
LocalDateTime.now())) : Optional.empty();
} }
@Override @Override
public StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord) { public StreamDeploymentStatus createStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
String normalizedName = this.getNormalizedName(streamRecord);
String deploymentName = this.getDeploymentName(streamRecord); String deploymentName = this.getDeploymentName(streamRecord);
if (this.findExistingDeployment(namespace, deploymentName).isPresent()) { String containerName = this.getContainerName(streamRecord);
throw new StreamAdminException("Unable to create "); if (this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to create StreamDeployment. A deployment already exists for %s.", deploymentName));
} }
V1DeploymentBuilder v1DeploymentBuilder = new V1DeploymentBuilder(); V1DeploymentBuilder v1DeploymentBuilder = new V1DeploymentBuilder();
v1DeploymentBuilder v1DeploymentBuilder
.withNewMetadata() .withNewMetadata()
.withName(String.format("%s-%s", deploymentName, this.deploymentSuffix)) .withName(deploymentName)
.withNamespace(this.namespace) .withNamespace(this.namespace)
.endMetadata() .endMetadata()
.withNewSpec() .withNewSpec()
.withReplicas(0) .withReplicas(0)
.withNewSelector() .withNewSelector()
.withMatchLabels(Map.of(this.selectorMatchLabelKey, deploymentName)) .withMatchLabels(Collections.singletonMap(SELECTOR_MATCH_LABEL_KEY, normalizedName))
.endSelector() .endSelector()
.withNewTemplate() .withNewTemplate()
.withNewMetadata() .withNewMetadata()
.withLabels(Map.of(this.selectorMatchLabelKey, deploymentName)) .withLabels(Collections.singletonMap(SELECTOR_MATCH_LABEL_KEY, normalizedName))
.endMetadata() .endMetadata()
.withNewSpec() .withNewSpec()
.withContainers(new V1ContainerBuilder() .withContainers(new V1ContainerBuilder()
.withName(containerName)
.withImage(this.getImage(streamRecord)) .withImage(this.getImage(streamRecord))
.withEnv(this.getEnvVar(streamRecord)) .withEnv(this.getEnvVar(streamRecord))
.build()) .build())
...@@ -72,79 +87,45 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService { ...@@ -72,79 +87,45 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
} }
} }
private String getDeploymentName(StreamRecord streamRecord) {
return streamRecord.getKind();
}
private String getImage(StreamRecord streamRecord) {
String image = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage();
if (Objects.isNull(image)) {
throw new StreamAdminException("Unable to deploy: ExtensionProperties.StreamDeployment.Image is null");
}
return image;
}
private List<V1EnvVar> getEnvVar(StreamRecord streamRecord) {
Map<String, String> envMap = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getEnv();
if (envMap.isEmpty()) {
return Collections.emptyList();
}
return envMap.entrySet().stream().map(entry ->
new V1EnvVarBuilder()
.withName(entry.getKey())
.withValue(entry.getValue())
.build()
).collect(Collectors.toList());
}
@Override
public StreamDeploymentStatus getStreamDeploymentStatus(String id) {
Optional<V1Deployment> optional = findExistingDeployment(this.namespace, id);
return optional.isPresent() ? new StreamDeploymentStatus(
optional.get().getMetadata().getName(),
optional.get().getMetadata().getNamespace(),
optional.get().getSpec().getReplicas() > 0 ? StreamDeploymentStatus.Status.RUNNING : StreamDeploymentStatus.Status.STOPPED,
LocalDateTime.now()) : null;
}
@Override @Override
public StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord) { public StreamDeploymentStatus startStreamDeployment(StreamRecord streamRecord) {
if (this.findExistingDeployment(this.namespace, this.getDeploymentName(streamRecord)).isEmpty()) { this.validateStreamRecord(streamRecord);
throw new StreamAdminException("Unable to start StreamDeployment - Does not exist. "); String deploymentName = this.getDeploymentName(streamRecord);
if (!this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to find StreamDeployment for %s", deploymentName));
} }
V1Deployment v1Deployment = this.patchReplicas(this.getDeploymentName(streamRecord), 1); V1Deployment v1Deployment = this.patchReplicas(deploymentName, 1);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.RUNNING, LocalDateTime.now()); return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.RUNNING, LocalDateTime.now());
} }
@Override @Override
public StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord) { public StreamDeploymentStatus stopStreamDeployment(StreamRecord streamRecord) {
if (this.findExistingDeployment(this.namespace, this.getDeploymentName(streamRecord)).isEmpty()) { this.validateStreamRecord(streamRecord);
throw new StreamAdminException("Unable to stop StreamDeployment - Does not exist. "); String deploymentName = this.getDeploymentName(streamRecord);
if (!this.findExistingDeployment(deploymentName).isPresent()) {
throw new StreamAdminException(String.format("Unable to find StreamDeployment for %s", deploymentName));
} }
V1Deployment v1Deployment = this.patchReplicas(this.getDeploymentName(streamRecord), 0); V1Deployment v1Deployment = this.patchReplicas(deploymentName, 0);
return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now()); return new StreamDeploymentStatus(v1Deployment.getMetadata().getName(), v1Deployment.getMetadata().getNamespace(), StreamDeploymentStatus.Status.STOPPED, LocalDateTime.now());
} }
private V1Deployment patchReplicas(String deploymentName, int replicas) { private V1Deployment patchReplicas(String deploymentName, int replicas) {
String jsonPatchStr = String.format(PATCH_REPLICAS, 1); String patchString = String.format(PATCH_REPLICAS, 1);
try { try {
V1Deployment v1Deployment = return PatchUtils.patch(
PatchUtils.patch( V1Deployment.class,
V1Deployment.class, () ->
() -> appsV1Api.patchNamespacedDeploymentCall(
appsV1Api.patchNamespacedDeploymentCall( deploymentName,
deploymentName, this.namespace,
this.namespace, new V1Patch(patchString),
new V1Patch(jsonPatchStr), null,
null, null,
null, null, // field-manager is optional
null, // field-manager is optional null,
null, null),
null), V1Patch.PATCH_FORMAT_JSON_PATCH,
V1Patch.PATCH_FORMAT_JSON_PATCH, appsV1Api.getApiClient());
appsV1Api.getApiClient());
return v1Deployment;
} catch (ApiException e) { } catch (ApiException e) {
throw new StreamAdminException(e); throw new StreamAdminException(e);
} }
...@@ -152,20 +133,79 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService { ...@@ -152,20 +133,79 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
@Override @Override
public void deleteStreamDeployment(StreamRecord streamRecord) { public void deleteStreamDeployment(StreamRecord streamRecord) {
this.validateStreamRecord(streamRecord);
try { try {
appsV1Api.deleteNamespacedDeployment(this.getDeploymentName(streamRecord), this.namespace, null, null, null, null, null, null); String deploymentName = this.getDeploymentName(streamRecord);
appsV1Api.deleteNamespacedDeployment(deploymentName, this.namespace, null, null, null, null, null, null);
} catch (ApiException e) { } catch (ApiException e) {
throw new StreamAdminException(e); throw new StreamAdminException(e);
} }
} }
private Optional<V1Deployment> findExistingDeployment(String namespace, String name) { private Optional<V1Deployment> findExistingDeployment(String deploymentName) {
try { try {
V1DeploymentList v1DeploymentList = appsV1Api.listNamespacedDeployment(namespace, null, null, null, null, null, null, null, null, null); V1DeploymentList v1DeploymentList = appsV1Api.listNamespacedDeployment(this.namespace, null, null, null, null, null, null, null, null, null, null);
return v1DeploymentList.getItems().stream().filter(v1Deployment -> v1Deployment.getMetadata().getName().equals(name)).findFirst(); return v1DeploymentList.getItems().stream().filter(v1Deployment -> v1Deployment.getMetadata().getName().equals(deploymentName)).findFirst();
} catch (ApiException e) { } catch (ApiException e) {
throw new StreamAdminException(e); throw new StreamAdminException(e);
} }
} }
private void validateStreamRecord(@NotNull StreamRecord streamRecord) {
if (streamRecord.getId() == null) {
throw new StreamAdminException("StreamRecord.id is null");
}
if (streamRecord.getKind() == null) {
throw new StreamAdminException("StreamRecord.kind is null");
}
if (streamRecord.getData() == null) {
throw new StreamAdminException("StreamRecord.data is null");
}
if (streamRecord.getData().getDatasetProperties() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties.streamDeployment is null");
}
if (streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage() == null) {
throw new StreamAdminException("StreamRecord.data.datasetProperties.extensionProperties.streamDeployment.image is null");
}
}
private String getNormalizedName(StreamRecord streamRecord) {
return streamRecord.getId().substring(streamRecord.getId().lastIndexOf(':') + 1);
}
private String getDeploymentName(StreamRecord streamRecord) {
return String.format("%s%s", this.getNormalizedName(streamRecord), DEPLOYMENT_NAME_SUFFIX);
}
private String getContainerName(StreamRecord streamRecord) {
return String.format("%s%s", this.getNormalizedName(streamRecord), CONTAINER_NAME_SUFFIX);
}
private String getImage(StreamRecord streamRecord) {
String image = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getImage();
if (Objects.isNull(image)) {
throw new StreamAdminException("Unable to deploy: ExtensionProperties.StreamDeployment.Image is null");
}
return image;
}
private List<V1EnvVar> getEnvVar(StreamRecord streamRecord) {
Map<String, String> envMap = streamRecord.getData().getDatasetProperties().getExtensionProperties().getStreamDeployment().getEnv();
if (envMap.isEmpty()) {
return Collections.emptyList();
}
return envMap.entrySet().stream().map(entry ->
new V1EnvVarBuilder()
.withName(entry.getKey())
.withValue(entry.getValue())
.build()
).collect(Collectors.toList());
}
} }
...@@ -25,7 +25,6 @@ import org.opengroup.osdu.core.common.storage.IStorageService; ...@@ -25,7 +25,6 @@ import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope; import org.springframework.web.context.annotation.RequestScope;
...@@ -41,17 +40,20 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { ...@@ -41,17 +40,20 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);
@Autowired
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
@Autowired
private DpsHeaders headers; private DpsHeaders headers;
@Autowired
private IStorageFactory storageFactory; private IStorageFactory storageFactory;
@Autowired
private TopicAdminService topicAdminService; private TopicAdminService topicAdminService;
private DeploymentAdminService deploymentAdminService;
public StreamingAdminServiceImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
this.objectMapper = objectMapper;
this.headers = headers;
this.storageFactory = storageFactory;
this.topicAdminService = topicAdminService;
this.deploymentAdminService = deploymentAdminService;
}
@Override @Override
public StreamRecord getStream(String streamRecordId) { public StreamRecord getStream(String streamRecordId) {
...@@ -108,6 +110,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { ...@@ -108,6 +110,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
// create topic // create topic
this.topicAdminService.createTopic(streamRecord); this.topicAdminService.createTopic(streamRecord);
this.deploymentAdminService.createStreamDeployment(streamRecord);
} catch (StorageException e) { } catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
......
...@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory; ...@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
...@@ -31,7 +31,7 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -31,7 +31,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
@Override @Override
public Optional<TopicDescription> getTopic(StreamRecord streamRecord) { public Optional<TopicDescription> getTopic(StreamRecord streamRecord) {
try { try {
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(List.of(streamRecord.getKind())).values(); Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(Collections.singletonList(streamRecord.getKind())).values();
return kafkaFutureMap.containsKey(streamRecord.getKind()) ? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get()) : Optional.empty(); return kafkaFutureMap.containsKey(streamRecord.getKind()) ? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get()) : Optional.empty();
} catch (ExecutionException e) { } catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) { if (e.getCause() instanceof UnknownTopicOrPartitionException) {
...@@ -56,7 +56,7 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -56,7 +56,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
//.partitions(10) //.partitions(10)
//.replicas(3) //.replicas(3)
.build(); .build();
adminClient.createTopics(List.of(newTopic)).all().get(); adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
return getTopic(streamRecord).get(); return getTopic(streamRecord).get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
throw new StreamAdminException(e); throw new StreamAdminException(e);
...@@ -65,7 +65,7 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -65,7 +65,7 @@ public class TopicAdminServiceImpl implements TopicAdminService {
@Override @Override
public void deleteTopic(StreamRecord streamRecord) { public void deleteTopic(StreamRecord streamRecord) {
adminClient.deleteTopics(List.of(streamRecord.getKind())); adminClient.deleteTopics(Collections.singletonList(streamRecord.getKind()));
} }
} }
...@@ -15,6 +15,7 @@ public class KubernetesAdminConfiguration { ...@@ -15,6 +15,7 @@ public class KubernetesAdminConfiguration {
public KubernetesAdminConfiguration() throws IOException { public KubernetesAdminConfiguration() throws IOException {
ApiClient client = Config.defaultClient(); ApiClient client = Config.defaultClient();
io.kubernetes.client.openapi.Configuration.setDefaultApiClient(client); io.kubernetes.client.openapi.Configuration.setDefaultApiClient(client);
client.setDebugging(true);
} }
@Bean @Bean
......
package org.opengroup.osdu.streaming.service; package org.opengroup.osdu.streaming.service;
import org.junit.Test; import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith; import org.opengroup.osdu.streaming.model.*;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner; import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class) @SpringBootTest
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class DeploymentAdminServiceTest { public class DeploymentAdminServiceTest {
@Autowired @Autowired
private DeploymentAdminService deploymentAdminService; private DeploymentAdminService deploymentAdminService;
@Test @Test
public void createStream() { public void getStreamDeploymentStatusNotDeployed() {
StreamRecord streamRecord = new