diff --git a/pom.xml b/pom.xml index b8ad8e89bc77cd008c0d3f41d25b0c58507036d5..4daf714293877b87b922429bd969b101695b4153 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ <java.version>8</java.version> <maven.compiler.target>${java.version}</maven.compiler.target> <maven.compiler.source>${java.version}</maven.compiler.source> - <os-core-common.version>0.11.0</os-core-common.version> + <os-core-common.version>0.12.0</os-core-common.version> </properties> <licenses> diff --git a/provider/notification-gcp/README.md b/provider/notification-gcp/README.md index 80a59b3287f04ee392f40d1e1d83fd8b0dc23384..3f73c6807b45196421132241c087fd9690ae5d82 100644 --- a/provider/notification-gcp/README.md +++ b/provider/notification-gcp/README.md @@ -4,13 +4,39 @@ notification-gcp is a [Spring Boot](https://spring.io/projects/spring-boot) serv ## Getting Started These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a live system. -### Requirements +# Features of implementation +This is a universal solution created using EPAM OQM mapper technology. +It allows you to work with various implementations of message brokers. + +## Limitations of the current version + +In the current version, the OQM mapper is equipped with 2 drivers to the message brokers: + +- Google PubSub; +- RabbitMQ + +## Extensibility + +To use any other message broker, implement a driver for it. With an extensible set of drivers, the solution is unrestrictedly universal and portable without modification to the main code. + +Mapper support "multitenancy" with flexibility in how it is implemented. +It switches between datasources of different tenants due to the work of a bunch of classes that implement the following interfaces: + +- Destination - takes a description of the current context, e.g., "data-partition-id = opendes" +- DestinationResolver – accepts Destination, finds the resource, connects, and returns Resolution +- DestinationResolution – contains a ready-made connection, the mapper uses it to get to data + +# Settings and Configuration +## Requirements +### Mandatory * Java 8 * [Maven 3.6.0+](https://maven.apache.org/download.cgi) + +### for Google Cloud only * GCloud command line tool * GCloud access to opendes project -### General Tips +## General Tips **Environment Variable Management** The following tools make environment variable configuration simpler @@ -22,16 +48,62 @@ This project uses [Lombok](https://projectlombok.org/) for code generation. You - [Intellij configuration](https://projectlombok.org/setup/intellij) - [VSCode configuration](https://projectlombok.org/setup/vscode) -### Installation -In order to run the service locally or remotely, you will need to have the following environment variables defined. +## Mapper tuning mechanisms + +This service uses specific implementation of DestinationResolver based on the tenant information provided by the OSDU Partition service. +A total of 2 resolvers are implemented, which are divided into two groups: +### for universal technologies: +- for RabbitMQ: mappers/oqm/MqTenantOqmDestinationResolver.java + +#### Their algorithms are as follows: +- incoming Destination carries data-partition-id +- resolver accesses the Partition service and gets PartitionInfo +- from PartitionInfo resolver retrieves properties for the connection: URL, username, password etc. +- resolver creates a data source, connects to the resource, remembers the datasource +- resolver gives the datasource to the mapper in the Resolution object + +### for native Google Cloud technologies: +- for PubSub: mappers/oqm/PsTenantOqmDestinationResolver.java + +#### Their algorithms are similar, +Except that they do not receive special properties from the Partition service for connection, because the location of the resources is unambiguously known - they are in the GCP project. And credentials are also not needed - access to data is made on behalf of the Google Identity SA under which the service itself is launched. Therefore, resolver takes only the value of the **projectId** property from PartitionInfo and uses it to connect to a resource in the corresponding GCP project. + +# Configuration + +## Service Configuration +In order to run the service locally or remotely, define the following environment variables. +Most of them are common to all hosting environments, but there are properties that are only necessary when running in Google Cloud. + +#### Common properties for all environments | name | value | description | sensitive? | source | | --- | --- | --- | --- | --- | | `APP_ENTITLEMENTS` | ex `https://entitlements.com/entitlements/v1` | Entitlements API endpoint | no | output of infrastructure deployment | | `APP_REGISTER` | ex `https://register.com/api/register/v1` | Storage API endpoint | no | output of infrastructure deployment | +| `PARTITION_API` | ex `http://localhost:8081/api/partition/v1` | Partition service endpoint | no | - | + +#### For Mappers, to activate drivers + +| name | value | description | +|-----------|-----------|-----------------------------------------------------| +| OQMDRIVER | pubsub | to activate **OQM** driver for **Google PubSub** | +| OQMDRIVER | rabbitmq | to activate **OQM** driver for **Rabbit MQ** | + +#### For Google Cloud only +| name | value | description | sensitive? | source | +|------------------------------|---------------------------------------|--------------------------------------------------------------------|------------|---------------------------------------------------| | `APP_PROJECT` | ex `opendes` | Google Cloud Project Id | no | output of infrastructure deployment | | `APP_AUDIENCES` | ex `*****.apps.googleusercontent.com` | Client ID for getting access to cloud resources | yes | https://console.cloud.google.com/apis/credentials | -| `PARTITION_API` | ex `http://localhost:8081/api/partition/v1` | Partition service endpoint | no | - | + +##### service account IAM roles +Also, the following IAM roles should be assigned to the service's Google service account (SA) + +| IAM role | The purpose | +|----------|-------------------------------------------------------------------------------| +| Service Account Token Creator | To write yourself JWT for requesting neighbor microservices | +| Pub/Sub Editor | To fetch available PubSub topics and subscriptions and be able to create them | + + **System Environment required to run service** @@ -39,7 +111,115 @@ In order to run the service locally or remotely, you will need to have the follo | --- | --- | --- | --- | --- | | `SPRING_PROFILES_ACTIVE` | `local` | spring active profile | no | -### Run Locally +## Configuring mappers Datasources +When using non-Google-Cloud-native technologies, property sets must be defined on the Partition service as part of PartitionInfo for each Tenant. + +#### for OQM - RabbitMQ: +**prefix:** `oqm.rabbitmq` +It can be overridden by: + +- through the Spring Boot property `oqm.rabbitmq.partitionPropertiesPrefix` +- environment variable `OQM_RABBITMQ_PARTITIONPROPERTIESPREFIX` + +**Propertyset** (for two types of connection: messaging and admin operations): + +| Property | Description | +| --- | --- | +| oqm.rabbitmq.amqp.host | messaging hostnameorIP | +| oqm.rabbitmq.amqp.port | - port | +| oqm.rabbitmq.amqp.path | - path | +| oqm.rabbitmq.amqp.username | - username | +| oqm.rabbitmq.amqp.password | - password | +| oqm.rabbitmq.admin.schema | admin host schema | +| oqm.rabbitmq.admin.host | - host name | +| oqm.rabbitmq.admin.port | - port | +| oqm.rabbitmq.admin.path | - path | +| oqm.rabbitmq.admin.username | - username | +| oqm.rabbitmq.admin.password | - password | + +<details><summary>Example of a single tenant definition</summary> + +``` + +curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H 'data-partition-id: opendes' -H 'Authorization: Bearer ...' -H 'Content-Type: application/json' --data-raw '{ + "properties": { + "oqm.rabbitmq.amqp.host": { + "sensitive": false, + "value": "localhost" + }, + "oqm.rabbitmq.amqp.port": { + "sensitive": false, + "value": "5672" + }, + "oqm.rabbitmq.amqp.path": { + "sensitive": false, + "value": "" + }, + "oqm.rabbitmq.amqp.username": { + "sensitive": false, + "value": "guest" + }, + "oqm.rabbitmq.amqp.password": { + "sensitive": true, + "value": "guest" + }, + + "oqm.rabbitmq.admin.schema": { + "sensitive": false, + "value": "http" + }, + "oqm.rabbitmq.admin.host": { + "sensitive": false, + "value": "localhost" + }, + "oqm.rabbitmq.admin.port": { + "sensitive": false, + "value": "9002" + }, + "oqm.rabbitmq.admin.path": { + "sensitive": false, + "value": "/api" + }, + "oqm.rabbitmq.admin.username": { + "sensitive": false, + "value": "guest" + }, + "oqm.rabbitmq.admin.password": { + "sensitive": true, + "value": "guest" + } + } +}' + +``` + +</details> + +## Interaction with message brokers + +### Specifics of work through PULL subscription + +To receive messages from brokers, this solution uses the PULL-subscriber mechanism in the Notification service. +This is its cardinal difference from other implementations that use PUSH-subscribers (webhooks). +This opens a wide choice when choosing brokers. + +When using PULL-subscribers, there is a need to restore Notification service subscribers for each Subscription +at the start of Notification service, as well as in the runtime, +upon registration of a new Subscription by the Register service. + +To do this, a special "command" topic is involved: + +- the default topic name is `register-subscriber-control`. + +If necessary, the name of the topic can be overridden through: + +- Spring Boot property `oqm.registerSubscriberControlTopicName` +- environment variable `OQM_REGISTERSUBSCRIBERCONTROLTOPICNAME` + +A topic is created, in its absence, when any of Register or Notification services starts. + +# Run and test the service +## Running Locally Check that maven is installed: ```bash $ mvn --version @@ -145,9 +325,9 @@ After the service has started it should be accessible via a web browser by visit **Entitlements configuration for integration accounts** - | DE_OPS_TESTER | DE_ADMIN_TESTER | DE_EDITOR_TESTER | DE_NO_ACCESS_TESTER | - | --- | --- | --- | --- | - |notification.pubsub<br/>service.entitlements.user<br/>users<br/>users.datalake.ops</br>| service.entitlements.user<br/>users<br/>users.datalake.admins</br> | service.entitlements.user<br/>users<br/>users.datalake.editors</br> | service.entitlements.user<br/>users<br/>| +| DE_OPS_TESTER | DE_ADMIN_TESTER | DE_EDITOR_TESTER | DE_NO_ACCESS_TESTER | +| --- | --- | --- | --- | +|notification.pubsub<br/>service.entitlements.user<br/>users<br/>users.datalake.ops</br>| service.entitlements.user<br/>users<br/>users.datalake.admins</br> | service.entitlements.user<br/>users<br/>users.datalake.editors</br> | service.entitlements.user<br/>users<br/>| Above variables should be configured in the release pipeline to run integration tests. You should also replace them with proper values if you wish to run tests locally. diff --git a/provider/notification-gcp/pom.xml b/provider/notification-gcp/pom.xml index 19bcd78c204fc790591fd9ecc6ba96fca575724c..9079b7f37000e1afa7d0a711de39c0c3ca01d00e 100644 --- a/provider/notification-gcp/pom.xml +++ b/provider/notification-gcp/pom.xml @@ -37,6 +37,12 @@ </properties> <dependencies> + <dependency> + <groupId>org.opengroup.osdu</groupId> + <artifactId>oqm</artifactId> + <version>0.13.0-SNAPSHOT</version> + </dependency> + <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>os-core-common</artifactId> @@ -44,7 +50,7 @@ <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>core-lib-gcp</artifactId> - <version>0.11.0</version> + <version>0.12.0</version> </dependency> <dependency> diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/di/PartitionProviderConfig.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/di/PartitionProviderConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..d051f6672b352a47e67f92bcad757fd7076799e1 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/di/PartitionProviderConfig.java @@ -0,0 +1,33 @@ +package org.opengroup.osdu.notification.provider.gcp.di; + +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.partition.IPartitionFactory; +import org.opengroup.osdu.core.common.partition.IPartitionProvider; +import org.opengroup.osdu.core.gcp.googleidtoken.GcpServiceAccountJwtClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Scope; + +import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE; +import static org.springframework.context.annotation.ScopedProxyMode.TARGET_CLASS; + +/** + * Enables partition info resolution outside of request scope + */ +@Configuration +public class PartitionProviderConfig { + + @Bean + @Primary + @Scope(value = SCOPE_PROTOTYPE, proxyMode = TARGET_CLASS) + public IPartitionProvider partitionProvider( + IPartitionFactory partitionFactory, + GcpServiceAccountJwtClient jwtClient + ) { + DpsHeaders partitionHeaders = new DpsHeaders(); + String idToken = jwtClient.getDefaultOrInjectedServiceAccountIdToken(); + partitionHeaders.put("authorization", idToken); + return partitionFactory.create(partitionHeaders); + } +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqOqmConfigurationProperties.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqOqmConfigurationProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..f87188a274c46790734eb1da4375d1c2d6c5d2a5 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqOqmConfigurationProperties.java @@ -0,0 +1,34 @@ +/* + Copyright 2020 Google LLC + Copyright 2020 EPAM Systems, Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.opengroup.osdu.notification.provider.gcp.mappers.oqm; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnProperty(name = "oqmDriver", havingValue = "rabbitmq") +@ConfigurationProperties(prefix = "oqm.rabbitmq") +@Getter +@Setter +public class MqOqmConfigurationProperties { + + private String partitionPropertiesPrefix = "oqm.rabbitmq"; + +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqTenantOqmDestinationResolver.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqTenantOqmDestinationResolver.java new file mode 100644 index 0000000000000000000000000000000000000000..c8da3f492875c162f68c1884e01724dcd0ba6c57 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/MqTenantOqmDestinationResolver.java @@ -0,0 +1,165 @@ +/* + Copyright 2020 Google LLC + Copyright 2020 EPAM Systems, Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */package org.opengroup.osdu.notification.provider.gcp.mappers.oqm; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.http.client.Client; +import com.rabbitmq.http.client.ClientParameters; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.partition.IPartitionProvider; +import org.opengroup.osdu.core.common.partition.PartitionException; +import org.opengroup.osdu.core.common.partition.PartitionInfo; +import org.opengroup.osdu.core.common.partition.Property; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException; +import org.opengroup.osdu.core.gcp.oqm.driver.rabbitmq.MqOqmDestinationResolution; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; + +/** + * For RabbitMQ. Tenant Based OQM destination resolver + */ +@Component +@Scope(SCOPE_SINGLETON) +@ConditionalOnProperty(name = "oqmDriver", havingValue = "rabbitmq") +@RequiredArgsConstructor +@Slf4j +public class MqTenantOqmDestinationResolver implements org.opengroup.osdu.core.gcp.oqm.driver.rabbitmq.MqOqmDestinationResolver { + + private final MqOqmConfigurationProperties properties; + + //Compose names to get configuration properties from Partition + private static final String AMQP = ".amqp."; + private static final String AMQP_HOST = AMQP.concat("host"); + private static final String AMQP_PORT = AMQP.concat("port"); + private static final String AMQP_PATH = AMQP.concat("path"); + private static final String AMQP_USERNAME = AMQP.concat("username"); + private static final String AMQP_PASSWORD = AMQP.concat("password"); + + private static final String ADMIN = ".admin."; + private static final String ADMIN_SCHEMA = ADMIN.concat("schema"); + private static final String ADMIN_HOST = ADMIN.concat("host"); + private static final String ADMIN_PORT = ADMIN.concat("port"); + private static final String ADMIN_PATH = ADMIN.concat("path"); + private static final String ADMIN_USERNAME = ADMIN.concat("username"); + private static final String ADMIN_PASSWORD = ADMIN.concat("password"); + + private final IPartitionProvider partitionProvider; + + private final Map<String, ConnectionFactory> amqpConnectionFactoryCache = new HashMap<>(); + private final Map<String, Client> httpClientCache = new HashMap<>(); + + @Override + public MqOqmDestinationResolution resolve(OqmDestination destination) { + + String partitionId = destination.getPartitionId(); + + //noinspection SwitchStatementWithTooFewBranches + switch (partitionId) { + default: + + String virtualHost = "/"; + + ConnectionFactory amqpFactory = amqpConnectionFactoryCache.get(partitionId); + Client httpClient = httpClientCache.get(partitionId); + + if (amqpFactory == null || httpClient == null) { + + PartitionInfo partitionInfo; + try { + partitionInfo = partitionProvider.get(partitionId); + } catch (PartitionException e) { + throw new OqmDriverRuntimeException(e, "Partition '%s' destination resolution issue", destination.getPartitionId()); + } + Map<String, Property> partitionProperties = partitionInfo.getProperties(); + + if (amqpFactory == null) { + + String amqpHost = getPartitionProperty(partitionId, partitionProperties, AMQP_HOST); + String amqpPort = getPartitionProperty(partitionId, partitionProperties, AMQP_PORT); + String amqpPath = getPartitionProperty(partitionId, partitionProperties, AMQP_PATH); + String amqpUser = getPartitionProperty(partitionId, partitionProperties, AMQP_USERNAME); + String amqpPass = getPartitionProperty(partitionId, partitionProperties, AMQP_PASSWORD); + + URI amqpUri; + try { + amqpUri = new URI("amqp", amqpUser + ":" + amqpPass, amqpHost, Integer.parseInt(amqpPort), amqpPath, null, null); + amqpFactory = new ConnectionFactory(); + amqpFactory.setUri(amqpUri); + amqpConnectionFactoryCache.put(partitionId, amqpFactory); + + } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) { + throw new OqmDriverRuntimeException("RabbitMQ amqp URI and ConnectionFactory", e); + } + } + + if (httpClient == null) { + + String adminSchm = getPartitionProperty(partitionId, partitionProperties, ADMIN_SCHEMA); + String adminHost = getPartitionProperty(partitionId, partitionProperties, ADMIN_HOST); + String adminPort = getPartitionProperty(partitionId, partitionProperties, ADMIN_PORT); + String adminPath = getPartitionProperty(partitionId, partitionProperties, ADMIN_PATH); + String adminUser = getPartitionProperty(partitionId, partitionProperties, ADMIN_USERNAME); + String adminPass = getPartitionProperty(partitionId, partitionProperties, ADMIN_PASSWORD); + + try { + URI httpUrl = new URI(adminSchm, null, adminHost, Integer.parseInt(adminPort), adminPath, null, null); + ClientParameters clientParameters = new ClientParameters().url(httpUrl.toURL()) + .username(adminUser).password(adminPass); + + httpClient = new Client(clientParameters); + httpClientCache.put(partitionId, httpClient); + + } catch (URISyntaxException | MalformedURLException e) { + throw new OqmDriverRuntimeException("RabbitMQ http(api) URI and Client", e); + } + } + } + return MqOqmDestinationResolution.builder() + .amqpFactory(amqpFactory) + .adminClient(httpClient) + .virtualHost(virtualHost) + .build(); + } + } + + private String getPartitionProperty(String partitionId, Map<String, Property> partitionProperties, String propertyName) { + String fullName = properties.getPartitionPropertiesPrefix().concat(propertyName); + return Optional.ofNullable(partitionProperties.get(fullName)).map(Property::getValue).map(Object::toString) + .orElseThrow(() -> new OqmDriverRuntimeException(null, + "Partition '%s' RabbitMQ OQM destination resolution configuration issue. Property '%s' is not provided in PartitionInfo.", + partitionId, fullName)); + } + + @PreDestroy + public void shutdown() { + log.info("On pre-destroy."); + } +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/OqmConfigurationProperties.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/OqmConfigurationProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..f062ae3ee15b4352a904085b832a29e8418c452e --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/OqmConfigurationProperties.java @@ -0,0 +1,34 @@ +/* + Copyright 2020 Google LLC + Copyright 2020 EPAM Systems, Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.opengroup.osdu.notification.provider.gcp.mappers.oqm; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnProperty(name = "oqmDriver") +@ConfigurationProperties(prefix = "oqm") +@Getter +@Setter +public class OqmConfigurationProperties { + + private String registerSubscriberControlTopicName = "register-subscriber-control"; + +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/PsTenantOqmDestinationResolver.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/PsTenantOqmDestinationResolver.java new file mode 100644 index 0000000000000000000000000000000000000000..2ddca9bebde20fa4d434a966e85d69fd7c53db90 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/mappers/oqm/PsTenantOqmDestinationResolver.java @@ -0,0 +1,110 @@ +/* + Copyright 2020 Google LLC + Copyright 2020 EPAM Systems, Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.opengroup.osdu.notification.provider.gcp.mappers.oqm; + +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException; +import org.opengroup.osdu.core.gcp.oqm.driver.pubsub.PsOqmDestinationResolution; +import org.opengroup.osdu.core.gcp.oqm.driver.pubsub.PsOqmDestinationResolver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.notification.provider.gcp.util.GcpAppServiceConfig; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; + +/** + * For GCP PubSub. Tenant Based OQM destination resolver + */ +@Component +@Scope(SCOPE_SINGLETON) +@ConditionalOnProperty(name = "oqmDriver", havingValue = "pubsub") +@Slf4j +@RequiredArgsConstructor +public class PsTenantOqmDestinationResolver implements PsOqmDestinationResolver { + + private Map<OqmDestination, TopicAdminClient> topicClientCache = new HashMap<>(); + private Map<OqmDestination, SubscriptionAdminClient> subscriptionClientCache = new HashMap<>(); + + private final ITenantFactory tenantInfoFactory; + private final GcpAppServiceConfig config; + + @Override + public PsOqmDestinationResolution resolve(OqmDestination destination) { + TenantInfo ti = tenantInfoFactory.getTenantInfo(destination.getPartitionId()); + String partitionId = destination.getPartitionId(); + + //noinspection SwitchStatementWithTooFewBranches + switch (partitionId) { + default: + String servicesProjectId = config.getGoogleCloudProject(); + String dataProjectId = ti.getProjectId(); + + TopicAdminClient tac = topicClientCache.get(destination); + if (tac == null) { + try { + TopicAdminSettings tas = TopicAdminSettings.newBuilder().build(); + tac = TopicAdminClient.create(tas); + topicClientCache.put(destination, tac); + } catch (IOException e) { + throw new OqmDriverRuntimeException("PsOqmDestinationResolution#resolve TopicAdminClient", e); + } + } + + SubscriptionAdminClient sac = subscriptionClientCache.get(destination); + if (sac == null) { + try { + sac = SubscriptionAdminClient.create(); + subscriptionClientCache.put(destination, sac); + } catch (IOException e) { + throw new OqmDriverRuntimeException("PsOqmDestinationResolution#resolve SubscriptionAdminClient", e); + } + } + + return PsOqmDestinationResolution.builder() + .servicesProjectId(servicesProjectId) + .dataProjectId(dataProjectId) + .topicAdminClient(tac) + .subscriptionAdminClient(sac) + .build(); + } + } + + @PreDestroy + public void shutdown() { + log.info("On pre-destroy. {} topic client(s) & {} subscription clients to shutdown", + topicClientCache.size(), subscriptionClientCache.size()); + for (TopicAdminClient tac : topicClientCache.values()) { + tac.shutdown(); + } + for (SubscriptionAdminClient sac : subscriptionClientCache.values()) { + sac.shutdown(); + } + } +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java new file mode 100644 index 0000000000000000000000000000000000000000..36f57d916a8973e061d133ccf4b1e6c804853482 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/OqmSubscriberManager.java @@ -0,0 +1,215 @@ +/* + * Copyright 2017-2020, Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException; +import org.opengroup.osdu.core.gcp.oqm.model.*; +import org.opengroup.osdu.notification.provider.gcp.mappers.oqm.OqmConfigurationProperties; +import org.opengroup.osdu.notification.provider.gcp.pubsub.di.OqmNotificationHandler; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; + +/** + * Runs once on the service start. + * 1. Fetches oqm for message broker pull subscriptions in interested topics. Creates the service's subscribers in every found subscription. + * 2. Checks for the "subscriber control topic" and creates if it is absent. + * - This topic is a "control channel" between Register and Notification services. + * - The former sends events on new pull Subscriptions being created, the latter listens for events and creates corresponding Subscribers. + */ +@Slf4j +@Component +@Scope(SCOPE_SINGLETON) +@ConditionalOnProperty(name = "oqmDriver") +@RequiredArgsConstructor +public class OqmSubscriberManager { + + private final String ACKNOWLEDGE = "message acknowledged by client"; + private final String NOT_ACKNOWLEDGE = "message not acknowledged by client"; + + //TODO should be externalized to application.properties + private static final List<OqmTopic> INTERESTED_TOPICS = + Stream.of("records-changed", "schema-changed", "status-changed", "legaltags_changed") + .map(topicName -> OqmTopic.builder().name(topicName).build()).collect(Collectors.toList()); + + private static final String INTERESTED_SUBSCRIPTIONS_PREFIX = "de-"; + + private static final OqmSubscriptionQuery INTERESTED_SUBSCRIPTIONS_QUERY = OqmSubscriptionQuery.builder() + .forAnyOfTopics(INTERESTED_TOPICS).namePrefix(INTERESTED_SUBSCRIPTIONS_PREFIX) + .subscriberable(true).build(); + + private final ITenantFactory tenantInfoFactory; + private final OqmDriver driver; + private final OqmNotificationHandler notificationHandler; + private final OqmConfigurationProperties properties; + + private final Long constructDate = System.currentTimeMillis(); + private final String subscriberControlTopicSubscriptionName = "notification-service-" + constructDate; + private OqmSubscription subscriberControlTopicSubscription = null; + + + @PostConstruct + void postConstruct() { + log.info("OqmSubscriberManager bean constructed. Provisioning STARTED"); + + //Get all Tenant infos + for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { + log.info("* OqmSubscriberManager on provisioning tenant {}:", tenantInfo.getDataPartitionId()); + //For every Tenant Destination get "subscriberable" Subscriptions + for (OqmSubscription subscription : getSubscriberableSubscriptions(tenantInfo)) { + log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}:", tenantInfo.getDataPartitionId(), subscription.getName()); + + //Register a Subscriber on every subscription + registerSubscriber(tenantInfo, subscription); + + log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName()); + } + log.info("* OqmSubscriberManager on provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId()); + } + + TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() + .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); + + log.info("* OqmSubscriberManager on check '{}' subscriber control topic existence:", properties.getRegisterSubscriberControlTopicName()); + OqmTopic controlTopic = driver.getTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)).orElse(null); + if (controlTopic != null) { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic exists.", properties.getRegisterSubscriberControlTopicName()); + } else { + log.info("* * OqmSubscriberManager: '{}' subscriber control topic doesn't exist. Trying to create it:", properties.getRegisterSubscriberControlTopicName()); + driver.createAndGetTopic(properties.getRegisterSubscriberControlTopicName(), getDestination(tenant)); + } + + log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}'", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + + OqmSubscription subscriptionRequest = OqmSubscription.builder().name(subscriberControlTopicSubscriptionName) + .topics(Collections.singletonList(controlTopic)).build(); + + subscriberControlTopicSubscription = driver.createAndGetSubscription(subscriptionRequest, getDestination(tenant)); + log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscription CREATED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + + registerControlTopicSubscriber(tenant, subscriberControlTopicSubscription); + log.info("* OqmSubscriberManager on registering '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + + log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED"); + } + + @PreDestroy + void onPreDestroy() { + log.info("OqmSubscriberManager bean on pre-destroy: STARTED"); + if (subscriberControlTopicSubscription != null) { + TenantInfo tenant = tenantInfoFactory.listTenantInfo().stream().findAny() + .orElseThrow(() -> new OqmDriverRuntimeException(null, "Unable to get any TenantInfo")); + log.info("* OqmSubscriberManager on delete '{}' subscriber control topic subscription with name '{}': Subscriber REGISTERED.", + properties.getRegisterSubscriberControlTopicName(), subscriberControlTopicSubscriptionName); + driver.deleteSubscription(subscriberControlTopicSubscriptionName, getDestination(tenant)); + } + log.info("OqmSubscriberManager bean on pre-destroy: COMPLETED"); + } + + private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) { + OqmDestination destination = getDestination(tenantInfo); + + OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { + + String pubsubMessage = oqmMessage.getData(); + String notificationId = subscription.getName(); + Map<String, String> headerAttributes = oqmMessage.getAttributes(); + + + HttpResponse response; + boolean ackedNacked = false; + try { + response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes); + + if (!response.isSuccessCode()) { + log.error(NOT_ACKNOWLEDGE + response.getBody()); + } else { + log.debug(ACKNOWLEDGE); + oqmAckReplier.ack(); + } + ackedNacked = true; + + } catch (Exception e) { + log.debug(NOT_ACKNOWLEDGE, e); + } + + if (!ackedNacked) oqmAckReplier.nack(); + }; + + OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build(); + driver.subscribe(subscriber, destination); + log.info("Just subscribed at topic {} subscription {} for tenant {}", + subscription.getTopics().get(0), subscription.getName(), tenantInfo.getDataPartitionId()); + } + + private void registerControlTopicSubscriber(TenantInfo tenantInfo, OqmSubscription controlTopicSubscriber) { + + OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> { + + String pubsubMessage = oqmMessage.getData(); + Map<String, String> headerAttributes = oqmMessage.getAttributes(); + String subscriptionId = headerAttributes.get("subscription-id"); + String dataPartitionId = headerAttributes.get("data-partition-id"); + String topic = headerAttributes.get("topic"); + + OqmSubscription subscription = OqmSubscription.builder() + .topic(OqmTopic.builder().name(topic).build()) + .name(subscriptionId) + .build(); + + registerSubscriber(tenantInfoFactory.getTenantInfo(dataPartitionId), subscription); + + log.debug(ACKNOWLEDGE); + oqmAckReplier.ack(); + }; + + OqmSubscriber subscriber = OqmSubscriber.builder().subscription(controlTopicSubscriber).messageReceiver(receiver).build(); + OqmDestination destination = getDestination(tenantInfo); + driver.subscribe(subscriber, destination); + + log.info("Just subscribed at topic {} subscription {} for tenant {}", + controlTopicSubscriber.getTopics().get(0), controlTopicSubscriber.getName(), tenantInfo.getDataPartitionId()); + } + + public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) { + return driver.listSubscriptions(null, INTERESTED_SUBSCRIPTIONS_QUERY, getDestination(tenantInfo)); + } + + private OqmDestination getDestination(TenantInfo tenantInfo) { + return OqmDestination.builder().partitionId(tenantInfo.getDataPartitionId()).build(); + } + +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..2235850d9ce603adc494b4b73af65c46b2165741 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmNotificationHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017-2020, Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; + +import org.opengroup.osdu.core.common.http.HttpClient; +import org.opengroup.osdu.core.common.http.HttpRequest; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.auth.factory.AuthFactory; +import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +@ConditionalOnProperty(name = "oqmDriver") +public class OqmNotificationHandler { + private final static Logger LOGGER = LoggerFactory.getLogger(OqmNotificationHandler.class); + @Autowired + private HttpClient httpClient; + @Autowired + private OqmSubscriptionHandler subscriptionHandler; + @Autowired + private AuthFactory authFactory; + @Value("${app.waitingTime:30000}") + private int WAITING_TIME; + + public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception { + Subscription subscription = subscriptionHandler.getSubscriptionFromCache(notificationId, headerAttributes); + Secret secret = subscription.getSecret(); + String endpoint = subscription.getPushEndpoint(); + String secretType = secret.getSecretType(); + String pushUrl; + + // Authentication Secret + SecretAuth secretAuth = authFactory.getSecretAuth(secretType); + secretAuth.setSecret(secret); + pushUrl = secretAuth.getPushUrl(endpoint); + + Map<String, String> requestHeader = secretAuth.getRequestHeaders(); + requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json"); + requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID)); + requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID)); + + HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build(); + HttpResponse response = httpClient.send(request); + LOGGER.debug("Sending out notification to endpoint: " + endpoint); + return response; + } +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java new file mode 100644 index 0000000000000000000000000000000000000000..28e65f7beb280db55180d449e7685ba7bd7cd314 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSignatureService.java @@ -0,0 +1,40 @@ +/* + Copyright 2020 Google LLC + Copyright 2020 EPAM Systems, Inc + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; + +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.cryptographic.SignatureService; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; + +@Component +@Scope(SCOPE_SINGLETON) +@Primary +@Slf4j +@ConditionalOnProperty(name = "oqmDriver") +public class OqmSignatureService extends SignatureService { + @PostConstruct + void postConstruct() { + log.info("OqmSignatureService bean constructed."); + } +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..31d8353a241e2c0b1186edf75a698bd89b56d1ae --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionCacheFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright 2017-2020, Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; + +import org.opengroup.osdu.core.common.cache.ICache; +import org.opengroup.osdu.core.common.cache.MultiTenantCache; +import org.opengroup.osdu.core.common.cache.VmCache; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +@ConditionalOnProperty(name = "oqmDriver") +public class OqmSubscriptionCacheFactory { + @Autowired + private ITenantFactory tenantFactory; + + private MultiTenantCache<String> caches; + + public OqmSubscriptionCacheFactory(@Value("${app.expireTime}") int expireTime, @Value("${app.maxCacheSize}") int maxCacheSize) { + this.caches = new MultiTenantCache<>(new VmCache<>(expireTime, maxCacheSize)); + } + + public void put(String key, String val, Map<String, String> headerAttributes) { + this.partitionCache(headerAttributes).put(key, val); + } + + public String get(String key, Map<String, String> headerAttributes) { + return this.partitionCache(headerAttributes).get(key); + } + + public void delete(String key, Map<String, String> headerAttributes) { + this.partitionCache(headerAttributes).delete(key); + } + + public void clearAll(Map<String, String> headerAttributes) { + this.partitionCache(headerAttributes).clearAll(); + } + + private ICache<String, String> partitionCache(Map<String, String> headerAttributes) { + String tenantId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID); + TenantInfo tenantInfo = this.tenantFactory.getTenantInfo(tenantId); + if (tenantInfo == null) { + throw AppException.createUnauthorized(String.format("could not retrieve tenant info for data partition id: %s", tenantId)); + } + return this.caches.get(String.format("%s:subscription", tenantInfo.getDataPartitionId())); + } + +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..d326ced415cf96015a42a14bb05f63353e7d9a6d --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/OqmSubscriptionHandler.java @@ -0,0 +1,109 @@ +/* + * Copyright 2017-2020, Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.pubsub.di; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.gson.Gson; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.common.notification.ISubscriptionFactory; +import org.opengroup.osdu.core.common.notification.ISubscriptionService; +import org.opengroup.osdu.core.common.notification.SubscriptionException; +import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +@ConditionalOnProperty(name = "oqmDriver") +public class OqmSubscriptionHandler { + @Autowired + private ISubscriptionFactory subscriptionFactory; + @Autowired + private OqmSubscriptionCacheFactory subscriptionCacheFactory; + @Autowired + private JaxRsDpsLog log; + + @Autowired + private IServiceAccountJwtClient serviceAccountJwtClient; + + private static final Gson gson = new Gson(); + private ObjectMapper objectMapper; + + public Subscription getSubscriptionFromCache(String notificationId, Map<String, String> headerAttributes) { + String subscriptionString = subscriptionCacheFactory.get(notificationId, headerAttributes); + try { + if (Strings.isNullOrEmpty(subscriptionString)) + subscriptionString = querySubscriptionAndUpdateCache(notificationId, headerAttributes); + ObjectMapper objectMapper = this.getObjectMapper(); + return objectMapper.readValue(subscriptionString, Subscription.class); + } catch (IOException e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error Parsing subscription String to object", "Unexpected error in pushing message", e); + } catch (SubscriptionException se) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error query subscription from registration", "Unexpected error in pushing message", se); + } + } + + private String querySubscriptionAndUpdateCache(String notificationId, Map<String, String> headerAttributes) throws AppException, SubscriptionException { + DpsHeaders headers = getDpsHeaders(headerAttributes); + ISubscriptionService service = subscriptionFactory.create(headers); + + List<Subscription> subscriptionList = service.query(notificationId); + if (subscriptionList == null || subscriptionList.size() == 0) { + throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found"); + } + + Subscription subscription = subscriptionList.get(0); + String jsonSubscription = gson.toJson(subscription); + this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription, headerAttributes); + + return jsonSubscription; + } + + private DpsHeaders getDpsHeaders(Map<String, String> headerAttributes) { + Map<String, String> attributes = new HashMap<>(headerAttributes); + + //extract headers from pubsub message + String dataPartitionId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID); + String authToken = this.serviceAccountJwtClient.getIdToken(dataPartitionId); + attributes.put(DpsHeaders.AUTHORIZATION, authToken); + return DpsHeaders.createFromMap(attributes); + + } + + //unit test purpose + protected ObjectMapper getObjectMapper() { + if (this.objectMapper == null) { + this.objectMapper = new ObjectMapper(); + } + return this.objectMapper; + } + + //unit test purpose + void setObjectMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } +} diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md new file mode 100644 index 0000000000000000000000000000000000000000..609d9001fd89dd9649ea3d94084ab80428a3bda9 --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/pubsub/di/README.md @@ -0,0 +1,6 @@ +The content of the package is 4 classes derived from the eponymous classes from the core part +(find them by names with no "Oqm"prefix). They were derived for one only reason - to free OqmSubscriberManager +from addiction to "request scope" which is not valid for the OQM work context. + +In the future, one may probably find a better way to achieve the same (and reuse original classes) + diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GcpAppServiceConfig.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GcpAppServiceConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..f4d35e2a55f884c7490dc0dec68de39c7e89d25c --- /dev/null +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GcpAppServiceConfig.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2020, Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.gcp.util; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; + +@Component +@Primary +public class GcpAppServiceConfig { + + @Value("${APP_PROJECT}") + private String googleCloudProject; + + public String getGoogleCloudProject() { + return googleCloudProject; + } +} \ No newline at end of file diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java index 1555ab181b10abe173a290752a13ed0628f7409d..ae30169a57df28d6b5c11b5ce1df668b237ee6f1 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/GoogleServiceAccountImpl.java @@ -18,7 +18,7 @@ package org.opengroup.osdu.notification.provider.gcp.util; import lombok.SneakyThrows; import org.apache.http.impl.client.CloseableHttpClient; -import org.opengroup.osdu.core.gcp.GoogleIdToken.IGoogleIdTokenFactory; +import org.opengroup.osdu.core.gcp.googleidtoken.IGoogleIdTokenFactory; import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; diff --git a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java index 1484bad38cdf6ff9f51070154f39b776f3398086..b5b7d51ff2b241f0b6def88f1f99d0d05b2fbfb5 100644 --- a/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java +++ b/provider/notification-gcp/src/main/java/org/opengroup/osdu/notification/provider/gcp/util/ServiceAccountJwtGcpClientImpl.java @@ -19,8 +19,7 @@ package org.opengroup.osdu.notification.provider.gcp.util; import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson.JacksonFactory; +import com.google.api.client.json.gson.GsonFactory; import com.google.api.services.iam.v1.Iam; import com.google.api.services.iam.v1.IamScopes; import com.google.api.services.iam.v1.model.SignJwtRequest; @@ -59,7 +58,7 @@ public class ServiceAccountJwtGcpClientImpl implements IServiceAccountJwtClient private AppProperties config; private static final String JWT_AUDIENCE = "https://www.googleapis.com/oauth2/v4/token"; private static final String SERVICE_ACCOUNT_NAME_FORMAT = "projects/%s/serviceAccounts/%s"; - private static final JsonFactory JSON_FACTORY = new JacksonFactory(); + private static final GsonFactory JSON_FACTORY = new GsonFactory(); static final String INVALID_INPUT = "Invalid inputs provided to getIdToken function"; static final String INVALID_DATA_PARTITION = "Invalid data partition id"; diff --git a/provider/notification-gcp/src/main/resources/application.properties b/provider/notification-gcp/src/main/resources/application.properties index c2f3bf7bea18040d814d894a14b5b50ec671e6ad..45946ed858f7666a2e981b026a911b0f66854b93 100644 --- a/provider/notification-gcp/src/main/resources/application.properties +++ b/provider/notification-gcp/src/main/resources/application.properties @@ -22,4 +22,6 @@ app.maxCacheSize=10 server.error.whitelabel.enabled=false google.audiences=${APP_AUDIENCES} -partition.api=http://localhost:8081/api/partition/v1 \ No newline at end of file +partition.api=http://localhost:8081/api/partition/v1 + +oqmDriver=pubsub \ No newline at end of file