diff --git a/provider/notification-aws/pom.xml b/provider/notification-aws/pom.xml index 66898238af4ca7fb177c81e8d0f03e2206a7fffb..f2b59ca50b54f0f6fdb9b6588e40f43624edceb5 100644 --- a/provider/notification-aws/pom.xml +++ b/provider/notification-aws/pom.xml @@ -76,7 +76,7 @@ <dependency> <groupId>org.opengroup.osdu.core.aws</groupId> <artifactId>os-core-lib-aws</artifactId> - <version>0.23.0</version> + <version>0.23.0-dhudsons-notification-refactor-SNAPSHOT</version> </dependency> <dependency> <groupId>commons-io</groupId> diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java index 387c40eeb946bd486ab0394d1aed7ecbde0a3179..59df9e47f0a0e5352912d5a966526852f1f8e37c 100644 --- a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java @@ -24,6 +24,7 @@ import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder; import org.opengroup.osdu.core.common.http.HttpResponse; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; @@ -150,6 +151,9 @@ public class NotificationQueueServiceImpl implements NotificationQueueService { } private boolean notifySubscriber(Subscription subscription, String messageBody, Map<String, String> headerAttributes) { + // Only process this subscription if the topics match. + if (!subscription.getTopic().equals(headerAttributes.get(PublishRequestBuilder.OSDU_TOPIC_ATTRIBUTE_NAME))) return true; + HttpResponse response; try { response = notificationHandler.notifySubscriber(subscription, messageBody, headerAttributes); diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java index 0f9bbedf0a31ef19e6383da7e96ab800712576b7..585556babd565ae99c9c42cf6688e39a3158f5a9 100644 --- a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java @@ -16,6 +16,8 @@ package org.opengroup.osdu.notification.provider.aws.queue.impl; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.withSettings; @@ -39,7 +41,9 @@ import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder; import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; import org.opengroup.osdu.notification.provider.aws.queue.impl.NotificationQueueServiceImpl; @@ -75,6 +79,8 @@ public class NotificationQueueServiceImplTest { private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction; + private static final String DEFAULT_MESSAGE_TOPIC = "some-topic"; + @BeforeClass public static void setup() { mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class, @@ -98,7 +104,7 @@ public class NotificationQueueServiceImplTest { Subscription subscription1 = new Subscription(); subscription1.setName("My listener"); - subscription1.setTopic("records-changed"); + subscription1.setTopic(DEFAULT_MESSAGE_TOPIC); subscription1.setPushEndpoint("/api/test/subscriber"); subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); subscription1.setId("testSubscription1Id"); @@ -123,6 +129,18 @@ public class NotificationQueueServiceImplTest { assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); } + @Test + public void processNotificationMessage_withNoSubuscriberForTopic() throws Exception { + List<FailedNotificationDoc> list = new ArrayList<>(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition", "non-subscribed-topic")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + + verify(notificationHandler, never()).notifySubscriber(any(), any(), any()); + } + @Test public void processNotificationMessage_noSubscribers_success() { when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenReturn(Collections.emptyList()); @@ -233,13 +251,19 @@ public class NotificationQueueServiceImplTest { assertEquals("testMessage2", responseMessageList.get(1).getMessageId()); } - private Message createMessage(String messageId, String dataPartitionId) { + private void addMessageAttribute(Message message, String key, String value) { + message.getMessageAttributes().put(key, new MessageAttributeValue().withDataType("String").withStringValue(value)); + } + + private Message createMessage(String messageId, String dataPartitionId, String topicName) { Message message = new Message(); message.setMessageId(messageId); - MessageAttributeValue dataPartitionIdAttribute = new MessageAttributeValue() - .withDataType("String"); - dataPartitionIdAttribute.setStringValue(dataPartitionId); - message.getMessageAttributes().put("data-partition-id", dataPartitionIdAttribute); + addMessageAttribute(message, DpsHeaders.DATA_PARTITION_ID, dataPartitionId); + addMessageAttribute(message, PublishRequestBuilder.OSDU_TOPIC_ATTRIBUTE_NAME, topicName); return message; } + + private Message createMessage(String messageId, String dataPartitionId) { + return createMessage(messageId, dataPartitionId, DEFAULT_MESSAGE_TOPIC); + } }