Skip to content
Snippets Groups Projects
Commit 2992349a authored by Rostislav Dublin (EPAM)'s avatar Rostislav Dublin (EPAM)
Browse files

OQM stuff added to register Subscribers on all Pull Subscriptions. OQM...

OQM stuff added to register Subscribers on all Pull Subscriptions. OQM successfully receives events from subscriptions and conduct to finite clients webhook endpoints
parent 172f16f0
No related branches found
No related tags found
1 merge request!144(GONRG-3831) GCP Notification: OQM mapper
Pipeline #79658 failed
......@@ -22,12 +22,17 @@ import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.mappers.oqm.OqmConfigurationProperties;
import org.opengroup.osdu.notification.service.NotificationHandler;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
......@@ -35,6 +40,13 @@ import java.util.stream.Stream;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
/**
* Runs once on the service start.
* 1. Fetches oqm for message broker pull subscriptions in interested topics. Creates the service's subscribers in every found subscription.
* 2. Checks for the "subscriber control topic" and creates if it is absent.
* - This topic is a "control channel" between Register and Notification services.
* - The former sends events on new pull Subscriptions being created, the latter listens for events and creates corresponding Subscribers.
*/
@Slf4j
@Component
@Scope(SCOPE_SINGLETON)
......@@ -58,7 +70,12 @@ public class OqmSubscriberManager {
private final ITenantFactory tenantInfoFactory;
private final OqmDriver driver;
private final OqmNotificationHandler notificationHandler;
private final NotificationHandler notificationHandler;
private final OqmConfigurationProperties properties;
private final Long constructDate = System.currentTimeMillis();
private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate;
private OqmSubscription subscriberControlTopicSubscription = null;
@PostConstruct
......@@ -73,42 +90,118 @@ public class OqmSubscriberManager {
log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}:", tenantInfo.getDataPartitionId(), subscription.getName());
//Register a Subscriber on every subscription
OqmDestination destination = getDestination(tenantInfo);
registerSubscriber(tenantInfo, subscription);
OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName());
}
log.info("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId());
}
String pubsubMessage = oqmMessage.getData();
String notificationId = subscription.getName();
Map<String, String> headerAttributes = oqmMessage.getAttributes();
TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny()
.orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo"));
log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence:", properties.getRegisterSubscriberControlTopicName());
OqmTopic controlTopic = driver.getTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)).orElse(null);
if (controlTopic != null) {
log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists.", properties.getRegisterSubscriberControlTopicName());
} else {
log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist. Trying to create it:", properties.getRegisterSubscriberControlTopicName());
driver.createAndGetTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant));
}
HttpResponse response;
boolean ackedNacked = false;
try {
response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}'",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
if (!response.isSuccessCode()) {
log.error(NOT_ACKNOWLEDGE + response.getBody());
} else {
log.debug(ACKNOWLEDGE);
oqmAckReplier.ack();
}
ackedNacked = true;
OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName)
.topics(Collections.singletonList(controlTopic)).build();
} catch (Exception e) {
log.debug(NOT_ACKNOWLEDGE, e);
}
subscriberControlTopicSubscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant));
log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscription CREATED.",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
if (!ackedNacked) oqmAckReplier.nack();
};
registerControlTopicSubscriber(tenant, subscriberControlTopicSubscription);
log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build();
driver.subscribe(subscriber, destination);
log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
}
log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName());
}
log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
@PreDestroy
void onPreDestroy() {
log.info("OqmSubscriberManager bean on pre-destroy: STARTED");
if (subscriberControlTopicSubscription != null) {
TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny()
.orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo"));
log.info("* OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.",
properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName);
driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant));
}
log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED");
}
private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) {
OqmDestination destination = getDestination(tenantInfo);
OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
String pubsubMessage = oqmMessage.getData();
String notificationId = subscription.getName();
Map<String, String> headerAttributes = oqmMessage.getAttributes();
HttpResponse response;
boolean ackedNacked = false;
try {
response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
if (!response.isSuccessCode()) {
log.error(NOT_ACKNOWLEDGE + response.getBody());
} else {
log.debug(ACKNOWLEDGE);
oqmAckReplier.ack();
}
ackedNacked = true;
} catch (Exception e) {
log.debug(NOT_ACKNOWLEDGE, e);
}
if (!ackedNacked) oqmAckReplier.nack();
};
OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build();
driver.subscribe(subscriber, destination);
log.info("Just subscribed at topic {} subscription {} for tenant {}",
subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId());
}
private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscriber) {
OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
String pubsubMessage = oqmMessage.getData();
Map<String, String> headerAttributes = oqmMessage.getAttributes();
String subscriptionId = headerAttributes.get("subscription-id");
String dataPartitionId = headerAttributes.get("data-partition-id");
String topic = headerAttributes.get("topic");
OqmSubscription subscription = OqmSubscription.builder()
.topic(OqmTopic.builder().name(topic).build())
.name(subscriptionId)
.build();
registerSubscriber(tenantInfoFactory.getTenantInfo(dataPartitionId), subscription);
log.debug(ACKNOWLEDGE);
oqmAckReplier.ack();
};
OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscriber).messageReceiver(receiver).build();
OqmDestination destination = getDestination(tenantInfo);
driver.subscribe(subscriber, destination);
log.info("Just subscribed at topic {} subscription {} for tenant {}",
controlTopicSubscriber.getTopics().get(0), controlTopicSubscriber.getName(), tenantInfo.getDataPartitionId());
}
public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) {
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub;
package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
import org.opengroup.osdu.core.common.http.HttpClient;
import org.opengroup.osdu.core.common.http.HttpRequest;
......@@ -28,11 +28,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@ConditionalOnProperty(name = "oqmDriver")
public class OqmNotificationHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(OqmNotificationHandler.class);
@Autowired
......
package org.opengroup.osdu.notification.provider.gcp.pubsub;
package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.cryptographic.SignatureService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
......@@ -19,6 +20,7 @@ import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SING
@Scope(SCOPE_SINGLETON)
@Primary
@Slf4j
@ConditionalOnProperty(name = "oqmDriver")
public class OqmSignatureService extends SignatureService {
@PostConstruct
void postConstruct() {
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub;
package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.cache.MultiTenantCache;
......@@ -25,11 +25,13 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@ConditionalOnProperty(name = "oqmDriver")
public class OqmSubscriptionCacheFactory {
@Autowired
private ITenantFactory tenantFactory;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub;
package org.opengroup.osdu.notification.provider.gcp.pubsub.di;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
......@@ -29,6 +29,7 @@ import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.io.IOException;
......@@ -37,6 +38,7 @@ import java.util.List;
import java.util.Map;
@Component
@ConditionalOnProperty(name = "oqmDriver")
public class OqmSubscriptionHandler {
@Autowired
private ISubscriptionFactory subscriptionFactory;
......
The content of the package is 4 classes derived from the eponymous classes from the core part
(find them by names with no "Oqm"prefix). They were derived for one only reason - to free OqmSubscriberManager
from addiction to "request scope" which is not valid for the OQM work context.
In the future, one may probably find a better way to achieve the same (and reuse original classes)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment