Skip to content
Snippets Groups Projects
Commit d7af07e4 authored by Rustam Lotsmanenko (EPAM)'s avatar Rustam Lotsmanenko (EPAM) Committed by Riabokon Stanislav(EPAM)[GCP]
Browse files

upgrade OQM, added pubsub validation, remove sub provisioning(GONRG-7073)

parent 93f05a51
No related branches found
No related tags found
1 merge request!386upgrade OQM, added pubsub validation, remove sub provisioning(GONRG-7073)
......@@ -16,7 +16,6 @@ Apache-1.1
The following software have components provided under the terms of this license:
- Apache Geronimo JMS Spec 2.0 (from http://geronimo.apache.org/maven/${siteId}/${version})
- Apache Log4j Core (from https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core)
- Apache Log4j JUL Adapter (from https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-jul)
- AspectJ Weaver (from http://www.aspectj.org, https://www.eclipse.org/aspectj/)
- Microsoft Application Insights Java Agent (from https://github.com/Microsoft/ApplicationInsights-Java)
......@@ -319,6 +318,7 @@ BSD-2-Clause
The following software have components provided under the terms of this license:
- API Common (from https://github.com/googleapis, https://github.com/googleapis/api-common-java, https://repo1.maven.org/maven2/com/google/api/api-common)
- Apache Log4j Core (from https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core)
- GAX (Google Api eXtensions) for Java (Core) (from https://github.com/googleapis, https://github.com/googleapis/gax-java, https://repo1.maven.org/maven2/com/google/api/gax)
- GAX (Google Api eXtensions) for Java (gRPC) (from <https://repo1.maven.org/maven2/com/google/api/gax-grpc>, https://repo1.maven.org/maven2/com/google/api/gax-grpc)
- Hamcrest (from http://hamcrest.org/JavaHamcrest/)
......@@ -490,6 +490,13 @@ The following software have components provided under the terms of this license:
- Jakarta XML Binding API (from https://repo1.maven.org/maven2/jakarta/xml/bind/jakarta.xml.bind-api, https://repo1.maven.org/maven2/org/jboss/spec/javax/xml/bind/jboss-jaxb-api_2.3_spec)
- Java Servlet 4.0 API
========================================================================
GPL-1.0-only
========================================================================
The following software have components provided under the terms of this license:
- Java Architecture for XML Binding (from http://jaxb.java.net/, https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api)
========================================================================
GPL-2.0-only
========================================================================
......@@ -528,7 +535,6 @@ The following software have components provided under the terms of this license:
- Jakarta Annotations API (from https://projects.eclipse.org/projects/ee4j.ca)
- Jakarta Servlet (from https://projects.eclipse.org/projects/ee4j.servlet)
- Java Architecture for XML Binding (from http://jaxb.java.net/, https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api)
- Java Servlet 4.0 API
========================================================================
......
......@@ -114,11 +114,11 @@ $ (cd notification-core/ && mvn clean install)
At PubSub should be created set of topics and subscriptions. (see [Overview](#overview))
| topic name | subscription name pattern | description | sensitive? |
|-------------------------------------------------------|--------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------|
| `register-subscriber-control` | `notification-control-topic-{data-partition-id}` | Register subscriber control topic | yes |
| `{topic-name}` e.g. `records-changed` | `notification-{topic-name}-service` | Service topics from [topics.json](https://community.opengroup.org/osdu/platform/system/register/-/blob/master/provider/register-gc/src/main/resources/topics.json) on Register service | yes |
| `{topic-name}-publish` e.g. `records-changed-publish` | `notification-{topic-name}-publish}` | Publish topics in relation 1-on-1 for each service topic | yes |
| topic name | subscription name pattern | description | sensitive? |
|-------------------------------------------------------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------|
| `register-subscriber-control` | `notification-control-sub` | Register subscriber control topic | yes |
| `{topic-name}` e.g. `records-changed` | `notification-{topic-name}-service` | Service topics from [topics.json](https://community.opengroup.org/osdu/platform/system/register/-/blob/master/provider/register-gc/src/main/resources/topics.json) on Register service | yes |
| `{topic-name}-publish` e.g. `records-changed-publish` | `notification-{topic-name}-publish` | Publish topics in relation 1-on-1 for each service topic | yes |
Control topic name can be overridden by:
......
......@@ -16,7 +16,6 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.opengroup.osdu</groupId>
<artifactId>notification-gc</artifactId>
<version>0.21.0-SNAPSHOT</version>
<name>notification-gc</name>
......@@ -34,11 +33,21 @@
<java.version>8</java.version>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.compiler.source>${java.version}</maven.compiler.source>
<jackson-databind.version>2.13.2.2</jackson-databind.version>
<jackson.version>2.13.2</jackson.version>
<spring-boot-maven-plugin.version>2.7.6</spring-boot-maven-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
<version>2.15.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.opengroup.osdu</groupId>
......@@ -98,25 +107,10 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>oqm</artifactId>
<version>0.21.0-rc3</version>
<version>0.21.0-rc5</version>
</dependency>
</dependencies>
......@@ -159,12 +153,6 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<configuration>
<failOnMissingWebXml>false</failOnMissingWebXml>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
......
......@@ -2,18 +2,17 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.List;
import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberManager.*;
......@@ -22,64 +21,62 @@ import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberM
@RequiredArgsConstructor
@ConditionalOnProperty(name = "oqmDriver")
@Slf4j
@Deprecated
public class OqmSubscriptionProvider {
public class MessageBrokerProvider {
private final ITenantFactory tenantInfoFactory;
private final OqmDriver driver;
private final OqmConfigurationProperties properties;
private final OqmSubscriptionHandler subscriptionHandler;
@Deprecated
public void createSubscriptions(TenantInfo tenantInfo) {
public void validateMessageBrokerConfiguration(TenantInfo tenantInfo) {
List<OqmTopic> topics = subscriptionHandler.getTopics(tenantInfo.getDataPartitionId());
topics.forEach(oqmTopic -> getOrCrateSubscription(NOTIFICATION_PREFIX + oqmTopic.getName() + SERVICE_SUFFIX,
oqmTopic.getName(), tenantInfo));
topics.forEach(oqmTopic -> getOrCrateSubscription(NOTIFICATION_PREFIX + oqmTopic.getName() + PUBLISH_SUFFIX,
oqmTopic.getName() + PUBLISH_SUFFIX, tenantInfo));
topics.forEach(oqmTopic -> getMessageBrokerTopic(oqmTopic.getName(), tenantInfo));
topics.forEach(
oqmTopic -> getMessageBrokerTopic(oqmTopic.getName() + PUBLISH_SUFFIX, tenantInfo));
// Source message broker subscriptions
topics.forEach(oqmTopic -> getMessageBrokerSubscription(
NOTIFICATION_PREFIX + oqmTopic.getName() + SERVICE_SUFFIX,
tenantInfo));
// Target message broker subscriptions for replication
topics.forEach(oqmTopic -> getMessageBrokerSubscription(
NOTIFICATION_PREFIX + oqmTopic.getName() + PUBLISH_SUFFIX,
tenantInfo));
}
@NotNull
@Deprecated
public OqmSubscription getOrCrateSubscription(String subscriptionName, String topicName, TenantInfo tenantInfo) {
public OqmSubscription getMessageBrokerSubscription(String subscriptionName, TenantInfo tenantInfo) {
OqmSubscription subscription = driver.getSubscription(subscriptionName, getDestination(tenantInfo)).orElse(null);
if (subscription == null) {
OqmSubscription subscriptionRequest = OqmSubscription.builder()
.name(subscriptionName)
.topics(Collections.singletonList(getOrCreateTopic(topicName, tenantInfo)))
.build();
subscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenantInfo));
log.debug("Subscription with name '{}' CREATED.", subscriptionName);
log.error("Subscriber: {} not found. Create subscriber and restart service.",
subscriptionName);
throw new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required subscription not exists.",
String.format(
"Required subscription not exists. Create subscription: %s and restart service.",
subscriptionName
)
);
}
return subscription;
}
@NotNull
@Deprecated
public OqmTopic getOrCreateTopic(String topicName, TenantInfo tenantInfo) {
public OqmTopic getMessageBrokerTopic(String topicName, TenantInfo tenantInfo) {
OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)).orElse(null);
if (topic == null) {
topic = driver.createAndGetTopic(topicName, getDestination(tenantInfo));
log.debug("OQM: '{}' control topic CREATED", topicName);
log.error("OQM: check for topic {} existence: ABSENT.", topicName);
throw new AppException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required topic not exists.",
String.format(
"Required topic not exists. Create topic: %s and restart service.",
topicName
)
);
}
return topic;
}
@Deprecated
public void deleteControlTopicsSubscription(String controlTopicSubscriptionName) {
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
log.debug("OqmSubscriptionProvider on pre-destroy for tenant {}:", tenantInfo.getDataPartitionId());
OqmSubscription controlTopicSubscription = driver.getSubscription(controlTopicSubscriptionName, getDestination(tenantInfo)).orElse(null);
if (controlTopicSubscription != null) {
driver.deleteSubscription(controlTopicSubscriptionName, getDestination(tenantInfo));
log.debug("Deleted '{}' subscriber control topic subscription with name '{}'.",
properties.getRegisterSubscriberControlTopicName(), controlTopicSubscriptionName);
}
}
}
private OqmDestination getDestination(TenantInfo tenantInfo) {
return OqmDestination.builder()
.partitionId(tenantInfo.getDataPartitionId())
......
......@@ -34,7 +34,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
......@@ -55,15 +54,14 @@ public class OqmSubscriberManager {
public static final String NOTIFICATION_PREFIX = "notification-";
public static final String SERVICE_SUFFIX = "-service";
public static final String PUBLISH_SUFFIX = "-publish";
public static final String CONTROL_SUBSCRIPTION = NOTIFICATION_PREFIX + "control-sub";
private final OqmSubscriptionHandler subscriptionHandler;
private final ITenantFactory tenantInfoFactory;
private final OqmDriver driver;
private final OqmConfigurationProperties properties;
private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache;
private final OqmNotificationHandler notificationHandler;
@Deprecated
private final OqmSubscriptionProvider subscriptionProvider;
private final MessageBrokerProvider messageBrokerProvider;
@PostConstruct
void postConstruct() {
......@@ -74,15 +72,6 @@ public class OqmSubscriberManager {
log.debug("OQM | Provisioning COMPLETED.");
}
@PreDestroy
void onPreDestroy() {
log.debug("OQM | Destroy STARTED.");
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
subscriptionProvider.deleteControlTopicsSubscription(getControlTopicSubscriptionName(tenantInfo));
}
log.debug("OQM | Destroy COMPLETED.");
}
/**
* Initial filling a third-party subscription info cache with entities, which have already stored
* in Register service's database. Cache should contain all necessary information to send notification to the recipient.
......@@ -101,10 +90,8 @@ public class OqmSubscriberManager {
}
void provisionControlTopicSubscribers() {
String controlTopicName = properties.getRegisterSubscriberControlTopicName();
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
OqmSubscription subscription = subscriptionProvider.getOrCrateSubscription(getControlTopicSubscriptionName(tenantInfo), controlTopicName, tenantInfo);
OqmSubscription subscription = messageBrokerProvider.getMessageBrokerSubscription(CONTROL_SUBSCRIPTION, tenantInfo);
registerControlTopicSubscriber(tenantInfo, subscription);
log.debug("Control topic subscriber REGISTERED for tenant: {}.", tenantInfo.getDataPartitionId());
}
......@@ -113,7 +100,7 @@ public class OqmSubscriberManager {
void provisionServiceSubscribers() {
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
log.debug("Provision service topics subscribers for tenant: '{}'", tenantInfo.getDataPartitionId());
subscriptionProvider.createSubscriptions(tenantInfo);
messageBrokerProvider.validateMessageBrokerConfiguration(tenantInfo);
List<OqmTopic> topics = subscriptionHandler.getTopics(tenantInfo.getDataPartitionId());
for (OqmSubscription subscription : buildServiceSubscriptions(topics)) {
registerServiceTopicSubscriber(tenantInfo, subscription);
......@@ -201,11 +188,6 @@ public class OqmSubscriberManager {
.collect(Collectors.toList());
}
@Deprecated
private String getControlTopicSubscriptionName(TenantInfo tenantInfo) {
return NOTIFICATION_PREFIX + "control-topic-" + tenantInfo.getDataPartitionId();
}
private OqmDestination getDestination(TenantInfo tenantInfo) {
return OqmDestination.builder()
.partitionId(tenantInfo.getDataPartitionId())
......
......@@ -17,4 +17,5 @@
service.token.provider=GCP
partition-auth-enabled=true
oqmDriver=pubsub
\ No newline at end of file
oqmDriver=pubsub
dead-lettering-required=true
\ No newline at end of file
......@@ -51,7 +51,7 @@ public class OqmSubscriberManagerTest {
@Mock
private OqmDriver driver;
@Mock
private OqmSubscriptionProvider subscriptionProvider;
private MessageBrokerProvider subscriptionProvider;
@Mock
private OqmSubscriptionHandler subscriptionHandler;
@Mock
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment