Skip to content
Snippets Groups Projects
Commit 22e5ca1c authored by Riabokon Stanislav(EPAM)[GCP]'s avatar Riabokon Stanislav(EPAM)[GCP]
Browse files

Merge branch 'cache-init' into 'master'

Wrapped subscription info cache with singleton provider

See merge request !408
parents 79d13ec1 512d95e3
No related branches found
No related tags found
1 merge request!408Wrapped subscription info cache with singleton provider
Pipeline #203517 failed
Showing
with 163 additions and 128 deletions
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2023 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.config;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class ExternalSubscriptionsManager {
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final OqmSubscriptionHandler subscriptionHandler;
public ExternalSubscriptions getExternalSubscriptions(String dataPartitionId) {
if (subscriptionInfoCache.get(dataPartitionId) == null) {
log.debug("Subscription info cache wasn't found for tenant {}", dataPartitionId);
reloadSubscriptionInfoCache(dataPartitionId);
}
return subscriptionInfoCache.get(dataPartitionId);
}
public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) {
List<Subscription> cachedInfos = Optional.ofNullable(getExternalSubscriptions(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions();
List<Subscription> filteredCachedInfos = filterSubscriptionInfosByTopic(cachedInfos, subscriptionId, serviceTopic);
if (filteredCachedInfos.isEmpty()) {
return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos);
} else {
log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredCachedInfos.size());
return filteredCachedInfos.get(0);
}
}
public void updateExternalSubscriptionsCache(String dataPartitionId, ExternalSubscriptions externalSubscriptions) {
subscriptionInfoCache.put(dataPartitionId, externalSubscriptions);
log.debug("Subscription info cache was updated for tenant {} with {} values", dataPartitionId, externalSubscriptions.getSubscriptions().size());
}
/**
* Get all subscription infos from Register service and enrich each of them with secret by additional request
*
* @param dataPartitionId partition id
*/
private void reloadSubscriptionInfoCache(String dataPartitionId) {
List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(dataPartitionId);
List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream()
.map(subscription -> getSubscription(dataPartitionId, subscription.getNotificationId(), subscription.getTopic()))
.collect(Collectors.toList());
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build());
log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size());
}
private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedInfos) {
List<Subscription> freshInfos = subscriptionHandler.getSubscriptionsById(dataPartitionId, subscriptionId);
if (freshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
List<Subscription> filteredFreshInfos = filterSubscriptionInfosByTopic(freshInfos, subscriptionId, serviceTopic);
if (filteredFreshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
cachedInfos.addAll(filteredFreshInfos);
updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedInfos).build());
log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size());
return filteredFreshInfos.get(0);
}
private static List<Subscription> filterSubscriptionInfosByTopic(List<Subscription> infos, String subscriptionId, String serviceTopic) {
return infos.stream()
.filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId()))
.collect(Collectors.toList());
}
}
...@@ -28,6 +28,7 @@ import org.opengroup.osdu.core.common.model.notification.Secret; ...@@ -28,6 +28,7 @@ import org.opengroup.osdu.core.common.model.notification.Secret;
import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.auth.factory.AuthFactory; import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -42,7 +43,7 @@ public class OqmNotificationHandler { ...@@ -42,7 +43,7 @@ public class OqmNotificationHandler {
private final OqmConfigurationProperties oqmConfigurationProperties; private final OqmConfigurationProperties oqmConfigurationProperties;
private final HttpClient httpClient; private final HttpClient httpClient;
private final OqmSubscriptionHandler subscriptionHandler; private final ExternalSubscriptionsManager externalSubscriptionsManager;
private final AuthFactory authFactory; private final AuthFactory authFactory;
public HttpResponse notifySubscriber(String subscriptionId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception { public HttpResponse notifySubscriber(String subscriptionId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception {
...@@ -55,7 +56,7 @@ public class OqmNotificationHandler { ...@@ -55,7 +56,7 @@ public class OqmNotificationHandler {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Missed header attributes"); throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Missed header attributes");
} }
Subscription subscription = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic); Subscription subscription = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic);
Secret secret = subscription.getSecret(); Secret secret = subscription.getSecret();
SecretAuth secretAuth = authFactory.getSecretAuth(secret.getSecretType()); SecretAuth secretAuth = authFactory.getSecretAuth(secret.getSecretType());
......
...@@ -19,14 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; ...@@ -19,14 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*; import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver;
...@@ -34,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; ...@@ -34,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -58,10 +54,9 @@ public class OqmSubscriberManager { ...@@ -58,10 +54,9 @@ public class OqmSubscriberManager {
private final OqmSubscriptionHandler subscriptionHandler; private final OqmSubscriptionHandler subscriptionHandler;
private final ITenantFactory tenantInfoFactory; private final ITenantFactory tenantInfoFactory;
private final OqmDriver driver; private final OqmDriver driver;
private final OqmConfigurationProperties properties;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final OqmNotificationHandler notificationHandler; private final OqmNotificationHandler notificationHandler;
private final MessageBrokerProvider messageBrokerProvider; private final MessageBrokerProvider messageBrokerProvider;
private final ExternalSubscriptionsManager externalSubscriptionsManager;
@PostConstruct @PostConstruct
void postConstruct() { void postConstruct() {
...@@ -78,14 +73,7 @@ public class OqmSubscriberManager { ...@@ -78,14 +73,7 @@ public class OqmSubscriberManager {
*/ */
void provisionSubscriptionInfoCache() { void provisionSubscriptionInfoCache() {
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
String dataPartitionId = tenantInfo.getDataPartitionId(); externalSubscriptionsManager.getExternalSubscriptions(tenantInfo.getDataPartitionId());
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(Collections.emptyList()).build());
List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(tenantInfo.getDataPartitionId());
List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream()
.map(subscription -> subscriptionHandler.getSubscription(dataPartitionId, subscription.getNotificationId(), subscription.getTopic()))
.collect(Collectors.toList());
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build());
log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size());
} }
} }
...@@ -116,7 +104,7 @@ public class OqmSubscriberManager { ...@@ -116,7 +104,7 @@ public class OqmSubscriberManager {
private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) { private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) {
OqmSubscriber subscriber = OqmSubscriber.builder() OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(controlTopicSubscription) .subscription(controlTopicSubscription)
.messageReceiver(new OqmControlTopicReceiver(subscriptionHandler, subscriptionInfoCache)) .messageReceiver(new OqmControlTopicReceiver(externalSubscriptionsManager))
.build(); .build();
driver.subscribe(subscriber, getDestination(tenantInfo)); driver.subscribe(subscriber, getDestination(tenantInfo));
} }
...@@ -131,7 +119,7 @@ public class OqmSubscriberManager { ...@@ -131,7 +119,7 @@ public class OqmSubscriberManager {
void registerServiceTopicSubscriber(TenantInfo tenantInfo, OqmSubscription serviceSubscription) { void registerServiceTopicSubscriber(TenantInfo tenantInfo, OqmSubscription serviceSubscription) {
OqmSubscriber subscriber = OqmSubscriber.builder() OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(serviceSubscription) .subscription(serviceSubscription)
.messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, subscriptionInfoCache)) .messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, externalSubscriptionsManager))
.build(); .build();
driver.subscribe(subscriber, getDestination(tenantInfo)); driver.subscribe(subscriber, getDestination(tenantInfo));
} }
......
...@@ -17,10 +17,11 @@ ...@@ -17,10 +17,11 @@
package org.opengroup.osdu.notification.provider.gcp.pubsub; package org.opengroup.osdu.notification.provider.gcp.pubsub;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.notification.Subscription;
...@@ -28,17 +29,11 @@ import org.opengroup.osdu.core.common.notification.ISubscriptionFactory; ...@@ -28,17 +29,11 @@ import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService; import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException; import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.service.DpsHeadersProvider; import org.opengroup.osdu.notification.provider.gcp.service.DpsHeadersProvider;
import org.opengroup.osdu.notification.provider.gcp.service.SubscriptionServiceGc; import org.opengroup.osdu.notification.provider.gcp.service.SubscriptionServiceGc;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/** /**
* GC implementation of {@link org.opengroup.osdu.notification.service.SubscriptionHandler}. * GC implementation of {@link org.opengroup.osdu.notification.service.SubscriptionHandler}.
* The current implementation differs in non-request spring scope and handling OQM message instead of HTTP requests. * The current implementation differs in non-request spring scope and handling OQM message instead of HTTP requests.
...@@ -51,7 +46,6 @@ import java.util.stream.Collectors; ...@@ -51,7 +46,6 @@ import java.util.stream.Collectors;
public class OqmSubscriptionHandler { public class OqmSubscriptionHandler {
private final ISubscriptionFactory registerClientFactory; private final ISubscriptionFactory registerClientFactory;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final DpsHeadersProvider dpsHeadersProvider; private final DpsHeadersProvider dpsHeadersProvider;
private final SubscriptionServiceGc subscriptionServiceGc; private final SubscriptionServiceGc subscriptionServiceGc;
...@@ -81,52 +75,13 @@ public class OqmSubscriptionHandler { ...@@ -81,52 +75,13 @@ public class OqmSubscriptionHandler {
} }
} }
public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) { public List<Subscription> getSubscriptionsById(String dataPartitionId, String subscriptionId) {
List<Subscription> cachedInfos = getCachedSubscriptionInfos(dataPartitionId);
List<Subscription> filteredCachedInfos = filterSubscriptionInfos(cachedInfos, subscriptionId, serviceTopic);
if (filteredCachedInfos.isEmpty()) {
return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos);
} else {
log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredCachedInfos.size());
return filteredCachedInfos.get(0);
}
}
private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedSubscriptionInfos) {
try { try {
DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId);
ISubscriptionService registerClient = registerClientFactory.create(headers); ISubscriptionService registerClient = registerClientFactory.create(headers);
return registerClient.query(subscriptionId);
List<Subscription> freshInfos = registerClient.query(subscriptionId);
if (freshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
List<Subscription> filteredFreshInfos = filterSubscriptionInfos(freshInfos, subscriptionId, serviceTopic);
if (filteredFreshInfos.isEmpty()) {
log.warn("Subscription info with sub ID: `{}` not found", subscriptionId);
return null;
}
cachedSubscriptionInfos.addAll(filteredFreshInfos);
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedSubscriptionInfos).build());
log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size());
return filteredFreshInfos.get(0);
} catch (SubscriptionException se) { } catch (SubscriptionException se) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se); throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se);
} }
} }
private List<Subscription> getCachedSubscriptionInfos(String dataPartitionId) {
return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions();
}
private static List<Subscription> filterSubscriptionInfos(List<Subscription> infos, String subscriptionId, String serviceTopic) {
return infos.stream()
.filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId()))
.collect(Collectors.toList());
}
} }
\ No newline at end of file
...@@ -17,22 +17,20 @@ ...@@ -17,22 +17,20 @@
package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler;
import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Slf4j @Slf4j
public class OqmControlTopicReceiver implements OqmMessageReceiver { public class OqmControlTopicReceiver implements OqmMessageReceiver {
...@@ -40,13 +38,10 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver { ...@@ -40,13 +38,10 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver {
private static final String SUBSCRIPTION_UPDATED = "Subscription Updated"; private static final String SUBSCRIPTION_UPDATED = "Subscription Updated";
private static final String SUBSCRIPTION_DELETED = "Subscription Deleted"; private static final String SUBSCRIPTION_DELETED = "Subscription Deleted";
private final OqmSubscriptionHandler subscriptionHandler; private final ExternalSubscriptionsManager externalSubscriptionsManager;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
public OqmControlTopicReceiver(OqmSubscriptionHandler subscriptionHandler, public OqmControlTopicReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) {
IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) { this.externalSubscriptionsManager = externalSubscriptionsManager;
this.subscriptionHandler = subscriptionHandler;
this.subscriptionInfoCache = subscriptionInfoCache;
} }
@Override @Override
...@@ -73,7 +68,8 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver { ...@@ -73,7 +68,8 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver {
log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`", log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`",
pubsubMessage, serviceTopic, subscriptionId); pubsubMessage, serviceTopic, subscriptionId);
List<Subscription> subscriptionInfos = Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) List<Subscription> subscriptionInfos = Optional.ofNullable(
externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions(); .getSubscriptions();
Optional<Subscription> cachedInfo = subscriptionInfos.stream() Optional<Subscription> cachedInfo = subscriptionInfos.stream()
...@@ -84,17 +80,17 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver { ...@@ -84,17 +80,17 @@ public class OqmControlTopicReceiver implements OqmMessageReceiver {
case SUBSCRIPTION_CREATED: case SUBSCRIPTION_CREATED:
case SUBSCRIPTION_UPDATED: case SUBSCRIPTION_UPDATED:
cachedInfo.ifPresent(subscriptionInfos::remove); cachedInfo.ifPresent(subscriptionInfos::remove);
Subscription freshSubscriptionInfo = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic); Subscription freshSubscriptionInfo = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic);
if (freshSubscriptionInfo != null) { if (freshSubscriptionInfo != null) {
subscriptionInfos.add(freshSubscriptionInfo); subscriptionInfos.add(freshSubscriptionInfo);
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build());
log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic); log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic);
} }
break; break;
case SUBSCRIPTION_DELETED: case SUBSCRIPTION_DELETED:
if (cachedInfo.isPresent()) { if (cachedInfo.isPresent()) {
subscriptionInfos.remove(cachedInfo.get()); subscriptionInfos.remove(cachedInfo.get());
subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build());
log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic); log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic);
} else { } else {
log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId); log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId);
......
...@@ -19,12 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; ...@@ -19,12 +19,11 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*; import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder;
import java.util.HashMap; import java.util.HashMap;
...@@ -40,14 +39,14 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver { ...@@ -40,14 +39,14 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver {
private final OqmSubscription subscription; private final OqmSubscription subscription;
private final OqmDriver driver; private final OqmDriver driver;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; private final ExternalSubscriptionsManager externalSubscriptionsManager;
public OqmServiceTopicReceiver(OqmSubscription subscription, public OqmServiceTopicReceiver(OqmSubscription subscription,
OqmDriver driver, OqmDriver driver,
IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) { ExternalSubscriptionsManager externalSubscriptionsManager) {
this.subscription = subscription; this.subscription = subscription;
this.driver = driver; this.driver = driver;
this.subscriptionInfoCache = subscriptionInfoCache; this.externalSubscriptionsManager = externalSubscriptionsManager;
} }
@Override @Override
...@@ -101,7 +100,7 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver { ...@@ -101,7 +100,7 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver {
} }
private List<Subscription> getCachedSubscriptionInfosByTopic(String dataPartitionId, String serviceTopic) { private List<Subscription> getCachedSubscriptionInfosByTopic(String dataPartitionId, String serviceTopic) {
return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) return Optional.ofNullable(externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId))
.orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized"))
.getSubscriptions().stream() .getSubscriptions().stream()
.filter(info -> serviceTopic.equals(info.getTopic())) .filter(info -> serviceTopic.equals(info.getTopic()))
......
...@@ -17,26 +17,26 @@ ...@@ -17,26 +17,26 @@
package org.opengroup.osdu.notification.provider.gcp.pubsub; package org.opengroup.osdu.notification.provider.gcp.pubsub;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class OqmControlTopicReceiverTest { public class OqmControlTopicReceiverTest {
...@@ -45,44 +45,44 @@ public class OqmControlTopicReceiverTest { ...@@ -45,44 +45,44 @@ public class OqmControlTopicReceiverTest {
@Mock @Mock
private OqmAckReplier replier; private OqmAckReplier replier;
@Mock @Mock
private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
@Mock
private OqmSubscriptionHandler subscriptionHandler;
@Mock
private Subscription subscription; private Subscription subscription;
@Mock @Mock
private ExternalSubscriptions externalSubscriptions; private ExternalSubscriptions externalSubscriptions;
@Mock @Mock
private List<Subscription> subscriptionInfos; private List<Subscription> subscriptionInfos;
@Mock
private ExternalSubscriptionsManager externalSubscriptionsManager;
@Test @Test
public void testReceiveMessageSubscriptionCreated() { public void testReceiveMessageSubscriptionCreated() {
when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription); when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription);
when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build(); OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build();
sut.receiveMessage(createMessage, replier); sut.receiveMessage(createMessage, replier);
verify(subscriptionInfos, times(1)).add(subscription); verify(subscriptionInfos, times(1)).add(subscription);
verify(subscriptionInfoCache, times(1)).put(any(), any()); verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any());
verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any());
verify(replier, times(1)).ack(); verify(replier, times(1)).ack();
} }
@Test @Test
public void testReceiveMessageSubscriptionUpdated() { public void testReceiveMessageSubscriptionUpdated() {
when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription); when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription);
when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
OqmMessage updatedMessage = OqmMessage.builder().data("Subscription Updated").attributes(new HashMap<>()).build(); OqmMessage updatedMessage = OqmMessage.builder().data("Subscription Updated").attributes(new HashMap<>()).build();
sut.receiveMessage(updatedMessage, replier); sut.receiveMessage(updatedMessage, replier);
verify(subscriptionInfos, times(1)).add(subscription); verify(subscriptionInfos, times(1)).add(subscription);
verify(subscriptionInfoCache, times(1)).put(any(), any()); verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any());
verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any());
verify(replier, times(1)).ack(); verify(replier, times(1)).ack();
} }
@Test @Test
public void testReceiveMessageSubscriptionDeleted() { public void testReceiveMessageSubscriptionDeleted() {
when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
when(subscriptionInfos.stream()).thenReturn(Stream.of(subscription)); when(subscriptionInfos.stream()).thenReturn(Stream.of(subscription));
when(subscription.getNotificationId()).thenReturn("4"); when(subscription.getNotificationId()).thenReturn("4");
when(subscription.getTopic()).thenReturn("topic1"); when(subscription.getTopic()).thenReturn("topic1");
...@@ -95,7 +95,8 @@ public class OqmControlTopicReceiverTest { ...@@ -95,7 +95,8 @@ public class OqmControlTopicReceiverTest {
.build(); .build();
sut.receiveMessage(deletedMessage, replier); sut.receiveMessage(deletedMessage, replier);
verify(subscriptionInfos, times(1)).remove(subscription); verify(subscriptionInfos, times(1)).remove(subscription);
verify(subscriptionInfoCache, times(1)).put(any(), any()); verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any());
verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any());
verify(replier, times(1)).ack(); verify(replier, times(1)).ack();
} }
......
...@@ -27,6 +27,7 @@ import org.opengroup.osdu.core.common.cache.IRedisCache; ...@@ -27,6 +27,7 @@ import org.opengroup.osdu.core.common.cache.IRedisCache;
import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*; import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver;
...@@ -46,13 +47,13 @@ public class OqmServiceTopicReceiverTest { ...@@ -46,13 +47,13 @@ public class OqmServiceTopicReceiverTest {
@Mock @Mock
private OqmAckReplier replier; private OqmAckReplier replier;
@Mock @Mock
private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
@Mock
private OqmDriver driver; private OqmDriver driver;
@Mock @Mock
private OqmSubscription oqmSubscription; private OqmSubscription oqmSubscription;
@Mock @Mock
private ExternalSubscriptions externalSubscriptions; private ExternalSubscriptions externalSubscriptions;
@Mock
private ExternalSubscriptionsManager externalSubscriptionsManager;
private List<Subscription> subscriptionInfos; private List<Subscription> subscriptionInfos;
private OqmTopic topic; private OqmTopic topic;
...@@ -71,8 +72,8 @@ public class OqmServiceTopicReceiverTest { ...@@ -71,8 +72,8 @@ public class OqmServiceTopicReceiverTest {
@Test @Test
public void testReceiveMessageSubscriptionCreated() { public void testReceiveMessageSubscriptionCreated() {
when(oqmSubscription.getTopics()).thenReturn(Collections.singletonList(topic)); when(oqmSubscription.getTopics()).thenReturn(Collections.singletonList(topic));
when(subscriptionInfoCache.get("tenant1")).thenReturn(externalSubscriptions);
when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos);
when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions);
OqmMessage message = OqmMessage.builder().data("Message").attributes(new HashMap<String, String>() {{ OqmMessage message = OqmMessage.builder().data("Message").attributes(new HashMap<String, String>() {{
put("data-partition-id", "tenant1"); put("data-partition-id", "tenant1");
}}).build(); }}).build();
......
...@@ -30,6 +30,7 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo; ...@@ -30,6 +30,7 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions;
...@@ -44,8 +45,6 @@ public class OqmSubscriberManagerTest { ...@@ -44,8 +45,6 @@ public class OqmSubscriberManagerTest {
@InjectMocks @InjectMocks
private OqmSubscriberManager sut; private OqmSubscriberManager sut;
@Spy
private OqmConfigurationProperties properties;
@Mock @Mock
private ITenantFactory tenantFactory; private ITenantFactory tenantFactory;
@Mock @Mock
...@@ -55,17 +54,15 @@ public class OqmSubscriberManagerTest { ...@@ -55,17 +54,15 @@ public class OqmSubscriberManagerTest {
@Mock @Mock
private OqmSubscriptionHandler subscriptionHandler; private OqmSubscriptionHandler subscriptionHandler;
@Mock @Mock
private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; private ExternalSubscriptionsManager externalSubscriptionsManager;
private List<TenantInfo> tenantInfos; private List<TenantInfo> tenantInfos;
private List<OqmTopic> topics; private List<OqmTopic> topics;
private List<Subscription> subscriptions;
@Before @Before
public void setUp() { public void setUp() {
tenantInfos = createTenants(); tenantInfos = createTenants();
topics = createTopics(); topics = createTopics();
subscriptions = createSubscriptions();
} }
private List<TenantInfo> createTenants() { private List<TenantInfo> createTenants() {
...@@ -89,12 +86,6 @@ public class OqmSubscriberManagerTest { ...@@ -89,12 +86,6 @@ public class OqmSubscriberManagerTest {
return Arrays.asList(topic1, topic2,topic3, topic4); return Arrays.asList(topic1, topic2,topic3, topic4);
} }
private List<Subscription> createSubscriptions() {
Subscription subscription1 = new Subscription();
Subscription subscription2 = new Subscription();
return Arrays.asList(subscription1, subscription2);
}
@Test @Test
public void testProvisioningControlTopicSubscribers() { public void testProvisioningControlTopicSubscribers() {
when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos);
...@@ -113,9 +104,7 @@ public class OqmSubscriberManagerTest { ...@@ -113,9 +104,7 @@ public class OqmSubscriberManagerTest {
@Test @Test
public void testProvisioningSubscriptionInfoCache() { public void testProvisioningSubscriptionInfoCache() {
when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos);
when(subscriptionHandler.getAllSubscriptionInfos(anyString())).thenReturn(subscriptions);
sut.provisionSubscriptionInfoCache(); sut.provisionSubscriptionInfoCache();
verify(subscriptionHandler, times(6)).getSubscription(anyString(), anyString(), anyString()); verify(externalSubscriptionsManager, times(3)).getExternalSubscriptions(any());
verify(subscriptionInfoCache, times(6)).put(any(), any());
} }
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment