Commit b815ca90 authored by Dmitry Kniazev's avatar Dmitry Kniazev
Browse files

Merge branch 'demo/aws' into 'develop'

testing in AWS environment

See merge request !12
parents c87a67f1 91c27f04
Pipeline #77112 failed with stage
in 1 minute and 26 seconds
FROM openjdk:8-jre

# Run the service from the pre-built Spring Boot jar (built by the CI pipeline's
# maven stage); the image itself no longer compiles anything.
WORKDIR /usr/lib/osdu
COPY ./target/*.jar /usr/lib/osdu

# Default HTTP port; override at run time with -e PORT=<port>.
ENV PORT 8080
EXPOSE $PORT

# Shell form is required so ${PORT} is expanded. The previous
# CMD [ "sh", "java", "-jar", ... ] passed "java" to sh as a script filename
# and never started the service; it also dropped the server.port override.
CMD [ "sh", "-c", "java -Dserver.port=${PORT} -jar stream-admin-service-0.0.1-SNAPSHOT.jar" ]
...@@ -16,12 +16,14 @@ package org.opengroup.osdu.streaming.service; ...@@ -16,12 +16,14 @@ package org.opengroup.osdu.streaming.service;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record; import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.core.common.model.storage.StorageException; import org.opengroup.osdu.core.common.model.storage.StorageException;
import org.opengroup.osdu.core.common.model.storage.UpsertRecords; import org.opengroup.osdu.core.common.model.storage.UpsertRecords;
import org.opengroup.osdu.core.common.storage.IStorageFactory; import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.IStorageService; import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.model.StreamDatasetDatasetProperties.StreamTypeEnum;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -47,7 +49,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { ...@@ -47,7 +49,7 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
private DeploymentAdminService deploymentAdminService; private DeploymentAdminService deploymentAdminService;
public StreamingAdminServiceImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory, public StreamingAdminServiceImpl(ObjectMapper objectMapper, DpsHeaders headers, IStorageFactory storageFactory,
TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) { TopicAdminService topicAdminService, DeploymentAdminService deploymentAdminService) {
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.headers = headers; this.headers = headers;
this.storageFactory = storageFactory; this.storageFactory = storageFactory;
...@@ -109,10 +111,18 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { ...@@ -109,10 +111,18 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
upRec = storageService.upsertRecord(rec); upRec = storageService.upsertRecord(rec);
// create topic // create topic
this.topicAdminService.createTopic(streamRecord); // this.topicAdminService.createTopic(streamRecord);
StreamTypeEnum streamType = streamRecord.getData().getDatasetProperties().getStreamType();
logger.debug("Stream Type: {}", streamType);
if (streamType == StreamTypeEnum.SOURCE)
this.topicAdminService.createTopics(streamRecord.getData().getDatasetProperties().getStreamDefinition()
.getSinkBindings().stream().toArray(String[]::new));
// create deployment
this.deploymentAdminService.createStreamDeployment(streamRecord); this.deploymentAdminService.createStreamDeployment(streamRecord);
} catch (StorageException e) { } catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); logger.error("Got exception from storage service: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
logger.error("Got exception: " + e.getMessage() + "\nLocation" + e.getLocation().toString()); logger.error("Got exception: " + e.getMessage() + "\nLocation" + e.getLocation().toString());
} }
......
...@@ -11,6 +11,8 @@ public interface TopicAdminService { ...@@ -11,6 +11,8 @@ public interface TopicAdminService {
TopicDescription createTopic(StreamRecord streamRecord); TopicDescription createTopic(StreamRecord streamRecord);
/**
 * Creates every Kafka topic named in {@code topicList}.
 * NOTE(review): implementations appear to tolerate topics that already
 * exist (logged as a warning, not an error) — confirm against the
 * TopicAdminServiceImpl behavior before relying on it.
 *
 * @param topicList names of the topics to create
 */
void createTopics(String[] topicList);
void deleteTopic(StreamRecord streamRecord); void deleteTopic(StreamRecord streamRecord);
} }
package org.opengroup.osdu.streaming.service; package org.opengroup.osdu.streaming.service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.opengroup.osdu.streaming.exception.StreamAdminException; import org.opengroup.osdu.streaming.exception.StreamAdminException;
import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.model.StreamRecord;
...@@ -12,11 +21,6 @@ import org.slf4j.LoggerFactory; ...@@ -12,11 +21,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@Service @Service
public class TopicAdminServiceImpl implements TopicAdminService { public class TopicAdminServiceImpl implements TopicAdminService {
...@@ -31,8 +35,11 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -31,8 +35,11 @@ public class TopicAdminServiceImpl implements TopicAdminService {
@Override @Override
public Optional<TopicDescription> getTopic(StreamRecord streamRecord) { public Optional<TopicDescription> getTopic(StreamRecord streamRecord) {
try { try {
Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient.describeTopics(Collections.singletonList(streamRecord.getKind())).values(); Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap = adminClient
return kafkaFutureMap.containsKey(streamRecord.getKind()) ? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get()) : Optional.empty(); .describeTopics(Collections.singletonList(streamRecord.getKind())).values();
return kafkaFutureMap.containsKey(streamRecord.getKind())
? Optional.of(kafkaFutureMap.get(streamRecord.getKind()).get())
: Optional.empty();
} catch (ExecutionException e) { } catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) { if (e.getCause() instanceof UnknownTopicOrPartitionException) {
return Optional.empty(); return Optional.empty();
...@@ -47,20 +54,21 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -47,20 +54,21 @@ public class TopicAdminServiceImpl implements TopicAdminService {
public TopicDescription createTopic(StreamRecord streamRecord) { public TopicDescription createTopic(StreamRecord streamRecord) {
logger.debug("Creating Topic for StreamRecord: {}", streamRecord); logger.debug("Creating Topic for StreamRecord: {}", streamRecord);
Optional<TopicDescription> optional = this.getTopic(streamRecord); Optional<TopicDescription> optional = this.getTopic(streamRecord);
if (optional.isPresent()){ if (optional.isPresent()) {
return optional.get(); return optional.get();
} }
try { try {
NewTopic newTopic = TopicBuilder.name(streamRecord.getKind()) NewTopic newTopic = TopicBuilder.name(streamRecord.getKind())
//TODO: How to setup partitions and replicas // TODO: How to setup partitions and replicas
//.partitions(10) // .partitions(10)
//.replicas(3) // .replicas(3)
.build(); .build();
adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
return getTopic(streamRecord).get(); return getTopic(streamRecord).get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
throw new StreamAdminException(e); throw new StreamAdminException(e);
} }
} }
@Override @Override
...@@ -68,4 +76,24 @@ public class TopicAdminServiceImpl implements TopicAdminService { ...@@ -68,4 +76,24 @@ public class TopicAdminServiceImpl implements TopicAdminService {
adminClient.deleteTopics(Collections.singletonList(streamRecord.getKind())); adminClient.deleteTopics(Collections.singletonList(streamRecord.getKind()));
} }
@Override
public void createTopics(String[] topicList) {
    // Guard against null/empty input so we never send an empty batch to Kafka.
    if (topicList == null || topicList.length == 0) {
        logger.debug("createTopics called with no topics; nothing to do");
        return;
    }
    logger.debug("Creating Topic(s): {}", String.join(", ", topicList));
    List<NewTopic> newTopics = new ArrayList<>(topicList.length);
    for (String topicName : topicList) {
        newTopics.add(TopicBuilder.name(topicName).build());
    }
    CreateTopicsResult result = adminClient.createTopics(newTopics);
    // Check each topic's future individually: with the previous batch-wide
    // all().get(), one already-existing topic aborted the whole batch, and
    // e.getCause() was dereferenced without a null check (NPE when the caught
    // exception was an InterruptedException with no cause).
    for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
        try {
            entry.getValue().get();
            logger.debug("Created topic: {}", entry.getKey());
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new StreamAdminException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                // Already-existing topics are expected and benign.
                logger.warn("Topic already exists: {}", entry.getKey());
            } else {
                logger.debug("Got exception with cause|class: {}|{}", e.getCause(), e.getClass());
                throw new StreamAdminException(e);
            }
        }
    }
}
} }
...@@ -14,7 +14,7 @@ import java.util.Map; ...@@ -14,7 +14,7 @@ import java.util.Map;
@Configuration @Configuration
public class KafkaAdminConfiguration { public class KafkaAdminConfiguration {
@Value(value = "${kafka.bootstrapAddress:localhost:9092}") @Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress; private String bootstrapAddress;
@Bean @Bean
......
# OSDU
osdu.storage.api=http://os-storage:8080/api/storage/v2
# Logging
logging.level.org.opengroup.osdu.streaming=debug
# spring web logging level
logging.level.web=debug
# this turns on debugging the http requests
spring.mvc.log-request-details=true
# Kafka
kafka.bootstrapAddress=kafka.osdu-streams:9092
\ No newline at end of file
...@@ -2,12 +2,9 @@ ...@@ -2,12 +2,9 @@
server.servlet.contextPath=/api/streaming/v1/ server.servlet.contextPath=/api/streaming/v1/
# Profile # Profile
spring.profiles.active=local spring.profiles.active=dev_aws
spring.main.allow-bean-definition-overriding=true spring.main.allow-bean-definition-overriding=true
logging.level.root=INFO logging.level.root=INFO
osdu.storage.api=https://blah:1234/api/storage/v2
deployment.namespace=osdu-streams deployment.namespace=osdu-streams
\ No newline at end of file
...@@ -4,7 +4,7 @@ metadata: ...@@ -4,7 +4,7 @@ metadata:
name: streams-${deploymentName}-deployment name: streams-${deploymentName}-deployment
namespace: osdu-streams namespace: osdu-streams
spec: spec:
replicas: 1 replicas: 0
selector: selector:
matchLabels: matchLabels:
run: ${deploymentName} run: ${deploymentName}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment