From f3113184414affa833b364392080460e6dcd14c8 Mon Sep 17 00:00:00 2001 From: Alok Joshi <AJoshi19@slb.com> Date: Wed, 29 Mar 2023 11:45:42 -0500 Subject: [PATCH] close clients for deleted subscriptions --- .../messageBus/SubscriptionManagerImpl.java | 27 +++++++++++++++++++ .../messageBus/models/TopicSubscriptions.java | 13 +++++++++ 2 files changed, 40 insertions(+) diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java index f97cc59d4..72f241397 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java @@ -39,7 +39,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -82,7 +85,11 @@ public class SubscriptionManagerImpl implements ISubscriptionManager { azureCosmosProperties.registerSubscriptionContainerName(), Subscription.class); ManagementClient managementClient = factory.getManager(partition); + Map<String, List<String>> topicSubscriptions = new HashMap<>(); for (Subscription subscription : subscriptionsList) { + topicSubscriptions.putIfAbsent(subscription.getTopic(), new ArrayList<>()); + topicSubscriptions.get(subscription.getTopic()).add(subscription.getNotificationId()); + // To check if its a not new subscription. if (!this.topicSubscriptions.checkIfNewTopicSubscription(partition, subscription.getTopic(), subscription.getNotificationId())) { // Update existing subscriptions and skip registration @@ -109,6 +116,26 @@ public class SubscriptionManagerImpl implements ISubscriptionManager { } } + for (Map.Entry<String, List<String>> entry : topicSubscriptions.entrySet()) { + List<String> deletedTopicSubscriptions = this.topicSubscriptions.getDeletedTopicSubscriptions(partition, entry.getKey(), entry.getValue()); + if (deletedTopicSubscriptions.isEmpty()) { + continue; + } + for (String deletedSubscription : deletedTopicSubscriptions) { + try { + LOGGER.info("Cleanup => found deleted subscription " + deletedSubscription + " for topic " + entry.getKey()); + SubscriptionClient subscriptionClient = this.subscriptionClientFactory + .getSubscriptionClient(partition, entry.getKey(), deletedSubscription); + subscriptionClient.close(); + LOGGER.info("Cleanup => closed deleted subscription " + deletedSubscription + " for topic " + entry.getKey()); + } catch (InterruptedException | ServiceBusException e) { + LOGGER.error("Error while closing subscription client {}", e.getMessage(), e); + } catch (Exception e) { + LOGGER.error("Unknown exception occurred while closing subscription client: ", e); + } + } + } + } catch (AppException e) { LOGGER.error("Error creating Cosmos Client {}", e.getMessage(), e); } catch (Exception e) { diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/models/TopicSubscriptions.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/models/TopicSubscriptions.java index 92eb9abba..3e3cabea7 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/models/TopicSubscriptions.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/models/TopicSubscriptions.java @@ -49,4 +49,17 @@ public class TopicSubscriptions { this.existingTopicSubscriptions.putAll(this.currentTopicSubscriptions); this.currentTopicSubscriptions.clear(); } + + public List<String> getDeletedTopicSubscriptions(String partition, String sbTopicName, List<String> subscriptions) { + if (existingTopicSubscriptions.get(partition) == null) { + return new ArrayList<>(); + } + Map<String, List<String>> existingTopics = existingTopicSubscriptions.get(partition); + if (existingTopics.get(sbTopicName) == null || existingTopics.get(sbTopicName).isEmpty()) { + return new ArrayList<>(); + } + List<String> existingSubscriptions = existingTopics.get(sbTopicName); + existingSubscriptions.removeAll(subscriptions); + return existingSubscriptions; + } } -- GitLab