Commit 72921614 authored by Dmitry Kniazev's avatar Dmitry Kniazev
Browse files

Merge branch '1-implement-topic-creation' into 'develop'

Resolve "Implement Topic Creation"

See merge request !5
parents ed4edfb0 0318f6f3
Pipeline #72143 failed with stages
in 3 minutes and 24 seconds
version: '3.8'
services:
zookeeper:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c", "bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
\ No newline at end of file
mvnw 100644 → 100755
File mode changed from 100644 to 100755
......@@ -29,6 +29,7 @@
<junit-version>4.12</junit-version>
<jackson-databind-nullable>0.2.1</jackson-databind-nullable>
<springfox-version>3.0.0</springfox-version>
<testcontainers.version>1.15.3</testcontainers.version>
</properties>
<licenses>
......@@ -96,6 +97,10 @@
<artifactId>springfox-boot-starter</artifactId>
<version>${springfox-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.swagger.parser.v3</groupId>
......@@ -108,6 +113,27 @@
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
......
......@@ -41,7 +41,7 @@ import org.springframework.web.context.annotation.RequestScope;
@RequestScope
public class StreamingAdminControllerImpl implements StreamApi {
Logger logger = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminControllerImpl.class);
@Autowired
private StreamingAdminService streamingAdminService;
......
......@@ -14,38 +14,45 @@
package org.opengroup.osdu.streaming.di;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.http.json.HttpResponseBodyMapper;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.StorageAPIConfig;
import org.opengroup.osdu.core.common.storage.StorageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class StorageClientFactory extends AbstractFactoryBean<IStorageFactory> {
Logger logger = LoggerFactory.getLogger(StorageClientFactory.class);
private static final Logger logger = LoggerFactory.getLogger(StorageClientFactory.class);
@Autowired
private ObjectMapper objectMapper;
private final ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
private final HttpResponseBodyMapper bodyMapper = new HttpResponseBodyMapper(objectMapper);
private HttpResponseBodyMapper bodyMapper;
@Value("${osdu.storage.api}")
private String STORAGE_API;
@PostConstruct
public void postConstruct() {
bodyMapper = new HttpResponseBodyMapper(objectMapper);
}
@Override
public Class<?> getObjectType() {
return IStorageFactory.class;
}
@Override
protected IStorageFactory createInstance() throws Exception {
protected IStorageFactory createInstance() {
logger.debug("Creating instance of IStorageFactory... " + this.getClass().getName());
return new StorageFactory(StorageAPIConfig.builder().rootUrl(STORAGE_API).build(), bodyMapper);
}
......
......@@ -14,8 +14,6 @@
package org.opengroup.osdu.streaming.di;
import javax.servlet.http.HttpServletRequest;
import org.opengroup.osdu.core.common.http.DpsHeaderFactory;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.slf4j.Logger;
......@@ -24,6 +22,8 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.annotation.RequestScope;
import javax.servlet.http.HttpServletRequest;
/**
* Web Configuration for Streaming Services
*
......@@ -33,7 +33,7 @@ import org.springframework.web.context.annotation.RequestScope;
@Configuration
public class StreamingBeansConfig {
Logger logger = LoggerFactory.getLogger(StreamingBeansConfig.class);
private static final Logger logger = LoggerFactory.getLogger(StreamingBeansConfig.class);
@Bean
@RequestScope
......
......@@ -15,9 +15,8 @@
package org.opengroup.osdu.streaming.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.NewTopic;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException;
......@@ -41,13 +40,10 @@ import org.springframework.web.context.annotation.RequestScope;
@RequestScope
public class StreamingAdminServiceImpl implements StreamingAdminService {
Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);
private static final Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);
// TODO: Turn FAIL_ON_UNKNOWN_PROPERTIES on and change the API interface to
// handle all the fields properly, since this is now ignoring some neccessary
// fields (like ExtensionProperties and others)
private final ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private DpsHeaders headers;
......@@ -55,6 +51,9 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
@Autowired
private IStorageFactory storageFactory;
@Autowired
private TopicAdminService topicAdminService;
@Override
public StreamRecord getStream(String streamRecordId) {
StreamRecord sRec = null;
......@@ -107,6 +106,9 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
// try inserting a new record to storage
upRec = storageService.upsertRecord(rec);
// create topic
NewTopic newTopic = this.topicAdminService.createTopic(streamRecord);
} catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) {
......
package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.NewTopic;
import org.opengroup.osdu.streaming.model.StreamRecord;
public interface TopicAdminService {
NewTopic createTopic(StreamRecord streamRecord);
}
package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.NewTopic;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service;
@Service
public class TopicAdminServiceImpl implements TopicAdminService {
private static final Logger logger = LoggerFactory.getLogger(TopicAdminService.class);
@Override
public NewTopic createTopic(StreamRecord streamRecord) {
logger.debug("Creating Topic for StreamRecord: {}", streamRecord);
return TopicBuilder.name(streamRecord.getKind())
//TODO: How to setup partitions and topics
.partitions(10)
.replicas(3)
.build();
}
}
package org.opengroup.osdu.streaming.util;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaTopicConfiguration {
@Value(value = "${kafka.bootstrapAddress:localhost:9092}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
}
package org.opengroup.osdu.streaming.util;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class ObjectMapperConfiguration {
@Bean
@Primary
public ObjectMapper objectMapper() {
return new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
}
}
package org.opengroup.osdu.streaming.service;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TopicAdminServiceTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
private TopicAdminService topicAdminService;
@Test
public void topicCreation() {
StreamRecord streamRecord = new StreamRecord();
streamRecord.setKind("test");
NewTopic newTopic = topicAdminService.createTopic(streamRecord);
assertThat(newTopic).isNotNull();
assertThat(newTopic.name()).isEqualTo("test");
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment