diff --git a/NOTICE b/NOTICE index c98d48db0e383f25b8bf65ecc3cb2fc973b6624e..350e57837be04cc0c64a47ba5439d40597eb07da 100644 --- a/NOTICE +++ b/NOTICE @@ -155,6 +155,7 @@ The following software have components provided under the terms of this license: - Kotlin Stdlib Common (from https://kotlinlang.org/) - Kotlin Stdlib Jdk7 (from <https://kotlinlang.org/>, https://kotlinlang.org/) - Kotlin Stdlib Jdk8 (from <https://kotlinlang.org/>, https://kotlinlang.org/) +- Lettuce (from http://github.com/lettuce-io/lettuce-core) - Lucene Core (from https://repo1.maven.org/maven2/org/apache/lucene/lucene-core) - Metrics Core (from https://repo1.maven.org/maven2/io/dropwizard/metrics/metrics-core) - Microsoft Application Insights Java Agent (from https://github.com/Microsoft/ApplicationInsights-Java) @@ -236,12 +237,16 @@ The following software have components provided under the terms of this license: - Spring Boot WebFlux Starter (from https://projects.spring.io/spring-boot/#/spring-boot-parent/spring-boot-starters/spring-boot-starter-webflux, https://spring.io/projects/spring-boot) - Spring Commons Logging Bridge (from https://github.com/spring-projects/spring-framework) - Spring Context (from http://www.springframework.org, https://github.com/spring-projects/spring-framework, https://repo1.maven.org/maven2/org/springframework/spring-context) +- Spring Context Support (from https://github.com/spring-projects/spring-framework) - Spring Core (from http://www.springframework.org, https://github.com/spring-projects/spring-framework, https://repo1.maven.org/maven2/org/springframework/spring-core) - Spring Data Core (from https://spring.io/projects/spring-data) +- Spring Data KeyValue (from https://repo1.maven.org/maven2/org/springframework/data/spring-data-keyvalue) - Spring Data MongoDB - Core (from https://repo1.maven.org/maven2/org/springframework/data/spring-data-mongodb) +- Spring Data Redis (from http://github.com/spring-projects/spring-data-redis, https://repo1.maven.org/maven2/org/springframework/data/spring-data-redis, https://spring.io/projects/spring-data-redis) - Spring Expression Language (SpEL) (from https://github.com/SpringSource/spring-framework, https://github.com/spring-projects/spring-framework, https://repo1.maven.org/maven2/org/springframework/spring-expression) - Spring JMS (from http://www.springframework.org, https://github.com/SpringSource/spring-framework, https://github.com/spring-projects/spring-framework, https://repo1.maven.org/maven2/org/springframework/spring-jms) - Spring Messaging (from https://github.com/spring-projects/spring-framework) +- Spring Object/XML Marshalling (from https://github.com/spring-projects/spring-framework) - Spring Plugin - Metadata Extension (from https://repo1.maven.org/maven2/org/springframework/plugin/spring-plugin-metadata) - Spring Plugin Core (from https://github.com/spring-projects/spring-plugin/spring-plugin-core, https://repo1.maven.org/maven2/org/springframework/plugin/spring-plugin-core) - Spring Security - Core (from http://spring.io/spring-security, https://repo1.maven.org/maven2/org/springframework/security/spring-security-core, https://spring.io/projects/spring-security, https://spring.io/spring-security) @@ -364,6 +369,7 @@ The following software have components provided under the terms of this license: - RE2/J (from http://github.com/google/re2j) - Redisson (from http://redisson.org) - Spring Core (from http://www.springframework.org, https://github.com/spring-projects/spring-framework, https://repo1.maven.org/maven2/org/springframework/spring-core) +- Spring Data KeyValue (from https://repo1.maven.org/maven2/org/springframework/data/spring-data-keyvalue) - ThreeTen backport (from https://github.com/ThreeTen/threetenbp, https://www.threeten.org/threetenbp) ======================================================================== @@ -628,7 +634,7 @@ The following software have components provided under the terms of this license: - QpidJMS Client (from https://repo1.maven.org/maven2/org/apache/qpid/qpid-jms-client) - SLF4J API Module (from http://www.slf4j.org) - Spongy Castle (from http://rtyley.github.io/spongycastle/) -- Spring Data for Azure Cosmos DB SQL API (from https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/cosmos/azure-spring-data-cosmos) +- Spring Data for Azure Cosmos DB SQL API (from https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/cosmos/azure-spring-data-cosmos, https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/spring/azure-spring-data-cosmos) - ThreeTen backport (from https://github.com/ThreeTen/threetenbp, https://www.threeten.org/threetenbp) - adal4j (from https://github.com/AzureAD/azure-activedirectory-library-for-java) - micrometer-core (from https://github.com/micrometer-metrics/micrometer) @@ -693,7 +699,6 @@ The following software have components provided under the terms of this license: - Undertow Core (from <https://repo1.maven.org/maven2/io/undertow/undertow-core>, https://repo1.maven.org/maven2/io/undertow/undertow-core) - Undertow WebSockets JSR356 implementations (from <https://repo1.maven.org/maven2/io/undertow/undertow-websockets-jsr>, https://repo1.maven.org/maven2/io/undertow/undertow-websockets-jsr) - XNIO API (from <http://www.jboss.org/xnio>, http://www.jboss.org/xnio) -- XNIO NIO Implementation (from <https://repo1.maven.org/maven2/org/jboss/xnio/xnio-nio>, https://repo1.maven.org/maven2/org/jboss/xnio/xnio-nio) ======================================================================== unknown diff --git a/provider/notification-gc/docs/baremetal/README.md b/provider/notification-gc/docs/baremetal/README.md index 0d168b61eee2766ec9262c8d8020ea2e8f4567a7..809766233946a9abd12b0efcd7a9edb21814f952 100644 --- a/provider/notification-gc/docs/baremetal/README.md +++ b/provider/notification-gc/docs/baremetal/README.md @@ -82,9 +82,9 @@ After the service has started it should be accessible via a web browser by visit **Entitlements configuration for integration accounts** -| DE_OPS_TESTER | DE_ADMIN_TESTER | DE_EDITOR_TESTER | DE_NO_ACCESS_TESTER | -| --- | --- | --- | --- | -|notification.pubsub<br/>service.entitlements.user<br/>users<br/>users.datalake.ops</br>| service.entitlements.user<br/>users<br/>users.datalake.admins</br> | service.entitlements.user<br/>users<br/>users.datalake.editors</br> | service.entitlements.user<br/>users<br/>| +| DE_OPS_TESTER | DE_ADMIN_TESTER | DE_EDITOR_TESTER | DE_NO_ACCESS_TESTER | +|-----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|---------------------------------------------------------------------|------------------------------------------| +| notification.pubsub<br/>service.entitlements.user<br/>users<br/>users.datalake.ops</br> | service.entitlements.user<br/>users<br/>users.datalake.admins</br>service.legal.admin | service.entitlements.user<br/>users<br/>users.datalake.editors</br> | service.entitlements.user<br/>users<br/> | Above variables should be configured in the release pipeline to run integration tests. You should also replace them with proper values if you wish to run tests locally. diff --git a/provider/notification-gc/docs/gc/README.md b/provider/notification-gc/docs/gc/README.md index b2c8bb3ff1ba31344e6b21a810c2fb6f55a5566b..eaa1e5ad418b8f84fc23f5ff53f545f3ec6c418f 100644 --- a/provider/notification-gc/docs/gc/README.md +++ b/provider/notification-gc/docs/gc/README.md @@ -84,9 +84,9 @@ After the service has started it should be accessible via a web browser by visit **Entitlements configuration for integration accounts** -| DE_OPS_TESTER | DE_ADMIN_TESTER | DE_EDITOR_TESTER | DE_NO_ACCESS_TESTER | -| --- | --- | --- | --- | -|notification.pubsub<br/>service.entitlements.user<br/>users<br/>users.datalake.ops</br>| service.entitlements.user<br/>users<br/>users.datalake.admins</br> | service.entitlements.user<br/>users<br/>users.datalake.editors</br> | service.entitlements.user<br/>users<br/>| +| DE_OPS_TESTER | DE_ADMIN_TESTER | DE_EDITOR_TESTER | DE_NO_ACCESS_TESTER | +|-----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|---------------------------------------------------------------------|------------------------------------------| +| notification.pubsub<br/>service.entitlements.user<br/>users<br/>users.datalake.ops</br> | service.entitlements.user<br/>users<br/>users.datalake.admins</br>service.legal.admin | service.entitlements.user<br/>users<br/>users.datalake.editors</br> | service.entitlements.user<br/>users<br/> | Above variables should be configured in the release pipeline to run integration tests. You should also replace them with proper values if you wish to run tests locally. diff --git a/provider/notification-gc/pom.xml b/provider/notification-gc/pom.xml index 933e7c83c0a628ece30a7fb3be94202800aed979..5db03544771ac599bff920c3a047c32d0609860d 100644 --- a/provider/notification-gc/pom.xml +++ b/provider/notification-gc/pom.xml @@ -48,6 +48,16 @@ </dependencyManagement> <dependencies> + <dependency> + <groupId>org.springframework.data</groupId> + <artifactId>spring-data-redis</artifactId> + <version>2.7.18</version> + </dependency> + <dependency> + <groupId>io.lettuce</groupId> + <artifactId>lettuce-core</artifactId> + <version>6.3.0.RELEASE</version> + </dependency> <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>core-lib-gc</artifactId> diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/CacheConfig.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/CacheConfig.java index 24673cf6985135590b59eee2561420f636dcc947..a5d0155a2eee851189ee7736a9df35301b646749 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/CacheConfig.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/CacheConfig.java @@ -17,34 +17,70 @@ package org.opengroup.osdu.notification.provider.gcp.config; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Objects; import lombok.RequiredArgsConstructor; import org.opengroup.osdu.core.common.cache.ICache; -import org.opengroup.osdu.core.common.cache.IRedisCache; import org.opengroup.osdu.core.common.cache.VmCache; +import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.partition.PartitionInfo; -import org.opengroup.osdu.core.gcp.cache.RedisCacheBuilder; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConfiguration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration.LettuceClientConfigurationBuilder; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration @RequiredArgsConstructor public class CacheConfig { - private final RedisCacheBuilder<String, ExternalSubscriptions> builder; - - @Bean - public IRedisCache<String, ExternalSubscriptions> subscriptionCache(RedisProperties properties) { - return builder.buildRedisCache( - properties.getRedisHost(), - properties.getRedisPort(), - properties.getRedisPassword(), - properties.getRedisExpiration(), - properties.getRedisWithSsl(), - String.class, - ExternalSubscriptions.class - ); + @Bean + public RedisTemplate<String, Subscription> setsTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate<String, Subscription> template = new RedisTemplate<>(); + Jackson2JsonRedisSerializer<Subscription> jsonRedisSerializer = new Jackson2JsonRedisSerializer<>( + Subscription.class); + ObjectMapper mapper = new ObjectMapper(); + jsonRedisSerializer.setObjectMapper(mapper); + template.setValueSerializer(jsonRedisSerializer); + template.setKeySerializer(new StringRedisSerializer()); + template.setHashKeySerializer(new StringRedisSerializer()); + template.setConnectionFactory(redisConnectionFactory); + return template; + } + + @Bean + public RedisConnectionFactory redisConnectionFactory(RedisConfiguration redisConfiguration, + RedisProperties redisProperties) { + LettuceClientConfigurationBuilder configurationBuilder = LettuceClientConfiguration.builder(); + + if (Boolean.TRUE.equals(redisProperties.getRedisWithSsl())) { + configurationBuilder.useSsl(); } + configurationBuilder.commandTimeout(redisProperties.getTimeOut()); + LettuceClientConfiguration lettuceClientConfiguration = configurationBuilder.build(); + + return new LettuceConnectionFactory( + redisConfiguration, + lettuceClientConfiguration + ); + } + + @Bean + public RedisConfiguration redisConfiguration(RedisProperties redisProperties) { + RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(); + redisStandaloneConfiguration.setHostName(redisProperties.getRedisHost()); + redisStandaloneConfiguration.setPort(redisProperties.getRedisPort()); + if (Objects.nonNull(redisProperties.getRedisPassword()) && !redisProperties.getRedisPassword().isEmpty()) { + redisStandaloneConfiguration.setPassword(redisProperties.getRedisPassword()); + } + return redisStandaloneConfiguration; + } @Bean public ICache<String, PartitionInfo> partitionInfoCache() { diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java deleted file mode 100644 index 7889a433389510dcace6193903cd621648e8335a..0000000000000000000000000000000000000000 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ExternalSubscriptionsManager.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2020-2023 Google LLC - * Copyright 2020-2023 EPAM Systems, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.opengroup.osdu.notification.provider.gcp.config; - -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.common.cache.IRedisCache; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.notification.Subscription; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; -import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -@Slf4j -public class ExternalSubscriptionsManager { - - private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; - private final OqmSubscriptionHandler subscriptionHandler; - - public ExternalSubscriptions getExternalSubscriptions(String dataPartitionId) { - if (subscriptionInfoCache.get(dataPartitionId) == null) { - log.debug("Subscription info cache wasn't found for tenant {}", dataPartitionId); - reloadSubscriptionInfoCache(dataPartitionId); - } - return subscriptionInfoCache.get(dataPartitionId); - } - - public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) { - List<Subscription> cachedInfos = Optional.ofNullable(getExternalSubscriptions(dataPartitionId)) - .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) - .getSubscriptions(); - - Subscription subscription = getFilteredSubscription(cachedInfos, dataPartitionId, - subscriptionId, serviceTopic); - - if (Objects.nonNull(subscription) && Objects.isNull(subscription.getSecret())) { - return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos); - } - return subscription; - } - - private Subscription getFilteredSubscription(List<Subscription> subscriptions, String dataPartitionId, - String subscriptionId, String serviceTopic) { - List<Subscription> filteredInfos = filterSubscriptionInfosByTopic(subscriptions, subscriptionId, serviceTopic); - if (filteredInfos.isEmpty()) { - return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, subscriptions); - } else { - log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredInfos.size()); - return filteredInfos.get(0); - } - } - - public void updateExternalSubscriptionsCache(String dataPartitionId, ExternalSubscriptions externalSubscriptions) { - subscriptionInfoCache.put(dataPartitionId, externalSubscriptions); - log.debug("Subscription info cache was updated for tenant {} with {} values", dataPartitionId, externalSubscriptions.getSubscriptions().size()); - } - - /** - * Get all subscription infos from Register service and enrich each of them with secret by additional request - * - * @param dataPartitionId partition id - */ - private void reloadSubscriptionInfoCache(String dataPartitionId) { - List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(dataPartitionId); - List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream() - .map(subscription -> getFilteredSubscription(subscriptionInfos, dataPartitionId, subscription.getNotificationId(), subscription.getTopic())) - .collect(Collectors.toList()); - subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build()); - log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size()); - } - - private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedInfos) { - List<Subscription> freshInfos = subscriptionHandler.getSubscriptionsById(dataPartitionId, subscriptionId); - if (freshInfos.isEmpty()) { - log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); - return null; - } - - List<Subscription> filteredFreshInfos = filterSubscriptionInfosByTopic(freshInfos, subscriptionId, serviceTopic); - if (filteredFreshInfos.isEmpty()) { - log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); - return null; - } - cachedInfos.addAll(filteredFreshInfos); - updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedInfos).build()); - - log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size()); - return filteredFreshInfos.get(0); - } - - private static List<Subscription> filterSubscriptionInfosByTopic(List<Subscription> infos, String subscriptionId, String serviceTopic) { - return infos.stream() - .filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId())) - .collect(Collectors.toList()); - } -} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/RedisProperties.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/RedisProperties.java index fb7ae89786697d495c7c6804c7cc35758e6104ba..6ebea2825113674d495ca105285859e51924a066 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/RedisProperties.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/RedisProperties.java @@ -1,6 +1,7 @@ package org.opengroup.osdu.notification.provider.gcp.config; +import java.time.Duration; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -16,4 +17,5 @@ public class RedisProperties { private String redisPassword; private Integer redisExpiration = 300; private Boolean redisWithSsl = false; + private Duration timeOut = Duration.ofSeconds(30L); } \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java index ddaedeefefbffc19e7333f1e5f19f2d3f4a07c31..cdd935b2ac06d1a11a886c71853c2c0b39e74956 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/MessageBrokerProvider.java @@ -8,6 +8,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; @@ -24,7 +25,7 @@ import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberM public class MessageBrokerProvider { private final OqmDriver driver; - private final OqmSubscriptionHandler subscriptionHandler; + private final RegisterSubscriptionService subscriptionHandler; public void validateMessageBrokerConfiguration(TenantInfo tenantInfo) { List<OqmTopic> topics = subscriptionHandler.getTopics(tenantInfo.getDataPartitionId()); diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationDeliveryService.java similarity index 70% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationDeliveryService.java index a4ecd52967de78adb6bfddeea2c1c22d62a33fc4..ad4d33a25cfaffbe87a135b5e0b6f7b416464612 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationDeliveryService.java @@ -16,6 +16,7 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; +import java.util.Map; import java.util.Objects; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -23,24 +24,21 @@ import org.apache.http.HttpStatus; import org.opengroup.osdu.core.common.http.HttpClient; import org.opengroup.osdu.core.common.http.HttpRequest; import org.opengroup.osdu.core.common.http.HttpResponse; -import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.notification.Secret; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.notification.auth.factory.AuthFactory; import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; -import java.util.Map; - @Component @ConditionalOnProperty(name = "oqmDriver") @RequiredArgsConstructor @Slf4j -public class OqmNotificationHandler { +public class OqmNotificationDeliveryService { private final OqmConfigurationProperties oqmConfigurationProperties; private final HttpClient httpClient; @@ -53,17 +51,27 @@ public class OqmNotificationHandler { String correlationId = headerAttributes.get(DpsHeaders.CORRELATION_ID); if (serviceTopic == null || dataPartitionId == null || correlationId == null) { - log.warn("Service topic: {}, dataPartitionId: {}, correlationId: {}.", serviceTopic, dataPartitionId, correlationId); - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Missed header attributes"); + log.warn("Missing vital properties, event cannot be delivered. Service topic: {}, dataPartitionId: {}, correlationId: {}.", serviceTopic, dataPartitionId, correlationId); + return getStubResponse(); } - Subscription subscription = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic); + Subscription subscription = externalSubscriptionsManager.getSubscription( + dataPartitionId, + serviceTopic, + subscriptionId + ); + if (Objects.isNull(subscription)) { - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, - "Subscriber config not found.", - String.format("Subscriber with id: %s not found in cache", subscriptionId) - ); + log.warn("Warning! The event was replicated previously but there is no configuration for subId: {}, skipping the event.", subscriptionId); + return getStubResponse(); + } + + if(!isValidSub(subscription)){ + externalSubscriptionsManager.removeSubscription(dataPartitionId, serviceTopic, subscriptionId); + log.warn("Warning! Subscriber configuration is not valid: {}, skipping the event. Evicting sub from cache.", subscriptionId); + return getStubResponse(); } + Secret secret = subscription.getSecret(); SecretAuth secretAuth = authFactory.getSecretAuth(secret.getSecretType()); @@ -85,4 +93,17 @@ public class OqmNotificationHandler { log.debug("Notification handler | Sending notification to Sub ID: `{}` Endpoint: `{}`.", subscriptionId, subscription.getPushEndpoint()); return response; } + + private boolean isValidSub(Subscription subscription) { + boolean endpointIsValid = Objects.nonNull(subscription.getPushEndpoint()) && !subscription.getPushEndpoint() + .isEmpty(); + boolean secretIsValid = Objects.nonNull(subscription.getSecret()); + return endpointIsValid && secretIsValid; + } + + private static HttpResponse getStubResponse() { + HttpResponse response = new HttpResponse(); + response.setResponseCode(HttpStatus.SC_OK); + return response; + } } \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java index f7908a74217dad86bc9da0680751d3ca8b87f569..2c51db098d44a298f00e6177acee0d93f1f2d505 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -23,10 +23,11 @@ import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmConfigurationEventReceiver; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmReplicatedEventReceiver; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmEventReplicator; +import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -51,10 +52,10 @@ public class OqmSubscriberManager { public static final String SERVICE_SUFFIX = "-service"; public static final String PUBLISH_SUFFIX = "-publish"; public static final String CONTROL_SUBSCRIPTION = NOTIFICATION_PREFIX + "control-sub"; - private final OqmSubscriptionHandler subscriptionHandler; + private final RegisterSubscriptionService subscriptionHandler; private final ITenantFactory tenantInfoFactory; private final OqmDriver driver; - private final OqmNotificationHandler notificationHandler; + private final OqmNotificationDeliveryService notificationHandler; private final MessageBrokerProvider messageBrokerProvider; private final ExternalSubscriptionsManager externalSubscriptionsManager; @@ -73,7 +74,7 @@ public class OqmSubscriberManager { */ void provisionSubscriptionInfoCache() { for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { - externalSubscriptionsManager.getExternalSubscriptions(tenantInfo.getDataPartitionId()); + externalSubscriptionsManager.invokeTenantSubscribers(tenantInfo.getDataPartitionId()); } } @@ -104,7 +105,7 @@ public class OqmSubscriberManager { private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) { OqmSubscriber subscriber = OqmSubscriber.builder() .subscription(controlTopicSubscription) - .messageReceiver(new OqmControlTopicReceiver(externalSubscriptionsManager)) + .messageReceiver(new OqmConfigurationEventReceiver(externalSubscriptionsManager)) .build(); driver.subscribe(subscriber, getDestination(tenantInfo)); } @@ -119,7 +120,7 @@ public class OqmSubscriberManager { void registerServiceTopicSubscriber(TenantInfo tenantInfo, OqmSubscription serviceSubscription) { OqmSubscriber subscriber = OqmSubscriber.builder() .subscription(serviceSubscription) - .messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, externalSubscriptionsManager)) + .messageReceiver(new OqmEventReplicator(serviceSubscription, driver, externalSubscriptionsManager)) .build(); driver.subscribe(subscriber, getDestination(tenantInfo)); } @@ -135,7 +136,7 @@ public class OqmSubscriberManager { private void registerPublishTopicSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) { OqmSubscriber subscriber = OqmSubscriber.builder() .subscription(subscription) - .messageReceiver(new OqmPublishTopicReceiver(subscription, notificationHandler)) + .messageReceiver(new OqmReplicatedEventReceiver(subscription, notificationHandler)) .build(); driver.subscribe(subscriber, getDestination(tenantInfo)); } diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmConfigurationEventReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmConfigurationEventReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..aee8d2e7a9c1cadd44efa5078c028796eeec5500 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmConfigurationEventReceiver.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; + +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; + +@Slf4j +public class OqmConfigurationEventReceiver implements OqmMessageReceiver { + + private static final String SUBSCRIPTION_CREATED = "Subscription Created"; + private static final String SUBSCRIPTION_UPDATED = "Subscription Updated"; + private static final String SUBSCRIPTION_DELETED = "Subscription Deleted"; + + private final ExternalSubscriptionsManager externalSubscriptionsManager; + + public OqmConfigurationEventReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) { + this.externalSubscriptionsManager = externalSubscriptionsManager; + } + + @Override + public void receiveMessage(OqmMessage message, OqmAckReplier replier) { + try { + handleMessage(message); + } catch (Exception e) { + log.error("OQM | Control topic | Message not acknowledged by client", e); + replier.nack(); + } finally { + ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); + } + log.debug("OQM | Control topic | Message acknowledged by client"); + replier.ack(); + } + + private void handleMessage(OqmMessage oqmMessage) { + String pubsubMessage = oqmMessage.getData(); + Map<String, String> headerAttributes = oqmMessage.getAttributes(); + String subscriptionId = headerAttributes.get("subscription-id"); + String serviceTopic = headerAttributes.get("topic"); + String dataPartitionId = headerAttributes.get("data-partition-id"); + log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`", pubsubMessage, serviceTopic, subscriptionId); + + switch (pubsubMessage) { + case SUBSCRIPTION_CREATED, SUBSCRIPTION_UPDATED -> { + externalSubscriptionsManager.updateSubscription(dataPartitionId, subscriptionId); + log.debug( + "OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", + subscriptionId, serviceTopic); + } + case SUBSCRIPTION_DELETED -> { + boolean deleted = externalSubscriptionsManager.removeSubscription(dataPartitionId, + serviceTopic, subscriptionId); + if (deleted) { + log.debug( + "OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", + subscriptionId, serviceTopic); + } else { + log.warn( + "OQM | Control topic requested to delete subscription with id: `{}`, but it was not found in cache.", + subscriptionId); + } + } + default -> log.warn( + "Not supported operation! OQM | Control topic | Received unsupported control topic operation: {}", + pubsubMessage); + } + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java deleted file mode 100644 index 45891a9ae478f29cbd952399909bdb2e4bff0761..0000000000000000000000000000000000000000 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2020-2023 Google LLC - * Copyright 2020-2023 EPAM Systems, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.notification.Subscription; -import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; -import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; -import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; -import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; - -@Slf4j -public class OqmControlTopicReceiver implements OqmMessageReceiver { - - private static final String SUBSCRIPTION_CREATED = "Subscription Created"; - private static final String SUBSCRIPTION_UPDATED = "Subscription Updated"; - private static final String SUBSCRIPTION_DELETED = "Subscription Deleted"; - - private final ExternalSubscriptionsManager externalSubscriptionsManager; - - public OqmControlTopicReceiver(ExternalSubscriptionsManager externalSubscriptionsManager) { - this.externalSubscriptionsManager = externalSubscriptionsManager; - } - - @Override - public void receiveMessage(OqmMessage message, OqmAckReplier replier) { - try { - handleMessage(message); - } catch (Exception e) { - log.error("OQM | Control topic | Message not acknowledged by client", e); - replier.nack(); - } finally { - ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); - } - - log.debug("OQM | Control topic | Message acknowledged by client"); - replier.ack(); - } - - private void handleMessage(OqmMessage oqmMessage) { - String pubsubMessage = oqmMessage.getData(); - Map<String, String> headerAttributes = oqmMessage.getAttributes(); - String subscriptionId = headerAttributes.get("subscription-id"); - String serviceTopic = headerAttributes.get("topic"); - String dataPartitionId = headerAttributes.get("data-partition-id"); - log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`", - pubsubMessage, serviceTopic, subscriptionId); - - List<Subscription> subscriptionInfos = Optional.ofNullable( - externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId)) - .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) - .getSubscriptions(); - Optional<Subscription> cachedInfo = subscriptionInfos.stream() - .filter(info -> subscriptionId.equals(info.getNotificationId()) && serviceTopic.equals(info.getTopic())) - .findFirst(); - - switch (pubsubMessage) { - case SUBSCRIPTION_CREATED: - case SUBSCRIPTION_UPDATED: - cachedInfo.ifPresent(subscriptionInfos::remove); - Subscription freshSubscriptionInfo = externalSubscriptionsManager.getSubscription(dataPartitionId, subscriptionId, serviceTopic); - if (freshSubscriptionInfo != null) { - subscriptionInfos.add(freshSubscriptionInfo); - externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); - log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic); - } - break; - case SUBSCRIPTION_DELETED: - if (cachedInfo.isPresent()) { - subscriptionInfos.remove(cachedInfo.get()); - externalSubscriptionsManager.updateExternalSubscriptionsCache(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); - log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic); - } else { - log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId); - } - break; - default: - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Not supported", "OQM | Control topic | Received unsupported control topic operation: " + pubsubMessage); - } - } -} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmEventReplicator.java similarity index 78% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmEventReplicator.java index 770bc57880fb6b4e88d5e521287447617172b52b..9b98e761bd07773f3917d5502420b4cdf73a6712 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmEventReplicator.java @@ -17,31 +17,35 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.notification.Subscription; -import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; -import org.opengroup.osdu.core.gcp.oqm.model.*; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; +import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberManager.PUBLISH_SUFFIX; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; -import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberManager.*; - +/** + * Used for event replication among existing subscribers in the Register service. + */ @Slf4j -public class OqmServiceTopicReceiver implements OqmMessageReceiver { +public class OqmEventReplicator implements OqmMessageReceiver { private final OqmSubscription subscription; private final OqmDriver driver; private final ExternalSubscriptionsManager externalSubscriptionsManager; - public OqmServiceTopicReceiver(OqmSubscription subscription, + public OqmEventReplicator(OqmSubscription subscription, OqmDriver driver, ExternalSubscriptionsManager externalSubscriptionsManager) { this.subscription = subscription; @@ -73,7 +77,8 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver { .name(serviceTopic + PUBLISH_SUFFIX) .build(); - List<Subscription> cachedSubscriptionInfos = getCachedSubscriptionInfosByTopic(dataPartitionId, serviceTopic); + Set<Subscription> cachedSubscriptionInfos = externalSubscriptionsManager.getSubscriptionsForTopic( + dataPartitionId, serviceTopic); for (Subscription info : cachedSubscriptionInfos) { Map<String, String> attributes = new HashMap<>(refineAttributes(message)); attributes.put("subscription-id", info.getNotificationId()); @@ -98,12 +103,4 @@ public class OqmServiceTopicReceiver implements OqmMessageReceiver { log.debug("Message attributes for message id: `{}` were refined: `{}`", message.getId(), refinedAttributes); return refinedAttributes; } - - private List<Subscription> getCachedSubscriptionInfosByTopic(String dataPartitionId, String serviceTopic) { - return Optional.ofNullable(externalSubscriptionsManager.getExternalSubscriptions(dataPartitionId)) - .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) - .getSubscriptions().stream() - .filter(info -> serviceTopic.equals(info.getTopic())) - .collect(Collectors.toList()); - } } \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmPublishTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmReplicatedEventReceiver.java similarity index 83% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmPublishTopicReceiver.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmReplicatedEventReceiver.java index 49f985eb1a9e9ab5bd55515a154a212121854ab7..63115eb465e437b9d2c013aeeb4d9b912d70f208 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmPublishTopicReceiver.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmReplicatedEventReceiver.java @@ -25,18 +25,18 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; -import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmNotificationHandler; +import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmNotificationDeliveryService; import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; @Slf4j -public class OqmPublishTopicReceiver implements OqmMessageReceiver { +public class OqmReplicatedEventReceiver implements OqmMessageReceiver { private final OqmSubscription subscription; - private final OqmNotificationHandler notificationHandler; + private final OqmNotificationDeliveryService notificationDeliveryService; - public OqmPublishTopicReceiver(OqmSubscription subscription, OqmNotificationHandler notificationHandler) { + public OqmReplicatedEventReceiver(OqmSubscription subscription, OqmNotificationDeliveryService notificationDeliveryService) { this.subscription = subscription; - this.notificationHandler = notificationHandler; + this.notificationDeliveryService = notificationDeliveryService; } @Override @@ -62,7 +62,7 @@ public class OqmPublishTopicReceiver implements OqmMessageReceiver { Map<String, String> headerAttributes = oqmMessage.getAttributes(); String subscriptionId = headerAttributes.get("subscription-id"); - HttpResponse response = notificationHandler.notifySubscriber(subscriptionId, pubsubMessage, headerAttributes); + HttpResponse response = notificationDeliveryService.notifySubscriber(subscriptionId, pubsubMessage, headerAttributes); if (!response.isSuccessCode()) { throw new AppException(response.getResponseCode(), "Notification failed", response.getBody()); diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/repo/SubscriptionCacheRepo.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/repo/SubscriptionCacheRepo.java new file mode 100644 index 0000000000000000000000000000000000000000..d9788610e9713b0edb26b8b936dba824a23bec47 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/repo/SubscriptionCacheRepo.java @@ -0,0 +1,132 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.repo; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.gcp.config.RedisProperties; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SubscriptionCacheRepo { + + public static final String POISON = ":poison"; + private final RedisProperties redisProperties; + private final RedisTemplate<String, Subscription> redisTemplate; + + public void writeSubscription(String dataPartitionId, Subscription subscription) { + String subscriberCacheKey = getSubscriberCacheKey( + dataPartitionId, + subscription.getTopic(), + subscription.getNotificationId() + ); + redisTemplate.opsForValue().set( + subscriberCacheKey, + subscription + ); + } + + public Subscription readSubscription(String dataPartitionId, String topic, + String subscriptionId) { + String subscriberCacheKey = getSubscriberCacheKey( + dataPartitionId, + topic, + subscriptionId + ); + return redisTemplate.opsForValue().get(subscriberCacheKey); + } + + /** + * To avoid Register service spamming, if a subscriber was deleted, but replicated events are + * still being processed. + * + * @param dataPartitionId + * @param topic + * @param subscriptionId + */ + public void poisonSub(String dataPartitionId, String topic, String subscriptionId) { + String subPoisonKey = getSubscriberCacheKey(dataPartitionId, topic, subscriptionId) + POISON; + + redisTemplate.opsForValue().set( + subPoisonKey, + new Subscription(), + redisProperties.getRedisExpiration(), + TimeUnit.SECONDS + ); + } + + /** + * Used to check if subscribers were not found in Register. + * + * @param dataPartitionId + * @param topic + * @param subscriptionId + * @return empty Subscription + */ + public Subscription checkIfSubIsPoisoned(String dataPartitionId, String topic, + String subscriptionId) { + String subPoisonKey = getSubscriberCacheKey(dataPartitionId, topic, subscriptionId) + POISON; + return redisTemplate.opsForValue().get(subPoisonKey); + } + + public boolean deleteSubscription(String dataPartitionId, String topic, String subscriptionId) { + String subscriberCacheKey = getSubscriberCacheKey(dataPartitionId, topic, subscriptionId); + Boolean delete = redisTemplate.delete(subscriberCacheKey); + return Boolean.TRUE.equals(delete); + } + + public Set<Subscription> getDataPartitionSubscribers(String dataPartitionId) { + return getSubscriptions(dataPartitionId + ":*"); + } + + public Set<Subscription> getTopicSubscribers(String dataPartitionId, String topic) { + return getSubscriptions(dataPartitionId + ":" + topic + ":*"); + } + + private HashSet<Subscription> getSubscriptions(String pattern) { + ScanOptions options = ScanOptions.scanOptions() + .match(pattern) + .build(); + + HashSet<String> keys = new HashSet<>(); + + try (Cursor<String> cursor = redisTemplate.scan(options)) { + while (cursor.hasNext()) { + keys.add(cursor.next()); + } + } + + return new HashSet<>(Optional.ofNullable(redisTemplate.opsForValue().multiGet(keys)) + .orElse(Collections.emptyList())); + } + + private static String getSubscriberCacheKey(String dataPartitionId, String topic, String subId) { + return dataPartitionId + ":" + topic + ":" + subId; + } +} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManager.java new file mode 100644 index 0000000000000000000000000000000000000000..2be560afb9bb9df5a9184ae4156bb0f2795f76da --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManager.java @@ -0,0 +1,102 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.service; + +import java.util.List; +import java.util.Objects; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.gcp.repo.SubscriptionCacheRepo; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class ExternalSubscriptionsManager { + + private final SubscriptionCacheRepo subscriptionCacheRepo; + private final RegisterSubscriptionService subscriptionService; + + public void invokeTenantSubscribers(String dataPartitionId) { + Set<Subscription> members = subscriptionCacheRepo.getDataPartitionSubscribers(dataPartitionId); + if (Objects.isNull(members) || members.isEmpty()) { + log.debug("Subscription info cache wasn't found for tenant {}", dataPartitionId); + reloadSubscriptionInfoCache(dataPartitionId); + } + } + + public Set<Subscription> getSubscriptionsForTopic(String dataPartitionId, String topicName) { + return subscriptionCacheRepo.getTopicSubscribers(dataPartitionId, topicName); + } + + public Subscription getSubscription(String dataPartitionId, String topic, String subscriptionId) { + Subscription subscription = subscriptionCacheRepo.readSubscription(dataPartitionId, topic, subscriptionId); + if (subscription != null) { + return subscription; + } + + log.debug("Subscription: {} not in cache, checking if it available in Register.", subscriptionId); + subscription = subscriptionCacheRepo.checkIfSubIsPoisoned(dataPartitionId, topic, subscriptionId); + if (subscription != null) { + log.debug("Subscription:{} marked as not available in cache.", subscriptionId); + return null; + } + return retrieveSubscriptionFromHandler(dataPartitionId, subscriptionId, topic); + } + + private Subscription retrieveSubscriptionFromHandler(String dataPartitionId, String subscriptionId, String topic) { + Subscription subscription = subscriptionService.getSubscriptionsByNotificationId(dataPartitionId,subscriptionId); + if (subscription == null) { + log.debug("Subscription:{} not available in Register. Marking it as not available in cache.", subscriptionId); + subscriptionCacheRepo.poisonSub(dataPartitionId, topic, subscriptionId); + return null; + } + subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription); + return subscription; + } + + public boolean removeSubscription(String dataPartitionId, String topic, String subscriptionId) { + return subscriptionCacheRepo.deleteSubscription(dataPartitionId, topic, subscriptionId); + } + + public void updateSubscription(String dataPartitionId, String subscriptionId) { + Subscription subscription = subscriptionService.getSubscriptionsByNotificationId( + dataPartitionId, + subscriptionId + ); + if(Objects.isNull(subscription)){ + log.warn("Cannot update sub config with id:{} in the cache. Skipping it.", subscriptionId); + }else { + subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription); + } + } + + private void reloadSubscriptionInfoCache(String dataPartitionId) { + List<Subscription> fragmentarySubInfos = subscriptionService.getAllSubscriptionInfos(dataPartitionId); + for (Subscription freagmentedSubscription : fragmentarySubInfos) { + Subscription subscription = subscriptionService.getSubscriptionsByNotificationId(dataPartitionId, freagmentedSubscription.getNotificationId()); + if(Objects.isNull(subscription)){ + log.warn("Cannot init cache for sub id:{}. Empty response from Register. Skipping it.", freagmentedSubscription.getNotificationId()); + }else { + subscriptionCacheRepo.writeSubscription(dataPartitionId, subscription); + } + } + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/RegisterSubscriptionService.java similarity index 87% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/RegisterSubscriptionService.java index fa45de0977e07c0c27b45a16d9499df5b8a32270..b41976eec34a1d8d5f1268450727d1c8ca8fd966 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/RegisterSubscriptionService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.pubsub; +package org.opengroup.osdu.notification.provider.gcp.service; import java.util.List; import java.util.stream.Collectors; @@ -43,7 +43,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor @ConditionalOnProperty(name = "oqmDriver") @Slf4j -public class OqmSubscriptionHandler { +public class RegisterSubscriptionService { private final ISubscriptionFactory registerClientFactory; private final DpsHeadersProvider dpsHeadersProvider; @@ -52,7 +52,6 @@ public class OqmSubscriptionHandler { public List<OqmTopic> getTopics(String dataPartitionId) { DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); ISubscriptionService registerClient = registerClientFactory.create(headers); - try { return registerClient.getTopics() .stream() @@ -75,11 +74,17 @@ public class OqmSubscriptionHandler { } } - public List<Subscription> getSubscriptionsById(String dataPartitionId, String subscriptionId) { + public Subscription getSubscriptionsByNotificationId(String dataPartitionId, String subscriptionId) { try { DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); ISubscriptionService registerClient = registerClientFactory.create(headers); - return registerClient.query(subscriptionId); + List<Subscription> subscriptions = registerClient.query(subscriptionId); + if(subscriptions.isEmpty()){ + log.warn("Empty response from Register for subId: {}, config fetch not possible.", subscriptionId); + return null; + }else { + return subscriptions.get(0); + } } catch (SubscriptionException se) { throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se); } diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/SubscriptionServiceGc.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/SubscriptionServiceGc.java index ae1914887178455e7aa0bf7f078bba1b006fb8d9..a89b06e7b714a2f35d72569ec928dcf71db0d63f 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/SubscriptionServiceGc.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/SubscriptionServiceGc.java @@ -45,7 +45,7 @@ public class SubscriptionServiceGc { if (response.isSuccessCode()) { try { ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(response.getBody(), new TypeReference<List<Subscription>>(){}); + return objectMapper.readValue(response.getBody(), new TypeReference<>() {}); } catch (IOException ex) { throw new SubscriptionException("Exception in deserializing response", response, ex); } diff --git a/provider/notification-gc/src/main/resources/application.properties b/provider/notification-gc/src/main/resources/application.properties index 8cc65690eda213d5ae3b56e1406d62de56f168b1..4c65e44ecab9c8cdd40670a557ef02011000f0dd 100644 --- a/provider/notification-gc/src/main/resources/application.properties +++ b/provider/notification-gc/src/main/resources/application.properties @@ -43,10 +43,12 @@ partition-auth-enabled=true oqmDriver=pubsub # Redis config -redis-host=${REDIS_USER_INFO_HOST:127.0.0.1} -redis-port=${REDIS_USER_INFO_PORT:6379} -redis-password=${REDIS_USER_INFO_PASSWORD:} -redis-with-ssl=${REDIS_USER_INFO_WITH_SSL:false} +redis-host=${REDIS_HOST:127.0.0.1} +redis-port=${REDIS_PORT:6379} +redis-password=${REDIS_PASSWORD:} +redis-with-ssl=${REDIS_WITH_SSL:false} cache.codec=jackson -propertyResolver.strategy=partition \ No newline at end of file +propertyResolver.strategy=partition + +spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration \ No newline at end of file diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmConfigurationEventReceiverTest.java similarity index 53% rename from provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java rename to provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmConfigurationEventReceiverTest.java index 40a17281950a2e2d4949a6d27dcbbe0a91ba1386..81357498098df662d7a455a14ffd368dfc082473 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmConfigurationEventReceiverTest.java @@ -20,11 +20,9 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.HashMap; import java.util.List; -import java.util.stream.Stream; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -33,70 +31,58 @@ import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.repo.SubscriptionCacheRepo; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmConfigurationEventReceiver; @RunWith(MockitoJUnitRunner.class) -public class OqmControlTopicReceiverTest { +public class OqmConfigurationEventReceiverTest { @InjectMocks - private OqmControlTopicReceiver sut; + private OqmConfigurationEventReceiver sut; @Mock private OqmAckReplier replier; @Mock private Subscription subscription; @Mock - private ExternalSubscriptions externalSubscriptions; - @Mock private List<Subscription> subscriptionInfos; @Mock private ExternalSubscriptionsManager externalSubscriptionsManager; + @Mock + private SubscriptionCacheRepo subscriptionCacheRepo; @Test public void testReceiveMessageSubscriptionCreated() { - when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription); - when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); - when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build(); sut.receiveMessage(createMessage, replier); - verify(subscriptionInfos, times(1)).add(subscription); - verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any()); - verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any()); + verify(externalSubscriptionsManager, times(1)).updateSubscription(any(), any()); verify(replier, times(1)).ack(); } @Test public void testReceiveMessageSubscriptionUpdated() { - when(externalSubscriptionsManager.getSubscription(any(), any(), any())).thenReturn(subscription); - when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); - when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); OqmMessage updatedMessage = OqmMessage.builder().data("Subscription Updated").attributes(new HashMap<>()).build(); sut.receiveMessage(updatedMessage, replier); - verify(subscriptionInfos, times(1)).add(subscription); - verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any()); - verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any()); + verify(externalSubscriptionsManager, times(1)).updateSubscription(any(), any()); verify(replier, times(1)).ack(); } @Test public void testReceiveMessageSubscriptionDeleted() { - when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); - when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); - when(subscriptionInfos.stream()).thenReturn(Stream.of(subscription)); - when(subscription.getNotificationId()).thenReturn("4"); - when(subscription.getTopic()).thenReturn("topic1"); + String partitionId = "test"; + String topic = "topic1"; + String subId = "4"; + OqmMessage deletedMessage = OqmMessage.builder() .data("Subscription Deleted") - .attributes(new HashMap<String, String>() {{ - put("subscription-id", "4"); - put("topic", "topic1"); + .attributes(new HashMap<>() {{ + put("subscription-id", subId); + put("topic", topic); + put("data-partition-id", partitionId); }}) .build(); sut.receiveMessage(deletedMessage, replier); - verify(subscriptionInfos, times(1)).remove(subscription); - verify(externalSubscriptionsManager, times(1)).getExternalSubscriptions(any()); - verify(externalSubscriptionsManager, times(1)).updateExternalSubscriptionsCache(any(), any()); + verify(externalSubscriptionsManager, times(1)).removeSubscription(partitionId, topic, subId); verify(replier, times(1)).ack(); } @@ -104,6 +90,6 @@ public class OqmControlTopicReceiverTest { public void testReceiveMessageUnsupported() { OqmMessage unsupportedMessage = OqmMessage.builder().data("Unsupported").attributes(new HashMap<>()).build(); sut.receiveMessage(unsupportedMessage, replier); - verify(replier, times(1)).nack(); + verify(replier, times(1)).ack(); } } \ No newline at end of file diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmEventReplicatorTest.java similarity index 72% rename from provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java rename to provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmEventReplicatorTest.java index c859a72eadd57475ce24eafb35b03941a9a12254..776c169b3459bea532e85a9f5c026de4d43f8149 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmEventReplicatorTest.java @@ -17,33 +17,34 @@ package org.opengroup.osdu.notification.provider.gcp.pubsub; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.opengroup.osdu.core.common.cache.IRedisCache; import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmEventReplicator; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) -public class OqmServiceTopicReceiverTest { +public class OqmEventReplicatorTest { + public static final String TEST_TENANT = "tenant1"; + public static final String TEST_TOPIC = "topic1"; @InjectMocks - private OqmServiceTopicReceiver sut; + private OqmEventReplicator sut; @Mock private OqmAckReplier replier; @Mock @@ -51,31 +52,31 @@ public class OqmServiceTopicReceiverTest { @Mock private OqmSubscription oqmSubscription; @Mock - private ExternalSubscriptions externalSubscriptions; - @Mock private ExternalSubscriptionsManager externalSubscriptionsManager; - private List<Subscription> subscriptionInfos; + private Set<Subscription> subscriptionInfos; private OqmTopic topic; @Before public void setUp() { - topic = OqmTopic.builder().name("topic1").build(); + topic = OqmTopic.builder().name(TEST_TOPIC).build(); Subscription subscription1 = new Subscription(); subscription1.setTopic(topic.getName()); + subscription1.setId("1"); Subscription subscription2 = new Subscription(); subscription2.setTopic(topic.getName()); + subscription2.setId("2"); Subscription subscription3 = new Subscription(); subscription3.setTopic(topic.getName()); - subscriptionInfos = Arrays.asList(subscription1, subscription2, subscription3); + subscription3.setId("3"); + subscriptionInfos = Stream.of(subscription1, subscription2, subscription3).collect(Collectors.toSet()); } @Test public void testReceiveMessageSubscriptionCreated() { when(oqmSubscription.getTopics()).thenReturn(Collections.singletonList(topic)); - when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); - when(externalSubscriptionsManager.getExternalSubscriptions(any())).thenReturn(externalSubscriptions); + when(externalSubscriptionsManager.getSubscriptionsForTopic(TEST_TENANT, TEST_TOPIC)).thenReturn(subscriptionInfos); OqmMessage message = OqmMessage.builder().data("Message").attributes(new HashMap<String, String>() {{ - put("data-partition-id", "tenant1"); + put("data-partition-id", TEST_TENANT); }}).build(); sut.receiveMessage(message, replier); verify(driver, times(3)).publish(any(OqmMessage.class), any(), any()); diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmPublishTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmReplicatedEventReceiverTest.java similarity index 94% rename from provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmPublishTopicReceiverTest.java rename to provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmReplicatedEventReceiverTest.java index dc1a08318c45c9ad366073fa9f1b9088dd6f0dcf..c5cf438c024581fc9cbc33b51c6a81448b3b3df4 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmPublishTopicReceiverTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmReplicatedEventReceiverTest.java @@ -27,7 +27,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; -import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmReplicatedEventReceiver; import java.util.Collections; import java.util.HashMap; @@ -37,14 +37,14 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) -public class OqmPublishTopicReceiverTest { +public class OqmReplicatedEventReceiverTest { @InjectMocks - private OqmPublishTopicReceiver sut; + private OqmReplicatedEventReceiver sut; @Mock private OqmAckReplier replier; @Mock - private OqmNotificationHandler notificationHandler; + private OqmNotificationDeliveryService notificationHandler; @Mock private HttpResponse httpResponse; @Mock diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java index a3d33bdddb5acff9f729db552dcaaa7cb00c2bdb..025bad77c97f4e63805df4c5682f700c9af5e57a 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java @@ -22,20 +22,16 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import org.opengroup.osdu.core.common.cache.IRedisCache; -import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; -import org.opengroup.osdu.notification.provider.gcp.config.ExternalSubscriptionsManager; -import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; -import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.service.ExternalSubscriptionsManager; import java.util.Arrays; import java.util.List; +import org.opengroup.osdu.notification.provider.gcp.service.RegisterSubscriptionService; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -52,7 +48,7 @@ public class OqmSubscriberManagerTest { @Mock private MessageBrokerProvider subscriptionProvider; @Mock - private OqmSubscriptionHandler subscriptionHandler; + private RegisterSubscriptionService subscriptionHandler; @Mock private ExternalSubscriptionsManager externalSubscriptionsManager; @@ -105,6 +101,6 @@ public class OqmSubscriberManagerTest { public void testProvisioningSubscriptionInfoCache() { when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); sut.provisionSubscriptionInfoCache(); - verify(externalSubscriptionsManager, times(3)).getExternalSubscriptions(any()); + verify(externalSubscriptionsManager, times(3)).invokeTenantSubscribers(any()); } } \ No newline at end of file diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManagerTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManagerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..0e0ac0b9702744cabcbd17eae75ce1a2d3ec4846 --- /dev/null +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/service/ExternalSubscriptionsManagerTest.java @@ -0,0 +1,168 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.service; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.gcp.repo.SubscriptionCacheRepo; + +@RunWith(MockitoJUnitRunner.class) +public class ExternalSubscriptionsManagerTest { + + public static final String TEST_TENANT = "test-tenant"; + private static final String TENANT_WO_SUBS = "no-sub-tenant"; + public static final String TEST_TOPIC = "topic"; + public static final String TEST_NOTIF_ID = "test-id"; + + @InjectMocks + private ExternalSubscriptionsManager manager; + @Mock + private SubscriptionCacheRepo mockSubCacheRepo; + @Mock + private RegisterSubscriptionService mockSubService; + + private Subscription subscription; + + private Set<Subscription> subscriptions; + + @Before + public void setUp() { + subscription = new Subscription(); + subscription.setNotificationId(TEST_NOTIF_ID); + subscription.setTopic(TEST_TOPIC); + + subscriptions = new HashSet<>() {{ + add(subscription); + }}; + } + + @Test + public void testInvokeTenantSubscribers_noActiveSubs() { + when(mockSubCacheRepo.getDataPartitionSubscribers(TENANT_WO_SUBS)).thenReturn( + Collections.emptySet()); + manager.invokeTenantSubscribers(TENANT_WO_SUBS); + verify(mockSubCacheRepo, times(0)).writeSubscription(anyString(), any()); + verify(mockSubService, times(1)).getAllSubscriptionInfos(TENANT_WO_SUBS); + } + + @Test + public void testInvokeTenantSubscribers_withActiveSubs() { + when(mockSubCacheRepo.getDataPartitionSubscribers(TEST_TENANT)).thenReturn(subscriptions); + manager.invokeTenantSubscribers(TEST_TENANT); + verify(mockSubService, times(0)).getAllSubscriptionInfos(TEST_TENANT); + } + + @Test + public void testInvokeTenantSubscribers_withActiveSubs_emptyCache() { + when(mockSubCacheRepo.getDataPartitionSubscribers(TEST_TENANT)).thenReturn(null); + when(mockSubService.getAllSubscriptionInfos(TEST_TENANT)).thenReturn(Collections.singletonList(subscription)); + when(mockSubService.getSubscriptionsByNotificationId(TEST_TENANT, TEST_NOTIF_ID)).thenReturn(subscription); + + manager.invokeTenantSubscribers(TEST_TENANT); + + verify(mockSubService, times(1)).getAllSubscriptionInfos(TEST_TENANT); + verify(mockSubCacheRepo, times(1)).writeSubscription(TEST_TENANT, subscription); + } + + @Test + public void testGetSubscriptionsForTopic() { + doReturn(subscriptions).when(mockSubCacheRepo).getTopicSubscribers(TEST_TENANT, TEST_TOPIC); + Set<Subscription> actualSubscriptions = manager.getSubscriptionsForTopic(TEST_TENANT, + TEST_TOPIC); + assertEquals(subscriptions, actualSubscriptions); + } + + @Test + public void testGetSubscription_inCache() { + when(mockSubCacheRepo.readSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID)).thenReturn( + subscription); + Subscription actualSub = manager.getSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID); + assertEquals(subscription, actualSub); + verify(mockSubService, times(0)).getSubscriptionsByNotificationId(anyString(), anyString()); + } + + @Test + public void testGetSubscription_notInCache_active() { + when(mockSubCacheRepo.readSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID)).thenReturn(null); + when(mockSubService.getSubscriptionsByNotificationId(TEST_TENANT, TEST_NOTIF_ID)).thenReturn( + subscription); + + Subscription actualSub = manager.getSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID); + assertEquals(subscription, actualSub); + verify(mockSubCacheRepo, times(1)).writeSubscription(TEST_TENANT, subscription); + } + + @Test + public void testGetSubscription_notInCache_notAvailableInRegister() { + when(mockSubCacheRepo.readSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID)).thenReturn(null); + when(mockSubService.getSubscriptionsByNotificationId(TEST_TENANT, TEST_NOTIF_ID)).thenReturn( + null); + + Subscription actualSub = manager.getSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID); + assertNull(actualSub); + verify(mockSubCacheRepo, times(1)).poisonSub(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID); + } + + @Test + public void testGetSubscription_InCache_Poisoned() { + when(mockSubCacheRepo.readSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID)).thenReturn(null); + when(mockSubCacheRepo.checkIfSubIsPoisoned(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID)).thenReturn( + new Subscription()); + + Subscription actualSub = manager.getSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID); + assertNull(actualSub); + verify(mockSubService, times(0)).getSubscriptionsByNotificationId(anyString(), anyString()); + } + + @Test + public void testRemoveSubscription() { + boolean b = manager.removeSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID); + verify(mockSubCacheRepo, times(1)).deleteSubscription(TEST_TENANT, TEST_TOPIC, TEST_NOTIF_ID); + } + + @Test + public void testUpdateSubscription_availableInRegister() { + when(mockSubService.getSubscriptionsByNotificationId(TEST_TENANT, TEST_NOTIF_ID)).thenReturn( + subscription); + manager.updateSubscription(TEST_TENANT, TEST_NOTIF_ID); + verify(mockSubCacheRepo, times(1)).writeSubscription(TEST_TENANT, subscription); + } + + @Test + public void testUpdateSubscription_notAvailableInRegister() { + manager.updateSubscription(TEST_TENANT, TEST_NOTIF_ID); + verify(mockSubCacheRepo, times(0)).writeSubscription(anyString(), any()); + } +} \ No newline at end of file diff --git a/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointGsa.java b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointGsa.java new file mode 100644 index 0000000000000000000000000000000000000000..f5d8f4f2dfa8404a80beaf88a1bad21efc1c771d --- /dev/null +++ b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointGsa.java @@ -0,0 +1,149 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.api; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.opengroup.osdu.notification.util.Constants.GROUP_ID; + +import com.google.common.base.Strings; +import com.sun.jersey.api.client.ClientResponse; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.GsaSecret; +import org.opengroup.osdu.core.common.model.notification.GsaSecretValue; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.common.notification.ISubscriptionService; +import org.opengroup.osdu.core.common.notification.SubscriptionAPIConfig; +import org.opengroup.osdu.core.common.notification.SubscriptionException; +import org.opengroup.osdu.core.common.notification.SubscriptionFactory; +import org.opengroup.osdu.notification.util.AnthosTestUtils; +import org.opengroup.osdu.notification.util.ServicesUtils; +import org.opengroup.osdu.notification.util.TestUtils; + +public class TestPushEndpointGsa { + public static final String REGISTER_BASE_URL = "REGISTER_BASE_URL"; + public static final String TOPIC_ID = "TOPIC_ID"; + public static final String INTEGRATION_AUDIENCE = "INTEGRATION_AUDIENCE"; + public static final String OSDU_TENANT = "OSDU_TENANT"; + public static final String STORAGE_HOST = "STORAGE_HOST"; + public static final String LEGAL_HOST = "LEGAL_HOST"; + public static final String DE_OPS_TESTER = "DE_OPS_TESTER"; + private String subscriptionId = null; + private String notificationId = null; + private ISubscriptionService subscriptionService; + private static SubscriptionFactory factory; + private TestUtils testUtils = new AnthosTestUtils(); + private final String suffix = String.valueOf(System.currentTimeMillis()); + private String baseRegisterUrl; + private String topic; + private String tenant; + private String integrationAudience; + private String storageHost; + private String legalHost; + private String groupId; + private static final String LEGAL_TAG_NAME = "notification-test-gsa"; + private ServicesUtils servicesUtils; + + @After + public void deleteResource() throws Exception { + subscriptionService.delete(subscriptionId); + servicesUtils.deleteStorageRecords(3, suffix); + servicesUtils.deleteLegalTag(LEGAL_TAG_NAME); + } + + @Before + public void createResource() throws Exception { + baseRegisterUrl = System.getProperty(REGISTER_BASE_URL, System.getenv(REGISTER_BASE_URL)); + topic = System.getProperty(TOPIC_ID, System.getenv(TOPIC_ID)); + integrationAudience = System.getProperty(INTEGRATION_AUDIENCE, System.getenv(INTEGRATION_AUDIENCE)); + tenant = System.getProperty(OSDU_TENANT, System.getenv(OSDU_TENANT)); + if (Strings.isNullOrEmpty(integrationAudience)) { + integrationAudience = tenant; + } + storageHost = System.getProperty(STORAGE_HOST, System.getenv(STORAGE_HOST)); + legalHost = System.getProperty(LEGAL_HOST, System.getenv(LEGAL_HOST)); + groupId = System.getProperty(GROUP_ID, System.getenv(GROUP_ID)); + servicesUtils = new ServicesUtils(storageHost, legalHost, testUtils, tenant, groupId); + servicesUtils.createLegalTag(LEGAL_TAG_NAME); + createResourceInPartition(tenant); + } + + @Test + public void testPushEndpoint() throws Exception { + servicesUtils.createStorageRecords(suffix, 3, LEGAL_TAG_NAME); + Thread.sleep(10000); + assertNotNull(subscriptionId); + assertNotNull(notificationId); + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, tenant); + ClientResponse clientResponse = testUtils.send(baseRegisterUrl, "/test-gc/state", "GET", testUtils.getOpsToken(), null, "", + headers, false); + Map<String, Number> response = new HashMap<>(); + response = testUtils.getResult(clientResponse, 200, response.getClass()); + assertNotNull(response); + assertTrue(response.containsKey(suffix)); + assertTrue(response.get(suffix).longValue() >= 3); + } + + private void createResourceInPartition(String partitionId) throws Exception { + + SubscriptionAPIConfig config = SubscriptionAPIConfig.builder().rootUrl(baseRegisterUrl).build(); + factory = new SubscriptionFactory(config); + + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, partitionId); + headers.put(DpsHeaders.AUTHORIZATION, testUtils.getOpsToken()); + DpsHeaders dpsHeaders = DpsHeaders.createFromMap(headers); + subscriptionService = factory.create(dpsHeaders); + + Subscription subscription = new Subscription(); + subscription.setName("subscription-integration-test-gsa-" + suffix); + subscription.setDescription("subscription created for gsa integration test " + suffix); + subscription.setTopic(topic); + subscription.setPushEndpoint(getPushUrl()); + + Secret gsaSecret = new GsaSecret(); + GsaSecretValue gsaSecretValue = new GsaSecretValue(); + gsaSecretValue.setAudience(integrationAudience); + String opsTester = System.getProperty(DE_OPS_TESTER, System.getenv(DE_OPS_TESTER)); + gsaSecretValue.setKey(new String(Base64.getDecoder().decode(opsTester))); + gsaSecret.setSecretType("GSA"); + ((GsaSecret) gsaSecret).setValue(gsaSecretValue); + subscription.setSecret(gsaSecret); + try { + Subscription subscriptionCreated = subscriptionService.create(subscription); + notificationId = subscriptionCreated.getNotificationId(); + subscriptionId = subscriptionCreated.getId(); + } catch (SubscriptionException e) { + System.out.println("Subscription exception inner response : " + e.getHttpResponse()); + throw e; + } + } + + private String getPushUrl() { + return baseRegisterUrl + "/test-gc/gsa-challenge/" + suffix; + } + +} diff --git a/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointHMAC.java b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointHMAC.java new file mode 100644 index 0000000000000000000000000000000000000000..0d5b0458a5f8cea6c2caa3071528251ead1aa005 --- /dev/null +++ b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointHMAC.java @@ -0,0 +1,135 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.api; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.opengroup.osdu.notification.util.Constants.GROUP_ID; + +import com.sun.jersey.api.client.ClientResponse; +import java.util.HashMap; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.HmacSecret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.common.notification.ISubscriptionService; +import org.opengroup.osdu.core.common.notification.SubscriptionAPIConfig; +import org.opengroup.osdu.core.common.notification.SubscriptionException; +import org.opengroup.osdu.core.common.notification.SubscriptionFactory; +import org.opengroup.osdu.notification.util.AnthosTestUtils; +import org.opengroup.osdu.notification.util.ServicesUtils; +import org.opengroup.osdu.notification.util.TestUtils; + +public class TestPushEndpointHMAC { + public static final String REGISTER_BASE_URL = "REGISTER_BASE_URL"; + public static final String TOPIC_ID = "TOPIC_ID"; + public static final String HMAC_SECRET = "HMAC_SECRET"; + public static final String OSDU_TENANT = "OSDU_TENANT"; + public static final String STORAGE_HOST = "STORAGE_HOST"; + public static final String LEGAL_HOST = "LEGAL_HOST"; + private String subscriptionId = null; + private String notificationId = null; + private ISubscriptionService subscriptionService; + private static SubscriptionFactory factory; + private TestUtils testUtils = new AnthosTestUtils(); + private final String suffix = String.valueOf(System.currentTimeMillis()); + private String baseRegisterUrl; + private String topic; + private String hmacSecretValue; + private String tenant; + private String storageHost; + private String legalHost; + private String groupId; + private static final String LEGAL_TAG_NAME = "notification-test-hmac"; + private ServicesUtils servicesUtils; + + @After + public void deleteResource() throws Exception { + subscriptionService.delete(subscriptionId); + servicesUtils.deleteStorageRecords(3, suffix); + servicesUtils.deleteLegalTag(LEGAL_TAG_NAME); + } + + @Before + public void createResource() throws Exception { + baseRegisterUrl = System.getProperty(REGISTER_BASE_URL, System.getenv(REGISTER_BASE_URL)); + topic = System.getProperty(TOPIC_ID, System.getenv(TOPIC_ID)); + hmacSecretValue = System.getProperty(HMAC_SECRET, System.getenv(HMAC_SECRET)); + tenant = System.getProperty(OSDU_TENANT, System.getenv(OSDU_TENANT)); + storageHost = System.getProperty(STORAGE_HOST, System.getenv(STORAGE_HOST)); + legalHost = System.getProperty(LEGAL_HOST, System.getenv(LEGAL_HOST)); + groupId = System.getProperty(GROUP_ID, System.getenv(GROUP_ID)); + servicesUtils = new ServicesUtils(storageHost, legalHost, testUtils, tenant, groupId); + servicesUtils.createLegalTag(LEGAL_TAG_NAME); + createResourceInPartition(tenant); + } + + @Test + public void testPushEndpoint() throws Exception { + servicesUtils.createStorageRecords(suffix, 3, LEGAL_TAG_NAME); + Thread.sleep(10000); + assertNotNull(subscriptionId); + assertNotNull(notificationId); + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, tenant); + ClientResponse clientResponse = testUtils.send(baseRegisterUrl, "/test-gc/state", "GET", testUtils.getOpsToken(), null, "", + headers, false); + Map<String, Number> response = new HashMap<>(); + response = testUtils.getResult(clientResponse, 200, response.getClass()); + assertNotNull(response); + assertTrue(response.containsKey(suffix)); + assertTrue(response.get(suffix).longValue() >= 3); + } + + private void createResourceInPartition(String partitionId) throws Exception { + + SubscriptionAPIConfig config = SubscriptionAPIConfig.builder().rootUrl(baseRegisterUrl).build(); + factory = new SubscriptionFactory(config); + + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, partitionId); + headers.put(DpsHeaders.AUTHORIZATION, testUtils.getOpsToken()); + DpsHeaders dpsHeaders = DpsHeaders.createFromMap(headers); + subscriptionService = factory.create(dpsHeaders); + + Subscription subscription = new Subscription(); + subscription.setName("subscription-integration-test-hmac-" + suffix); + subscription.setDescription("subscription created for hmac integration test " + suffix); + subscription.setTopic(topic); + subscription.setPushEndpoint(getPushUrl()); + HmacSecret secret = new HmacSecret(); + secret.setValue(hmacSecretValue); + + subscription.setSecret(secret); + try { + Subscription subscriptionCreated = subscriptionService.create(subscription); + notificationId = subscriptionCreated.getNotificationId(); + subscriptionId = subscriptionCreated.getId(); + } catch (SubscriptionException e) { + System.out.println("Subscription exception inner response : " + e.getHttpResponse()); + throw e; + } + } + + private String getPushUrl() { + return baseRegisterUrl + "/test-gc/challenge/" + suffix; + } +} diff --git a/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/Constants.java b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/Constants.java new file mode 100644 index 0000000000000000000000000000000000000000..7889fab81ba31cbd3f5c840c2aca9d838c02aa21 --- /dev/null +++ b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/Constants.java @@ -0,0 +1,6 @@ +package org.opengroup.osdu.notification.util; + +public class Constants { + + public static final String GROUP_ID = "GROUP_ID"; +} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/model/ExternalSubscriptions.java b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/FileUtils.java similarity index 50% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/model/ExternalSubscriptions.java rename to testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/FileUtils.java index 953d1460555a6c2edfd19c81d7ba130027d76de9..fc0713c1665b4db5276e88a9b6c8b1c64085fa4d 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/model/ExternalSubscriptions.java +++ b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/FileUtils.java @@ -15,20 +15,25 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.model; +package org.opengroup.osdu.notification.util; -import lombok.*; -import org.opengroup.osdu.core.common.model.notification.Subscription; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; -import java.util.List; +public class FileUtils { + public String readFromLocalFilePath(String filePath) throws IOException { -/** - * Wrapped list of subscriptions to create redis cache bean. - */ -@Data -@AllArgsConstructor -@NoArgsConstructor -@Builder -public class ExternalSubscriptions { - private List<Subscription> subscriptions; + InputStream inStream = this.getClass().getResourceAsStream(filePath); + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + StringBuilder stringBuilder = new StringBuilder(); + + String eachLine = ""; + while ((eachLine = br.readLine()) != null) { + stringBuilder.append(eachLine); + } + + return stringBuilder.toString(); + } } diff --git a/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/HttpClient.java b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/HttpClient.java new file mode 100644 index 0000000000000000000000000000000000000000..b14839c915deb5235307f3d80a0491beabe5df64 --- /dev/null +++ b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/HttpClient.java @@ -0,0 +1,108 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.util; + +import static org.junit.Assert.assertEquals; + +import com.google.gson.Gson; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import java.net.URI; +import java.security.SecureRandom; +import java.security.cert.X509Certificate; +import java.util.Map; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.StringUtils; + +public class HttpClient { + private HttpClient(){ + } + + public static ClientResponse send(String url, String path, String httpMethod, + Map<String, String> headers, + String requestBody, String query) throws Exception { + + String normalizedUrl = new URI(String.format("%s/%s", url, path)).normalize().toString(); + normalizedUrl = StringUtils.removeEnd(normalizedUrl, "/"); + log(httpMethod, normalizedUrl + query, headers, requestBody); + Client client = getClient(); + + WebResource webResource = client.resource(normalizedUrl + query); + WebResource.Builder builder = webResource.accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON); + headers.forEach(builder::header); + + if ("POST".equals(httpMethod) && StringUtils.isEmpty(requestBody)) { + requestBody = "{}"; //solves 411 error when sending empty-body POST request + } + + return builder.method(httpMethod, ClientResponse.class, requestBody); + } + + private static void log(String method, String url, Map<String, String> headers, String body) { + System.out.println(String.format("%s: %s", method, url)); + System.out.println(body); + } + + @SuppressWarnings("unchecked") + public static <T> T getResult(ClientResponse response, int exepectedStatus, Class<T> classOfT) { + assertEquals(exepectedStatus, response.getStatus()); + if (exepectedStatus == 204) { + return null; + } + + assertEquals("application/json; charset=UTF-8", response.getType().toString()); + String json = response.getEntity(String.class); + if (classOfT == String.class) { + return (T) json; + } + + Gson gson = new Gson(); + return gson.fromJson(json, classOfT); + } + + protected static Client getClient() { + TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + + @Override + public void checkClientTrusted(X509Certificate[] certs, String authType) { + } + + @Override + public void checkServerTrusted(X509Certificate[] certs, String authType) { + } + }}; + + try { + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, trustAllCerts, new SecureRandom()); + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); + } catch (Exception e) { + } + return Client.create(); + } +} diff --git a/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/ServicesUtils.java b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/ServicesUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..eabebc585ab5316e311ea07969044766fe9e9fe8 --- /dev/null +++ b/testing/notification-test-baremetal/src/test/java/org/opengroup/osdu/notification/util/ServicesUtils.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.sun.jersey.api.client.ClientResponse; +import java.util.HashMap; +import java.util.Map; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; + +public class ServicesUtils { + private String storageHost; + private String legalHost; + private TestUtils testUtils; + private String partitionId; + private String groupId; + private FileUtils fileUtils; + + public ServicesUtils(String storageHost, String legalHost, TestUtils testUtils, String partitionId, String groupId) { + this.storageHost = storageHost; + this.legalHost = legalHost; + this.testUtils = testUtils; + this.partitionId = partitionId; + this.groupId = groupId; + this.fileUtils = new FileUtils(); + } + + public ClientResponse createLegalTag(String tagName) throws Exception { + String legalBody = fileUtils.readFromLocalFilePath("/LegalTag.json"); + legalBody = legalBody.replace("{{tagName}}", tagName); + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, partitionId); + headers.put(DpsHeaders.AUTHORIZATION, testUtils.getAdminToken()); + + ClientResponse legalResponse = HttpClient.send(legalHost, "legaltags", "POST", headers, legalBody, ""); + + boolean createdOrAlreadyExists = legalResponse.getStatus() == 201 || legalResponse.getStatus() == 409; + assertTrue(createdOrAlreadyExists); + return legalResponse; + } + + public void deleteLegalTag(String tagName) throws Exception{ + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, partitionId); + headers.put(DpsHeaders.AUTHORIZATION, testUtils.getAdminToken()); + ClientResponse legalResponse = HttpClient.send(legalHost, String.format("legaltags/%s", tagName), "DELETE", + headers, "", ""); + assertEquals(204, legalResponse.getStatus()); + } + + public void createStorageRecords(String suffix, int count, String legalTag) throws Exception{ + String body = fileUtils.readFromLocalFilePath("/StorageRecord.json"); + body = body.replace("{{data-partition-id}}", partitionId); + body = body.replace("{{legal-tag}}", partitionId + "-" + legalTag); + body = body.replace("{{group_id}}", groupId); + for (int i = 0; i < count; i++) { + String actualBody = body.replace("{{ids-suffix}}", suffix + String.valueOf(i)); + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, partitionId); + headers.put(DpsHeaders.AUTHORIZATION, testUtils.getAdminToken()); + ClientResponse storageResponse = HttpClient.send(storageHost, "records", "PUT", + headers, actualBody, ""); + assertEquals(201, storageResponse.getStatus()); + } + } + + public void deleteStorageRecords(int count, String suffix) throws Exception{ + for (int i = 0; i < count; i++) { + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, partitionId); + headers.put(DpsHeaders.AUTHORIZATION, testUtils.getAdminToken()); + String recordId = partitionId + ":dataset--ConnectedSource.Generic:notification-test-" + suffix + i; + ClientResponse storageResponse = HttpClient.send(storageHost, "records/" + recordId, "DELETE", + headers, "", ""); + assertEquals(204, storageResponse.getStatus()); + } + } + +} diff --git a/testing/notification-test-baremetal/src/test/resources/LegalTag.json b/testing/notification-test-baremetal/src/test/resources/LegalTag.json new file mode 100644 index 0000000000000000000000000000000000000000..e9629c72a833ef80a685f15d028679e75ac6edb6 --- /dev/null +++ b/testing/notification-test-baremetal/src/test/resources/LegalTag.json @@ -0,0 +1,16 @@ +{ + "name": "{{tagName}}", + "properties": { + "countryOfOrigin": [ + "US" + ], + "contractId": "A1234", + "expirationDate": 2222222222222, + "originator": "Default", + "dataType": "Public Domain Data", + "securityClassification": "Public", + "personalData": "No Personal Data", + "exportClassification": "EAR99" + }, + "description": "Test legal tag for notification" +} \ No newline at end of file diff --git a/testing/notification-test-baremetal/src/test/resources/StorageRecord.json b/testing/notification-test-baremetal/src/test/resources/StorageRecord.json new file mode 100644 index 0000000000000000000000000000000000000000..c4a8e395f201baf6ce31ca39d0de01968a2439a3 --- /dev/null +++ b/testing/notification-test-baremetal/src/test/resources/StorageRecord.json @@ -0,0 +1,33 @@ +[ + { + "id": "{{data-partition-id}}:dataset--ConnectedSource.Generic:notification-test-{{ids-suffix}}", + "kind": "{{data-partition-id}}:wks:dataset--ConnectedSource.Generic:1.0.0", + "data": { + "Name": "name", + "DatasetProperties": { + "ConnectedSourceDataJobId": "no-data", + "ConnectedSourceRegistryEntryId": "no-data", + "SourceDataPartitionId": "no-data", + "SourceRecordId": "no-data" + } + }, + "namespace": "{{data-partition-id}}:osdu", + "legal": { + "legaltags": [ + "{{legal-tag}}" + ], + "otherRelevantDataCountries": [ + "US" + ], + "status": "compliant" + }, + "acl": { + "viewers": [ + "data.default.viewers@{{data-partition-id}}.{{group_id}}" + ], + "owners": [ + "data.default.owners@{{data-partition-id}}.{{group_id}}" + ] + } + } +] \ No newline at end of file diff --git a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointGsa.java b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointGsa.java index 2245bebf5425439ebd5a0680c6c4e0b7695be2ff..c68244d9961c6f9b7b25ccf74bcae1466e0c9560 100644 --- a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointGsa.java +++ b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointGsa.java @@ -85,6 +85,7 @@ public class TestPushEndpointGsa { servicesUtils = new ServicesUtils(storageHost, legalHost, testUtils, tenant, groupId); servicesUtils.createLegalTag(LEGAL_TAG_NAME); createResourceInPartition(tenant); + Thread.sleep(10000); } @Test diff --git a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointHMAC.java b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointHMAC.java index a4b5ec17d9874c37f3c54f47b4908710b5d7231f..9e6da6e807ef78e35d26212990f34e975a7b833b 100644 --- a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointHMAC.java +++ b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPushEndpointHMAC.java @@ -81,6 +81,7 @@ public class TestPushEndpointHMAC { servicesUtils = new ServicesUtils(storageHost, legalHost, testUtils, tenant, groupId); servicesUtils.createLegalTag(LEGAL_TAG_NAME); createResourceInPartition(tenant); + Thread.sleep(10000); } @Test