diff --git a/NOTICE b/NOTICE index d1f2b691be8e9f8f08ab21110a84807b9bfc66e5..1598403a196b4a89902a440825c7af0864828f3e 100644 --- a/NOTICE +++ b/NOTICE @@ -72,6 +72,7 @@ The following software have components provided under the terms of this license: - Asynchronous Http Client (from https://repo1.maven.org/maven2/org/asynchttpclient/async-http-client) - Asynchronous Http Client Netty Utils (from https://repo1.maven.org/maven2/org/asynchttpclient/async-http-client-netty-utils) - AutoValue Annotations (from https://github.com/google/auto/tree/master/value, https://repo1.maven.org/maven2/com/google/auto/value/auto-value-annotations) +- AutoValue Processor (from https://github.com/google/auto/tree/master/value) - BSON (from http://bsonspec.org, https://bsonspec.org) - BSON Record Codec (from <https://www.mongodb.com/>, https://www.mongodb.com/) - Bean Validation API (from http://beanvalidation.org) @@ -88,6 +89,7 @@ The following software have components provided under the terms of this license: - Converter: Jackson (from https://github.com/square/retrofit, https://repo1.maven.org/maven2/com/squareup/retrofit2/converter-jackson) - Core functionality for the Reactor Netty library (from https://github.com/reactor/reactor-netty) - FindBugs-jsr305 (from http://findbugs.sourceforge.net/) +- GAX (Google Api eXtensions) for Java (HTTP JSON) (from <https://repo1.maven.org/maven2/com/google/api/gax-httpjson>, https://repo1.maven.org/maven2/com/google/api/gax-httpjson) - GSON extensions to the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-gson) - Google APIs Client Library for Java (from https://repo1.maven.org/maven2/com/google/api-client/google-api-client) - Google App Engine extensions to the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-appengine) @@ -173,6 +175,7 @@ The following software have components provided under the terms of this license: - Mockito (from http://mockito.org, http://www.mockito.org, https://github.com/mockito/mockito) - MongoDB Driver (from https://www.mongodb.com/) - MongoDB Java Driver (from http://mongodb.org/, http://www.mongodb.org, https://www.mongodb.com/) +- NanoHttpd-Core (from https://repo1.maven.org/maven2/org/nanohttpd/nanohttpd) - Netty Reactive Streams Implementation (from https://repo1.maven.org/maven2/com/typesafe/netty/netty-reactive-streams) - Netty/Buffer (from https://repo1.maven.org/maven2/io/netty/netty-buffer) - Netty/Codec (from https://repo1.maven.org/maven2/io/netty/netty-codec) @@ -187,6 +190,8 @@ The following software have components provided under the terms of this license: - Netty/Resolver (from https://repo1.maven.org/maven2/io/netty/netty-resolver) - Netty/Resolver/DNS (from https://repo1.maven.org/maven2/io/netty/netty-resolver-dns) - Netty/Resolver/DNS/Classes/MacOS (from https://repo1.maven.org/maven2/io/netty/netty-resolver-dns-classes-macos) +- Netty/TomcatNative [BoringSSL - Static] (from https://github.com/netty/netty-tcnative/netty-tcnative-boringssl-static/) +- Netty/TomcatNative [OpenSSL - Classes] (from https://repo1.maven.org/maven2/io/netty/netty-tcnative-classes) - Netty/Transport (from https://repo1.maven.org/maven2/io/netty/netty-transport) - Netty/Transport/Classes/Epoll (from https://repo1.maven.org/maven2/io/netty/netty-transport-classes-epoll) - Netty/Transport/Classes/KQueue (from https://repo1.maven.org/maven2/io/netty/netty-transport-classes-kqueue) @@ -201,7 +206,7 @@ The following software have components provided under the terms of this license: - OkHttp Logging Interceptor (from https://github.com/square/okhttp, https://repo1.maven.org/maven2/com/squareup/okhttp3/logging-interceptor, https://square.github.io/okhttp/) - OkHttp URLConnection (from https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp-urlconnection, https://square.github.io/okhttp/) - Okio (from https://github.com/square/okio/, https://repo1.maven.org/maven2/com/squareup/okio/okio) -- OpenCensus (from https://github.com/census-instrumentation/opencensus-java) +- OpenCensus (from https://github.com/census-instrumentation/opencensus-java, https://github.com/census-instrumentation/opencensus-proto) - PWDB :: Database (from https://repo1.maven.org/maven2/org/linguafranca/pwdb/database) - PostgreSQL JDBC Driver - PowerMock (from http://www.powermock.org, https://repo1.maven.org/maven2/org/powermock/powermock-api-mockito) @@ -280,16 +285,20 @@ The following software have components provided under the terms of this license: - datastore-v1-proto-client (from https://repo1.maven.org/maven2/com/google/cloud/datastore/datastore-v1-proto-client) - documentdb-bulkexecutor (from http://azure.microsoft.com/en-us/services/documentdb/) - error-prone annotations (from https://repo1.maven.org/maven2/com/google/errorprone/error_prone_annotations) +- grpc-google-cloud-datastore-admin-v1 (from https://github.com/googleapis/java-datastore/grpc-google-cloud-datastore-admin-v1) - io.grpc:grpc-alts (from https://github.com/grpc/grpc-java) - io.grpc:grpc-api (from https://github.com/grpc/grpc-java) - io.grpc:grpc-auth (from https://github.com/grpc/grpc-java) - io.grpc:grpc-context (from https://github.com/grpc/grpc-java) - io.grpc:grpc-core (from https://github.com/grpc/grpc-java) +- io.grpc:grpc-googleapis (from https://github.com/grpc/grpc-java) - io.grpc:grpc-grpclb (from https://github.com/grpc/grpc-java) - io.grpc:grpc-netty-shaded (from https://github.com/grpc/grpc-java) - io.grpc:grpc-protobuf (from https://github.com/grpc/grpc-java) - io.grpc:grpc-protobuf-lite (from https://github.com/grpc/grpc-java) +- io.grpc:grpc-services (from https://github.com/grpc/grpc-java) - io.grpc:grpc-stub (from https://github.com/grpc/grpc-java) +- io.grpc:grpc-xds (from https://github.com/grpc/grpc-java) - ion-java (from https://github.com/amzn/ion-java/, https://github.com/amznlabs/ion-java/) - jackson-databind (from http://github.com/FasterXML/jackson, http://wiki.fasterxml.com/JacksonHome, https://github.com/FasterXML/jackson) - java-cloudant (from https://cloudant.com) @@ -308,6 +317,7 @@ The following software have components provided under the terms of this license: - org.opentest4j:opentest4j (from https://github.com/ota4j-team/opentest4j) - org.xmlunit:xmlunit-core (from http://www.xmlunit.org/, https://www.xmlunit.org/) - perfmark:perfmark-api (from https://github.com/perfmark/perfmark) +- proto-google-cloud-datastore-admin-v1 (from https://github.com/googleapis/java-datastore/proto-google-cloud-datastore-admin-v1) - proto-google-cloud-datastore-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-datastore/proto-google-cloud-datastore-v1) - proto-google-cloud-iamcredentials-v1 (from https://github.com/googleapis/google-cloud-java, https://github.com/googleapis/java-iamcredentials/proto-google-cloud-iamcredentials-v1, https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-iamcredentials-v1) - proto-google-cloud-logging-v2 (from https://github.com/googleapis/java-logging/proto-google-cloud-logging-v2, https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-logging-v2) @@ -335,7 +345,6 @@ The following software have components provided under the terms of this license: - API Common (from https://github.com/googleapis, https://github.com/googleapis/api-common-java, https://repo1.maven.org/maven2/com/google/api/api-common) - GAX (Google Api eXtensions) for Java (Core) (from https://github.com/googleapis, https://github.com/googleapis/gax-java, https://repo1.maven.org/maven2/com/google/api/gax) -- GAX (Google Api eXtensions) for Java (HTTP JSON) (from <https://repo1.maven.org/maven2/com/google/api/gax-httpjson>, https://repo1.maven.org/maven2/com/google/api/gax-httpjson) - GAX (Google Api eXtensions) for Java (gRPC) (from <https://repo1.maven.org/maven2/com/google/api/gax-grpc>, https://repo1.maven.org/maven2/com/google/api/gax-grpc) - Hamcrest (from http://hamcrest.org/JavaHamcrest/) - Hamcrest Core (from http://hamcrest.org/, http://hamcrest.org/JavaHamcrest/, https://repo1.maven.org/maven2/org/hamcrest/hamcrest-core) @@ -373,10 +382,12 @@ The following software have components provided under the terms of this license: - Microsoft Application Insights Java SDK Web Module (from https://github.com/Microsoft/ApplicationInsights-Java) - Microsoft Application Insights Log4j 2 Appender (from https://github.com/Microsoft/ApplicationInsights-Java) - Mockito (from http://mockito.org, http://www.mockito.org, https://github.com/mockito/mockito) +- NanoHttpd-Core (from https://repo1.maven.org/maven2/org/nanohttpd/nanohttpd) - Netty/Codec/HTTP (from https://repo1.maven.org/maven2/io/netty/netty-codec-http) - PostgreSQL JDBC Driver - Protocol Buffer Java API (from http://code.google.com/p/protobuf, https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java) - Protocol Buffers [Util] (from https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java-util) +- RE2/J (from http://github.com/google/re2j) - Redisson (from http://redisson.org) - SnakeYAML (from http://code.google.com/p/snakeyaml/, http://www.snakeyaml.org, https://bitbucket.org/snakeyaml/snakeyaml) - Spring Core (from http://www.springframework.org, https://github.com/spring-projects/spring-framework, https://repo1.maven.org/maven2/org/springframework/spring-core) @@ -543,7 +554,6 @@ GPL-2.0-with-classpath-exception ======================================================================== The following software have components provided under the terms of this license: -- Checker Qual (from https://checkerframework.org) - JBoss Jakarta Annotations API (from <https://github.com/jboss/jboss-jakarta-annotations-api_spec>, https://github.com/jboss/jboss-jakarta-annotations-api_spec) - Jakarta Activation API (from https://github.com/eclipse-ee4j/jaf, https://github.com/jakartaee/jaf-api, https://repo1.maven.org/maven2/jakarta/activation/jakarta.activation-api) - Jakarta Annotations API (from https://projects.eclipse.org/projects/ee4j.ca) @@ -594,6 +604,8 @@ LGPL-2.1-only ======================================================================== The following software have components provided under the terms of this license: +- Java Native Access (from https://github.com/java-native-access/jna, https://github.com/twall/jna) +- Java Native Access Platform (from https://github.com/java-native-access/jna) - Logback Classic Module (from http://logback.qos.ch, https://repo1.maven.org/maven2/ch/qos/logback/logback-classic) - Logback Contrib :: JSON :: Classic (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-classic) - Logback Contrib :: JSON :: Core (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-core) diff --git a/provider/notification-gc/docs/anthos/README.md b/provider/notification-gc/docs/anthos/README.md index 62bc58a79d23fa297855ce402123206127a0167c..c819fb53fc94b42c20c02af5326d37a67287dde8 100644 --- a/provider/notification-gc/docs/anthos/README.md +++ b/provider/notification-gc/docs/anthos/README.md @@ -35,6 +35,12 @@ | `OPENID_PROVIDER_URL` | `https://keycloack.com/auth/realms/master` | URL of OpenID Connect provider, it will be used as `<OpenID URL> + /.well-known/openid-configuration` to auto configure endpoint for token request | no | - | | `<AMQP_PASSWORD_ENV_VARIABLE_NAME>` | ex `AMQP_PASS_OSDU` | Amqp password env name, name of that variable not defined at the service level, the name will be received through partition service. Each tenant can have it's own ENV name value, and it must be present in ENV of Notification service | yes | - | | `<AMQP_ADMIN_PASSWORD_ENV_VARIABLE_NAME>` | ex `AMQP_ADMIN_PASS_OSDU` | Amqp admin password env name, name of that variable not defined at the service level, the name will be received through partition service. Each tenant can have it's own ENV name value, and it must be present in ENV of Notification service | yes | - | +| `REDIS_HOST` | ex `127.0.0.1` | Redis host | no | - | +| `REDIS_PORT` | ex `6379` | Redis port | no | - | +| `REDIS_PASSWORD` | ex `` | Redis password | yes | - | +| `REDIS_WITH_SSL` | ex `true` or `false` | Redis host ssl config | no | | +| `RABBITMQ_RETRY_LIMIT` | ex `3` | RabbitMq retry limit | no | | +| `RABBITMQ_RETRY_DELAY` | ex `1` | RabbitMq retry delay | no | | ### For Mappers to activate drivers @@ -198,30 +204,24 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H </details> -#### Exchanges and queues configuration +## PubSub configuration -At RabbitMq should be created exchange with name: +At PubSub should be created set of topics and subscriptions. -**name:** `register-subscriber-control` +| topic name | subscription name pattern | description | sensitive? | +|-------------------------------------------------------|--------------------------------------------------|----------------------------------------------------------|------------| +| `register-subscriber-control` | `notification-control-topic-{data-partition-id}` | Register subscriber control topic | yes | +| `{topic-name}` e.g. `records-changed` | `notification-{topic-name}-service` | Service topics from `topics.json` on Register service | yes | +| `{topic-name}-publish` e.g. `records-changed-publish` | `notification-{topic-name}-publish}` | Publish topics in relation 1-on-1 for each service topic | yes | -It can be overridden by: + +Control topic name can be overridden by: * through the Spring Boot property `oqm-register-subscriber-control-topic-name` * environment variable `OQM_REGISTER_SUBSCRIBER_CONTROL_TOPIC_NAME`  -## Interaction with message brokers - -### Specifics of work through PULL subscription - -To receive messages from brokers, this solution uses the PULL-subscriber mechanism to get 'record_changed' messages. -This is its cardinal difference from other implementations that use PUSH-subscribers (webhooks). This opens a wide -choice when choosing brokers. - -When using PULL-subscribers, there is a need to restore Storage service subscribers at the start of Storage service. -This magic happens in the `OqmSubscriberManager.java` class from `core-lib-gcp` in the @PostConstruct method. - ## Keycloak configuration [Keycloak service accounts setup](https://www.keycloak.org/docs/latest/server_admin/#_service_accounts) diff --git a/provider/notification-gc/docs/gc/README.md b/provider/notification-gc/docs/gc/README.md index 0f65c275f188cf007b58e9552cd8769f085d2231..3be94ca7f27a767c4d509114f39d599e71793531 100644 --- a/provider/notification-gc/docs/gc/README.md +++ b/provider/notification-gc/docs/gc/README.md @@ -2,6 +2,7 @@ ## Table of Contents <a name="TOC"></a> +* [Overview](#overview) * [Environment variables](#environment-variables) * [Common properties for all environments](#common-properties-for-all-environments) * [For Mappers to activate drivers](#for-mappers-to-activate-drivers) @@ -9,6 +10,12 @@ * [GCS configuration](#GCS-configuration) * [Google cloud service account configuration](#Google-cloud-service-account-configuration) +## Overview + + + + + ## Environment variables ### Common properties for all environments @@ -25,6 +32,10 @@ | name | value | description | sensitive? | source | | --- | --- | --- | --- | --- | | `SPRING_PROFILES_ACTIVE` | `gcp` | spring active profile | no | +| `REDIS_HOST` | ex `127.0.0.1` | Redis host | no | - | +| `REDIS_PORT` | ex `6379` | Redis port | no | - | +| `REDIS_PASSWORD` | ex `` | Redis password | yes | - | +| `REDIS_WITH_SSL` | ex `true` or `false` | Redis host ssl config | no | | ### For Mappers to activate drivers @@ -103,11 +114,16 @@ $ (cd notification-core/ && mvn clean install) ## PubSub configuration -At PubSub should be created topic with name: +At PubSub should be created set of topics and subscriptions. (see [Overview](#overview)) + +| topic name | subscription name pattern | description | sensitive? | +|-------------------------------------------------------|--------------------------------------------------|----------------------------------------------------------|------------| +| `register-subscriber-control` | `notification-control-topic-{data-partition-id}` | Register subscriber control topic | yes | +| `{topic-name}` e.g. `records-changed` | `notification-{topic-name}-service` | Service topics from `topics.json` on Register service | yes | +| `{topic-name}-publish` e.g. `records-changed-publish` | `notification-{topic-name}-publish}` | Publish topics in relation 1-on-1 for each service topic | yes | -**name:** `register-subscriber-control` -It can be overridden by: +Control topic name can be overridden by: * through the Spring Boot property `oqm-register-subscriber-control-topic-name` * environment variable `OQM_REGISTER_SUBSCRIBER_CONTROL_TOPIC_NAME` diff --git a/provider/notification-gc/docs/gc/pics/notification-classes.png b/provider/notification-gc/docs/gc/pics/notification-classes.png new file mode 100644 index 0000000000000000000000000000000000000000..670c1152b0bb734608540128e0bd6122c6d1786c Binary files /dev/null and b/provider/notification-gc/docs/gc/pics/notification-classes.png differ diff --git a/provider/notification-gc/docs/gc/pics/notification.png b/provider/notification-gc/docs/gc/pics/notification.png new file mode 100644 index 0000000000000000000000000000000000000000..031d6b2cc085829f986f0581d91f3279e149f9de Binary files /dev/null and b/provider/notification-gc/docs/gc/pics/notification.png differ diff --git a/provider/notification-gc/pom.xml b/provider/notification-gc/pom.xml index 9a6b43c9d417d407559944d6616088fa27efeb39..803e1fe68418bdb1742508bb52f2190e29c05396 100644 --- a/provider/notification-gc/pom.xml +++ b/provider/notification-gc/pom.xml @@ -47,7 +47,7 @@ <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>core-lib-gcp</artifactId> - <version>0.19.0-rc3</version> + <version>0.20.0-rc1</version> </dependency> <dependency> diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/CacheConfig.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/CacheConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..67e844ae4ec9a4bec853a5163314696419cc132a --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/CacheConfig.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.config; + +import lombok.RequiredArgsConstructor; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.gcp.cache.RedisCacheBuilder; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@RequiredArgsConstructor +public class CacheConfig { + + private final RedisCacheBuilder<String, ExternalSubscriptions> builder; + + @Bean + public IRedisCache<String, ExternalSubscriptions> subscriptionCache(RedisProperties properties) { + return builder.buildRedisCache( + properties.getRedisHost(), + properties.getRedisPort(), + properties.getRedisPassword(), + properties.getRedisExpiration(), + properties.getRedisWithSsl(), + String.class, + ExternalSubscriptions.class + ); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/RedisProperties.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/RedisProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..06dcbf090e7ad8672088ab934fcc39a4ba422cf9 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/RedisProperties.java @@ -0,0 +1,19 @@ +package org.opengroup.osdu.notification.provider.gcp.config; + + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties +@Getter +@Setter +public class RedisProperties { + private String redisHost; + private Integer redisPort; + private String redisPassword; + private Integer redisExpiration = Integer.MAX_VALUE; + private Boolean redisWithSsl = false; +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ScopeModifierPostProcessor.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ScopeModifierPostProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..ddd60e598179798ca92a6b35b708d34a7cc998e3 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/ScopeModifierPostProcessor.java @@ -0,0 +1,49 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.config; + +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScope; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class ScopeModifierPostProcessor implements BeanFactoryPostProcessor { + + public static final String SCOPE_THREAD = "scope_thread"; + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory factory) throws BeansException { + factory.registerScope(SCOPE_THREAD, new ThreadScope()); + + for (String beanName : factory.getBeanDefinitionNames()) { + BeanDefinition beanDef = factory.getBeanDefinition(beanName); + if (Objects.equals(beanDef.getScope(), "request")) { + beanDef.setScope(SCOPE_THREAD); + log.debug("Scope has been overridden for bean: {}", beanDef.getBeanClassName()); + } + } + } +} + + diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/SubscriptionAPIConfigFactory.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/SubscriptionAPIConfigFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..cbfd57bcc5f9557f75dbfa3e1ababd4bce36de00 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/config/SubscriptionAPIConfigFactory.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.config; + +import lombok.RequiredArgsConstructor; +import org.opengroup.osdu.core.common.notification.SubscriptionAPIConfig; +import org.opengroup.osdu.notification.provider.interfaces.IAppProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@RequiredArgsConstructor +public class SubscriptionAPIConfigFactory { + + private final IAppProperties config; + + @Bean + public SubscriptionAPIConfig createSubscriptionAPIConfig() { + return SubscriptionAPIConfig.builder().rootUrl(config.getRegisterAPI()).build(); + } +} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/model/ExternalSubscriptions.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/model/ExternalSubscriptions.java new file mode 100644 index 0000000000000000000000000000000000000000..953d1460555a6c2edfd19c81d7ba130027d76de9 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/model/ExternalSubscriptions.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.model; + +import lombok.*; +import org.opengroup.osdu.core.common.model.notification.Subscription; + +import java.util.List; + +/** + * Wrapped list of subscriptions to create redis cache bean. + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class ExternalSubscriptions { + private List<Subscription> subscriptions; +} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java similarity index 61% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java index 4478ef8bfc53be5cd2fb38ae1231a5257973d3f0..edfdbbb0789d41a9dab75c1e204fc598f6828816 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmNotificationHandler.java @@ -14,14 +14,15 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.pubsub.di; +package org.opengroup.osdu.notification.provider.gcp.pubsub; -import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpStatus; import org.opengroup.osdu.core.common.http.HttpClient; import org.opengroup.osdu.core.common.http.HttpRequest; import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.notification.Secret; import org.opengroup.osdu.core.common.model.notification.Subscription; @@ -31,6 +32,8 @@ import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationPrope import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import java.util.Map; + @Component @ConditionalOnProperty(name = "oqmDriver") @RequiredArgsConstructor @@ -42,36 +45,36 @@ public class OqmNotificationHandler { private final OqmSubscriptionHandler subscriptionHandler; private final AuthFactory authFactory; - public HttpResponse notifySubscriber( - String notificationId, String pubsubMessage, Map<String, String> headerAttributes) - throws Exception { - Subscription subscription = - subscriptionHandler.getSubscriptionFromCache(notificationId, headerAttributes); + public HttpResponse notifySubscriber(String subscriptionId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception { + String serviceTopic = headerAttributes.get("topic"); + String dataPartitionId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID); + String correlationId = headerAttributes.get(DpsHeaders.CORRELATION_ID); + + if (serviceTopic == null || dataPartitionId == null || correlationId == null) { + log.warn("Service topic: {}, dataPartitionId: {}, correlationId: {}.", serviceTopic, dataPartitionId, correlationId); + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Missed header attributes"); + } + + Subscription subscription = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic); Secret secret = subscription.getSecret(); - String endpoint = subscription.getPushEndpoint(); - String secretType = secret.getSecretType(); - String pushUrl; - // Authentication Secret - SecretAuth secretAuth = authFactory.getSecretAuth(secretType); + SecretAuth secretAuth = authFactory.getSecretAuth(secret.getSecretType()); secretAuth.setSecret(secret); - pushUrl = secretAuth.getPushUrl(endpoint); + String pushUrl = secretAuth.getPushUrl(subscription.getPushEndpoint()); Map<String, String> requestHeader = secretAuth.getRequestHeaders(); requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json"); - requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID)); - requestHeader.put( - DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID)); + requestHeader.put(DpsHeaders.CORRELATION_ID, correlationId); + requestHeader.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); - HttpRequest request = - HttpRequest.post() + HttpRequest request = HttpRequest.post() .url(pushUrl) .headers(requestHeader) .body(pubsubMessage) .connectionTimeout(oqmConfigurationProperties.getWaitingTime()) .build(); HttpResponse response = httpClient.send(request); - log.debug("Sending out notification to endpoint: {}.", endpoint); + log.debug("Notification handler | Sending notification to Sub ID: `{}` Endpoint: `{}`.", subscriptionId, subscription.getPushEndpoint()); return response; } } \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java index 71fcf870624dc7bb693b47066884e90e39da200c..41f2aaedf49c675d4fadb65043ba85d74d66b7b2 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -1,243 +1,214 @@ /* - * Copyright 2017-2020, Schlumberger + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.opengroup.osdu.notification.provider.gcp.pubsub; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; -import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; /** * Runs once on the service start. - * 1. Fetches oqm for tenants' message brokers' pull subscriptions in interested topics. Creates the service's subscribers in every found subscription. - * 2. Checks for the "subscriber control topic" on every tenant's message broker and creates if it is absent. - * - This topic is a "control channel" between Register and Notification services. - * - Register sends events on new pull Subscriptions being created, Notification listens for events and creates corresponding Subscribers. + * 1. Provisions OQM service subscribers for topics provided by Register service. + * 2. Stores the state of third-party subscribers in a cache. + * 3. Updates the state of third-party subscribers in a cache on every control topic event. + * 4. Provisions OQM publish subscribers to process notifications to third-party subscribers. */ @Slf4j @Component -@Scope(SCOPE_SINGLETON) @ConditionalOnProperty(name = "oqmDriver") @RequiredArgsConstructor public class OqmSubscriberManager { - private final String ACKNOWLEDGE = "message acknowledged by client"; - private final String NOT_ACKNOWLEDGE = "message not acknowledged by client"; - - //TODO should be externalized to application.properties - private static final List<OqmTopic> INTERESTED_TOPICS = - Stream.of("records-changed", "schema-changed", "status-changed", "legaltags_changed") - .map(topicName -> OqmTopic.builder().name(topicName).build()).collect(Collectors.toList()); - - private static final String INTERESTED_SUBSCRIPTIONS_PREFIX = "de-"; - - private static final OqmSubscriptionQuery INTERESTED_SUBSCRIPTIONS_QUERY = OqmSubscriptionQuery.builder() - .forAnyOfTopics(INTERESTED_TOPICS).namePrefix(INTERESTED_SUBSCRIPTIONS_PREFIX) - .subscriberable(true).build(); + public static final String NOTIFICATION_PREFIX = "notification-"; + public static final String SERVICE_SUFFIX = "-service"; + public static final String PUBLISH_SUFFIX = "-publish"; + private final OqmSubscriptionHandler subscriptionHandler; private final ITenantFactory tenantInfoFactory; private final OqmDriver driver; - private final OqmNotificationHandler notificationHandler; private final OqmConfigurationProperties properties; - - private final Long constructDate = System.currentTimeMillis(); - private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; + private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + private final OqmNotificationHandler notificationHandler; + @Deprecated + private final OqmSubscriptionProvider subscriptionProvider; @PostConstruct void postConstruct() { - log.debug("OqmSubscriberManager bean constructed. Provisioning STARTED."); - - provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers(); - provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers(); - - log.debug("OqmSubscriberManager bean constructed. Provisioning COMPLETED."); + log.debug("OQM | Provisioning STARTED."); + provisionSubscriptionInfoCache(); + provisionControlTopicSubscribers(); + provisionServiceSubscribers(); + log.debug("OQM | Provisioning COMPLETED."); } @PreDestroy void onPreDestroy() { - log.debug("OqmSubscriberManager bean on pre-destroy: STARTED."); - unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers(); - log.debug("OqmSubscriberManager bean on pre-destroy: COMPLETED."); - } - - void unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers() { - for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) { - String tenantId = tenant.getDataPartitionId(); - log.debug("* OqmSubscriberManager on pre-destroy for tenant {}:", tenantId); - OqmSubscription subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null); - if (subscriberControlTopicSubscriptionForTenant != null) { - log.debug("* * OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription DELETED.", - properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName, tenantId); - driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)); - } + log.debug("OQM | Destroy STARTED."); + for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { + subscriptionProvider.deleteControlTopicsSubscription(getControlTopicSubscriptionName(tenantInfo)); } + log.debug("OQM | Destroy COMPLETED."); } - void provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers() { + /** + * Initial filling a third-party subscription info cache with entities, which have already stored + * in Register service's database. Cache should contain all necessary information to send notification to the recipient. + */ + void provisionSubscriptionInfoCache() { for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { - log.debug("* OqmSubscriberManager on provisioning tenant {}:", tenantInfo.getDataPartitionId()); - //For every Tenant Destination get "subscriberable" Subscriptions - for (OqmSubscription subscription : getSubscriberableSubscriptions(tenantInfo)) { - log.debug("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}:", tenantInfo.getDataPartitionId(), subscription.getName()); - - //Register a Subscriber on every subscription - registerSubscriber(tenantInfo, subscription); - - log.debug("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName()); - } - log.debug("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId()); + String dataPartitionId = tenantInfo.getDataPartitionId(); + subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(Collections.emptyList()).build()); + List<Subscription> subscriptionInfos = subscriptionHandler.getAllSubscriptionInfos(tenantInfo.getDataPartitionId()); + List<Subscription> enrichedSubscriptionInfos = subscriptionInfos.stream() + .map(subscription -> subscriptionHandler.getSubscription(dataPartitionId, subscription.getNotificationId(), subscription.getTopic())) + .collect(Collectors.toList()); + subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(enrichedSubscriptionInfos).build()); + log.debug("Subscription info cache PRELOADED for tenant: {}. Size is: {}.", dataPartitionId, enrichedSubscriptionInfos.size()); } } - void provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers() { + void provisionControlTopicSubscribers() { String controlTopicName = properties.getRegisterSubscriberControlTopicName(); - for (TenantInfo tenant : tenantInfoFactory.listTenantInfo()) { - - String tenantId = tenant.getDataPartitionId(); - - log.debug("* OqmSubscriberManager on check '{}' subscriber control topic existence at tenant's '{}' message broker.", controlTopicName, tenantId); - OqmTopic subscriberControlTopic = driver.getTopic(controlTopicName, getDestination(tenant)).orElse(null); - boolean controlTopicForTenantJustCreated; - if (subscriberControlTopic != null) { - log.debug("* * OqmSubscriberManager: '{}' subscriber control topic exists at tenant's '{}' message broker.", controlTopicName, tenantId); - controlTopicForTenantJustCreated = false; - } else { - log.debug("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist at tenant's '{}' message broker. Trying to create it:", controlTopicName, tenantId); - driver.createAndGetTopic(controlTopicName, getDestination(tenant)); - controlTopicForTenantJustCreated = true; - } - - log.debug("* * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker.", - controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - - OqmSubscription subscriberControlTopicSubscriptionForTenant = null; - OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName) - .topics(Collections.singletonList(subscriberControlTopic)).build(); - - if (!controlTopicForTenantJustCreated) { - subscriberControlTopicSubscriptionForTenant = driver.getSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)).orElse(null); - } - - if (subscriberControlTopicSubscriptionForTenant != null) { - log.debug("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription CREATED.", - controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - } else { - subscriberControlTopicSubscriptionForTenant = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant)); - log.debug("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscription already EXISTS.", - controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - - registerControlTopicSubscriber(tenant, subscriberControlTopicSubscriptionForTenant); - log.debug("* * * OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}' at tenant's '{}' message broker: Subscriber REGISTERED.", - controlTopicName, subscriberControlTopicSubscriptionName, tenantId); - - } + for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { + OqmSubscription subscription = subscriptionProvider.getOrCrateSubscription(getControlTopicSubscriptionName(tenantInfo), controlTopicName, tenantInfo); + registerControlTopicSubscriber(tenantInfo, subscription); + log.debug("Control topic subscriber REGISTERED for tenant: {}.", tenantInfo.getDataPartitionId()); } } - private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) { - OqmDestination destination = getDestination(tenantInfo); - - OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { - - String pubsubMessage = oqmMessage.getData(); - String notificationId = subscription.getName(); - Map<String, String> headerAttributes = oqmMessage.getAttributes(); - - - HttpResponse response; - boolean ackedNacked = false; - try { - response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes); - - if (!response.isSuccessCode()) { - log.error(NOT_ACKNOWLEDGE + response.getBody()); - } else { - log.debug(ACKNOWLEDGE); - oqmAckReplier.ack(); - } - ackedNacked = true; - - } catch (Exception e) { - log.debug(NOT_ACKNOWLEDGE, e); + void provisionServiceSubscribers() { + for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { + log.debug("Provision service topics subscribers for tenant: '{}'", tenantInfo.getDataPartitionId()); + subscriptionProvider.createSubscriptions(tenantInfo); + List<OqmTopic> topics = subscriptionHandler.getTopics(tenantInfo.getDataPartitionId()); + for (OqmSubscription subscription : buildServiceSubscriptions(topics)) { + registerServiceTopicSubscriber(tenantInfo, subscription); + log.debug("Subscriber REGISTERED for service subscription {}.", subscription.getName()); } - - if (!ackedNacked) oqmAckReplier.nack(); - }; - - OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build(); - driver.subscribe(subscriber, destination); - log.debug("Just subscribed at topic {} subscription {} for tenant {}.", - subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId()); + for (OqmSubscription subscription : buildPublishSubscriptions(topics)) { + registerPublishTopicSubscriber(tenantInfo, subscription); + log.debug("Subscriber REGISTERED for publish subscription {}.", subscription.getName()); + } + } } private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscription) { + OqmSubscriber subscriber = OqmSubscriber.builder() + .subscription(controlTopicSubscription) + .messageReceiver(new OqmControlTopicReceiver(subscriptionHandler, subscriptionInfoCache)) + .build(); + driver.subscribe(subscriber, getDestination(tenantInfo)); + } - OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { - - String pubsubMessage = oqmMessage.getData(); - Map<String, String> headerAttributes = oqmMessage.getAttributes(); - String subscriptionId = headerAttributes.get("subscription-id"); - String dataPartitionId = headerAttributes.get("data-partition-id"); - String topic = headerAttributes.get("topic"); - - OqmSubscription subscription = OqmSubscription.builder() - .topic(OqmTopic.builder().name(topic).build()) - .name(subscriptionId) - .build(); - - registerSubscriber(tenantInfoFactory.getTenantInfo(dataPartitionId), subscription); + /** + * Register service topic subscriber to multiply service messages per each third-party receiver + * and enrich them with subscriber-id attribute, then publish into publish topic. + * + * @param tenantInfo tenant related information. + * @param serviceSubscription source subscription of incoming messages to handle by subscriber. + */ + void registerServiceTopicSubscriber(TenantInfo tenantInfo, OqmSubscription serviceSubscription) { + OqmSubscriber subscriber = OqmSubscriber.builder() + .subscription(serviceSubscription) + .messageReceiver(new OqmServiceTopicReceiver(serviceSubscription, driver, subscriptionInfoCache)) + .build(); + driver.subscribe(subscriber, getDestination(tenantInfo)); + } - log.debug(ACKNOWLEDGE); - oqmAckReplier.ack(); - }; + /** + * Sends notification HTTP call to the push-endpoint specified at Subscription to the third-party subscriber. + * In case of successful call the outgoing message is properly acknowledged. + * In case of failure notification delivery retry logic proceeds. + * + * @param tenantInfo tenant related information. + * @param subscription source subscription of incoming messages to handle by subscriber. + */ + private void registerPublishTopicSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) { + OqmSubscriber subscriber = OqmSubscriber.builder() + .subscription(subscription) + .messageReceiver(new OqmPublishTopicReceiver(subscription, notificationHandler)) + .build(); + driver.subscribe(subscriber, getDestination(tenantInfo)); + } - OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscription).messageReceiver(receiver).build(); - OqmDestination destination = getDestination(tenantInfo); - driver.subscribe(subscriber, destination); + /** + * Build all service OqmSubscriptions for topics provided by register service. + * OqmSubscription is a representation of message broker subscription, + * which should be provided on infrastructure side. + * + * @param topics topics registered on Register service. + * @return list of OqmSubscriptions. + */ + private List<OqmSubscription> buildServiceSubscriptions(List<OqmTopic> topics) { + return topics.stream() + .map(topic -> OqmSubscription.builder() + .name(NOTIFICATION_PREFIX + topic.getName() + SERVICE_SUFFIX) + .topic(topic) + .subscriberable(true) + .build()) + .collect(Collectors.toList()); + } - log.debug("Just subscribed at topic {} subscription {} for tenant {}.", - controlTopicSubscription.getTopics().get(0), controlTopicSubscription.getName(), tenantInfo.getDataPartitionId()); + /** + * Build all publish OqmSubscriptions for topics provided by register service. + * OqmSubscription is a representation of message broker subscription, + * which should be provided on infrastructure side. + * + * @param topics topics registered on Register service. + * @return list of OqmSubscriptions. + */ + private List<OqmSubscription> buildPublishSubscriptions(List<OqmTopic> topics) { + return topics.stream() + .map(topic -> OqmSubscription.builder() + .name(NOTIFICATION_PREFIX + topic.getName() + PUBLISH_SUFFIX) + .topic(OqmTopic.builder().name(topic.getName() + PUBLISH_SUFFIX).build()) + .subscriberable(true) + .build()) + .collect(Collectors.toList()); } - public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) { - return driver.listSubscriptions(null, INTERESTED_SUBSCRIPTIONS_QUERY, getDestination(tenantInfo)); + @Deprecated + private String getControlTopicSubscriptionName(TenantInfo tenantInfo) { + return NOTIFICATION_PREFIX + "control-topic-" + tenantInfo.getDataPartitionId(); } private OqmDestination getDestination(TenantInfo tenantInfo) { - return OqmDestination.builder().partitionId(tenantInfo.getDataPartitionId()).build(); + return OqmDestination.builder() + .partitionId(tenantInfo.getDataPartitionId()) + .build(); } - -} +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..b624f900c532eb150bd444a2cdb51a3607d03531 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionHandler.java @@ -0,0 +1,132 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.common.notification.ISubscriptionFactory; +import org.opengroup.osdu.core.common.notification.ISubscriptionService; +import org.opengroup.osdu.core.common.notification.SubscriptionException; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.service.DpsHeadersProvider; +import org.opengroup.osdu.notification.provider.gcp.service.SubscriptionServiceGc; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * GC implementation of {@link org.opengroup.osdu.notification.service.SubscriptionHandler}. + * The current implementation differs in non-request spring scope and handling OQM message instead of HTTP requests. + * Also, this implementation supports additional topics and subscription requests to Register service. + */ +@Component +@RequiredArgsConstructor +@ConditionalOnProperty(name = "oqmDriver") +@Slf4j +public class OqmSubscriptionHandler { + + private final ISubscriptionFactory registerClientFactory; + private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + private final DpsHeadersProvider dpsHeadersProvider; + private final SubscriptionServiceGc subscriptionServiceGc; + + public List<OqmTopic> getTopics(String dataPartitionId) { + DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); + ISubscriptionService registerClient = registerClientFactory.create(headers); + + try { + return registerClient.getTopics() + .stream() + .map(topic -> OqmTopic.builder() + .name(topic.getName()) + .build()) + .collect(Collectors.toList()); + } catch (SubscriptionException se) { + log.error("Failed to get topics from register for partition id: {}. Response: {}", dataPartitionId, se.getResponse().getBody()); + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Failed to get topics from register", se.getResponse().toString(), se); + } + } + + public List<Subscription> getAllSubscriptionInfos(String dataPartitionId) { + try { + return subscriptionServiceGc.getAll(dataPartitionId); + } catch (SubscriptionException e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Failed to get all subscription from register", + "Received subscription exception: ", e); + } + } + + public Subscription getSubscription(String dataPartitionId, String subscriptionId, String serviceTopic) { + List<Subscription> cachedInfos = getCachedSubscriptionInfos(dataPartitionId); + List<Subscription> filteredCachedInfos = filterSubscriptionInfos(cachedInfos, subscriptionId, serviceTopic); + if (filteredCachedInfos.isEmpty()) { + return sendGetSubscriptionRequest(dataPartitionId, subscriptionId, serviceTopic, cachedInfos); + } else { + log.debug("Register client cache | `{}` subscriptions info found. The first was taken.", filteredCachedInfos.size()); + return filteredCachedInfos.get(0); + } + } + + private Subscription sendGetSubscriptionRequest(String dataPartitionId, String subscriptionId, String serviceTopic, List<Subscription> cachedSubscriptionInfos) { + try { + DpsHeaders headers = dpsHeadersProvider.getDpsHeaders(dataPartitionId); + ISubscriptionService registerClient = registerClientFactory.create(headers); + + List<Subscription> freshInfos = registerClient.query(subscriptionId); + if (freshInfos.isEmpty()) { + log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); + return null; + } + + List<Subscription> filteredFreshInfos = filterSubscriptionInfos(freshInfos, subscriptionId, serviceTopic); + if (filteredFreshInfos.isEmpty()) { + log.warn("Subscription info with sub ID: `{}` not found", subscriptionId); + return null; + } + cachedSubscriptionInfos.addAll(filteredFreshInfos); + subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(cachedSubscriptionInfos).build()); + + log.debug("Register client | `{}` subscriptions info found. The first was taken.", filteredFreshInfos.size()); + return filteredFreshInfos.get(0); + } catch (SubscriptionException se) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "Unexpected error while sending request", se); + } + } + + private List<Subscription> getCachedSubscriptionInfos(String dataPartitionId) { + return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) + .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) + .getSubscriptions(); + } + + private static List<Subscription> filterSubscriptionInfos(List<Subscription> infos, String subscriptionId, String serviceTopic) { + return infos.stream() + .filter(info -> serviceTopic.equals(info.getTopic()) && subscriptionId.equals(info.getNotificationId())) + .collect(Collectors.toList()); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionProvider.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..e3a59167cf31d7b36914c0e79ce1df6f3efe5c0d --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriptionProvider.java @@ -0,0 +1,88 @@ +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; + +import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberManager.*; + +@Component +@RequiredArgsConstructor +@ConditionalOnProperty(name = "oqmDriver") +@Slf4j +@Deprecated +public class OqmSubscriptionProvider { + + private final ITenantFactory tenantInfoFactory; + private final OqmDriver driver; + private final OqmConfigurationProperties properties; + private final OqmSubscriptionHandler subscriptionHandler; + + + @Deprecated + public void createSubscriptions(TenantInfo tenantInfo) { + List<OqmTopic> topics = subscriptionHandler.getTopics(tenantInfo.getDataPartitionId()); + topics.forEach(oqmTopic -> getOrCrateSubscription(NOTIFICATION_PREFIX + oqmTopic.getName() + SERVICE_SUFFIX, + oqmTopic.getName(), tenantInfo)); + topics.forEach(oqmTopic -> getOrCrateSubscription(NOTIFICATION_PREFIX + oqmTopic.getName() + PUBLISH_SUFFIX, + oqmTopic.getName() + PUBLISH_SUFFIX, tenantInfo)); + } + + @NotNull + @Deprecated + public OqmSubscription getOrCrateSubscription(String subscriptionName, String topicName, TenantInfo tenantInfo) { + OqmSubscription subscription = driver.getSubscription(subscriptionName, getDestination(tenantInfo)).orElse(null); + + if (subscription == null) { + OqmSubscription subscriptionRequest = OqmSubscription.builder() + .name(subscriptionName) + .topics(Collections.singletonList(getOrCreateTopic(topicName, tenantInfo))) + .build(); + subscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenantInfo)); + log.debug("Subscription with name '{}' CREATED.", subscriptionName); + } + return subscription; + } + + @NotNull + @Deprecated + public OqmTopic getOrCreateTopic(String topicName, TenantInfo tenantInfo) { + OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)).orElse(null); + if (topic == null) { + topic = driver.createAndGetTopic(topicName, getDestination(tenantInfo)); + log.debug("OQM: '{}' control topic CREATED", topicName); + } + return topic; + } + + @Deprecated + public void deleteControlTopicsSubscription(String controlTopicSubscriptionName) { + for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { + log.debug("OqmSubscriptionProvider on pre-destroy for tenant {}:", tenantInfo.getDataPartitionId()); + OqmSubscription controlTopicSubscription = driver.getSubscription(controlTopicSubscriptionName, getDestination(tenantInfo)).orElse(null); + if (controlTopicSubscription != null) { + driver.deleteSubscription(controlTopicSubscriptionName, getDestination(tenantInfo)); + log.debug("Deleted '{}' subscriber control topic subscription with name '{}'.", + properties.getRegisterSubscriberControlTopicName(), controlTopicSubscriptionName); + } + } + } + + private OqmDestination getDestination(TenantInfo tenantInfo) { + return OqmDestination.builder() + .partitionId(tenantInfo.getDataPartitionId()) + .build(); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubHandshakeHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubHandshakeHandler.java index 58c1464cd42f94dced08a79ee81cb3da2d8dd532..9779ae3301b5730180f82579e34e9379f6442498 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubHandshakeHandler.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubHandshakeHandler.java @@ -21,6 +21,9 @@ import org.opengroup.osdu.notification.provider.interfaces.IPubsubHandshakeHandl import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; +/** + * Not used in GC. Created to support core module interface. + */ @Component @Lazy public class PubsubHandshakeHandler implements IPubsubHandshakeHandler { diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubRequestBodyExtractor.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubRequestBodyExtractor.java index 485edb527e5bd9c2c924e85cb50634b9c45fb0f8..2a4ad0f2c1818d364a05408bfa0eb4b4f2fe6ae9 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubRequestBodyExtractor.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/PubsubRequestBodyExtractor.java @@ -40,6 +40,9 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +/** + * Not used in GC. Created to support core module interface. + */ @Component @RequestScope public class PubsubRequestBodyExtractor implements IPubsubRequestBodyExtractor { diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java deleted file mode 100644 index 28e65f7beb280db55180d449e7685ba7bd7cd314..0000000000000000000000000000000000000000 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package org.opengroup.osdu.notification.provider.gcp.pubsub.di; - -import lombok.extern.slf4j.Slf4j; -import org.opengroup.osdu.core.common.cryptographic.SignatureService; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Primary; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; - -@Component -@Scope(SCOPE_SINGLETON) -@Primary -@Slf4j -@ConditionalOnProperty(name = "oqmDriver") -public class OqmSignatureService extends SignatureService { - @PostConstruct - void postConstruct() { - log.info("OqmSignatureService bean constructed."); - } -} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java deleted file mode 100644 index 31d8353a241e2c0b1186edf75a698bd89b56d1ae..0000000000000000000000000000000000000000 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2017-2020, Schlumberger - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.opengroup.osdu.notification.provider.gcp.pubsub.di; - -import org.opengroup.osdu.core.common.cache.ICache; -import org.opengroup.osdu.core.common.cache.MultiTenantCache; -import org.opengroup.osdu.core.common.cache.VmCache; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.tenant.TenantInfo; -import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -import java.util.Map; - -@Component -@ConditionalOnProperty(name = "oqmDriver") -public class OqmSubscriptionCacheFactory { - @Autowired - private ITenantFactory tenantFactory; - - private MultiTenantCache<String> caches; - - public OqmSubscriptionCacheFactory(@Value("${app.expireTime}") int expireTime, @Value("${app.maxCacheSize}") int maxCacheSize) { - this.caches = new MultiTenantCache<>(new VmCache<>(expireTime, maxCacheSize)); - } - - public void put(String key, String val, Map<String, String> headerAttributes) { - this.partitionCache(headerAttributes).put(key, val); - } - - public String get(String key, Map<String, String> headerAttributes) { - return this.partitionCache(headerAttributes).get(key); - } - - public void delete(String key, Map<String, String> headerAttributes) { - this.partitionCache(headerAttributes).delete(key); - } - - public void clearAll(Map<String, String> headerAttributes) { - this.partitionCache(headerAttributes).clearAll(); - } - - private ICache<String, String> partitionCache(Map<String, String> headerAttributes) { - String tenantId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID); - TenantInfo tenantInfo = this.tenantFactory.getTenantInfo(tenantId); - if (tenantInfo == null) { - throw AppException.createUnauthorized(String.format("could not retrieve tenant info for data partition id: %s", tenantId)); - } - return this.caches.get(String.format("%s:subscription", tenantInfo.getDataPartitionId())); - } - -} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java deleted file mode 100644 index d326ced415cf96015a42a14bb05f63353e7d9a6d..0000000000000000000000000000000000000000 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2017-2020, Schlumberger - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.opengroup.osdu.notification.provider.gcp.pubsub.di; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; -import com.google.gson.Gson; -import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.notification.Subscription; -import org.opengroup.osdu.core.common.notification.ISubscriptionFactory; -import org.opengroup.osdu.core.common.notification.ISubscriptionService; -import org.opengroup.osdu.core.common.notification.SubscriptionException; -import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Component -@ConditionalOnProperty(name = "oqmDriver") -public class OqmSubscriptionHandler { - @Autowired - private ISubscriptionFactory subscriptionFactory; - @Autowired - private OqmSubscriptionCacheFactory subscriptionCacheFactory; - @Autowired - private JaxRsDpsLog log; - - @Autowired - private IServiceAccountJwtClient serviceAccountJwtClient; - - private static final Gson gson = new Gson(); - private ObjectMapper objectMapper; - - public Subscription getSubscriptionFromCache(String notificationId, Map<String, String> headerAttributes) { - String subscriptionString = subscriptionCacheFactory.get(notificationId, headerAttributes); - try { - if (Strings.isNullOrEmpty(subscriptionString)) - subscriptionString = querySubscriptionAndUpdateCache(notificationId, headerAttributes); - ObjectMapper objectMapper = this.getObjectMapper(); - return objectMapper.readValue(subscriptionString, Subscription.class); - } catch (IOException e) { - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error Parsing subscription String to object", "Unexpected error in pushing message", e); - } catch (SubscriptionException se) { - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error query subscription from registration", "Unexpected error in pushing message", se); - } - } - - private String querySubscriptionAndUpdateCache(String notificationId, Map<String, String> headerAttributes) throws AppException, SubscriptionException { - DpsHeaders headers = getDpsHeaders(headerAttributes); - ISubscriptionService service = subscriptionFactory.create(headers); - - List<Subscription> subscriptionList = service.query(notificationId); - if (subscriptionList == null || subscriptionList.size() == 0) { - throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found"); - } - - Subscription subscription = subscriptionList.get(0); - String jsonSubscription = gson.toJson(subscription); - this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription, headerAttributes); - - return jsonSubscription; - } - - private DpsHeaders getDpsHeaders(Map<String, String> headerAttributes) { - Map<String, String> attributes = new HashMap<>(headerAttributes); - - //extract headers from pubsub message - String dataPartitionId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID); - String authToken = this.serviceAccountJwtClient.getIdToken(dataPartitionId); - attributes.put(DpsHeaders.AUTHORIZATION, authToken); - return DpsHeaders.createFromMap(attributes); - - } - - //unit test purpose - protected ObjectMapper getObjectMapper() { - if (this.objectMapper == null) { - this.objectMapper = new ObjectMapper(); - } - return this.objectMapper; - } - - //unit test purpose - void setObjectMapper(ObjectMapper objectMapper) { - this.objectMapper = objectMapper; - } -} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md deleted file mode 100644 index 609d9001fd89dd9649ea3d94084ab80428a3bda9..0000000000000000000000000000000000000000 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md +++ /dev/null @@ -1,6 +0,0 @@ -The content of the package is 4 classes derived from the eponymous classes from the core part -(find them by names with no "Oqm"prefix). They were derived for one only reason - to free OqmSubscriberManager -from addiction to "request scope" which is not valid for the OQM work context. - -In the future, one may probably find a better way to achieve the same (and reuse original classes) - diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..50beeff333fb9a2d7ca4b31d7314df9f0e255cd1 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmControlTopicReceiver.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriptionHandler; +import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Slf4j +public class OqmControlTopicReceiver implements OqmMessageReceiver { + + private static final String SUBSCRIPTION_CREATED = "Subscription Created"; + private static final String SUBSCRIPTION_UPDATED = "Subscription Updated"; + private static final String SUBSCRIPTION_DELETED = "Subscription Deleted"; + + private final OqmSubscriptionHandler subscriptionHandler; + private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + + public OqmControlTopicReceiver(OqmSubscriptionHandler subscriptionHandler, + IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) { + this.subscriptionHandler = subscriptionHandler; + this.subscriptionInfoCache = subscriptionInfoCache; + } + + @Override + public void receiveMessage(OqmMessage message, OqmAckReplier replier) { + try { + handleMessage(message); + } catch (Exception e) { + log.error("OQM | Control topic | Message not acknowledged by client", e); + replier.nack(); + } finally { + ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); + } + + log.debug("OQM | Control topic | Message acknowledged by client"); + replier.ack(); + } + + private void handleMessage(OqmMessage oqmMessage) { + String pubsubMessage = oqmMessage.getData(); + Map<String, String> headerAttributes = oqmMessage.getAttributes(); + String subscriptionId = headerAttributes.get("subscription-id"); + String serviceTopic = headerAttributes.get("topic"); + String dataPartitionId = headerAttributes.get("data-partition-id"); + log.debug("OQM | Control topic | Received message: `{}` for service topic: `{}` with Sub ID: `{}`", + pubsubMessage, serviceTopic, subscriptionId); + + List<Subscription> subscriptionInfos = Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) + .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) + .getSubscriptions(); + Optional<Subscription> cachedInfo = subscriptionInfos.stream() + .filter(info -> subscriptionId.equals(info.getNotificationId()) && serviceTopic.equals(info.getTopic())) + .findFirst(); + + switch (pubsubMessage) { + case SUBSCRIPTION_CREATED: + case SUBSCRIPTION_UPDATED: + cachedInfo.ifPresent(subscriptionInfos::remove); + Subscription freshSubscriptionInfo = subscriptionHandler.getSubscription(dataPartitionId, subscriptionId, serviceTopic); + if (freshSubscriptionInfo != null) { + subscriptionInfos.add(freshSubscriptionInfo); + subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); + log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was added to cache.", subscriptionId, serviceTopic); + } + break; + case SUBSCRIPTION_DELETED: + if (cachedInfo.isPresent()) { + subscriptionInfos.remove(cachedInfo.get()); + subscriptionInfoCache.put(dataPartitionId, ExternalSubscriptions.builder().subscriptions(subscriptionInfos).build()); + log.debug("OQM | Control topic | Subscription info with Sub ID: `{}` and topic: `{}` was removed from cache.", subscriptionId, serviceTopic); + } else { + log.warn("OQM | Control topic requested to delete subscription with id: `{}`, but is was not found in cache.", subscriptionId); + } + break; + default: + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Not supported", "OQM | Control topic | Received unsupported control topic operation: " + pubsubMessage); + } + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmPublishTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmPublishTopicReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..3f784f5b6be477e0e2279223c1c55a5dfa35b487 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmPublishTopicReceiver.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; + +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.notification.provider.gcp.pubsub.OqmNotificationHandler; +import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; + +import java.util.Map; + +@Slf4j +public class OqmPublishTopicReceiver implements OqmMessageReceiver { + + private final OqmSubscription subscription; + private final OqmNotificationHandler notificationHandler; + + public OqmPublishTopicReceiver(OqmSubscription subscription, OqmNotificationHandler notificationHandler) { + this.subscription = subscription; + this.notificationHandler = notificationHandler; + } + + @Override + public void receiveMessage(OqmMessage message, OqmAckReplier replier) { + String publishTopic = subscription.getTopics().get(0).getName(); + log.debug("OQM | `{}` topic | Received message id: `{}`. Attributes: `{}`.", publishTopic, message.getId(), message.getAttributes()); + + try { + handleMessage(message); + } catch (Exception e) { + log.error("OQM | `{}` topic | Message not acknowledged by client", publishTopic, e); + replier.nack(); + } finally { + ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); + } + + log.debug("OQM | `{}` topic | Message acknowledged by client", publishTopic); + replier.ack(); + } + + private void handleMessage(OqmMessage oqmMessage) throws Exception { + String pubsubMessage = oqmMessage.getData(); + Map<String, String> headerAttributes = oqmMessage.getAttributes(); + String subscriptionId = headerAttributes.get("subscription-id"); + + HttpResponse response = notificationHandler.notifySubscriber(subscriptionId, pubsubMessage, headerAttributes); + + if (!response.isSuccessCode()) { + throw new AppException(response.getResponseCode(), "Notification failed", response.getBody()); + } + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..bd551425fe3365d445ad83ab5e111912749f9799 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/receiver/OqmServiceTopicReceiver.java @@ -0,0 +1,110 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub.receiver; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.*; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.thread.ThreadScopeContextHolder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.opengroup.osdu.notification.provider.gcp.pubsub.OqmSubscriberManager.*; + +@Slf4j +public class OqmServiceTopicReceiver implements OqmMessageReceiver { + + private final OqmSubscription subscription; + private final OqmDriver driver; + private final IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + + public OqmServiceTopicReceiver(OqmSubscription subscription, + OqmDriver driver, + IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache) { + this.subscription = subscription; + this.driver = driver; + this.subscriptionInfoCache = subscriptionInfoCache; + } + + @Override + public void receiveMessage(OqmMessage message, OqmAckReplier replier) { + String serviceTopic = subscription.getTopics().get(0).getName(); + log.debug("OQM | `{}` topic | Received message id: `{}`. Attributes: `{}`.", serviceTopic, message.getId(), message.getAttributes()); + + try { + handleMessage(message, serviceTopic); + } catch (Exception e) { + log.error("OQM | `{}` topic | Message not acknowledged by client", serviceTopic, e); + replier.nack(); + } finally { + ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); + } + + log.debug("OQM | `{}` topic | Message acknowledged by client", serviceTopic); + replier.ack(); + } + + private void handleMessage(OqmMessage message, String serviceTopic) { + String dataPartitionId = message.getAttributes().get("data-partition-id"); + OqmTopic publishTopic = OqmTopic.builder() + .name(serviceTopic + PUBLISH_SUFFIX) + .build(); + + List<Subscription> cachedSubscriptionInfos = getCachedSubscriptionInfosByTopic(dataPartitionId, serviceTopic); + for (Subscription info : cachedSubscriptionInfos) { + Map<String, String> attributes = new HashMap<>(refineAttributes(message)); + attributes.put("subscription-id", info.getNotificationId()); + attributes.put("topic", serviceTopic); + OqmMessage enrichedMessage = OqmMessage.builder() + .id(message.getId()) + .data(message.getData()) + .attributes(attributes) + .build(); + driver.publish(enrichedMessage, publishTopic, OqmDestination.builder() + .partitionId(dataPartitionId) + .build()); + } + log.debug("OQM | `{}` topic | Published notifications for `{}` third-party subscribers for tenant: `{}`", + serviceTopic, cachedSubscriptionInfos.size(), dataPartitionId); + } + + private static Map<String, String> refineAttributes(OqmMessage message) { + Map<String, String> refinedAttributes = message.getAttributes().entrySet().stream() + .filter(entry -> !entry.getKey().startsWith("goog")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + log.debug("Message attributes for message id: `{}` were refined: `{}`", message.getId(), refinedAttributes); + return refinedAttributes; + } + + private List<Subscription> getCachedSubscriptionInfosByTopic(String dataPartitionId, String serviceTopic) { + return Optional.ofNullable(subscriptionInfoCache.get(dataPartitionId)) + .orElseThrow(() -> new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Server error", "OQM | Subscription info cache was not initialized")) + .getSubscriptions().stream() + .filter(info -> serviceTopic.equals(info.getTopic())) + .collect(Collectors.toList()); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/security/GoogleServiceAccountValidatorGenerator.java similarity index 96% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/security/GoogleServiceAccountValidatorGenerator.java index 119dbc79b17b395ca7dbf6b4f20b224be0ce2686..2192620af432a28f0e2cbfb99fc2ed19a192c883 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorGenerator.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/security/GoogleServiceAccountValidatorGenerator.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.util; +package org.opengroup.osdu.notification.provider.gcp.security; import com.google.api.client.googleapis.auth.oauth2.GoogleIdTokenVerifier; import com.google.api.client.http.javanet.NetHttpTransport; diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/security/GoogleServiceAccountValidatorImpl.java similarity index 97% rename from provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java rename to provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/security/GoogleServiceAccountValidatorImpl.java index 05a7475a1c14b140e404cb5028df58b78f2df356..367e18ed2919cc6c9a912cc02fcf40b498eea086 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountValidatorImpl.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/security/GoogleServiceAccountValidatorImpl.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.provider.gcp.util; +package org.opengroup.osdu.notification.provider.gcp.security; import com.google.api.client.googleapis.auth.oauth2.GoogleIdToken; import com.google.api.client.googleapis.auth.oauth2.GoogleIdTokenVerifier; diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/DpsHeadersProvider.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/DpsHeadersProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..39d230527e49fe9b53b00da0f6a46477c368a7db --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/DpsHeadersProvider.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.service; + +import lombok.RequiredArgsConstructor; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +/** + * DpsHeadersProvider extracts headers from pubsub message + * to support DpsHeaders usage without RequestScope. + */ +@Component +@RequiredArgsConstructor +public class DpsHeadersProvider { + + private final IServiceAccountJwtClient serviceAccountJwtClient; + + public DpsHeaders getDpsHeaders(String dataPartitionId) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); + return getDpsHeaders(attributes); + } + + public DpsHeaders getDpsHeaders(Map<String, String> headerAttributes) { + Map<String, String> attributes = new HashMap<>(headerAttributes); + + String dataPartitionId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID); + String authToken = this.serviceAccountJwtClient.getIdToken(dataPartitionId); + attributes.put(DpsHeaders.AUTHORIZATION, authToken); + return DpsHeaders.createFromMap(attributes); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/SubscriptionServiceGc.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/SubscriptionServiceGc.java new file mode 100644 index 0000000000000000000000000000000000000000..ae1914887178455e7aa0bf7f078bba1b006fb8d9 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/service/SubscriptionServiceGc.java @@ -0,0 +1,56 @@ +package org.opengroup.osdu.notification.provider.gcp.service; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.opengroup.osdu.core.common.http.HttpRequest; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.http.IHttpClient; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.Subscription; + +import java.io.IOException; +import java.util.List; + +import org.opengroup.osdu.core.common.notification.SubscriptionAPIConfig; +import org.opengroup.osdu.core.common.notification.SubscriptionException; +import org.opengroup.osdu.core.common.util.UrlNormalizationUtil; +import org.springframework.stereotype.Component; + +/** + * GC extension of {@link org.opengroup.osdu.core.common.notification.SubscriptionService}. + * Remove the current class after moving code to the original service. + */ +@Component +@RequiredArgsConstructor +public class SubscriptionServiceGc { + + private final SubscriptionAPIConfig config; + private final IHttpClient httpClient; + private final DpsHeadersProvider dpsHeadersProvider; + + public List<Subscription> getAll(String dataPartitionId) throws SubscriptionException { + String url = UrlNormalizationUtil.normalizeStringUrl(config.getRootUrl(), "/subscriptions"); + DpsHeaders dpsHeaders = dpsHeadersProvider.getDpsHeaders(dataPartitionId); + dpsHeaders.put("AppKey", config.getApiKey()); + HttpRequest request = HttpRequest.get() + .url(url) + .headers(dpsHeaders.getHeaders()) + .build(); + HttpResponse response = httpClient.send(request); + return getListOfSubscriptions(response); + } + + private List<Subscription> getListOfSubscriptions(HttpResponse response) throws SubscriptionException { + if (response.isSuccessCode()) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readValue(response.getBody(), new TypeReference<List<Subscription>>(){}); + } catch (IOException ex) { + throw new SubscriptionException("Exception in deserializing response", response, ex); + } + } else { + throw new SubscriptionException("Error making request to Register service. Check the HttpResponse: " + response, response); + } + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScope.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScope.java new file mode 100644 index 0000000000000000000000000000000000000000..1cf383e679cdc2105b7be1b927bf0a34a41d88eb --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScope.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.thread; + +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectFactory; +import org.springframework.beans.factory.config.Scope; + +@Slf4j +public class ThreadScope implements Scope { + + public Object get(String name, ObjectFactory<?> factory) { + log.trace("Get bean:{} with factory: {} current Thread: {}", name, factory, + Thread.currentThread().getName()); + Object result = null; + Map<String, Object> hBeans = ThreadScopeContextHolder.currentThreadScopeAttributes() + .getBeanMap(); + if (!hBeans.containsKey(name)) { + result = factory.getObject(); + log.trace( + "No bean in context with name: {} factory provisioning result is: {} current Thread: {}", + name, result, Thread.currentThread().getName()); + hBeans.put(name, result); + } else { + result = hBeans.get(name); + } + + return result; + } + + public Object remove(String name) { + log.trace("Removing bean : {} current Thread: {}", name, Thread.currentThread().getName()); + Object result = null; + Map<String, Object> hBeans = ThreadScopeContextHolder.currentThreadScopeAttributes() + .getBeanMap(); + if (hBeans.containsKey(name)) { + result = hBeans.get(name); + hBeans.remove(name); + } + + return result; + } + + public void registerDestructionCallback(String name, Runnable callback) { + ThreadScopeContextHolder.currentThreadScopeAttributes().registerRequestDestructionCallback(name, callback); + } + + public Object resolveContextualObject(String key) { + return null; + } + + public String getConversationId() { + return Thread.currentThread().getName(); + } +} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScopeAttributes.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScopeAttributes.java new file mode 100644 index 0000000000000000000000000000000000000000..aa163236f5ad77004e0f0c7122179031ec9f84a9 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScopeAttributes.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.thread; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.lang.NonNull; + +@Slf4j +@RequiredArgsConstructor +public class ThreadScopeAttributes { + + protected final Map<String, Object> hBeans = new HashMap<>(); + protected final Map<String, Runnable> hRequestDestructionCallbacks = new LinkedHashMap<>(); + + protected final Map<String, Object> getBeanMap() { + return this.hBeans; + } + + protected final void registerRequestDestructionCallback(@NonNull String name, @NonNull Runnable callback) { + log.trace("Registering callback for: {} on runnable: {}", name, callback); + this.hRequestDestructionCallbacks.put(name, callback); + } + + public final void clear() { + this.processDestructionCallbacks(); + this.hBeans.clear(); + } + + private void processDestructionCallbacks() { + for (Map.Entry<String, Runnable> mapEntry : this.hRequestDestructionCallbacks.entrySet()) { + Runnable callback = mapEntry.getValue(); + log.trace( + "Performing destruction callback for: {} on thread: {}", + mapEntry.getKey(), + Thread.currentThread().getName()); + callback.run(); + } + this.hRequestDestructionCallbacks.clear(); + } +} diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScopeContextHolder.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScopeContextHolder.java new file mode 100644 index 0000000000000000000000000000000000000000..038a0b8e4423544ea637f0cecb1eac8073fbb895 --- /dev/null +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/thread/ThreadScopeContextHolder.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.thread; + +public class ThreadScopeContextHolder { + + private static final ThreadLocal<ThreadScopeAttributes> threadScopeAttributesHolder = + new InheritableThreadLocal<ThreadScopeAttributes>() { + @Override + protected ThreadScopeAttributes initialValue() { + return new ThreadScopeAttributes(); + } + }; + + private ThreadScopeContextHolder() { + } + + public static ThreadScopeAttributes getThreadScopeAttributes() { + return threadScopeAttributesHolder.get(); + } + + public static void setThreadScopeAttributes(ThreadScopeAttributes accessor) { + threadScopeAttributesHolder.set(accessor); + } + + public static ThreadScopeAttributes currentThreadScopeAttributes() throws IllegalStateException { + ThreadScopeAttributes accessor = threadScopeAttributesHolder.get(); + if (accessor == null) { + throw new IllegalStateException("No thread scoped attributes."); + } else { + return accessor; + } + } +} + diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java index ae30169a57df28d6b5c11b5ce1df668b237ee6f1..ae953d10bc5f93371916b5b269b32b01a4386b7a 100644 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java +++ b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java @@ -16,20 +16,24 @@ package org.opengroup.osdu.notification.provider.gcp.util; +import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.apache.http.impl.client.CloseableHttpClient; import org.opengroup.osdu.core.gcp.googleidtoken.IGoogleIdTokenFactory; import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +/** + * GSA token provider. Provides ID token for 'authorization' request header. + * Used in HTTP notification request for GSA secret type. + * Wraps core-lib-gcp GC ID token provider. + */ @Component +@RequiredArgsConstructor public class GoogleServiceAccountImpl implements IGoogleServiceAccount { - @Autowired - private IGoogleIdTokenFactory googleIdTokenFactory; - @Autowired - private CloseableHttpClient closeableHttpClient; + private final IGoogleIdTokenFactory googleIdTokenFactory; + private final CloseableHttpClient closeableHttpClient; @SneakyThrows @Override diff --git a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/JwtValidity.java b/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/JwtValidity.java deleted file mode 100644 index 3890e3ef735076b280c76f93547d1a920f663363..0000000000000000000000000000000000000000 --- a/provider/notification-gc/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/JwtValidity.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - Copyright 2020-2023 Google LLC - Copyright 2020-2023 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package org.opengroup.osdu.notification.provider.gcp.util; - -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -public class JwtValidity { - - String token; - long expiryTime; - - JwtValidity(String jwt, long expiryTime) { - this.token = jwt; - this.expiryTime = expiryTime; - } -} diff --git a/provider/notification-gc/src/main/resources/application-anthos.properties b/provider/notification-gc/src/main/resources/application-anthos.properties index 4f5988ee5a35a8064ad5aa59f505ac888980c607..b2d59b6e4e08eb53357a7ad4dcba859455274631 100644 --- a/provider/notification-gc/src/main/resources/application-anthos.properties +++ b/provider/notification-gc/src/main/resources/application-anthos.properties @@ -20,4 +20,8 @@ partition-auth-enabled=false oqmDriver=rabbitmq openid.provider-url=${OPENID_PROVIDER_URL} openid.provider-client-id=${OPENID_PROVIDER_CLIENT_ID} -openid.provider-client-secret=${OPENID_PROVIDER_CLIENT_SECRET} \ No newline at end of file +openid.provider-client-secret=${OPENID_PROVIDER_CLIENT_SECRET} + +# Rabbit retry policy config +rabbitmq-retry-limit=3 +rabbitmq-retry-delay=1 \ No newline at end of file diff --git a/provider/notification-gc/src/main/resources/application-local.properties b/provider/notification-gc/src/main/resources/application-local.properties index 0aba93ff487d48d0c057d464b39a3c4437accc60..b560463b0c38293c8b6803d3d50c46e18a70c3b2 100644 --- a/provider/notification-gc/src/main/resources/application-local.properties +++ b/provider/notification-gc/src/main/resources/application-local.properties @@ -1,26 +1,34 @@ # -# Copyright 2017-2020, Schlumberger +# Copyright 2020-2023 Google LLC +# Copyright 2020-2023 EPAM Systems, Inc # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # server.port=8080 logging.level.org.springframework.web=${LOG_LEVEL:DEBUG} -app.entitlements=https://community.gcp.gnrg-osdu.projects.epam.com/entitlements/v2 +app.entitlements=https://community.gcp.gnrg-osdu.projects.epam.com/api/entitlements/v2 app.register=https://community.gcp.gnrg-osdu.projects.epam.com/api/register/v1 +PARTITION_API=https://community.gcp.gnrg-osdu.projects.epam.com/api/partition/v1 app.project=nice-etching-277309 service.token.provider=GCP partition-auth-enabled=true oqmDriver=pubsub + +# Redis config +redis-host=${REDIS_USER_INFO_HOST:127.0.0.1} +redis-port=${REDIS_USER_INFO_PORT:6379} +redis-password=${REDIS_USER_INFO_PASSWORD:} +redis-with-ssl=${REDIS_USER_INFO_WITH_SSL:false} diff --git a/provider/notification-gc/src/main/resources/application.properties b/provider/notification-gc/src/main/resources/application.properties index 44979e699f41320200cf5db516d5fe5e38a5cf45..0b2d7993f8df49644a0d0d8641f837dea729c739 100644 --- a/provider/notification-gc/src/main/resources/application.properties +++ b/provider/notification-gc/src/main/resources/application.properties @@ -41,3 +41,10 @@ partition.api=${PARTITION_API} service.token.provider=GCP partition-auth-enabled=true oqmDriver=pubsub + +# Redis config +redis-host=${REDIS_USER_INFO_HOST:127.0.0.1} +redis-port=${REDIS_USER_INFO_PORT:6379} +redis-password=${REDIS_USER_INFO_PASSWORD:} +redis-with-ssl=${REDIS_USER_INFO_WITH_SSL:false} +cache.codec=jackson diff --git a/provider/notification-gc/src/main/resources/logback.xml b/provider/notification-gc/src/main/resources/logback.xml index 50c1b2b9477e7c44a1d2b4736e76e354651a315f..ea18f9dbe3c5a632869659c9571efba0a013fa5e 100644 --- a/provider/notification-gc/src/main/resources/logback.xml +++ b/provider/notification-gc/src/main/resources/logback.xml @@ -6,7 +6,7 @@ <springProfile name="local"> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern>%yellow([%thread]) %highlight(| %-5level |) %green(%d) %cyan(| %logger{15} |) %highlight(%msg) %n</pattern> + <pattern>%yellow([%thread]) %highlight(| %-5level |) %green(%date{HH:mm:ss.SSS}) %cyan(| %logger{0} |) %highlight(%msg) %n</pattern> <charset>utf8</charset> </encoder> </appender> diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6399f67175b8efeba721ee2fd2c74e21a9cc53fa --- /dev/null +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmControlTopicReceiverTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmControlTopicReceiver; + +import java.util.HashMap; +import java.util.List; +import java.util.stream.Stream; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class OqmControlTopicReceiverTest { + + @InjectMocks + private OqmControlTopicReceiver sut; + @Mock + private OqmAckReplier replier; + @Mock + private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + @Mock + private OqmSubscriptionHandler subscriptionHandler; + @Mock + private Subscription subscription; + @Mock + private ExternalSubscriptions externalSubscriptions; + @Mock + private List<Subscription> subscriptionInfos; + + @Test + public void testReceiveMessageSubscriptionCreated() { + when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription); + when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions); + when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build(); + sut.receiveMessage(createMessage, replier); + verify(subscriptionInfos, times(1)).add(subscription); + verify(subscriptionInfoCache, times(1)).put(any(), any()); + verify(replier, times(1)).ack(); + } + + @Test + public void testReceiveMessageSubscriptionUpdated() { + when(subscriptionHandler.getSubscription(any(), any(), any())).thenReturn(subscription); + when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions); + when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + OqmMessage updatedMessage = OqmMessage.builder().data("Subscription Updated").attributes(new HashMap<>()).build(); + sut.receiveMessage(updatedMessage, replier); + verify(subscriptionInfos, times(1)).add(subscription); + verify(subscriptionInfoCache, times(1)).put(any(), any()); + verify(replier, times(1)).ack(); + } + + @Test + public void testReceiveMessageSubscriptionDeleted() { + when(subscriptionInfoCache.get(null)).thenReturn(externalSubscriptions); + when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + when(subscriptionInfos.stream()).thenReturn(Stream.of(subscription)); + when(subscription.getNotificationId()).thenReturn("4"); + when(subscription.getTopic()).thenReturn("topic1"); + OqmMessage deletedMessage = OqmMessage.builder() + .data("Subscription Deleted") + .attributes(new HashMap<String, String>() {{ + put("subscription-id", "4"); + put("topic", "topic1"); + }}) + .build(); + sut.receiveMessage(deletedMessage, replier); + verify(subscriptionInfos, times(1)).remove(subscription); + verify(subscriptionInfoCache, times(1)).put(any(), any()); + verify(replier, times(1)).ack(); + } + + @Test + public void testReceiveMessageUnsupported() { + OqmMessage unsupportedMessage = OqmMessage.builder().data("Unsupported").attributes(new HashMap<>()).build(); + sut.receiveMessage(unsupportedMessage, replier); + verify(replier, times(1)).nack(); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmPublishTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmPublishTopicReceiverTest.java new file mode 100644 index 0000000000000000000000000000000000000000..dc1a08318c45c9ad366073fa9f1b9088dd6f0dcf --- /dev/null +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmPublishTopicReceiverTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmPublishTopicReceiver; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class OqmPublishTopicReceiverTest { + + @InjectMocks + private OqmPublishTopicReceiver sut; + @Mock + private OqmAckReplier replier; + @Mock + private OqmNotificationHandler notificationHandler; + @Mock + private HttpResponse httpResponse; + @Mock + private OqmSubscription subscription; + private final OqmTopic topic = OqmTopic.builder().name("topic1").build(); + private final List<OqmTopic> topics = Collections.singletonList(topic); + + + @Test + public void testReceiveMessageNotificationSuccess() throws Exception { + when(notificationHandler.notifySubscriber(any(), any(), anyMap())).thenReturn(httpResponse); + when(httpResponse.isSuccessCode()).thenReturn(true); + when(subscription.getTopics()).thenReturn(topics); + OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build(); + sut.receiveMessage(createMessage, replier); + verify(replier, times(1)).ack(); + } + + @Test + public void testReceiveMessageNotificationFailed() throws Exception { + when(notificationHandler.notifySubscriber(any(), any(), anyMap())).thenReturn(httpResponse); + when(httpResponse.isSuccessCode()).thenReturn(false); + when(subscription.getTopics()).thenReturn(topics); + OqmMessage createMessage = OqmMessage.builder().data("Subscription Created").attributes(new HashMap<>()).build(); + sut.receiveMessage(createMessage, replier); + verify(replier, times(1)).nack(); + } + + @Test + public void testReceiveMessageException() throws Exception { + when(notificationHandler.notifySubscriber(any(), any(), anyMap())).thenThrow(new Exception()); + when(subscription.getTopics()).thenReturn(topics); + OqmMessage unsupportedMessage = OqmMessage.builder().data("Unsupported").attributes(new HashMap<>()).build(); + sut.receiveMessage(unsupportedMessage, replier); + verify(replier, times(1)).nack(); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java new file mode 100644 index 0000000000000000000000000000000000000000..eb0d6868119e22f6a50440456997e37c1b8aaadf --- /dev/null +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmServiceTopicReceiverTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.*; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; +import org.opengroup.osdu.notification.provider.gcp.pubsub.receiver.OqmServiceTopicReceiver; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class OqmServiceTopicReceiverTest { + + @InjectMocks + private OqmServiceTopicReceiver sut; + @Mock + private OqmAckReplier replier; + @Mock + private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; + @Mock + private OqmDriver driver; + @Mock + private OqmSubscription oqmSubscription; + @Mock + private ExternalSubscriptions externalSubscriptions; + private List<Subscription> subscriptionInfos; + private OqmTopic topic; + + @Before + public void setUp() { + topic = OqmTopic.builder().name("topic1").build(); + Subscription subscription1 = new Subscription(); + subscription1.setTopic(topic.getName()); + Subscription subscription2 = new Subscription(); + subscription2.setTopic(topic.getName()); + Subscription subscription3 = new Subscription(); + subscription3.setTopic(topic.getName()); + subscriptionInfos = Arrays.asList(subscription1, subscription2, subscription3); + } + + @Test + public void testReceiveMessageSubscriptionCreated() { + when(oqmSubscription.getTopics()).thenReturn(Collections.singletonList(topic)); + when(subscriptionInfoCache.get("tenant1")).thenReturn(externalSubscriptions); + when(externalSubscriptions.getSubscriptions()).thenReturn(subscriptionInfos); + OqmMessage message = OqmMessage.builder().data("Message").attributes(new HashMap<String, String>() {{ + put("data-partition-id", "tenant1"); + }}).build(); + sut.receiveMessage(message, replier); + verify(driver, times(3)).publish(any(OqmMessage.class), any(), any()); + verify(replier, times(1)).ack(); + } +} \ No newline at end of file diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java index 220c7d40b16f700bf5b9744442b424cd76e9ea1a..d5b386fd6fb602fe82930f27028821c857e4b6fd 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManagerTest.java @@ -1,207 +1,121 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.opengroup.osdu.notification.provider.gcp.pubsub; -import org.jetbrains.annotations.NotNull; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; +import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.cache.IRedisCache; +import org.opengroup.osdu.core.common.model.notification.Subscription; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; -import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; -import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; import org.opengroup.osdu.notification.provider.gcp.config.OqmConfigurationProperties; -import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; +import org.opengroup.osdu.notification.provider.gcp.model.ExternalSubscriptions; -import java.util.*; +import java.util.Arrays; +import java.util.List; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.*; +import static org.mockito.Mockito.*; -@RunWith(PowerMockRunner.class) -@PrepareForTest(OqmSubscriberManager.class) +@RunWith(MockitoJUnitRunner.class) public class OqmSubscriberManagerTest { - public static final String TENANT1 = "tenant1"; - public static final String TENANT2 = "tenant2"; - public static final String TENANT3 = "tenant3"; - + @InjectMocks + private OqmSubscriberManager sut; + @Spy + private OqmConfigurationProperties properties; @Mock - ITenantFactory tenantInfoFactory; + private ITenantFactory tenantFactory; @Mock - OqmDriver driver; - + private OqmDriver driver; @Mock - OqmNotificationHandler notificationHandler; - - OqmConfigurationProperties properties = new OqmConfigurationProperties(); - - OqmSubscriberManager manager; + private OqmSubscriptionProvider subscriptionProvider; + @Mock + private OqmSubscriptionHandler subscriptionHandler; + @Mock + private IRedisCache<String, ExternalSubscriptions> subscriptionInfoCache; - private final Long constructDate = System.currentTimeMillis(); - private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; + private List<TenantInfo> tenantInfos; + private List<OqmTopic> topics; + private List<Subscription> subscriptions; @Before - public void before() throws Exception { - Collection<TenantInfo> tenants = getMockedTenantInfos(); - when(tenantInfoFactory, "listTenantInfo").thenReturn(tenants); - + public void setUp() { + tenantInfos = createTenants(); + topics = createTopics(); + subscriptions = createSubscriptions(); } - @Test - public void whenProvisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception { - manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); - - //Mock 4 subscriberable subscriptions somehow distributed between mock tenants - doAnswer(question -> { - TenantInfo tenant = question.getArgument(0); - String tenantId = tenant.getDataPartitionId(); - switch (tenantId) { - case TENANT1: - return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build()); - case TENANT2: - return Arrays.asList( - OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build(), - OqmSubscription.builder().name("de-" + tenantId + "-subscription-B").build() - ); - case TENANT3: - return Collections.singletonList(OqmSubscription.builder().name("de-" + tenantId + "-subscription-A").build()); - default: - return Collections.emptyList(); - } - }).when(manager).getSubscriberableSubscriptions(any(TenantInfo.class)); - - PowerMockito.doNothing().when(manager, "registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); - - //Run being tested method. - manager.provisionSubscribersOnAllInterestedTopicsSubscriptionsForAllTenantsBrokers(); - - //verify 3 times getSubscriberableSubscriptions method was called - verifyPrivate(manager, times(3)).invoke("getSubscriberableSubscriptions", any(TenantInfo.class)); + private List<TenantInfo> createTenants() { + TenantInfo tenant1 = new TenantInfo(); + tenant1.setId(1L); + tenant1.setDataPartitionId("tenant1"); + TenantInfo tenant2 = new TenantInfo(); + tenant2.setId(2L); + tenant2.setDataPartitionId("tenant2"); + TenantInfo tenant3 = new TenantInfo(); + tenant3.setId(3L); + tenant3.setDataPartitionId("tenant3"); + return Arrays.asList(tenant1, tenant2, tenant3); + } - //verify only 4 subscribers creation were requested and completed - verifyPrivate(manager, times(4)).invoke("registerSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + private List<OqmTopic> createTopics() { + OqmTopic topic1 = OqmTopic.builder().name("topic1").build(); + OqmTopic topic2 = OqmTopic.builder().name("topic2").build(); + OqmTopic topic3 = OqmTopic.builder().name("topic3").build(); + OqmTopic topic4 = OqmTopic.builder().name("topic4").build(); + return Arrays.asList(topic1, topic2,topic3, topic4); + } + private List<Subscription> createSubscriptions() { + Subscription subscription1 = new Subscription(); + Subscription subscription2 = new Subscription(); + return Arrays.asList(subscription1, subscription2); } @Test - public void whenProvisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers_thenProperOperationsInvoked() throws Exception { - - //Mock only tenant 2 broker has control topic created so far. Others are not yet. - when(driver.getTopic(ArgumentMatchers.eq(properties.getRegisterSubscriberControlTopicName()), any(OqmDestination.class))) - .thenAnswer((question) -> { - String topicName = question.getArgument(0); - OqmDestination destination = question.getArgument(1); - if (destination.getPartitionId().equals(TENANT2)) { - return Optional.of(OqmTopic.builder().name(topicName).build()); - } - - return Optional.empty(); - }); - - //Mock only tenant 2 broker has control topic subscription created so far. Others are not yet. - when(driver.getSubscription(any(String.class), any(OqmDestination.class))) - .thenAnswer((question) -> { - String subscriptionName = question.getArgument(0); - OqmDestination destination = question.getArgument(1); - if (destination.getPartitionId().equals(TENANT2)) { - return Optional.of(OqmSubscription.builder().name(subscriptionName).build()); - } - - return Optional.empty(); - }); - - //Mock driver smoothly creates a subscription - when(driver.createAndGetSubscription(any(OqmSubscription.class), any(OqmDestination.class))) - .thenAnswer(question -> question.<OqmSubscription>getArgument(0)); - - manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); - //Mock subscriberControlTopicSubscriptionName - Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName); - //Mock subscriber silent and successful creation - PowerMockito.doNothing().when(manager, "registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); - - //Run being tested method. - manager.provisionControlTopicsWithSubscriptionsAndSubscribersForAllTenantsBrokers(); - - //verify 3 times the driver was asked if the topic exists (for all tenants) - verify(driver, times(3)).getTopic(any(String.class), any(OqmDestination.class)); - - //verify 2 times the driver was asked to create and get topic (for tenants 1 and 3 - verify(driver, times(2)).createAndGetTopic(any(String.class), any(OqmDestination.class)); - - //verify only 1 time the driver was asked if the subscription exists (for TENANT2) - verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class)); - - //verify 2 times the driver was asked to create the subscription (for TENANT1, TENANT2) - verify(driver, times(1)).getSubscription(any(String.class), any(OqmDestination.class)); - - //verify only 2 subscribers creation were requested and completed - verifyPrivate(manager, times(2)).invoke("registerControlTopicSubscriber", any(TenantInfo.class), any(OqmSubscription.class)); + public void testProvisioningControlTopicSubscribers() { + when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); + sut.provisionControlTopicSubscribers(); + verify(driver, times(3)).subscribe(any(), any()); } @Test - public void whenUnprovisionControlTopicsSubscriptionsFromAllTenantsBrokers_thenProperOperationsInvoked() { - //Mock all 3 tenants' brokers have control topic subscriptions created so far. - when(driver.getSubscription(any(String.class), any(OqmDestination.class))) - .thenAnswer((question) -> { - String subscriptionName = question.getArgument(0); - OqmDestination destination = question.getArgument(1); - return Optional.of(OqmSubscription.builder().name(subscriptionName).build()); - }); - - //Mock driver smoothly deletes a subscription - PowerMockito.doNothing().when(driver).deleteSubscription(any(String.class), any(OqmDestination.class)); - - manager = spy(new OqmSubscriberManager(tenantInfoFactory, driver, notificationHandler, properties)); - //Mock subscriberControlTopicSubscriptionName - Whitebox.setInternalState(manager, "subscriberControlTopicSubscriptionName", subscriberControlTopicSubscriptionName); - - //Run being tested method. - manager.unprovisionControlTopicsSubscriptionsFromAllTenantsBrokers(); - - //verify 3 times the driver was asked about a subscription existence (for all tenants) - verify(driver, times(3)) - .getSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class)); - - - //verify 3 times the driver was asked to delete the subscription (for all tenants) - verify(driver, times(3)) - .deleteSubscription(ArgumentMatchers.eq(subscriberControlTopicSubscriptionName), any(OqmDestination.class)); - + public void testProvisioningServiceTopicSubscribers() { + when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); + when(subscriptionHandler.getTopics(any())).thenReturn(topics); + sut.provisionServiceSubscribers(); + verify(driver, times(24)).subscribe(any(), any()); } - @NotNull - private Collection<TenantInfo> getMockedTenantInfos() { - Collection<TenantInfo> tenants = new ArrayList<>(); - - //Mock we have 3 tenants - tenants.add(new TenantInfo() { - { - setId(1L); - setDataPartitionId(TENANT1); - } - }); - tenants.add(new TenantInfo() { - { - setId(2L); - setDataPartitionId(TENANT2); - } - }); - tenants.add(new TenantInfo() { - { - setId(3L); - setDataPartitionId(TENANT3); - } - }); - return tenants; + @Test + public void testProvisioningSubscriptionInfoCache() { + when(tenantFactory.listTenantInfo()).thenReturn(tenantInfos); + when(subscriptionHandler.getAllSubscriptionInfos(anyString())).thenReturn(subscriptions); + sut.provisionSubscriptionInfoCache(); + verify(subscriptionHandler, times(6)).getSubscription(anyString(), anyString(), anyString()); + verify(subscriptionInfoCache, times(6)).put(any(), any()); } } \ No newline at end of file diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/util/GoogleServiceAccountValidatorGeneratorTest.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/secutity/GoogleServiceAccountValidatorGeneratorTest.java similarity index 93% rename from provider/notification-gc/src/test/java/org/opengroup/osdu/notification/util/GoogleServiceAccountValidatorGeneratorTest.java rename to provider/notification-gc/src/test/java/org/opengroup/osdu/notification/secutity/GoogleServiceAccountValidatorGeneratorTest.java index 109ce00b73685a8ead25dd4c9108f8f6f1755846..de09494a9e6e7a7c0af2f45cf34fc8d8ae15bc80 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/util/GoogleServiceAccountValidatorGeneratorTest.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/secutity/GoogleServiceAccountValidatorGeneratorTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.util; +package org.opengroup.osdu.notification.secutity; import com.google.api.client.googleapis.auth.oauth2.GoogleIdTokenVerifier; import com.google.api.client.http.javanet.NetHttpTransport; @@ -23,7 +23,7 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; -import org.opengroup.osdu.notification.provider.gcp.util.GoogleServiceAccountValidatorGenerator; +import org.opengroup.osdu.notification.provider.gcp.security.GoogleServiceAccountValidatorGenerator; import org.powermock.modules.junit4.PowerMockRunner; import java.util.Collection; diff --git a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/util/GoogleServiceAccountValidatorImplTests.java b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/secutity/GoogleServiceAccountValidatorImplTests.java similarity index 94% rename from provider/notification-gc/src/test/java/org/opengroup/osdu/notification/util/GoogleServiceAccountValidatorImplTests.java rename to provider/notification-gc/src/test/java/org/opengroup/osdu/notification/secutity/GoogleServiceAccountValidatorImplTests.java index 8b09d49af5500fcce1415a7d7a0cbf3e99f4fab1..5ca164bab5b967fe68ebdfde07476a795dfda4dd 100644 --- a/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/util/GoogleServiceAccountValidatorImplTests.java +++ b/provider/notification-gc/src/test/java/org/opengroup/osdu/notification/secutity/GoogleServiceAccountValidatorImplTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.opengroup.osdu.notification.util; +package org.opengroup.osdu.notification.secutity; import com.google.api.client.googleapis.auth.oauth2.GoogleIdToken; import com.google.api.client.googleapis.auth.oauth2.GoogleIdTokenVerifier; @@ -24,8 +24,8 @@ import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; -import org.opengroup.osdu.notification.provider.gcp.util.GoogleServiceAccountValidatorGenerator; -import org.opengroup.osdu.notification.provider.gcp.util.GoogleServiceAccountValidatorImpl; +import org.opengroup.osdu.notification.provider.gcp.security.GoogleServiceAccountValidatorGenerator; +import org.opengroup.osdu.notification.provider.gcp.security.GoogleServiceAccountValidatorImpl; import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; diff --git a/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointGSA.java b/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointGSA.java index da4c56e88bdc7fd1e1c68f38fbe0a21cc3f48202..a8b9c556f302ca1876ea40e33b183b02eead5404 100644 --- a/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointGSA.java +++ b/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointGSA.java @@ -1,89 +1,79 @@ /* - Copyright 2002-2022 Google LLC - Copyright 2002-2022 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.opengroup.osdu.notification.api; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +/** + * PubsubEndpoint tests are not used due to GC switches to OQM approach. + */ +public class TestPubSubEndpointGSA extends PubsubEndpointGSATests { -import com.sun.jersey.api.client.ClientResponse; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.opengroup.osdu.notification.util.AnthosTestUtils; + @Override + public void setup() { + // not used in GC + } -public class TestPubSubEndpointGSA extends PubsubEndpointGSATests { + @Override + public void tearDown() { + // not used in GC + } - private static final AnthosTestUtils gcpTestUtils = new AnthosTestUtils(); + @Override + public void should_return401_when_noAccessOnCustomerTenant() { + // not used in GC + } + @Override + public void should_return401_when_accessingWithAdminCredentials() { + // not used in GC + } - @BeforeClass - public static void classSetup() throws Exception { - PubsubEndpointGSATests.classSetup(gcpTestUtils.getOpsToken()); + @Override + public void should_return401_when_accessingWithEditorCredentials() { + // not used in GC } - @AfterClass - public static void classTearDown() throws Exception { + @Override + public void should_return401_when_accessingWithNoAccessCredentials() { + // not used in GC } - @Before @Override - public void setup() throws Exception { - this.testUtils = new AnthosTestUtils(); + public void should_returnOk_when_makingHttpOptionsRequest() { + // not used in GC } - @After @Override - public void tearDown() throws Exception { - this.testUtils = null; + public void should_return307_when_makingHttpRequest() { + // not used in GC } @Override - @Test - public void should_return20X_when_usingCredentialsWithOpsPermission() throws Exception { - createResource(); + public void should_return20XResponseCode_when_makingValidHttpsRequest() { + // not used in GC + } - try { - ClientResponse response = descriptor.run(getArg(), testUtils.getOpsToken()); + @Override + public void should_return400_when_makingHttpRequestWithoutToken() { + // not used in GC + } - assertEquals(error(response.getStatus() == 204 ? "" : response.getEntity(String.class)), - expectedOkResponseCode(), response.getStatus()); - assertEquals("GET, POST, PUT, DELETE, OPTIONS, HEAD, PATCH", - response.getHeaders().getFirst("Access-Control-Allow-Methods")); - assertEquals( - "access-control-allow-origin, origin, content-type, accept, authorization, data-partition-id, correlation-id, appkey", - response.getHeaders().getFirst("Access-Control-Allow-Headers")); - assertEquals("*", response.getHeaders().getFirst("Access-Control-Allow-Origin")); - assertEquals("true", response.getHeaders().getFirst("Access-Control-Allow-Credentials")); - assertEquals("DENY", response.getHeaders().getFirst("X-Frame-Options")); - assertEquals("1; mode=block", response.getHeaders().getFirst("X-XSS-Protection")); - assertEquals("nosniff", response.getHeaders().getFirst("X-Content-Type-Options")); - assertEquals("no-cache, no-store, must-revalidate", - response.getHeaders().getFirst("Cache-Control")); - assertEquals("default-src 'self'", response.getHeaders().getFirst("Content-Security-Policy")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("max-age=31536000")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("includeSubDomains")); - assertEquals("0", response.getHeaders().getFirst("Expires")); - } finally { - deleteResource(); - } + @Override + public void should_return20X_when_usingCredentialsWithOpsPermission() { + // not used in GC } } \ No newline at end of file diff --git a/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointHMAC.java b/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointHMAC.java index 1f5e091d9acd36c89369b9f8fc9b69d7d1031857..ba6278efb0e882dfaf2882c961b03366f75aa9ca 100644 --- a/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointHMAC.java +++ b/testing/notification-test-anthos/src/test/java/org/opengroup/osdu/notification/api/TestPubSubEndpointHMAC.java @@ -1,86 +1,79 @@ /* - Copyright 2002-2022 Google LLC - Copyright 2002-2022 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ - http://www.apache.org/licenses/LICENSE-2.0 +package org.opengroup.osdu.notification.api; - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +/** + * PubsubEndpoint tests are not used due to GC switches to OQM approach. */ +public class TestPubSubEndpointHMAC extends PubsubEndpointHMACTests { -package org.opengroup.osdu.notification.api; + @Override + public void setup() { + // not used in GC + } -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; + @Override + public void tearDown() { + // not used in GC + } -import com.sun.jersey.api.client.ClientResponse; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.opengroup.osdu.notification.util.AnthosTestUtils; + @Override + public void should_return401_when_noAccessOnCustomerTenant() { + // not used in GC + } -public class TestPubSubEndpointHMAC extends PubsubEndpointHMACTests { + @Override + public void should_return401_when_accessingWithAdminCredentials() { + // not used in GC + } - @BeforeClass - public static void classSetup() throws Exception { - PubsubEndpointHMACTests.classSetup(); + @Override + public void should_return401_when_accessingWithEditorCredentials() { + // not used in GC } - @AfterClass - public static void classTearDown() throws Exception { + @Override + public void should_return401_when_accessingWithNoAccessCredentials() { + // not used in GC } - @Before @Override - public void setup() throws Exception { - this.testUtils = new AnthosTestUtils(); + public void should_returnOk_when_makingHttpOptionsRequest() { + // not used in GC } - @After @Override - public void tearDown() throws Exception { - this.testUtils = null; + public void should_return307_when_makingHttpRequest() { + // not used in GC } @Override - @Test - public void should_return20X_when_usingCredentialsWithOpsPermission() throws Exception { - createResource(); + public void should_return20XResponseCode_when_makingValidHttpsRequest() { + // not used in GC + } - try { - ClientResponse response = descriptor.run(getArg(), testUtils.getOpsToken()); + @Override + public void should_return400_when_makingHttpRequestWithoutToken() { + // not used in GC + } - assertEquals(error(response.getStatus() == 204 ? "" : response.getEntity(String.class)), - expectedOkResponseCode(), response.getStatus()); - assertEquals("GET, POST, PUT, DELETE, OPTIONS, HEAD, PATCH", - response.getHeaders().getFirst("Access-Control-Allow-Methods")); - assertEquals( - "access-control-allow-origin, origin, content-type, accept, authorization, data-partition-id, correlation-id, appkey", - response.getHeaders().getFirst("Access-Control-Allow-Headers")); - assertEquals("*", response.getHeaders().getFirst("Access-Control-Allow-Origin")); - assertEquals("true", response.getHeaders().getFirst("Access-Control-Allow-Credentials")); - assertEquals("DENY", response.getHeaders().getFirst("X-Frame-Options")); - assertEquals("1; mode=block", response.getHeaders().getFirst("X-XSS-Protection")); - assertEquals("nosniff", response.getHeaders().getFirst("X-Content-Type-Options")); - assertEquals("no-cache, no-store, must-revalidate", - response.getHeaders().getFirst("Cache-Control")); - assertEquals("default-src 'self'", response.getHeaders().getFirst("Content-Security-Policy")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("max-age=31536000")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("includeSubDomains")); - assertEquals("0", response.getHeaders().getFirst("Expires")); - } finally { - deleteResource(); - } + @Override + public void should_return20X_when_usingCredentialsWithOpsPermission() { + // not used in GC } } \ No newline at end of file diff --git a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointGSA.java b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointGSA.java index a743915e56c34ee0919d4213dbf74727a9b5b0b1..1da673ed83d47c5cc164728264641d29b63baf49 100644 --- a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointGSA.java +++ b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointGSA.java @@ -1,72 +1,79 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.opengroup.osdu.notification.api; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +/** + * PubsubEndpoint tests are not used due to GC switches to OQM approach. + */ +public class TestPubsubEndpointGSA extends PubsubEndpointGSATests { -import com.sun.jersey.api.client.ClientResponse; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.opengroup.osdu.notification.util.GCPTestUtils; + @Override + public void setup() { + // not used in GC + } -public class TestPubsubEndpointGSA extends PubsubEndpointGSATests { + @Override + public void tearDown() { + // not used in GC + } - private static final GCPTestUtils gcpTestUtils = new GCPTestUtils(); + @Override + public void should_return401_when_noAccessOnCustomerTenant() { + // not used in GC + } + @Override + public void should_return401_when_accessingWithAdminCredentials() { + // not used in GC + } - @BeforeClass - public static void classSetup() throws Exception { - PubsubEndpointGSATests.classSetup(gcpTestUtils.getOpsToken()); - } + @Override + public void should_return401_when_accessingWithEditorCredentials() { + // not used in GC + } - @AfterClass - public static void classTearDown() throws Exception { - } + @Override + public void should_return401_when_accessingWithNoAccessCredentials() { + // not used in GC + } - @Before - @Override - public void setup() throws Exception { - this.testUtils = new GCPTestUtils(); - } + @Override + public void should_returnOk_when_makingHttpOptionsRequest() { + // not used in GC + } - @After - @Override - public void tearDown() throws Exception { - this.testUtils = null; - } + @Override + public void should_return307_when_makingHttpRequest() { + // not used in GC + } - @Override - @Test - public void should_return20X_when_usingCredentialsWithOpsPermission() throws Exception { - createResource(); + @Override + public void should_return20XResponseCode_when_makingValidHttpsRequest() { + // not used in GC + } - try { - ClientResponse response = descriptor.run(getArg(), testUtils.getOpsToken()); + @Override + public void should_return400_when_makingHttpRequestWithoutToken() { + // not used in GC + } - assertEquals(error(response.getStatus() == 204 ? "" : response.getEntity(String.class)), - expectedOkResponseCode(), response.getStatus()); - assertEquals("GET, POST, PUT, DELETE, OPTIONS, HEAD, PATCH", - response.getHeaders().getFirst("Access-Control-Allow-Methods")); - assertEquals( - "access-control-allow-origin, origin, content-type, accept, authorization, data-partition-id, correlation-id, appkey", - response.getHeaders().getFirst("Access-Control-Allow-Headers")); - assertEquals("*", response.getHeaders().getFirst("Access-Control-Allow-Origin")); - assertEquals("true", response.getHeaders().getFirst("Access-Control-Allow-Credentials")); - assertEquals("DENY", response.getHeaders().getFirst("X-Frame-Options")); - assertEquals("1; mode=block", response.getHeaders().getFirst("X-XSS-Protection")); - assertEquals("nosniff", response.getHeaders().getFirst("X-Content-Type-Options")); - assertEquals("no-cache, no-store, must-revalidate", - response.getHeaders().getFirst("Cache-Control")); - assertEquals("default-src 'self'", response.getHeaders().getFirst("Content-Security-Policy")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("max-age=31536000")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("includeSubDomains")); - assertEquals("0", response.getHeaders().getFirst("Expires")); - } finally { - deleteResource(); + @Override + public void should_return20X_when_usingCredentialsWithOpsPermission() { + // not used in GC } - } } \ No newline at end of file diff --git a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointHMAC.java b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointHMAC.java index 5ba744d1eb3c15eebab7d0c71bdcb0a1a02d0b00..71e3d7845ae974b74ba73fa6807aa55a600b02c3 100644 --- a/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointHMAC.java +++ b/testing/notification-test-gc/src/test/java/org/opengroup/osdu/notification/api/TestPubsubEndpointHMAC.java @@ -1,69 +1,79 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.opengroup.osdu.notification.api; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +/** + * PubsubEndpoint tests are not used due to GC switches to OQM approach. + */ +public class TestPubsubEndpointHMAC extends PubsubEndpointHMACTests { + + @Override + public void setup() { + // not used in GC + } -import com.sun.jersey.api.client.ClientResponse; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.opengroup.osdu.notification.util.GCPTestUtils; + @Override + public void tearDown() { + // not used in GC + } -public class TestPubsubEndpointHMAC extends PubsubEndpointHMACTests { + @Override + public void should_return401_when_noAccessOnCustomerTenant() { + // not used in GC + } - @BeforeClass - public static void classSetup() throws Exception { - PubsubEndpointHMACTests.classSetup(); + @Override + public void should_return401_when_accessingWithAdminCredentials() { + // not used in GC } - @AfterClass - public static void classTearDown() throws Exception { + @Override + public void should_return401_when_accessingWithEditorCredentials() { + // not used in GC } - @Before @Override - public void setup() throws Exception { - this.testUtils = new GCPTestUtils(); + public void should_return401_when_accessingWithNoAccessCredentials() { + // not used in GC } - @After @Override - public void tearDown() throws Exception { - this.testUtils = null; + public void should_return20X_when_usingCredentialsWithOpsPermission() { + // not used in GC } @Override - @Test - public void should_return20X_when_usingCredentialsWithOpsPermission() throws Exception { - createResource(); + public void should_returnOk_when_makingHttpOptionsRequest() { + // not used in GC + } - try { - ClientResponse response = descriptor.run(getArg(), testUtils.getOpsToken()); + @Override + public void should_return307_when_makingHttpRequest() { + // not used in GC + } - assertEquals(error(response.getStatus() == 204 ? "" : response.getEntity(String.class)), - expectedOkResponseCode(), response.getStatus()); - assertEquals("GET, POST, PUT, DELETE, OPTIONS, HEAD, PATCH", - response.getHeaders().getFirst("Access-Control-Allow-Methods")); - assertEquals( - "access-control-allow-origin, origin, content-type, accept, authorization, data-partition-id, correlation-id, appkey", - response.getHeaders().getFirst("Access-Control-Allow-Headers")); - assertEquals("*", response.getHeaders().getFirst("Access-Control-Allow-Origin")); - assertEquals("true", response.getHeaders().getFirst("Access-Control-Allow-Credentials")); - assertEquals("DENY", response.getHeaders().getFirst("X-Frame-Options")); - assertEquals("1; mode=block", response.getHeaders().getFirst("X-XSS-Protection")); - assertEquals("nosniff", response.getHeaders().getFirst("X-Content-Type-Options")); - assertEquals("no-cache, no-store, must-revalidate", - response.getHeaders().getFirst("Cache-Control")); - assertEquals("default-src 'self'", response.getHeaders().getFirst("Content-Security-Policy")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("max-age=31536000")); - assertTrue(response.getHeaders().get("Strict-Transport-Security").get(0) - .contains("includeSubDomains")); - assertEquals("0", response.getHeaders().getFirst("Expires")); - } finally { - deleteResource(); - } + @Override + public void should_return20XResponseCode_when_makingValidHttpsRequest() { + // not used in GC + } + + @Override + public void should_return400_when_makingHttpRequestWithoutToken() { + // not used in GC } } \ No newline at end of file