From 2a893ee579cefd527d9ae41645e3456cac66f6f4 Mon Sep 17 00:00:00 2001 From: "Vasyl Leskiv [SLB]" <vleskiv@slb.com> Date: Mon, 27 Jan 2025 15:10:01 +0000 Subject: [PATCH] Add latency metric for notification service --- .../notification/util/MetricServiceImpl.java | 29 +++++++++ .../osdu/notification/api/PubsubEndpoint.java | 2 +- .../provider/interfaces/IMetricService.java | 19 ++++++ .../service/NotificationHandler.java | 19 +++++- .../osdu/notification/utils/Config.java | 20 ++++++- .../notification/api/PubsubEndpointTest.java | 6 +- .../service/NotificationHandlerTest.java | 60 +++++++++++++++++-- .../provider/aws/utils/MetricServiceImpl.java | 26 ++++++++ .../azure/messageBus/MessageHandler.java | 2 +- .../azure/messageBus/ProcessNotification.java | 17 +++++- .../azure/util/MetricServiceImpl.java | 49 +++++++++++++++ .../messageBus/MessageHandlerTest.java | 16 ++--- .../messageBus/ProcessNotificationTest.java | 33 +++++++--- .../provider/ibm/util/MetricServiceImpl.java | 15 +++++ 14 files changed, 281 insertions(+), 32 deletions(-) create mode 100644 notification-core-plus/src/main/java/org/opengroup/osdu/notification/util/MetricServiceImpl.java create mode 100644 notification-core/src/main/java/org/opengroup/osdu/notification/provider/interfaces/IMetricService.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/MetricServiceImpl.java create mode 100644 provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/MetricServiceImpl.java create mode 100644 provider/notification-ibm/src/main/java/org/opengroup/osdu/notification/provider/ibm/util/MetricServiceImpl.java diff --git a/notification-core-plus/src/main/java/org/opengroup/osdu/notification/util/MetricServiceImpl.java b/notification-core-plus/src/main/java/org/opengroup/osdu/notification/util/MetricServiceImpl.java new file mode 100644 index 000000000..e51953443 --- /dev/null +++ b/notification-core-plus/src/main/java/org/opengroup/osdu/notification/util/MetricServiceImpl.java @@ -0,0 +1,29 @@ +/* + Copyright 2020-2025 Google LLC + Copyright 2020-2025 EPAM Systems, Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package org.opengroup.osdu.notification.util; + +import org.opengroup.osdu.notification.provider.interfaces.IMetricService; +import org.springframework.stereotype.Component; + +@Component +public class MetricServiceImpl implements IMetricService { + @Override + public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) { + //TODO + } +} diff --git a/notification-core/src/main/java/org/opengroup/osdu/notification/api/PubsubEndpoint.java b/notification-core/src/main/java/org/opengroup/osdu/notification/api/PubsubEndpoint.java index 5b82846c9..16595d468 100644 --- a/notification-core/src/main/java/org/opengroup/osdu/notification/api/PubsubEndpoint.java +++ b/notification-core/src/main/java/org/opengroup/osdu/notification/api/PubsubEndpoint.java @@ -74,7 +74,7 @@ public class PubsubEndpoint { String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody(); String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody(); Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody(); - HttpResponse response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes); + HttpResponse response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes, true); if (!response.isSuccessCode()) { this.log.error(NOT_ACKNOWLEDGE + response.getBody()); return new ResponseEntity<String>(NOT_ACKNOWLEDGE, HttpStatus.valueOf(response.getResponseCode())); diff --git a/notification-core/src/main/java/org/opengroup/osdu/notification/provider/interfaces/IMetricService.java b/notification-core/src/main/java/org/opengroup/osdu/notification/provider/interfaces/IMetricService.java new file mode 100644 index 000000000..a4e9cfad3 --- /dev/null +++ b/notification-core/src/main/java/org/opengroup/osdu/notification/provider/interfaces/IMetricService.java @@ -0,0 +1,19 @@ +// Copyright 2017-2025, SLB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.interfaces; + +public interface IMetricService { + void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName); +} diff --git a/notification-core/src/main/java/org/opengroup/osdu/notification/service/NotificationHandler.java b/notification-core/src/main/java/org/opengroup/osdu/notification/service/NotificationHandler.java index ea3c86e78..a91ad3aa9 100644 --- a/notification-core/src/main/java/org/opengroup/osdu/notification/service/NotificationHandler.java +++ b/notification-core/src/main/java/org/opengroup/osdu/notification/service/NotificationHandler.java @@ -24,7 +24,9 @@ import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.notification.*; import org.opengroup.osdu.notification.auth.factory.AuthFactory; import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.opengroup.osdu.notification.provider.interfaces.IMetricService; import org.opengroup.osdu.notification.provider.interfaces.ISubscriberNotificationRequestLogger; +import org.opengroup.osdu.notification.utils.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -37,7 +39,8 @@ import java.util.Map; @Component public class NotificationHandler { private final static Logger LOGGER = LoggerFactory.getLogger(NotificationHandler.class); - private static final String X_COLLABORATION_HEADER = "x-collaboration"; + private static final String X_COLLABORATION_HEADER = "x-collaboration"; + @Autowired private HttpClient httpClient; @Autowired @@ -48,8 +51,10 @@ public class NotificationHandler { private int WAITING_TIME; @Autowired private ISubscriberNotificationRequestLogger subscriberNotificationRequestLogger; + @Autowired + private IMetricService metricService; - public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception { + public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes, boolean sendLatencyMetric) throws Exception { Subscription subscription = subscriptionHandler.getSubscriptionFromCache(notificationId); Secret secret = subscription.getSecret(); String endpoint = subscription.getPushEndpoint(); @@ -77,8 +82,18 @@ public class NotificationHandler { } HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build(); + String messageEnqueueTimeAttribute = headerAttributes.get(Config.MESSAGE_ENQUEUED_TIME); + long messageEnqueuedTimestamp = 0L; + if (!Strings.isNullOrEmpty(messageEnqueueTimeAttribute)) { + messageEnqueuedTimestamp = Long.valueOf(messageEnqueueTimeAttribute); + } + Long timestampBeforeSendingToSubscriber = System.currentTimeMillis(); this.LOGGER.debug("Sending out notification to endpoint: " + endpoint); HttpResponse response = httpClient.send(request); + + if (sendLatencyMetric) { + metricService.sendLatencyMetric(timestampBeforeSendingToSubscriber - messageEnqueuedTimestamp, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID), headerAttributes.get(DpsHeaders.CORRELATION_ID), headerAttributes.get(Config.TOPIC_NAME)); + } subscriberNotificationRequestLogger.log(notificationId, subscription, response); return response; } diff --git a/notification-core/src/main/java/org/opengroup/osdu/notification/utils/Config.java b/notification-core/src/main/java/org/opengroup/osdu/notification/utils/Config.java index 8ccd9576d..f4832388f 100644 --- a/notification-core/src/main/java/org/opengroup/osdu/notification/utils/Config.java +++ b/notification-core/src/main/java/org/opengroup/osdu/notification/utils/Config.java @@ -1,10 +1,24 @@ +// Copyright 2017-2025, SLB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.notification.utils; public class Config { - public static final String VIEWER = "users.datalake.viewers"; - public static final String EDITOR = "users.datalake.editors"; - public static final String ADMIN = "users.datalake.admins"; public static final String OPS = "users.datalake.ops"; public static final String CRON = "cron.job"; public static final String PUBSUB = "notification.pubsub"; + + public static final String MESSAGE_ENQUEUED_TIME = "enqueuedTime"; + public static final String TOPIC_NAME = "topicName"; } diff --git a/notification-core/src/test/java/org/opengroup/osdu/notification/api/PubsubEndpointTest.java b/notification-core/src/test/java/org/opengroup/osdu/notification/api/PubsubEndpointTest.java index ebc388a71..d02d2235c 100644 --- a/notification-core/src/test/java/org/opengroup/osdu/notification/api/PubsubEndpointTest.java +++ b/notification-core/src/test/java/org/opengroup/osdu/notification/api/PubsubEndpointTest.java @@ -62,7 +62,7 @@ public class PubsubEndpointTest { @Test public void should_return200_whenPubsubMessageValidAndSuccessCodeReturnedFromNotificationHandler() throws Exception { response.setResponseCode(200); - when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>())).thenReturn(response); + when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>(), true)).thenReturn(response); ResponseEntity responseEntity = this.sut.recordChanged(); Assert.assertEquals(200, responseEntity.getStatusCode().value()); Assert.assertEquals("message acknowledged by client", responseEntity.getBody().toString()); @@ -71,7 +71,7 @@ public class PubsubEndpointTest { @Test public void should_return400_whenPubsubMessageValidAndFailureCodeReturnedFromNotificationHandler() throws Exception { response.setResponseCode(400); - when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>())).thenReturn(response); + when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>(), true)).thenReturn(response); ResponseEntity responseEntity = this.sut.recordChanged(); Assert.assertEquals(400, responseEntity.getStatusCode().value()); Assert.assertEquals("message not acknowledged by client", responseEntity.getBody().toString()); @@ -80,7 +80,7 @@ public class PubsubEndpointTest { @Test(expected = Exception.class) public void should_return400_whenPubsubMessageValidAndNotificationHandlerThrowsException() throws Exception { response.setResponseCode(400); - when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>())).thenThrow(new Exception("error")); + when(this.notificationHandler.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, new HashMap<>(), true)).thenThrow(new Exception("error")); this.sut.recordChanged(); fail("should throw Exception"); } diff --git a/notification-core/src/test/java/org/opengroup/osdu/notification/service/NotificationHandlerTest.java b/notification-core/src/test/java/org/opengroup/osdu/notification/service/NotificationHandlerTest.java index cf6c62507..5b1f602c8 100644 --- a/notification-core/src/test/java/org/opengroup/osdu/notification/service/NotificationHandlerTest.java +++ b/notification-core/src/test/java/org/opengroup/osdu/notification/service/NotificationHandlerTest.java @@ -25,6 +25,7 @@ import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.core.common.http.HttpClient; import org.opengroup.osdu.core.common.http.HttpResponse; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.notification.GsaSecret; import org.opengroup.osdu.core.common.model.notification.GsaSecretValue; import org.opengroup.osdu.core.common.model.notification.HmacSecret; @@ -32,8 +33,8 @@ import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.notification.SubscriptionException; import org.opengroup.osdu.notification.auth.factory.AuthFactory; import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.opengroup.osdu.notification.provider.interfaces.IMetricService; import org.opengroup.osdu.notification.provider.interfaces.ISubscriberNotificationRequestLogger; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.HashMap; import java.util.Map; @@ -41,7 +42,13 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opengroup.osdu.notification.utils.Config.MESSAGE_ENQUEUED_TIME; +import static org.opengroup.osdu.notification.utils.Config.TOPIC_NAME; @RunWith(MockitoJUnitRunner.class) public class NotificationHandlerTest { @@ -66,6 +73,10 @@ public class NotificationHandlerTest { @Mock private JaxRsDpsLog log; + + @Mock + private IMetricService metricService; + @Mock private ISubscriberNotificationRequestLogger subscriberNotificationRequestLogger; @@ -113,33 +124,74 @@ public class NotificationHandlerTest { public void should_return200_whenPubsubMessageValidAndSuccessCodeReturnedFromClient_gsa() throws Exception { response.setResponseCode(200); Map<String, String> headers = new HashMap<String, String>(); + headers.put(MESSAGE_ENQUEUED_TIME, "1722283070604"); + headers.put(DpsHeaders.DATA_PARTITION_ID, "dp-id"); + headers.put(DpsHeaders.CORRELATION_ID, "correlation-id"); + headers.put(TOPIC_NAME, "topicName"); when(this.subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenReturn(gsa_subscription); when(this.authFactory.getSecretAuth(any())).thenReturn(secretAuth); when(this.httpClient.send(any())).thenReturn(response); when(this.secretAuth.getPushUrl(gsa_subscription.getPushEndpoint())).thenReturn(gsa_subscription.getPushEndpoint()); when(this.secretAuth.getRequestHeaders()).thenReturn(headers); - HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers); + HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true); assertEquals(200, response.getResponseCode()); + verify(metricService, times(1)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString()); } @Test public void should_return200_whenPubsubMessageValidAndSuccessCodeReturnedFromClient_hmac() throws Exception { response.setResponseCode(200); Map<String, String> headers = new HashMap<String, String>(); + headers.put(MESSAGE_ENQUEUED_TIME, "1722283070604"); + headers.put(DpsHeaders.DATA_PARTITION_ID, "dp-id"); + headers.put(DpsHeaders.CORRELATION_ID, "correlation-id"); + headers.put(TOPIC_NAME, "topicName"); when(this.subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenReturn(hmac_subscription); when(this.authFactory.getSecretAuth(any())).thenReturn(secretAuth); when(this.httpClient.send(any())).thenReturn(response); when(this.secretAuth.getPushUrl(hmac_subscription.getPushEndpoint())).thenReturn(hmac_subscription.getPushEndpoint()); when(this.secretAuth.getRequestHeaders()).thenReturn(headers); - HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers); + HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true); assertEquals(200, response.getResponseCode()); + verify(metricService, times(1)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString()); } @Test(expected = SubscriptionException.class) public void should_throwException_whenSubscriptionHandlerThrowsException() throws Exception { Map<String, String> headers = new HashMap<String, String>(); when(subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenThrow(new SubscriptionException("error", response)); - this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers); + this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true); fail("should throw SubscriptionException"); } + + @Test + public void should_notSendLatencyMetric_whenSubscriptionHandlerThrowsException() throws Exception { + Map<String, String> headers = new HashMap<String, String>(); + when(subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenThrow(new SubscriptionException("error", response)); + try { + this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, true); + } catch (SubscriptionException e) { + verify(metricService, times(0)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString()); + } catch (Exception ex) { + fail("should throw SubscriptionException"); + } + } + + @Test + public void should_return200WithoutSendingLatencyMetric_whenSendingLatencyMetricFlagSetToFalse() throws Exception { + response.setResponseCode(200); + Map<String, String> headers = new HashMap<String, String>(); + headers.put(MESSAGE_ENQUEUED_TIME, "1722283070604"); + headers.put(DpsHeaders.DATA_PARTITION_ID, "dp-id"); + headers.put(DpsHeaders.CORRELATION_ID, "correlation-id"); + headers.put(TOPIC_NAME, "topicName"); + when(this.subscriptionHandler.getSubscriptionFromCache(NOTIFICATION_ID)).thenReturn(hmac_subscription); + when(this.authFactory.getSecretAuth(any())).thenReturn(secretAuth); + when(this.httpClient.send(any())).thenReturn(response); + when(this.secretAuth.getPushUrl(hmac_subscription.getPushEndpoint())).thenReturn(hmac_subscription.getPushEndpoint()); + when(this.secretAuth.getRequestHeaders()).thenReturn(headers); + HttpResponse response = this.sut.notifySubscriber(NOTIFICATION_ID, PUBSUB_MESSAGE, headers, false); + assertEquals(200, response.getResponseCode()); + verify(metricService, times(0)).sendLatencyMetric(anyLong(), anyString(), anyString(), anyString()); + } } diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/MetricServiceImpl.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/MetricServiceImpl.java new file mode 100644 index 000000000..c68a24c24 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/MetricServiceImpl.java @@ -0,0 +1,26 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.aws.utils; + +import org.opengroup.osdu.notification.provider.interfaces.IMetricService; +import org.springframework.stereotype.Component; + +@Component +public class MetricServiceImpl implements IMetricService { + @Override + public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) { + //TODO + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java index d7f3e660c..826118919 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java @@ -42,7 +42,7 @@ public class MessageHandler extends AbstractMessageHandler { public void processMessage(IMessage message) throws Exception { try { - this.processNotification.performNotification(message, subscriptionName); + this.processNotification.performNotification(message, subscriptionName, subscriptionClient.getTopicName()); } catch (Exception e) { int messageDeliveryCount = (int) message.getDeliveryCount(); if (messageDeliveryCount >= maxDeliveryCount && deleteMessageAfterMaxDeliveryCount) { diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/ProcessNotification.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/ProcessNotification.java index e5182e6c6..7e96c7897 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/ProcessNotification.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/ProcessNotification.java @@ -18,9 +18,12 @@ import com.google.api.client.util.Strings; import com.microsoft.applicationinsights.TelemetryClient; import com.microsoft.applicationinsights.telemetry.RequestTelemetry; import com.microsoft.azure.servicebus.IMessage; +import com.microsoft.azure.servicebus.SubscriptionClient; import org.apache.commons.lang3.time.StopWatch; import org.opengroup.osdu.core.common.http.HttpResponse; import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.notification.di.SubscriptionClientFactory; +import org.opengroup.osdu.notification.provider.azure.messageBus.models.TopicSubscriptions; import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadScopeContextHolder; import org.opengroup.osdu.notification.provider.azure.models.NotificationContent; import org.opengroup.osdu.notification.provider.azure.messageBus.extractor.RequestBodyAdapter; @@ -33,9 +36,14 @@ import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; + +import java.time.Instant; import java.util.concurrent.ConcurrentMap; import java.util.Date; +import static org.opengroup.osdu.notification.utils.Config.MESSAGE_ENQUEUED_TIME; +import static org.opengroup.osdu.notification.utils.Config.TOPIC_NAME; + @Component @ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}") public class ProcessNotification { @@ -50,7 +58,7 @@ public class ProcessNotification { @Autowired private MDCContextMap mdcContextMap; - public void performNotification(IMessage message, String subscriptionName) throws Exception { + public void performNotification(IMessage message, String subscriptionName, String topicName) throws Exception { TelemetryClient telemetryClient = new TelemetryClient(); StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -71,8 +79,13 @@ public class ProcessNotification { MDC.setContextMap(mdcContextMap.getContextMap(correlationId, dataPartitionId, collaborationId)); dpsHeaders.setThreadContext(dataPartitionId, correlationId, collaborationId); + Long enqueuedTime = message.getEnqueuedTimeUtc().toEpochMilli(); + notificationContent.getExtractAttributes().putIfAbsent(MESSAGE_ENQUEUED_TIME, enqueuedTime.toString()); + notificationContent.getExtractAttributes().putIfAbsent(TOPIC_NAME, topicName); + boolean sendLatencyMetric = message.getDeliveryCount() > 1 ? false : true; + HttpResponse response = notificationHandler.notifySubscriber(notificationContent.getNotificationId(), - notificationContent.getData(), notificationContent.getExtractAttributes()); + notificationContent.getData(), notificationContent.getExtractAttributes(), sendLatencyMetric); RequestTelemetry requestTelemetry = new RequestTelemetry( "SBQueueRequest", diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/MetricServiceImpl.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/MetricServiceImpl.java new file mode 100644 index 000000000..dd7a6fd5d --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/MetricServiceImpl.java @@ -0,0 +1,49 @@ +// Copyright 2017-2025, SLB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.util; + +import com.google.common.base.Strings; +import com.microsoft.applicationinsights.TelemetryClient; +import com.microsoft.applicationinsights.telemetry.MetricTelemetry; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.notification.provider.interfaces.IMetricService; +import org.opengroup.osdu.notification.utils.Config; +import org.springframework.stereotype.Component; + +@Component +public class MetricServiceImpl implements IMetricService { + private static final String LATENCY_METRIC_NAME = "[Notification] Event notification latency"; + private final TelemetryClient telemetryClient = new TelemetryClient(); + + @Override + public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) { + this.sendMetric(LATENCY_METRIC_NAME, latency, dataPartitionId, correlationId, topicName); + } + + private void sendMetric(String name, long value, String dataPartitionId, String correlationId, String topicName) { + MetricTelemetry metric = new MetricTelemetry(); + metric.setName(name); + metric.setValue(value); + if (!Strings.isNullOrEmpty(dataPartitionId)) { + metric.getProperties().putIfAbsent(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); + } + if (!Strings.isNullOrEmpty(topicName)) { + metric.getProperties().putIfAbsent(Config.TOPIC_NAME, topicName); + } + metric.getProperties().putIfAbsent(DpsHeaders.CORRELATION_ID, correlationId); + telemetryClient.trackMetric(metric); + telemetryClient.flush(); + } +} diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java index 18f4a549c..be3811d59 100644 --- a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java @@ -42,6 +42,7 @@ public class MessageHandlerTest { private static final String subscriptionName = "TestSubscription"; private static final String errorMsg = "Error processing notification"; private static final String appName = "test"; + private static final String topicName = "testTopicName"; private MessageHandler messageHandler; @@ -64,6 +65,7 @@ public class MessageHandlerTest { processNotification = mock(ProcessNotification.class); when(subscriptionClient.getSubscriptionName()).thenReturn(subscriptionName); + when(subscriptionClient.getTopicName()).thenReturn(topicName); when(subscriptionDescription.getMaxDeliveryCount()).thenReturn(maxDeliveryCount); messageHandler = new MessageHandler(subscriptionClient, processNotification, appName, subscriptionDescription, true); @@ -71,18 +73,18 @@ public class MessageHandlerTest { @Test public void shouldInvoke_performNotification() throws Exception { - lenient().doNothing().when(processNotification).performNotification(message, subscriptionName); + lenient().doNothing().when(processNotification).performNotification(message, subscriptionName, topicName); messageHandler.processMessage(message); - verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName()); + verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName(), topicName); } @Test public void shouldThrow_WhenProcessNotificationThrowsException() throws Exception { - doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName); + doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName, topicName); try { messageHandler.processMessage(message); } catch (Exception e) { - verify(processNotification, times(1)).performNotification(message, subscriptionName); + verify(processNotification, times(1)).performNotification(message, subscriptionName, topicName); assertEquals(e.getMessage().compareTo(errorMsg), 0); } } @@ -92,11 +94,11 @@ public class MessageHandlerTest { ReflectionTestUtils.setField(messageHandler, "deleteMessageAfterMaxDeliveryCount", true); when(message.getLockToken()).thenReturn(uuid); when(message.getDeliveryCount()).thenReturn(10L); - doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName); + doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName, topicName); try { messageHandler.processMessage(message); } catch (Exception e) { - verify(processNotification, times(1)).performNotification(message, subscriptionName); + verify(processNotification, times(1)).performNotification(message, subscriptionName, topicName); verify(subscriptionClient, times(1)).complete(uuid); assertEquals(e.getMessage().compareTo(errorMsg), 0); } @@ -106,7 +108,7 @@ public class MessageHandlerTest { public void shouldThrowExceptionIfRetryCountExceededButConfigIsFalse() throws Exception { ReflectionTestUtils.setField(messageHandler, "deleteMessageAfterMaxDeliveryCount", false); when(message.getDeliveryCount()).thenReturn(10L); - doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName); + doThrow(new Exception(errorMsg)).when(processNotification).performNotification(message, subscriptionName, topicName); try { messageHandler.processMessage(message); } catch (Exception e) { diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/ProcessNotificationTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/ProcessNotificationTest.java index 95852d80a..33ba5ba4b 100644 --- a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/ProcessNotificationTest.java +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/ProcessNotificationTest.java @@ -33,6 +33,7 @@ import org.opengroup.osdu.notification.provider.azure.models.NotificationContent import org.opengroup.osdu.notification.provider.azure.util.MDCContextMap; import org.opengroup.osdu.notification.service.NotificationHandler; +import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -45,6 +46,7 @@ public class ProcessNotificationTest { private static final String correlationId = "908fcf8d-30c5-4c74-a0ae-ab47b48b7a85"; private static final String notificationData = "[{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"},{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"}]"; private static final String subscriptionName = "TestSubscription"; + private static final String topicName = "testTopicName"; private static final String notificationId = "Notification-Test-Subscription"; private HttpResponse response = new HttpResponse(); private static final Map<String, String> requestAttributes = new HashMap(); @@ -76,15 +78,28 @@ public class ProcessNotificationTest { when(notificationContent.getExtractAttributes()).thenReturn(requestAttributes); when(notificationContent.getNotificationId()).thenReturn(notificationId); when(notificationContent.getData()).thenReturn(notificationData); + when(message.getEnqueuedTimeUtc()).thenReturn(Instant.ofEpochMilli(1722283070604l)); + when(message.getDeliveryCount()).thenReturn(1l); } @Test public void shouldSuccessfullyPerformNotification() throws Exception { response.setResponseCode(200); when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent); - when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes)).thenReturn(response); - processNotification.performNotification(message, subscriptionName); - verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes); + when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes, true)).thenReturn(response); + processNotification.performNotification(message, subscriptionName, topicName); + verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, true); + verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName); + } + + @Test + public void shouldSetSendLatencyMetricFlagToFlaseWhenNotFirstTimeDelivery() throws Exception { + response.setResponseCode(200); + when(message.getDeliveryCount()).thenReturn(10l); + when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent); + when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes, false)).thenReturn(response); + processNotification.performNotification(message, subscriptionName, topicName); + verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, false); verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName); } @@ -92,12 +107,12 @@ public class ProcessNotificationTest { public void shouldThrowExceptionWhenNotifySubscriberFails() throws Exception { response.setResponseCode(400); when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent); - when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes)).thenReturn(response); + when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes, true)).thenReturn(response); try { - processNotification.performNotification(message, subscriptionName); + processNotification.performNotification(message, subscriptionName, topicName); fail(EXCEPTION_NOT_THROWN); } catch (Exception e) { - verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes); + verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, true); verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName); Assert.assertEquals(NOT_ACKNOWLEDGE, e.getMessage()); } @@ -107,12 +122,12 @@ public class ProcessNotificationTest { public void shouldThrowExceptionWhenNotifySubscriberThrowsException() throws Exception { response.setResponseCode(400); when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent); - doThrow(new Exception()).when(notificationHandler).notifySubscriber(notificationId, notificationData, requestAttributes); + doThrow(new Exception()).when(notificationHandler).notifySubscriber(notificationId, notificationData, requestAttributes, true); try { - processNotification.performNotification(message, subscriptionName); + processNotification.performNotification(message, subscriptionName, topicName); fail(EXCEPTION_NOT_THROWN); } catch (Exception e) { - verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes); + verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes, true); verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName); Assert.assertNotNull(e); } diff --git a/provider/notification-ibm/src/main/java/org/opengroup/osdu/notification/provider/ibm/util/MetricServiceImpl.java b/provider/notification-ibm/src/main/java/org/opengroup/osdu/notification/provider/ibm/util/MetricServiceImpl.java new file mode 100644 index 000000000..23f70e146 --- /dev/null +++ b/provider/notification-ibm/src/main/java/org/opengroup/osdu/notification/provider/ibm/util/MetricServiceImpl.java @@ -0,0 +1,15 @@ +/* Licensed Materials - Property of IBM */ +/* (c) Copyright IBM Corp. 2020. All Rights Reserved.*/ + +package org.opengroup.osdu.notification.provider.ibm.util; + +import org.opengroup.osdu.notification.provider.interfaces.IMetricService; +import org.springframework.stereotype.Component; + +@Component +public class MetricServiceImpl implements IMetricService { + @Override + public void sendLatencyMetric(long latency, String dataPartitionId, String correlationId, String topicName) { + //TODO + } +} -- GitLab