From eea7b96a55c8c71bd60edb716d20de9bd8f7caba Mon Sep 17 00:00:00 2001 From: Manish Jangid <msjangid@amazon.com> Date: Wed, 14 Jun 2023 18:40:16 +0000 Subject: [PATCH] changes to enable OSDU notification feature on AWS --- provider/notification-aws/pom.xml | 105 +++---- .../aws/model/FailedNotificationDoc.java | 54 ++++ .../aws/model/NotificationResult.java | 19 ++ .../aws/model/RetryProcessResult.java | 28 ++ .../provider/aws/model/SubscriptionDoc.java | 84 ++++++ .../aws/queue/NotificationQueueService.java | 27 ++ .../queue/NotificationRetryQueueService.java | 27 ++ .../queue/NotificationRetrySQSHandler.java | 101 +++++++ .../aws/queue/NotificationSQSHandler.java | 107 ++++++++ .../impl/NotificationQueueServiceImpl.java | 202 ++++++++++++++ .../NotificationRetryQueueServiceImpl.java | 212 ++++++++++++++ .../repository/SubscriptionRepository.java | 94 +++++++ .../provider/aws/security/KmsConfig.java | 45 +++ .../provider/aws/security/KmsHelper.java | 100 +++++++ .../aws/security/ThreadSignatureService.java | 151 ++++++++++ .../aws/service/AwsNotificationHandler.java | 85 ++++++ .../provider/aws/utils/SQSUtils.java | 92 +++++++ .../src/main/resources/application.properties | 8 + .../NotificationRetrySQSHandlerTest.java | 76 ++++++ .../aws/queue/NotificationSQSHandlerTest.java | 75 +++++ .../NotificationQueueServiceImplTest.java | 245 +++++++++++++++++ .../NotificationRetryQueueServiceTest.java | 258 ++++++++++++++++++ .../SubscriptionRepositoryTest.java | 144 ++++++++++ .../security/ThreadSignatureServiceTest.java | 109 ++++++++ .../service/AwsNotificationHandlerTest.java | 113 ++++++++ .../provider/aws/utils/SQSUtilsTest.java | 84 ++++++ 26 files changed, 2598 insertions(+), 47 deletions(-) create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/FailedNotificationDoc.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/NotificationResult.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/RetryProcessResult.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/SubscriptionDoc.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationQueueService.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetryQueueService.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandler.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandler.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceImpl.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepository.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsConfig.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsHelper.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureService.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandler.java create mode 100644 provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtils.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandlerTest.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandlerTest.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceTest.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepositoryTest.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureServiceTest.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandlerTest.java create mode 100644 provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtilsTest.java diff --git a/provider/notification-aws/pom.xml b/provider/notification-aws/pom.xml index 5f370e9b0..ca8967177 100644 --- a/provider/notification-aws/pom.xml +++ b/provider/notification-aws/pom.xml @@ -76,7 +76,7 @@ <dependency> <groupId>org.opengroup.osdu.core.aws</groupId> <artifactId>os-core-lib-aws</artifactId> - <version>0.21.0-rc5</version> + <version>0.21.0</version> </dependency> <dependency> <groupId>commons-io</groupId> @@ -119,57 +119,68 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <version>${jackson.version}</version> - </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson-databind.version}</version> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <scope>test</scope> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - <version>${jackson.version}</version> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito2</artifactId> + <version>2.0.9</version> + <scope>test</scope> </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-maven-plugin</artifactId> - <version>${spring-boot-maven-plugin.version}</version> - <executions> - <execution> - <goals> - <goal>repackage</goal> - </goals> - <configuration> - <classifier>spring-boot</classifier> - <mainClass> - org.opengroup.osdu.notification.provider.aws.Application - </mainClass> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-war-plugin</artifactId> - <configuration> - <failOnMissingWebXml>false</failOnMissingWebXml> - </configuration> - </plugin> - <plugin> - <groupId>org.owasp</groupId> - <artifactId>dependency-check-maven</artifactId> - <version>7.4.4</version> - <configuration> - <!-- .Disable Net content--> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson-databind.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + <version>${spring-boot-maven-plugin.version}</version> + <executions> + <execution> + <goals> + <goal>repackage</goal> + </goals> + <configuration> + <classifier>spring-boot</classifier> + <mainClass> + org.opengroup.osdu.notification.provider.aws.Application + </mainClass> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-war-plugin</artifactId> + <configuration> + <failOnMissingWebXml>false</failOnMissingWebXml> + </configuration> + </plugin> + <plugin> + <groupId>org.owasp</groupId> + <artifactId>dependency-check-maven</artifactId> + <version>7.4.4</version> + <configuration> + <!-- .Disable Net content--> <assemblyAnalyzerEnabled>false</assemblyAnalyzerEnabled> <nugetconfAnalyzerEnabled>false</nugetconfAnalyzerEnabled> <nuspecAnalyzerEnabled>false</nuspecAnalyzerEnabled> diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/FailedNotificationDoc.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/FailedNotificationDoc.java new file mode 100644 index 000000000..59cb608b1 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/FailedNotificationDoc.java @@ -0,0 +1,54 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexRangeKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTypeConvertedEnum; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@DynamoDBTable(tableName = "Failed-Notification") +public class FailedNotificationDoc { + + @DynamoDBHashKey(attributeName = "id") + private String id; + + @DynamoDBAttribute(attributeName = "dataPartitionId") + private String dataPartitionId; + + @DynamoDBTypeConvertedEnum + @DynamoDBAttribute(attributeName = "status") + private NotificationResult status; + + @DynamoDBIndexHashKey(attributeName = "partitionIdSubscriptionId", globalSecondaryIndexName = "subscription-index") + private String partitionIdSubscriptionId; + + @DynamoDBIndexRangeKey(attributeName = "createdOnEpoch", globalSecondaryIndexName = "subscription-index") + private String createdOnEpoch; + + @DynamoDBAttribute(attributeName = "ttl") + private Long ttl; + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/NotificationResult.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/NotificationResult.java new file mode 100644 index 000000000..e3ebcef11 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/NotificationResult.java @@ -0,0 +1,19 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +public enum NotificationResult { + ACK, + NACK +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/RetryProcessResult.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/RetryProcessResult.java new file mode 100644 index 000000000..a268162a1 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/RetryProcessResult.java @@ -0,0 +1,28 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +import com.amazonaws.services.sqs.model.Message; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class RetryProcessResult { + Message message; + + NotificationResult result; +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/SubscriptionDoc.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/SubscriptionDoc.java new file mode 100644 index 000000000..93284ae75 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/SubscriptionDoc.java @@ -0,0 +1,84 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; + +import java.nio.ByteBuffer; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@DynamoDBTable(tableName = "Register.Subscription") +public class SubscriptionDoc { + + @DynamoDBHashKey(attributeName = "id") + private String id; + + @DynamoDBAttribute(attributeName = "name") + private String name; + + @DynamoDBAttribute(attributeName = "description") + private String description; + + @DynamoDBAttribute(attributeName = "topic") + private String topic; + + @DynamoDBAttribute(attributeName = "pushEndpoint") + private String pushEndpoint; + + @DynamoDBAttribute(attributeName = "createdBy") + private String createdBy; + + @DynamoDBAttribute(attributeName = "createdOnEpoch") + private String createdOnEpoch; + + @DynamoDBAttribute(attributeName = "notificationId") + private String notificationId; + + @DynamoDBAttribute(attributeName = "secretType") + private String secretType; + + @DynamoDBAttribute(attributeName = "secretValue") + private ByteBuffer secretValue; + + @DynamoDBIndexHashKey(attributeName = "dataPartitionId", globalSecondaryIndexName = "dataPartitionId-index") + private String dataPartitionId; + + public static Subscription mapTo(SubscriptionDoc subDoc, Secret secret) { + Subscription sub = new Subscription(); + sub.setId(subDoc.getId()); + sub.setName(subDoc.getName()); + sub.setDescription(subDoc.getDescription()); + sub.setTopic(subDoc.getTopic()); + sub.setPushEndpoint(subDoc.getPushEndpoint()); + sub.setCreatedBy(subDoc.getCreatedBy()); + sub.setNotificationId(subDoc.getNotificationId()); + sub.setSecret(secret); + + return sub; + } + + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationQueueService.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationQueueService.java new file mode 100644 index 000000000..40f21f3ea --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationQueueService.java @@ -0,0 +1,27 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.common.model.notification.Subscription; + +import java.util.List; + +public interface NotificationQueueService { + + List<Message> processNotificationMessages(List<Message> messages); + + void processMessagesBySubscription(Subscription subscription, List<Message> messages, String dataPartitionId); +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetryQueueService.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetryQueueService.java new file mode 100644 index 000000000..aa7471569 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetryQueueService.java @@ -0,0 +1,27 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; + +import java.util.List; + +public interface NotificationRetryQueueService { + + List<RetryProcessResult> processNotificationMessages(List<Message> messages); + + List<RetryProcessResult> processMessagesBySubscription(Subscription subscription, List<Message> messages); +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandler.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandler.java new file mode 100644 index 000000000..f04a75397 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandler.java @@ -0,0 +1,101 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; +import org.opengroup.osdu.notification.provider.aws.queue.impl.NotificationRetryQueueServiceImpl; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +@Service +public class NotificationRetrySQSHandler { + + private static final Logger logger = LoggerFactory.getLogger(NotificationRetrySQSHandler.class); + private static final int MAX_MESSAGE_ALLOWED = 100; + private static final int MAX_BATCH_REQUEST_COUNT = 10; + + @Value("${aws.region}") + private String region; + + @Value("${aws.sqs.poll.sleepTime.retryQueue}") + private Integer sleepTime; + + private String notificationRetryQueueUrl; + + private SQSUtils sqsUtil; + + private NotificationRetryQueueServiceImpl notificationRetryQueueService; + + public NotificationRetrySQSHandler(NotificationRetryQueueServiceImpl notificationRetryQueueService, SQSUtils sqsUtil) { + this.notificationRetryQueueService = notificationRetryQueueService; + this.sqsUtil = sqsUtil; + } + + @Async + @EventListener(ApplicationStartedEvent.class) + public void init() throws K8sParameterNotFoundException, InterruptedException { + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + notificationRetryQueueUrl = provider.getParameterAsString("notification-retry-sqs-url"); + processNotifications(); + } + + private void processNotifications() throws InterruptedException { + List<Message> messages; + logger.info("Running notification Retry SQS processor"); + while (true) { + try { + AmazonSQSConfig sqsConfig = new AmazonSQSConfig(region); + AmazonSQS sqsClient = sqsConfig.AmazonSQS(); + messages = sqsUtil.getMessages(sqsClient, notificationRetryQueueUrl, MAX_BATCH_REQUEST_COUNT, MAX_MESSAGE_ALLOWED); + if (messages.isEmpty()) { + logger.info("No messages in the retry queue, sleeping for {} seconds", sleepTime); + Thread.sleep(sleepTime * 1000L); + continue; + } + logger.info("Processing {} messages from notification retry queue", messages.size()); + List<RetryProcessResult> notificationResults = notificationRetryQueueService.processNotificationMessages(messages); + List<Message> failedMessages = notificationResults.stream().filter(msg -> NotificationResult.NACK.equals(msg.getResult())).map(RetryProcessResult::getMessage).collect( + Collectors.toList()); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, notificationRetryQueueUrl, failedMessages); + + List<Message> deleteMessageList = notificationResults.stream().filter(result -> NotificationResult.ACK.equals(result.getResult())) + .map(RetryProcessResult::getMessage).collect(Collectors.toList()); + if (!deleteMessageList.isEmpty()) { + int deletedCount = sqsUtil.deleteMessages(deleteMessageList, notificationRetryQueueUrl, sqsClient); + logger.info("Deleted Messages count :: {}", deletedCount); + } + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + logger.error("General Exception", e); + } + } + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandler.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandler.java new file mode 100644 index 000000000..7a1078e85 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandler.java @@ -0,0 +1,107 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@Service +public class NotificationSQSHandler { + + private static final Logger logger = LoggerFactory.getLogger(NotificationSQSHandler.class); + private static final int MAX_MESSAGE_ALLOWED = 100; + private static final int MAX_BATCH_REQUEST_COUNT = 10; + + @Value("${aws.region}") + private String region; + + @Value("${aws.sqs.poll.sleepTime.mainQueue}") + private Integer sleepTime; + private String notificationQueueUrl; + + private String notificationDeadLetterQueueUrl; + + private NotificationQueueService notificationQueueService; + + private SQSUtils sqsUtil; + + public NotificationSQSHandler(NotificationQueueService notificationQueueService, SQSUtils sqsUtil) { + this.notificationQueueService = notificationQueueService; + this.sqsUtil = sqsUtil; + } + + @Async + @EventListener(ApplicationStartedEvent.class) + public void init() throws K8sParameterNotFoundException, InterruptedException { + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + notificationQueueUrl = provider.getParameterAsString("notification-sqs-url"); + notificationDeadLetterQueueUrl = provider.getParameterAsString("notification-deadletter-queue-url"); + processNotifications(); + } + + private void processNotifications() throws InterruptedException { + List<Message> messages; + logger.info("Running notification SQS processor"); + while (true) { + try { + AmazonSQSConfig sqsConfig = new AmazonSQSConfig(region); + AmazonSQS sqsClient = sqsConfig.AmazonSQS(); + messages = sqsUtil.getMessages(sqsClient, notificationQueueUrl, MAX_BATCH_REQUEST_COUNT, MAX_MESSAGE_ALLOWED); + if (messages.isEmpty()) { + logger.info("No messages in the main queue, sleeping for {} seconds", sleepTime); + Thread.sleep(sleepTime * 1000L); + continue; + } + logger.info("Processing {} messages from notification queue", messages.size()); + + List<Message> invalidMessages = messages.stream().filter(msg -> !msg.getMessageAttributes().containsKey("data-partition-id")).collect(Collectors.toList()); + List<Message> validMessages = messages.stream().filter(msg -> msg.getMessageAttributes().containsKey("data-partition-id")).collect(Collectors.toList()); + + ArrayList<Message> deleteMessageList = new ArrayList<>(invalidMessages); + ArrayList<Message> deadLetterMessages = new ArrayList<>(invalidMessages); + + if (!deadLetterMessages.isEmpty()) { + logger.info("Sending {} invalid messages to dead letter queue", deadLetterMessages.size()); + sqsUtil.sendMessagesToDeadLetterQueue(notificationDeadLetterQueueUrl, deadLetterMessages, sqsClient); + } + List<Message> processedMessages = notificationQueueService.processNotificationMessages(validMessages); + deleteMessageList.addAll(processedMessages); + if (!deleteMessageList.isEmpty()) { + int deletedCount = sqsUtil.deleteMessages(deleteMessageList, notificationQueueUrl, sqsClient); + logger.info("Deleted Messages count :: {}", deletedCount); + } + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + logger.error("General Exception: ", e); + } + } + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java new file mode 100644 index 000000000..b79914ce6 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java @@ -0,0 +1,202 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBDeleteExpression; +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.queue.NotificationQueueService; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.stream.Collectors; + +@Service +public class NotificationQueueServiceImpl implements NotificationQueueService { + + private static final Logger logger = LoggerFactory.getLogger(NotificationQueueServiceImpl.class); + + + @Value("${aws.region}") + private String region; + + @Value("${aws.dynamodb.failedNotificationTable.ssmPath}") + private String failedNotificationTablePath; + + private SubscriptionRepository subscriptionRepository; + + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + private AwsNotificationHandler notificationHandler; + + private AmazonSQS sqsClient; + + private String notificationRetryQueueUrl; + + + public NotificationQueueServiceImpl(SubscriptionRepository subscriptionRepository, DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory, + AwsNotificationHandler awsNotificationHandler) { + this.subscriptionRepository = subscriptionRepository; + this.dynamoDBQueryHelperFactory = dynamoDBQueryHelperFactory; + this.notificationHandler = awsNotificationHandler; + } + + @PostConstruct + public void init() throws K8sParameterNotFoundException { + AmazonSQSConfig sqsConfig = new AmazonSQSConfig(region); + sqsClient = sqsConfig.AmazonSQS(); + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + notificationRetryQueueUrl = provider.getParameterAsString("notification-retry-sqs-url"); + } + + + @Override + public List<Message> processNotificationMessages(List<Message> messages) { + try { + Map<String, List<Message>> messagesByDataPartition = messages.stream().collect( + Collectors.groupingBy(msg -> msg.getMessageAttributes().get("data-partition-id").getStringValue(), HashMap::new, Collectors.toCollection(ArrayList::new))); + + return messagesByDataPartition.entrySet().parallelStream() + .map(entry -> processMessagesByDataPartition(entry.getKey(), entry.getValue())) + .filter(Objects::nonNull).flatMap(list -> list.stream()).collect(Collectors.toList()); + } catch (Exception e) { + logger.error("Exception processing messages :", e); + return new ArrayList<>(); + } + } + + public List<Message> processMessagesByDataPartition(String dataPartitionId, List<Message> messages) { + try { + List<Subscription> subscriptionList = subscriptionRepository.getAllSubscriptionsByDataPartition(dataPartitionId); + if (!subscriptionList.isEmpty()) { + subscriptionList.parallelStream().forEach(subscription -> processMessagesBySubscription(subscription, messages, dataPartitionId)); + } + return messages; + } catch (Exception e) { + logger.error("Exception processing messages for dataPartitionId : {}", dataPartitionId, e); + return new ArrayList<>(); + } + } + + public void processMessagesBySubscription(Subscription subscription, List<Message> messages, String dataPartitionId) { + List<Message> messagesToRetry = new ArrayList<>(); + DynamoDBQueryHelperV2 dynamoDBQueryHelper = dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(failedNotificationTablePath); + try { + FailedNotificationDoc gsiQuery = new FailedNotificationDoc(); + gsiQuery.setPartitionIdSubscriptionId(String.join("-", dataPartitionId, subscription.getId())); + PaginatedQueryList<FailedNotificationDoc> results = dynamoDBQueryHelper.queryByGSI(FailedNotificationDoc.class, gsiQuery); + + if (results != null && !results.isEmpty()) { + logger.debug("Subscription {} has previous failed messages", subscription.getId()); + messagesToRetry.addAll(messages); + } else { + boolean hasPreviousFailed = false; + for (Message message : messages) { + logger.debug("Processing Notification for messageId: {}, Subscription Endpoint: {}", message.getMessageId(), subscription.getPushEndpoint()); + if (hasPreviousFailed) { + messagesToRetry.add(message); + continue; + } + Map<String, String> messageAttributes = message.getMessageAttributes().entrySet().stream().collect( + Collectors.toMap(Entry::getKey, attribute -> attribute.getValue().getStringValue())); + if (!notifySubscriber(subscription, message.getBody(), messageAttributes)) { + messagesToRetry.add(message); + hasPreviousFailed = true; + } + } + } + } catch (Exception e) { + logger.error("Exception in processMessagesBySubscription for {} :", subscription.getPushEndpoint(), e); + messagesToRetry.addAll(messages); + } + logger.info("Messages retry size :: {}", messagesToRetry.size()); + messagesToRetry.forEach(message -> insertFailedNotification(message, subscription, dataPartitionId, dynamoDBQueryHelper)); + } + + private boolean notifySubscriber(Subscription subscription, String messageBody, Map<String, String> headerAttributes) { + HttpResponse response; + try { + response = notificationHandler.notifySubscriber(subscription, messageBody, headerAttributes); + } catch (Exception e) { + logger.error("Failed to notify subscriber {} : ", subscription.getPushEndpoint(), e); + return false; + } + return response.isSuccessCode(); + } + + + private void insertFailedNotification(Message message, Subscription subscription, String dataPartitionId, DynamoDBQueryHelperV2 dynamoDBQueryHelper) { + logger.debug("Inserting failing notification for :: {}, {}", subscription.getPushEndpoint(), message.getMessageId()); + Long currentTimeMillis = System.currentTimeMillis(); + Long ttl = currentTimeMillis / 1000L + 432000; + + String id = Base64.getEncoder().encodeToString((subscription.getPushEndpoint() + message.getMessageId()).getBytes()); + try { + Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes(); + + MessageAttributeValue failedNotificationRecordId = new MessageAttributeValue() + .withDataType("String"); + failedNotificationRecordId.setStringValue(id); + + MessageAttributeValue subscriptionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + subscriptionIdAttribute.setStringValue(subscription.getId()); + + messageAttributes.put("subscriptionId", subscriptionIdAttribute); + messageAttributes.put("failedNotificationRecordId", failedNotificationRecordId); + + SendMessageRequest sendMessageRequest = new SendMessageRequest() + .withQueueUrl(notificationRetryQueueUrl) + .withMessageBody(message.getBody()) + .withMessageAttributes(messageAttributes); + sqsClient.sendMessage(sendMessageRequest); + + FailedNotificationDoc notificationDoc = FailedNotificationDoc.builder() + .id(id) + .partitionIdSubscriptionId(String.join("-", dataPartitionId, subscription.getId())) + .status(NotificationResult.NACK) + .createdOnEpoch(new Timestamp(currentTimeMillis).toString()) + .dataPartitionId(dataPartitionId) + .ttl(ttl).build(); + dynamoDBQueryHelper.saveWithHashCondition(notificationDoc, "id"); + } catch (Exception e) { + logger.error("Exception while inserting failed record, messageId: {}, subscriptionPushUrl: {} ", message.getMessageId(), subscription.getPushEndpoint(), e); + } + } +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceImpl.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceImpl.java new file mode 100644 index 000000000..56b7ea88a --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceImpl.java @@ -0,0 +1,212 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBDeleteExpression; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; +import org.opengroup.osdu.notification.provider.aws.queue.NotificationRetryQueueService; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; + +@Service +public class NotificationRetryQueueServiceImpl implements NotificationRetryQueueService { + + private static final Logger logger = LoggerFactory.getLogger(NotificationRetryQueueServiceImpl.class); + + private static final int SQS_MIN_VISIBILITY_TIMEOUT = 30; + + private static final int SQS_MAX_VISIBILITY_TIMEOUT = 43200; + + private static final String FAILED_NOTIFICATION_RECORD_ID = "failedNotificationRecordId"; + + private AwsNotificationHandler notificationHandler; + + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + private SubscriptionRepository subscriptionRepository; + + @Value("${aws.dynamodb.failedNotificationTable.ssmPath}") + private String failedNotificationTablePath; + + @Value("${aws.sqs.visibilityTimeout:300}") + private Integer visibilityTimeout; + + public NotificationRetryQueueServiceImpl(AwsNotificationHandler notificationHandler, DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory, + SubscriptionRepository subscriptionRepository) { + this.notificationHandler = notificationHandler; + this.dynamoDBQueryHelperFactory = dynamoDBQueryHelperFactory; + this.subscriptionRepository = subscriptionRepository; + } + + @Override + public List<RetryProcessResult> processNotificationMessages(List<Message> messages) { + Map<String, List<Message>> messagesBySubscription = messages.stream().collect(Collectors.groupingBy( + msg -> String.join(":", msg.getMessageAttributes().get("data-partition-id").getStringValue(), msg.getMessageAttributes().get("subscriptionId").getStringValue()), + HashMap::new, Collectors.toCollection(ArrayList::new))); + + return messagesBySubscription.entrySet().parallelStream().map(entry -> { + try { + Optional<Subscription> subscription = subscriptionRepository.getSubscriptionById(entry.getValue().get(0).getMessageAttributes().get("subscriptionId").getStringValue(), + entry.getValue().get(0).getMessageAttributes().get("data-partition-id").getStringValue()); + if (!subscription.isPresent()) { + logger.info("Subscription with id {} not found for dataPartitionId {}", + entry.getValue().get(0).getMessageAttributes().get("subscriptionId").getStringValue(), + entry.getValue().get(0).getMessageAttributes().get("data-partition-id").getStringValue()); + deleteDbRecords(entry.getValue()); + return entry.getValue().stream().map(msg -> new RetryProcessResult(msg, NotificationResult.ACK)).collect(Collectors.toList()); + } + return processMessagesBySubscription(subscription.get(), entry.getValue()); + } catch (Exception e) { + logger.error("Exception while trying to send retry notifications:", e); + return entry.getValue().stream().map(msg -> new RetryProcessResult(msg, NotificationResult.NACK)).collect(Collectors.toList()); + } + }).flatMap(Collection::stream).collect(Collectors.toList()); + } + + @Override + public List<RetryProcessResult> processMessagesBySubscription(Subscription subscription, List<Message> messages) { + DynamoDBQueryHelperV2 dynamoDBQueryHelper = dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(failedNotificationTablePath); + boolean hasPreviousFailed = false; + List<RetryProcessResult> notificationResults = new ArrayList<>(); + + for (Message message : messages) { + RetryProcessResult processResult = new RetryProcessResult(); + processResult.setMessage(message); + + logger.debug("Processing Notification for messageId: {}, Subscription Endpoint: {}", message.getMessageId(), subscription.getPushEndpoint()); + if (hasPreviousFailed) { + processResult.setResult(NotificationResult.NACK); + notificationResults.add(processResult); + continue; + } + Map<String, String> messageAttributes = message.getMessageAttributes().entrySet().stream().collect( + Collectors.toMap(Entry::getKey, attribute -> attribute.getValue().getStringValue())); + try { + HttpResponse response = notificationHandler.notifySubscriber(subscription, message.getBody(), messageAttributes); + if (!response.isSuccessCode()) { + processResult.setResult(NotificationResult.NACK); + hasPreviousFailed = true; + } else { + processResult.setResult(NotificationResult.ACK); + FailedNotificationDoc doc = dynamoDBQueryHelper.loadByPrimaryKey(FailedNotificationDoc.class, + message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + if (doc != null) { + FailedNotificationDoc objectToDelete = new FailedNotificationDoc(); + objectToDelete.setId(message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + DynamoDBDeleteExpression deleteExpression = new DynamoDBDeleteExpression() + .withConditionExpression("attribute_exists(id)"); + dynamoDBQueryHelper.deleteByObjectWithCondition(objectToDelete, deleteExpression); + } + } + } catch (Exception e) { + logger.error("Failed to notify subscriber : ", e); + processResult.setResult(NotificationResult.NACK); + hasPreviousFailed = true; + } + notificationResults.add(processResult); + } + return notificationResults; + } + + public void changeMessageVisibilityTimeout(AmazonSQS sqsClient, String sqsQueueUrl, List<Message> messages) { + List<ChangeMessageVisibilityBatchRequestEntry> entries = new ArrayList<>(); + for (Message message : messages) { + int timeout = exponentialTimeOutWindow(parseIntOrDefault(message.getAttributes().get("ApproximateReceiveCount"), 0)); + if (timeout < SQS_MIN_VISIBILITY_TIMEOUT || timeout > SQS_MAX_VISIBILITY_TIMEOUT) { + timeout = timeout < SQS_MIN_VISIBILITY_TIMEOUT ? SQS_MIN_VISIBILITY_TIMEOUT : SQS_MAX_VISIBILITY_TIMEOUT; + } + entries.add( + new ChangeMessageVisibilityBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()) + .withVisibilityTimeout(timeout)); + if (entries.size() == 10) { + sqsClient.changeMessageVisibilityBatch(sqsQueueUrl, entries); + entries.clear(); + } + } + + if (!entries.isEmpty()) { + sqsClient.changeMessageVisibilityBatch(sqsQueueUrl, entries); + } + } + + private void deleteDbRecords(List<Message> messages) { + DynamoDBQueryHelperV2 dynamoDBQueryHelper = dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(failedNotificationTablePath); + messages.parallelStream().forEach(message -> { + FailedNotificationDoc doc = dynamoDBQueryHelper.loadByPrimaryKey(FailedNotificationDoc.class, + message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + try { + FailedNotificationDoc objectToDelete = new FailedNotificationDoc(); + objectToDelete.setId(message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + DynamoDBDeleteExpression deleteExpression = new DynamoDBDeleteExpression() + .withConditionExpression("attribute_exists(id)"); + dynamoDBQueryHelper.deleteByObjectWithCondition(objectToDelete, deleteExpression); + } catch (Exception e) { + logger.info("Failed to delete: {}", e.getMessage()); + } + }); + } + + private Integer exponentialTimeOutWindow(int receiveCount) { + if (receiveCount <= 10) { + return visibilityTimeout; + } + if (receiveCount <= 20) { + return visibilityTimeout * 2; + } + if (receiveCount <= 30) { + return visibilityTimeout * 4; + } + if (receiveCount <= 40) { + return visibilityTimeout * 8; + } + if (receiveCount <= 50) { + return visibilityTimeout * 16; + } + if (receiveCount <= 60) { + return visibilityTimeout * 32; + } + return 43200; + } + + private Integer parseIntOrDefault(String toParse, int defaultValue) { + try { + return Integer.parseInt(toParse); + } catch (NumberFormatException e) { + return defaultValue; + } + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepository.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepository.java new file mode 100644 index 000000000..51f4897ba --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepository.java @@ -0,0 +1,94 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.repository; + +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.model.notification.GsaSecret; +import org.opengroup.osdu.core.common.model.notification.GsaSecretValue; +import org.opengroup.osdu.core.common.model.notification.HmacSecret; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.SubscriptionDoc; +import org.opengroup.osdu.notification.provider.aws.security.KmsHelper; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Repository; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Repository +public class SubscriptionRepository { + + private static final String GSA_SECRET = "GSA"; + + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + private KmsHelper kmsHelper; + + @Value("${aws.dynamodb.subscriptionTable.ssm.relativePath}") + String subscriptionTableRelativePath; + + public SubscriptionRepository(DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory, KmsHelper kmsHelper) { + this.dynamoDBQueryHelperFactory = dynamoDBQueryHelperFactory; + this.kmsHelper = kmsHelper; + } + + public List<Subscription> getAllSubscriptionsByDataPartition(String dataPartitionId) { + DynamoDBQueryHelperV2 queryHelper = dynamoDBQueryHelperFactory.getQueryHelperForPartition(dataPartitionId, subscriptionTableRelativePath); + SubscriptionDoc gsiQuery = new SubscriptionDoc(); + gsiQuery.setDataPartitionId(dataPartitionId); + PaginatedQueryList<SubscriptionDoc> results = queryHelper.queryByGSI(SubscriptionDoc.class, gsiQuery); + + if(results == null){ + return Collections.emptyList(); + } + return results.stream().map(subsDoc -> { + String secretValue = kmsHelper.decrypt(subsDoc.getSecretValue(), dataPartitionId); + Secret secret = createSecret(subsDoc.getSecretType(), secretValue); + return SubscriptionDoc.mapTo(subsDoc, secret); + }).collect(Collectors.toList()); + } + + public Optional<Subscription> getSubscriptionById(String subscriptionId, String dataPartitionId) { + DynamoDBQueryHelperV2 queryHelper = dynamoDBQueryHelperFactory.getQueryHelperForPartition(dataPartitionId, subscriptionTableRelativePath); + SubscriptionDoc subsDoc = queryHelper.loadByPrimaryKey(SubscriptionDoc.class, subscriptionId); + if (subsDoc == null) { + return Optional.empty(); + } + String secretValue = kmsHelper.decrypt(subsDoc.getSecretValue(), dataPartitionId); + Secret secret = createSecret(subsDoc.getSecretType(), secretValue); + return Optional.of(SubscriptionDoc.mapTo(subsDoc, secret)); + } + + private Secret createSecret(String secretType, String secretValue) { + Secret secret; + if (secretType.equals(GSA_SECRET)) { + GsaSecret gsaSecret = new GsaSecret(); + String[] splitSecret = secretValue.split("`"); + gsaSecret.setValue(new GsaSecretValue(splitSecret[0], splitSecret[1])); + secret = gsaSecret; + } else { + HmacSecret hmacSecret = new HmacSecret(); + hmacSecret.setValue(secretValue); + secret = hmacSecret; + } + return secret; + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsConfig.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsConfig.java new file mode 100644 index 000000000..37e3473fb --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsConfig.java @@ -0,0 +1,45 @@ +// Copyright © 2020 Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.security; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.kms.AWSKMS; +import com.amazonaws.services.kms.AWSKMSClientBuilder; +import org.opengroup.osdu.core.aws.configurationsetup.ConfigSetup; +import org.opengroup.osdu.core.aws.iam.IAMConfig; + +//This class should be moved to os-core-lib-aws. Keeping it here temporarily till testing is complete +public class KmsConfig { + + private String amazonKmsEndpoint; + + private String amazonKmsRegion; + + private AWSCredentialsProvider amazonAWSCredentials; + + public KmsConfig(String amazonKmsEndpoint, String amazonKmsRegion) { + amazonAWSCredentials = IAMConfig.amazonAWSCredentials(); + this.amazonKmsEndpoint = amazonKmsEndpoint; + this.amazonKmsRegion = amazonKmsRegion; + } + + public AWSKMS awsKMS() { + // Generate the KMS client + return AWSKMSClientBuilder.standard() + .withCredentials(amazonAWSCredentials) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonKmsEndpoint, amazonKmsRegion)) + .withClientConfiguration(ConfigSetup.setUpConfig()).build(); + } +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsHelper.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsHelper.java new file mode 100644 index 000000000..73ad6a416 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsHelper.java @@ -0,0 +1,100 @@ +// Copyright © 2020 Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.security; + + +import com.amazonaws.services.kms.AWSKMS; +import com.amazonaws.services.kms.model.DecryptRequest; +import com.amazonaws.services.kms.model.EncryptRequest; +import com.amazonaws.services.simplesystemsmanagement.model.InternalServerErrorException; +import lombok.AccessLevel; +import lombok.Setter; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.nio.ByteBuffer; +import java.util.Collections; + +@Component +public class KmsHelper { + + @Value("${aws.region}") + @Setter(AccessLevel.PROTECTED) + public String amazonRegion; + + @Autowired + private DpsHeaders dpsHeaders; + + @Autowired + private JaxRsDpsLog logger; + + @Value("${aws.kms.endpoint}") + public String kmsEndpoint; + + private AWSKMS kmsClient; + + private String kmsKeyId; + + @PostConstruct + public void init() { + KmsConfig kmsConfig = new KmsConfig(kmsEndpoint, amazonRegion); + kmsClient = kmsConfig.awsKMS(); + try { + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + kmsKeyId = provider.getParameterAsString("KMS_KEY_ID"); + } catch (K8sParameterNotFoundException e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "SSM ParameterNotFoundException", e.getMessage()); + } catch (InternalServerErrorException e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "SSM InternalServerErrorException", e.getErrorMessage()); + + } + + } + + + + public ByteBuffer encrypt(String plainTextString) { + + + EncryptRequest encReq = new EncryptRequest(); + encReq.setKeyId(kmsKeyId); + encReq.setPlaintext(ByteBuffer.wrap(plainTextString.getBytes())); + encReq.setEncryptionContext(Collections.singletonMap("dataPartitionId", dpsHeaders.getPartitionId())); + ByteBuffer ciphertext = kmsClient.encrypt(encReq).getCiphertextBlob(); + return ciphertext; + + + + } + public String decrypt(ByteBuffer ciphertext, String dataPartitionId) { + + DecryptRequest decReq = new DecryptRequest(); + + decReq.setCiphertextBlob(ciphertext); + decReq.setEncryptionContext(Collections.singletonMap("dataPartitionId", dataPartitionId)); + ByteBuffer decrypted = kmsClient.decrypt(decReq).getPlaintext(); + + String decryptedStr = new String(decrypted.array()); + return decryptedStr; + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureService.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureService.java new file mode 100644 index 000000000..ca530568c --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureService.java @@ -0,0 +1,151 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.aws.security; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import org.apache.commons.lang3.StringUtils; +import org.opengroup.osdu.core.common.cryptographic.HmacData; +import org.opengroup.osdu.core.common.cryptographic.ISignatureService; +import org.opengroup.osdu.core.common.cryptographic.SignatureServiceException; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.xml.bind.DatatypeConverter; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.Base64; + +@Component +@Primary +public class ThreadSignatureService implements ISignatureService { + + private static final String HMAC_SHA_256 = "HmacSHA256"; + private static final String DATA_FORMAT = "{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}"; + private static final String NOTIFICATION_SERVICE = "de-notification-service"; + private static final long EXPIRE_DURATION = 30000L; + + private static final String INVALID_SIGNATURE = "Invalid signature"; + private static final String ERROR_GENERATING_SIGNATURE = "Error generating the signature"; + private static final String SIGNATURE_EXPIRED = "Signature is expired"; + private static final String MISSING_HMAC_SIGNATURE = "HMAC signature should not be null or empty"; + private static final String MISSING_SECRET_VALUE = "Secret should not be null or empty"; + private static final String MISSING_ATTRIBUTES_IN_SIGNATURE = "Missing url or nonce or expire time in the signature"; + + + @Override + public String getSignedSignature(String url, String secret) throws SignatureServiceException { + if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(secret)) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE); + } + final long currentTime = System.currentTimeMillis(); + final String expireTime = String.valueOf(currentTime + EXPIRE_DURATION); + final String timeStamp = String.valueOf(currentTime); + try { + String nonce = DatatypeConverter.printHexBinary(generateRandomBytes(16)).toLowerCase(); + String data = String.format(DATA_FORMAT, expireTime, url, nonce); + final byte[] signature = getSignature(secret, nonce, timeStamp, data); + byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8); + String dataBytesEncoded = Base64.getEncoder().encodeToString(dataBytes); + StringBuilder output = new StringBuilder(); + output.append(dataBytesEncoded) + .append(".") + .append(DatatypeConverter.printHexBinary(signature).toLowerCase()); + + return output.toString(); + } catch (Exception ex) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE, ex); + } + } + + @Override + public String getSignedSignature(String url, String secret, String expireTime, String nonce) throws SignatureServiceException { + if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(secret) || !StringUtils.isNumeric(expireTime)) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE); + } + final long expiry = Long.parseLong(expireTime); + if (System.currentTimeMillis() > expiry) { + throw new SignatureServiceException(SIGNATURE_EXPIRED); + } + String timeStamp = String.valueOf(expiry - EXPIRE_DURATION); + String data = String.format(DATA_FORMAT, expireTime, url, nonce); + try { + final byte[] signature = getSignature(secret, nonce, timeStamp, data); + return DatatypeConverter.printHexBinary(signature).toLowerCase(); + } catch (Exception ex) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE, ex); + } + } + + + @Override + public void verifyHmacSignature(String hmac, String secret) throws SignatureServiceException { + if (Strings.isNullOrEmpty(hmac)) { + throw new SignatureServiceException(MISSING_HMAC_SIGNATURE); + } + if (Strings.isNullOrEmpty(secret)) { + throw new SignatureServiceException(MISSING_SECRET_VALUE); + } + String[] tokens = hmac.split("\\."); + if (tokens.length != 2) { + throw new SignatureServiceException(INVALID_SIGNATURE); + } + byte[] dataBytes = Base64.getDecoder().decode(tokens[0]); + String requestSignature = tokens[1]; + + String data = new String(dataBytes, StandardCharsets.UTF_8); + HmacData hmacData = new Gson().fromJson(data, HmacData.class); + String url = hmacData.getEndpointUrl(); + String nonce = hmacData.getNonce(); + String expireTime = hmacData.getExpireMillisecond(); + if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(nonce) || Strings.isNullOrEmpty(expireTime)) { + throw new SignatureServiceException(MISSING_ATTRIBUTES_IN_SIGNATURE); + } + String newSignature = getSignedSignature(url, secret, expireTime, nonce); + if (!requestSignature.equalsIgnoreCase(newSignature)) { + throw new SignatureServiceException(INVALID_SIGNATURE); + } + } + + private byte[] getSignature(String secret, String nonce, String timeStamp, String data) throws Exception { + final byte[] secretBytes = DatatypeConverter.parseHexBinary(secret); + final byte[] nonceBytes = DatatypeConverter.parseHexBinary(nonce); + final byte[] encryptedNonce = computeHmacSha256(nonceBytes, secretBytes); + final byte[] encryptedTimestamp = computeHmacSha256(timeStamp, encryptedNonce); + final byte[] signedKey = computeHmacSha256(NOTIFICATION_SERVICE, encryptedTimestamp); + return computeHmacSha256(data, signedKey); + } + + private byte[] computeHmacSha256(final String data, final byte[] key) throws Exception { + final Mac mac = Mac.getInstance(HMAC_SHA_256); + mac.init(new SecretKeySpec(key, HMAC_SHA_256)); + return mac.doFinal(data.getBytes(StandardCharsets.UTF_8)); + } + + private byte[] computeHmacSha256(final byte[] data, final byte[] key) throws Exception { + final Mac mac = Mac.getInstance(HMAC_SHA_256); + mac.init(new SecretKeySpec(key, HMAC_SHA_256)); + return mac.doFinal(data); + } + + private byte[] generateRandomBytes(final int size) { + final byte[] key = new byte[size]; + SecureRandom secureRandom = new SecureRandom(); + secureRandom.nextBytes(key); + return key; + } +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandler.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandler.java new file mode 100644 index 000000000..517398f0b --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandler.java @@ -0,0 +1,85 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.service; + +import com.google.common.base.Strings; +import org.opengroup.osdu.core.common.http.HttpClient; +import org.opengroup.osdu.core.common.http.HttpRequest; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.auth.factory.AuthFactory; +import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.Map; + +@Service +public class AwsNotificationHandler { + + private static final Logger logger = LoggerFactory.getLogger(AwsNotificationHandler.class); + private static final String X_COLLABORATION_HEADER = "x-collaboration"; + + private HttpClient httpClient; + private AuthFactory authFactory; + @Value("${app.waitingTime:30000}") + private int waitingTime; + + public AwsNotificationHandler(HttpClient httpClient, AuthFactory authFactory) { + this.httpClient = httpClient; + this.authFactory = authFactory; + } + + public HttpResponse notifySubscriber(Subscription subscription, String pubsubMessage, Map<String, String> headerAttributes) throws Exception { + Secret secret = subscription.getSecret(); + String endpoint = subscription.getPushEndpoint(); + String secretType = secret.getSecretType(); + String collaborationContext = ""; + + // Authentication Secret + SecretAuth secretAuth = authFactory.getSecretAuth(secretType); + secretAuth.setSecret(secret); + String pushUrl = secretAuth.getPushUrl(endpoint); + Map<String, String> requestHeader = secretAuth.getRequestHeaders(); + + if (headerAttributes.containsKey(X_COLLABORATION_HEADER)) { + collaborationContext = headerAttributes.get(X_COLLABORATION_HEADER); + } + + requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json"); + requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID)); + requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID)); + + if (!Strings.isNullOrEmpty(collaborationContext)) { + requestHeader.put(X_COLLABORATION_HEADER, headerAttributes.get(X_COLLABORATION_HEADER)); + } + + HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(waitingTime).build(); + logger.debug("Sending out notification to endpoint: {}", endpoint); + HttpResponse response = httpClient.send(request); + + logger.info("Request URL : {}, Response Code: {} ,Response Time: {}", subscription.getPushEndpoint(), response.getResponseCode(), Duration.ofMillis(response.getLatency())); + + if (!response.isSuccessCode()) { + logger.error("Subscriber notification request failed. notificationId={} pushEndpoint={} createdBy={} response={}", + subscription.getNotificationId(), subscription.getPushEndpoint(), subscription.getCreatedBy(), response.getBody()); + } + return response; + } +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtils.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtils.java new file mode 100644 index 000000000..6d86f8e97 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtils.java @@ -0,0 +1,92 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.utils; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Component +public class SQSUtils { + + public List<Message> getMessages(AmazonSQS sqsClient, String sqsQueueUrl, int maxBatchRequestCount, int maxMessageCount) { + int numOfMessages = maxBatchRequestCount; + List<Message> messages = new ArrayList<>(); + do { + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl); + receiveMessageRequest.setMaxNumberOfMessages(numOfMessages); + receiveMessageRequest.withMessageAttributeNames("All"); + receiveMessageRequest.withAttributeNames("All"); + List<Message> retrievedMessages = sqsClient.receiveMessage(receiveMessageRequest).getMessages(); + messages.addAll(retrievedMessages); + numOfMessages = retrievedMessages.size(); + + } while (messages.size() < maxMessageCount && numOfMessages > 0); + + return messages; + } + + public int deleteMessages(List<Message> messages, String sqsQueueUrl, AmazonSQS sqsClient) { + int deletedCount = 0; + List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>(); + for (Message message : messages) { + deleteEntries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())); + if (deleteEntries.size() == 10) { + DeleteMessageBatchResult deleteResult = sqsClient.deleteMessageBatch(new DeleteMessageBatchRequest(sqsQueueUrl, deleteEntries)); + deletedCount += deleteResult.getSuccessful().size(); + deleteEntries.clear(); + } + } + if (!deleteEntries.isEmpty()) { + DeleteMessageBatchResult deleteResult = sqsClient.deleteMessageBatch(new DeleteMessageBatchRequest(sqsQueueUrl, deleteEntries)); + deletedCount += deleteResult.getSuccessful().size(); + deleteEntries.clear(); + } + return deletedCount; + } + + public List<SendMessageResult> sendMessagesToDeadLetterQueue(String deadLetterQueueUrl, List<Message> messages, AmazonSQS sqsClient) { + return messages.stream().map(message -> sendMessageToDeadLetterQueue(deadLetterQueueUrl, message, sqsClient)) + .collect(Collectors.toList()); + + } + + private SendMessageResult sendMessageToDeadLetterQueue(String deadLetterQueueUrl, Message message, AmazonSQS sqsClient) { + + Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes(); + MessageAttributeValue exceptionAttribute = new MessageAttributeValue() + .withDataType("String"); + + exceptionAttribute.setStringValue("Exception message: missing dataPartitionId"); + messageAttributes.put("Exception", exceptionAttribute); + SendMessageRequest send_msg_request = new SendMessageRequest() + .withQueueUrl(deadLetterQueueUrl) + .withMessageBody(message.getBody()) + .withMessageAttributes(messageAttributes); + return sqsClient.sendMessage(send_msg_request); + } + +} diff --git a/provider/notification-aws/src/main/resources/application.properties b/provider/notification-aws/src/main/resources/application.properties index 8dd0c1aed..33bc46a2a 100644 --- a/provider/notification-aws/src/main/resources/application.properties +++ b/provider/notification-aws/src/main/resources/application.properties @@ -49,4 +49,12 @@ server.ssl.key-alias=${SSL_KEY_ALIAS:osduonaws} server.ssl.key-password=${SSL_KEY_PASSWORD:} server.ssl.key-store-password=${SSL_KEY_STORE_PASSWORD:} +aws.kms.endpoint=kms.${AWS_REGION}.amazonaws.com +aws.dynamodb.failedNotificationTable.ssmPath=${FAILED_NOTIFICATION_SSM_PATH:/osdu/instances/${OSDU_INSTANCE_NAME}/core/notification/failed-notification-table} +aws.dynamodb.subscriptionTable.ssm.relativePath=${SUBSCRIPTION_SSM_RELATIVE_PATH:services/core/register/SubscriptionTable} + +aws.sqs.visibilityTimeout=${SQS_MIN_VISIBILITY_TIMEOUT:300} +aws.sqs.poll.sleepTime.mainQueue=${SQS_POLL_SLEEP_TIME_MAIN_QUEUE:30} +aws.sqs.poll.sleepTime.retryQueue=${SQS_POLL_SLEEP_TIME_RETRY_QUEUE:300} + spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.SecurityAutoConfiguration \ No newline at end of file diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandlerTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandlerTest.java new file mode 100644 index 000000000..2f3450629 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandlerTest.java @@ -0,0 +1,76 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.powermock.reflect.Whitebox; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationRetrySQSHandlerTest { + + @Mock + private NotificationQueueService notificationRetryQueueService; + + @Mock + private SQSUtils sqsUtil; + + @InjectMocks + NotificationRetrySQSHandler notificationRetrySQSHandler; + + private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction; + + @BeforeClass + public static void setup() { + mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class, + (mock, context) -> { + Mockito.when(mock.getParameterAsString("notification-retry-sqs-url")).thenReturn( + "test-sqs-url"); + }); + } + + @AfterClass + public static void close() { + mockedConstruction.close(); + } + + @Before + public void initTest() { + Whitebox.setInternalState(notificationRetrySQSHandler, "region", "us-east-1"); + } + + @Test(expected = InterruptedException.class) + public void processNotifications_interruptedException() throws Exception { + when(sqsUtil.getMessages(any(), anyString(), anyInt(), anyInt())).thenAnswer((t) -> { + throw new InterruptedException("Test Interrupted"); + }); + notificationRetrySQSHandler.init(); + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandlerTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandlerTest.java new file mode 100644 index 000000000..392ce5a1d --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandlerTest.java @@ -0,0 +1,75 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.powermock.reflect.Whitebox; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationSQSHandlerTest { + + @Mock + private NotificationQueueService notificationQueueService; + + @Mock + private SQSUtils sqsUtil; + + @InjectMocks + NotificationSQSHandler notificationSQSHandler; + + private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction; + + @BeforeClass + public static void setup() { + mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class, + (mock, context) -> { + Mockito.when(mock.getParameterAsString("notification-sqs-url")).thenReturn( + "test-sqs-retry-url"); + }); + } + + @AfterClass + public static void close() { + mockedConstruction.close(); + } + + @Before + public void initTest() { + Whitebox.setInternalState(notificationSQSHandler, "region", "us-east-1"); + } + + @Test(expected = InterruptedException.class) + public void processNotifications_interruptedException() throws Exception { + when(sqsUtil.getMessages(any(), anyString(), anyInt(), anyInt())).thenAnswer((t) -> { + throw new InterruptedException("Test Interrupted"); + }); + notificationSQSHandler.init(); + } +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java new file mode 100644 index 000000000..0f9bbedf0 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java @@ -0,0 +1,245 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.withSettings; + +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.SendMessageResult; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.internal.stubbing.defaultanswers.ForwardsInvocations; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.queue.impl.NotificationQueueServiceImpl; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.powermock.reflect.Whitebox; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationQueueServiceImplTest { + + @Mock + private SubscriptionRepository subscriptionRepository; + + @Mock + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + @Mock + private AwsNotificationHandler notificationHandler; + + @Mock + private DynamoDBQueryHelperV2 dynamoDBQueryHelper; + + @Mock + private AmazonSQS sqsClient; + + @InjectMocks + NotificationQueueServiceImpl notificationQueueService; + + private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction; + + @BeforeClass + public static void setup() { + mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class, + (mock, context) -> { + //implement initializer for mock. Set return value for object A mock methods + when(mock.getParameterAsString("notification-sqs-url")).thenReturn( + "test-sqs-url"); + }); + } + + @AfterClass + public static void close(){ + mockedConstruction.close(); + } + + @Before + public void initTest() { + Whitebox.setInternalState(notificationQueueService, "region", "us-east-1"); + Whitebox.setInternalState(notificationQueueService, "failedNotificationTablePath", "failed-notificationTable-path"); + Whitebox.setInternalState(notificationQueueService, "sqsClient", sqsClient); + + Subscription subscription1 = new Subscription(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscription1Id"); + + when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenReturn(Arrays.asList(subscription1)); + when(dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(anyString())).thenReturn(dynamoDBQueryHelper); + } + + @Test + public void processNotificationMessage_success() throws Exception { + List<FailedNotificationDoc> list = new ArrayList<>(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(200); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_noSubscribers_success() { + when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenReturn(Collections.emptyList()); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_exceptionGettingSubscription_NoMessagesProcessed() { + when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenAnswer(t -> { + throw new Exception("test Exception"); + }); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(0, responseMessageList.size()); + } + + @Test + public void processNotificationMessage_existingFailedMessages_sentToRetry() { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Arrays.asList(failedDoc); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_exceptionInFailedMessagesDBQuery_messagesSentToRetry() { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenAnswer(t -> { + throw new Exception(); + }); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_non200ResponseFromNotification_sentToRetry() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_exceptionFromNotification_messagesSentToRetry() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + when(notificationHandler.notifySubscriber(any(), any(), any())).thenThrow(new Exception()); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_non200ResponseFromNotificationMultipleMessages_sentToRetry() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition"), + createMessage("testMessage2", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(2, responseMessageList.size()); + assertEquals("testMessage2", responseMessageList.get(1).getMessageId()); + } + + private Message createMessage(String messageId, String dataPartitionId) { + Message message = new Message(); + message.setMessageId(messageId); + MessageAttributeValue dataPartitionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + dataPartitionIdAttribute.setStringValue(dataPartitionId); + message.getMessageAttributes().put("data-partition-id", dataPartitionIdAttribute); + return message; + } +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceTest.java new file mode 100644 index 000000000..7f976384f --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceTest.java @@ -0,0 +1,258 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.powermock.reflect.Whitebox; +import org.springframework.beans.factory.annotation.Value; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationRetryQueueServiceTest { + + @Mock + private AwsNotificationHandler notificationHandler; + + @Mock + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + @Mock + private SubscriptionRepository subscriptionRepository; + + @Value("${aws.dynamodb.failedNotificationTable.ssmPath}") + private String failedNotificationTablePath; + + @InjectMocks + private NotificationRetryQueueServiceImpl notificationRetryQueueService; + + @Mock + private DynamoDBQueryHelperV2 dynamoDBQueryHelper; + + @Mock + private AmazonSQS sqsClient; + + @Captor + ArgumentCaptor<List<ChangeMessageVisibilityBatchRequestEntry>> changeMessageVisibilityRequestCaptor; + + @Before + public void beforeTest() { + Whitebox.setInternalState(notificationRetryQueueService, "failedNotificationTablePath", "failed-notificationTable-path"); + Whitebox.setInternalState(notificationRetryQueueService, "visibilityTimeout", 300); + + Subscription subscription1 = new Subscription(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscription1Id"); + + when(subscriptionRepository.getSubscriptionById(any(), any())).thenReturn(Optional.of(subscription1)); + when(dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(anyString())).thenReturn(dynamoDBQueryHelper); + } + + @Test + public void processNotificationMessage_success() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testFailedRecordId"); + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(failedDoc); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(200); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.ACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_subscriptionNotFound_messagesTobeMarkedAcked() { + when(subscriptionRepository.getSubscriptionById(any(), any())).thenReturn(Optional.empty()); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.ACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_generalException_messageNotProcessed() { + when(subscriptionRepository.getSubscriptionById(any(), any())).thenAnswer(t -> { + throw new Exception(); + }); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_non200ResponseFromSubscriber_messageNotAcked() throws Exception { + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId","testDataPartition", 1)); + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_non200ResponseFromSubscriberMultipleMessages_messageNotAcked() throws Exception { + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId1","testDataPartition", 1), + createMessage("testMessage2", "testSubscription1Id", "testFailedRecordId2", "testDataPartition",1)); + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(2, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(1).getResult()); + } + + @Test + public void processNotificationMessage_exceptionSendingNotification_messageNotAcked() throws Exception { + when(notificationHandler.notifySubscriber(any(), any(), any())).thenAnswer(t -> { + throw new Exception("test Exception"); + }); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(0).getResult()); + } + + @Test + public void changeMessageVisibilityTimeout_success() { + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + List<ChangeMessageVisibilityBatchRequestEntry> visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(300), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",15)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(600), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",25)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(1200), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",35)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(2400), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",45)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(4800), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",55)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(9600), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId","testDataPartition", 61)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(43200), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + } + + @Test + public void changeMessageVisibilityTimeout_moreThan10messages_success() { + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId1", "testDataPartition", 1), + createMessage("testMessage2", "testSubscription1Id", "testFailedRecordId2", "testDataPartition", 1), + createMessage("testMessage3", "testSubscription1Id", "testFailedRecordId3", "testDataPartition", 1), + createMessage("testMessage4", "testSubscription1Id", "testFailedRecordId4", "testDataPartition", 1), + createMessage("testMessage5", "testSubscription1Id", "testFailedRecordId5", "testDataPartition", 1), + createMessage("testMessage6", "testSubscription1Id", "testFailedRecordId6", "testDataPartition", 1), + createMessage("testMessage7", "testSubscription1Id", "testFailedRecordId7", "testDataPartition", 1), + createMessage("testMessage8", "testSubscription1Id", "testFailedRecordId8", "testDataPartition", 1), + createMessage("testMessage9", "testSubscription1Id", "testFailedRecordId9", "testDataPartition", 1), + createMessage("testMessage10", "testSubscription1Id", "testFailedRecordId10", "testDataPartition", 1), + createMessage("testMessage11", "testSubscription1Id", "testFailedRecordId11", "testDataPartition", 1)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(2)).changeMessageVisibilityBatch(any(), any()); + + } + + private Message createMessage(String messageId, String subscriptionId, String failedRecordId, String dataPartitionId, int receiveCount) { + Message message = new Message(); + message.setMessageId(messageId); + MessageAttributeValue subscriptionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + subscriptionIdAttribute.setStringValue(subscriptionId); + message.getMessageAttributes().put("subscriptionId", subscriptionIdAttribute); + + MessageAttributeValue dataPartitionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + dataPartitionIdAttribute.setStringValue(dataPartitionId); + + MessageAttributeValue failedNotificationRecordId = new MessageAttributeValue() + .withDataType("String"); + failedNotificationRecordId.setStringValue(failedRecordId); + message.getMessageAttributes().put("failedNotificationRecordId", subscriptionIdAttribute); + message.getMessageAttributes().put("data-partition-id", dataPartitionIdAttribute); + + message.getAttributes().put("ApproximateReceiveCount", String.valueOf(receiveCount)); + return message; + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepositoryTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepositoryTest.java new file mode 100644 index 000000000..8d77de500 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepositoryTest.java @@ -0,0 +1,144 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.repository; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.internal.stubbing.defaultanswers.ForwardsInvocations; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.SubscriptionDoc; +import org.opengroup.osdu.notification.provider.aws.security.KmsHelper; +import org.powermock.reflect.Whitebox; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +@RunWith(MockitoJUnitRunner.class) +public class SubscriptionRepositoryTest { + + @Mock + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + @Mock + private KmsHelper kmsHelper; + + @InjectMocks + private SubscriptionRepository subscriptionRepository; + + @Mock + private DynamoDBQueryHelperV2 dynamoDBQueryHelper; + + @Before + public void initTest() { + Whitebox.setInternalState(subscriptionRepository, "subscriptionTableRelativePath", "services/core/register/SubscriptionTable"); + + when(dynamoDBQueryHelperFactory.getQueryHelperForPartition(anyString(), anyString())).thenReturn(dynamoDBQueryHelper); + } + + + @Test + public void getAllSubscriptions_hmacSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("HMAC"); + ArrayList<SubscriptionDoc> results = new ArrayList<>(); + results.add(subscription1); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(results)))); + + List<Subscription> subscriptionList = subscriptionRepository.getAllSubscriptionsByDataPartition("testDataPartitionId"); + assertEquals(1, subscriptionList.size()); + assertEquals("testSubscriptionId1", subscriptionList.get(0).getId()); + } + + @Test + public void getAllSubscriptions_gsaSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("GSA"); + ArrayList<SubscriptionDoc> results = new ArrayList<>(); + results.add(subscription1); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(results)))); + + when(kmsHelper.decrypt(any(), any())).thenReturn("testSecretValue`test"); + + List<Subscription> subscriptionList = subscriptionRepository.getAllSubscriptionsByDataPartition("testDataPartitionId"); + assertEquals(1, subscriptionList.size()); + assertEquals("testSubscriptionId1", subscriptionList.get(0).getId()); + } + + @Test + public void getSubscriptionById_hmacSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("GSA"); + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(subscription1); + when(kmsHelper.decrypt(any(), any())).thenReturn("testSecretValue`test"); + + Optional<Subscription> subscriptionResponse = subscriptionRepository.getSubscriptionById("testDataPartitionId", "testSubscriptionId1"); + assertEquals(true, subscriptionResponse.isPresent()); + assertEquals("testSubscriptionId1", subscriptionResponse.get().getId()); + } + + @Test + public void getSubscriptionById_gsaSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("HMAC"); + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(subscription1); + + Optional<Subscription> subscriptionResponse = subscriptionRepository.getSubscriptionById("testDataPartitionId", "testSubscriptionId1"); + assertEquals(true, subscriptionResponse.isPresent()); + assertEquals("testSubscriptionId1", subscriptionResponse.get().getId()); + } + + + @Test + public void getSubscriptionById_subscriptionNotFound_emptyResponse(){ + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(null); + + Optional<Subscription> subscriptionResponse = subscriptionRepository.getSubscriptionById("testDataPartitionId", "testSubscriptionId1"); + assertEquals(false, subscriptionResponse.isPresent()); + } +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureServiceTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureServiceTest.java new file mode 100644 index 000000000..fe7d7e5c1 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureServiceTest.java @@ -0,0 +1,109 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.security; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.cryptographic.SignatureServiceException; + +import javax.crypto.Mac; +import java.security.NoSuchAlgorithmException; + +@RunWith(MockitoJUnitRunner.class) +public class ThreadSignatureServiceTest { + + private static final String URL = "http://www.osdu.example.com/signature"; + private static final String SECRET = "93491ab4453ef0306bf08f3703460950"; + + private static final String HMAC = "02030405060708.090A123B0C0D0E0F"; + + @InjectMocks + private ThreadSignatureService threadSignatureService; + + @Test + public void getSignedSignature_success() throws SignatureServiceException { + String response = threadSignatureService.getSignedSignature(URL, SECRET); + assertNotNull(response); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignature_nullUrl_signatureServiceExceptio() throws SignatureServiceException { + threadSignatureService.getSignedSignature(null, SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignature_nullSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(URL, null); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignature_invalidAlgorithm_signatureServiceException() throws SignatureServiceException { + try (MockedStatic<Mac> mockedMac = Mockito.mockStatic(Mac.class)) { + mockedMac.when(() -> Mac.getInstance(any())).thenThrow(new NoSuchAlgorithmException()); + threadSignatureService.getSignedSignature(URL, SECRET); + } + } + + @Test + public void getSignedSignatureWithExpiryTime_success() throws SignatureServiceException { + String response = threadSignatureService.getSignedSignature(URL, SECRET, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + assertNotNull(response); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpiryTime_nullUrl_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(null, SECRET, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpriyTime_nullSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(URL, null, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpiryTime_expiredSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(URL, SECRET, String.valueOf(System.currentTimeMillis() - 100), SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpriyTime_invalidAlgorithm_signatureServiceException() throws SignatureServiceException { + try (MockedStatic<Mac> mockedMac = Mockito.mockStatic(Mac.class)) { + mockedMac.when(() -> Mac.getInstance(any())).thenThrow(new NoSuchAlgorithmException()); + threadSignatureService.getSignedSignature(URL, SECRET, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + } + } + + @Test(expected = SignatureServiceException.class) + public void verifyHmacSignature_nullHmac_signatureServiceException() throws SignatureServiceException { + threadSignatureService.verifyHmacSignature(null, SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void verifyHmacSignature_nullSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.verifyHmacSignature(HMAC, null); + } + + @Test(expected = SignatureServiceException.class) + public void verifyHmacSignature_invalidHmac_signatureServiceException() throws SignatureServiceException { + threadSignatureService.verifyHmacSignature("invalidHmac", SECRET); + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandlerTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandlerTest.java new file mode 100644 index 000000000..d7509e889 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandlerTest.java @@ -0,0 +1,113 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.service; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.http.HttpClient; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.HmacSecret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.auth.factory.AuthFactory; +import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.powermock.reflect.Whitebox; + +import java.util.HashMap; +import java.util.Map; + +@RunWith(MockitoJUnitRunner.class) +public class AwsNotificationHandlerTest { + + @Mock + private HttpClient httpClient; + + @Mock + private AuthFactory authFactory; + + @InjectMocks + private AwsNotificationHandler notificationHandler; + + @Mock + private SecretAuth secretAuth; + + @Before + public void beforeTest() { + Whitebox.setInternalState(notificationHandler, "waitingTime", 100); + } + + @Test + public void notifySubscriber_success() throws Exception { + Subscription subscription = new Subscription(); + subscription.setSecret(new HmacSecret()); + subscription.setPushEndpoint("/api/test"); + Map<String, String> headerAttributes = new HashMap<>(); + + when(authFactory.getSecretAuth(any())).thenReturn(secretAuth); + when(secretAuth.getPushUrl(anyString())).thenReturn("/api/test"); + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(200); + when(httpClient.send(any())).thenReturn(httpResponse); + + HttpResponse response = notificationHandler.notifySubscriber(subscription, "testMessage", headerAttributes); + assertEquals(200, response.getResponseCode()); + } + + @Test + public void notifySubscriber_withCollborationHeader_success() throws Exception { + Subscription subscription = new Subscription(); + subscription.setSecret(new HmacSecret()); + subscription.setPushEndpoint("/api/test"); + Map<String, String> headerAttributes = new HashMap<>(); + headerAttributes.put("x-collaboration", "a0ad53aa-430d-42b1-b676-2dbac583634c"); + + when(authFactory.getSecretAuth(any())).thenReturn(secretAuth); + when(secretAuth.getPushUrl(anyString())).thenReturn("/api/test"); + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(200); + when(httpClient.send(any())).thenReturn(httpResponse); + + HttpResponse response = notificationHandler.notifySubscriber(subscription, "testMessage", headerAttributes); + assertEquals(200, response.getResponseCode()); + } + + @Test + public void notifySubscriber_failed() throws Exception { + Subscription subscription = new Subscription(); + subscription.setSecret(new HmacSecret()); + subscription.setPushEndpoint("/api/test"); + Map<String, String> headerAttributes = new HashMap<>(); + + when(authFactory.getSecretAuth(any())).thenReturn(secretAuth); + when(secretAuth.getPushUrl(anyString())).thenReturn("/api/test"); + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(500); + when(httpClient.send(any())).thenReturn(httpResponse); + + HttpResponse response = notificationHandler.notifySubscriber(subscription, "testMessage", headerAttributes); + assertEquals(500, response.getResponseCode()); + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtilsTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtilsTest.java new file mode 100644 index 000000000..70e6c5f0b --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtilsTest.java @@ -0,0 +1,84 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.utils; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResultEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class SQSUtilsTest { + + @Mock + private AmazonSQS sqsClient; + + @InjectMocks + SQSUtils sqsUtils; + + @Test + public void getMessages_success() { + ReceiveMessageResult sqsResult = new ReceiveMessageResult(); + Message message1 = new Message(); + sqsResult.setMessages(Arrays.asList(message1)); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(sqsResult); + List<Message> response = sqsUtils.getMessages(sqsClient, "test-sqs-url", 10, 10); + assertEquals(10, response.size()); + } + + + @Test + public void deleteMessages_success() { + DeleteMessageBatchResult deleteResult = new DeleteMessageBatchResult(); + List<DeleteMessageBatchResultEntry> deleteEntries = new ArrayList<>(); + deleteEntries.add(new DeleteMessageBatchResultEntry()); + deleteResult.setSuccessful(deleteEntries); + + Message message1 = new Message(); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))).thenReturn(deleteResult); + int deletedCount = sqsUtils.deleteMessages(Arrays.asList(message1), "test-sqs-url", sqsClient); + assertEquals(1, deletedCount); + } + + @Test + public void sendMessagesToDeadLetterQueue_success() { + SendMessageResult sendResult = new SendMessageResult(); + + Message message1 = new Message(); + + when(sqsClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendResult); + List<SendMessageResult> results = sqsUtils.sendMessagesToDeadLetterQueue("test-deadletter-sqs-url", Arrays.asList(message1), sqsClient); + assertEquals(1, results.size()); + } +} -- GitLab