diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java index bef7ffd10d50865a05591beaa13e90394ff9c882..724c6319f201fa811b58e71690990cbab54de26c 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java @@ -14,39 +14,25 @@ package org.opengroup.osdu.notification.provider.azure.messageBus; -import com.microsoft.azure.servicebus.ExceptionPhase; import com.microsoft.azure.servicebus.IMessage; -import com.microsoft.azure.servicebus.IMessageHandler; import com.microsoft.azure.servicebus.SubscriptionClient; +import org.opengroup.osdu.azure.servicebus.AbstractMessageHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CompletableFuture; - -public class MessageHandler implements IMessageHandler { +public class MessageHandler extends AbstractMessageHandler { private final static Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class); - private final SubscriptionClient receiveClient; private ProcessNotification processNotification; + private String subscriptionName; - public MessageHandler(SubscriptionClient client, ProcessNotification processNotification) { - this.receiveClient = client; + public MessageHandler(SubscriptionClient client, ProcessNotification processNotification, String appName) { + super(appName, client); + this.subscriptionName = client.getSubscriptionName(); this.processNotification = processNotification; } - @Override - public CompletableFuture<Void> onMessageAsync(IMessage message) { - try { - this.processNotification.performNotification(message, receiveClient.getSubscriptionName()); - return this.receiveClient.completeAsync(message.getLockToken()); - } catch (Exception e) { - LOGGER.error("Unable to process the Notification : " + e); - return this.receiveClient.abandonAsync(message.getLockToken()); - } - } - - @Override - public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) { - LOGGER.error("{} - {}", exceptionPhase, throwable.getMessage()); + public void processMessage(IMessage message) throws Exception { + this.processNotification.performNotification(message, subscriptionName); } } diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java index d4dfe1dbf5995ec122f20d1d093961df607c6921..e5f444306d6d71d5ff6852f2ef2130a9678f3bdc 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java @@ -121,7 +121,7 @@ public class SubscriptionManagerImpl implements ISubscriptionManager { private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) throws ServiceBusException, InterruptedException { - MessageHandler messageHandler = new MessageHandler(subscriptionClient, processNotification); + MessageHandler messageHandler = new MessageHandler(subscriptionClient, processNotification, azureServiceBusConfig.getSpringApplicationName()); subscriptionClient.registerMessageHandler( messageHandler, new MessageHandlerOptions(Integer.parseUnsignedInt(azureServiceBusConfig.getMaxConcurrentCalls()), diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureServiceBusConfig.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureServiceBusConfig.java index b9df67e2e4f6aec63c1c7c8cbbd818fff2653d05..bbfdc55935e3f209fe04e448771094c99248c031 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureServiceBusConfig.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureServiceBusConfig.java @@ -24,6 +24,9 @@ import org.springframework.context.annotation.Configuration; @Configuration @Getter public class AzureServiceBusConfig { + @Value("${spring.application.name}") + private String springApplicationName; + @Value("${executor-n-threads}") private String nThreads; diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java index 62579f4323a801d2230e318628ffd13e223ae507..0744b8d21806bc521418a787e56c110d98e77519 100644 --- a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java @@ -21,52 +21,59 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.jupiter.MockitoExtension; import org.opengroup.osdu.notification.provider.azure.messageBus.MessageHandler; import org.opengroup.osdu.notification.provider.azure.messageBus.ProcessNotification; +import java.time.Instant; import java.util.UUID; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) public class MessageHandlerTest { - private static final UUID uuid = UUID.randomUUID(); private static final String subscriptionName = "TestSubscription"; + private static final String errorMsg = "Error processing notification"; + private static final String appName = "test"; - @InjectMocks private MessageHandler messageHandler; - @Mock private ProcessNotification processNotification; - @Mock - private SubscriptionClient subscriptionClient; - - @Mock private Message message; + private SubscriptionClient subscriptionClient; + @Before public void init() { - when(message.getLockToken()).thenReturn(uuid); + subscriptionClient = mock(SubscriptionClient.class); + message = mock(Message.class); + processNotification = mock(ProcessNotification.class); + when(subscriptionClient.getSubscriptionName()).thenReturn(subscriptionName); + + messageHandler = new MessageHandler(subscriptionClient, processNotification, appName); } @Test - public void shouldInvokeCompleteAsync() throws Exception { + public void shouldInvoke_performNotification() throws Exception { lenient().doNothing().when(processNotification).performNotification(message, subscriptionName); - messageHandler.onMessageAsync(message); - verify(subscriptionClient, times(1)).completeAsync(uuid); + messageHandler.processMessage(message); verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName()); } @Test - public void shouldInvokeAbandonAsyncWhenProcessNotificationThrowsException() throws Exception { - doThrow(new Exception()).when(processNotification).performNotification(message, subscriptionName); - messageHandler.onMessageAsync(message); - verify(subscriptionClient, times(1)).abandonAsync(uuid); - verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName()); + public void shouldThrow_WhenProcessNotificationThrowsException() throws Exception { + doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName); + try { + messageHandler.processMessage(message); + } catch (Exception e) { + verify(processNotification, times(1)).performNotification(message, subscriptionName); + assertEquals(e.getMessage().compareTo(errorMsg), 0); + } } }