Skip to content
Snippets Groups Projects
Commit f3113184 authored by Alok Joshi's avatar Alok Joshi
Browse files

close clients for deleted subscriptions

parent 8c7da4ad
No related branches found
No related tags found
1 merge request!364Close deleted subscriptions
Pipeline #175325 passed
......@@ -39,7 +39,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
......@@ -82,7 +85,11 @@ public class SubscriptionManagerImpl implements ISubscriptionManager {
azureCosmosProperties.registerSubscriptionContainerName(), Subscription.class);
ManagementClient managementClient = factory.getManager(partition);
Map<String, List<String>> topicSubscriptions = new HashMap<>();
for (Subscription subscription : subscriptionsList) {
topicSubscriptions.putIfAbsent(subscription.getTopic(), new ArrayList<>());
topicSubscriptions.get(subscription.getTopic()).add(subscription.getNotificationId());
// To check if its a not new subscription.
if (!this.topicSubscriptions.checkIfNewTopicSubscription(partition, subscription.getTopic(), subscription.getNotificationId())) {
// Update existing subscriptions and skip registration
......@@ -109,6 +116,26 @@ public class SubscriptionManagerImpl implements ISubscriptionManager {
}
}
for (Map.Entry<String, List<String>> entry : topicSubscriptions.entrySet()) {
List<String> deletedTopicSubscriptions = this.topicSubscriptions.getDeletedTopicSubscriptions(partition, entry.getKey(), entry.getValue());
if (deletedTopicSubscriptions.isEmpty()) {
continue;
}
for (String deletedSubscription : deletedTopicSubscriptions) {
try {
LOGGER.info("Cleanup => found deleted subscription " + deletedSubscription + " for topic " + entry.getKey());
SubscriptionClient subscriptionClient = this.subscriptionClientFactory
.getSubscriptionClient(partition, entry.getKey(), deletedSubscription);
subscriptionClient.close();
LOGGER.info("Cleanup => closed deleted subscription " + deletedSubscription + " for topic " + entry.getKey());
} catch (InterruptedException | ServiceBusException e) {
LOGGER.error("Error while closing subscription client {}", e.getMessage(), e);
} catch (Exception e) {
LOGGER.error("Unknown exception occurred while closing subscription client: ", e);
}
}
}
} catch (AppException e) {
LOGGER.error("Error creating Cosmos Client {}", e.getMessage(), e);
} catch (Exception e) {
......
......@@ -49,4 +49,17 @@ public class TopicSubscriptions {
this.existingTopicSubscriptions.putAll(this.currentTopicSubscriptions);
this.currentTopicSubscriptions.clear();
}
public List<String> getDeletedTopicSubscriptions(String partition, String sbTopicName, List<String> subscriptions) {
if (existingTopicSubscriptions.get(partition) == null) {
return new ArrayList<>();
}
Map<String, List<String>> existingTopics = existingTopicSubscriptions.get(partition);
if (existingTopics.get(sbTopicName) == null || existingTopics.get(sbTopicName).isEmpty()) {
return new ArrayList<>();
}
List<String> existingSubscriptions = existingTopics.get(sbTopicName);
existingSubscriptions.removeAll(subscriptions);
return existingSubscriptions;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment