From 2992349aaa8e38784bf773f2ce48618585062884 Mon Sep 17 00:00:00 2001
From: Rostislav_Dublin <Rostislav_Dublin@epam.com>
Date: Wed, 1 Dec 2021 17:13:26 +0300
Subject: [PATCH] OQM stuff added to register Subscribers on all Pull
 Subscriptions. OQM successfully receives events from subscriptions and
 conduct to finite clients webhook endpoints

---
 .../gcp/pubsub/OqmSubscriberManager.java      | 147 ++++++++++++++----
 .../{ => di}/OqmNotificationHandler.java      |   4 +-
 .../pubsub/{ => di}/OqmSignatureService.java  |   4 +-
 .../{ => di}/OqmSubscriptionCacheFactory.java |   4 +-
 .../{ => di}/OqmSubscriptionHandler.java      |   4 +-
 .../provider/gcp/pubsub/di/README.md          |   6 +
 6 files changed, 138 insertions(+), 31 deletions(-)
 rename provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/{ => di}/OqmNotificationHandler.java (94%)
 rename provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/{ => di}/OqmSignatureService.java (79%)
 rename provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/{ => di}/OqmSubscriptionCacheFactory.java (93%)
 rename provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/{ => di}/OqmSubscriptionHandler.java (96%)
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md

diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java
index c5b4fa9ce..55bdc1fe9 100644
--- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java
@@ -22,12 +22,17 @@ import org.opengroup.osdu.core.common.http.HttpResponse;
 import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
 import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
 import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
+import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException;
 import org.opengroup.osdu.core.gcp.oqm.model.*;
+import org.opengroup.osdu.notification.provider.gcp.mappers.oqm.OqmConfigurationProperties;
+import org.opengroup.osdu.notification.service.NotificationHandler;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -35,6 +40,13 @@ import java.util.stream.Stream;
 
 import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
 
+/**
+ * Runs once on the service start.
+ * 1. Fetches oqm for message broker pull subscriptions in interested topics. Creates the service's subscribers in every found subscription.
+ * 2. Checks for the "subscriber control topic" and creates if it is  absent.
+ * - This topic is a "control channel" between Register and Notification services.
+ * - The former sends events on new pull Subscriptions being created, the latter listens for events and creates corresponding Subscribers.
+ */
 @Slf4j
 @Component
 @Scope(SCOPE_SINGLETON)
@@ -58,7 +70,12 @@ public class OqmSubscriberManager {
 
     private final ITenantFactory tenantInfoFactory;
     private final OqmDriver driver;
-    private final OqmNotificationHandler notificationHandler;
+    private final NotificationHandler notificationHandler;
+    private final OqmConfigurationProperties properties;
+
+    private final Long constructDate = System.currentTimeMillis();
+    private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate;
+    private OqmSubscription subscriberControlTopicSubscription = null;
 
 
     @PostConstruct
@@ -73,42 +90,118 @@ public class OqmSubscriberManager {
                 log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}:", tenantInfo.getDataPartitionId(), subscription.getName());
 
                 //Register a Subscriber on every subscription
-                OqmDestination destination = getDestination(tenantInfo);
+                registerSubscriber(tenantInfo, subscription);
 
-                OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
+                log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName());
+            }
+            log.info("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId());
+        }
 
-                    String pubsubMessage = oqmMessage.getData();
-                    String notificationId = subscription.getName();
-                    Map<String, String> headerAttributes = oqmMessage.getAttributes();
+        TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny()
+                .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo"));
 
+        log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence:", properties.getRegisterSubscriberControlTopicName());
+        OqmTopic controlTopic = driver.getTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)).orElse(null);
+        if (controlTopic != null) {
+            log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists.", properties.getRegisterSubscriberControlTopicName());
+        } else {
+            log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist. Trying to create it:", properties.getRegisterSubscriberControlTopicName());
+            driver.createAndGetTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant));
+        }
 
-                    HttpResponse response;
-                    boolean ackedNacked = false;
-                    try {
-                        response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
+        log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}'",
+                properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
 
-                        if (!response.isSuccessCode()) {
-                            log.error(NOT_ACKNOWLEDGE + response.getBody());
-                        } else {
-                            log.debug(ACKNOWLEDGE);
-                            oqmAckReplier.ack();
-                        }
-                        ackedNacked = true;
+        OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName)
+                .topics(Collections.singletonList(controlTopic)).build();
 
-                    } catch (Exception e) {
-                        log.debug(NOT_ACKNOWLEDGE, e);
-                    }
+        subscriberControlTopicSubscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant));
+        log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscription CREATED.",
+                properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
 
-                    if (!ackedNacked) oqmAckReplier.nack();
-                };
+        registerControlTopicSubscriber(tenant, subscriberControlTopicSubscription);
+        log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.",
+                properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
 
-                OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build();
-                driver.subscribe(subscriber, destination);
+        log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
+    }
 
-                log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName());
-            }
-            log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
+    @PreDestroy
+    void onPreDestroy() {
+        log.info("OqmSubscriberManager bean on pre-destroy: STARTED");
+        if (subscriberControlTopicSubscription != null) {
+            TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny()
+                    .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo"));
+            log.info("* OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.",
+                    properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
+            driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant));
         }
+        log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED");
+    }
+
+    private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) {
+        OqmDestination destination = getDestination(tenantInfo);
+
+        OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
+
+            String pubsubMessage = oqmMessage.getData();
+            String notificationId = subscription.getName();
+            Map<String, String> headerAttributes = oqmMessage.getAttributes();
+
+
+            HttpResponse response;
+            boolean ackedNacked = false;
+            try {
+                response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
+
+                if (!response.isSuccessCode()) {
+                    log.error(NOT_ACKNOWLEDGE + response.getBody());
+                } else {
+                    log.debug(ACKNOWLEDGE);
+                    oqmAckReplier.ack();
+                }
+                ackedNacked = true;
+
+            } catch (Exception e) {
+                log.debug(NOT_ACKNOWLEDGE, e);
+            }
+
+            if (!ackedNacked) oqmAckReplier.nack();
+        };
+
+        OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build();
+        driver.subscribe(subscriber, destination);
+        log.info("Just subscribed at topic {} subscription {} for tenant {}",
+                subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId());
+    }
+
+    private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscriber) {
+
+        OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
+
+            String pubsubMessage = oqmMessage.getData();
+            Map<String, String> headerAttributes = oqmMessage.getAttributes();
+            String subscriptionId = headerAttributes.get("subscription-id");
+            String dataPartitionId = headerAttributes.get("data-partition-id");
+            String topic = headerAttributes.get("topic");
+
+            OqmSubscription subscription = OqmSubscription.builder()
+                    .topic(OqmTopic.builder().name(topic).build())
+                    .name(subscriptionId)
+                    .build();
+
+            registerSubscriber(tenantInfoFactory.getTenantInfo(dataPartitionId), subscription);
+
+            log.debug(ACKNOWLEDGE);
+            oqmAckReplier.ack();
+        };
+
+        OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscriber).messageReceiver(receiver).build();
+        OqmDestination destination = getDestination(tenantInfo);
+        driver.subscribe(subscriber, destination);
+
+        log.info("Just subscribed at topic {} subscription {} for tenant {}",
+                controlTopicSubscriber.getTopics().get(0), controlTopicSubscriber.getName(), tenantInfo.getDataPartitionId());
     }
 
     public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) {
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java
similarity index 94%
rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java
rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java
index 300e1e9de..2235850d9 100644
--- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java
@@ -14,7 +14,7 @@
  *   limitations under the License.
  */
 
-package org.opengroup.osdu.notification.provider.gcp.pubsub;
+package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
 
 import org.opengroup.osdu.core.common.http.HttpClient;
 import org.opengroup.osdu.core.common.http.HttpRequest;
@@ -28,11 +28,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;
 
 @Component
+@ConditionalOnProperty(name = "oqmDriver")
 public class OqmNotificationHandler {
     private final static Logger LOGGER = LoggerFactory.getLogger(OqmNotificationHandler.class);
     @Autowired
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSignatureService.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java
similarity index 79%
rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSignatureService.java
rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java
index dc00dc0e4..0578733c9 100644
--- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSignatureService.java
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java
@@ -1,7 +1,8 @@
-package org.opengroup.osdu.notification.provider.gcp.pubsub;
+package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
 
 import lombok.extern.slf4j.Slf4j;
 import org.opengroup.osdu.core.common.cryptographic.SignatureService;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Primary;
 import org.springframework.context.annotation.Scope;
 import org.springframework.context.annotation.ScopedProxyMode;
@@ -19,6 +20,7 @@ import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SING
 @Scope(SCOPE_SINGLETON)
 @Primary
 @Slf4j
+@ConditionalOnProperty(name = "oqmDriver")
 public class OqmSignatureService extends SignatureService {
     @PostConstruct
     void postConstruct() {
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionCacheFactory.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java
similarity index 93%
rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionCacheFactory.java
rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java
index c4b27b2a2..31d8353a2 100644
--- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionCacheFactory.java
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java
@@ -14,7 +14,7 @@
  *   limitations under the License.
  */
 
-package org.opengroup.osdu.notification.provider.gcp.pubsub;
+package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
 
 import org.opengroup.osdu.core.common.cache.ICache;
 import org.opengroup.osdu.core.common.cache.MultiTenantCache;
@@ -25,11 +25,13 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
 import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;
 
 @Component
+@ConditionalOnProperty(name = "oqmDriver")
 public class OqmSubscriptionCacheFactory {
     @Autowired
     private ITenantFactory tenantFactory;
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java
similarity index 96%
rename from provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java
rename to provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java
index 658121355..d326ced41 100644
--- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java
@@ -14,7 +14,7 @@
  *   limitations under the License.
  */
 
-package org.opengroup.osdu.notification.provider.gcp.pubsub;
+package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
@@ -29,6 +29,7 @@ import org.opengroup.osdu.core.common.notification.ISubscriptionService;
 import org.opengroup.osdu.core.common.notification.SubscriptionException;
 import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
@@ -37,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 
 @Component
+@ConditionalOnProperty(name = "oqmDriver")
 public class OqmSubscriptionHandler {
     @Autowired
     private ISubscriptionFactory subscriptionFactory;
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md
new file mode 100644
index 000000000..609d9001f
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md
@@ -0,0 +1,6 @@
+The content of the package is 4 classes derived from the eponymous classes from the core part 
+(find them by names with no "Oqm"prefix). They were derived for one only reason - to free OqmSubscriberManager 
+from addiction to "request scope" which is not valid for the OQM work context. 
+
+In the future, one may probably find a better way to achieve the same (and reuse original classes)
+
-- 
GitLab