diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java new file mode 100644 index 0000000000000000000000000000000000000000..1755d01a60f1dbd66274026c8babcdd2b6f895cc --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.config; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class ExternalSubscriptionsManager { + + private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + private final OqmSubscriptionHandler subscriptionHandler; + + public ExternalSubscriptions getExternalSubscriptions(String dataPartitionId) { + if (subscriptionInfoCache.get(dataPartitionId) == null) { + log.debug("Subscription info cache wasn't found for tenant {}", dataPartitionId); + reloadSubscriptionInfoCache(dataPartitionId); + } + return subscriptionInfoCache.get(dataPartitionId); + } + + public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) { + List<Subscription> cachedInfos = Optional.ofNullable(getExternalSubscriptions(dataPartitionId)) + .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) + .getSubscriptions(); + List<Subscription> filteredCachedInfos = filterSubscriptionInfosByTopic(cachedInfos, subscriptionId, serviceTopic); + if (filteredCachedInfos.isEmpty()) { + return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos); + } else { + log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredCachedInfos.size()); + return filteredCachedInfos.get(0); + } + } + + public void updateExternalSubscriptionsCache(String dataPartitionId, ExternalSubscriptions externalSubscriptions) { + subscriptionInfoCache.put(dataPartitionId, externalSubscriptions); + log.debug("Subscription info cache was updated for tenant {} with {} values", dataPartitionId, externalSubscriptions.getSubscriptions().size()); + } + + /** + * Get all subscription infos from Register service and enrich each of them with secret by additional request + * + * @param dataPartitionId partition id + */ + private void reloadSubscriptionInfoCache(String dataPartitionId) { + List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(dataPartitionId); + List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream() + .map(subscription -> getSubscription(dataPartitionId, subscription.getNotificationId(), subscription.getTopic())) + .collect(Collectors.toList()); + subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build()); + log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size()); + } + + private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedInfos) { + List<Subscription> freshInfos = subscriptionHandler.getSubscriptionsById(dataPartitionId, subscriptionId); + if (freshInfos.isEmpty()) { + log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); + return null; + } + + List<Subscription> filteredFreshInfos = filterSubscriptionInfosByTopic(freshInfos, subscriptionId, serviceTopic); + if (filteredFreshInfos.isEmpty()) { + log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); + return null; + } + cachedInfos.addAll(filteredFreshInfos); + updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedInfos).build()); + + log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size()); + return filteredFreshInfos.get(0); + } + + private static List<Subscription> filterSubscriptionInfosByTopic(List<Subscription> infos, String subscriptionId, String serviceTopic) { + return infos.stream() + .filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId())) + .collect(Collectors.toList()); + } +} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java index edfdbbb0789d41a9dab75c1e204fc598f6828816..1b41fbb1a39e6c4003fc71e7abdfed6a95de248e 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java @@ -28,6 +28,7 @@ import org.opengroup.osdu.core.common.model.notification.Secret; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.notification.auth.factory.AuthFactory; import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -42,7 +43,7 @@ public class OqmNotificationHandler { private final OqmConfigurationProperties oqmConfigurationProperties; private final HttpClient httpClient; - private final OqmSubscriptionHandler subscriptionHandler; + private final ExternalSubscriptionsManager externalSubscriptionsManager; private final AuthFactory authFactory; public HttpResponse notifySubscriber(String subscriptionId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception { @@ -55,7 +56,7 @@ public class OqmNotificationHandler { throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Missed header attributes"); } - Subscription subscription = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic); + Subscription subscription = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic); Secret secret = subscription.getSecret(); SecretAuth secretAuth = authFactory.getSecretAuth(secret.getSecretType()); diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java index 1f2fafed276d71238c79ffc2280c9d0f252ca934..f7908a74217dad86bc9da0680751d3ca8b87f569 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -19,14 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.opengroup.osdu.core.common.cache.IRedisCache; -import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; -import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; @@ -34,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -58,10 +54,9 @@ public class OqmSubscriberManager { private final OqmSubscriptionHandler subscriptionHandler; private final ITenantFactory tenantInfoFactory; private final OqmDriver driver; - private final OqmConfigurationProperties properties; - private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; private final OqmNotificationHandler notificationHandler; private final MessageBrokerProvider messageBrokerProvider; + private final ExternalSubscriptionsManager externalSubscriptionsManager; @PostConstruct void postConstruct() { @@ -78,14 +73,7 @@ public class OqmSubscriberManager { */ void provisionSubscriptionInfoCache() { for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { - String dataPartitionId = tenantInfo.getDataPartitionId(); - subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(Collections.emptyList()).build()); - List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(tenantInfo.getDataPartitionId()); - List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream() - .map(subscription -> subscriptionHandler.getSubscription(dataPartitionId, subscription.getNotificationId(), subscription.getTopic())) - .collect(Collectors.toList()); - subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build()); - log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size()); + externalSubscriptionsManager.getExternalSubscriptions(tenantInfo.getDataPartitionId()); } } @@ -116,7 +104,7 @@ public class OqmSubscriberManager { private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) { OqmSubscriber subscriber = OqmSubscriber.builder() .subscription(controlTopicSubscription) - .messageReceiver(new OqmControlTopicReceiver(subscriptionHandler, subscriptionInfoCache)) + .messageReceiver(new OqmControlTopicReceiver(externalSubscriptionsManager)) .build(); driver.subscribe(subscriber, getDestination(tenantInfo)); } @@ -131,7 +119,7 @@ public class OqmSubscriberManager { void registerServiceTopicSubscriber(TenantInfo tenantInfo, OqmSubscription serviceSubscription) { OqmSubscriber subscriber = OqmSubscriber.builder() .subscription(serviceSubscription) - .messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, subscriptionInfoCache)) + .messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, externalSubscriptionsManager)) .build(); driver.subscribe(subscriber, getDestination(tenantInfo)); } diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java index b624f900c532eb150bd444a2cdb51a3607d03531..fa45de0977e07c0c27b45a16d9499df5b8a32270 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java @@ -17,10 +17,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; +import java.util.List; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.common.cache.IRedisCache; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.notification.Subscription; @@ -28,17 +29,11 @@ import org.opengroup.osdu.core.common.notification.ISubscriptionFactory; import org.opengroup.osdu.core.common.notification.ISubscriptionService; import org.opengroup.osdu.core.common.notification.SubscriptionException; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.service.DpsHeadersProvider; import org.opengroup.osdu.notification.provider.gcp.service.SubscriptionServiceGc; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - /** * GC implementation of {@link org.opengroup.osdu.notification.service.SubscriptionHandler}. * The current implementation differs in non-request spring scope and handling OQM message instead of HTTP requests. @@ -51,7 +46,6 @@ import java.util.stream.Collectors; public class OqmSubscriptionHandler { private final ISubscriptionFactory registerClientFactory; - private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; private final DpsHeadersProvider dpsHeadersProvider; private final SubscriptionServiceGc subscriptionServiceGc; @@ -81,52 +75,13 @@ public class OqmSubscriptionHandler { } } - public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) { - List<Subscription> cachedInfos = getCachedSubscriptionInfos(dataPartitionId); - List<Subscription> filteredCachedInfos = filterSubscriptionInfos(cachedInfos, subscriptionId, serviceTopic); - if (filteredCachedInfos.isEmpty()) { - return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos); - } else { - log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredCachedInfos.size()); - return filteredCachedInfos.get(0); - } - } - - private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedSubscriptionInfos) { + public List<Subscription> getSubscriptionsById(String dataPartitionId, String subscriptionId) { try { DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); ISubscriptionService registerClient = registerClientFactory.create(headers); - - List<Subscription> freshInfos = registerClient.query(subscriptionId); - if (freshInfos.isEmpty()) { - log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); - return null; - } - - List<Subscription> filteredFreshInfos = filterSubscriptionInfos(freshInfos, subscriptionId, serviceTopic); - if (filteredFreshInfos.isEmpty()) { - log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); - return null; - } - cachedSubscriptionInfos.addAll(filteredFreshInfos); - subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedSubscriptionInfos).build()); - - log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size()); - return filteredFreshInfos.get(0); + return registerClient.query(subscriptionId); } catch (SubscriptionException se) { throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se); } } - - private List<Subscription> getCachedSubscriptionInfos(String dataPartitionId) { - return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) - .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) - .getSubscriptions(); - } - - private static List<Subscription> filterSubscriptionInfos(List<Subscription> infos, String subscriptionId, String serviceTopic) { - return infos.stream() - .filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId())) - .collect(Collectors.toList()); - } } \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java index 50beeff333fb9a2d7ca4b31d7314df9f0e255cd1..45891a9ae478f29cbd952399909bdb2e4bff0761 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java @@ -17,22 +17,20 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; +import java.util.List; +import java.util.Map; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.common.cache.IRedisCache; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; -import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler; import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; -import java.util.List; -import java.util.Map; -import java.util.Optional; - @Slf4j public class OqmControlTopicReceiver implements OqmMessageReceiver { @@ -40,13 +38,10 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver { private static final String SUBSCRIPTION_UPDATED = "Subscription Updated"; private static final String SUBSCRIPTION_DELETED = "Subscription Deleted"; - private final OqmSubscriptionHandler subscriptionHandler; - private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + private final ExternalSubscriptionsManager externalSubscriptionsManager; - public OqmControlTopicReceiver(OqmSubscriptionHandler subscriptionHandler, - IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) { - this.subscriptionHandler = subscriptionHandler; - this.subscriptionInfoCache = subscriptionInfoCache; + public OqmControlTopicReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) { + this.externalSubscriptionsManager = externalSubscriptionsManager; } @Override @@ -73,7 +68,8 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver { log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`", pubsubMessage, serviceTopic, subscriptionId); - List<Subscription> subscriptionInfos = Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) + List<Subscription> subscriptionInfos = Optional.ofNullable( + externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId)) .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) .getSubscriptions(); Optional<Subscription> cachedInfo = subscriptionInfos.stream() @@ -84,17 +80,17 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver { case SUBSCRIPTION_CREATED: case SUBSCRIPTION_UPDATED: cachedInfo.ifPresent(subscriptionInfos::remove); - Subscription freshSubscriptionInfo = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic); + Subscription freshSubscriptionInfo = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic); if (freshSubscriptionInfo != null) { subscriptionInfos.add(freshSubscriptionInfo); - subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); + externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic); } break; case SUBSCRIPTION_DELETED: if (cachedInfo.isPresent()) { subscriptionInfos.remove(cachedInfo.get()); - subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); + externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic); } else { log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId); diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java index bd551425fe3365d445ad83ab5e111912749f9799..770bc57880fb6b4e88d5e521287447617172b52b 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java @@ -19,12 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.common.cache.IRedisCache; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; import java.util.HashMap; @@ -40,14 +39,14 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver { private final OqmSubscription subscription; private final OqmDriver driver; - private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + private final ExternalSubscriptionsManager externalSubscriptionsManager; public OqmServiceTopicReceiver(OqmSubscription subscription, OqmDriver driver, - IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) { + ExternalSubscriptionsManager externalSubscriptionsManager) { this.subscription = subscription; this.driver = driver; - this.subscriptionInfoCache = subscriptionInfoCache; + this.externalSubscriptionsManager = externalSubscriptionsManager; } @Override @@ -101,7 +100,7 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver { } private List<Subscription> getCachedSubscriptionInfosByTopic(String dataPartitionId, String serviceTopic) { - return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) + return Optional.ofNullable(externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId)) .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) .getSubscriptions().stream() .filter(info -> serviceTopic.equals(info.getTopic())) diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java index 6399f67175b8efeba721ee2fd2c74e21a9cc53fa..40a17281950a2e2d4949a6d27dcbbe0a91ba1386 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java @@ -17,26 +17,26 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.stream.Stream; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.opengroup.osdu.core.common.cache.IRedisCache; -import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; -import java.util.HashMap; -import java.util.List; -import java.util.stream.Stream; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; - @RunWith(MockitoJUnitRunner.class) public class OqmControlTopicReceiverTest { @@ -45,44 +45,44 @@ public class OqmControlTopicReceiverTest { @Mock private OqmAckReplier replier; @Mock - private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; - @Mock - private OqmSubscriptionHandler subscriptionHandler; - @Mock private Subscription subscription; @Mock private ExternalSubscriptions externalSubscriptions; @Mock private List<Subscription> subscriptionInfos; + @Mock + private ExternalSubscriptionsManager externalSubscriptionsManager; @Test public void testReceiveMessageSubscriptionCreated() { - when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription); - when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions); + when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build(); sut.receiveMessage(createMessage, replier); verify(subscriptionInfos, times(1)).add(subscription); - verify(subscriptionInfoCache, times(1)).put(any(), any()); + verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any()); + verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any()); verify(replier, times(1)).ack(); } @Test public void testReceiveMessageSubscriptionUpdated() { - when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription); - when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions); + when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); OqmMessage updatedMessage = OqmMessage.builder().data("Subscription Updated").attributes(new HashMap<>()).build(); sut.receiveMessage(updatedMessage, replier); verify(subscriptionInfos, times(1)).add(subscription); - verify(subscriptionInfoCache, times(1)).put(any(), any()); + verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any()); + verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any()); verify(replier, times(1)).ack(); } @Test public void testReceiveMessageSubscriptionDeleted() { - when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); when(subscriptionInfos.stream()).thenReturn(Stream.of(subscription)); when(subscription.getNotificationId()).thenReturn("4"); when(subscription.getTopic()).thenReturn("topic1"); @@ -95,7 +95,8 @@ public class OqmControlTopicReceiverTest { .build(); sut.receiveMessage(deletedMessage, replier); verify(subscriptionInfos, times(1)).remove(subscription); - verify(subscriptionInfoCache, times(1)).put(any(), any()); + verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any()); + verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any()); verify(replier, times(1)).ack(); } diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java index eb0d6868119e22f6a50440456997e37c1b8aaadf..c859a72eadd57475ce24eafb35b03941a9a12254 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java @@ -27,6 +27,7 @@ import org.opengroup.osdu.core.common.cache.IRedisCache; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; +import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; @@ -46,13 +47,13 @@ public class OqmServiceTopicReceiverTest { @Mock private OqmAckReplier replier; @Mock - private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; - @Mock private OqmDriver driver; @Mock private OqmSubscription oqmSubscription; @Mock private ExternalSubscriptions externalSubscriptions; + @Mock + private ExternalSubscriptionsManager externalSubscriptionsManager; private List<Subscription> subscriptionInfos; private OqmTopic topic; @@ -71,8 +72,8 @@ public class OqmServiceTopicReceiverTest { @Test public void testReceiveMessageSubscriptionCreated() { when(oqmSubscription.getTopics()).thenReturn(Collections.singletonList(topic)); - when(subscriptionInfoCache.get("tenant1")).thenReturn(externalSubscriptions); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); OqmMessage message = OqmMessage.builder().data("Message").attributes(new HashMap<String, String>() {{ put("data-partition-id", "tenant1"); }}).build(); diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java index 9a79fe6355e3890ecc706e537bfaef6fb30d4826..a3d33bdddb5acff9f729db552dcaaa7cb00c2bdb 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java @@ -30,6 +30,7 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; @@ -44,8 +45,6 @@ public class OqmSubscriberManagerTest { @InjectMocks private OqmSubscriberManager sut; - @Spy - private OqmConfigurationProperties properties; @Mock private ITenantFactory tenantFactory; @Mock @@ -55,17 +54,15 @@ public class OqmSubscriberManagerTest { @Mock private OqmSubscriptionHandler subscriptionHandler; @Mock - private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + private ExternalSubscriptionsManager externalSubscriptionsManager; private List<TenantInfo> tenantInfos; private List<OqmTopic> topics; - private List<Subscription> subscriptions; @Before public void setUp() { tenantInfos = createTenants(); topics = createTopics(); - subscriptions = createSubscriptions(); } private List<TenantInfo> createTenants() { @@ -89,12 +86,6 @@ public class OqmSubscriberManagerTest { return Arrays.asList(topic1, topic2,topic3, topic4); } - private List<Subscription> createSubscriptions() { - Subscription subscription1 = new Subscription(); - Subscription subscription2 = new Subscription(); - return Arrays.asList(subscription1, subscription2); - } - @Test public void testProvisioningControlTopicSubscribers() { when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); @@ -113,9 +104,7 @@ public class OqmSubscriberManagerTest { @Test public void testProvisioningSubscriptionInfoCache() { when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); - when(subscriptionHandler.getAllSubscriptionInfos(anyString())).thenReturn(subscriptions); sut.provisionSubscriptionInfoCache(); - verify(subscriptionHandler, times(6)).getSubscription(anyString(), anyString(), anyString()); - verify(subscriptionInfoCache, times(6)).put(any(), any()); + verify(externalSubscriptionsManager, times(3)).getExternalSubscriptions(any()); } } \ No newline at end of file