diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java index c5b4fa9ce36d933e81f208f2141a28f62df63c6a..55bdc1fe9dd576ca44db3b08087f1d1c49a55615 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -22,12 +22,17 @@ import org.opengroup.osdu.core.common.http.HttpResponse; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException; import org.opengroup.osdu.core.gcp.oqm.model.*; +import org.opengroup.osdu.notification.provider.gcp.mappers.oqm.OqmConfigurationProperties; +import org.opengroup.osdu.notification.service.NotificationHandler; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -35,6 +40,13 @@ import java.util.stream.Stream; import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; +/** + * Runs once on the service start. + * 1. Fetches oqm for message broker pull subscriptions in interested topics. Creates the service's subscribers in every found subscription. + * 2. Checks for the "subscriber control topic" and creates if it is absent. + * - This topic is a "control channel" between Register and Notification services. + * - The former sends events on new pull Subscriptions being created, the latter listens for events and creates corresponding Subscribers. + */ @Slf4j @Component @Scope(SCOPE_SINGLETON) @@ -58,7 +70,12 @@ public class OqmSubscriberManager { private final ITenantFactory tenantInfoFactory; private final OqmDriver driver; - private final OqmNotificationHandler notificationHandler; + private final NotificationHandler notificationHandler; + private final OqmConfigurationProperties properties; + + private final Long constructDate = System.currentTimeMillis(); + private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; + private OqmSubscription subscriberControlTopicSubscription = null; @PostConstruct @@ -73,42 +90,118 @@ public class OqmSubscriberManager { log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}:", tenantInfo.getDataPartitionId(), subscription.getName()); //Register a Subscriber on every subscription - OqmDestination destination = getDestination(tenantInfo); + registerSubscriber(tenantInfo, subscription); - OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { + log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName()); + } + log.info("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId()); + } - String pubsubMessage = oqmMessage.getData(); - String notificationId = subscription.getName(); - Map<String, String> headerAttributes = oqmMessage.getAttributes(); + TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() + .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); + log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence:", properties.getRegisterSubscriberControlTopicName()); + OqmTopic controlTopic = driver.getTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)).orElse(null); + if (controlTopic != null) { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists.", properties.getRegisterSubscriberControlTopicName()); + } else { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist. Trying to create it:", properties.getRegisterSubscriberControlTopicName()); + driver.createAndGetTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)); + } - HttpResponse response; - boolean ackedNacked = false; - try { - response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes); + log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}'", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); - if (!response.isSuccessCode()) { - log.error(NOT_ACKNOWLEDGE + response.getBody()); - } else { - log.debug(ACKNOWLEDGE); - oqmAckReplier.ack(); - } - ackedNacked = true; + OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName) + .topics(Collections.singletonList(controlTopic)).build(); - } catch (Exception e) { - log.debug(NOT_ACKNOWLEDGE, e); - } + subscriberControlTopicSubscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant)); + log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscription CREATED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); - if (!ackedNacked) oqmAckReplier.nack(); - }; + registerControlTopicSubscriber(tenant, subscriberControlTopicSubscription); + log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); - OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build(); - driver.subscribe(subscriber, destination); + log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED"); + } - log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName()); - } - log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED"); + @PreDestroy + void onPreDestroy() { + log.info("OqmSubscriberManager bean on pre-destroy: STARTED"); + if (subscriberControlTopicSubscription != null) { + TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() + .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); + log.info("* OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)); } + log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED"); + } + + private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) { + OqmDestination destination = getDestination(tenantInfo); + + OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { + + String pubsubMessage = oqmMessage.getData(); + String notificationId = subscription.getName(); + Map<String, String> headerAttributes = oqmMessage.getAttributes(); + + + HttpResponse response; + boolean ackedNacked = false; + try { + response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes); + + if (!response.isSuccessCode()) { + log.error(NOT_ACKNOWLEDGE + response.getBody()); + } else { + log.debug(ACKNOWLEDGE); + oqmAckReplier.ack(); + } + ackedNacked = true; + + } catch (Exception e) { + log.debug(NOT_ACKNOWLEDGE, e); + } + + if (!ackedNacked) oqmAckReplier.nack(); + }; + + OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build(); + driver.subscribe(subscriber, destination); + log.info("Just subscribed at topic {} subscription {} for tenant {}", + subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId()); + } + + private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscriber) { + + OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { + + String pubsubMessage = oqmMessage.getData(); + Map<String, String> headerAttributes = oqmMessage.getAttributes(); + String subscriptionId = headerAttributes.get("subscription-id"); + String dataPartitionId = headerAttributes.get("data-partition-id"); + String topic = headerAttributes.get("topic"); + + OqmSubscription subscription = OqmSubscription.builder() + .topic(OqmTopic.builder().name(topic).build()) + .name(subscriptionId) + .build(); + + registerSubscriber(tenantInfoFactory.getTenantInfo(dataPartitionId), subscription); + + log.debug(ACKNOWLEDGE); + oqmAckReplier.ack(); + }; + + OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscriber).messageReceiver(receiver).build(); + OqmDestination destination = getDestination(tenantInfo); + driver.subscribe(subscriber, destination); + + log.info("Just subscribed at topic {} subscription {} for tenant {}", + controlTopicSubscriber.getTopics().get(0), controlTopicSubscriber.getName(), tenantInfo.getDataPartitionId()); } public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) { diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java similarity index 94% rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java index 300e1e9de452ec862ddb9089af8801cbc48f98b4..2235850d9ce603adc494b4b73af65c46b2165741 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.pubsub; +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; import org.opengroup.osdu.core.common.http.HttpClient; import org.opengroup.osdu.core.common.http.HttpRequest; @@ -28,11 +28,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import java.util.Map; @Component +@ConditionalOnProperty(name = "oqmDriver") public class OqmNotificationHandler { private final static Logger LOGGER = LoggerFactory.getLogger(OqmNotificationHandler.class); @Autowired diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSignatureService.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java similarity index 79% rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSignatureService.java rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java index dc00dc0e456c1cd480b1f926fdd13a57b183e519..0578733c99062b875356b1d64627386038bbc0a6 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSignatureService.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java @@ -1,7 +1,8 @@ -package org.opengroup.osdu.notification.provider.gcp.pubsub; +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; import lombok.extern.slf4j.Slf4j; import org.opengroup.osdu.core.common.cryptographic.SignatureService; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.ScopedProxyMode; @@ -19,6 +20,7 @@ import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SING @Scope(SCOPE_SINGLETON) @Primary @Slf4j +@ConditionalOnProperty(name = "oqmDriver") public class OqmSignatureService extends SignatureService { @PostConstruct void postConstruct() { diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionCacheFactory.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java similarity index 93% rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionCacheFactory.java rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java index c4b27b2a28e6cf01a39e9e3977e2db7e270fad23..31d8353a241e2c0b1186edf75a698bd89b56d1ae 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionCacheFactory.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.pubsub; +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; import org.opengroup.osdu.core.common.cache.ICache; import org.opengroup.osdu.core.common.cache.MultiTenantCache; @@ -25,11 +25,13 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import java.util.Map; @Component +@ConditionalOnProperty(name = "oqmDriver") public class OqmSubscriptionCacheFactory { @Autowired private ITenantFactory tenantFactory; diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java similarity index 96% rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java index 658121355b5683567c3d8e773f9c4a16a3e5d063..d326ced415cf96015a42a14bb05f63353e7d9a6d 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.pubsub; +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; @@ -29,6 +29,7 @@ import org.opengroup.osdu.core.common.notification.ISubscriptionService; import org.opengroup.osdu.core.common.notification.SubscriptionException; import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import java.io.IOException; @@ -37,6 +38,7 @@ import java.util.List; import java.util.Map; @Component +@ConditionalOnProperty(name = "oqmDriver") public class OqmSubscriptionHandler { @Autowired private ISubscriptionFactory subscriptionFactory; diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md new file mode 100644 index 0000000000000000000000000000000000000000..609d9001fd89dd9649ea3d94084ab80428a3bda9 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md @@ -0,0 +1,6 @@ +The content of the package is 4 classes derived from the eponymous classes from the core part +(find them by names with no "Oqm"prefix). They were derived for one only reason - to free OqmSubscriberManager +from addiction to "request scope" which is not valid for the OQM work context. + +In the future, one may probably find a better way to achieve the same (and reuse original classes) +