Skip to content
Snippets Groups Projects
Commit 8d64a346 authored by Vibhuti Sharma [Microsoft]'s avatar Vibhuti Sharma [Microsoft]
Browse files

Use worker logs from core-lib-azure

parent 83a4bcfe
No related branches found
No related tags found
3 merge requests!232Update os-core-lib-azure,!231initial commit,!204Use worker logs from core-lib-azure
...@@ -14,39 +14,25 @@ ...@@ -14,39 +14,25 @@
package org.opengroup.osdu.notification.provider.azure.messageBus; package org.opengroup.osdu.notification.provider.azure.messageBus;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage; import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.SubscriptionClient; import com.microsoft.azure.servicebus.SubscriptionClient;
import org.opengroup.osdu.azure.servicebus.AbstractMessageHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture; public class MessageHandler extends AbstractMessageHandler {
public class MessageHandler implements IMessageHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class); private final static Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
private final SubscriptionClient receiveClient;
private ProcessNotification processNotification; private ProcessNotification processNotification;
private String subscriptionName;
public MessageHandler(SubscriptionClient client, ProcessNotification processNotification) { public MessageHandler(SubscriptionClient client, ProcessNotification processNotification, String appName) {
this.receiveClient = client; super(appName, client);
this.subscriptionName = client.getSubscriptionName();
this.processNotification = processNotification; this.processNotification = processNotification;
} }
@Override public void processMessage(IMessage message) throws Exception {
public CompletableFuture<Void> onMessageAsync(IMessage message) { this.processNotification.performNotification(message, subscriptionName);
try {
this.processNotification.performNotification(message, receiveClient.getSubscriptionName());
return this.receiveClient.completeAsync(message.getLockToken());
} catch (Exception e) {
LOGGER.error("Unable to process the Notification : " + e);
return this.receiveClient.abandonAsync(message.getLockToken());
}
}
@Override
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
LOGGER.error("{} - {}", exceptionPhase, throwable.getMessage());
} }
} }
...@@ -121,7 +121,7 @@ public class SubscriptionManagerImpl implements ISubscriptionManager { ...@@ -121,7 +121,7 @@ public class SubscriptionManagerImpl implements ISubscriptionManager {
private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) throws ServiceBusException, InterruptedException { private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) throws ServiceBusException, InterruptedException {
MessageHandler messageHandler = new MessageHandler(subscriptionClient, processNotification); MessageHandler messageHandler = new MessageHandler(subscriptionClient, processNotification, azureServiceBusConfig.getSpringApplicationName());
subscriptionClient.registerMessageHandler( subscriptionClient.registerMessageHandler(
messageHandler, messageHandler,
new MessageHandlerOptions(Integer.parseUnsignedInt(azureServiceBusConfig.getMaxConcurrentCalls()), new MessageHandlerOptions(Integer.parseUnsignedInt(azureServiceBusConfig.getMaxConcurrentCalls()),
......
...@@ -24,6 +24,9 @@ import org.springframework.context.annotation.Configuration; ...@@ -24,6 +24,9 @@ import org.springframework.context.annotation.Configuration;
@Configuration @Configuration
@Getter @Getter
public class AzureServiceBusConfig { public class AzureServiceBusConfig {
@Value("${spring.application.name}")
private String springApplicationName;
@Value("${executor-n-threads}") @Value("${executor-n-threads}")
private String nThreads; private String nThreads;
......
...@@ -21,52 +21,59 @@ import org.junit.Test; ...@@ -21,52 +21,59 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.notification.provider.azure.messageBus.MessageHandler; import org.opengroup.osdu.notification.provider.azure.messageBus.MessageHandler;
import org.opengroup.osdu.notification.provider.azure.messageBus.ProcessNotification; import org.opengroup.osdu.notification.provider.azure.messageBus.ProcessNotification;
import java.time.Instant;
import java.util.UUID; import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class MessageHandlerTest { public class MessageHandlerTest {
private static final UUID uuid = UUID.randomUUID();
private static final String subscriptionName = "TestSubscription"; private static final String subscriptionName = "TestSubscription";
private static final String errorMsg = "Error processing notification";
private static final String appName = "test";
@InjectMocks
private MessageHandler messageHandler; private MessageHandler messageHandler;
@Mock
private ProcessNotification processNotification; private ProcessNotification processNotification;
@Mock
private SubscriptionClient subscriptionClient;
@Mock
private Message message; private Message message;
private SubscriptionClient subscriptionClient;
@Before @Before
public void init() { public void init() {
when(message.getLockToken()).thenReturn(uuid); subscriptionClient = mock(SubscriptionClient.class);
message = mock(Message.class);
processNotification = mock(ProcessNotification.class);
when(subscriptionClient.getSubscriptionName()).thenReturn(subscriptionName); when(subscriptionClient.getSubscriptionName()).thenReturn(subscriptionName);
messageHandler = new MessageHandler(subscriptionClient, processNotification, appName);
} }
@Test @Test
public void shouldInvokeCompleteAsync() throws Exception { public void shouldInvoke_performNotification() throws Exception {
lenient().doNothing().when(processNotification).performNotification(message, subscriptionName); lenient().doNothing().when(processNotification).performNotification(message, subscriptionName);
messageHandler.onMessageAsync(message); messageHandler.processMessage(message);
verify(subscriptionClient, times(1)).completeAsync(uuid);
verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName()); verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName());
} }
@Test @Test
public void shouldInvokeAbandonAsyncWhenProcessNotificationThrowsException() throws Exception { public void shouldThrow_WhenProcessNotificationThrowsException() throws Exception {
doThrow(new Exception()).when(processNotification).performNotification(message, subscriptionName); doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName);
messageHandler.onMessageAsync(message); try {
verify(subscriptionClient, times(1)).abandonAsync(uuid); messageHandler.processMessage(message);
verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName()); } catch (Exception e) {
verify(processNotification, times(1)).performNotification(message, subscriptionName);
assertEquals(e.getMessage().compareTo(errorMsg), 0);
}
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment