Skip to content
Snippets Groups Projects
Commit 512d95e3 authored by Dmitrii Novikov (EPAM)'s avatar Dmitrii Novikov (EPAM) Committed by Riabokon Stanislav(EPAM)[GCP]
Browse files

Wrapped subscription info cache with singleton provider

parent 79d13ec1
No related branches found
No related tags found
1 merge request!408Wrapped subscription info cache with singleton provider
Showing
with 163 additions and 128 deletions
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2023 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.config;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class ExternalSubscriptionsManager {
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final OqmSubscriptionHandler subscriptionHandler;
public ExternalSubscriptions getExternalSubscriptions(String dataPartitionId) {
if (subscriptionInfoCache.get(dataPartitionId) == null) {
log.debug("Subscription info cache wasn't found for tenant {}", dataPartitionId);
reloadSubscriptionInfoCache(dataPartitionId);
}
return subscriptionInfoCache.get(dataPartitionId);
}
public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) {
List<Subscription> cachedInfos = Optional.ofNullable(getExternalSubscriptions(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions();
List<Subscription> filteredCachedInfos = filterSubscriptionInfosByTopic(cachedInfos, subscriptionId, serviceTopic);
if (filteredCachedInfos.isEmpty()) {
return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos);
} else {
log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredCachedInfos.size());
return filteredCachedInfos.get(0);
}
}
public void updateExternalSubscriptionsCache(String dataPartitionId, ExternalSubscriptions externalSubscriptions) {
subscriptionInfoCache.put(dataPartitionId, externalSubscriptions);
log.debug("Subscription info cache was updated for tenant {} with {} values", dataPartitionId, externalSubscriptions.getSubscriptions().size());
}
/**
* Get all subscription infos from Register service and enrich each of them with secret by additional request
*
* @param dataPartitionId partition id
*/
private void reloadSubscriptionInfoCache(String dataPartitionId) {
List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(dataPartitionId);
List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream()
.map(subscription -> getSubscription(dataPartitionId, subscription.getNotificationId(), subscription.getTopic()))
.collect(Collectors.toList());
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build());
log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size());
}
private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedInfos) {
List<Subscription> freshInfos = subscriptionHandler.getSubscriptionsById(dataPartitionId, subscriptionId);
if (freshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
List<Subscription> filteredFreshInfos = filterSubscriptionInfosByTopic(freshInfos, subscriptionId, serviceTopic);
if (filteredFreshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
cachedInfos.addAll(filteredFreshInfos);
updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedInfos).build());
log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size());
return filteredFreshInfos.get(0);
}
private static List<Subscription> filterSubscriptionInfosByTopic(List<Subscription> infos, String subscriptionId, String serviceTopic) {
return infos.stream()
.filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId()))
.collect(Collectors.toList());
}
}
......@@ -28,6 +28,7 @@ import org.opengroup.osdu.core.common.model.notification.Secret;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
......@@ -42,7 +43,7 @@ public class OqmNotificationHandler {
private final OqmConfigurationProperties oqmConfigurationProperties;
private final HttpClient httpClient;
private final OqmSubscriptionHandler subscriptionHandler;
private final ExternalSubscriptionsManager externalSubscriptionsManager;
private final AuthFactory authFactory;
public HttpResponse notifySubscriber(String subscriptionId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception {
......@@ -55,7 +56,7 @@ public class OqmNotificationHandler {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Missed header attributes");
}
Subscription subscription = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic);
Subscription subscription = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic);
Secret secret = subscription.getSecret();
SecretAuth secretAuth = authFactory.getSecretAuth(secret.getSecretType());
......
......@@ -19,14 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver;
......@@ -34,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
......@@ -58,10 +54,9 @@ public class OqmSubscriberManager {
private final OqmSubscriptionHandler subscriptionHandler;
private final ITenantFactory tenantInfoFactory;
private final OqmDriver driver;
private final OqmConfigurationProperties properties;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final OqmNotificationHandler notificationHandler;
private final MessageBrokerProvider messageBrokerProvider;
private final ExternalSubscriptionsManager externalSubscriptionsManager;
@PostConstruct
void postConstruct() {
......@@ -78,14 +73,7 @@ public class OqmSubscriberManager {
*/
void provisionSubscriptionInfoCache() {
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
String dataPartitionId = tenantInfo.getDataPartitionId();
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(Collections.emptyList()).build());
List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(tenantInfo.getDataPartitionId());
List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream()
.map(subscription -> subscriptionHandler.getSubscription(dataPartitionId, subscription.getNotificationId(), subscription.getTopic()))
.collect(Collectors.toList());
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build());
log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size());
externalSubscriptionsManager.getExternalSubscriptions(tenantInfo.getDataPartitionId());
}
}
......@@ -116,7 +104,7 @@ public class OqmSubscriberManager {
private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) {
OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(controlTopicSubscription)
.messageReceiver(new OqmControlTopicReceiver(subscriptionHandler, subscriptionInfoCache))
.messageReceiver(new OqmControlTopicReceiver(externalSubscriptionsManager))
.build();
driver.subscribe(subscriber, getDestination(tenantInfo));
}
......@@ -131,7 +119,7 @@ public class OqmSubscriberManager {
void registerServiceTopicSubscriber(TenantInfo tenantInfo, OqmSubscription serviceSubscription) {
OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(serviceSubscription)
.messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, subscriptionInfoCache))
.messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, externalSubscriptionsManager))
.build();
driver.subscribe(subscriber, getDestination(tenantInfo));
}
......
......@@ -17,10 +17,11 @@
package org.opengroup.osdu.notification.provider.gcp.pubsub;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.Subscription;
......@@ -28,17 +29,11 @@ import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.service.DpsHeadersProvider;
import org.opengroup.osdu.notification.provider.gcp.service.SubscriptionServiceGc;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* GC implementation of {@link org.opengroup.osdu.notification.service.SubscriptionHandler}.
* The current implementation differs in non-request spring scope and handling OQM message instead of HTTP requests.
......@@ -51,7 +46,6 @@ import java.util.stream.Collectors;
public class OqmSubscriptionHandler {
private final ISubscriptionFactory registerClientFactory;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final DpsHeadersProvider dpsHeadersProvider;
private final SubscriptionServiceGc subscriptionServiceGc;
......@@ -81,52 +75,13 @@ public class OqmSubscriptionHandler {
}
}
public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) {
List<Subscription> cachedInfos = getCachedSubscriptionInfos(dataPartitionId);
List<Subscription> filteredCachedInfos = filterSubscriptionInfos(cachedInfos, subscriptionId, serviceTopic);
if (filteredCachedInfos.isEmpty()) {
return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos);
} else {
log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredCachedInfos.size());
return filteredCachedInfos.get(0);
}
}
private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedSubscriptionInfos) {
public List<Subscription> getSubscriptionsById(String dataPartitionId, String subscriptionId) {
try {
DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId);
ISubscriptionService registerClient = registerClientFactory.create(headers);
List<Subscription> freshInfos = registerClient.query(subscriptionId);
if (freshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
List<Subscription> filteredFreshInfos = filterSubscriptionInfos(freshInfos, subscriptionId, serviceTopic);
if (filteredFreshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
cachedSubscriptionInfos.addAll(filteredFreshInfos);
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedSubscriptionInfos).build());
log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size());
return filteredFreshInfos.get(0);
return registerClient.query(subscriptionId);
} catch (SubscriptionException se) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se);
}
}
private List<Subscription> getCachedSubscriptionInfos(String dataPartitionId) {
return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions();
}
private static List<Subscription> filterSubscriptionInfos(List<Subscription> infos, String subscriptionId, String serviceTopic) {
return infos.stream()
.filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId()))
.collect(Collectors.toList());
}
}
\ No newline at end of file
......@@ -17,22 +17,20 @@
package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler;
import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Slf4j
public class OqmControlTopicReceiver implements OqmMessageReceiver {
......@@ -40,13 +38,10 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver {
private static final String SUBSCRIPTION_UPDATED = "Subscription Updated";
private static final String SUBSCRIPTION_DELETED = "Subscription Deleted";
private final OqmSubscriptionHandler subscriptionHandler;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final ExternalSubscriptionsManager externalSubscriptionsManager;
public OqmControlTopicReceiver(OqmSubscriptionHandler subscriptionHandler,
IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) {
this.subscriptionHandler = subscriptionHandler;
this.subscriptionInfoCache = subscriptionInfoCache;
public OqmControlTopicReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) {
this.externalSubscriptionsManager = externalSubscriptionsManager;
}
@Override
......@@ -73,7 +68,8 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver {
log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`",
pubsubMessage, serviceTopic, subscriptionId);
List<Subscription> subscriptionInfos = Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId))
List<Subscription> subscriptionInfos = Optional.ofNullable(
externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions();
Optional<Subscription> cachedInfo = subscriptionInfos.stream()
......@@ -84,17 +80,17 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver {
case SUBSCRIPTION_CREATED:
case SUBSCRIPTION_UPDATED:
cachedInfo.ifPresent(subscriptionInfos::remove);
Subscription freshSubscriptionInfo = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic);
Subscription freshSubscriptionInfo = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic);
if (freshSubscriptionInfo != null) {
subscriptionInfos.add(freshSubscriptionInfo);
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build());
externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build());
log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic);
}
break;
case SUBSCRIPTION_DELETED:
if (cachedInfo.isPresent()) {
subscriptionInfos.remove(cachedInfo.get());
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build());
externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build());
log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic);
} else {
log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId);
......
......@@ -19,12 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder;
import java.util.HashMap;
......@@ -40,14 +39,14 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver {
private final OqmSubscription subscription;
private final OqmDriver driver;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final ExternalSubscriptionsManager externalSubscriptionsManager;
public OqmServiceTopicReceiver(OqmSubscription subscription,
OqmDriver driver,
IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) {
ExternalSubscriptionsManager externalSubscriptionsManager) {
this.subscription = subscription;
this.driver = driver;
this.subscriptionInfoCache = subscriptionInfoCache;
this.externalSubscriptionsManager = externalSubscriptionsManager;
}
@Override
......@@ -101,7 +100,7 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver {
}
private List<Subscription> getCachedSubscriptionInfosByTopic(String dataPartitionId, String serviceTopic) {
return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId))
return Optional.ofNullable(externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions().stream()
.filter(info -> serviceTopic.equals(info.getTopic()))
......
......@@ -17,26 +17,26 @@
package org.opengroup.osdu.notification.provider.gcp.pubsub;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class OqmControlTopicReceiverTest {
......@@ -45,44 +45,44 @@ public class OqmControlTopicReceiverTest {
@Mock
private OqmAckReplier replier;
@Mock
private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
@Mock
private OqmSubscriptionHandler subscriptionHandler;
@Mock
private Subscription subscription;
@Mock
private ExternalSubscriptions externalSubscriptions;
@Mock
private List<Subscription> subscriptionInfos;
@Mock
private ExternalSubscriptionsManager externalSubscriptionsManager;
@Test
public void testReceiveMessageSubscriptionCreated() {
when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription);
when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions);
when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build();
sut.receiveMessage(createMessage, replier);
verify(subscriptionInfos, times(1)).add(subscription);
verify(subscriptionInfoCache, times(1)).put(any(), any());
verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any());
verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any());
verify(replier, times(1)).ack();
}
@Test
public void testReceiveMessageSubscriptionUpdated() {
when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription);
when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions);
when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
OqmMessage updatedMessage = OqmMessage.builder().data("Subscription Updated").attributes(new HashMap<>()).build();
sut.receiveMessage(updatedMessage, replier);
verify(subscriptionInfos, times(1)).add(subscription);
verify(subscriptionInfoCache, times(1)).put(any(), any());
verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any());
verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any());
verify(replier, times(1)).ack();
}
@Test
public void testReceiveMessageSubscriptionDeleted() {
when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
when(subscriptionInfos.stream()).thenReturn(Stream.of(subscription));
when(subscription.getNotificationId()).thenReturn("4");
when(subscription.getTopic()).thenReturn("topic1");
......@@ -95,7 +95,8 @@ public class OqmControlTopicReceiverTest {
.build();
sut.receiveMessage(deletedMessage, replier);
verify(subscriptionInfos, times(1)).remove(subscription);
verify(subscriptionInfoCache, times(1)).put(any(), any());
verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any());
verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any());
verify(replier, times(1)).ack();
}
......
......@@ -27,6 +27,7 @@ import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver;
......@@ -46,13 +47,13 @@ public class OqmServiceTopicReceiverTest {
@Mock
private OqmAckReplier replier;
@Mock
private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
@Mock
private OqmDriver driver;
@Mock
private OqmSubscription oqmSubscription;
@Mock
private ExternalSubscriptions externalSubscriptions;
@Mock
private ExternalSubscriptionsManager externalSubscriptionsManager;
private List<Subscription> subscriptionInfos;
private OqmTopic topic;
......@@ -71,8 +72,8 @@ public class OqmServiceTopicReceiverTest {
@Test
public void testReceiveMessageSubscriptionCreated() {
when(oqmSubscription.getTopics()).thenReturn(Collections.singletonList(topic));
when(subscriptionInfoCache.get("tenant1")).thenReturn(externalSubscriptions);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
OqmMessage message = OqmMessage.builder().data("Message").attributes(new HashMap<String, String>() {{
put("data-partition-id", "tenant1");
}}).build();
......
......@@ -30,6 +30,7 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
......@@ -44,8 +45,6 @@ public class OqmSubscriberManagerTest {
@InjectMocks
private OqmSubscriberManager sut;
@Spy
private OqmConfigurationProperties properties;
@Mock
private ITenantFactory tenantFactory;
@Mock
......@@ -55,17 +54,15 @@ public class OqmSubscriberManagerTest {
@Mock
private OqmSubscriptionHandler subscriptionHandler;
@Mock
private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private ExternalSubscriptionsManager externalSubscriptionsManager;
private List<TenantInfo> tenantInfos;
private List<OqmTopic> topics;
private List<Subscription> subscriptions;
@Before
public void setUp() {
tenantInfos = createTenants();
topics = createTopics();
subscriptions = createSubscriptions();
}
private List<TenantInfo> createTenants() {
......@@ -89,12 +86,6 @@ public class OqmSubscriberManagerTest {
return Arrays.asList(topic1, topic2,topic3, topic4);
}
private List<Subscription> createSubscriptions() {
Subscription subscription1 = new Subscription();
Subscription subscription2 = new Subscription();
return Arrays.asList(subscription1, subscription2);
}
@Test
public void testProvisioningControlTopicSubscribers() {
when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos);
......@@ -113,9 +104,7 @@ public class OqmSubscriberManagerTest {
@Test
public void testProvisioningSubscriptionInfoCache() {
when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos);
when(subscriptionHandler.getAllSubscriptionInfos(anyString())).thenReturn(subscriptions);
sut.provisionSubscriptionInfoCache();
verify(subscriptionHandler, times(6)).getSubscription(anyString(), anyString(), anyString());
verify(subscriptionInfoCache, times(6)).put(any(), any());
verify(externalSubscriptionsManager, times(3)).getExternalSubscriptions(any());
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment