Commit 6a117808 authored by Bhushan Rade's avatar Bhushan Rade
Browse files

Merge branch 'core_common_version-ibm' into 'master'

Core common version ibm

See merge request !28
parents f0bd8620 79a9e1cc
Pipeline #63325 passed with stages
in 2 minutes and 13 seconds
......@@ -47,7 +47,7 @@ The following software have components provided under the terms of this license:
- JSON Small and Fast Parser (from https://repo1.maven.org/maven2/net/minidev/json-smart)
- JSON Web Token support for the JVM (from https://repo1.maven.org/maven2/io/jsonwebtoken/jjwt)
- Jackson dataformat: CBOR (from http://github.com/FasterXML/jackson-dataformats-binary)
- Jackson datatype: JSR310 (from https://repo1.maven.org/maven2/com/fasterxml/jackson/datatype/jackson-datatype-jsr310)
- Jackson datatype: JSR310 (from http://wiki.fasterxml.com/JacksonModuleJSR310)
- Jackson datatype: jdk8 (from https://repo1.maven.org/maven2/com/fasterxml/jackson/datatype/jackson-datatype-jdk8)
- Jackson-annotations (from http://github.com/FasterXML/jackson)
- Jackson-core (from https://github.com/FasterXML/jackson-core)
......@@ -107,11 +107,9 @@ The following software have components provided under the terms of this license:
- T-Digest (from https://github.com/tdunning/t-digest)
- aggs-matrix-stats (from https://github.com/elastic/elasticsearch)
- asm (from http://asm.ow2.io/)
- cli (from https://github.com/elastic/elasticsearch)
- compiler (from http://github.com/spullara/mustache.java)
- elasticsearch-cli (from https://github.com/elastic/elasticsearch)
- elasticsearch-core (from https://github.com/elastic/elasticsearch)
- elasticsearch-secure-sm (from https://github.com/elastic/elasticsearch)
- elasticsearch-x-content (from https://github.com/elastic/elasticsearch)
- core (from https://github.com/elastic/elasticsearch)
- error-prone annotations (from https://repo1.maven.org/maven2/com/google/errorprone/error_prone_annotations)
- io.grpc:grpc-context (from https://github.com/grpc/grpc-java)
- ion-java (from https://github.com/amznlabs/ion-java/)
......@@ -130,6 +128,7 @@ The following software have components provided under the terms of this license:
- rest (from https://github.com/elastic/elasticsearch)
- rest-high-level (from https://github.com/elastic/elasticsearch)
- rxjava (from https://github.com/ReactiveX/RxJava)
- secure-sm (from https://github.com/elastic/elasticsearch)
- server (from https://github.com/elastic/elasticsearch)
- spring-boot (from https://spring.io/projects/spring-boot)
- spring-boot-autoconfigure (from https://spring.io/projects/spring-boot)
......@@ -139,17 +138,18 @@ The following software have components provided under the terms of this license:
- spring-boot-starter-security (from https://spring.io/projects/spring-boot)
- spring-boot-starter-validation (from https://spring.io/projects/spring-boot)
- spring-boot-starter-web (from https://spring.io/projects/spring-boot)
- spring-security-config (from https://spring.io/projects/spring-security)
- spring-security-core (from https://spring.io/projects/spring-security)
- spring-security-oauth2-core (from https://spring.io/projects/spring-security)
- spring-security-oauth2-jose (from https://spring.io/projects/spring-security)
- spring-security-oauth2-resource-server (from https://spring.io/projects/spring-security)
- spring-security-web (from https://spring.io/projects/spring-security)
- spring-security-config (from http://spring.io/spring-security)
- spring-security-core (from http://spring.io/spring-security)
- spring-security-oauth2-core (from http://spring.io/spring-security)
- spring-security-oauth2-jose (from http://spring.io/spring-security)
- spring-security-oauth2-resource-server (from http://spring.io/spring-security)
- spring-security-web (from http://spring.io/spring-security)
- swagger-annotations (from https://repo1.maven.org/maven2/io/swagger/swagger-annotations)
- swagger-jaxrs (from )
- tomcat-embed-core (from http://tomcat.apache.org/)
- tomcat-embed-el (from https://tomcat.apache.org/)
- tomcat-embed-websocket (from https://tomcat.apache.org/)
- x-content (from https://github.com/elastic/elasticsearch)
========================================================================
BSD-2-Clause
......@@ -284,7 +284,7 @@ The following software have components provided under the terms of this license:
- SLF4J API Module (from http://www.slf4j.org)
- java jwt (from https://github.com/auth0/java-jwt)
- mockito-core (from https://github.com/mockito/mockito)
- spring-security-core (from https://spring.io/projects/spring-security)
- spring-security-core (from http://spring.io/spring-security)
========================================================================
MPL-1.1
......
......@@ -7,6 +7,7 @@ import java.util.Set;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicSession;
import org.apache.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
......@@ -27,8 +28,10 @@ import com.google.gson.Gson;
public class StatusEventPublisher implements IEventPublisher {
private static final String FAILED_TO_PUBLISH_STATUS = "Failed to publish status. ";
private final static String RECORDS_CHANGED_QUEUE_NAME="records-changed";
private Set<String> existingQueues = new HashSet<String>();
private final static String RECORDS_CHANGED_TOPIC="records-changed-topic";
String correlationId = null;
String dataPartitionId = null;
private Set<String> existingTopics = new HashSet<String>();
private Logger log = LogManager.getLogger(StatusEventPublisher.class);
......@@ -38,52 +41,56 @@ public class StatusEventPublisher implements IEventPublisher {
@Override
public void publish(Message[] messages, Map<String, String> attributes) throws CoreException {
String correlationId = attributes.get(DpsHeaders.CORRELATION_ID);
String dataPartitionId = attributes.get(DpsHeaders.DATA_PARTITION_ID);
correlationId = attributes.get(DpsHeaders.CORRELATION_ID);
dataPartitionId = attributes.get(DpsHeaders.DATA_PARTITION_ID);
log.info(DpsHeaders.CORRELATION_ID + " " + correlationId + DpsHeaders.DATA_PARTITION_ID + " " + dataPartitionId
+ " status msgs: " + messages);
String queueNameWithPrefix = dataPartitionId + "-" + RECORDS_CHANGED_QUEUE_NAME;
String topicNameWithPrefix = dataPartitionId + "-" + RECORDS_CHANGED_TOPIC;
String msg = "";
if (existingQueues.contains(queueNameWithPrefix)) {
if (existingTopics.contains(topicNameWithPrefix)) {
for(Message message:messages) {
msg = gson.toJson(message);
sendMessageToQueue(queueNameWithPrefix, msg, -1L );
sendMessageToTopic(topicNameWithPrefix, msg, -1L );
}
}
else {
String createdQueue = createQueue(queueNameWithPrefix);
log.info("queue created successfully" + createdQueue);
existingQueues.add(createdQueue);
String createdTopic = createTopic(topicNameWithPrefix);
log.info("address created successfully" + createdTopic);
existingTopics.add(createdTopic);
for(Message message:messages) {
msg = gson.toJson(message);
sendMessageToQueue(queueNameWithPrefix, msg, -1L );
sendMessageToTopic(topicNameWithPrefix, msg, -1L );
}
}
}
public String createQueue(String queueNameWithPrefix) {
public String createTopic(String topicNameWithPrefix) {
try {
ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createQueue(queueNameWithPrefix);
TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createTopic(topicNameWithPrefix);
session.close();
} catch (Exception e) {
log.error("Failed to create queue", e);
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "could not create queue" + queueNameWithPrefix,
log.error("Failed to create topic", e);
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "could not create address" + topicNameWithPrefix,
e.getMessage(), e);
}
return queueNameWithPrefix;
return topicNameWithPrefix;
}
public void sendMessageToQueue(String queueName, String msg, Long delay) {
public void sendMessageToTopic(String topicName, String msg, Long delay) {
try {
jmsTemplate.setDeliveryDelay(delay);
jmsTemplate.convertAndSend(queueName, msg);
log.info("[x] Sent '" + msg + "' to queue [" + queueName + "] with delay " + delay);
jmsTemplate.setPubSubDomain(true);
jmsTemplate.convertAndSend(topicName, msg, m -> {
log.info("setting custom JMS headers before sending");
m.setStringProperty("data_partition_id", dataPartitionId);
return m;});
log.info("[x] Sent '" + msg + "' to topic [" + topicName + "] with delay " + delay);
} catch (JmsException e) {
log.error(FAILED_TO_PUBLISH_STATUS +" queue name [" + queueName + "]");
log.error(FAILED_TO_PUBLISH_STATUS +" topic name [" + topicName + "]");
log.error(e.getMessage(), e);
e.printStackTrace();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment