Skip to content
Snippets Groups Projects
Commit 8ce255f1 authored by Rostislav Dublin (EPAM)'s avatar Rostislav Dublin (EPAM) Committed by Riabokon Stanislav(EPAM)[GCP]
Browse files

GCP - Fix OqmSubscriberManager (control topics and subscriptions creation and...

GCP - Fix OqmSubscriberManager (control topics and subscriptions creation and usage algorithm) (GONRG-4681)
parent 34680b87
No related branches found
No related tags found
1 merge request!235GCP - Fix OqmSubscriberManager (control topics and subscriptions creation and usage algorithm) (GONRG-4681)
Showing
with 282 additions and 47 deletions
......@@ -45,4 +45,4 @@ public class AppProperties implements IAppProperties {
public String getPubSubServiceAccountEmail() {
return String.format("de-notification-service@%s.iam.gserviceaccount.com", this.project);
}
}
}
\ No newline at end of file
......@@ -30,4 +30,4 @@ import org.springframework.context.annotation.Configuration;
public class OqmConfigurationProperties {
private String registerSubscriberControlTopicName = "register-subscriber-control";
private int waitingTime = 30000;
}
}
\ No newline at end of file
......@@ -22,7 +22,6 @@ import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler;
......@@ -42,10 +41,10 @@ import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SING
/**
* Runs once on the service start.
* 1. Fetches oqm for message broker pull subscriptions in interested topics. Creates the service's subscribers in every found subscription.
* 2. Checks for the "subscriber control topic" and creates if it is absent.
* 1. Fetches oqm for tenants' message brokers' pull subscriptions in interested topics. Creates the service's subscribers in every found subscription.
* 2. Checks for the "subscriber control topic" on every tenant's message broker and creates if it is absent.
* - This topic is a "control channel" between Register and Notification services.
* - The former sends events on new pull Subscriptions being created, the latter listens for events and creates corresponding Subscribers.
* - Register sends events on new pull Subscriptions being created, Notification listens for events and creates corresponding Subscribers.
*/
@Slf4j
@Component
......@@ -75,14 +74,38 @@ public class OqmSubscriberManager {
private final Long constructDate = System.currentTimeMillis();
private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate;
private OqmSubscription subscriberControlTopicSubscription = null;
@PostConstruct
void postConstruct() {
log.info("OqmSubscriberManager bean constructed. Provisioning STARTED");
//Get all Tenant infos
provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers();
provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers();
log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
}
@PreDestroy
void onPreDestroy() {
log.info("OqmSubscriberManager bean on pre-destroy: STARTED");
unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers();
log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED");
}
void unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers() {
for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) {
String tenantId = tenant.getDataPartitionId();
log.info("* OqmSubscriberManager on pre-destroy for tenant {}:", tenantId);
OqmSubscription subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null);
if (subscriberControlTopicSubscriptionForTenant != null) {
log.info("* * OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription DELETED.",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName, tenantId);
driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant));
}
}
}
void provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers() {
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
log.info("* OqmSubscriberManager on provisioning tenant {}:", tenantInfo.getDataPartitionId());
//For every Tenant Destination get "subscriberable" Subscriptions
......@@ -96,47 +119,52 @@ public class OqmSubscriberManager {
}
log.info("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId());
}
}
TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny()
.orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo"));
void provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers() {
String controlTopicName = properties.getRegisterSubscriberControlTopicName();
for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) {
String tenantId = tenant.getDataPartitionId();
log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence at tenant's '{}' message broker", controlTopicName, tenantId);
OqmTopic subscriberControlTopic = driver.getTopic(controlTopicName, getDestination(tenant)).orElse(null);
boolean controlTopicForTenantJustCreated;
if (subscriberControlTopic != null) {
log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists at tenant's '{}' message broker", controlTopicName, tenantId);
controlTopicForTenantJustCreated = false;
} else {
log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist at tenant's '{}' message broker. Trying to create it:", controlTopicName, tenantId);
driver.createAndGetTopic(controlTopicName, getDestination(tenant));
controlTopicForTenantJustCreated = true;
}
log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence:", properties.getRegisterSubscriberControlTopicName());
OqmTopic controlTopic = driver.getTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)).orElse(null);
if (controlTopic != null) {
log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists.", properties.getRegisterSubscriberControlTopicName());
} else {
log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist. Trying to create it:", properties.getRegisterSubscriberControlTopicName());
driver.createAndGetTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant));
}
log.info("* * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker",
controlTopicName, subscriberControlTopicSubscriptionName, tenantId);
log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}'",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
OqmSubscription subscriberControlTopicSubscriptionForTenant = null;
OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName)
.topics(Collections.singletonList(subscriberControlTopic)).build();
OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName)
.topics(Collections.singletonList(controlTopic)).build();
if (!controlTopicForTenantJustCreated) {
subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null);
}
subscriberControlTopicSubscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant));
log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscription CREATED.",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
if (subscriberControlTopicSubscriptionForTenant != null) {
log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription CREATED.",
controlTopicName, subscriberControlTopicSubscriptionName, tenantId);
} else {
subscriberControlTopicSubscriptionForTenant = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant));
log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription already EXISTS.",
controlTopicName, subscriberControlTopicSubscriptionName, tenantId);
registerControlTopicSubscriber(tenant, subscriberControlTopicSubscription);
log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
registerControlTopicSubscriber(tenant, subscriberControlTopicSubscriptionForTenant);
log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscriber REGISTERED.",
controlTopicName, subscriberControlTopicSubscriptionName, tenantId);
log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
}
}
@PreDestroy
void onPreDestroy() {
log.info("OqmSubscriberManager bean on pre-destroy: STARTED");
if (subscriberControlTopicSubscription != null) {
TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny()
.orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo"));
log.info("* OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant));
}
log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED");
}
private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) {
......@@ -175,7 +203,7 @@ public class OqmSubscriberManager {
subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId());
}
private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscriber) {
private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) {
OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
......@@ -196,12 +224,12 @@ public class OqmSubscriberManager {
oqmAckReplier.ack();
};
OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscriber).messageReceiver(receiver).build();
OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscription).messageReceiver(receiver).build();
OqmDestination destination = getDestination(tenantInfo);
driver.subscribe(subscriber, destination);
log.info("Just subscribed at topic {} subscription {} for tenant {}",
controlTopicSubscriber.getTopics().get(0), controlTopicSubscriber.getName(), tenantInfo.getDataPartitionId());
controlTopicSubscription.getTopics().get(0), controlTopicSubscription.getName(), tenantInfo.getDataPartitionId());
}
public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) {
......
......@@ -74,4 +74,4 @@ public class OqmNotificationHandler {
log.debug("Sending out notification to endpoint: " + endpoint);
return response;
}
}
}
\ No newline at end of file
......@@ -38,4 +38,4 @@ public class GoogleServiceAccountValidatorGenerator {
}
return verifier;
}
}
}
\ No newline at end of file
......@@ -64,4 +64,4 @@ public class GoogleServiceAccountValidatorImpl implements IServiceAccountValidat
return false;
}
}
}
}
\ No newline at end of file
package org.opengroup.osdu.notification.provider.gcp.pubsub;
import org.jetbrains.annotations.NotNull;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.util.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest(OqmSubscriberManager.class)
public class OqmSubscriberManagerTest {
public static final String TENANT1 = "tenant1";
public static final String TENANT2 = "tenant2";
public static final String TENANT3 = "tenant3";
@Mock
ITenantFactory tenantInfoFactory;
@Mock
OqmDriver driver;
@Mock
OqmNotificationHandler notificationHandler;
OqmConfigurationProperties properties = new OqmConfigurationProperties();
OqmSubscriberManager manager;
private final Long constructDate = System.currentTimeMillis();
private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate;
@Before
public void before() throws Exception {
Collection<TenantInfo> tenants = getMockedTenantInfos();
when(tenantInfoFactory, "listTenantInfo").thenReturn(tenants);
}
@Test
public void whenProvisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception {
manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties));
//Mock 4 subscriberable subscriptions somehow distributed between mock tenants
doAnswer(question -> {
TenantInfo tenant = question.getArgument(0);
String tenantId = tenant.getDataPartitionId();
switch (tenantId) {
case TENANT1:
return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build());
case TENANT2:
return Arrays.asList(
OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build(),
OqmSubscription.builder().name("de-" + tenantId + "-subscription-B").build()
);
case TENANT3:
return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build());
default:
return Collections.emptyList();
}
}).when(manager).getSubscriberableSubscriptions(any(TenantInfo.class));
PowerMockito.doNothing().when(manager, "registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class));
//Run being tested method.
manager.provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers();
//verify 3 times getSubscriberableSubscriptions method was called
verifyPrivate(manager, times(3)).invoke("getSubscriberableSubscriptions", any(TenantInfo.class));
//verify only 4 subscribers creation were requested and completed
verifyPrivate(manager, times(4)).invoke("registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class));
}
@Test
public void whenProvisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception {
//Mock only tenant 2 broker has control topic created so far. Others are not yet.
when(driver.getTopic(ArgumentMatchers.eq(properties.getRegisterSubscriberControlTopicName()), any(OqmDestination.class)))
.thenAnswer((question) -> {
String topicName = question.getArgument(0);
OqmDestination destination = question.getArgument(1);
if (destination.getPartitionId().equals(TENANT2)) {
return Optional.of(OqmTopic.builder().name(topicName).build());
}
return Optional.empty();
});
//Mock only tenant 2 broker has control topic subscription created so far. Others are not yet.
when(driver.getSubscription(any(String.class), any(OqmDestination.class)))
.thenAnswer((question) -> {
String subscriptionName = question.getArgument(0);
OqmDestination destination = question.getArgument(1);
if (destination.getPartitionId().equals(TENANT2)) {
return Optional.of(OqmSubscription.builder().name(subscriptionName).build());
}
return Optional.empty();
});
//Mock driver smoothly creates a subscription
when(driver.createAndGetSubscription(any(OqmSubscription.class), any(OqmDestination.class)))
.thenAnswer(question -> question.<OqmSubscription>getArgument(0));
manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties));
//Mock subscriberControlTopicSubscriptionName
Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName);
//Mock subscriber silent and successful creation
PowerMockito.doNothing().when(manager, "registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class));
//Run being tested method.
manager.provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers();
//verify 3 times the driver was asked if the topic exists (for all tenants)
verify(driver, times(3)).getTopic(any(String.class), any(OqmDestination.class));
//verify 2 times the driver was asked to create and get topic (for tenants 1 and 3
verify(driver, times(2)).createAndGetTopic(any(String.class), any(OqmDestination.class));
//verify only 1 time the driver was asked if the subscription exists (for TENANT2)
verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class));
//verify 2 times the driver was asked to create the subscription (for TENANT1, TENANT2)
verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class));
//verify only 2 subscribers creation were requested and completed
verifyPrivate(manager, times(2)).invoke("registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class));
}
@Test
public void whenUnprovisionControlTopicsSubscriptionsFromAllTenantsBrokers_thenProperOperationsInvoked() {
//Mock all 3 tenants' brokers have control topic subscriptions created so far.
when(driver.getSubscription(any(String.class), any(OqmDestination.class)))
.thenAnswer((question) -> {
String subscriptionName = question.getArgument(0);
OqmDestination destination = question.getArgument(1);
return Optional.of(OqmSubscription.builder().name(subscriptionName).build());
});
//Mock driver smoothly deletes a subscription
PowerMockito.doNothing().when(driver).deleteSubscription(any(String.class), any(OqmDestination.class));
manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties));
//Mock subscriberControlTopicSubscriptionName
Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName);
//Run being tested method.
manager.unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers();
//verify 3 times the driver was asked about a subscription existence (for all tenants)
verify(driver, times(3))
.getSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class));
//verify 3 times the driver was asked to delete the subscription (for all tenants)
verify(driver, times(3))
.deleteSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class));
}
@NotNull
private Collection<TenantInfo> getMockedTenantInfos() {
Collection<TenantInfo> tenants = new ArrayList<>();
//Mock we have 3 tenants
tenants.add(new TenantInfo() {
{
setId(1L);
setDataPartitionId(TENANT1);
}
});
tenants.add(new TenantInfo() {
{
setId(2L);
setDataPartitionId(TENANT2);
}
});
tenants.add(new TenantInfo() {
{
setId(3L);
setDataPartitionId(TENANT3);
}
});
return tenants;
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment