Skip to content
Snippets Groups Projects
Commit 5b70a3c4 authored by Rustam Lotsmanenko (EPAM)'s avatar Rustam Lotsmanenko (EPAM)
Browse files

added handling for unavailable subscriptions

parent d9d9b19c
No related branches found
No related tags found
1 merge request!462Fix GC caching, and enable baremetal int tests.
Pipeline #232293 failed
Showing
with 104 additions and 43 deletions
......@@ -8,6 +8,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
......@@ -24,7 +25,7 @@ import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberM
public class MessageBrokerProvider {
private final OqmDriver driver;
private final OqmSubscriptionHandler subscriptionHandler;
private final RegisterSubscriptionService subscriptionHandler;
public void validateMessageBrokerConfiguration(TenantInfo tenantInfo) {
List<OqmTopic> topics = subscriptionHandler.getTopics(tenantInfo.getDataPartitionId());
......
......@@ -29,7 +29,7 @@ import org.opengroup.osdu.core.common.model.notification.Secret;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
......
......@@ -23,10 +23,11 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmSubscriptionConfigurationReceiver;
import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmConfigurationEventReceiver;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmReplicatedEventReceiver;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmEventReplicator;
import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
......@@ -51,7 +52,7 @@ public class OqmSubscriberManager {
public static final String SERVICE_SUFFIX = "-service";
public static final String PUBLISH_SUFFIX = "-publish";
public static final String CONTROL_SUBSCRIPTION = NOTIFICATION_PREFIX + "control-sub";
private final OqmSubscriptionHandler subscriptionHandler;
private final RegisterSubscriptionService subscriptionHandler;
private final ITenantFactory tenantInfoFactory;
private final OqmDriver driver;
private final OqmNotificationDeliveryService notificationHandler;
......@@ -104,7 +105,7 @@ public class OqmSubscriberManager {
private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) {
OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(controlTopicSubscription)
.messageReceiver(new OqmSubscriptionConfigurationReceiver(externalSubscriptionsManager))
.messageReceiver(new OqmConfigurationEventReceiver(externalSubscriptionsManager))
.build();
driver.subscribe(subscriber, getDestination(tenantInfo));
}
......
......@@ -22,11 +22,11 @@ import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder;
@Slf4j
public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver {
public class OqmConfigurationEventReceiver implements OqmMessageReceiver {
private static final String SUBSCRIPTION_CREATED = "Subscription Created";
private static final String SUBSCRIPTION_UPDATED = "Subscription Updated";
......@@ -34,7 +34,7 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver
private final ExternalSubscriptionsManager externalSubscriptionsManager;
public OqmSubscriptionConfigurationReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) {
public OqmConfigurationEventReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) {
this.externalSubscriptionsManager = externalSubscriptionsManager;
}
......@@ -48,7 +48,6 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver
} finally {
ThreadScopeContextHolder.currentThreadScopeAttributes().clear();
}
log.debug("OQM | Control topic | Message acknowledged by client");
replier.ack();
}
......@@ -62,8 +61,7 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver
log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`", pubsubMessage, serviceTopic, subscriptionId);
switch (pubsubMessage) {
case SUBSCRIPTION_CREATED:
case SUBSCRIPTION_UPDATED:
case SUBSCRIPTION_CREATED, SUBSCRIPTION_UPDATED:
externalSubscriptionsManager.updateSubscription(dataPartitionId, subscriptionId);
log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic);
break;
......@@ -72,7 +70,7 @@ public class OqmSubscriptionConfigurationReceiver implements OqmMessageReceiver
if (deleted) {
log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic);
} else {
log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId);
log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but it was not found in cache.", subscriptionId);
}
break;
default:
......
......@@ -32,7 +32,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder;
/**
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.config;
package org.opengroup.osdu.notification.provider.gcp.repo;
import java.util.Collections;
import java.util.HashSet;
......@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.provider.gcp.config.RedisProperties;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
......@@ -35,6 +36,7 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class SubscriptionCacheRepo {
public static final String POISON = ":poison";
private final RedisProperties redisProperties;
private final RedisTemplate<String, Subscription> redisTemplate;
......@@ -62,6 +64,39 @@ public class SubscriptionCacheRepo {
return redisTemplate.opsForValue().get(subscriberCacheKey);
}
/**
* To avoid Register service spamming, if a subscriber was deleted, but replicated events are
* still being processed.
*
* @param dataPartitionId
* @param topic
* @param subscriptionId
*/
public void poisonSub(String dataPartitionId, String topic, String subscriptionId) {
String subPoisonKey = getSubscriberCacheKey(dataPartitionId, topic, subscriptionId) + POISON;
redisTemplate.opsForValue().set(
subPoisonKey,
new Subscription(),
redisProperties.getRedisExpiration(),
TimeUnit.SECONDS
);
}
/**
* Used to check if subscribers were not found in Register.
*
* @param dataPartitionId
* @param topic
* @param subscriptionId
* @return empty Subscription
*/
public Subscription checkIfSubIsPoisoned(String dataPartitionId, String topic,
String subscriptionId) {
String subPoisonKey = getSubscriberCacheKey(dataPartitionId, topic, subscriptionId) + POISON;
return redisTemplate.opsForValue().get(subPoisonKey);
}
public void updateSubscription(String dataPartitionId, Subscription subscription) {
String subscriberCacheKey = getSubscriberCacheKey(
dataPartitionId,
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.config;
package org.opengroup.osdu.notification.provider.gcp.service;
import java.util.List;
import java.util.Objects;
......@@ -23,7 +23,7 @@ import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler;
import org.opengroup.osdu.notification.provider.gcp.repo.SubscriptionCacheRepo;
import org.springframework.stereotype.Component;
@Component
......@@ -32,7 +32,7 @@ import org.springframework.stereotype.Component;
public class ExternalSubscriptionsManager {
private final SubscriptionCacheRepo subscriptionCacheRepo;
private final OqmSubscriptionHandler subscriptionHandler;
private final RegisterSubscriptionService subscriptionHandler;
public void invokeTenantSubscribers(String dataPartitionId) {
Set<Subscription> members = subscriptionCacheRepo.getDataPartitionSubscribers(dataPartitionId);
......@@ -47,11 +47,29 @@ public class ExternalSubscriptionsManager {
}
public Subscription getSubscription(String dataPartitionId, String topic, String subscriptionId) {
return subscriptionCacheRepo.readSubscription(
dataPartitionId,
topic,
subscriptionId
);
Subscription subscription = subscriptionCacheRepo.readSubscription(dataPartitionId, topic, subscriptionId);
if (subscription != null) {
return subscription;
}
log.debug("Subscription: {} not in cache, checking if it available in Register.", subscriptionId);
subscription = subscriptionCacheRepo.checkIfSubIsPoisoned(dataPartitionId, topic, subscriptionId);
if (subscription != null) {
log.debug("Subscription:{} marked as not available in cache.", subscriptionId);
return null;
}
return retrieveSubscriptionFromHandler(dataPartitionId, subscriptionId, topic);
}
private Subscription retrieveSubscriptionFromHandler(String dataPartitionId, String subscriptionId, String topic) {
Subscription subscription = subscriptionHandler.getSubscriptionsByNotificationId(dataPartitionId,subscriptionId);
if (subscription == null) {
log.debug("Subscription:{} not available in Register. Marking it as not available in cache.", subscriptionId);
subscriptionCacheRepo.poisonSub(dataPartitionId, topic, subscriptionId);
return null;
}
subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription);
return subscription;
}
public boolean removeSubscription(String dataPartitionId, String topic, String subscriptionId) {
......@@ -63,17 +81,23 @@ public class ExternalSubscriptionsManager {
dataPartitionId,
subscriptionId
);
subscriptionCacheRepo.updateSubscription(dataPartitionId, subscription);
if(Objects.isNull(subscription)){
log.warn("Cannot update sub config with id:{} in the cache. Skipping it.", subscriptionId);
}else {
subscriptionCacheRepo.updateSubscription(dataPartitionId, subscription);
}
}
private void reloadSubscriptionInfoCache(String dataPartitionId) {
List<Subscription> fragmentarySubInfos = subscriptionHandler.getAllSubscriptionInfos(
dataPartitionId);
for (Subscription freagmentedSubscription : fragmentarySubInfos) {
Subscription subscription = subscriptionHandler.getSubscriptionsByNotificationId(
dataPartitionId, freagmentedSubscription.getNotificationId());
subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription);
Subscription subscription = subscriptionHandler.getSubscriptionsByNotificationId(dataPartitionId, freagmentedSubscription.getNotificationId());
if(Objects.isNull(subscription)){
log.warn("Cannot init cache for sub id:{}. Empty response from Register. Skipping it.", freagmentedSubscription.getNotificationId());
}else {
subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription);
}
}
}
}
\ No newline at end of file
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub;
package org.opengroup.osdu.notification.provider.gcp.service;
import java.util.List;
import java.util.stream.Collectors;
......@@ -43,7 +43,7 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@ConditionalOnProperty(name = "oqmDriver")
@Slf4j
public class OqmSubscriptionHandler {
public class RegisterSubscriptionService {
private final ISubscriptionFactory registerClientFactory;
private final DpsHeadersProvider dpsHeadersProvider;
......@@ -52,7 +52,6 @@ public class OqmSubscriptionHandler {
public List<OqmTopic> getTopics(String dataPartitionId) {
DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId);
ISubscriptionService registerClient = registerClientFactory.create(headers);
try {
return registerClient.getTopics()
.stream()
......@@ -80,7 +79,12 @@ public class OqmSubscriptionHandler {
DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId);
ISubscriptionService registerClient = registerClientFactory.create(headers);
List<Subscription> subscriptions = registerClient.query(subscriptionId);
return subscriptions.get(0);
if(subscriptions.isEmpty()){
log.warn("Empty response from Register for subId: {}, config fetch not possible.", subscriptionId);
return null;
}else {
return subscriptions.get(0);
}
} catch (SubscriptionException se) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se);
}
......
......@@ -31,15 +31,15 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.config.SubscriptionCacheRepo;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmSubscriptionConfigurationReceiver;
import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.repo.SubscriptionCacheRepo;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmConfigurationEventReceiver;
@RunWith(MockitoJUnitRunner.class)
public class OqmSubscriptionConfigurationReceiverTest {
public class OqmConfigurationEventReceiverTest {
@InjectMocks
private OqmSubscriptionConfigurationReceiver sut;
private OqmConfigurationEventReceiver sut;
@Mock
private OqmAckReplier replier;
@Mock
......
......@@ -17,7 +17,6 @@
package org.opengroup.osdu.notification.provider.gcp.pubsub;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -30,13 +29,11 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmEventReplicator;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
......
......@@ -27,10 +27,11 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager;
import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager;
import java.util.Arrays;
import java.util.List;
import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
......@@ -47,7 +48,7 @@ public class OqmSubscriberManagerTest {
@Mock
private MessageBrokerProvider subscriptionProvider;
@Mock
private OqmSubscriptionHandler subscriptionHandler;
private RegisterSubscriptionService subscriptionHandler;
@Mock
private ExternalSubscriptionsManager externalSubscriptionsManager;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment