Skip to content
Snippets Groups Projects
Commit 80290cac authored by MIchael Nguyen's avatar MIchael Nguyen
Browse files

Merge branch 'dockerfile' of...

Merge branch 'dockerfile' of https://git-codecommit.us-east-1.amazonaws.com/v1/repos/os-indexer into dockerfile
parents 20df2b45 9a558a92
No related branches found
No related tags found
1 merge request!6Trusted ibm
Showing
with 0 additions and 874 deletions
FROM amazoncorretto:8
WORKDIR /
COPY provider/indexer-aws/indexer-queue-aws/target/indexer-queue-aws-1.0-SNAPSHOT.jar indexer-queue-aws-1.0-SNAPSHOT.jar
EXPOSE 8080
CMD ["java","-jar", "indexer-queue-aws-1.0-SNAPSHOT.jar"]
\ No newline at end of file
<?xml version="1.0"?>
<!--
Copyright © Amazon Web Services
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>org.opengroup.osdu.indexerqueue.aws</groupId>
<artifactId>indexer-queue-aws</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>
org.opengroup.osdu.indexerqueue.aws.api.IndexerQueue
</Main-Class>
<X-Compile-Source-JDK>8</X-Compile-Source-JDK>
<X-Compile-Target-JDK>8</X-Compile-Target-JDK>
</manifestEntries>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>com.github.seahen</groupId>
<artifactId>maven-s3-wagon</artifactId>
<version>1.3.0</version>
</extension>
</extensions>
</build>
<name>indexer-queue-aws</name>
<packaging>jar</packaging>
<properties>
<aws.version>1.11.637</aws.version>
<deployment.environment>dev</deployment.environment>
<deployment.repositorybucket>osdu-local-maven-repository</deployment.repositorybucket>
</properties>
<!-- Internal packages -->
<dependencies>
<dependency>
<groupId>org.opengroup.osdu.core.aws</groupId>
<artifactId>aws-osdu-util</artifactId>
<version>0.0.1</version>
</dependency>
<!-- AWS managed packages -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.11.651</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.327</version>
</dependency>
<!-- Third party Apache 2.0 license packages -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>7.6.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
<!-- Testing packages -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>maven-s3-release-repo</id>
<name>S3 Maven Release Repository</name>
<url>s3://${deployment.repositorybucket}/release</url>
</repository>
<repository>
<id>maven-s3-snapshot-repo</id>
<name>S3 Maven Snapshot Repository</name>
<url>s3://${deployment.repositorybucket}/snapshot</url>
</repository>
<repository>
<id>local.release</id>
<url>file:../../local-release-dir</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<distributionManagement>
<site>
<id>s3.site</id>
<url>s3://${deployment.repositorybucket}/site</url>
</site>
<repository>
<id>s3.release</id>
<url>s3://${deployment.repositorybucket}/release</url>
</repository>
<snapshotRepository>
<id>s3.snapshot</id>
<url>s3://${deployment.repositorybucket}/snapshot</url>
</snapshotRepository>
</distributionManagement>
</project>
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
public enum CallableResult {
Pass,
Fail
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
public class EnvironmentVariables {
public String region;
public String queueName;
public String targetURL;
public String deadLetterQueueName;
public int maxBatchRequestCount;
public int maxMessagesAllowed;
public int maxIndexThreads;
public long keepAliveTimeInMin;
public String cognitoClientId;
public String cognitoAuthFlow;
public String cognitoUser;
public String cognitoPassword;
public EnvironmentVariables() {
this.region = System.getenv("AWS_REGION") != null ? System.getenv("AWS_REGION") : "us-east-1";
this.queueName = System.getenv("AWS_QUEUE_INDEXER_NAME") != null ? System.getenv("AWS_QUEUE_INDEXER_NAME") : "dev-osdu-storage-queue";
this.targetURL = System.getenv("AWS_INDEXER_INDEX_API") != null ? System.getenv("AWS_INDEXER_INDEX_API"): "ECSALB-os-indexer-1927005132.us-east-1.elb.amazonaws.com/api/indexer/v2/_dps/task-handlers/index-worker";
this.deadLetterQueueName = System.getenv("AWS_DEADLETTER_QUEUE_NAME") != null ? System.getenv("AWS_DEADLETTER_QUEUE_NAME") : "dev-osdu-storage-dead-letter-queue";
this.maxIndexThreads = System.getenv("MAX_INDEX_THREADS") != null ? Integer.parseInt(System.getenv("MAX_INDEX_THREADS")) : 50;
this.maxBatchRequestCount = System.getenv("MAX_REQUEST_COUNT") != null ? Integer.parseInt(System.getenv("MAX_REQUEST_COUNT")) : 10;
this.maxMessagesAllowed = System.getenv("MAX_MESSAGE_COUNT") != null ? Integer.parseInt(System.getenv("MAX_MESSAGE_COUNT")) : 100000;
this.keepAliveTimeInMin = System.getenv("KEEP_ALIVE_IN_MINUTES") != null ? Long.parseLong(System.getenv("KEEP_ALIVE_IN_MINUTES")) : 9999;
this.cognitoClientId = System.getenv("AWS_COGNITO_CLIENT_ID") != null ? System.getenv("AWS_COGNITO_CLIENT_ID") : "3rmgmg8mup281ttc1mbut1pimc";
this.cognitoAuthFlow = System.getenv("AWS_COGNITO_AUTH_FLOW") != null ? System.getenv("AWS_COGNITO_AUTH_FLOW") : "USER_PASSWORD_AUTH";
this.cognitoUser = System.getenv("AWS_COGNITO_AUTH_PARAMS_USER") != null ? System.getenv("AWS_COGNITO_AUTH_PARAMS_USER") : "test-user-with-access@testing.com";
this.cognitoPassword = System.getenv("AWS_COGNITO_AUTH_PARAMS_PASSWORD") != null ? System.getenv("AWS_COGNITO_AUTH_PARAMS_PASSWORD"): "Password123*";
};
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.xray.model.Http;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.Callable;
import com.amazonaws.services.sqs.model.Message;
public class IndexProcessor implements Callable<IndexProcessor> {
public CallableResult result;
public Exception exception;
public Message message;
public String messageId;
public String receiptHandle;
public StringBuilder response;
public String targetURL;
public String indexerServiceAccountJWT;
public IndexProcessor(Message message, String targetUrl, String indexServiceAccountJWT){
this.message = message;
this.targetURL = targetUrl;
this.receiptHandle = message.getReceiptHandle();
result = CallableResult.Pass;
this.indexerServiceAccountJWT = indexServiceAccountJWT;
}
@Override
public IndexProcessor call() {
ObjectMapper mapper = new ObjectMapper();
try {
this.messageId = message.getMessageId();
RecordChangedMessages convertedMessage = getConvertedMessage(message);
String body = mapper.writeValueAsString(convertedMessage);
HttpURLConnection connection = getConnection(body, convertedMessage.attributes.get("data-partition-id"));
sendRequest(connection, body);
getResponse(connection);
} catch (MalformedURLException e) {
System.out.println(e.getMessage());
result = CallableResult.Fail;
exception = e;
} catch (ProtocolException e) {
System.out.println(e.getMessage());
result = CallableResult.Fail;
exception = e;
} catch (IOException e) {
System.out.println(e.getMessage());
result = CallableResult.Fail;
exception = e;
} catch (Exception e){
System.out.println(e.getMessage());
result = CallableResult.Fail;
exception = e;
}
return this;
}
private RecordChangedMessages getConvertedMessage(Message message){
RecordChangedMessages convertedMessage = new RecordChangedMessages();
convertedMessage.data = message.getBody();
convertedMessage.messageId = message.getMessageId();
Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes();
MessageAttributeValue dataPartitionIdValue = messageAttributes.get("data-partition-id");
MessageAttributeValue accountIdValue = messageAttributes.get("account-id");
Map<String, String> attributes = new HashMap<>();
attributes.put("data-partition-id", dataPartitionIdValue.getStringValue());
attributes.put("account-id", accountIdValue.getStringValue());
convertedMessage.attributes = attributes;
return convertedMessage;
}
private HttpURLConnection getConnection(String body, String dataPartitionId) throws IOException {
URL url = new URL(this.targetURL);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Length",
Integer.toString(body.getBytes().length));;
connection.setRequestProperty("Content-Type",
"application/json");
connection.setRequestProperty("data-partition-id", dataPartitionId);
connection.setRequestProperty("Authorization", "Bearer " + this.indexerServiceAccountJWT);
connection.setUseCaches(false);
connection.setDoOutput(true);
return connection;
}
private void sendRequest(HttpURLConnection connection, String body) throws IOException {
DataOutputStream wr = new DataOutputStream (
connection.getOutputStream());
wr.writeBytes(body);
wr.close();
}
private void getResponse(HttpURLConnection connection) throws IOException {
InputStream is = connection.getInputStream();
BufferedReader rd = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = rd.readLine()) != null) {
this.response.append(line);
this.response.append('\r');
}
rd.close();
}
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
import com.amazonaws.services.logs.AWSLogs;
import com.amazonaws.services.logs.model.InputLogEvent;
import com.amazonaws.services.logs.model.PutLogEventsRequest;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*;
import org.opengroup.osdu.core.aws.cognito.AWSCognitoClient;
import org.opengroup.osdu.core.aws.logging.AmazonLogConfig;
import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
public class IndexerQueue {
/**
* int maxIndexThreads = 10;
* String region = "us-east-1";
* String queueName = "dev-osdu-storage-queue";
* String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker";
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) {
EnvironmentVariables environmentVariables = new EnvironmentVariables();
// AmazonLogConfig logConfig = new AmazonLogConfig(environmentVariables.region);
// AWSLogs log = logConfig.AmazonLog();
// PutLogEventsRequest logRequest = new PutLogEventsRequest("ECSLogGroup-indexer-queue", "indexer-queue", new ArrayList<>());
// InputLogEvent logEvent = new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments");
// logRequest.getLogEvents().add(logEvent);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT"));
System.out.println(environmentVariables.region);
System.out.println(environmentVariables.maxBatchRequestCount);
System.out.println(environmentVariables.queueName);
System.out.println(environmentVariables.cognitoAuthFlow);
System.out.println(environmentVariables.cognitoClientId);
System.out.println(environmentVariables.cognitoPassword);
System.out.println(environmentVariables.cognitoUser);
System.out.println(environmentVariables.deadLetterQueueName);
System.out.println(environmentVariables.maxIndexThreads);
System.out.println(environmentVariables.maxMessagesAllowed);
System.out.println(environmentVariables.targetURL);
System.out.println(environmentVariables.keepAliveTimeInMin);
try {
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments"));
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT"));
AWSCognitoClient cognitoClient = new AWSCognitoClient(environmentVariables.cognitoClientId, environmentVariables.cognitoAuthFlow,
environmentVariables.cognitoUser, environmentVariables.cognitoPassword);
String indexerServiceAccountJWT = cognitoClient.getToken();
if(indexerServiceAccountJWT == null){
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Indexer service account not set up correctly"));
}
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Connecting to the SQS Queue: %s", environmentVariables.queueName)));
AmazonSQSConfig sqsConfig = new AmazonSQSConfig(environmentVariables.region);
AmazonSQS sqsClient = sqsConfig.AmazonSQS();
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Creating a thread pool with %s threads", environmentVariables.maxIndexThreads)));
ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(environmentVariables.maxIndexThreads);
final String deadLetterQueueUrl = sqsClient.getQueueUrl(environmentVariables.deadLetterQueueName).getQueueUrl();
System.out.println(deadLetterQueueUrl);
List<Message> messages = IndexerQueueService.getMessages(sqsClient, environmentVariables.queueName, environmentVariables.maxBatchRequestCount, environmentVariables.maxMessagesAllowed);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Processing %s messages from storage queue", messages.size())));
if (!messages.isEmpty()) {
List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, environmentVariables.targetURL, executorPool, indexerServiceAccountJWT);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Processed", indexProcessors.size())));
List<IndexProcessor> failedProcessors = indexProcessors.stream().filter(indexProcessor -> indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null).collect(Collectors.toList());
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Failed", failedProcessors.size())));
List<SendMessageResult> deadLetterResults = IndexerQueueService.sendMsgsToDeadLetterQueue(deadLetterQueueUrl, failedProcessors, sqsClient);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Dead Lettered", deadLetterResults.size())));
List<DeleteMessageBatchRequestEntry> deleteEntries = indexProcessors.stream().map(indexProcessor -> new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle)).collect(Collectors.toList());
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Deleting", deleteEntries.size())));
final String sqsQueueUrl = sqsClient.getQueueUrl(environmentVariables.queueName).getQueueUrl();
List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, environmentVariables.maxBatchRequestCount);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Delete Batch Request Created", deleteBatchRequests.size())));
List<DeleteMessageBatchResult> deleteMessageBatchResults = IndexerQueueService.deleteMessages(deleteBatchRequests, sqsClient);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Requests Deleted", deleteMessageBatchResults.size())));
}
} catch (ExecutionException e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
} catch (InterruptedException e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
} catch (NullPointerException e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
}catch (Exception e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
} finally {
System.out.println("failing finally");
// log.putLogEvents(logRequest);
}
}
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.Logger;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
public class IndexerQueueService {
public static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool, String indexerServiceAccountJWT)
throws ExecutionException, InterruptedException {
List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url, indexerServiceAccountJWT);
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]);
CompletableFuture<List<IndexProcessor>> results = CompletableFuture
.allOf(cfs)
.thenApply(ignored -> futures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
List<IndexProcessor> processed = results.get();
executorPool.shutdown();
return processed;
}
public static List<DeleteMessageBatchResult> deleteMessages(List<DeleteMessageBatchRequest> deleteBatchRequests, AmazonSQS sqsClient) {
return deleteBatchRequests.stream().map(deleteRequest -> sqsClient.deleteMessageBatch(deleteRequest)).collect(Collectors.toList());
}
public static List<DeleteMessageBatchRequest> createMultipleBatchDeleteRequest(String queueUrl, List<DeleteMessageBatchRequestEntry> deleteEntries, int maxBatchRequest) {
List<List<DeleteMessageBatchRequestEntry>> batchedEntries = ListUtils.partition(deleteEntries, maxBatchRequest);
return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueUrl, entries)).collect(Collectors.toList());
}
public static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url, String indexerServiceAccountJWT){
List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>();
for (final Message message : messages) {
System.out.println(message);
System.out.println(url);
System.out.println(indexerServiceAccountJWT);
IndexProcessor processor = new IndexProcessor(message, url, indexerServiceAccountJWT);
CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool);
futures.add(future);
}
return futures;
}
public static List<Message> getMessages(AmazonSQS sqsClient, String queueName, int numOfmessages, int maxMessageCount){
final String sqsQueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
System.out.println("inside get messages");
System.out.println(sqsQueueUrl);
int numOfMessages = numOfmessages;
List<Message> messages = new ArrayList<>();
do {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl);
receiveMessageRequest.setMaxNumberOfMessages(numOfMessages);
receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id");
List<Message> retrievedMessages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
messages.addAll(retrievedMessages);
numOfMessages = retrievedMessages.size();
}while (messages.size() < maxMessageCount && numOfMessages > 0);
return messages;
}
public static List<SendMessageResult> sendMsgsToDeadLetterQueue(String deadLetterQueueUrl, List<IndexProcessor> indexProcessors, AmazonSQS sqsClient) {
return indexProcessors.stream().map(indexProcessor -> sendMsgToDeadLetterQueue(deadLetterQueueUrl, indexProcessor, sqsClient)).collect(Collectors.toList());
}
private static SendMessageResult sendMsgToDeadLetterQueue(String deadLetterQueueUrl, IndexProcessor indexProcessor, AmazonSQS sqsClient){
Map<String, MessageAttributeValue> messageAttributes = indexProcessor.message.getMessageAttributes();
MessageAttributeValue exceptionAttribute = new MessageAttributeValue()
.withDataType("String");
String exceptionMessage = indexProcessor.exception.getMessage();
if(exceptionMessage == null){
exceptionMessage = "Empty";
}
StringWriter sw = new StringWriter();
indexProcessor.exception.printStackTrace(new PrintWriter(sw));
String exceptionAsString = String.format("Exception message: %s. Exception stacktrace: %s", exceptionMessage, sw.toString());
exceptionAttribute.setStringValue(exceptionAsString);
messageAttributes.put("Exception", exceptionAttribute);
SendMessageRequest send_msg_request = new SendMessageRequest()
.withQueueUrl(deadLetterQueueUrl)
.withMessageBody(indexProcessor.message.getBody())
.withMessageAttributes(messageAttributes);
return sqsClient.sendMessage(send_msg_request);
}
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
import java.util.Map;
// TODO: consolidate this model with core refactor
public class RecordChangedMessages {
public String messageId;
public String publishTime;
public String data;
public Map<String, String> attributes;
public String getMessageId(){
return this.messageId;
}
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is rejected");
}
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
public class IndexProcessorTest {
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexerqueue.aws.api;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public class IndexerQueueServiceTest {
@Rule
public ExpectedException exception = ExpectedException.none();
@Mock
private AmazonSQS sqsClient = mock(AmazonSQS.class);
private GetQueueUrlResult queueUrlResult;
private ReceiveMessageResult receiveResult;
private String queueName = "testQueue";
private String queueUrl ="localhost";
@Before
public void setUp() {
queueUrlResult = new GetQueueUrlResult();
queueUrlResult.setQueueUrl(queueUrl);
Mockito.when(sqsClient.getQueueUrl(queueName)).thenReturn(queueUrlResult);
receiveResult = new ReceiveMessageResult();
}
@Test
public void test_getMessages_10Messages_pass() {
// Arrange
int numOfmessages = 10;
int maxMessageCount = 10;
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
receiveMessageRequest.setMaxNumberOfMessages(numOfmessages);
receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id");
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message msg = new Message();
messages.add(msg);
}
receiveResult.setMessages(messages);
Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult);
// Act
List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, queueName, numOfmessages, maxMessageCount);
// Assert
assertEquals(messages.get(1), actualMessages.get(1));
assertEquals(messages.size(), actualMessages.size());
}
@Test
public void test_getMessages_0Messages_pass() {
// Arrange
int numOfmessages = 10;
int maxMessageCount = 10;
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
receiveMessageRequest.setMaxNumberOfMessages(numOfmessages);
receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id");
List<Message> messages = new ArrayList<>();
receiveResult.setMessages(messages);
Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult);
// Act
List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, queueName, numOfmessages, maxMessageCount);
// Assert
assertEquals(messages.size(), actualMessages.size());
}
@Test(expected = NullPointerException.class)
public void test_getMessages_invalidqueuename_fail() {
// Arrange
int numOfmessages = 10;
int maxMessageCount = 10;
String invalidQueueName = "invalid";
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
receiveMessageRequest.setMaxNumberOfMessages(numOfmessages);
receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id");
List<Message> messages = new ArrayList<>();
receiveResult.setMessages(messages);
Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult);
// Act
List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, invalidQueueName, numOfmessages, maxMessageCount);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment