Skip to content
Snippets Groups Projects
Commit 189c786c authored by MIchael Nguyen's avatar MIchael Nguyen
Browse files

updating indexer queue.

parent 6b9e2fa2
No related branches found
No related tags found
1 merge request!6Trusted ibm
FROM amazoncorretto:8
RUN mkdir /docker
WORKDIR /docker
COPY pom.xml .
RUN mvn dependency:resolve
COPY src .
RUN mvn clean verify
\ No newline at end of file
......@@ -71,5 +71,10 @@
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
</dependencies>
</project>
......@@ -5,6 +5,7 @@ import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.*;
import com.sun.org.apache.xpath.internal.Arg;
import org.apache.commons.cli.*;
import org.apache.commons.collections4.ListUtils;
import org.opengroup.osdu.core.aws.sqs.SQSBuilder;
import java.io.PrintWriter;
......@@ -28,33 +29,29 @@ public class IndexerQueue {
*/
// TODO: what happens when the exception happens here?
public static void main(String[] args) throws ExecutionException, InterruptedException, ParseException {
int maxThreadCount = 50;
Arguments arguments = getArguments(args);
// TODO: how long do in flight messages last?
// TODO: how long do in flight messages last? we can make them in flight for 12 hours max
AmazonSQS sqsClient = SQSBuilder.generateSqsClient(arguments.region);
ThreadPoolExecutor executorPool = getExecutor(arguments.maxIndexThreads, arguments.keepAliveTimeInMin);
ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreadCount);
//TODO: make this into a while loop
recursiveProcessQueue(sqsClient, arguments, executorPool);
processQueue(sqsClient, arguments, executorPool);
}
// TODO: break down into smaller methods for cleaner code
private static void recursiveProcessQueue(AmazonSQS sqsClient, Arguments arguments, ThreadPoolExecutor executorPool)
private static void processQueue(AmazonSQS sqsClient, Arguments arguments, ThreadPoolExecutor executorPool)
throws ExecutionException, InterruptedException {
List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>();
// TODO: change this to have each thread reach out to SQS
List<Message> messages = getMessages(sqsClient, arguments.queueName, arguments.maxIndexThreads);
List<Message> messages = getMessages(sqsClient, arguments.queueName);
if (!messages.isEmpty()) {
// TODO: better logging to cloudwatch
System.out.println(String.format("Processing %s messages from storage queue", messages.size()));
for (final Message message : messages) {
IndexProcessor processor = new IndexProcessor(message, arguments.targetURL);
CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool);
futures.add(future);
}
List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, arguments.targetURL);
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]);
CompletableFuture<List<IndexProcessor>> results = CompletableFuture
.allOf(cfs)
......@@ -65,6 +62,8 @@ public class IndexerQueue {
List<IndexProcessor> indexProcessors = results.get();
executorPool.shutdown();
List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
final String deadLetterQueueUrl = sqsClient.getQueueUrl(arguments.deadLetterQueueName).getQueueUrl();
for (IndexProcessor indexProcessor : indexProcessors) {
DeleteMessageBatchRequestEntry deleteEntry = new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle);
......@@ -75,31 +74,47 @@ public class IndexerQueue {
}
if(deleteEntries.size() > 0) {
DeleteMessageBatchRequest deleteBatchRequest = new DeleteMessageBatchRequest(arguments.queueName, deleteEntries);
sqsClient.deleteMessageBatch(deleteBatchRequest);
List<DeleteMessageBatchRequest> deleteBatchRequests = createMultipleBatchDeleteRequest(arguments.queueName, deleteEntries);
for (DeleteMessageBatchRequest deleteRequest : deleteBatchRequests) {
sqsClient.deleteMessageBatch(deleteRequest);
}
}
recursiveProcessQueue(sqsClient, arguments, executorPool);
}
return;
}
private static ThreadPoolExecutor getExecutor(int maxIndexThreads, long keepAliveTimeInMin){
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
Executors.newFixedThreadPool(maxIndexThreads); // what is keep alive?
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(1, maxIndexThreads, keepAliveTimeInMin,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxIndexThreads), threadFactory, rejectionHandler);
return executorPool;
private static List<DeleteMessageBatchRequest> createMultipleBatchDeleteRequest(String queueName, List<DeleteMessageBatchRequestEntry> deleteEntries) {
List<List<DeleteMessageBatchRequestEntry>> batchedEntries = ListUtils.partition(deleteEntries, 10);
return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueName, entries)).collect(Collectors.toList());
}
private static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url){
List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>();
for (final Message message : messages) {
IndexProcessor processor = new IndexProcessor(message, url);
CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool);
futures.add(future);
}
return futures;
}
private static List<Message> getMessages(AmazonSQS sqsClient, String queueName, int maxThreads){
// TODO make the upper bound configurable as a program argument
private static List<Message> getMessages(AmazonSQS sqsClient, String queueName){
final String sqsQueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl);
receiveMessageRequest.setMaxNumberOfMessages(maxThreads);
List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest
.withMessageAttributeNames("data-partition-id", "account-id")).getMessages();
int numOfMessages = 10;
List<Message> messages = new ArrayList<>();
do {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl);
receiveMessageRequest.setMaxNumberOfMessages(numOfMessages);
List<Message> retrievedMessages = sqsClient.receiveMessage(receiveMessageRequest
.withMessageAttributeNames("data-partition-id", "account-id")).getMessages();
messages.addAll(retrievedMessages);
numOfMessages = retrievedMessages.size();
}while (messages.size() < 1000 && numOfMessages > 0);
return messages;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment