Commit 4ae0321a authored by Stephen Nimmo's avatar Stephen Nimmo
Browse files

Implemented TopicAdminService deleteTopic and getTopic. Added tests to cover the methods.

parent 0318f6f3
Pipeline #72144 failed with stages
in 2 minutes and 58 seconds
...@@ -101,7 +101,6 @@ ...@@ -101,7 +101,6 @@
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.swagger.parser.v3</groupId> <groupId>io.swagger.parser.v3</groupId>
<artifactId>swagger-parser</artifactId> <artifactId>swagger-parser</artifactId>
...@@ -178,8 +177,8 @@ ...@@ -178,8 +177,8 @@
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version> <version>3.8.1</version>
<configuration> <configuration>
<source>1.8</source> <source>11</source>
<target>1.8</target> <target>11</target>
<verbose>false</verbose> <verbose>false</verbose>
</configuration> </configuration>
</plugin> </plugin>
......
package org.opengroup.osdu.streaming.exception;
public class StreamAdminException extends RuntimeException {
public StreamAdminException(String message) {
super(message);
}
public StreamAdminException(String message, Throwable cause) {
super(message, cause);
}
public StreamAdminException(Throwable cause) {
super(cause);
}
}
...@@ -16,7 +16,6 @@ package org.opengroup.osdu.streaming.service; ...@@ -16,7 +16,6 @@ package org.opengroup.osdu.streaming.service;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.NewTopic;
import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record; import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException; import org.opengroup.osdu.core.common.model.storage.StorageException;
...@@ -108,7 +107,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { ...@@ -108,7 +107,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
upRec = storageService.upsertRecord(rec); upRec = storageService.upsertRecord(rec);
// create topic // create topic
NewTopic newTopic = this.topicAdminService.createTopic(streamRecord); this.topicAdminService.createTopic(streamRecord);
} catch (StorageException e) { } catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
......
package org.opengroup.osdu.streaming.service; package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
import java.util.Optional;
public interface TopicAdminService { public interface TopicAdminService {
NewTopic createTopic(StreamRecord streamRecord); Optional<TopicDescription> getTopic(StreamRecord streamRecord);
TopicDescription createTopic(StreamRecord streamRecord);
void deleteTopic(StreamRecord streamRecord);
} }
package org.opengroup.osdu.streaming.service; package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@Service @Service
public class TopicAdminServiceImpl implements TopicAdminService { public class TopicAdminServiceImpl implements TopicAdminService {
private static final Logger logger = LoggerFactory.getLogger(TopicAdminService.class); private static final Logger logger = LoggerFactory.getLogger(TopicAdminService.class);
private AdminClient adminClient;
public TopicAdminServiceImpl(AdminClient adminClient) {
this.adminClient = adminClient;
}
@Override
public Optional<TopicDescription> getTopic(StreamRecord streamRecord) {
try {
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(List.of(streamRecord.getKind())).values();
return kafkaFutureMap.containsKey(streamRecord.getKind()) ? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get()) : Optional.empty();
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
return Optional.empty();
}
throw new StreamAdminException(e);
} catch (InterruptedException e) {
throw new StreamAdminException(e);
}
}
@Override @Override
public NewTopic createTopic(StreamRecord streamRecord) { public TopicDescription createTopic(StreamRecord streamRecord) {
logger.debug("Creating Topic for StreamRecord: {}", streamRecord); logger.debug("Creating Topic for StreamRecord: {}", streamRecord);
return TopicBuilder.name(streamRecord.getKind()) Optional<TopicDescription> optional = this.getTopic(streamRecord);
//TODO: How to setup partitions and topics if (optional.isPresent()){
.partitions(10) return optional.get();
.replicas(3) }
.build(); try {
NewTopic newTopic = TopicBuilder.name(streamRecord.getKind())
//TODO: How to setup partitions and replicas
//.partitions(10)
//.replicas(3)
.build();
adminClient.createTopics(List.of(newTopic)).all().get();
return getTopic(streamRecord).get();
} catch (InterruptedException | ExecutionException e) {
throw new StreamAdminException(e);
}
}
@Override
public void deleteTopic(StreamRecord streamRecord) {
adminClient.deleteTopics(List.of(streamRecord.getKind()));
} }
} }
package org.opengroup.osdu.streaming.util; package org.opengroup.osdu.streaming.util;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -8,9 +10,10 @@ import org.springframework.kafka.core.KafkaAdmin; ...@@ -8,9 +10,10 @@ import org.springframework.kafka.core.KafkaAdmin;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
@Configuration @Configuration
public class KafkaTopicConfiguration { public class KafkaAdminConfiguration {
@Value(value = "${kafka.bootstrapAddress:localhost:9092}") @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
private String bootstrapAddress; private String bootstrapAddress;
...@@ -19,7 +22,16 @@ public class KafkaTopicConfiguration { ...@@ -19,7 +22,16 @@ public class KafkaTopicConfiguration {
public KafkaAdmin kafkaAdmin() { public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs); KafkaAdmin kafkaAdmin = new KafkaAdmin(configs);
return kafkaAdmin;
}
@Bean
public AdminClient adminClient() throws ExecutionException, InterruptedException {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
AdminClient adminClient = KafkaAdminClient.create(configs);
return adminClient;
} }
} }
...@@ -2,4 +2,5 @@ ...@@ -2,4 +2,5 @@
server.servlet.contextPath=/api/streaming/v1/ server.servlet.contextPath=/api/streaming/v1/
# Profile # Profile
spring.profiles.active=local spring.profiles.active=local
\ No newline at end of file spring.main.allow-bean-definition-overriding=true
\ No newline at end of file
package org.opengroup.osdu.streaming.service; package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerImageName;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class TopicAdminServiceTest { public class TopicAdminServiceTest {
@ClassRule @ClassRule
public static KafkaContainer kafka = public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired @Autowired
private TopicAdminService topicAdminService; private TopicAdminService topicAdminService;
@Test @Test
public void topicCreation() { public void topicCreationAndRetrieval() {
StreamRecord streamRecord = new StreamRecord();
streamRecord.setKind(UUID.randomUUID().toString());
TopicDescription topicDescription = topicAdminService.createTopic(streamRecord);
assertThat(topicDescription).isNotNull();
Optional<TopicDescription> optionalTopicDescription = topicAdminService.getTopic(streamRecord);
assertThat(optionalTopicDescription.isPresent()).isTrue();
}
@Test
public void topicCreationAndDeletion() {
StreamRecord streamRecord = new StreamRecord();
streamRecord.setKind(UUID.randomUUID().toString());
TopicDescription topicDescription = topicAdminService.createTopic(streamRecord);
assertThat(topicDescription).isNotNull();
topicAdminService.deleteTopic(streamRecord);
Optional<TopicDescription> optionalTopicDescription = topicAdminService.getTopic(streamRecord);
assertThat(optionalTopicDescription.isEmpty()).isTrue();
}
@Test
public void topicCreateSameName() {
StreamRecord streamRecord = new StreamRecord(); StreamRecord streamRecord = new StreamRecord();
streamRecord.setKind("test"); streamRecord.setKind(UUID.randomUUID().toString());
NewTopic newTopic = topicAdminService.createTopic(streamRecord); TopicDescription topicDescription = topicAdminService.createTopic(streamRecord);
assertThat(newTopic).isNotNull(); assertThat(topicDescription).isNotNull();
assertThat(newTopic.name()).isEqualTo("test"); TopicDescription topicDescription2 = topicAdminService.createTopic(streamRecord);
assertThat(topicDescription).isEqualTo(topicDescription2);
}
@TestConfiguration
static class KafkaTestContainersConfiguration {
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
KafkaAdmin kafkaAdmin = new KafkaAdmin(configs);
return kafkaAdmin;
}
@Bean
public AdminClient adminClient() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
AdminClient adminClient = KafkaAdminClient.create(configs);
return adminClient;
}
} }
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment