diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java index 7f7ce4cb602714f0987157dbfb46067828af5b42..0efe060252d1384ab62fabc278b2961aea7c6142 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java @@ -45,4 +45,4 @@ public class AppProperties implements IAppProperties { public String getPubSubServiceAccountEmail() { return String.format("de-notification-service@%s.iam.gserviceaccount.com", this.project); } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java index 886e3d95d9c4a8c9c1350dc3c6367f927854d8a4..d0b50c7e801aef279d8fc0d0d9f9ebf7bc1095e7 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java @@ -30,4 +30,4 @@ import org.springframework.context.annotation.Configuration; public class OqmConfigurationProperties { private String registerSubscriberControlTopicName = "register-subscriber-control"; private int waitingTime = 30000; -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java index b57ac2db24b3481f1eabbc260a2e6c03040e4916..1e1986573dc02dd3592d3139d8ea3ef6e704a052 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -22,7 +22,6 @@ import org.opengroup.osdu.core.common.http.HttpResponse; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; -import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException; import org.opengroup.osdu.core.gcp.oqm.model.*; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler; @@ -42,10 +41,10 @@ import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SING /** * Runs once on the service start. - * 1. Fetches oqm for message broker pull subscriptions in interested topics. Creates the service's subscribers in every found subscription. - * 2. Checks for the "subscriber control topic" and creates if it is absent. + * 1. Fetches oqm for tenants' message brokers' pull subscriptions in interested topics. Creates the service's subscribers in every found subscription. + * 2. Checks for the "subscriber control topic" on every tenant's message broker and creates if it is absent. * - This topic is a "control channel" between Register and Notification services. - * - The former sends events on new pull Subscriptions being created, the latter listens for events and creates corresponding Subscribers. + * - Register sends events on new pull Subscriptions being created, Notification listens for events and creates corresponding Subscribers. */ @Slf4j @Component @@ -75,14 +74,38 @@ public class OqmSubscriberManager { private final Long constructDate = System.currentTimeMillis(); private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; - private OqmSubscription subscriberControlTopicSubscription = null; - @PostConstruct void postConstruct() { log.info("OqmSubscriberManager bean constructed. Provisioning STARTED"); - //Get all Tenant infos + provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers(); + provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers(); + + log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED"); + } + + @PreDestroy + void onPreDestroy() { + log.info("OqmSubscriberManager bean on pre-destroy: STARTED"); + unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers(); + log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED"); + } + + void unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers() { + for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) { + String tenantId = tenant.getDataPartitionId(); + log.info("* OqmSubscriberManager on pre-destroy for tenant {}:", tenantId); + OqmSubscription subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null); + if (subscriberControlTopicSubscriptionForTenant != null) { + log.info("* * OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription DELETED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName, tenantId); + driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)); + } + } + } + + void provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers() { for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { log.info("* OqmSubscriberManager on provisioning tenant {}:", tenantInfo.getDataPartitionId()); //For every Tenant Destination get "subscriberable" Subscriptions @@ -96,47 +119,52 @@ public class OqmSubscriberManager { } log.info("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId()); } + } - TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() - .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); + void provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers() { + String controlTopicName = properties.getRegisterSubscriberControlTopicName(); + for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) { + + String tenantId = tenant.getDataPartitionId(); + + log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence at tenant's '{}' message broker", controlTopicName, tenantId); + OqmTopic subscriberControlTopic = driver.getTopic(controlTopicName, getDestination(tenant)).orElse(null); + boolean controlTopicForTenantJustCreated; + if (subscriberControlTopic != null) { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists at tenant's '{}' message broker", controlTopicName, tenantId); + controlTopicForTenantJustCreated = false; + } else { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist at tenant's '{}' message broker. Trying to create it:", controlTopicName, tenantId); + driver.createAndGetTopic(controlTopicName, getDestination(tenant)); + controlTopicForTenantJustCreated = true; + } - log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence:", properties.getRegisterSubscriberControlTopicName()); - OqmTopic controlTopic = driver.getTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)).orElse(null); - if (controlTopic != null) { - log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists.", properties.getRegisterSubscriberControlTopicName()); - } else { - log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist. Trying to create it:", properties.getRegisterSubscriberControlTopicName()); - driver.createAndGetTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)); - } + log.info("* * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}'", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + OqmSubscription subscriberControlTopicSubscriptionForTenant = null; + OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName) + .topics(Collections.singletonList(subscriberControlTopic)).build(); - OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName) - .topics(Collections.singletonList(controlTopic)).build(); + if (!controlTopicForTenantJustCreated) { + subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null); + } - subscriberControlTopicSubscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant)); - log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscription CREATED.", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + if (subscriberControlTopicSubscriptionForTenant != null) { + log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription CREATED.", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); + } else { + subscriberControlTopicSubscriptionForTenant = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant)); + log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription already EXISTS.", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - registerControlTopicSubscriber(tenant, subscriberControlTopicSubscription); - log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + registerControlTopicSubscriber(tenant, subscriberControlTopicSubscriptionForTenant); + log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscriber REGISTERED.", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED"); - } + } - @PreDestroy - void onPreDestroy() { - log.info("OqmSubscriberManager bean on pre-destroy: STARTED"); - if (subscriberControlTopicSubscription != null) { - TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() - .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); - log.info("* OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); - driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)); } - log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED"); } private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) { @@ -175,7 +203,7 @@ public class OqmSubscriberManager { subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId()); } - private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscriber) { + private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) { OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { @@ -196,12 +224,12 @@ public class OqmSubscriberManager { oqmAckReplier.ack(); }; - OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscriber).messageReceiver(receiver).build(); + OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscription).messageReceiver(receiver).build(); OqmDestination destination = getDestination(tenantInfo); driver.subscribe(subscriber, destination); log.info("Just subscribed at topic {} subscription {} for tenant {}", - controlTopicSubscriber.getTopics().get(0), controlTopicSubscriber.getName(), tenantInfo.getDataPartitionId()); + controlTopicSubscription.getTopics().get(0), controlTopicSubscription.getName(), tenantInfo.getDataPartitionId()); } public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) { diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java index d764bf314d11cf95a25bedc18c4303dcfc0cdd96..8bcf81db00c2a0df0209e27ef99f67bc3df4369a 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java @@ -74,4 +74,4 @@ public class OqmNotificationHandler { log.debug("Sending out notification to endpoint: " + endpoint); return response; } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java index 230c0ab5b6dd5e92695730ec3c16225139bbef6a..119dbc79b17b395ca7dbf6b4f20b224be0ce2686 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java @@ -38,4 +38,4 @@ public class GoogleServiceAccountValidatorGenerator { } return verifier; } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java index af1dedf525c432bdc2684444ad0fab0c1c82e95b..ffeacb56283279d5f98125a0681685a1013887e2 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java @@ -64,4 +64,4 @@ public class GoogleServiceAccountValidatorImpl implements IServiceAccountValidat return false; } } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java b/provider/notification-gcp/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..220c7d40b16f700bf5b9744442b424cd76e9ea1a --- /dev/null +++ b/provider/notification-gcp/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java @@ -0,0 +1,207 @@ +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; +import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.util.*; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(OqmSubscriberManager.class) +public class OqmSubscriberManagerTest { + + public static final String TENANT1 = "tenant1"; + public static final String TENANT2 = "tenant2"; + public static final String TENANT3 = "tenant3"; + + @Mock + ITenantFactory tenantInfoFactory; + @Mock + OqmDriver driver; + + @Mock + OqmNotificationHandler notificationHandler; + + OqmConfigurationProperties properties = new OqmConfigurationProperties(); + + OqmSubscriberManager manager; + + private final Long constructDate = System.currentTimeMillis(); + private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; + + @Before + public void before() throws Exception { + Collection<TenantInfo> tenants = getMockedTenantInfos(); + when(tenantInfoFactory, "listTenantInfo").thenReturn(tenants); + + } + + @Test + public void whenProvisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception { + manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); + + //Mock 4 subscriberable subscriptions somehow distributed between mock tenants + doAnswer(question -> { + TenantInfo tenant = question.getArgument(0); + String tenantId = tenant.getDataPartitionId(); + switch (tenantId) { + case TENANT1: + return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build()); + case TENANT2: + return Arrays.asList( + OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build(), + OqmSubscription.builder().name("de-" + tenantId + "-subscription-B").build() + ); + case TENANT3: + return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build()); + default: + return Collections.emptyList(); + } + }).when(manager).getSubscriberableSubscriptions(any(TenantInfo.class)); + + PowerMockito.doNothing().when(manager, "registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + + //Run being tested method. + manager.provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers(); + + //verify 3 times getSubscriberableSubscriptions method was called + verifyPrivate(manager, times(3)).invoke("getSubscriberableSubscriptions", any(TenantInfo.class)); + + //verify only 4 subscribers creation were requested and completed + verifyPrivate(manager, times(4)).invoke("registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + + } + + @Test + public void whenProvisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception { + + //Mock only tenant 2 broker has control topic created so far. Others are not yet. + when(driver.getTopic(ArgumentMatchers.eq(properties.getRegisterSubscriberControlTopicName()), any(OqmDestination.class))) + .thenAnswer((question) -> { + String topicName = question.getArgument(0); + OqmDestination destination = question.getArgument(1); + if (destination.getPartitionId().equals(TENANT2)) { + return Optional.of(OqmTopic.builder().name(topicName).build()); + } + + return Optional.empty(); + }); + + //Mock only tenant 2 broker has control topic subscription created so far. Others are not yet. + when(driver.getSubscription(any(String.class), any(OqmDestination.class))) + .thenAnswer((question) -> { + String subscriptionName = question.getArgument(0); + OqmDestination destination = question.getArgument(1); + if (destination.getPartitionId().equals(TENANT2)) { + return Optional.of(OqmSubscription.builder().name(subscriptionName).build()); + } + + return Optional.empty(); + }); + + //Mock driver smoothly creates a subscription + when(driver.createAndGetSubscription(any(OqmSubscription.class), any(OqmDestination.class))) + .thenAnswer(question -> question.<OqmSubscription>getArgument(0)); + + manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); + //Mock subscriberControlTopicSubscriptionName + Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName); + //Mock subscriber silent and successful creation + PowerMockito.doNothing().when(manager, "registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + + //Run being tested method. + manager.provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers(); + + //verify 3 times the driver was asked if the topic exists (for all tenants) + verify(driver, times(3)).getTopic(any(String.class), any(OqmDestination.class)); + + //verify 2 times the driver was asked to create and get topic (for tenants 1 and 3 + verify(driver, times(2)).createAndGetTopic(any(String.class), any(OqmDestination.class)); + + //verify only 1 time the driver was asked if the subscription exists (for TENANT2) + verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class)); + + //verify 2 times the driver was asked to create the subscription (for TENANT1, TENANT2) + verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class)); + + //verify only 2 subscribers creation were requested and completed + verifyPrivate(manager, times(2)).invoke("registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + } + + @Test + public void whenUnprovisionControlTopicsSubscriptionsFromAllTenantsBrokers_thenProperOperationsInvoked() { + //Mock all 3 tenants' brokers have control topic subscriptions created so far. + when(driver.getSubscription(any(String.class), any(OqmDestination.class))) + .thenAnswer((question) -> { + String subscriptionName = question.getArgument(0); + OqmDestination destination = question.getArgument(1); + return Optional.of(OqmSubscription.builder().name(subscriptionName).build()); + }); + + //Mock driver smoothly deletes a subscription + PowerMockito.doNothing().when(driver).deleteSubscription(any(String.class), any(OqmDestination.class)); + + manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); + //Mock subscriberControlTopicSubscriptionName + Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName); + + //Run being tested method. + manager.unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers(); + + //verify 3 times the driver was asked about a subscription existence (for all tenants) + verify(driver, times(3)) + .getSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class)); + + + //verify 3 times the driver was asked to delete the subscription (for all tenants) + verify(driver, times(3)) + .deleteSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class)); + + } + + @NotNull + private Collection<TenantInfo> getMockedTenantInfos() { + Collection<TenantInfo> tenants = new ArrayList<>(); + + //Mock we have 3 tenants + tenants.add(new TenantInfo() { + { + setId(1L); + setDataPartitionId(TENANT1); + } + }); + tenants.add(new TenantInfo() { + { + setId(2L); + setDataPartitionId(TENANT2); + } + }); + tenants.add(new TenantInfo() { + { + setId(3L); + setDataPartitionId(TENANT3); + } + }); + return tenants; + } +} \ No newline at end of file