From 5b70a3c4a8d52e0d059088b699d08f9a07469122 Mon Sep 17 00:00:00 2001 From: Rustam_Lotsmanenko <rustam_lotsmanenko@epam.com> Date: Fri, 15 Dec 2023 12:38:36 +0200 Subject: [PATCH] added handling for unavailable subscriptions --- .../gcp/pubsub/MessageBrokerProvider.java | 3 +- .../OqmNotificationDeliveryService.java | 2 +- .../gcp/pubsub/OqmSubscriberManager.java | 9 ++-- ...ava => OqmConfigurationEventReceiver.java} | 12 ++--- .../pubsub/receiver/OqmEventReplicator.java | 2 +- .../SubscriptionCacheRepo.java | 37 +++++++++++++- .../ExternalSubscriptionsManager.java | 50 ++++++++++++++----- .../RegisterSubscriptionService.java} | 12 +++-- ...=> OqmConfigurationEventReceiverTest.java} | 10 ++-- .../gcp/pubsub/OqmEventReplicatorTest.java | 5 +- .../gcp/pubsub/OqmSubscriberManagerTest.java | 5 +- 11 files changed, 104 insertions(+), 43 deletions(-) rename provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/{OqmSubscriptionConfigurationReceiver.java => OqmConfigurationEventReceiver.java} (89%) rename provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/{config => repo}/SubscriptionCacheRepo.java (75%) rename provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/{config => service}/ExternalSubscriptionsManager.java (55%) rename provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/{pubsub/OqmSubscriptionHandler.java => service/RegisterSubscriptionService.java} (91%) rename provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/{OqmSubscriptionConfigurationReceiverTest.java => OqmConfigurationEventReceiverTest.java} (91%) diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java index ddaedeefe..cdd935b2a 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java @@ -8,6 +8,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; @@ -24,7 +25,7 @@ import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberM public class MessageBrokerProvider { private final OqmDriver driver; - private final OqmSubscriptionHandler subscriptionHandler; + private final RegisterSubscriptionService subscriptionHandler; public void validateMessageBrokerConfiguration(TenantInfo tenantInfo) { List<OqmTopic> topics = subscriptionHandler.getTopics(tenantInfo.getDataPartitionId()); diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationDeliveryService.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationDeliveryService.java index 91b521a71..ad4d33a25 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationDeliveryService.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationDeliveryService.java @@ -29,7 +29,7 @@ import org.opengroup.osdu.core.common.model.notification.Secret; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.notification.auth.factory.AuthFactory; import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java index 60b1cc7e3..2c51db098 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -23,10 +23,11 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmSubscriptionConfigurationReceiver; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmConfigurationEventReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmReplicatedEventReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmEventReplicator; +import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -51,7 +52,7 @@ public class OqmSubscriberManager { public static final String SERVICE_SUFFIX = "-service"; public static final String PUBLISH_SUFFIX = "-publish"; public static final String CONTROL_SUBSCRIPTION = NOTIFICATION_PREFIX + "control-sub"; - private final OqmSubscriptionHandler subscriptionHandler; + private final RegisterSubscriptionService subscriptionHandler; private final ITenantFactory tenantInfoFactory; private final OqmDriver driver; private final OqmNotificationDeliveryService notificationHandler; @@ -104,7 +105,7 @@ public class OqmSubscriberManager { private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) { OqmSubscriber subscriber = OqmSubscriber.builder() .subscription(controlTopicSubscription) - .messageReceiver(new OqmSubscriptionConfigurationReceiver(externalSubscriptionsManager)) + .messageReceiver(new OqmConfigurationEventReceiver(externalSubscriptionsManager)) .build(); driver.subscribe(subscriber, getDestination(tenantInfo)); } diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmSubscriptionConfigurationReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmConfigurationEventReceiver.java similarity index 89% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmSubscriptionConfigurationReceiver.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmConfigurationEventReceiver.java index b106395ee..769cf961c 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmSubscriptionConfigurationReceiver.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmConfigurationEventReceiver.java @@ -22,11 +22,11 @@ import lombok.extern.slf4j.Slf4j; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; @Slf4j -public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver { +public class OqmConfigurationEventReceiver implements OqmMessageReceiver { private static final String SUBSCRIPTION_CREATED = "Subscription Created"; private static final String SUBSCRIPTION_UPDATED = "Subscription Updated"; @@ -34,7 +34,7 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver private final ExternalSubscriptionsManager externalSubscriptionsManager; - public OqmSubscriptionConfigurationReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) { + public OqmConfigurationEventReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) { this.externalSubscriptionsManager = externalSubscriptionsManager; } @@ -48,7 +48,6 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver } finally { ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); } - log.debug("OQM | Control topic | Message acknowledged by client"); replier.ack(); } @@ -62,8 +61,7 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`", pubsubMessage, serviceTopic, subscriptionId); switch (pubsubMessage) { - case SUBSCRIPTION_CREATED: - case SUBSCRIPTION_UPDATED: + case SUBSCRIPTION_CREATED, SUBSCRIPTION_UPDATED: externalSubscriptionsManager.updateSubscription(dataPartitionId, subscriptionId); log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic); break; @@ -72,7 +70,7 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver if (deleted) { log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic); } else { - log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId); + log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but it was not found in cache.", subscriptionId); } break; default: diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmEventReplicator.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmEventReplicator.java index 3420944f2..9b98e761b 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmEventReplicator.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmEventReplicator.java @@ -32,7 +32,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; /** diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/SubscriptionCacheRepo.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/repo/SubscriptionCacheRepo.java similarity index 75% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/SubscriptionCacheRepo.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/repo/SubscriptionCacheRepo.java index 654c6468e..90380900f 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/SubscriptionCacheRepo.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/repo/SubscriptionCacheRepo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.config; +package org.opengroup.osdu.notification.provider.gcp.repo; import java.util.Collections; import java.util.HashSet; @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.gcp.config.RedisProperties; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ScanOptions; @@ -35,6 +36,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class SubscriptionCacheRepo { + public static final String POISON = ":poison"; private final RedisProperties redisProperties; private final RedisTemplate<String, Subscription> redisTemplate; @@ -62,6 +64,39 @@ public class SubscriptionCacheRepo { return redisTemplate.opsForValue().get(subscriberCacheKey); } + /** + * To avoid Register service spamming, if a subscriber was deleted, but replicated events are + * still being processed. + * + * @param dataPartitionId + * @param topic + * @param subscriptionId + */ + public void poisonSub(String dataPartitionId, String topic, String subscriptionId) { + String subPoisonKey = getSubscriberCacheKey(dataPartitionId, topic, subscriptionId) + POISON; + + redisTemplate.opsForValue().set( + subPoisonKey, + new Subscription(), + redisProperties.getRedisExpiration(), + TimeUnit.SECONDS + ); + } + + /** + * Used to check if subscribers were not found in Register. + * + * @param dataPartitionId + * @param topic + * @param subscriptionId + * @return empty Subscription + */ + public Subscription checkIfSubIsPoisoned(String dataPartitionId, String topic, + String subscriptionId) { + String subPoisonKey = getSubscriberCacheKey(dataPartitionId, topic, subscriptionId) + POISON; + return redisTemplate.opsForValue().get(subPoisonKey); + } + public void updateSubscription(String dataPartitionId, Subscription subscription) { String subscriberCacheKey = getSubscriberCacheKey( dataPartitionId, diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManager.java similarity index 55% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManager.java index 3a3bd98b2..ab7e558f0 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.config; +package org.opengroup.osdu.notification.provider.gcp.service; import java.util.List; import java.util.Objects; @@ -23,7 +23,7 @@ import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opengroup.osdu.core.common.model.notification.Subscription; -import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler; +import org.opengroup.osdu.notification.provider.gcp.repo.SubscriptionCacheRepo; import org.springframework.stereotype.Component; @Component @@ -32,7 +32,7 @@ import org.springframework.stereotype.Component; public class ExternalSubscriptionsManager { private final SubscriptionCacheRepo subscriptionCacheRepo; - private final OqmSubscriptionHandler subscriptionHandler; + private final RegisterSubscriptionService subscriptionHandler; public void invokeTenantSubscribers(String dataPartitionId) { Set<Subscription> members = subscriptionCacheRepo.getDataPartitionSubscribers(dataPartitionId); @@ -47,11 +47,29 @@ public class ExternalSubscriptionsManager { } public Subscription getSubscription(String dataPartitionId, String topic, String subscriptionId) { - return subscriptionCacheRepo.readSubscription( - dataPartitionId, - topic, - subscriptionId - ); + Subscription subscription = subscriptionCacheRepo.readSubscription(dataPartitionId, topic, subscriptionId); + if (subscription != null) { + return subscription; + } + + log.debug("Subscription: {} not in cache, checking if it available in Register.", subscriptionId); + subscription = subscriptionCacheRepo.checkIfSubIsPoisoned(dataPartitionId, topic, subscriptionId); + if (subscription != null) { + log.debug("Subscription:{} marked as not available in cache.", subscriptionId); + return null; + } + return retrieveSubscriptionFromHandler(dataPartitionId, subscriptionId, topic); + } + + private Subscription retrieveSubscriptionFromHandler(String dataPartitionId, String subscriptionId, String topic) { + Subscription subscription = subscriptionHandler.getSubscriptionsByNotificationId(dataPartitionId,subscriptionId); + if (subscription == null) { + log.debug("Subscription:{} not available in Register. Marking it as not available in cache.", subscriptionId); + subscriptionCacheRepo.poisonSub(dataPartitionId, topic, subscriptionId); + return null; + } + subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription); + return subscription; } public boolean removeSubscription(String dataPartitionId, String topic, String subscriptionId) { @@ -63,17 +81,23 @@ public class ExternalSubscriptionsManager { dataPartitionId, subscriptionId ); - subscriptionCacheRepo.updateSubscription(dataPartitionId, subscription); + if(Objects.isNull(subscription)){ + log.warn("Cannot update sub config with id:{} in the cache. Skipping it.", subscriptionId); + }else { + subscriptionCacheRepo.updateSubscription(dataPartitionId, subscription); + } } private void reloadSubscriptionInfoCache(String dataPartitionId) { List<Subscription> fragmentarySubInfos = subscriptionHandler.getAllSubscriptionInfos( dataPartitionId); for (Subscription freagmentedSubscription : fragmentarySubInfos) { - Subscription subscription = subscriptionHandler.getSubscriptionsByNotificationId( - dataPartitionId, freagmentedSubscription.getNotificationId()); - subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription); + Subscription subscription = subscriptionHandler.getSubscriptionsByNotificationId(dataPartitionId, freagmentedSubscription.getNotificationId()); + if(Objects.isNull(subscription)){ + log.warn("Cannot init cache for sub id:{}. Empty response from Register. Skipping it.", freagmentedSubscription.getNotificationId()); + }else { + subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription); + } } } - } \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/RegisterSubscriptionService.java similarity index 91% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/RegisterSubscriptionService.java index 4f7e33c5a..b41976eec 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/RegisterSubscriptionService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.pubsub; +package org.opengroup.osdu.notification.provider.gcp.service; import java.util.List; import java.util.stream.Collectors; @@ -43,7 +43,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor @ConditionalOnProperty(name = "oqmDriver") @Slf4j -public class OqmSubscriptionHandler { +public class RegisterSubscriptionService { private final ISubscriptionFactory registerClientFactory; private final DpsHeadersProvider dpsHeadersProvider; @@ -52,7 +52,6 @@ public class OqmSubscriptionHandler { public List<OqmTopic> getTopics(String dataPartitionId) { DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); ISubscriptionService registerClient = registerClientFactory.create(headers); - try { return registerClient.getTopics() .stream() @@ -80,7 +79,12 @@ public class OqmSubscriptionHandler { DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); ISubscriptionService registerClient = registerClientFactory.create(headers); List<Subscription> subscriptions = registerClient.query(subscriptionId); - return subscriptions.get(0); + if(subscriptions.isEmpty()){ + log.warn("Empty response from Register for subId: {}, config fetch not possible.", subscriptionId); + return null; + }else { + return subscriptions.get(0); + } } catch (SubscriptionException se) { throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se); } diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionConfigurationReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmConfigurationEventReceiverTest.java similarity index 91% rename from provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionConfigurationReceiverTest.java rename to provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmConfigurationEventReceiverTest.java index 8c4971470..813574980 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionConfigurationReceiverTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmConfigurationEventReceiverTest.java @@ -31,15 +31,15 @@ import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.config.SubscriptionCacheRepo; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmSubscriptionConfigurationReceiver; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.repo.SubscriptionCacheRepo; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmConfigurationEventReceiver; @RunWith(MockitoJUnitRunner.class) -public class OqmSubscriptionConfigurationReceiverTest { +public class OqmConfigurationEventReceiverTest { @InjectMocks - private OqmSubscriptionConfigurationReceiver sut; + private OqmConfigurationEventReceiver sut; @Mock private OqmAckReplier replier; @Mock diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmEventReplicatorTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmEventReplicatorTest.java index 9582f4cfe..776c169b3 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmEventReplicatorTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmEventReplicatorTest.java @@ -17,7 +17,6 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; -import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,13 +29,11 @@ import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmEventReplicator; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java index 7269114cf..025bad77c 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java @@ -27,10 +27,11 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; import java.util.Arrays; import java.util.List; +import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -47,7 +48,7 @@ public class OqmSubscriberManagerTest { @Mock private MessageBrokerProvider subscriptionProvider; @Mock - private OqmSubscriptionHandler subscriptionHandler; + private RegisterSubscriptionService subscriptionHandler; @Mock private ExternalSubscriptionsManager externalSubscriptionsManager; -- GitLab