diff --git a/NOTICE b/NOTICE index 98b4782bc1e6ab5c581f8ab14adbf80b5b4f54d5..38c0f7ec96b3276a435ed8e847669d57792fd682 100644 --- a/NOTICE +++ b/NOTICE @@ -58,11 +58,13 @@ The following software have components provided under the terms of this license: - AWS SDK for Java - Core (from https://aws.amazon.com/sdkforjava) - Adapter: RxJava (from https://github.com/square/retrofit) - Animal Sniffer Annotations (from https://repo1.maven.org/maven2/org/codehaus/mojo/animal-sniffer-annotations) +- Apache Commons BeanUtils (from http://commons.apache.org/proper/commons-beanutils/, https://commons.apache.org/proper/commons-beanutils/, https://repo1.maven.org/maven2/commons-beanutils/commons-beanutils) - Apache Commons Codec (from http://commons.apache.org/proper/commons-codec/, https://commons.apache.org/proper/commons-codec/) - Apache Commons Collections (from http://commons.apache.org/proper/commons-collections/, https://commons.apache.org/proper/commons-collections/) - Apache Commons IO (from http://commons.apache.org/io/, https://commons.apache.org/proper/commons-io/, https://repo1.maven.org/maven2/commons-io/commons-io) - Apache Commons Lang (from http://commons.apache.org/proper/commons-lang/, https://commons.apache.org/proper/commons-lang/) - Apache Commons Logging (from http://commons.apache.org/logging/, http://commons.apache.org/proper/commons-logging/) +- Apache Commons Validator (from http://commons.apache.org/proper/commons-validator/, http://jakarta.apache.org/commons/${pom.artifactId.substring(8)}/, https://repo1.maven.org/maven2/commons-validator/commons-validator) - Apache Geronimo JMS Spec 2.0 (from http://geronimo.apache.org/maven/${siteId}/${version}) - Apache Groovy (from http://groovy-lang.org, http://groovy.codehaus.org/, https://groovy-lang.org) - Apache HTTP transport v2 for the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-apache-v2) @@ -87,8 +89,12 @@ The following software have components provided under the terms of this license: - Byte Buddy Java agent (from https://repo1.maven.org/maven2/net/bytebuddy/byte-buddy-agent) - ClassMate (from http://github.com/cowtowncoder/java-classmate) - Cloud Key Management Service (KMS) API v1-rev20220407-1.32.1 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-cloudkms) +- Cloud Key Management Service (KMS) API v1-rev20220617-1.32.1 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-cloudkms) - Cloud Storage JSON API v1-rev20220604-1.32.1 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-storage) - Cloud Storage JSON API v1-rev20220608-1.32.1 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-storage) +- Cloud Storage JSON API v1-rev20220705-1.32.1 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-storage) +- Collections (from https://repo1.maven.org/maven2/commons-collections/commons-collections) +- Commons Digester (from http://commons.apache.org/digester/) - Converter: Jackson (from https://github.com/square/retrofit, https://repo1.maven.org/maven2/com/squareup/retrofit2/converter-jackson) - Core functionality for the Reactor Netty library (from https://github.com/reactor/reactor-netty) - Expression Language 3.0 (from http://el-spec.java.net, http://uel.java.net, https://projects.eclipse.org/projects/ee4j.el) diff --git a/provider/notification-aws/pom.xml b/provider/notification-aws/pom.xml index ddbc7390d49855a480b91c5e8f66b5f773518f3b..10044013e6605873c9d933d7300267040b96d044 100644 --- a/provider/notification-aws/pom.xml +++ b/provider/notification-aws/pom.xml @@ -40,6 +40,7 @@ <os-core-common.version>0.14.0</os-core-common.version> <jackson-databind.version>2.13.2.2</jackson-databind.version> <jackson.version>2.13.2</jackson.version> + <spring-webmvc.version>5.3.22</spring-webmvc.version> </properties> <dependencyManagement> @@ -93,6 +94,12 @@ <artifactId>spring-boot-starter-actuator</artifactId> </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-webmvc</artifactId> + <version>${spring-webmvc.version}</version> + </dependency> + <!-- unit test dependencies --> <dependency> <groupId>org.powermock</groupId> diff --git a/provider/notification-azure/pom.xml b/provider/notification-azure/pom.xml index aaf7c25be2602c7b75b755718d1859c61e83a3b7..a345b6f9f3f7bdf3c345932a2a30029cbe333115 100644 --- a/provider/notification-azure/pom.xml +++ b/provider/notification-azure/pom.xml @@ -39,7 +39,7 @@ <springframework.version>4.3.0.RELEASE</springframework.version> <reactor.netty.version>0.11.0.RELEASE</reactor.netty.version> <reactor.core.version>3.3.0.RELEASE</reactor.core.version> - <osdu.corelibazure.version>0.15.0-rc6</osdu.corelibazure.version> + <osdu.corelibazure.version>0.16.0-rc5</osdu.corelibazure.version> <osdu.oscorecommon.version>0.14.0</osdu.oscorecommon.version> <junit.version>5.6.0</junit.version> <jjwt.version>3.8.1</jjwt.version> @@ -52,6 +52,7 @@ <woodstox-core.version>5.3.0</woodstox-core.version> <jackson-databind.version>2.13.2.2</jackson-databind.version> <jackson.version>2.13.2</jackson.version> + <spring-webmvc.version>5.3.22</spring-webmvc.version> </properties> <dependencyManagement> @@ -149,6 +150,12 @@ <version>${jackson.version}</version> </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-webmvc</artifactId> + <version>${spring-webmvc.version}</version> + </dependency> + <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java index 7f7ce4cb602714f0987157dbfb46067828af5b42..0efe060252d1384ab62fabc278b2961aea7c6142 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/AppProperties.java @@ -45,4 +45,4 @@ public class AppProperties implements IAppProperties { public String getPubSubServiceAccountEmail() { return String.format("de-notification-service@%s.iam.gserviceaccount.com", this.project); } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java index 886e3d95d9c4a8c9c1350dc3c6367f927854d8a4..d0b50c7e801aef279d8fc0d0d9f9ebf7bc1095e7 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/OqmConfigurationProperties.java @@ -30,4 +30,4 @@ import org.springframework.context.annotation.Configuration; public class OqmConfigurationProperties { private String registerSubscriberControlTopicName = "register-subscriber-control"; private int waitingTime = 30000; -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java index b57ac2db24b3481f1eabbc260a2e6c03040e4916..1e1986573dc02dd3592d3139d8ea3ef6e704a052 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -22,7 +22,6 @@ import org.opengroup.osdu.core.common.http.HttpResponse; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; -import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException; import org.opengroup.osdu.core.gcp.oqm.model.*; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler; @@ -42,10 +41,10 @@ import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SING /** * Runs once on the service start. - * 1. Fetches oqm for message broker pull subscriptions in interested topics. Creates the service's subscribers in every found subscription. - * 2. Checks for the "subscriber control topic" and creates if it is absent. + * 1. Fetches oqm for tenants' message brokers' pull subscriptions in interested topics. Creates the service's subscribers in every found subscription. + * 2. Checks for the "subscriber control topic" on every tenant's message broker and creates if it is absent. * - This topic is a "control channel" between Register and Notification services. - * - The former sends events on new pull Subscriptions being created, the latter listens for events and creates corresponding Subscribers. + * - Register sends events on new pull Subscriptions being created, Notification listens for events and creates corresponding Subscribers. */ @Slf4j @Component @@ -75,14 +74,38 @@ public class OqmSubscriberManager { private final Long constructDate = System.currentTimeMillis(); private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; - private OqmSubscription subscriberControlTopicSubscription = null; - @PostConstruct void postConstruct() { log.info("OqmSubscriberManager bean constructed. Provisioning STARTED"); - //Get all Tenant infos + provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers(); + provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers(); + + log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED"); + } + + @PreDestroy + void onPreDestroy() { + log.info("OqmSubscriberManager bean on pre-destroy: STARTED"); + unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers(); + log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED"); + } + + void unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers() { + for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) { + String tenantId = tenant.getDataPartitionId(); + log.info("* OqmSubscriberManager on pre-destroy for tenant {}:", tenantId); + OqmSubscription subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null); + if (subscriberControlTopicSubscriptionForTenant != null) { + log.info("* * OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription DELETED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName, tenantId); + driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)); + } + } + } + + void provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers() { for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { log.info("* OqmSubscriberManager on provisioning tenant {}:", tenantInfo.getDataPartitionId()); //For every Tenant Destination get "subscriberable" Subscriptions @@ -96,47 +119,52 @@ public class OqmSubscriberManager { } log.info("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId()); } + } - TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() - .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); + void provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers() { + String controlTopicName = properties.getRegisterSubscriberControlTopicName(); + for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) { + + String tenantId = tenant.getDataPartitionId(); + + log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence at tenant's '{}' message broker", controlTopicName, tenantId); + OqmTopic subscriberControlTopic = driver.getTopic(controlTopicName, getDestination(tenant)).orElse(null); + boolean controlTopicForTenantJustCreated; + if (subscriberControlTopic != null) { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists at tenant's '{}' message broker", controlTopicName, tenantId); + controlTopicForTenantJustCreated = false; + } else { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist at tenant's '{}' message broker. Trying to create it:", controlTopicName, tenantId); + driver.createAndGetTopic(controlTopicName, getDestination(tenant)); + controlTopicForTenantJustCreated = true; + } - log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence:", properties.getRegisterSubscriberControlTopicName()); - OqmTopic controlTopic = driver.getTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)).orElse(null); - if (controlTopic != null) { - log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists.", properties.getRegisterSubscriberControlTopicName()); - } else { - log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist. Trying to create it:", properties.getRegisterSubscriberControlTopicName()); - driver.createAndGetTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)); - } + log.info("* * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}'", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + OqmSubscription subscriberControlTopicSubscriptionForTenant = null; + OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName) + .topics(Collections.singletonList(subscriberControlTopic)).build(); - OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName) - .topics(Collections.singletonList(controlTopic)).build(); + if (!controlTopicForTenantJustCreated) { + subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null); + } - subscriberControlTopicSubscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant)); - log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscription CREATED.", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + if (subscriberControlTopicSubscriptionForTenant != null) { + log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription CREATED.", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); + } else { + subscriberControlTopicSubscriptionForTenant = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant)); + log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription already EXISTS.", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - registerControlTopicSubscriber(tenant, subscriberControlTopicSubscription); - log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + registerControlTopicSubscriber(tenant, subscriberControlTopicSubscriptionForTenant); + log.info("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscriber REGISTERED.", + controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED"); - } + } - @PreDestroy - void onPreDestroy() { - log.info("OqmSubscriberManager bean on pre-destroy: STARTED"); - if (subscriberControlTopicSubscription != null) { - TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() - .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); - log.info("* OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); - driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)); } - log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED"); } private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) { @@ -175,7 +203,7 @@ public class OqmSubscriberManager { subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId()); } - private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscriber) { + private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) { OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { @@ -196,12 +224,12 @@ public class OqmSubscriberManager { oqmAckReplier.ack(); }; - OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscriber).messageReceiver(receiver).build(); + OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscription).messageReceiver(receiver).build(); OqmDestination destination = getDestination(tenantInfo); driver.subscribe(subscriber, destination); log.info("Just subscribed at topic {} subscription {} for tenant {}", - controlTopicSubscriber.getTopics().get(0), controlTopicSubscriber.getName(), tenantInfo.getDataPartitionId()); + controlTopicSubscription.getTopics().get(0), controlTopicSubscription.getName(), tenantInfo.getDataPartitionId()); } public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) { diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java index d764bf314d11cf95a25bedc18c4303dcfc0cdd96..8bcf81db00c2a0df0209e27ef99f67bc3df4369a 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java @@ -74,4 +74,4 @@ public class OqmNotificationHandler { log.debug("Sending out notification to endpoint: " + endpoint); return response; } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java index 230c0ab5b6dd5e92695730ec3c16225139bbef6a..119dbc79b17b395ca7dbf6b4f20b224be0ce2686 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java @@ -38,4 +38,4 @@ public class GoogleServiceAccountValidatorGenerator { } return verifier; } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java index af1dedf525c432bdc2684444ad0fab0c1c82e95b..ffeacb56283279d5f98125a0681685a1013887e2 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java @@ -64,4 +64,4 @@ public class GoogleServiceAccountValidatorImpl implements IServiceAccountValidat return false; } } -} +} \ No newline at end of file diff --git a/provider/notification-gcp/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java b/provider/notification-gcp/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..220c7d40b16f700bf5b9744442b424cd76e9ea1a --- /dev/null +++ b/provider/notification-gcp/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java @@ -0,0 +1,207 @@ +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; +import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.util.*; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(OqmSubscriberManager.class) +public class OqmSubscriberManagerTest { + + public static final String TENANT1 = "tenant1"; + public static final String TENANT2 = "tenant2"; + public static final String TENANT3 = "tenant3"; + + @Mock + ITenantFactory tenantInfoFactory; + @Mock + OqmDriver driver; + + @Mock + OqmNotificationHandler notificationHandler; + + OqmConfigurationProperties properties = new OqmConfigurationProperties(); + + OqmSubscriberManager manager; + + private final Long constructDate = System.currentTimeMillis(); + private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; + + @Before + public void before() throws Exception { + Collection<TenantInfo> tenants = getMockedTenantInfos(); + when(tenantInfoFactory, "listTenantInfo").thenReturn(tenants); + + } + + @Test + public void whenProvisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception { + manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); + + //Mock 4 subscriberable subscriptions somehow distributed between mock tenants + doAnswer(question -> { + TenantInfo tenant = question.getArgument(0); + String tenantId = tenant.getDataPartitionId(); + switch (tenantId) { + case TENANT1: + return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build()); + case TENANT2: + return Arrays.asList( + OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build(), + OqmSubscription.builder().name("de-" + tenantId + "-subscription-B").build() + ); + case TENANT3: + return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build()); + default: + return Collections.emptyList(); + } + }).when(manager).getSubscriberableSubscriptions(any(TenantInfo.class)); + + PowerMockito.doNothing().when(manager, "registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + + //Run being tested method. + manager.provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers(); + + //verify 3 times getSubscriberableSubscriptions method was called + verifyPrivate(manager, times(3)).invoke("getSubscriberableSubscriptions", any(TenantInfo.class)); + + //verify only 4 subscribers creation were requested and completed + verifyPrivate(manager, times(4)).invoke("registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + + } + + @Test + public void whenProvisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception { + + //Mock only tenant 2 broker has control topic created so far. Others are not yet. + when(driver.getTopic(ArgumentMatchers.eq(properties.getRegisterSubscriberControlTopicName()), any(OqmDestination.class))) + .thenAnswer((question) -> { + String topicName = question.getArgument(0); + OqmDestination destination = question.getArgument(1); + if (destination.getPartitionId().equals(TENANT2)) { + return Optional.of(OqmTopic.builder().name(topicName).build()); + } + + return Optional.empty(); + }); + + //Mock only tenant 2 broker has control topic subscription created so far. Others are not yet. + when(driver.getSubscription(any(String.class), any(OqmDestination.class))) + .thenAnswer((question) -> { + String subscriptionName = question.getArgument(0); + OqmDestination destination = question.getArgument(1); + if (destination.getPartitionId().equals(TENANT2)) { + return Optional.of(OqmSubscription.builder().name(subscriptionName).build()); + } + + return Optional.empty(); + }); + + //Mock driver smoothly creates a subscription + when(driver.createAndGetSubscription(any(OqmSubscription.class), any(OqmDestination.class))) + .thenAnswer(question -> question.<OqmSubscription>getArgument(0)); + + manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); + //Mock subscriberControlTopicSubscriptionName + Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName); + //Mock subscriber silent and successful creation + PowerMockito.doNothing().when(manager, "registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + + //Run being tested method. + manager.provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers(); + + //verify 3 times the driver was asked if the topic exists (for all tenants) + verify(driver, times(3)).getTopic(any(String.class), any(OqmDestination.class)); + + //verify 2 times the driver was asked to create and get topic (for tenants 1 and 3 + verify(driver, times(2)).createAndGetTopic(any(String.class), any(OqmDestination.class)); + + //verify only 1 time the driver was asked if the subscription exists (for TENANT2) + verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class)); + + //verify 2 times the driver was asked to create the subscription (for TENANT1, TENANT2) + verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class)); + + //verify only 2 subscribers creation were requested and completed + verifyPrivate(manager, times(2)).invoke("registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + } + + @Test + public void whenUnprovisionControlTopicsSubscriptionsFromAllTenantsBrokers_thenProperOperationsInvoked() { + //Mock all 3 tenants' brokers have control topic subscriptions created so far. + when(driver.getSubscription(any(String.class), any(OqmDestination.class))) + .thenAnswer((question) -> { + String subscriptionName = question.getArgument(0); + OqmDestination destination = question.getArgument(1); + return Optional.of(OqmSubscription.builder().name(subscriptionName).build()); + }); + + //Mock driver smoothly deletes a subscription + PowerMockito.doNothing().when(driver).deleteSubscription(any(String.class), any(OqmDestination.class)); + + manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); + //Mock subscriberControlTopicSubscriptionName + Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName); + + //Run being tested method. + manager.unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers(); + + //verify 3 times the driver was asked about a subscription existence (for all tenants) + verify(driver, times(3)) + .getSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class)); + + + //verify 3 times the driver was asked to delete the subscription (for all tenants) + verify(driver, times(3)) + .deleteSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class)); + + } + + @NotNull + private Collection<TenantInfo> getMockedTenantInfos() { + Collection<TenantInfo> tenants = new ArrayList<>(); + + //Mock we have 3 tenants + tenants.add(new TenantInfo() { + { + setId(1L); + setDataPartitionId(TENANT1); + } + }); + tenants.add(new TenantInfo() { + { + setId(2L); + setDataPartitionId(TENANT2); + } + }); + tenants.add(new TenantInfo() { + { + setId(3L); + setDataPartitionId(TENANT3); + } + }); + return tenants; + } +} \ No newline at end of file