From f8869a0d58b0ad985b6a761def1087522ea7fc95 Mon Sep 17 00:00:00 2001
From: Rostislav_Dublin <Rostislav_Dublin@epam.com>
Date: Wed, 17 Nov 2021 16:49:20 +0300
Subject: [PATCH] OQM stuff added to register Subscribers on all Pull
 Subscriptions. OQM successfully receives events from subscriptions and
 conduct to finite clients webhook endpoints

---
 pom.xml                                       |   2 +-
 provider/notification-gcp/pom.xml             |  13 +-
 .../oqm/MqTenantOqmDestinationResolver.java   | 144 ++++++++++++++++++
 .../oqm/PsTenantOqmDestinationResolver.java   |  97 ++++++++++++
 .../gcp/pubsub/OqmSubscriberManager.java      | 123 +++++++++++++++
 .../tempForPoc/OqmNotificationHandler.java    |  70 +++++++++
 .../tempForPoc/OqmSignatureService.java       |  27 ++++
 .../OqmSubscriptionCacheFactory.java          |  68 +++++++++
 .../tempForPoc/OqmSubscriptionHandler.java    | 107 +++++++++++++
 .../gcp/util/GcpAppServiceConfig.java         |  33 ++++
 .../gcp/util/GoogleServiceAccountImpl.java    |   2 +-
 .../util/ServiceAccountJwtGcpClientImpl.java  |   5 +-
 .../src/main/resources/logback.xml            |   4 +-
 13 files changed, 687 insertions(+), 8 deletions(-)
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqTenantOqmDestinationResolver.java
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/PsTenantOqmDestinationResolver.java
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmNotificationHandler.java
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSignatureService.java
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionCacheFactory.java
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionHandler.java
 create mode 100644 provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GcpAppServiceConfig.java

diff --git a/pom.xml b/pom.xml
index b8ad8e89b..4daf71429 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
 		<java.version>8</java.version>
 		<maven.compiler.target>${java.version}</maven.compiler.target>
 		<maven.compiler.source>${java.version}</maven.compiler.source>
-		<os-core-common.version>0.11.0</os-core-common.version>
+		<os-core-common.version>0.12.0</os-core-common.version>
 	</properties>
 
 	<licenses>
diff --git a/provider/notification-gcp/pom.xml b/provider/notification-gcp/pom.xml
index 19bcd78c2..6645da5d7 100644
--- a/provider/notification-gcp/pom.xml
+++ b/provider/notification-gcp/pom.xml
@@ -37,6 +37,17 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.opengroup.osdu</groupId>
+            <artifactId>osm</artifactId>
+            <version>0.13.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opengroup.osdu</groupId>
+            <artifactId>oqm</artifactId>
+            <version>0.13.1-SNAPSHOT</version>
+        </dependency>
+
         <dependency>
             <groupId>org.opengroup.osdu</groupId>
             <artifactId>os-core-common</artifactId>
@@ -44,7 +55,7 @@
         <dependency>
             <groupId>org.opengroup.osdu</groupId>
             <artifactId>core-lib-gcp</artifactId>
-            <version>0.11.0</version>
+            <version>0.12.0</version>
         </dependency>
 
         <dependency>
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqTenantOqmDestinationResolver.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqTenantOqmDestinationResolver.java
new file mode 100644
index 000000000..44de6246f
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqTenantOqmDestinationResolver.java
@@ -0,0 +1,144 @@
+package org.opengroup.osdu.notification.provider.gcp.mappers.oqm;
+
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.http.client.Client;
+import com.rabbitmq.http.client.ClientParameters;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.opengroup.osdu.core.common.model.http.DpsHeaders;
+import org.opengroup.osdu.core.common.partition.*;
+import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException;
+import org.opengroup.osdu.core.gcp.oqm.driver.rabbitmq.MqOqmDestinationResolution;
+import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
+import org.opengroup.osdu.core.gcp.osm.translate.TranslatorRuntimeException;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
+
+/**
+ * For RabbitMQ. Tenant Based OQM destination resolver
+ *
+ * @author Rostislav_Dublin
+ * @since 09.11.2021
+ */
+@Component
+@Scope(SCOPE_SINGLETON)
+@ConditionalOnProperty(name = "oqmDriver", havingValue = "rabbitmq")
+@RequiredArgsConstructor
+@Slf4j
+public class MqTenantOqmDestinationResolver implements org.opengroup.osdu.core.gcp.oqm.driver.rabbitmq.MqOqmDestinationResolver {
+
+    private final static String PARTITION_PROPERTIES_AMQP_PREFIX = "register.oqm.rabbitmq.amqp.";
+    private final static String PARTITION_PROPERTY_AMQP_HOST = PARTITION_PROPERTIES_AMQP_PREFIX.concat("host");
+    private final static String PARTITION_PROPERTY_AMQP_PORT = PARTITION_PROPERTIES_AMQP_PREFIX.concat("port");
+    private final static String PARTITION_PROPERTY_AMQP_PATH = PARTITION_PROPERTIES_AMQP_PREFIX.concat("path");
+    private final static String PARTITION_PROPERTY_AMQP_USERNAME = PARTITION_PROPERTIES_AMQP_PREFIX.concat("username");
+    private final static String PARTITION_PROPERTY_AMQP_PASSWORD = PARTITION_PROPERTIES_AMQP_PREFIX.concat("password");
+
+    private final static String PARTITION_PROPERTIES_ADMIN_PREFIX = "register.oqm.rabbitmq.admin.";
+    private final static String PARTITION_PROPERTY_ADMIN_SCHEMA = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("schema");
+    private final static String PARTITION_PROPERTY_ADMIN_HOST = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("host");
+    private final static String PARTITION_PROPERTY_ADMIN_PORT = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("port");
+    private final static String PARTITION_PROPERTY_ADMIN_PATH = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("path");
+    private final static String PARTITION_PROPERTY_ADMIN_USERNAME = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("username");
+    private final static String PARTITION_PROPERTY_ADMIN_PASSWORD = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("password");
+
+    private final DpsHeaders dpsHeaders;
+    private final IPartitionFactory partitionFactory;
+
+    private Map<OqmDestination, ConnectionFactory> amqpConnectionFactoryCache = new HashMap<>();
+    private Map<OqmDestination, Client> httpClientCache = new HashMap<>();
+
+    @Override
+    public MqOqmDestinationResolution resolve(OqmDestination destination) {
+
+        IPartitionProvider partitionProvider = partitionFactory.create(dpsHeaders);
+        PartitionInfo partitionInfo;
+        try {
+            partitionInfo = partitionProvider.get(destination.getPartitionId());
+        } catch (PartitionException e) {
+            throw new TranslatorRuntimeException(e, "Partition '{}' destination resolution issue", destination.getPartitionId());
+        }
+        Map<String, Property> partitionProperties = partitionInfo.getProperties();
+
+        String amqpHost = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_HOST)).map(Property::getValue).map(Object::toString).orElse(null);
+        String amqpPort = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_PORT)).map(Property::getValue).map(Object::toString).orElse(null);
+        String amqpPath = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_PATH)).map(Property::getValue).map(Object::toString).orElse(null);
+        String amqpUser = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_USERNAME)).map(Property::getValue).map(Object::toString).orElse(null);
+        String amqpPass = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_PASSWORD)).map(Property::getValue).map(Object::toString).orElse(null);
+
+        String adminSchm = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_SCHEMA)).map(Property::getValue).map(Object::toString).orElse(null);
+        String adminHost = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_HOST)).map(Property::getValue).map(Object::toString).orElse(null);
+        String adminPort = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_PORT)).map(Property::getValue).map(Object::toString).orElse(null);
+        String adminPath = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_PATH)).map(Property::getValue).map(Object::toString).orElse(null);
+        String adminUser = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_USERNAME)).map(Property::getValue).map(Object::toString).orElse(null);
+        String adminPass = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_PASSWORD)).map(Property::getValue).map(Object::toString).orElse(null);
+
+        if (amqpHost == null || amqpPort == null || amqpPath == null || amqpUser == null || amqpPass == null) {
+            throw new TranslatorRuntimeException(null, "Partition '{}' RabbitMQ OQM destination resolution AMQP part configuration issue", destination.getPartitionId());
+        }
+        if (adminSchm == null || adminHost == null || adminPort == null || adminPath == null || adminUser == null || adminPass == null) {
+            throw new TranslatorRuntimeException(null, "Partition '{}' RabbitMQ OQM destination resolution ADMIN part configuration issue", destination.getPartitionId());
+        }
+
+        //noinspection SwitchStatementWithTooFewBranches
+        switch (destination.getPartitionId()) {
+            default:
+
+                String virtualHost = "/";
+
+                ConnectionFactory amqpFactory = amqpConnectionFactoryCache.get(destination);
+                if (amqpFactory == null) {
+
+                    URI amqpUri;
+                    try {
+                        amqpUri = new URI("amqp", amqpUser + ":" + amqpPass, amqpHost, Integer.parseInt(amqpPort), amqpPath, null, null);
+                        amqpFactory = new ConnectionFactory();
+                        amqpFactory.setUri(amqpUri);
+                        amqpConnectionFactoryCache.put(destination, amqpFactory);
+
+                    } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
+                        throw new OqmDriverRuntimeException("RabbitMQ amqp URI and ConnectionFactory", e);
+                    }
+                }
+
+                Client httpClient = httpClientCache.get(destination);
+                if (httpClient == null) {
+                    try {
+                        URI httpUrl = new URI(adminSchm, null, adminHost, Integer.parseInt(adminPort), adminPath, null, null);
+                        ClientParameters clientParameters = new ClientParameters().url(httpUrl.toURL())
+                                .username("guest").password("guest");
+
+                        httpClient = new Client(clientParameters);
+                        httpClientCache.put(destination, httpClient);
+
+                    } catch (URISyntaxException | MalformedURLException e) {
+                        throw new OqmDriverRuntimeException("RabbitMQ http(api) URI and Client", e);
+                    }
+                }
+
+                return MqOqmDestinationResolution.builder()
+                        .amqpFactory(amqpFactory)
+                        .adminClient(httpClient)
+                        .virtualHost(virtualHost)
+                        .build();
+        }
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        log.info("On pre-destroy.");
+    }
+}
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/PsTenantOqmDestinationResolver.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/PsTenantOqmDestinationResolver.java
new file mode 100644
index 000000000..5c7293412
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/PsTenantOqmDestinationResolver.java
@@ -0,0 +1,97 @@
+package org.opengroup.osdu.notification.provider.gcp.mappers.oqm;
+
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
+import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
+import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException;
+import org.opengroup.osdu.core.gcp.oqm.driver.pubsub.PsOqmDestinationResolution;
+import org.opengroup.osdu.core.gcp.oqm.driver.pubsub.PsOqmDestinationResolver;
+import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
+import org.opengroup.osdu.notification.provider.gcp.util.GcpAppServiceConfig;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
+
+/**
+ * For GCP PubSub. Tenant Based OQM destination resolver
+ *
+ * @author Rostislav_Dublin
+ * @since 09.11.2021
+ */
+@Component
+@Scope(SCOPE_SINGLETON)
+@ConditionalOnProperty(name = "oqmDriver", havingValue = "pubsub")
+@Slf4j
+@RequiredArgsConstructor
+public class PsTenantOqmDestinationResolver implements PsOqmDestinationResolver {
+
+    private Map<OqmDestination, TopicAdminClient> topicClientCache = new HashMap<>();
+    private Map<OqmDestination, SubscriptionAdminClient> subscriptionClientCache = new HashMap<>();
+
+    private final ITenantFactory tenantInfoFactory;
+    private final GcpAppServiceConfig config;
+
+    @Override
+    public PsOqmDestinationResolution resolve(OqmDestination destination) {
+        TenantInfo ti = tenantInfoFactory.getTenantInfo(destination.getPartitionId());
+        String partitionId = destination.getPartitionId();
+
+        //noinspection SwitchStatementWithTooFewBranches
+        switch (partitionId) {
+            default:
+                String servicesProjectId = config.getGoogleCloudProject();
+                String dataProjectId = ti.getProjectId();
+
+                TopicAdminClient tac = topicClientCache.get(destination);
+                if (tac == null) {
+                    try {
+                        TopicAdminSettings tas = TopicAdminSettings.newBuilder().build();
+                        tac = TopicAdminClient.create(tas);
+                        topicClientCache.put(destination, tac);
+                    } catch (IOException e) {
+                        throw new OqmDriverRuntimeException("PsOqmDestinationResolution#resolve TopicAdminClient", e);
+                    }
+                }
+
+                SubscriptionAdminClient sac = subscriptionClientCache.get(destination);
+                if (sac == null) {
+                    try {
+                        sac = SubscriptionAdminClient.create();
+                        subscriptionClientCache.put(destination, sac);
+                    } catch (IOException e) {
+                        throw new OqmDriverRuntimeException("PsOqmDestinationResolution#resolve SubscriptionAdminClient", e);
+                    }
+                }
+
+                return PsOqmDestinationResolution.builder()
+                        .servicesProjectId(servicesProjectId)
+                        .dataProjectId(dataProjectId)
+                        .topicAdminClient(tac)
+                        .subscriptionAdminClient(sac)
+                        .build();
+        }
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        log.info("On pre-destroy. {} topic client(s) & {} subscription clients to shutdown",
+                topicClientCache.size(), subscriptionClientCache.size());
+        for (TopicAdminClient tac : topicClientCache.values()) {
+            tac.shutdown();
+        }
+        for (SubscriptionAdminClient sac : subscriptionClientCache.values()) {
+            sac.shutdown();
+        }
+    }
+}
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java
new file mode 100644
index 000000000..069094efe
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2017-2020, Schlumberger
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opengroup.osdu.notification.provider.gcp.pubsub;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.opengroup.osdu.core.common.http.HttpResponse;
+import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
+import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
+import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
+import org.opengroup.osdu.core.gcp.oqm.model.*;
+import org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc.OqmNotificationHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
+
+@Slf4j
+@Component
+@Scope(SCOPE_SINGLETON)
+@ConditionalOnProperty(name = "oqmDriver")
+@RequiredArgsConstructor
+public class OqmSubscriberManager {
+
+    private final String ACKNOWLEDGE = "message acknowledged by client";
+    private final String NOT_ACKNOWLEDGE = "message not acknowledged by client";
+
+    //TODO should be externalized to application.properties
+    private static final List<OqmTopic> INTERESTED_TOPICS =
+            Stream.of("records-changed", "schema-changed", "status-changed", "legaltags_changed")
+                    .map(topicName -> OqmTopic.builder().name(topicName).build()).collect(Collectors.toList());
+
+    private static final String INTERESTED_SUBSCRIPTIONS_PREFIX = "de-";
+
+    private static final OqmSubscriptionQuery INTERESTED_SUBSCRIPTIONS_QUERY = OqmSubscriptionQuery.builder()
+            .forAnyOfTopics(INTERESTED_TOPICS).namePrefix(INTERESTED_SUBSCRIPTIONS_PREFIX)
+            .subscriberable(true).build();
+
+    private final ITenantFactory tenantInfoFactory;
+    private final OqmDriver driver;
+    private final OqmNotificationHandler notificationHandler;
+
+
+    @PostConstruct
+    void postConstruct() {
+        log.info("OqmSubscriberManager bean constructed. Provisioning STARTED");
+
+        //Get all Tenant infos
+        for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
+            log.info("* OqmSubscriberManager on provisioning tenant {}:", tenantInfo.getDataPartitionId());
+            //For every Tenant Destination get "subscriberable" Subscriptions
+            for (OqmSubscription subscription : getSubscriberableSubscriptions(tenantInfo)) {
+                log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}:", tenantInfo.getDataPartitionId(), subscription.getName());
+
+                //Register a Subscriber on every subscription
+                OqmDestination destination = getDestination(tenantInfo);
+
+                OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
+
+                    String pubsubMessage = oqmMessage.getData();
+                    String notificationId = subscription.getName();
+                    Map<String, String> headerAttributes = oqmMessage.getAttributes();
+
+
+                    HttpResponse response;
+                    boolean ackedNacked = false;
+                    try {
+                        response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
+
+                        if (!response.isSuccessCode()) {
+                            log.error(NOT_ACKNOWLEDGE + response.getBody());
+                        } else {
+                            log.debug(ACKNOWLEDGE);
+                            oqmAckReplier.ack();
+                        }
+                        ackedNacked = true;
+
+                    } catch (Exception e) {
+                        log.debug(NOT_ACKNOWLEDGE, e);
+                    }
+
+                    if (!ackedNacked) oqmAckReplier.nack();
+                };
+
+                OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build();
+                driver.subscribe(subscriber, destination);
+
+                log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName());
+            }
+            log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
+        }
+    }
+
+    public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) {
+        return driver.listSubscriptions(null, INTERESTED_SUBSCRIPTIONS_QUERY, getDestination(tenantInfo));
+    }
+
+    private OqmDestination getDestination(TenantInfo tenantInfo) {
+        return OqmDestination.builder().partitionId(tenantInfo.getDataPartitionId()).build();
+    }
+
+}
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmNotificationHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmNotificationHandler.java
new file mode 100644
index 000000000..33f8198d5
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmNotificationHandler.java
@@ -0,0 +1,70 @@
+/*
+ *   Copyright 2017-2020, Schlumberger
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
+
+import org.opengroup.osdu.core.common.http.HttpClient;
+import org.opengroup.osdu.core.common.http.HttpRequest;
+import org.opengroup.osdu.core.common.http.HttpResponse;
+import org.opengroup.osdu.core.common.model.http.DpsHeaders;
+import org.opengroup.osdu.core.common.model.notification.Secret;
+import org.opengroup.osdu.core.common.model.notification.Subscription;
+import org.opengroup.osdu.notification.auth.factory.AuthFactory;
+import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class OqmNotificationHandler {
+    private final static Logger LOGGER = LoggerFactory.getLogger(OqmNotificationHandler.class);
+    @Autowired
+    private HttpClient httpClient;
+    @Autowired
+    private OqmSubscriptionHandler subscriptionHandler;
+    @Autowired
+    private AuthFactory authFactory;
+    @Value("${app.waitingTime:30000}")
+    private int WAITING_TIME;
+
+    public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception {
+        Subscription subscription = subscriptionHandler.getSubscriptionFromCache(notificationId, headerAttributes);
+        Secret secret = subscription.getSecret();
+        String endpoint = subscription.getPushEndpoint();
+        String secretType = secret.getSecretType();
+        String pushUrl;
+
+        // Authentication Secret
+        SecretAuth secretAuth = authFactory.getSecretAuth(secretType);
+        secretAuth.setSecret(secret);
+        pushUrl = secretAuth.getPushUrl(endpoint);
+
+        Map<String, String> requestHeader = secretAuth.getRequestHeaders();
+        requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
+        requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
+        requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
+
+        HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
+        HttpResponse response = httpClient.send(request);
+        LOGGER.debug("Sending out notification to endpoint: " + endpoint);
+        return response;
+    }
+}
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSignatureService.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSignatureService.java
new file mode 100644
index 000000000..5643d420e
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSignatureService.java
@@ -0,0 +1,27 @@
+package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.opengroup.osdu.core.common.cryptographic.SignatureService;
+import org.springframework.context.annotation.Primary;
+import org.springframework.context.annotation.Scope;
+import org.springframework.context.annotation.ScopedProxyMode;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
+
+/**
+ * @author Rostislav_Dublin
+ * @since 17.11.2021
+ */
+@Component
+@Scope(SCOPE_SINGLETON)
+@Primary
+@Slf4j
+public class OqmSignatureService extends SignatureService {
+    @PostConstruct
+    void postConstruct() {
+        log.info("OqmSignatureService bean constructed.");
+    }
+}
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionCacheFactory.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionCacheFactory.java
new file mode 100644
index 000000000..61fb13057
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionCacheFactory.java
@@ -0,0 +1,68 @@
+/*
+ *   Copyright 2017-2020, Schlumberger
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
+
+import org.opengroup.osdu.core.common.cache.ICache;
+import org.opengroup.osdu.core.common.cache.MultiTenantCache;
+import org.opengroup.osdu.core.common.cache.VmCache;
+import org.opengroup.osdu.core.common.model.http.AppException;
+import org.opengroup.osdu.core.common.model.http.DpsHeaders;
+import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
+import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+@Component
+public class OqmSubscriptionCacheFactory {
+    @Autowired
+    private ITenantFactory tenantFactory;
+
+    private MultiTenantCache<String> caches;
+
+    public OqmSubscriptionCacheFactory(@Value("${app.expireTime}") int expireTime, @Value("${app.maxCacheSize}") int maxCacheSize) {
+        this.caches = new MultiTenantCache<>(new VmCache<>(expireTime, maxCacheSize));
+    }
+
+    public void put(String key, String val, Map<String, String> headerAttributes) {
+        this.partitionCache(headerAttributes).put(key, val);
+    }
+
+    public String get(String key, Map<String, String> headerAttributes) {
+        return this.partitionCache(headerAttributes).get(key);
+    }
+
+    public void delete(String key, Map<String, String> headerAttributes) {
+        this.partitionCache(headerAttributes).delete(key);
+    }
+
+    public void clearAll(Map<String, String> headerAttributes) {
+        this.partitionCache(headerAttributes).clearAll();
+    }
+
+    private ICache<String, String> partitionCache(Map<String, String> headerAttributes) {
+        String tenantId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID);
+        TenantInfo tenantInfo = this.tenantFactory.getTenantInfo(tenantId);
+        if (tenantInfo == null) {
+            throw AppException.createUnauthorized(String.format("could not retrieve tenant info for data partition id: %s", tenantId));
+        }
+        return this.caches.get(String.format("%s:subscription", tenantInfo.getDataPartitionId()));
+    }
+
+}
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionHandler.java
new file mode 100644
index 000000000..0149129fe
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/tempForPoc/OqmSubscriptionHandler.java
@@ -0,0 +1,107 @@
+/*
+ *   Copyright 2017-2020, Schlumberger
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import org.apache.http.HttpStatus;
+import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
+import org.opengroup.osdu.core.common.model.http.AppException;
+import org.opengroup.osdu.core.common.model.http.DpsHeaders;
+import org.opengroup.osdu.core.common.model.notification.Subscription;
+import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
+import org.opengroup.osdu.core.common.notification.ISubscriptionService;
+import org.opengroup.osdu.core.common.notification.SubscriptionException;
+import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class OqmSubscriptionHandler {
+    @Autowired
+    private ISubscriptionFactory subscriptionFactory;
+    @Autowired
+    private OqmSubscriptionCacheFactory subscriptionCacheFactory;
+    @Autowired
+    private JaxRsDpsLog log;
+
+    @Autowired
+    private IServiceAccountJwtClient serviceAccountJwtClient;
+
+    private static final Gson gson = new Gson();
+    private ObjectMapper objectMapper;
+
+    public Subscription getSubscriptionFromCache(String notificationId, Map<String, String> headerAttributes) {
+        String subscriptionString = subscriptionCacheFactory.get(notificationId, headerAttributes);
+        try {
+            if (Strings.isNullOrEmpty(subscriptionString))
+                subscriptionString = querySubscriptionAndUpdateCache(notificationId, headerAttributes);
+            ObjectMapper objectMapper = this.getObjectMapper();
+            return objectMapper.readValue(subscriptionString, Subscription.class);
+        } catch (IOException e) {
+            throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error Parsing subscription String to object", "Unexpected error in pushing message", e);
+        } catch (SubscriptionException se) {
+            throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error query subscription from registration", "Unexpected error in pushing message", se);
+        }
+    }
+
+    private String querySubscriptionAndUpdateCache(String notificationId, Map<String, String> headerAttributes) throws AppException, SubscriptionException {
+        DpsHeaders headers = getDpsHeaders(headerAttributes);
+        ISubscriptionService service = subscriptionFactory.create(headers);
+
+        List<Subscription> subscriptionList = service.query(notificationId);
+        if (subscriptionList == null || subscriptionList.size() == 0) {
+            throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found");
+        }
+
+        Subscription subscription = subscriptionList.get(0);
+        String jsonSubscription = gson.toJson(subscription);
+        this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription, headerAttributes);
+
+        return jsonSubscription;
+    }
+
+    private DpsHeaders getDpsHeaders(Map<String, String> headerAttributes) {
+        Map<String, String> attributes = new HashMap<>(headerAttributes);
+
+        //extract headers from pubsub message
+        String dataPartitionId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID);
+        String authToken = this.serviceAccountJwtClient.getIdToken(dataPartitionId);
+        attributes.put(DpsHeaders.AUTHORIZATION, authToken);
+        return DpsHeaders.createFromMap(attributes);
+
+    }
+
+    //unit test purpose
+    protected ObjectMapper getObjectMapper() {
+        if (this.objectMapper == null) {
+            this.objectMapper = new ObjectMapper();
+        }
+        return this.objectMapper;
+    }
+
+    //unit test purpose
+    void setObjectMapper(ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+    }
+}
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GcpAppServiceConfig.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GcpAppServiceConfig.java
new file mode 100644
index 000000000..abf175474
--- /dev/null
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GcpAppServiceConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017-2020, Schlumberger
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opengroup.osdu.notification.provider.gcp.util;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Primary;
+import org.springframework.stereotype.Component;
+
+@Component
+@Primary
+public class GcpAppServiceConfig {
+
+    @Value("${GOOGLE_CLOUD_PROJECT}")
+    private String googleCloudProject;
+
+    public String getGoogleCloudProject() {
+        return googleCloudProject;
+    }
+}
\ No newline at end of file
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java
index 1555ab181..ae30169a5 100644
--- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java
@@ -18,7 +18,7 @@ package org.opengroup.osdu.notification.provider.gcp.util;
 
 import lombok.SneakyThrows;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.opengroup.osdu.core.gcp.GoogleIdToken.IGoogleIdTokenFactory;
+import org.opengroup.osdu.core.gcp.googleidtoken.IGoogleIdTokenFactory;
 import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java
index 1484bad38..b5b7d51ff 100644
--- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java
+++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java
@@ -19,8 +19,7 @@ package org.opengroup.osdu.notification.provider.gcp.util;
 
 import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
 import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson.JacksonFactory;
+import com.google.api.client.json.gson.GsonFactory;
 import com.google.api.services.iam.v1.Iam;
 import com.google.api.services.iam.v1.IamScopes;
 import com.google.api.services.iam.v1.model.SignJwtRequest;
@@ -59,7 +58,7 @@ public class ServiceAccountJwtGcpClientImpl implements IServiceAccountJwtClient
     private AppProperties config;
     private static final String JWT_AUDIENCE = "https://www.googleapis.com/oauth2/v4/token";
     private static final String SERVICE_ACCOUNT_NAME_FORMAT = "projects/%s/serviceAccounts/%s";
-    private static final JsonFactory JSON_FACTORY = new JacksonFactory();
+    private static final GsonFactory JSON_FACTORY = new GsonFactory();
     static final String INVALID_INPUT = "Invalid inputs provided to getIdToken function";
     static final String INVALID_DATA_PARTITION = "Invalid data partition id";
 
diff --git a/provider/notification-gcp/src/main/resources/logback.xml b/provider/notification-gcp/src/main/resources/logback.xml
index 8d6d9b140..0f86f0a97 100644
--- a/provider/notification-gcp/src/main/resources/logback.xml
+++ b/provider/notification-gcp/src/main/resources/logback.xml
@@ -3,7 +3,7 @@
   <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
   <property resource="application.properties" />
   <logger name="org.opengroup.osdu" level="${LOG_LEVEL}"/>
-  <springProfile name="local">
+  <springProfile name="!local">
     <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
       <encoder>
         <pattern>%yellow([%thread]) %highlight(| %-5level |) %green(%d) %cyan(| %logger{15} |) %highlight(%msg) %n</pattern>
@@ -15,7 +15,7 @@
     </root>
   </springProfile>
 
-  <springProfile name="!local">
+  <springProfile name="off">
     <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
       <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
         <layout class="ch.qos.logback.contrib.json.classic.JsonLayout">
-- 
GitLab