Commit 49e9d218 authored by Stephen Nimmo's avatar Stephen Nimmo
Browse files

Implemented TopicAdminService using the Spring Kafka Admin API

parent a85ccfdf
Pipeline #69439 failed with stages
in 3 minutes and 16 seconds
mvnw 100644 → 100755
File mode changed from 100644 to 100755
......@@ -29,6 +29,7 @@
<junit-version>4.12</junit-version>
<jackson-databind-nullable>0.2.1</jackson-databind-nullable>
<springfox-version>3.0.0</springfox-version>
<testcontainers.version>1.15.3</testcontainers.version>
</properties>
<licenses>
......@@ -96,6 +97,10 @@
<artifactId>springfox-boot-starter</artifactId>
<version>${springfox-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.swagger.parser.v3</groupId>
......@@ -108,6 +113,22 @@
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
......
......@@ -41,7 +41,7 @@ import org.springframework.web.context.annotation.RequestScope;
@RequestScope
public class StreamingAdminControllerImpl implements StreamApi {
Logger logger = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
@Autowired
private StreamingAdminService streamingAdminService;
......
......@@ -27,18 +27,27 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
@Component
public class StorageClientFactory extends AbstractFactoryBean<IStorageFactory> {
Logger logger = LoggerFactory.getLogger(StorageClientFactory.class);
private static final Logger logger = LoggerFactory.getLogger(StorageClientFactory.class);
@Inject
private ObjectMapper objectMapper;
private final ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
private final HttpResponseBodyMapper bodyMapper = new HttpResponseBodyMapper(objectMapper);
private HttpResponseBodyMapper bodyMapper;
@Value("${osdu.storage.api}")
private String STORAGE_API;
@PostConstruct
public void postConstruct() {
bodyMapper = new HttpResponseBodyMapper(objectMapper);
}
@Override
public Class<?> getObjectType() {
return IStorageFactory.class;
......
......@@ -33,7 +33,7 @@ import org.springframework.web.context.annotation.RequestScope;
@Configuration
public class StreamingBeansConfig {
Logger logger = LoggerFactory.getLogger(StreamingBeansConfig.class);
private static final Logger logger = LoggerFactory.getLogger(StreamingBeansConfig.class);
@Bean
@RequestScope
......
......@@ -18,6 +18,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.NewTopic;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException;
......@@ -41,13 +42,10 @@ import org.springframework.web.context.annotation.RequestScope;
@RequestScope
public class StreamingAdminServiceImpl implements StreamingAdminService {
Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);
// TODO: Turn FAIL_ON_UNKNOWN_PROPERTIES on and change the API interface to
// handle all the fields properly, since this is now ignoring some neccessary
// fields (like ExtensionProperties and others)
private final ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private DpsHeaders headers;
......@@ -55,6 +53,9 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
@Autowired
private IStorageFactory storageFactory;
@Autowired
private TopicAdminService topicAdminService;
@Override
public StreamRecord getStream(String streamRecordId) {
StreamRecord sRec = null;
......@@ -107,6 +108,9 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
// try inserting a new record to storage
upRec = storageService.upsertRecord(rec);
// create topic
NewTopic newTopic = this.topicAdminService.createTopic(streamRecord);
} catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) {
......
package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.NewTopic;
import org.opengroup.osdu.streaming.model.StreamRecord;
public interface TopicAdminService {
NewTopic createTopic(StreamRecord streamRecord);
}
package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.NewTopic;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service;
@Service
public class TopicAdminServiceImpl implements TopicAdminService {
private static final Logger logger = LoggerFactory.getLogger(TopicAdminService.class);
@Override
public NewTopic createTopic(StreamRecord streamRecord) {
logger.debug("Creating Topic for StreamRecord: {}", streamRecord);
return TopicBuilder.name(streamRecord.getKind())
.partitions(10)
.replicas(3)
.build();
}
}
package org.opengroup.osdu.streaming.util;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
public class ObjectMapperProvider {
@Bean
@Primary
public ObjectMapper objectMapper() {
return new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment