Skip to content
Snippets Groups Projects
Commit 2a893ee5 authored by Vasyl Leskiv [SLB]'s avatar Vasyl Leskiv [SLB] Committed by Alok Joshi
Browse files

Add latency metric for notification service

parent 85475e78
No related branches found
No related tags found
1 merge request!552Add latency metric for notification service
Showing
with 281 additions and 32 deletions
/*
Copyright 2020-2025 Google LLC
Copyright 2020-2025 EPAM Systems, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.opengroup.osdu.notification.util;
import org.opengroup.osdu.notification.provider.interfaces.IMetricService;
import org.springframework.stereotype.Component;
@Component
public class MetricServiceImpl implements IMetricService {
@Override
public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) {
//TODO
}
}
......@@ -74,7 +74,7 @@ public class PubsubEndpoint {
String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
HttpResponse response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
HttpResponse response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes, true);
if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE + response.getBody());
return new ResponseEntity<String>(NOT_ACKNOWLEDGE, HttpStatus.valueOf(response.getResponseCode()));
......
// Copyright 2017-2025, SLB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.interfaces;
public interface IMetricService {
void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName);
}
......@@ -24,7 +24,9 @@ import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.*;
import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.opengroup.osdu.notification.provider.interfaces.IMetricService;
import org.opengroup.osdu.notification.provider.interfaces.ISubscriberNotificationRequestLogger;
import org.opengroup.osdu.notification.utils.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -37,7 +39,8 @@ import java.util.Map;
@Component
public class NotificationHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(NotificationHandler.class);
private static final String X_COLLABORATION_HEADER = "x-collaboration";
private static final String X_COLLABORATION_HEADER = "x-collaboration";
@Autowired
private HttpClient httpClient;
@Autowired
......@@ -48,8 +51,10 @@ public class NotificationHandler {
private int WAITING_TIME;
@Autowired
private ISubscriberNotificationRequestLogger subscriberNotificationRequestLogger;
@Autowired
private IMetricService metricService;
public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception {
public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes, boolean sendLatencyMetric) throws Exception {
Subscription subscription = subscriptionHandler.getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
......@@ -77,8 +82,18 @@ public class NotificationHandler {
}
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
String messageEnqueueTimeAttribute = headerAttributes.get(Config.MESSAGE_ENQUEUED_TIME);
long messageEnqueuedTimestamp = 0L;
if (!Strings.isNullOrEmpty(messageEnqueueTimeAttribute)) {
messageEnqueuedTimestamp = Long.valueOf(messageEnqueueTimeAttribute);
}
Long timestampBeforeSendingToSubscriber = System.currentTimeMillis();
this.LOGGER.debug("Sending out notification to endpoint: " + endpoint);
HttpResponse response = httpClient.send(request);
if (sendLatencyMetric) {
metricService.sendLatencyMetric(timestampBeforeSendingToSubscriber - messageEnqueuedTimestamp, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID), headerAttributes.get(DpsHeaders.CORRELATION_ID), headerAttributes.get(Config.TOPIC_NAME));
}
subscriberNotificationRequestLogger.log(notificationId, subscription, response);
return response;
}
......
// Copyright 2017-2025, SLB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.utils;
public class Config {
public static final String VIEWER = "users.datalake.viewers";
public static final String EDITOR = "users.datalake.editors";
public static final String ADMIN = "users.datalake.admins";
public static final String OPS = "users.datalake.ops";
public static final String CRON = "cron.job";
public static final String PUBSUB = "notification.pubsub";
public static final String MESSAGE_ENQUEUED_TIME = "enqueuedTime";
public static final String TOPIC_NAME = "topicName";
}
......@@ -62,7 +62,7 @@ public class PubsubEndpointTest {
@Test
public void should_return200_whenPubsubMessageValidAndSuccessCodeReturnedFromNotificationHandler() throws Exception {
response.setResponseCode(200);
when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>())).thenReturn(response);
when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>(), true)).thenReturn(response);
ResponseEntity responseEntity = this.sut.recordChanged();
Assert.assertEquals(200, responseEntity.getStatusCode().value());
Assert.assertEquals("message acknowledged by client", responseEntity.getBody().toString());
......@@ -71,7 +71,7 @@ public class PubsubEndpointTest {
@Test
public void should_return400_whenPubsubMessageValidAndFailureCodeReturnedFromNotificationHandler() throws Exception {
response.setResponseCode(400);
when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>())).thenReturn(response);
when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>(), true)).thenReturn(response);
ResponseEntity responseEntity = this.sut.recordChanged();
Assert.assertEquals(400, responseEntity.getStatusCode().value());
Assert.assertEquals("message not acknowledged by client", responseEntity.getBody().toString());
......@@ -80,7 +80,7 @@ public class PubsubEndpointTest {
@Test(expected = Exception.class)
public void should_return400_whenPubsubMessageValidAndNotificationHandlerThrowsException() throws Exception {
response.setResponseCode(400);
when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>())).thenThrow(new Exception("error"));
when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>(), true)).thenThrow(new Exception("error"));
this.sut.recordChanged();
fail("should throw Exception");
}
......
......@@ -25,6 +25,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.http.HttpClient;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.GsaSecret;
import org.opengroup.osdu.core.common.model.notification.GsaSecretValue;
import org.opengroup.osdu.core.common.model.notification.HmacSecret;
......@@ -32,8 +33,8 @@ import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.opengroup.osdu.notification.provider.interfaces.IMetricService;
import org.opengroup.osdu.notification.provider.interfaces.ISubscriberNotificationRequestLogger;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.HashMap;
import java.util.Map;
......@@ -41,7 +42,13 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opengroup.osdu.notification.utils.Config.MESSAGE_ENQUEUED_TIME;
import static org.opengroup.osdu.notification.utils.Config.TOPIC_NAME;
@RunWith(MockitoJUnitRunner.class)
public class NotificationHandlerTest {
......@@ -66,6 +73,10 @@ public class NotificationHandlerTest {
@Mock
private JaxRsDpsLog log;
@Mock
private IMetricService metricService;
@Mock
private ISubscriberNotificationRequestLogger subscriberNotificationRequestLogger;
......@@ -113,33 +124,74 @@ public class NotificationHandlerTest {
public void should_return200_whenPubsubMessageValidAndSuccessCodeReturnedFromClient_gsa() throws Exception {
response.setResponseCode(200);
Map<String, String> headers = new HashMap<String, String>();
headers.put(MESSAGE_ENQUEUED_TIME, "1722283070604");
headers.put(DpsHeaders.DATA_PARTITION_ID, "dp-id");
headers.put(DpsHeaders.CORRELATION_ID, "correlation-id");
headers.put(TOPIC_NAME, "topicName");
when(this.subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenReturn(gsa_subscription);
when(this.authFactory.getSecretAuth(any())).thenReturn(secretAuth);
when(this.httpClient.send(any())).thenReturn(response);
when(this.secretAuth.getPushUrl(gsa_subscription.getPushEndpoint())).thenReturn(gsa_subscription.getPushEndpoint());
when(this.secretAuth.getRequestHeaders()).thenReturn(headers);
HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers);
HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true);
assertEquals(200, response.getResponseCode());
verify(metricService, times(1)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString());
}
@Test
public void should_return200_whenPubsubMessageValidAndSuccessCodeReturnedFromClient_hmac() throws Exception {
response.setResponseCode(200);
Map<String, String> headers = new HashMap<String, String>();
headers.put(MESSAGE_ENQUEUED_TIME, "1722283070604");
headers.put(DpsHeaders.DATA_PARTITION_ID, "dp-id");
headers.put(DpsHeaders.CORRELATION_ID, "correlation-id");
headers.put(TOPIC_NAME, "topicName");
when(this.subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenReturn(hmac_subscription);
when(this.authFactory.getSecretAuth(any())).thenReturn(secretAuth);
when(this.httpClient.send(any())).thenReturn(response);
when(this.secretAuth.getPushUrl(hmac_subscription.getPushEndpoint())).thenReturn(hmac_subscription.getPushEndpoint());
when(this.secretAuth.getRequestHeaders()).thenReturn(headers);
HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers);
HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true);
assertEquals(200, response.getResponseCode());
verify(metricService, times(1)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString());
}
@Test(expected = SubscriptionException.class)
public void should_throwException_whenSubscriptionHandlerThrowsException() throws Exception {
Map<String, String> headers = new HashMap<String, String>();
when(subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenThrow(new SubscriptionException("error", response));
this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers);
this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true);
fail("should throw SubscriptionException");
}
@Test
public void should_notSendLatencyMetric_whenSubscriptionHandlerThrowsException() throws Exception {
Map<String, String> headers = new HashMap<String, String>();
when(subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenThrow(new SubscriptionException("error", response));
try {
this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true);
} catch (SubscriptionException e) {
verify(metricService, times(0)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString());
} catch (Exception ex) {
fail("should throw SubscriptionException");
}
}
@Test
public void should_return200WithoutSendingLatencyMetric_whenSendingLatencyMetricFlagSetToFalse() throws Exception {
response.setResponseCode(200);
Map<String, String> headers = new HashMap<String, String>();
headers.put(MESSAGE_ENQUEUED_TIME, "1722283070604");
headers.put(DpsHeaders.DATA_PARTITION_ID, "dp-id");
headers.put(DpsHeaders.CORRELATION_ID, "correlation-id");
headers.put(TOPIC_NAME, "topicName");
when(this.subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenReturn(hmac_subscription);
when(this.authFactory.getSecretAuth(any())).thenReturn(secretAuth);
when(this.httpClient.send(any())).thenReturn(response);
when(this.secretAuth.getPushUrl(hmac_subscription.getPushEndpoint())).thenReturn(hmac_subscription.getPushEndpoint());
when(this.secretAuth.getRequestHeaders()).thenReturn(headers);
HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, false);
assertEquals(200, response.getResponseCode());
verify(metricService, times(0)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString());
}
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.aws.utils;
import org.opengroup.osdu.notification.provider.interfaces.IMetricService;
import org.springframework.stereotype.Component;
@Component
public class MetricServiceImpl implements IMetricService {
@Override
public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) {
//TODO
}
}
......@@ -42,7 +42,7 @@ public class MessageHandler extends AbstractMessageHandler {
public void processMessage(IMessage message) throws Exception {
try {
this.processNotification.performNotification(message, subscriptionName);
this.processNotification.performNotification(message, subscriptionName, subscriptionClient.getTopicName());
} catch (Exception e) {
int messageDeliveryCount = (int) message.getDeliveryCount();
if (messageDeliveryCount >= maxDeliveryCount && deleteMessageAfterMaxDeliveryCount) {
......
......@@ -18,9 +18,12 @@ import com.google.api.client.util.Strings;
import com.microsoft.applicationinsights.TelemetryClient;
import com.microsoft.applicationinsights.telemetry.RequestTelemetry;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.SubscriptionClient;
import org.apache.commons.lang3.time.StopWatch;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.notification.di.SubscriptionClientFactory;
import org.opengroup.osdu.notification.provider.azure.messageBus.models.TopicSubscriptions;
import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadScopeContextHolder;
import org.opengroup.osdu.notification.provider.azure.models.NotificationContent;
import org.opengroup.osdu.notification.provider.azure.messageBus.extractor.RequestBodyAdapter;
......@@ -33,9 +36,14 @@ import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.concurrent.ConcurrentMap;
import java.util.Date;
import static org.opengroup.osdu.notification.utils.Config.MESSAGE_ENQUEUED_TIME;
import static org.opengroup.osdu.notification.utils.Config.TOPIC_NAME;
@Component
@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}")
public class ProcessNotification {
......@@ -50,7 +58,7 @@ public class ProcessNotification {
@Autowired
private MDCContextMap mdcContextMap;
public void performNotification(IMessage message, String subscriptionName) throws Exception {
public void performNotification(IMessage message, String subscriptionName, String topicName) throws Exception {
TelemetryClient telemetryClient = new TelemetryClient();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
......@@ -71,8 +79,13 @@ public class ProcessNotification {
MDC.setContextMap(mdcContextMap.getContextMap(correlationId, dataPartitionId, collaborationId));
dpsHeaders.setThreadContext(dataPartitionId, correlationId, collaborationId);
Long enqueuedTime = message.getEnqueuedTimeUtc().toEpochMilli();
notificationContent.getExtractAttributes().putIfAbsent(MESSAGE_ENQUEUED_TIME, enqueuedTime.toString());
notificationContent.getExtractAttributes().putIfAbsent(TOPIC_NAME, topicName);
boolean sendLatencyMetric = message.getDeliveryCount() > 1 ? false : true;
HttpResponse response = notificationHandler.notifySubscriber(notificationContent.getNotificationId(),
notificationContent.getData(), notificationContent.getExtractAttributes());
notificationContent.getData(), notificationContent.getExtractAttributes(), sendLatencyMetric);
RequestTelemetry requestTelemetry = new RequestTelemetry(
"SBQueueRequest",
......
// Copyright 2017-2025, SLB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.util;
import com.google.common.base.Strings;
import com.microsoft.applicationinsights.TelemetryClient;
import com.microsoft.applicationinsights.telemetry.MetricTelemetry;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.notification.provider.interfaces.IMetricService;
import org.opengroup.osdu.notification.utils.Config;
import org.springframework.stereotype.Component;
@Component
public class MetricServiceImpl implements IMetricService {
private static final String LATENCY_METRIC_NAME = "[Notification] Event notification latency";
private final TelemetryClient telemetryClient = new TelemetryClient();
@Override
public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) {
this.sendMetric(LATENCY_METRIC_NAME, latency, dataPartitionId, correlationId, topicName);
}
private void sendMetric(String name, long value, String dataPartitionId, String correlationId, String topicName) {
MetricTelemetry metric = new MetricTelemetry();
metric.setName(name);
metric.setValue(value);
if (!Strings.isNullOrEmpty(dataPartitionId)) {
metric.getProperties().putIfAbsent(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
}
if (!Strings.isNullOrEmpty(topicName)) {
metric.getProperties().putIfAbsent(Config.TOPIC_NAME, topicName);
}
metric.getProperties().putIfAbsent(DpsHeaders.CORRELATION_ID, correlationId);
telemetryClient.trackMetric(metric);
telemetryClient.flush();
}
}
......@@ -42,6 +42,7 @@ public class MessageHandlerTest {
private static final String subscriptionName = "TestSubscription";
private static final String errorMsg = "Error processing notification";
private static final String appName = "test";
private static final String topicName = "testTopicName";
private MessageHandler messageHandler;
......@@ -64,6 +65,7 @@ public class MessageHandlerTest {
processNotification = mock(ProcessNotification.class);
when(subscriptionClient.getSubscriptionName()).thenReturn(subscriptionName);
when(subscriptionClient.getTopicName()).thenReturn(topicName);
when(subscriptionDescription.getMaxDeliveryCount()).thenReturn(maxDeliveryCount);
messageHandler = new MessageHandler(subscriptionClient, processNotification, appName, subscriptionDescription, true);
......@@ -71,18 +73,18 @@ public class MessageHandlerTest {
@Test
public void shouldInvoke_performNotification() throws Exception {
lenient().doNothing().when(processNotification).performNotification(message, subscriptionName);
lenient().doNothing().when(processNotification).performNotification(message, subscriptionName, topicName);
messageHandler.processMessage(message);
verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName());
verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName(), topicName);
}
@Test
public void shouldThrow_WhenProcessNotificationThrowsException() throws Exception {
doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName);
doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName, topicName);
try {
messageHandler.processMessage(message);
} catch (Exception e) {
verify(processNotification, times(1)).performNotification(message, subscriptionName);
verify(processNotification, times(1)).performNotification(message, subscriptionName, topicName);
assertEquals(e.getMessage().compareTo(errorMsg), 0);
}
}
......@@ -92,11 +94,11 @@ public class MessageHandlerTest {
ReflectionTestUtils.setField(messageHandler, "deleteMessageAfterMaxDeliveryCount", true);
when(message.getLockToken()).thenReturn(uuid);
when(message.getDeliveryCount()).thenReturn(10L);
doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName);
doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName, topicName);
try {
messageHandler.processMessage(message);
} catch (Exception e) {
verify(processNotification, times(1)).performNotification(message, subscriptionName);
verify(processNotification, times(1)).performNotification(message, subscriptionName, topicName);
verify(subscriptionClient, times(1)).complete(uuid);
assertEquals(e.getMessage().compareTo(errorMsg), 0);
}
......@@ -106,7 +108,7 @@ public class MessageHandlerTest {
public void shouldThrowExceptionIfRetryCountExceededButConfigIsFalse() throws Exception {
ReflectionTestUtils.setField(messageHandler, "deleteMessageAfterMaxDeliveryCount", false);
when(message.getDeliveryCount()).thenReturn(10L);
doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName);
doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName, topicName);
try {
messageHandler.processMessage(message);
} catch (Exception e) {
......
......@@ -33,6 +33,7 @@ import org.opengroup.osdu.notification.provider.azure.models.NotificationContent
import org.opengroup.osdu.notification.provider.azure.util.MDCContextMap;
import org.opengroup.osdu.notification.service.NotificationHandler;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
......@@ -45,6 +46,7 @@ public class ProcessNotificationTest {
private static final String correlationId = "908fcf8d-30c5-4c74-a0ae-ab47b48b7a85";
private static final String notificationData = "[{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"},{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"}]";
private static final String subscriptionName = "TestSubscription";
private static final String topicName = "testTopicName";
private static final String notificationId = "Notification-Test-Subscription";
private HttpResponse response = new HttpResponse();
private static final Map<String, String> requestAttributes = new HashMap();
......@@ -76,15 +78,28 @@ public class ProcessNotificationTest {
when(notificationContent.getExtractAttributes()).thenReturn(requestAttributes);
when(notificationContent.getNotificationId()).thenReturn(notificationId);
when(notificationContent.getData()).thenReturn(notificationData);
when(message.getEnqueuedTimeUtc()).thenReturn(Instant.ofEpochMilli(1722283070604l));
when(message.getDeliveryCount()).thenReturn(1l);
}
@Test
public void shouldSuccessfullyPerformNotification() throws Exception {
response.setResponseCode(200);
when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent);
when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes)).thenReturn(response);
processNotification.performNotification(message, subscriptionName);
verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes);
when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes, true)).thenReturn(response);
processNotification.performNotification(message, subscriptionName, topicName);
verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, true);
verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName);
}
@Test
public void shouldSetSendLatencyMetricFlagToFlaseWhenNotFirstTimeDelivery() throws Exception {
response.setResponseCode(200);
when(message.getDeliveryCount()).thenReturn(10l);
when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent);
when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes, false)).thenReturn(response);
processNotification.performNotification(message, subscriptionName, topicName);
verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, false);
verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName);
}
......@@ -92,12 +107,12 @@ public class ProcessNotificationTest {
public void shouldThrowExceptionWhenNotifySubscriberFails() throws Exception {
response.setResponseCode(400);
when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent);
when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes)).thenReturn(response);
when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes, true)).thenReturn(response);
try {
processNotification.performNotification(message, subscriptionName);
processNotification.performNotification(message, subscriptionName, topicName);
fail(EXCEPTION_NOT_THROWN);
} catch (Exception e) {
verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes);
verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, true);
verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName);
Assert.assertEquals(NOT_ACKNOWLEDGE, e.getMessage());
}
......@@ -107,12 +122,12 @@ public class ProcessNotificationTest {
public void shouldThrowExceptionWhenNotifySubscriberThrowsException() throws Exception {
response.setResponseCode(400);
when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent);
doThrow(new Exception()).when(notificationHandler).notifySubscriber(notificationId, notificationData, requestAttributes);
doThrow(new Exception()).when(notificationHandler).notifySubscriber(notificationId, notificationData, requestAttributes, true);
try {
processNotification.performNotification(message, subscriptionName);
processNotification.performNotification(message, subscriptionName, topicName);
fail(EXCEPTION_NOT_THROWN);
} catch (Exception e) {
verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes);
verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, true);
verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName);
Assert.assertNotNull(e);
}
......
/* Licensed Materials - Property of IBM */
/* (c) Copyright IBM Corp. 2020. All Rights Reserved.*/
package org.opengroup.osdu.notification.provider.ibm.util;
import org.opengroup.osdu.notification.provider.interfaces.IMetricService;
import org.springframework.stereotype.Component;
@Component
public class MetricServiceImpl implements IMetricService {
@Override
public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) {
//TODO
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment