From 4cc4e0fe5715fd529432b3391e0d660a6c39f212 Mon Sep 17 00:00:00 2001
From: Derek Hudson <dhudsons@amazon.com>
Date: Thu, 21 Sep 2023 15:16:15 +0000
Subject: [PATCH] Added a filter based on the Subscription's OSDU topic before
 sending messages.

---
 provider/notification-aws/pom.xml             |  2 +-
 .../impl/NotificationQueueServiceImpl.java    |  4 +++
 .../NotificationQueueServiceImplTest.java     | 36 +++++++++++++++----
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/provider/notification-aws/pom.xml b/provider/notification-aws/pom.xml
index 66898238a..f2b59ca50 100644
--- a/provider/notification-aws/pom.xml
+++ b/provider/notification-aws/pom.xml
@@ -76,7 +76,7 @@
         <dependency>
             <groupId>org.opengroup.osdu.core.aws</groupId>
             <artifactId>os-core-lib-aws</artifactId>
-            <version>0.23.0</version>
+            <version>0.23.0-dhudsons-notification-refactor-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>
diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java
index 387c40eeb..59df9e47f 100644
--- a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java
+++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java
@@ -24,6 +24,7 @@ import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2;
 import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig;
 import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider;
 import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException;
+import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder;
 import org.opengroup.osdu.core.common.http.HttpResponse;
 import org.opengroup.osdu.core.common.model.notification.Subscription;
 import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc;
@@ -150,6 +151,9 @@ public class NotificationQueueServiceImpl implements NotificationQueueService {
     }
 
     private boolean notifySubscriber(Subscription subscription, String messageBody, Map<String, String> headerAttributes) {
+        // Only process this subscription if the topics match.
+        if (!subscription.getTopic().equals(headerAttributes.get(PublishRequestBuilder.OSDU_TOPIC_ATTRIBUTE_NAME))) return true;
+
         HttpResponse response;
         try {
             response = notificationHandler.notifySubscriber(subscription, messageBody, headerAttributes);
diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java
index 0f9bbedf0..585556bab 100644
--- a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java
+++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java
@@ -16,6 +16,8 @@ package org.opengroup.osdu.notification.provider.aws.queue.impl;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.withSettings;
@@ -39,7 +41,9 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory;
 import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2;
 import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider;
+import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder;
 import org.opengroup.osdu.core.common.http.HttpResponse;
+import org.opengroup.osdu.core.common.model.http.DpsHeaders;
 import org.opengroup.osdu.core.common.model.notification.Subscription;
 import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc;
 import org.opengroup.osdu.notification.provider.aws.queue.impl.NotificationQueueServiceImpl;
@@ -75,6 +79,8 @@ public class NotificationQueueServiceImplTest {
 
     private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction;
 
+    private static final String DEFAULT_MESSAGE_TOPIC = "some-topic";
+
     @BeforeClass
     public static void setup() {
         mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class,
@@ -98,7 +104,7 @@ public class NotificationQueueServiceImplTest {
 
         Subscription subscription1 = new Subscription();
         subscription1.setName("My listener");
-        subscription1.setTopic("records-changed");
+        subscription1.setTopic(DEFAULT_MESSAGE_TOPIC);
         subscription1.setPushEndpoint("/api/test/subscriber");
         subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a");
         subscription1.setId("testSubscription1Id");
@@ -123,6 +129,18 @@ public class NotificationQueueServiceImplTest {
         assertEquals("testMessage1", responseMessageList.get(0).getMessageId());
     }
 
+    @Test
+    public void processNotificationMessage_withNoSubuscriberForTopic() throws Exception {
+        List<FailedNotificationDoc> list = new ArrayList<>();
+        when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list))));
+
+        List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition", "non-subscribed-topic"));
+
+        List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList);
+
+        verify(notificationHandler, never()).notifySubscriber(any(), any(), any());
+    }
+
     @Test
     public void processNotificationMessage_noSubscribers_success() {
         when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenReturn(Collections.emptyList());
@@ -233,13 +251,19 @@ public class NotificationQueueServiceImplTest {
         assertEquals("testMessage2", responseMessageList.get(1).getMessageId());
     }
 
-    private Message createMessage(String messageId, String dataPartitionId) {
+    private void addMessageAttribute(Message message, String key, String value) {
+        message.getMessageAttributes().put(key, new MessageAttributeValue().withDataType("String").withStringValue(value));
+    }
+
+    private Message createMessage(String messageId, String dataPartitionId, String topicName) {
         Message message = new Message();
         message.setMessageId(messageId);
-        MessageAttributeValue dataPartitionIdAttribute = new MessageAttributeValue()
-            .withDataType("String");
-        dataPartitionIdAttribute.setStringValue(dataPartitionId);
-        message.getMessageAttributes().put("data-partition-id", dataPartitionIdAttribute);
+        addMessageAttribute(message, DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
+        addMessageAttribute(message, PublishRequestBuilder.OSDU_TOPIC_ATTRIBUTE_NAME, topicName);
         return message;
     }
+
+    private Message createMessage(String messageId, String dataPartitionId) {
+        return createMessage(messageId, dataPartitionId, DEFAULT_MESSAGE_TOPIC);
+    }
 }
-- 
GitLab