diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index edba954cd94ca0d5666c151e50bee5abe19e7c08..342908b2ac3338cddbf74f805983c9dbeb7deb6a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -47,4 +47,4 @@ include: - project: "osdu/platform/ci-cd-pipelines" file: "cloud-providers/ibm.yml" - - local: "devops/gc/pipeline/override-stages.yml" + - local: "devops/gc/pipeline/override-stages.yml" \ No newline at end of file diff --git a/provider/notification-aws/pom.xml b/provider/notification-aws/pom.xml index 28625cf91f331ea0fba385024cd383d21b536a6c..4fcbf612a9c7e377dac4e80c962a86024337586b 100644 --- a/provider/notification-aws/pom.xml +++ b/provider/notification-aws/pom.xml @@ -92,7 +92,7 @@ <version>${aws.version}</version> </dependency> - <dependency> + <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>notification-core</artifactId> <version>0.22.0-SNAPSHOT</version> @@ -119,6 +119,17 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito2</artifactId> + <version>2.0.9</version> + <scope>test</scope> + </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> @@ -185,4 +196,4 @@ </plugin> </plugins> </build> -</project> +</project> \ No newline at end of file diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/FailedNotificationDoc.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/FailedNotificationDoc.java new file mode 100644 index 0000000000000000000000000000000000000000..59cb608b17d5b6e5d03c9fe3ce1a2aac9615c1a1 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/FailedNotificationDoc.java @@ -0,0 +1,54 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexRangeKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTypeConvertedEnum; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@DynamoDBTable(tableName = "Failed-Notification") +public class FailedNotificationDoc { + + @DynamoDBHashKey(attributeName = "id") + private String id; + + @DynamoDBAttribute(attributeName = "dataPartitionId") + private String dataPartitionId; + + @DynamoDBTypeConvertedEnum + @DynamoDBAttribute(attributeName = "status") + private NotificationResult status; + + @DynamoDBIndexHashKey(attributeName = "partitionIdSubscriptionId", globalSecondaryIndexName = "subscription-index") + private String partitionIdSubscriptionId; + + @DynamoDBIndexRangeKey(attributeName = "createdOnEpoch", globalSecondaryIndexName = "subscription-index") + private String createdOnEpoch; + + @DynamoDBAttribute(attributeName = "ttl") + private Long ttl; + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/NotificationResult.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/NotificationResult.java new file mode 100644 index 0000000000000000000000000000000000000000..e3ebcef118f39d009827d0c22c316b613cb62a93 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/NotificationResult.java @@ -0,0 +1,19 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +public enum NotificationResult { + ACK, + NACK +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/RetryProcessResult.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/RetryProcessResult.java new file mode 100644 index 0000000000000000000000000000000000000000..a268162a184e46c896c5d6d5e9ca6ae299b0db19 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/RetryProcessResult.java @@ -0,0 +1,28 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +import com.amazonaws.services.sqs.model.Message; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class RetryProcessResult { + Message message; + + NotificationResult result; +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/SubscriptionDoc.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/SubscriptionDoc.java new file mode 100644 index 0000000000000000000000000000000000000000..93284ae75ab426b46cff7c4fedf0503b1f541c11 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/model/SubscriptionDoc.java @@ -0,0 +1,84 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.model; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexHashKey; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; + +import java.nio.ByteBuffer; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@DynamoDBTable(tableName = "Register.Subscription") +public class SubscriptionDoc { + + @DynamoDBHashKey(attributeName = "id") + private String id; + + @DynamoDBAttribute(attributeName = "name") + private String name; + + @DynamoDBAttribute(attributeName = "description") + private String description; + + @DynamoDBAttribute(attributeName = "topic") + private String topic; + + @DynamoDBAttribute(attributeName = "pushEndpoint") + private String pushEndpoint; + + @DynamoDBAttribute(attributeName = "createdBy") + private String createdBy; + + @DynamoDBAttribute(attributeName = "createdOnEpoch") + private String createdOnEpoch; + + @DynamoDBAttribute(attributeName = "notificationId") + private String notificationId; + + @DynamoDBAttribute(attributeName = "secretType") + private String secretType; + + @DynamoDBAttribute(attributeName = "secretValue") + private ByteBuffer secretValue; + + @DynamoDBIndexHashKey(attributeName = "dataPartitionId", globalSecondaryIndexName = "dataPartitionId-index") + private String dataPartitionId; + + public static Subscription mapTo(SubscriptionDoc subDoc, Secret secret) { + Subscription sub = new Subscription(); + sub.setId(subDoc.getId()); + sub.setName(subDoc.getName()); + sub.setDescription(subDoc.getDescription()); + sub.setTopic(subDoc.getTopic()); + sub.setPushEndpoint(subDoc.getPushEndpoint()); + sub.setCreatedBy(subDoc.getCreatedBy()); + sub.setNotificationId(subDoc.getNotificationId()); + sub.setSecret(secret); + + return sub; + } + + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationQueueService.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationQueueService.java new file mode 100644 index 0000000000000000000000000000000000000000..40f21f3eaee9707649605828ba405e78e28e6799 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationQueueService.java @@ -0,0 +1,27 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.common.model.notification.Subscription; + +import java.util.List; + +public interface NotificationQueueService { + + List<Message> processNotificationMessages(List<Message> messages); + + void processMessagesBySubscription(Subscription subscription, List<Message> messages, String dataPartitionId); +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetryQueueService.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetryQueueService.java new file mode 100644 index 0000000000000000000000000000000000000000..bc7d33c14f9379406615a57f33c5836cbc0193f2 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetryQueueService.java @@ -0,0 +1,27 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; + +import java.util.List; + +public interface NotificationRetryQueueService { + + List<RetryProcessResult> processNotificationMessages(List<Message> messages); + + List<RetryProcessResult> processMessagesBySubscription(Subscription subscription, List<Message> messages); +} \ No newline at end of file diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandler.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..00f68fcf110d56813cacfd48207638d655d80c68 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandler.java @@ -0,0 +1,101 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; +import org.opengroup.osdu.notification.provider.aws.queue.impl.NotificationRetryQueueServiceImpl; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +@Service +public class NotificationRetrySQSHandler { + + private static final Logger logger = LoggerFactory.getLogger(NotificationRetrySQSHandler.class); + private static final int MAX_MESSAGE_ALLOWED = 100; + private static final int MAX_BATCH_REQUEST_COUNT = 10; + + @Value("${aws.region}") + private String region; + + @Value("${aws.sqs.poll.sleepTime.retryQueue}") + private Integer sleepTime; + + private String notificationRetryQueueUrl; + + private SQSUtils sqsUtil; + + private NotificationRetryQueueServiceImpl notificationRetryQueueService; + + public NotificationRetrySQSHandler(NotificationRetryQueueServiceImpl notificationRetryQueueService, SQSUtils sqsUtil) { + this.notificationRetryQueueService = notificationRetryQueueService; + this.sqsUtil = sqsUtil; + } + + @Async + @EventListener(ApplicationStartedEvent.class) + public void init() throws K8sParameterNotFoundException, InterruptedException { + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + notificationRetryQueueUrl = provider.getParameterAsString("notification-retry-sqs-url"); + processNotifications(); + } + + private void processNotifications() throws InterruptedException { + List<Message> messages; + logger.info("Running notification Retry SQS processor"); + while (true) { + try { + AmazonSQSConfig sqsConfig = new AmazonSQSConfig(region); + AmazonSQS sqsClient = sqsConfig.AmazonSQS(); + messages = sqsUtil.getMessages(sqsClient, notificationRetryQueueUrl, MAX_BATCH_REQUEST_COUNT, MAX_MESSAGE_ALLOWED); + if (messages.isEmpty()) { + logger.info("No messages in the retry queue, sleeping for {} seconds", sleepTime); + Thread.sleep(sleepTime * 1000L); + continue; + } + logger.info("Processing {} messages from notification retry queue", messages.size()); + List<RetryProcessResult> notificationResults = notificationRetryQueueService.processNotificationMessages(messages); + List<Message> failedMessages = notificationResults.stream().filter(msg -> NotificationResult.NACK.equals(msg.getResult())).map(RetryProcessResult::getMessage).collect( + Collectors.toList()); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, notificationRetryQueueUrl, failedMessages); + + List<Message> deleteMessageList = notificationResults.stream().filter(result -> NotificationResult.ACK.equals(result.getResult())) + .map(RetryProcessResult::getMessage).collect(Collectors.toList()); + if (!deleteMessageList.isEmpty()) { + int deletedCount = sqsUtil.deleteMessages(deleteMessageList, notificationRetryQueueUrl, sqsClient); + logger.info("Deleted Messages count :: {}", deletedCount); + } + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + logger.error("General Exception", e); + } + } + } + +} \ No newline at end of file diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandler.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..7a1078e858fc4e96f4fb4136991b625f857a39b6 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandler.java @@ -0,0 +1,107 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@Service +public class NotificationSQSHandler { + + private static final Logger logger = LoggerFactory.getLogger(NotificationSQSHandler.class); + private static final int MAX_MESSAGE_ALLOWED = 100; + private static final int MAX_BATCH_REQUEST_COUNT = 10; + + @Value("${aws.region}") + private String region; + + @Value("${aws.sqs.poll.sleepTime.mainQueue}") + private Integer sleepTime; + private String notificationQueueUrl; + + private String notificationDeadLetterQueueUrl; + + private NotificationQueueService notificationQueueService; + + private SQSUtils sqsUtil; + + public NotificationSQSHandler(NotificationQueueService notificationQueueService, SQSUtils sqsUtil) { + this.notificationQueueService = notificationQueueService; + this.sqsUtil = sqsUtil; + } + + @Async + @EventListener(ApplicationStartedEvent.class) + public void init() throws K8sParameterNotFoundException, InterruptedException { + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + notificationQueueUrl = provider.getParameterAsString("notification-sqs-url"); + notificationDeadLetterQueueUrl = provider.getParameterAsString("notification-deadletter-queue-url"); + processNotifications(); + } + + private void processNotifications() throws InterruptedException { + List<Message> messages; + logger.info("Running notification SQS processor"); + while (true) { + try { + AmazonSQSConfig sqsConfig = new AmazonSQSConfig(region); + AmazonSQS sqsClient = sqsConfig.AmazonSQS(); + messages = sqsUtil.getMessages(sqsClient, notificationQueueUrl, MAX_BATCH_REQUEST_COUNT, MAX_MESSAGE_ALLOWED); + if (messages.isEmpty()) { + logger.info("No messages in the main queue, sleeping for {} seconds", sleepTime); + Thread.sleep(sleepTime * 1000L); + continue; + } + logger.info("Processing {} messages from notification queue", messages.size()); + + List<Message> invalidMessages = messages.stream().filter(msg -> !msg.getMessageAttributes().containsKey("data-partition-id")).collect(Collectors.toList()); + List<Message> validMessages = messages.stream().filter(msg -> msg.getMessageAttributes().containsKey("data-partition-id")).collect(Collectors.toList()); + + ArrayList<Message> deleteMessageList = new ArrayList<>(invalidMessages); + ArrayList<Message> deadLetterMessages = new ArrayList<>(invalidMessages); + + if (!deadLetterMessages.isEmpty()) { + logger.info("Sending {} invalid messages to dead letter queue", deadLetterMessages.size()); + sqsUtil.sendMessagesToDeadLetterQueue(notificationDeadLetterQueueUrl, deadLetterMessages, sqsClient); + } + List<Message> processedMessages = notificationQueueService.processNotificationMessages(validMessages); + deleteMessageList.addAll(processedMessages); + if (!deleteMessageList.isEmpty()) { + int deletedCount = sqsUtil.deleteMessages(deleteMessageList, notificationQueueUrl, sqsClient); + logger.info("Deleted Messages count :: {}", deletedCount); + } + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + logger.error("General Exception: ", e); + } + } + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..387c40eeb946bd486ab0394d1aed7ecbde0a3179 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImpl.java @@ -0,0 +1,202 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBDeleteExpression; +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.queue.NotificationQueueService; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.stream.Collectors; + +@Service +public class NotificationQueueServiceImpl implements NotificationQueueService { + + private static final Logger logger = LoggerFactory.getLogger(NotificationQueueServiceImpl.class); + + + @Value("${aws.region}") + private String region; + + @Value("${aws.dynamodb.failedNotificationTable.ssmPath}") + private String failedNotificationTablePath; + + private SubscriptionRepository subscriptionRepository; + + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + private AwsNotificationHandler notificationHandler; + + private AmazonSQS sqsClient; + + private String notificationRetryQueueUrl; + + + public NotificationQueueServiceImpl(SubscriptionRepository subscriptionRepository, DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory, + AwsNotificationHandler awsNotificationHandler) { + this.subscriptionRepository = subscriptionRepository; + this.dynamoDBQueryHelperFactory = dynamoDBQueryHelperFactory; + this.notificationHandler = awsNotificationHandler; + } + + @PostConstruct + public void init() throws K8sParameterNotFoundException { + AmazonSQSConfig sqsConfig = new AmazonSQSConfig(region); + sqsClient = sqsConfig.AmazonSQS(); + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + notificationRetryQueueUrl = provider.getParameterAsString("notification-retry-sqs-url"); + } + + + @Override + public List<Message> processNotificationMessages(List<Message> messages) { + try { + Map<String, List<Message>> messagesByDataPartition = messages.stream().collect( + Collectors.groupingBy(msg -> msg.getMessageAttributes().get("data-partition-id").getStringValue(), HashMap::new, Collectors.toCollection(ArrayList::new))); + + return messagesByDataPartition.entrySet().parallelStream() + .map(entry -> processMessagesByDataPartition(entry.getKey(), entry.getValue())) + .filter(Objects::nonNull).flatMap(list -> list.stream()).collect(Collectors.toList()); + } catch (Exception e) { + logger.error("Exception processing messages :", e); + return new ArrayList<>(); + } + } + + public List<Message> processMessagesByDataPartition(String dataPartitionId, List<Message> messages) { + try { + List<Subscription> subscriptionList = subscriptionRepository.getAllSubscriptionsByDataPartition(dataPartitionId); + if (!subscriptionList.isEmpty()) { + subscriptionList.parallelStream().forEach(subscription -> processMessagesBySubscription(subscription, messages, dataPartitionId)); + } + return messages; + } catch (Exception e) { + logger.error("Exception processing messages for dataPartitionId : {}", dataPartitionId, e); + return new ArrayList<>(); + } + } + + public void processMessagesBySubscription(Subscription subscription, List<Message> messages, String dataPartitionId) { + List<Message> messagesToRetry = new ArrayList<>(); + DynamoDBQueryHelperV2 dynamoDBQueryHelper = dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(failedNotificationTablePath); + try { + FailedNotificationDoc gsiQuery = new FailedNotificationDoc(); + gsiQuery.setPartitionIdSubscriptionId(String.join("-", dataPartitionId, subscription.getId())); + PaginatedQueryList<FailedNotificationDoc> results = dynamoDBQueryHelper.queryByGSI(FailedNotificationDoc.class, gsiQuery); + + if (results != null && !results.isEmpty()) { + logger.debug("Subscription {} has previous failed messages", subscription.getId()); + messagesToRetry.addAll(messages); + } else { + boolean hasPreviousFailed = false; + for (Message message : messages) { + logger.debug("Processing Notification for messageId: {}, Subscription Endpoint: {}", message.getMessageId(), subscription.getPushEndpoint()); + if (hasPreviousFailed) { + messagesToRetry.add(message); + continue; + } + Map<String, String> messageAttributes = message.getMessageAttributes().entrySet().stream().collect( + Collectors.toMap(Entry::getKey, attribute -> attribute.getValue().getStringValue())); + if (!notifySubscriber(subscription, message.getBody(), messageAttributes)) { + messagesToRetry.add(message); + hasPreviousFailed = true; + } + } + } + } catch (Exception e) { + logger.error("Exception in processMessagesBySubscription for {} :", subscription.getPushEndpoint(), e); + messagesToRetry.addAll(messages); + } + logger.info("Messages retry size :: {}", messagesToRetry.size()); + messagesToRetry.forEach(message -> insertFailedNotification(message, subscription, dataPartitionId, dynamoDBQueryHelper)); + } + + private boolean notifySubscriber(Subscription subscription, String messageBody, Map<String, String> headerAttributes) { + HttpResponse response; + try { + response = notificationHandler.notifySubscriber(subscription, messageBody, headerAttributes); + } catch (Exception e) { + logger.error("Failed to notify subscriber {} : ", subscription.getPushEndpoint(), e); + return false; + } + return response.isSuccessCode(); + } + + + private void insertFailedNotification(Message message, Subscription subscription, String dataPartitionId, DynamoDBQueryHelperV2 dynamoDBQueryHelper) { + logger.debug("Inserting failing notification for :: {}, {}", subscription.getPushEndpoint(), message.getMessageId()); + Long currentTimeMillis = System.currentTimeMillis(); + Long ttl = currentTimeMillis / 1000L + 432000; + + String id = Base64.getEncoder().encodeToString((subscription.getPushEndpoint() + message.getMessageId()).getBytes()); + try { + Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes(); + + MessageAttributeValue failedNotificationRecordId = new MessageAttributeValue() + .withDataType("String"); + failedNotificationRecordId.setStringValue(id); + + MessageAttributeValue subscriptionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + subscriptionIdAttribute.setStringValue(subscription.getId()); + + messageAttributes.put("subscriptionId", subscriptionIdAttribute); + messageAttributes.put("failedNotificationRecordId", failedNotificationRecordId); + + SendMessageRequest sendMessageRequest = new SendMessageRequest() + .withQueueUrl(notificationRetryQueueUrl) + .withMessageBody(message.getBody()) + .withMessageAttributes(messageAttributes); + sqsClient.sendMessage(sendMessageRequest); + + FailedNotificationDoc notificationDoc = FailedNotificationDoc.builder() + .id(id) + .partitionIdSubscriptionId(String.join("-", dataPartitionId, subscription.getId())) + .status(NotificationResult.NACK) + .createdOnEpoch(new Timestamp(currentTimeMillis).toString()) + .dataPartitionId(dataPartitionId) + .ttl(ttl).build(); + dynamoDBQueryHelper.saveWithHashCondition(notificationDoc, "id"); + } catch (Exception e) { + logger.error("Exception while inserting failed record, messageId: {}, subscriptionPushUrl: {} ", message.getMessageId(), subscription.getPushEndpoint(), e); + } + } +} \ No newline at end of file diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceImpl.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..0652b440531b1f5c8be0b28c8fffd1608244d63e --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceImpl.java @@ -0,0 +1,212 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBDeleteExpression; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; +import org.opengroup.osdu.notification.provider.aws.queue.NotificationRetryQueueService; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; + +@Service +public class NotificationRetryQueueServiceImpl implements NotificationRetryQueueService { + + private static final Logger logger = LoggerFactory.getLogger(NotificationRetryQueueServiceImpl.class); + + private static final int SQS_MIN_VISIBILITY_TIMEOUT = 30; + + private static final int SQS_MAX_VISIBILITY_TIMEOUT = 43200; + + private static final String FAILED_NOTIFICATION_RECORD_ID = "failedNotificationRecordId"; + + private AwsNotificationHandler notificationHandler; + + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + private SubscriptionRepository subscriptionRepository; + + @Value("${aws.dynamodb.failedNotificationTable.ssmPath}") + private String failedNotificationTablePath; + + @Value("${aws.sqs.visibilityTimeout:300}") + private Integer visibilityTimeout; + + public NotificationRetryQueueServiceImpl(AwsNotificationHandler notificationHandler, DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory, + SubscriptionRepository subscriptionRepository) { + this.notificationHandler = notificationHandler; + this.dynamoDBQueryHelperFactory = dynamoDBQueryHelperFactory; + this.subscriptionRepository = subscriptionRepository; + } + + @Override + public List<RetryProcessResult> processNotificationMessages(List<Message> messages) { + Map<String, List<Message>> messagesBySubscription = messages.stream().collect(Collectors.groupingBy( + msg -> String.join(":", msg.getMessageAttributes().get("data-partition-id").getStringValue(), msg.getMessageAttributes().get("subscriptionId").getStringValue()), + HashMap::new, Collectors.toCollection(ArrayList::new))); + + return messagesBySubscription.entrySet().parallelStream().map(entry -> { + try { + Optional<Subscription> subscription = subscriptionRepository.getSubscriptionById(entry.getValue().get(0).getMessageAttributes().get("subscriptionId").getStringValue(), + entry.getValue().get(0).getMessageAttributes().get("data-partition-id").getStringValue()); + if (!subscription.isPresent()) { + logger.info("Subscription with id {} not found for dataPartitionId {}", + entry.getValue().get(0).getMessageAttributes().get("subscriptionId").getStringValue(), + entry.getValue().get(0).getMessageAttributes().get("data-partition-id").getStringValue()); + deleteDbRecords(entry.getValue()); + return entry.getValue().stream().map(msg -> new RetryProcessResult(msg, NotificationResult.ACK)).collect(Collectors.toList()); + } + return processMessagesBySubscription(subscription.get(), entry.getValue()); + } catch (Exception e) { + logger.error("Exception while trying to send retry notifications:", e); + return entry.getValue().stream().map(msg -> new RetryProcessResult(msg, NotificationResult.NACK)).collect(Collectors.toList()); + } + }).flatMap(Collection::stream).collect(Collectors.toList()); + } + + @Override + public List<RetryProcessResult> processMessagesBySubscription(Subscription subscription, List<Message> messages) { + DynamoDBQueryHelperV2 dynamoDBQueryHelper = dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(failedNotificationTablePath); + boolean hasPreviousFailed = false; + List<RetryProcessResult> notificationResults = new ArrayList<>(); + + for (Message message : messages) { + RetryProcessResult processResult = new RetryProcessResult(); + processResult.setMessage(message); + + logger.debug("Processing Notification for messageId: {}, Subscription Endpoint: {}", message.getMessageId(), subscription.getPushEndpoint()); + if (hasPreviousFailed) { + processResult.setResult(NotificationResult.NACK); + notificationResults.add(processResult); + continue; + } + Map<String, String> messageAttributes = message.getMessageAttributes().entrySet().stream().collect( + Collectors.toMap(Entry::getKey, attribute -> attribute.getValue().getStringValue())); + try { + HttpResponse response = notificationHandler.notifySubscriber(subscription, message.getBody(), messageAttributes); + if (!response.isSuccessCode()) { + processResult.setResult(NotificationResult.NACK); + hasPreviousFailed = true; + } else { + processResult.setResult(NotificationResult.ACK); + FailedNotificationDoc doc = dynamoDBQueryHelper.loadByPrimaryKey(FailedNotificationDoc.class, + message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + if (doc != null) { + FailedNotificationDoc objectToDelete = new FailedNotificationDoc(); + objectToDelete.setId(message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + DynamoDBDeleteExpression deleteExpression = new DynamoDBDeleteExpression() + .withConditionExpression("attribute_exists(id)"); + dynamoDBQueryHelper.deleteByObjectWithCondition(objectToDelete, deleteExpression); + } + } + } catch (Exception e) { + logger.error("Failed to notify subscriber : ", e); + processResult.setResult(NotificationResult.NACK); + hasPreviousFailed = true; + } + notificationResults.add(processResult); + } + return notificationResults; + } + + public void changeMessageVisibilityTimeout(AmazonSQS sqsClient, String sqsQueueUrl, List<Message> messages) { + List<ChangeMessageVisibilityBatchRequestEntry> entries = new ArrayList<>(); + for (Message message : messages) { + int timeout = exponentialTimeOutWindow(parseIntOrDefault(message.getAttributes().get("ApproximateReceiveCount"), 0)); + if (timeout < SQS_MIN_VISIBILITY_TIMEOUT || timeout > SQS_MAX_VISIBILITY_TIMEOUT) { + timeout = timeout < SQS_MIN_VISIBILITY_TIMEOUT ? SQS_MIN_VISIBILITY_TIMEOUT : SQS_MAX_VISIBILITY_TIMEOUT; + } + entries.add( + new ChangeMessageVisibilityBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()) + .withVisibilityTimeout(timeout)); + if (entries.size() == 10) { + sqsClient.changeMessageVisibilityBatch(sqsQueueUrl, entries); + entries.clear(); + } + } + + if (!entries.isEmpty()) { + sqsClient.changeMessageVisibilityBatch(sqsQueueUrl, entries); + } + } + + private void deleteDbRecords(List<Message> messages) { + DynamoDBQueryHelperV2 dynamoDBQueryHelper = dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(failedNotificationTablePath); + messages.parallelStream().forEach(message -> { + FailedNotificationDoc doc = dynamoDBQueryHelper.loadByPrimaryKey(FailedNotificationDoc.class, + message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + try { + FailedNotificationDoc objectToDelete = new FailedNotificationDoc(); + objectToDelete.setId(message.getMessageAttributes().get(FAILED_NOTIFICATION_RECORD_ID).getStringValue()); + DynamoDBDeleteExpression deleteExpression = new DynamoDBDeleteExpression() + .withConditionExpression("attribute_exists(id)"); + dynamoDBQueryHelper.deleteByObjectWithCondition(objectToDelete, deleteExpression); + } catch (Exception e) { + logger.info("Failed to delete: {}", e.getMessage()); + } + }); + } + + private Integer exponentialTimeOutWindow(int receiveCount) { + if (receiveCount <= 10) { + return visibilityTimeout; + } + if (receiveCount <= 20) { + return visibilityTimeout * 2; + } + if (receiveCount <= 30) { + return visibilityTimeout * 4; + } + if (receiveCount <= 40) { + return visibilityTimeout * 8; + } + if (receiveCount <= 50) { + return visibilityTimeout * 16; + } + if (receiveCount <= 60) { + return visibilityTimeout * 32; + } + return 43200; + } + + private Integer parseIntOrDefault(String toParse, int defaultValue) { + try { + return Integer.parseInt(toParse); + } catch (NumberFormatException e) { + return defaultValue; + } + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepository.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepository.java new file mode 100644 index 0000000000000000000000000000000000000000..4cecb6736dcc124aa93b31527689c055465699b3 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepository.java @@ -0,0 +1,94 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.repository; + +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.model.notification.GsaSecret; +import org.opengroup.osdu.core.common.model.notification.GsaSecretValue; +import org.opengroup.osdu.core.common.model.notification.HmacSecret; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.SubscriptionDoc; +import org.opengroup.osdu.notification.provider.aws.security.KmsHelper; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Repository; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Repository +public class SubscriptionRepository { + + private static final String GSA_SECRET = "GSA"; + + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + private KmsHelper kmsHelper; + + @Value("${aws.dynamodb.subscriptionTable.ssm.relativePath}") + String subscriptionTableRelativePath; + + public SubscriptionRepository(DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory, KmsHelper kmsHelper) { + this.dynamoDBQueryHelperFactory = dynamoDBQueryHelperFactory; + this.kmsHelper = kmsHelper; + } + + public List<Subscription> getAllSubscriptionsByDataPartition(String dataPartitionId) { + DynamoDBQueryHelperV2 queryHelper = dynamoDBQueryHelperFactory.getQueryHelperForPartition(dataPartitionId, subscriptionTableRelativePath); + SubscriptionDoc gsiQuery = new SubscriptionDoc(); + gsiQuery.setDataPartitionId(dataPartitionId); + PaginatedQueryList<SubscriptionDoc> results = queryHelper.queryByGSI(SubscriptionDoc.class, gsiQuery); + + if(results == null){ + return Collections.emptyList(); + } + return results.stream().map(subsDoc -> { + String secretValue = kmsHelper.decrypt(subsDoc.getSecretValue(), dataPartitionId); + Secret secret = createSecret(subsDoc.getSecretType(), secretValue); + return SubscriptionDoc.mapTo(subsDoc, secret); + }).collect(Collectors.toList()); + } + + public Optional<Subscription> getSubscriptionById(String subscriptionId, String dataPartitionId) { + DynamoDBQueryHelperV2 queryHelper = dynamoDBQueryHelperFactory.getQueryHelperForPartition(dataPartitionId, subscriptionTableRelativePath); + SubscriptionDoc subsDoc = queryHelper.loadByPrimaryKey(SubscriptionDoc.class, subscriptionId); + if (subsDoc == null) { + return Optional.empty(); + } + String secretValue = kmsHelper.decrypt(subsDoc.getSecretValue(), dataPartitionId); + Secret secret = createSecret(subsDoc.getSecretType(), secretValue); + return Optional.of(SubscriptionDoc.mapTo(subsDoc, secret)); + } + + private Secret createSecret(String secretType, String secretValue) { + Secret secret; + if (secretType.equals(GSA_SECRET)) { + GsaSecret gsaSecret = new GsaSecret(); + String[] splitSecret = secretValue.split("`"); + gsaSecret.setValue(new GsaSecretValue(splitSecret[0], splitSecret[1])); + secret = gsaSecret; + } else { + HmacSecret hmacSecret = new HmacSecret(); + hmacSecret.setValue(secretValue); + secret = hmacSecret; + } + return secret; + } + +} \ No newline at end of file diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsConfig.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..37e3473fbe8b49030bbab6aa53fcf4a4bdda7e30 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsConfig.java @@ -0,0 +1,45 @@ +// Copyright © 2020 Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.security; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.kms.AWSKMS; +import com.amazonaws.services.kms.AWSKMSClientBuilder; +import org.opengroup.osdu.core.aws.configurationsetup.ConfigSetup; +import org.opengroup.osdu.core.aws.iam.IAMConfig; + +//This class should be moved to os-core-lib-aws. Keeping it here temporarily till testing is complete +public class KmsConfig { + + private String amazonKmsEndpoint; + + private String amazonKmsRegion; + + private AWSCredentialsProvider amazonAWSCredentials; + + public KmsConfig(String amazonKmsEndpoint, String amazonKmsRegion) { + amazonAWSCredentials = IAMConfig.amazonAWSCredentials(); + this.amazonKmsEndpoint = amazonKmsEndpoint; + this.amazonKmsRegion = amazonKmsRegion; + } + + public AWSKMS awsKMS() { + // Generate the KMS client + return AWSKMSClientBuilder.standard() + .withCredentials(amazonAWSCredentials) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonKmsEndpoint, amazonKmsRegion)) + .withClientConfiguration(ConfigSetup.setUpConfig()).build(); + } +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsHelper.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..73ad6a41679a9b39103b5e21cd6a83398a97f2d1 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/KmsHelper.java @@ -0,0 +1,100 @@ +// Copyright © 2020 Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.security; + + +import com.amazonaws.services.kms.AWSKMS; +import com.amazonaws.services.kms.model.DecryptRequest; +import com.amazonaws.services.kms.model.EncryptRequest; +import com.amazonaws.services.simplesystemsmanagement.model.InternalServerErrorException; +import lombok.AccessLevel; +import lombok.Setter; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.nio.ByteBuffer; +import java.util.Collections; + +@Component +public class KmsHelper { + + @Value("${aws.region}") + @Setter(AccessLevel.PROTECTED) + public String amazonRegion; + + @Autowired + private DpsHeaders dpsHeaders; + + @Autowired + private JaxRsDpsLog logger; + + @Value("${aws.kms.endpoint}") + public String kmsEndpoint; + + private AWSKMS kmsClient; + + private String kmsKeyId; + + @PostConstruct + public void init() { + KmsConfig kmsConfig = new KmsConfig(kmsEndpoint, amazonRegion); + kmsClient = kmsConfig.awsKMS(); + try { + K8sLocalParameterProvider provider = new K8sLocalParameterProvider(); + kmsKeyId = provider.getParameterAsString("KMS_KEY_ID"); + } catch (K8sParameterNotFoundException e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "SSM ParameterNotFoundException", e.getMessage()); + } catch (InternalServerErrorException e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "SSM InternalServerErrorException", e.getErrorMessage()); + + } + + } + + + + public ByteBuffer encrypt(String plainTextString) { + + + EncryptRequest encReq = new EncryptRequest(); + encReq.setKeyId(kmsKeyId); + encReq.setPlaintext(ByteBuffer.wrap(plainTextString.getBytes())); + encReq.setEncryptionContext(Collections.singletonMap("dataPartitionId", dpsHeaders.getPartitionId())); + ByteBuffer ciphertext = kmsClient.encrypt(encReq).getCiphertextBlob(); + return ciphertext; + + + + } + public String decrypt(ByteBuffer ciphertext, String dataPartitionId) { + + DecryptRequest decReq = new DecryptRequest(); + + decReq.setCiphertextBlob(ciphertext); + decReq.setEncryptionContext(Collections.singletonMap("dataPartitionId", dataPartitionId)); + ByteBuffer decrypted = kmsClient.decrypt(decReq).getPlaintext(); + + String decryptedStr = new String(decrypted.array()); + return decryptedStr; + } + +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureService.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureService.java new file mode 100644 index 0000000000000000000000000000000000000000..bd54591c5550a593a1c8877a3c4420621055f63c --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureService.java @@ -0,0 +1,151 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.aws.security; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import org.apache.commons.lang3.StringUtils; +import org.opengroup.osdu.core.common.cryptographic.HmacData; +import org.opengroup.osdu.core.common.cryptographic.ISignatureService; +import org.opengroup.osdu.core.common.cryptographic.SignatureServiceException; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.xml.bind.DatatypeConverter; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.Base64; + +@Component +@Primary +public class ThreadSignatureService implements ISignatureService { + + private static final String HMAC_SHA_256 = "HmacSHA256"; + private static final String DATA_FORMAT = "{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}"; + private static final String NOTIFICATION_SERVICE = "de-notification-service"; + private static final long EXPIRE_DURATION = 30000L; + + private static final String INVALID_SIGNATURE = "Invalid signature"; + private static final String ERROR_GENERATING_SIGNATURE = "Error generating the signature"; + private static final String SIGNATURE_EXPIRED = "Signature is expired"; + private static final String MISSING_HMAC_SIGNATURE = "HMAC signature should not be null or empty"; + private static final String MISSING_SECRET_VALUE = "Secret should not be null or empty"; + private static final String MISSING_ATTRIBUTES_IN_SIGNATURE = "Missing url or nonce or expire time in the signature"; + + + @Override + public String getSignedSignature(String url, String secret) throws SignatureServiceException { + if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(secret)) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE); + } + final long currentTime = System.currentTimeMillis(); + final String expireTime = String.valueOf(currentTime + EXPIRE_DURATION); + final String timeStamp = String.valueOf(currentTime); + try { + String nonce = DatatypeConverter.printHexBinary(generateRandomBytes(16)).toLowerCase(); + String data = String.format(DATA_FORMAT, expireTime, url, nonce); + final byte[] signature = getSignature(secret, nonce, timeStamp, data); + byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8); + String dataBytesEncoded = Base64.getEncoder().encodeToString(dataBytes); + StringBuilder output = new StringBuilder(); + output.append(dataBytesEncoded) + .append(".") + .append(DatatypeConverter.printHexBinary(signature).toLowerCase()); + + return output.toString(); + } catch (Exception ex) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE, ex); + } + } + + @Override + public String getSignedSignature(String url, String secret, String expireTime, String nonce) throws SignatureServiceException { + if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(secret) || !StringUtils.isNumeric(expireTime)) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE); + } + final long expiry = Long.parseLong(expireTime); + if (System.currentTimeMillis() > expiry) { + throw new SignatureServiceException(SIGNATURE_EXPIRED); + } + String timeStamp = String.valueOf(expiry - EXPIRE_DURATION); + String data = String.format(DATA_FORMAT, expireTime, url, nonce); + try { + final byte[] signature = getSignature(secret, nonce, timeStamp, data); + return DatatypeConverter.printHexBinary(signature).toLowerCase(); + } catch (Exception ex) { + throw new SignatureServiceException(ERROR_GENERATING_SIGNATURE, ex); + } + } + + + @Override + public void verifyHmacSignature(String hmac, String secret) throws SignatureServiceException { + if (Strings.isNullOrEmpty(hmac)) { + throw new SignatureServiceException(MISSING_HMAC_SIGNATURE); + } + if (Strings.isNullOrEmpty(secret)) { + throw new SignatureServiceException(MISSING_SECRET_VALUE); + } + String[] tokens = hmac.split("\\."); + if (tokens.length != 2) { + throw new SignatureServiceException(INVALID_SIGNATURE); + } + byte[] dataBytes = Base64.getDecoder().decode(tokens[0]); + String requestSignature = tokens[1]; + + String data = new String(dataBytes, StandardCharsets.UTF_8); + HmacData hmacData = new Gson().fromJson(data, HmacData.class); + String url = hmacData.getEndpointUrl(); + String nonce = hmacData.getNonce(); + String expireTime = hmacData.getExpireMillisecond(); + if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(nonce) || Strings.isNullOrEmpty(expireTime)) { + throw new SignatureServiceException(MISSING_ATTRIBUTES_IN_SIGNATURE); + } + String newSignature = getSignedSignature(url, secret, expireTime, nonce); + if (!requestSignature.equalsIgnoreCase(newSignature)) { + throw new SignatureServiceException(INVALID_SIGNATURE); + } + } + + private byte[] getSignature(String secret, String nonce, String timeStamp, String data) throws Exception { + final byte[] secretBytes = DatatypeConverter.parseHexBinary(secret); + final byte[] nonceBytes = DatatypeConverter.parseHexBinary(nonce); + final byte[] encryptedNonce = computeHmacSha256(nonceBytes, secretBytes); + final byte[] encryptedTimestamp = computeHmacSha256(timeStamp, encryptedNonce); + final byte[] signedKey = computeHmacSha256(NOTIFICATION_SERVICE, encryptedTimestamp); + return computeHmacSha256(data, signedKey); + } + + private byte[] computeHmacSha256(final String data, final byte[] key) throws Exception { + final Mac mac = Mac.getInstance(HMAC_SHA_256); + mac.init(new SecretKeySpec(key, HMAC_SHA_256)); + return mac.doFinal(data.getBytes(StandardCharsets.UTF_8)); + } + + private byte[] computeHmacSha256(final byte[] data, final byte[] key) throws Exception { + final Mac mac = Mac.getInstance(HMAC_SHA_256); + mac.init(new SecretKeySpec(key, HMAC_SHA_256)); + return mac.doFinal(data); + } + + private byte[] generateRandomBytes(final int size) { + final byte[] key = new byte[size]; + SecureRandom secureRandom = new SecureRandom(); + secureRandom.nextBytes(key); + return key; + } +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandler.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..96bba5679e8cb6b172f7a13a216a657466a5714d --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandler.java @@ -0,0 +1,85 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.service; + +import com.google.common.base.Strings; +import org.opengroup.osdu.core.common.http.HttpClient; +import org.opengroup.osdu.core.common.http.HttpRequest; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.Secret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.auth.factory.AuthFactory; +import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.Map; + +@Service +public class AwsNotificationHandler { + + private static final Logger logger = LoggerFactory.getLogger(AwsNotificationHandler.class); + private static final String X_COLLABORATION_HEADER = "x-collaboration"; + + private HttpClient httpClient; + private AuthFactory authFactory; + @Value("${app.waitingTime:30000}") + private int waitingTime; + + public AwsNotificationHandler(HttpClient httpClient, AuthFactory authFactory) { + this.httpClient = httpClient; + this.authFactory = authFactory; + } + + public HttpResponse notifySubscriber(Subscription subscription, String pubsubMessage, Map<String, String> headerAttributes) throws Exception { + Secret secret = subscription.getSecret(); + String endpoint = subscription.getPushEndpoint(); + String secretType = secret.getSecretType(); + String collaborationContext = ""; + + // Authentication Secret + SecretAuth secretAuth = authFactory.getSecretAuth(secretType); + secretAuth.setSecret(secret); + String pushUrl = secretAuth.getPushUrl(endpoint); + Map<String, String> requestHeader = secretAuth.getRequestHeaders(); + + if (headerAttributes.containsKey(X_COLLABORATION_HEADER)) { + collaborationContext = headerAttributes.get(X_COLLABORATION_HEADER); + } + + requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json"); + requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID)); + requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID)); + + if (!Strings.isNullOrEmpty(collaborationContext)) { + requestHeader.put(X_COLLABORATION_HEADER, headerAttributes.get(X_COLLABORATION_HEADER)); + } + + HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(waitingTime).build(); + logger.debug("Sending out notification to endpoint: {}", endpoint); + HttpResponse response = httpClient.send(request); + + logger.debug("Request URL : {}, Response Code: {} ,Response Time: {}", subscription.getPushEndpoint(), response.getResponseCode(), Duration.ofMillis(response.getLatency())); + + if (!response.isSuccessCode()) { + logger.error("Subscriber notification request failed. notificationId={} pushEndpoint={} createdBy={} response={}", + subscription.getNotificationId(), subscription.getPushEndpoint(), subscription.getCreatedBy(), response.getBody()); + } + return response; + } +} diff --git a/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtils.java b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..c55b2400e4cd3fde9c3823ad44a578b53743f4b7 --- /dev/null +++ b/provider/notification-aws/src/main/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtils.java @@ -0,0 +1,92 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.utils; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Component +public class SQSUtils { + + public List<Message> getMessages(AmazonSQS sqsClient, String sqsQueueUrl, int maxBatchRequestCount, int maxMessageCount) { + int numOfMessages = maxBatchRequestCount; + List<Message> messages = new ArrayList<>(); + do { + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl); + receiveMessageRequest.setMaxNumberOfMessages(numOfMessages); + receiveMessageRequest.withMessageAttributeNames("All"); + receiveMessageRequest.withAttributeNames("All"); + List<Message> retrievedMessages = sqsClient.receiveMessage(receiveMessageRequest).getMessages(); + messages.addAll(retrievedMessages); + numOfMessages = retrievedMessages.size(); + + } while (messages.size() < maxMessageCount && numOfMessages > 0); + + return messages; + } + + public int deleteMessages(List<Message> messages, String sqsQueueUrl, AmazonSQS sqsClient) { + int deletedCount = 0; + List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>(); + for (Message message : messages) { + deleteEntries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())); + if (deleteEntries.size() == 10) { + DeleteMessageBatchResult deleteResult = sqsClient.deleteMessageBatch(new DeleteMessageBatchRequest(sqsQueueUrl, deleteEntries)); + deletedCount += deleteResult.getSuccessful().size(); + deleteEntries.clear(); + } + } + if (!deleteEntries.isEmpty()) { + DeleteMessageBatchResult deleteResult = sqsClient.deleteMessageBatch(new DeleteMessageBatchRequest(sqsQueueUrl, deleteEntries)); + deletedCount += deleteResult.getSuccessful().size(); + deleteEntries.clear(); + } + return deletedCount; + } + + public List<SendMessageResult> sendMessagesToDeadLetterQueue(String deadLetterQueueUrl, List<Message> messages, AmazonSQS sqsClient) { + return messages.stream().map(message -> sendMessageToDeadLetterQueue(deadLetterQueueUrl, message, sqsClient)) + .collect(Collectors.toList()); + + } + + private SendMessageResult sendMessageToDeadLetterQueue(String deadLetterQueueUrl, Message message, AmazonSQS sqsClient) { + + Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes(); + MessageAttributeValue exceptionAttribute = new MessageAttributeValue() + .withDataType("String"); + + exceptionAttribute.setStringValue("Exception message: missing dataPartitionId"); + messageAttributes.put("Exception", exceptionAttribute); + SendMessageRequest send_msg_request = new SendMessageRequest() + .withQueueUrl(deadLetterQueueUrl) + .withMessageBody(message.getBody()) + .withMessageAttributes(messageAttributes); + return sqsClient.sendMessage(send_msg_request); + } + +} \ No newline at end of file diff --git a/provider/notification-aws/src/main/resources/application.properties b/provider/notification-aws/src/main/resources/application.properties index 8dd0c1aed7dac367891c6d576e743f3148f8ef11..4bfd1059ff713f5dc7fbc68ae175a913cc440abe 100644 --- a/provider/notification-aws/src/main/resources/application.properties +++ b/provider/notification-aws/src/main/resources/application.properties @@ -1,4 +1,4 @@ -# Copyright � 2020 Amazon Web Services +# Copyright ? 2020 Amazon Web Services # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -49,4 +49,12 @@ server.ssl.key-alias=${SSL_KEY_ALIAS:osduonaws} server.ssl.key-password=${SSL_KEY_PASSWORD:} server.ssl.key-store-password=${SSL_KEY_STORE_PASSWORD:} +aws.kms.endpoint=kms.${AWS_REGION}.amazonaws.com +aws.dynamodb.failedNotificationTable.ssmPath=${FAILED_NOTIFICATION_SSM_PATH:/osdu/instances/${OSDU_INSTANCE_NAME}/core/notification/failed-notification-table} +aws.dynamodb.subscriptionTable.ssm.relativePath=${SUBSCRIPTION_SSM_RELATIVE_PATH:services/core/register/SubscriptionTable} + +aws.sqs.visibilityTimeout=${SQS_MIN_VISIBILITY_TIMEOUT:300} +aws.sqs.poll.sleepTime.mainQueue=${SQS_POLL_SLEEP_TIME_MAIN_QUEUE:30} +aws.sqs.poll.sleepTime.retryQueue=${SQS_POLL_SLEEP_TIME_RETRY_QUEUE:300} + spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.SecurityAutoConfiguration \ No newline at end of file diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandlerTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..2f345062980f767307231785a369a236d557836a --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationRetrySQSHandlerTest.java @@ -0,0 +1,76 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.powermock.reflect.Whitebox; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationRetrySQSHandlerTest { + + @Mock + private NotificationQueueService notificationRetryQueueService; + + @Mock + private SQSUtils sqsUtil; + + @InjectMocks + NotificationRetrySQSHandler notificationRetrySQSHandler; + + private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction; + + @BeforeClass + public static void setup() { + mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class, + (mock, context) -> { + Mockito.when(mock.getParameterAsString("notification-retry-sqs-url")).thenReturn( + "test-sqs-url"); + }); + } + + @AfterClass + public static void close() { + mockedConstruction.close(); + } + + @Before + public void initTest() { + Whitebox.setInternalState(notificationRetrySQSHandler, "region", "us-east-1"); + } + + @Test(expected = InterruptedException.class) + public void processNotifications_interruptedException() throws Exception { + when(sqsUtil.getMessages(any(), anyString(), anyInt(), anyInt())).thenAnswer((t) -> { + throw new InterruptedException("Test Interrupted"); + }); + notificationRetrySQSHandler.init(); + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandlerTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..392ce5a1d6aa0422eabc4ea125e6a608cb545944 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/NotificationSQSHandlerTest.java @@ -0,0 +1,75 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.notification.provider.aws.utils.SQSUtils; +import org.powermock.reflect.Whitebox; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationSQSHandlerTest { + + @Mock + private NotificationQueueService notificationQueueService; + + @Mock + private SQSUtils sqsUtil; + + @InjectMocks + NotificationSQSHandler notificationSQSHandler; + + private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction; + + @BeforeClass + public static void setup() { + mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class, + (mock, context) -> { + Mockito.when(mock.getParameterAsString("notification-sqs-url")).thenReturn( + "test-sqs-retry-url"); + }); + } + + @AfterClass + public static void close() { + mockedConstruction.close(); + } + + @Before + public void initTest() { + Whitebox.setInternalState(notificationSQSHandler, "region", "us-east-1"); + } + + @Test(expected = InterruptedException.class) + public void processNotifications_interruptedException() throws Exception { + when(sqsUtil.getMessages(any(), anyString(), anyInt(), anyInt())).thenAnswer((t) -> { + throw new InterruptedException("Test Interrupted"); + }); + notificationSQSHandler.init(); + } +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..0f9bbedf0a31ef19e6383da7e96ab800712576b7 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationQueueServiceImplTest.java @@ -0,0 +1,245 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.withSettings; + +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.SendMessageResult; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.internal.stubbing.defaultanswers.ForwardsInvocations; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.queue.impl.NotificationQueueServiceImpl; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.powermock.reflect.Whitebox; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationQueueServiceImplTest { + + @Mock + private SubscriptionRepository subscriptionRepository; + + @Mock + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + @Mock + private AwsNotificationHandler notificationHandler; + + @Mock + private DynamoDBQueryHelperV2 dynamoDBQueryHelper; + + @Mock + private AmazonSQS sqsClient; + + @InjectMocks + NotificationQueueServiceImpl notificationQueueService; + + private static MockedConstruction<K8sLocalParameterProvider> mockedConstruction; + + @BeforeClass + public static void setup() { + mockedConstruction = Mockito.mockConstruction(K8sLocalParameterProvider.class, + (mock, context) -> { + //implement initializer for mock. Set return value for object A mock methods + when(mock.getParameterAsString("notification-sqs-url")).thenReturn( + "test-sqs-url"); + }); + } + + @AfterClass + public static void close(){ + mockedConstruction.close(); + } + + @Before + public void initTest() { + Whitebox.setInternalState(notificationQueueService, "region", "us-east-1"); + Whitebox.setInternalState(notificationQueueService, "failedNotificationTablePath", "failed-notificationTable-path"); + Whitebox.setInternalState(notificationQueueService, "sqsClient", sqsClient); + + Subscription subscription1 = new Subscription(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscription1Id"); + + when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenReturn(Arrays.asList(subscription1)); + when(dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(anyString())).thenReturn(dynamoDBQueryHelper); + } + + @Test + public void processNotificationMessage_success() throws Exception { + List<FailedNotificationDoc> list = new ArrayList<>(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(200); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_noSubscribers_success() { + when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenReturn(Collections.emptyList()); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_exceptionGettingSubscription_NoMessagesProcessed() { + when(subscriptionRepository.getAllSubscriptionsByDataPartition(anyString())).thenAnswer(t -> { + throw new Exception("test Exception"); + }); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(0, responseMessageList.size()); + } + + @Test + public void processNotificationMessage_existingFailedMessages_sentToRetry() { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Arrays.asList(failedDoc); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_exceptionInFailedMessagesDBQuery_messagesSentToRetry() { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenAnswer(t -> { + throw new Exception(); + }); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_non200ResponseFromNotification_sentToRetry() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_exceptionFromNotification_messagesSentToRetry() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + when(notificationHandler.notifySubscriber(any(), any(), any())).thenThrow(new Exception()); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals("testMessage1", responseMessageList.get(0).getMessageId()); + } + + @Test + public void processNotificationMessage_non200ResponseFromNotificationMultipleMessages_sentToRetry() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testId"); + List<FailedNotificationDoc> list = Collections.emptyList(); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(list)))); + when(sqsClient.sendMessage(any())).thenReturn(new SendMessageResult()); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testDataPartition"), + createMessage("testMessage2", "testDataPartition")); + + List<Message> responseMessageList = notificationQueueService.processNotificationMessages(messageList); + assertEquals(2, responseMessageList.size()); + assertEquals("testMessage2", responseMessageList.get(1).getMessageId()); + } + + private Message createMessage(String messageId, String dataPartitionId) { + Message message = new Message(); + message.setMessageId(messageId); + MessageAttributeValue dataPartitionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + dataPartitionIdAttribute.setStringValue(dataPartitionId); + message.getMessageAttributes().put("data-partition-id", dataPartitionIdAttribute); + return message; + } +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..7f976384fddc2e6c69e580c6271d4b4e20e36241 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/queue/impl/NotificationRetryQueueServiceTest.java @@ -0,0 +1,258 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.queue.impl; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.FailedNotificationDoc; +import org.opengroup.osdu.notification.provider.aws.model.NotificationResult; +import org.opengroup.osdu.notification.provider.aws.model.RetryProcessResult; +import org.opengroup.osdu.notification.provider.aws.repository.SubscriptionRepository; +import org.opengroup.osdu.notification.provider.aws.service.AwsNotificationHandler; +import org.powermock.reflect.Whitebox; +import org.springframework.beans.factory.annotation.Value; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +@RunWith(MockitoJUnitRunner.class) +public class NotificationRetryQueueServiceTest { + + @Mock + private AwsNotificationHandler notificationHandler; + + @Mock + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + @Mock + private SubscriptionRepository subscriptionRepository; + + @Value("${aws.dynamodb.failedNotificationTable.ssmPath}") + private String failedNotificationTablePath; + + @InjectMocks + private NotificationRetryQueueServiceImpl notificationRetryQueueService; + + @Mock + private DynamoDBQueryHelperV2 dynamoDBQueryHelper; + + @Mock + private AmazonSQS sqsClient; + + @Captor + ArgumentCaptor<List<ChangeMessageVisibilityBatchRequestEntry>> changeMessageVisibilityRequestCaptor; + + @Before + public void beforeTest() { + Whitebox.setInternalState(notificationRetryQueueService, "failedNotificationTablePath", "failed-notificationTable-path"); + Whitebox.setInternalState(notificationRetryQueueService, "visibilityTimeout", 300); + + Subscription subscription1 = new Subscription(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscription1Id"); + + when(subscriptionRepository.getSubscriptionById(any(), any())).thenReturn(Optional.of(subscription1)); + when(dynamoDBQueryHelperFactory.getQueryHelperUsingSSM(anyString())).thenReturn(dynamoDBQueryHelper); + } + + @Test + public void processNotificationMessage_success() throws Exception { + FailedNotificationDoc failedDoc = new FailedNotificationDoc(); + failedDoc.setId("testFailedRecordId"); + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(failedDoc); + + HttpResponse response = new HttpResponse(); + response.setResponseCode(200); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.ACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_subscriptionNotFound_messagesTobeMarkedAcked() { + when(subscriptionRepository.getSubscriptionById(any(), any())).thenReturn(Optional.empty()); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.ACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_generalException_messageNotProcessed() { + when(subscriptionRepository.getSubscriptionById(any(), any())).thenAnswer(t -> { + throw new Exception(); + }); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_non200ResponseFromSubscriber_messageNotAcked() throws Exception { + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId","testDataPartition", 1)); + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(0).getResult()); + } + + @Test + public void processNotificationMessage_non200ResponseFromSubscriberMultipleMessages_messageNotAcked() throws Exception { + HttpResponse response = new HttpResponse(); + response.setResponseCode(500); + when(notificationHandler.notifySubscriber(any(), any(), any())).thenReturn(response); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId1","testDataPartition", 1), + createMessage("testMessage2", "testSubscription1Id", "testFailedRecordId2", "testDataPartition",1)); + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(2, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(1).getResult()); + } + + @Test + public void processNotificationMessage_exceptionSendingNotification_messageNotAcked() throws Exception { + when(notificationHandler.notifySubscriber(any(), any(), any())).thenAnswer(t -> { + throw new Exception("test Exception"); + }); + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + List<RetryProcessResult> responseMessageList = notificationRetryQueueService.processNotificationMessages(messageList); + assertEquals(1, responseMessageList.size()); + assertEquals(NotificationResult.NACK, responseMessageList.get(0).getResult()); + } + + @Test + public void changeMessageVisibilityTimeout_success() { + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",1)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + List<ChangeMessageVisibilityBatchRequestEntry> visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(300), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",15)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(600), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",25)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(1200), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",35)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(2400), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",45)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(4800), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId", "testDataPartition",55)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(9600), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + + Mockito.reset(sqsClient); + messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId","testDataPartition", 61)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(1)).changeMessageVisibilityBatch(any(), changeMessageVisibilityRequestCaptor.capture()); + visibilityBatchRequestEntries = changeMessageVisibilityRequestCaptor.getValue(); + assertEquals(Integer.valueOf(43200), visibilityBatchRequestEntries.get(0).getVisibilityTimeout()); + } + + @Test + public void changeMessageVisibilityTimeout_moreThan10messages_success() { + List<Message> messageList = Arrays.asList(createMessage("testMessage1", "testSubscription1Id", "testFailedRecordId1", "testDataPartition", 1), + createMessage("testMessage2", "testSubscription1Id", "testFailedRecordId2", "testDataPartition", 1), + createMessage("testMessage3", "testSubscription1Id", "testFailedRecordId3", "testDataPartition", 1), + createMessage("testMessage4", "testSubscription1Id", "testFailedRecordId4", "testDataPartition", 1), + createMessage("testMessage5", "testSubscription1Id", "testFailedRecordId5", "testDataPartition", 1), + createMessage("testMessage6", "testSubscription1Id", "testFailedRecordId6", "testDataPartition", 1), + createMessage("testMessage7", "testSubscription1Id", "testFailedRecordId7", "testDataPartition", 1), + createMessage("testMessage8", "testSubscription1Id", "testFailedRecordId8", "testDataPartition", 1), + createMessage("testMessage9", "testSubscription1Id", "testFailedRecordId9", "testDataPartition", 1), + createMessage("testMessage10", "testSubscription1Id", "testFailedRecordId10", "testDataPartition", 1), + createMessage("testMessage11", "testSubscription1Id", "testFailedRecordId11", "testDataPartition", 1)); + notificationRetryQueueService.changeMessageVisibilityTimeout(sqsClient, "test-queue-url", messageList); + Mockito.verify(sqsClient, times(2)).changeMessageVisibilityBatch(any(), any()); + + } + + private Message createMessage(String messageId, String subscriptionId, String failedRecordId, String dataPartitionId, int receiveCount) { + Message message = new Message(); + message.setMessageId(messageId); + MessageAttributeValue subscriptionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + subscriptionIdAttribute.setStringValue(subscriptionId); + message.getMessageAttributes().put("subscriptionId", subscriptionIdAttribute); + + MessageAttributeValue dataPartitionIdAttribute = new MessageAttributeValue() + .withDataType("String"); + dataPartitionIdAttribute.setStringValue(dataPartitionId); + + MessageAttributeValue failedNotificationRecordId = new MessageAttributeValue() + .withDataType("String"); + failedNotificationRecordId.setStringValue(failedRecordId); + message.getMessageAttributes().put("failedNotificationRecordId", subscriptionIdAttribute); + message.getMessageAttributes().put("data-partition-id", dataPartitionIdAttribute); + + message.getAttributes().put("ApproximateReceiveCount", String.valueOf(receiveCount)); + return message; + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepositoryTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepositoryTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8d77de500ccc1338dada3fc21f4ddcf32cd3ba4a --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/repository/SubscriptionRepositoryTest.java @@ -0,0 +1,144 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.repository; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.internal.stubbing.defaultanswers.ForwardsInvocations; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperFactory; +import org.opengroup.osdu.core.aws.dynamodb.DynamoDBQueryHelperV2; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.provider.aws.model.SubscriptionDoc; +import org.opengroup.osdu.notification.provider.aws.security.KmsHelper; +import org.powermock.reflect.Whitebox; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +@RunWith(MockitoJUnitRunner.class) +public class SubscriptionRepositoryTest { + + @Mock + private DynamoDBQueryHelperFactory dynamoDBQueryHelperFactory; + + @Mock + private KmsHelper kmsHelper; + + @InjectMocks + private SubscriptionRepository subscriptionRepository; + + @Mock + private DynamoDBQueryHelperV2 dynamoDBQueryHelper; + + @Before + public void initTest() { + Whitebox.setInternalState(subscriptionRepository, "subscriptionTableRelativePath", "services/core/register/SubscriptionTable"); + + when(dynamoDBQueryHelperFactory.getQueryHelperForPartition(anyString(), anyString())).thenReturn(dynamoDBQueryHelper); + } + + + @Test + public void getAllSubscriptions_hmacSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("HMAC"); + ArrayList<SubscriptionDoc> results = new ArrayList<>(); + results.add(subscription1); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(results)))); + + List<Subscription> subscriptionList = subscriptionRepository.getAllSubscriptionsByDataPartition("testDataPartitionId"); + assertEquals(1, subscriptionList.size()); + assertEquals("testSubscriptionId1", subscriptionList.get(0).getId()); + } + + @Test + public void getAllSubscriptions_gsaSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("GSA"); + ArrayList<SubscriptionDoc> results = new ArrayList<>(); + results.add(subscription1); + when(dynamoDBQueryHelper.queryByGSI(any(), any())).thenReturn(mock(PaginatedQueryList.class, withSettings().defaultAnswer(new ForwardsInvocations(results)))); + + when(kmsHelper.decrypt(any(), any())).thenReturn("testSecretValue`test"); + + List<Subscription> subscriptionList = subscriptionRepository.getAllSubscriptionsByDataPartition("testDataPartitionId"); + assertEquals(1, subscriptionList.size()); + assertEquals("testSubscriptionId1", subscriptionList.get(0).getId()); + } + + @Test + public void getSubscriptionById_hmacSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("GSA"); + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(subscription1); + when(kmsHelper.decrypt(any(), any())).thenReturn("testSecretValue`test"); + + Optional<Subscription> subscriptionResponse = subscriptionRepository.getSubscriptionById("testDataPartitionId", "testSubscriptionId1"); + assertEquals(true, subscriptionResponse.isPresent()); + assertEquals("testSubscriptionId1", subscriptionResponse.get().getId()); + } + + @Test + public void getSubscriptionById_gsaSecretType_success(){ + SubscriptionDoc subscription1 = new SubscriptionDoc(); + subscription1.setName("My listener"); + subscription1.setTopic("records-changed"); + subscription1.setPushEndpoint("/api/test/subscriber"); + subscription1.setNotificationId("de-859ea6a6-eefa-4e30-ba38-13f8cd71360a"); + subscription1.setId("testSubscriptionId1"); + subscription1.setSecretType("HMAC"); + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(subscription1); + + Optional<Subscription> subscriptionResponse = subscriptionRepository.getSubscriptionById("testDataPartitionId", "testSubscriptionId1"); + assertEquals(true, subscriptionResponse.isPresent()); + assertEquals("testSubscriptionId1", subscriptionResponse.get().getId()); + } + + + @Test + public void getSubscriptionById_subscriptionNotFound_emptyResponse(){ + when(dynamoDBQueryHelper.loadByPrimaryKey(any(), any())).thenReturn(null); + + Optional<Subscription> subscriptionResponse = subscriptionRepository.getSubscriptionById("testDataPartitionId", "testSubscriptionId1"); + assertEquals(false, subscriptionResponse.isPresent()); + } +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureServiceTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..fe7d7e5c15c0851183643bfd6ce35b7cca029f11 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/security/ThreadSignatureServiceTest.java @@ -0,0 +1,109 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.security; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.cryptographic.SignatureServiceException; + +import javax.crypto.Mac; +import java.security.NoSuchAlgorithmException; + +@RunWith(MockitoJUnitRunner.class) +public class ThreadSignatureServiceTest { + + private static final String URL = "http://www.osdu.example.com/signature"; + private static final String SECRET = "93491ab4453ef0306bf08f3703460950"; + + private static final String HMAC = "02030405060708.090A123B0C0D0E0F"; + + @InjectMocks + private ThreadSignatureService threadSignatureService; + + @Test + public void getSignedSignature_success() throws SignatureServiceException { + String response = threadSignatureService.getSignedSignature(URL, SECRET); + assertNotNull(response); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignature_nullUrl_signatureServiceExceptio() throws SignatureServiceException { + threadSignatureService.getSignedSignature(null, SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignature_nullSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(URL, null); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignature_invalidAlgorithm_signatureServiceException() throws SignatureServiceException { + try (MockedStatic<Mac> mockedMac = Mockito.mockStatic(Mac.class)) { + mockedMac.when(() -> Mac.getInstance(any())).thenThrow(new NoSuchAlgorithmException()); + threadSignatureService.getSignedSignature(URL, SECRET); + } + } + + @Test + public void getSignedSignatureWithExpiryTime_success() throws SignatureServiceException { + String response = threadSignatureService.getSignedSignature(URL, SECRET, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + assertNotNull(response); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpiryTime_nullUrl_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(null, SECRET, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpriyTime_nullSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(URL, null, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpiryTime_expiredSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.getSignedSignature(URL, SECRET, String.valueOf(System.currentTimeMillis() - 100), SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void getSignedSignatureWithExpriyTime_invalidAlgorithm_signatureServiceException() throws SignatureServiceException { + try (MockedStatic<Mac> mockedMac = Mockito.mockStatic(Mac.class)) { + mockedMac.when(() -> Mac.getInstance(any())).thenThrow(new NoSuchAlgorithmException()); + threadSignatureService.getSignedSignature(URL, SECRET, String.valueOf(System.currentTimeMillis() + 2000), SECRET); + } + } + + @Test(expected = SignatureServiceException.class) + public void verifyHmacSignature_nullHmac_signatureServiceException() throws SignatureServiceException { + threadSignatureService.verifyHmacSignature(null, SECRET); + } + + @Test(expected = SignatureServiceException.class) + public void verifyHmacSignature_nullSecret_signatureServiceException() throws SignatureServiceException { + threadSignatureService.verifyHmacSignature(HMAC, null); + } + + @Test(expected = SignatureServiceException.class) + public void verifyHmacSignature_invalidHmac_signatureServiceException() throws SignatureServiceException { + threadSignatureService.verifyHmacSignature("invalidHmac", SECRET); + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandlerTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d7509e889c198a5c75062f9e3a2771bb8157d52a --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/service/AwsNotificationHandlerTest.java @@ -0,0 +1,113 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.service; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.http.HttpClient; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.notification.HmacSecret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.notification.auth.factory.AuthFactory; +import org.opengroup.osdu.notification.auth.interfaces.SecretAuth; +import org.powermock.reflect.Whitebox; + +import java.util.HashMap; +import java.util.Map; + +@RunWith(MockitoJUnitRunner.class) +public class AwsNotificationHandlerTest { + + @Mock + private HttpClient httpClient; + + @Mock + private AuthFactory authFactory; + + @InjectMocks + private AwsNotificationHandler notificationHandler; + + @Mock + private SecretAuth secretAuth; + + @Before + public void beforeTest() { + Whitebox.setInternalState(notificationHandler, "waitingTime", 100); + } + + @Test + public void notifySubscriber_success() throws Exception { + Subscription subscription = new Subscription(); + subscription.setSecret(new HmacSecret()); + subscription.setPushEndpoint("/api/test"); + Map<String, String> headerAttributes = new HashMap<>(); + + when(authFactory.getSecretAuth(any())).thenReturn(secretAuth); + when(secretAuth.getPushUrl(anyString())).thenReturn("/api/test"); + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(200); + when(httpClient.send(any())).thenReturn(httpResponse); + + HttpResponse response = notificationHandler.notifySubscriber(subscription, "testMessage", headerAttributes); + assertEquals(200, response.getResponseCode()); + } + + @Test + public void notifySubscriber_withCollborationHeader_success() throws Exception { + Subscription subscription = new Subscription(); + subscription.setSecret(new HmacSecret()); + subscription.setPushEndpoint("/api/test"); + Map<String, String> headerAttributes = new HashMap<>(); + headerAttributes.put("x-collaboration", "a0ad53aa-430d-42b1-b676-2dbac583634c"); + + when(authFactory.getSecretAuth(any())).thenReturn(secretAuth); + when(secretAuth.getPushUrl(anyString())).thenReturn("/api/test"); + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(200); + when(httpClient.send(any())).thenReturn(httpResponse); + + HttpResponse response = notificationHandler.notifySubscriber(subscription, "testMessage", headerAttributes); + assertEquals(200, response.getResponseCode()); + } + + @Test + public void notifySubscriber_failed() throws Exception { + Subscription subscription = new Subscription(); + subscription.setSecret(new HmacSecret()); + subscription.setPushEndpoint("/api/test"); + Map<String, String> headerAttributes = new HashMap<>(); + + when(authFactory.getSecretAuth(any())).thenReturn(secretAuth); + when(secretAuth.getPushUrl(anyString())).thenReturn("/api/test"); + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(500); + when(httpClient.send(any())).thenReturn(httpResponse); + + HttpResponse response = notificationHandler.notifySubscriber(subscription, "testMessage", headerAttributes); + assertEquals(500, response.getResponseCode()); + } + +} diff --git a/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtilsTest.java b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..70e6c5f0bbaf22660a6943f995e00f641a5e4e03 --- /dev/null +++ b/provider/notification-aws/src/test/java/org/opengroup/osdu/notification/provider/aws/utils/SQSUtilsTest.java @@ -0,0 +1,84 @@ +// Copyright © Amazon Web Services +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.opengroup.osdu.notification.provider.aws.utils; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResultEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class SQSUtilsTest { + + @Mock + private AmazonSQS sqsClient; + + @InjectMocks + SQSUtils sqsUtils; + + @Test + public void getMessages_success() { + ReceiveMessageResult sqsResult = new ReceiveMessageResult(); + Message message1 = new Message(); + sqsResult.setMessages(Arrays.asList(message1)); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(sqsResult); + List<Message> response = sqsUtils.getMessages(sqsClient, "test-sqs-url", 10, 10); + assertEquals(10, response.size()); + } + + + @Test + public void deleteMessages_success() { + DeleteMessageBatchResult deleteResult = new DeleteMessageBatchResult(); + List<DeleteMessageBatchResultEntry> deleteEntries = new ArrayList<>(); + deleteEntries.add(new DeleteMessageBatchResultEntry()); + deleteResult.setSuccessful(deleteEntries); + + Message message1 = new Message(); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))).thenReturn(deleteResult); + int deletedCount = sqsUtils.deleteMessages(Arrays.asList(message1), "test-sqs-url", sqsClient); + assertEquals(1, deletedCount); + } + + @Test + public void sendMessagesToDeadLetterQueue_success() { + SendMessageResult sendResult = new SendMessageResult(); + + Message message1 = new Message(); + + when(sqsClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendResult); + List<SendMessageResult> results = sqsUtils.sendMessagesToDeadLetterQueue("test-deadletter-sqs-url", Arrays.asList(message1), sqsClient); + assertEquals(1, results.size()); + } +} diff --git a/testing/notification-test-aws/pom.xml b/testing/notification-test-aws/pom.xml index 2263ef4a77bd6cc23c6925fe6284f081689a13db..62dee1666a82d46de06e6f656ba27142c2b88750 100644 --- a/testing/notification-test-aws/pom.xml +++ b/testing/notification-test-aws/pom.xml @@ -59,7 +59,7 @@ <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>os-core-common</artifactId> - <version>0.3.6</version> + <version>0.21.0</version> </dependency> <dependency> diff --git a/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/LegalTagUtils.java b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/LegalTagUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..0860938624cb9568baf43ab8e6b3f010502d6c5a --- /dev/null +++ b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/LegalTagUtils.java @@ -0,0 +1,65 @@ +package org.opengroup.osdu.notification.subscriptions; + +import static org.junit.Assert.assertEquals; + +import org.apache.http.HttpStatus; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.sun.jersey.api.client.ClientResponse; +import org.opengroup.osdu.notification.util.TestUtils; + +import java.util.UUID; + +public class LegalTagUtils { + + public static ClientResponse create(String legalTagName, String token, boolean isTestPartition) throws Exception { + return create("US", legalTagName, "2099-01-25", "Public Domain Data", token, isTestPartition); + } + + protected static ClientResponse create(String countryOfOrigin, String name, String expDate, String dataType, String token, boolean isTestPartition) + throws Exception { + String body = getBody(countryOfOrigin, name, expDate, dataType); + ClientResponse response = StorageTestUtils.send(getLegalUrl(), "legaltags", "POST", StorageTestUtils.getHeaders(TenantUtils.getTenantName(isTestPartition), token, UUID.randomUUID().toString(), isTestPartition), body, + ""); + + assertEquals(HttpStatus.SC_CREATED, response.getStatus()); + Thread.sleep(100); + return response; + } + + public static ClientResponse delete(String legalTagName, String token, boolean isTestPartition) throws Exception { + return StorageTestUtils.send(getLegalUrl(), "legaltags/" + legalTagName, "DELETE", StorageTestUtils.getHeaders(TenantUtils.getTenantName(isTestPartition), token, UUID.randomUUID().toString(), isTestPartition), "", ""); + } + + protected static String getLegalUrl() { + String legalUrl = System.getProperty("LEGAL_URL", System.getenv("LEGAL_URL")); + if (legalUrl == null || legalUrl.contains("-null")) { + legalUrl = "https://os-legal-dot-opendes.appspot.com/api/legal/v1/"; + } + return legalUrl; + } + + protected static String getBody(String countryOfOrigin, String name, String expDate, String dataType) { + + JsonArray coo = new JsonArray(); + coo.add(countryOfOrigin); + + JsonObject properties = new JsonObject(); + properties.add("countryOfOrigin", coo); + properties.addProperty("contractId", "A1234"); + properties.addProperty("expirationDate", expDate); + properties.addProperty("dataType", dataType); + properties.addProperty("originator", "MyCompany"); + properties.addProperty("securityClassification", "Public"); + properties.addProperty("exportClassification", "EAR99"); + properties.addProperty("personalData", "No Personal Data"); + + JsonObject tag = new JsonObject(); + tag.addProperty("name", name); + tag.addProperty("description", "test for " + name); + tag.add("properties", properties); + + return tag.toString(); + } +} diff --git a/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/RecordUtils.java b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/RecordUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..3c14f0b4dc5c28d8a5d569be55aa8e2942c0e20a --- /dev/null +++ b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/RecordUtils.java @@ -0,0 +1,93 @@ +package org.opengroup.osdu.notification.subscriptions; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import org.opengroup.osdu.core.common.Constants; + +public class RecordUtils { + + private static final String domain = System.getProperty("DOMAIN", System.getenv("DOMAIN")); + + public static String createJsonRecordWithReference(int recordsCount, String id, String kind, String legalTag, String fromCrs, String conversionType, boolean isTestPartition) { + + JsonArray records = new JsonArray(); + + for (int i = 0; i < recordsCount; i++) { + + JsonObject data = new JsonObject(); + data.addProperty("X", 16.00); + data.addProperty("Y", 10.00); + data.addProperty("Z", 0.0); + + JsonArray propertyNames = new JsonArray(); + propertyNames.add("X"); + propertyNames.add("Y"); + propertyNames.add("Z"); + + JsonObject meta = new JsonObject(); + meta.addProperty(Constants.KIND, conversionType); + meta.addProperty(Constants.PERSISTABLE_REFERENCE, fromCrs); + meta.add(Constants.PROPERTY_NAMES, propertyNames); + + JsonArray metaBlocks = new JsonArray(); + metaBlocks.add(meta); + + JsonObject record = getRecordWithInputData(id + i, kind, legalTag, data, isTestPartition); + record.add(Constants.META, metaBlocks); + + records.add(record); + } + + return records.toString(); + } + + private static JsonObject getRecordWithInputData(String id, String kind, String legalTag, JsonObject data, boolean isTestPartition) { + JsonObject record = getDefaultRecord(id, kind, legalTag, isTestPartition); + record.add("data", data); + return record; + } + + private static JsonObject getDefaultRecord(String id, String kind, String legalTag, boolean isTestPartition) { + JsonArray acls = new JsonArray(); + acls.add(String.format("data.test1@%s", getAclSuffix(isTestPartition))); + return getDefaultRecordFromAcl(id, kind, legalTag, acls); + } + + private static JsonObject getDefaultRecordFromAcl(String id, String kind, String legalTag, JsonArray acls) { + JsonObject acl = new JsonObject(); + acl.add("viewers", acls); + acl.add("owners", acls); + + JsonArray tags = new JsonArray(); + tags.add(legalTag); + + JsonArray ordcJson = new JsonArray(); + ordcJson.add("BR"); + + JsonObject legal = new JsonObject(); + legal.add("legaltags", tags); + legal.add("otherRelevantDataCountries", ordcJson); + + JsonObject record = new JsonObject(); + record.addProperty("id", id); + record.addProperty("kind", kind); + record.add("acl", acl); + record.add("legal", legal); + return record; + } + + public static final String getAclSuffix(boolean isTestPartition) { + String environment = getEnvironment(); + //build.gradle currently throws exception if a variable is set to empty or not set at all + //workaround by setting it to an "empty" string to construct the url + if (environment.equalsIgnoreCase("empty")) environment = ""; + if (!environment.isEmpty()) + environment = "." + environment; + + return String.format("%s%s.%s", TenantUtils.getTenantName(isTestPartition), environment, domain); + } + + public static String getEnvironment() { + return System.getProperty("DEPLOY_ENV", System.getenv("DEPLOY_ENV")); + } +} diff --git a/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/StorageTestUtils.java b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/StorageTestUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..befe028631c6ef0054f7308ea5773902e668b1b6 --- /dev/null +++ b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/StorageTestUtils.java @@ -0,0 +1,124 @@ +package org.opengroup.osdu.notification.subscriptions; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; + +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import javax.ws.rs.core.MediaType; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.SecureRandom; +import java.security.cert.X509Certificate; +import java.util.*; + +public class StorageTestUtils { + + public static ClientResponse send(String path, String httpMethod, Map<String, String> headers, String requestBody, + String query) throws Exception { + + log(httpMethod, StorageTestUtils.getApiPath(path + query), headers, requestBody); + Client client = StorageTestUtils.getClient(); + + WebResource webResource = client.resource(StorageTestUtils.getApiPath(path + query)); + + WebResource.Builder builder = webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON); + headers.forEach(builder::header); + + return builder.method(httpMethod, ClientResponse.class, requestBody); + } + + public static ClientResponse send(String url, String path, String httpMethod, Map<String, String> headers, + String requestBody, String query) throws Exception { + + log(httpMethod, url + path, headers, requestBody); + Client client = StorageTestUtils.getClient(); + + WebResource webResource = client.resource(url + path); + WebResource.Builder builder = webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON); + headers.forEach(builder::header); + + return builder.method(httpMethod, ClientResponse.class, requestBody); + } + + private static void log(String method, String url, Map<String, String> headers, String body) { + System.out.println(String.format("%s: %s", method, url)); + System.out.println(body); + } + + public static String getApiPath(String api) throws Exception { + String baseUrl = System.getProperty("STORAGE_URL", System.getenv("STORAGE_URL")); + if (baseUrl == null || baseUrl.contains("-null")) { + baseUrl = "https://localhost:8443/api/storage/v2/"; + } + URL mergedURL = new URL(baseUrl + api); + System.out.println(mergedURL.toString()); + return mergedURL.toString(); + } + + protected static Client getClient() { + TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + + @Override + public void checkClientTrusted(X509Certificate[] certs, String authType) { + } + + @Override + public void checkServerTrusted(X509Certificate[] certs, String authType) { + } + }}; + + try { + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, trustAllCerts, new SecureRandom()); + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); + } catch (Exception e) { + } + allowMethods("PATCH"); + return Client.create(); + } + + private static void allowMethods(String... methods) { + try { + Field methodsField = HttpURLConnection.class.getDeclaredField("methods"); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(methodsField, methodsField.getModifiers() & ~Modifier.FINAL); + + methodsField.setAccessible(true); + + String[] oldMethods = (String[]) methodsField.get(null); + Set<String> methodsSet = new LinkedHashSet<>(Arrays.asList(oldMethods)); + methodsSet.addAll(Arrays.asList(methods)); + String[] newMethods = methodsSet.toArray(new String[0]); + + methodsField.set(null/*static field*/, newMethods); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + + public static Map<String, String> getHeaders(String tenantName, String token, String correlationId, boolean isTestPartition) { + Map<String, String> headers = new HashMap<>(); + if(tenantName == null || tenantName.isEmpty()) { + tenantName = TenantUtils.getTenantName(false); + } + headers.put("data-partition-id", TenantUtils.getTenantName(isTestPartition)); + headers.put("Authorization", token); + + System.out.printf("Using correlation-id for the request: %s \n", correlationId); + headers.put("correlation-id", correlationId); + return headers; + } +} diff --git a/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/TenantUtils.java b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/TenantUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..182461b8691c0f47a215b2ac9f38376e3bafaeb9 --- /dev/null +++ b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/TenantUtils.java @@ -0,0 +1,10 @@ +package org.opengroup.osdu.notification.subscriptions; + +public class TenantUtils { + + public static String getTenantName(boolean isTestPartition) { + return isTestPartition ? + TestNotificationsEndpoint.PARTITION_TEST + : System.getProperty("OSDU_TENANT", System.getenv("OSDU_TENANT")); + } +} diff --git a/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/TestNotificationsEndpoint.java b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/TestNotificationsEndpoint.java new file mode 100644 index 0000000000000000000000000000000000000000..de53e60c3fa09c562e09d0f00c917a38b12ba7c7 --- /dev/null +++ b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/TestNotificationsEndpoint.java @@ -0,0 +1,157 @@ +package org.opengroup.osdu.notification.subscriptions; + +import com.sun.jersey.api.client.ClientResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.notification.HmacSecret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.common.notification.ISubscriptionService; +import org.opengroup.osdu.core.common.notification.SubscriptionAPIConfig; +import org.opengroup.osdu.core.common.notification.SubscriptionException; +import org.opengroup.osdu.core.common.notification.SubscriptionFactory; +import org.opengroup.osdu.notification.util.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +public class TestNotificationsEndpoint extends TestBase { + + private String subscriptionId = null; + private ISubscriptionService awssubscriptionService; + private TestUtils testUtils; + private static SubscriptionFactory awsfactory; + + private static final long NOW = System.currentTimeMillis(); + private static final String RECORD_ID_PREFIX = TenantUtils.getTenantName(false) + ":query:"; + private static final String RECORD_ID_PREFIX_TEST = TenantUtils.getTenantName(true) + ":query:"; + private static final String KIND = TenantUtils.getTenantName(false) + ":ds:query:1.0." + NOW; + private static final String KIND_TEST = TenantUtils.getTenantName(true) + ":ds:query:1.0." + NOW; + private static final String LEGAL_TAG = TenantUtils.getTenantName(false) + "-storage-" + System.currentTimeMillis(); + private static final String LEGAL_TAG_TEST = TenantUtils.getTenantName(true) + "-storage-" + System.currentTimeMillis(); + + protected static final String PARTITION_TEST = "performance-test"; + + protected static final String PERSISTABLE_REFERENCE = "%7B%22LB_CRS%22%3A%22%257B%2522WKT%2522%253A%2522PROJCS%255B%255C%2522British_National_Grid%255C%2522%252CGEOGCS%255B%255C%2522GCS_OSGB_1936%255C%2522%252CDATUM%255B%255C%2522D_OSGB_1936%255C%2522%252CSPHEROID%255B%255C%2522Airy_1830%255C%2522%252C6377563.396%252C299.3249646%255D%255D%252CPRIMEM%255B%255C%2522Greenwich%255C%2522%252C0.0%255D%252CUNIT%255B%255C%2522Degree%255C%2522%252C0.0174532925199433%255D%255D%252CPROJECTION%255B%255C%2522Transverse_Mercator%255C%2522%255D%252CPARAMETER%255B%255C%2522False_Easting%255C%2522%252C400000.0%255D%252CPARAMETER%255B%255C%2522False_Northing%255C%2522%252C-100000.0%255D%252CPARAMETER%255B%255C%2522Central_Meridian%255C%2522%252C-2.0%255D%252CPARAMETER%255B%255C%2522Scale_Factor%255C%2522%252C0.9996012717%255D%252CPARAMETER%255B%255C%2522Latitude_Of_Origin%255C%2522%252C49.0%255D%252CUNIT%255B%255C%2522Meter%255C%2522%252C1.0%255D%252CAUTHORITY%255B%255C%2522EPSG%255C%2522%252C27700%255D%255D%2522%252C%2522Type%2522%253A%2522LBCRS%2522%252C%2522EngineVersion%2522%253A%2522PE_10_3_1%2522%252C%2522AuthorityCode%2522%253A%257B%2522Authority%2522%253A%2522EPSG%2522%252C%2522Code%2522%253A%252227700%2522%257D%252C%2522Name%2522%253A%2522British_National_Grid%2522%257D%22%2C%22TRF%22%3A%22%257B%2522WKT%2522%253A%2522GEOGTRAN%255B%255C%2522OSGB_1936_To_WGS_1984_Petroleum%255C%2522%252CGEOGCS%255B%255C%2522GCS_OSGB_1936%255C%2522%252CDATUM%255B%255C%2522D_OSGB_1936%255C%2522%252CSPHEROID%255B%255C%2522Airy_1830%255C%2522%252C6377563.396%252C299.3249646%255D%255D%252CPRIMEM%255B%255C%2522Greenwich%255C%2522%252C0.0%255D%252CUNIT%255B%255C%2522Degree%255C%2522%252C0.0174532925199433%255D%255D%252CGEOGCS%255B%255C%2522GCS_WGS_1984%255C%2522%252CDATUM%255B%255C%2522D_WGS_1984%255C%2522%252CSPHEROID%255B%255C%2522WGS_1984%255C%2522%252C6378137.0%252C298.257223563%255D%255D%252CPRIMEM%255B%255C%2522Greenwich%255C%2522%252C0.0%255D%252CUNIT%255B%255C%2522Degree%255C%2522%252C0.0174532925199433%255D%255D%252CMETHOD%255B%255C%2522Position_Vector%255C%2522%255D%252CPARAMETER%255B%255C%2522X_Axis_Translation%255C%2522%252C446.448%255D%252CPARAMETER%255B%255C%2522Y_Axis_Translation%255C%2522%252C-125.157%255D%252CPARAMETER%255B%255C%2522Z_Axis_Translation%255C%2522%252C542.06%255D%252CPARAMETER%255B%255C%2522X_Axis_Rotation%255C%2522%252C0.15%255D%252CPARAMETER%255B%255C%2522Y_Axis_Rotation%255C%2522%252C0.247%255D%252CPARAMETER%255B%255C%2522Z_Axis_Rotation%255C%2522%252C0.842%255D%252CPARAMETER%255B%255C%2522Scale_Difference%255C%2522%252C-20.489%255D%252CAUTHORITY%255B%255C%2522EPSG%255C%2522%252C1314%255D%255D%2522%252C%2522Type%2522%253A%2522STRF%2522%252C%2522EngineVersion%2522%253A%2522PE_10_3_1%2522%252C%2522AuthorityCode%2522%253A%257B%2522Authority%2522%253A%2522EPSG%2522%252C%2522Code%2522%253A%25221314%2522%257D%252C%2522Name%2522%253A%2522OSGB_1936_To_WGS_1984_Petroleum%2522%257D%22%2C%22Type%22%3A%22EBCRS%22%2C%22EngineVersion%22%3A%22PE_10_3_1%22%2C%22Name%22%3A%22OSGB+1936+*+UKOOA-Pet+%2F+British+National+Grid+%5B27700%2C1314%5D%22%2C%22AuthorityCode%22%3A%7B%22Authority%22%3A%22MyCompany%22%2C%22Code%22%3A%2227700006%22%7D%7D"; + @BeforeClass + public static void classSetup() { + SubscriptionAPIConfig config = SubscriptionAPIConfig.builder().rootUrl(Config.Instance().RegisterServicePath).build(); + awsfactory = new SubscriptionFactory(config); + } + + @Before + public void setup() throws Exception { + this.testUtils = new AwsTestUtils(); + } + + @After + @Override + public void tearDown() throws Exception { + //LegalTagUtils.delete(LEGAL_TAG, testUtils.getOpsToken()); + this.testUtils = null; + } + + private void createResource() throws Exception { + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, TestUtils.getOsduTenant()); + headers.put(DpsHeaders.AUTHORIZATION, testUtils.getOpsToken()); + //hardcoding user here for 200 response tests. This is just initializing the subscription creation + headers.put("x-user-id", AwsConfig.getAWSCognitoUser()); + DpsHeaders dpsHeaders = DpsHeaders.createFromMap(headers); + awssubscriptionService = awsfactory.create(dpsHeaders); + + Map<String,String> h = dpsHeaders.getHeaders(); + System.out.println(h); + + + //Create a new subscription to pub/sub + Subscription subscription = new Subscription(); + subscription.setName("Subscription-test-for-notification"); + subscription.setDescription("Subscription test for fetching notifications"); + subscription.setTopic(Config.Instance().Topic); + //This seems to be a bug. Don't need to add the string + //subscription.setPushEndpoint(Config.Instance().HMACPushUrl + "hmac-integration-test"); + subscription.setPushEndpoint(Config.Instance().HMACPushUrl); + HmacSecret secret = new HmacSecret(); + secret.setValue(Config.Instance().hmacSecretValue); + + subscription.setSecret(secret); + try { + Subscription subscriptionCreated = awssubscriptionService.create(subscription); + + String notificationId = subscriptionCreated.getNotificationId(); + subscriptionId = subscriptionCreated.getId(); + Config.Instance().NotificationId = notificationId; + }catch (SubscriptionException e){ + System.out.println("Subscription exception inner response : " + e.getHttpResponse()); + throw e; + } + } + + private ClientResponse createStorageRecord(final String correlationId) throws Exception { + String recordId = RECORD_ID_PREFIX + UUID.randomUUID().toString(); + String jsonInput = RecordUtils.createJsonRecordWithReference(1, recordId, KIND, LEGAL_TAG, PERSISTABLE_REFERENCE, "CRS", false); + return StorageTestUtils.send("records", "PUT", StorageTestUtils.getHeaders(TenantUtils.getTenantName(false), testUtils.getAdminToken(), correlationId, false), jsonInput, ""); + } + + private ClientResponse createStorageRecordForTestPartition(final String correlationId) throws Exception { + String recordId = RECORD_ID_PREFIX_TEST + UUID.randomUUID().toString(); + String jsonInput = RecordUtils.createJsonRecordWithReference(1, recordId, KIND_TEST, LEGAL_TAG_TEST, PERSISTABLE_REFERENCE, "CRS", true); + return StorageTestUtils.send("records", "PUT", StorageTestUtils.getHeaders(TenantUtils.getTenantName(true), testUtils.getAdminToken(), correlationId, true), jsonInput, ""); + } + + @Test + public void testVerifyNotificationReceivedWhenDataPartitionIdIsDifferent() throws Exception { + try { + LegalTagUtils.create(LEGAL_TAG_TEST, testUtils.getOpsToken(), true); + createResource(); + final String correlationId = UUID.randomUUID().toString(); + ClientResponse response = createStorageRecordForTestPartition(correlationId); + assertEquals(201, response.getStatus()); + //Executing notifications response to endpoints takes an upper bound of 120s. + Thread.sleep(120000); + + //Run Bash File to fetch logs from register endpoint and verify that notification was received + String bashFileToExecute = "src/test/java/org/opengroup/osdu/notification/subscriptions/verify_register-logs.sh " + correlationId; + Process process = Runtime.getRuntime().exec(bashFileToExecute); + process.waitFor(); + int exitValue = process.exitValue(); + assertEquals(exitValue, 1); + } catch (Exception e) { + System.out.println("Error while verifying notification service" + e); + } finally { + awssubscriptionService.delete(subscriptionId); + LegalTagUtils.delete(LEGAL_TAG_TEST, testUtils.getOpsToken(), true); + } + } + + @Test + public void testVerifyNotificationReceivedWhenDataPartitionIdIsSame() throws Exception { + try { + LegalTagUtils.create(LEGAL_TAG, testUtils.getOpsToken(), false); + createResource(); + final String correlationId = UUID.randomUUID().toString(); + ClientResponse response = createStorageRecord(correlationId); + assertEquals(201, response.getStatus()); + //Executing notifications response to endpoints takes an upper bound of 120s. + Thread.sleep(120000); + + //Run Bash File to fetch logs from register endpoint and verify that notification was received + String bashFileToExecute = "src/test/java/org/opengroup/osdu/notification/subscriptions/verify_register-logs.sh " + correlationId; + Process process = Runtime.getRuntime().exec(bashFileToExecute); + process.waitFor(); + int exitValue = process.exitValue(); + assertEquals(exitValue, 0); + } catch (Exception e) { + System.out.println("Error while verifying notification service" + e); + } finally { + awssubscriptionService.delete(subscriptionId); + LegalTagUtils.delete(LEGAL_TAG_TEST, testUtils.getOpsToken(), false); + } + } +} diff --git a/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/verify_register-logs.sh b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/verify_register-logs.sh new file mode 100644 index 0000000000000000000000000000000000000000..6832babe4761f27a9da3d986bc1194f32af61979 --- /dev/null +++ b/testing/notification-test-aws/src/test/java/org/opengroup/osdu/notification/subscriptions/verify_register-logs.sh @@ -0,0 +1,8 @@ +register_pod=$(kubectl get pods --all-namespaces -o=jsonpath='{range .items..metadata}{.name}{"\n"}{end}' | fgrep os-register-) +register_logs=$(kubectl logs $register_pod -n osdu-instance-$OSDU_INSTANCE_NAME-core --since=2m) + +if echo $register_logs | grep $1; then + return 0 +else + return 1 +fi \ No newline at end of file