Skip to content
Snippets Groups Projects
Commit dd345639 authored by Spencer Sutton's avatar Spencer Sutton
Browse files

Indexer queue in progress

parent ad23e937
No related branches found
No related tags found
1 merge request!6Trusted ibm
......@@ -66,5 +66,10 @@
<version>1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
</dependencies>
</project>
package org.opengroup.osdu.indexerqueue.aws.api;
public class Arguments {
public String region;
public String queueName;
public String targetURL;
public int maxIndexThreads;
public String deadLetterQueueName;
public long keepAliveTimeInMin;
}
......@@ -2,6 +2,8 @@ package org.opengroup.osdu.indexerqueue.aws.api;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.xray.model.Http;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -20,13 +22,15 @@ import com.amazonaws.services.sqs.model.Message;
public class IndexProcessor implements Callable<IndexProcessor> {
public CallableResult result;
public Exception exception;
private Message message;
public Message message;
public String messageId;
public String receiptHandle;
public StringBuilder response;
public String targetURL;
public IndexProcessor(Message message){
public IndexProcessor(Message message, String targetUrl){
this.message = message;
this.targetURL = targetUrl;
this.receiptHandle = message.getReceiptHandle();
result = CallableResult.Pass;
}
......@@ -34,69 +38,18 @@ public class IndexProcessor implements Callable<IndexProcessor> {
@Override
public IndexProcessor call() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
HttpURLConnection connection = null;
try {
String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker";
URL url = new URL(targetURL);
SQSMessageBody messageBody = mapper.readValue(message.getBody(), SQSMessageBody.class);
RecordChangedMessages convertedMessage = new RecordChangedMessages();
convertedMessage.data = messageBody.Message;
convertedMessage.messageId = messageBody.MessageId;
this.messageId = messageBody.MessageId;
convertedMessage.publishTime = messageBody.Timestamp;
// "MessageAttributes" : {
// "account-id" : {"Type":"String","Value":"common"},
// "correlation-id" : {"Type":"String","Value":"5c43af36-d826-40cd-8e7f-604a5e7b2bc0"},
// "data-partition-id" : {"Type":"String","Value":"common"}
// }
convertedMessage.attributes = new HashMap<>();
convertedMessage.attributes.put("data-partition-id", "common");
convertedMessage.attributes.put("account-id", "common");
// Map<String, Object> messageAttributes = messageBody.MessageAttributes;
// for (Map.Entry<String, Object> entry : messageAttributes.entrySet()) {
// if(entry.getKey() == "data-partition-id"){
// String enStr = entry.getValue().toString();
// Map<String, String> dataPartitionValues = mapper.readValue(entry.getValue().toString(), Map.class);
// convertedMessage.attributes.put(entry.getKey(), dataPartitionValues.get("Value"));
// }
// }
this.messageId = message.getMessageId();
RecordChangedMessages convertedMessage = getConvertedMessage(message);
String body = mapper.writeValueAsString(convertedMessage);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Length",
Integer.toString(body.getBytes().length));;
connection.setRequestProperty("Content-Type",
"application/json");
connection.setRequestProperty("data-partition-id", "common");
connection.setRequestProperty("user", "opendes@byoc.local");
String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8
connection.setRequestProperty("Authorization", "Basic "+encoded);
connection.setUseCaches(false);
connection.setDoOutput(true);
//Send request
DataOutputStream wr = new DataOutputStream (
connection.getOutputStream());
wr.writeBytes(body);
wr.close();
//Get Response
// TODO: check for 200 to set to pass
InputStream is = connection.getInputStream();
BufferedReader rd = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = rd.readLine()) != null) {
this.response.append(line);
this.response.append('\r');
}
rd.close();
HttpURLConnection connection = getConnection(body, convertedMessage.attributes.get("data-partition-id"));
sendRequest(connection, body);
getResponse(connection);
} catch (MalformedURLException e) {
result = CallableResult.Fail;
exception = e;
......@@ -112,4 +65,59 @@ public class IndexProcessor implements Callable<IndexProcessor> {
}
return this;
}
private RecordChangedMessages getConvertedMessage(Message message){
RecordChangedMessages convertedMessage = new RecordChangedMessages();
convertedMessage.data = message.getBody();
convertedMessage.messageId = message.getMessageId();
Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes();
MessageAttributeValue dataPartitionIdValue = messageAttributes.get("data-partition-id");
MessageAttributeValue accountIdValue = messageAttributes.get("account-id");
Map<String, String> attributes = new HashMap<>();
attributes.put("data-partition-id", dataPartitionIdValue.getStringValue());
attributes.put("account-id", accountIdValue.getStringValue());
convertedMessage.attributes = attributes;
return convertedMessage;
}
private HttpURLConnection getConnection(String body, String dataPartitionId) throws IOException {
// TODO: integrate with entitlements
URL url = new URL(this.targetURL);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Length",
Integer.toString(body.getBytes().length));;
connection.setRequestProperty("Content-Type",
"application/json");
connection.setRequestProperty("data-partition-id", dataPartitionId);
connection.setRequestProperty("user", "opendes@byoc.local");
String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8
connection.setRequestProperty("Authorization", "Basic "+encoded);
connection.setUseCaches(false);
connection.setDoOutput(true);
return connection;
}
private void sendRequest(HttpURLConnection connection, String body) throws IOException {
DataOutputStream wr = new DataOutputStream (
connection.getOutputStream());
wr.writeBytes(body);
wr.close();
}
private void getResponse(HttpURLConnection connection) throws IOException {
InputStream is = connection.getInputStream();
BufferedReader rd = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = rd.readLine()) != null) {
this.response.append(line);
this.response.append('\r');
}
rd.close();
}
}
......@@ -2,74 +2,197 @@ package org.opengroup.osdu.indexerqueue.aws.api;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.*;
import com.sun.org.apache.xpath.internal.Arg;
import org.apache.commons.cli.*;
import org.opengroup.osdu.core.aws.sqs.SQSBuilder;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class IndexerQueue {
public static void main(String[] args){
int maxIndexThreads = 50;
String region = "us-east-1";
String queueName = "dev-osdu-storage-queue";
AmazonSQS sqsClient = SQSBuilder.generateSqsClient(region);
/**
* // TODO: make number of messages 50
* int maxIndexThreads = 10;
* String region = "us-east-1";
* String queueName = "dev-osdu-storage-queue";
* String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker";
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
// TODO: what happens when the exception happens here?
public static void main(String[] args) throws ExecutionException, InterruptedException, ParseException {
Arguments arguments = getArguments(args);
// TODO: how long do in flight messages last?
AmazonSQS sqsClient = SQSBuilder.generateSqsClient(arguments.region);
ThreadPoolExecutor executorPool = getExecutor(arguments.maxIndexThreads, arguments.keepAliveTimeInMin);
//TODO: make this into a while loop
recursiveProcessQueue(sqsClient, arguments, executorPool);
}
// TODO: break down into smaller methods for cleaner code
private static void recursiveProcessQueue(AmazonSQS sqsClient, Arguments arguments, ThreadPoolExecutor executorPool)
throws ExecutionException, InterruptedException {
List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>();
List<Message> messages = getMessages(sqsClient, queueName);
for (final Message message : messages) {
IndexProcessor processor = new IndexProcessor(message);
CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call);
futures.add(future);
}
try {
// TODO: change this to have each thread reach out to SQS
List<Message> messages = getMessages(sqsClient, arguments.queueName, arguments.maxIndexThreads);
if (!messages.isEmpty()) {
// TODO: better logging to cloudwatch
System.out.println(String.format("Processing %s messages from storage queue", messages.size()));
for (final Message message : messages) {
IndexProcessor processor = new IndexProcessor(message, arguments.targetURL);
CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool);
futures.add(future);
}
List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
try {
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]);
CompletableFuture<List<IndexProcessor>> results = CompletableFuture
.allOf(cfs)
.thenApply(ignored -> futures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
List<IndexProcessor> indexProcessors = results.get();
for(IndexProcessor indexProcessor : indexProcessors) {
if(indexProcessor.result == CallableResult.Pass && indexProcessor.exception == null) {
DeleteMessageBatchRequestEntry deleteEntry = new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle);
deleteEntries.add(deleteEntry);
}
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]);
CompletableFuture<List<IndexProcessor>> results = CompletableFuture
.allOf(cfs)
.thenApply(ignored -> futures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
List<IndexProcessor> indexProcessors = results.get();
final String deadLetterQueueUrl = sqsClient.getQueueUrl(arguments.deadLetterQueueName).getQueueUrl();
for (IndexProcessor indexProcessor : indexProcessors) {
DeleteMessageBatchRequestEntry deleteEntry = new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle);
deleteEntries.add(deleteEntry);
if (indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null) {
sendMsgToDeadLetterQueue(deadLetterQueueUrl, indexProcessor, sqsClient);
}
}catch (ExecutionException | InterruptedException e){
// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records",
// e.getMessage(), e);
}
DeleteMessageBatchRequest deleteBatchRequest = new DeleteMessageBatchRequest(queueName, deleteEntries);
sqsClient.deleteMessageBatch(deleteBatchRequest);
} catch (Exception e) {
// if (e.getCause() instanceof AppException) {
// throw (AppException) e.getCause();
// } else {
// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records",
// e.getMessage(), e);
// }
if(deleteEntries.size() > 0) {
DeleteMessageBatchRequest deleteBatchRequest = new DeleteMessageBatchRequest(arguments.queueName, deleteEntries);
sqsClient.deleteMessageBatch(deleteBatchRequest);
}
recursiveProcessQueue(sqsClient, arguments, executorPool);
}
return;
}
private static ThreadPoolExecutor getExecutor(int maxIndexThreads, long keepAliveTimeInMin){
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
Executors.newFixedThreadPool(maxIndexThreads); // what is keep alive?
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(1, maxIndexThreads, keepAliveTimeInMin,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxIndexThreads), threadFactory, rejectionHandler);
return executorPool;
}
private static List<Message> getMessages(AmazonSQS sqsClient, String queueName){
final String sqsQeueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQeueueUrl);
receiveMessageRequest.setMaxNumberOfMessages(10);
List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
// TODO: make number of messages 50
private static List<Message> getMessages(AmazonSQS sqsClient, String queueName, int maxThreads){
final String sqsQueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl);
receiveMessageRequest.setMaxNumberOfMessages(maxThreads);
List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest
.withMessageAttributeNames("data-partition-id", "account-id")).getMessages();
return messages;
}
private static Arguments getArguments(String[] args) throws ParseException {
CommandLine commandLine;
Option regionOption = Option.builder("r")
.required(true)
.desc("AWS Region")
.hasArg()
.build();
Option queue_name_option = Option.builder("q")
.required(true)
.desc("Storage Queue Name")
.hasArg()
.build();
Option url_option = Option.builder("u")
.required(true)
.desc("Url for indexer service")
.hasArg()
.build();
Option threads_option = Option.builder("t")
.required(true)
.desc("Max amount of threads hitting indexer service")
.hasArg()
.build();
Option dead_letter_queue_name_option = Option.builder("d")
.required(true)
.desc("Dead Letter Queue Name")
.hasArg()
.build();
Option keep_alive_time_in_min = Option.builder("k")
.required(true)
.desc("Keep Alive Time in Minutes")
.hasArg()
.build();
Options options = new Options();
options.addOption(regionOption);
options.addOption(queue_name_option);
options.addOption(url_option);
options.addOption(threads_option);
options.addOption(dead_letter_queue_name_option);
options.addOption(keep_alive_time_in_min);
CommandLineParser parser = new DefaultParser();
commandLine = parser.parse(options, args);
Arguments arguments = new Arguments();
if (commandLine.hasOption("r")){
arguments.region = commandLine.getOptionValue("r");
}
if (commandLine.hasOption("q")){
arguments.queueName = commandLine.getOptionValue("q");
}
if (commandLine.hasOption("u")){
arguments.targetURL = commandLine.getOptionValue("u");
}
if (commandLine.hasOption("t")){
arguments.maxIndexThreads = Integer.parseInt(commandLine.getOptionValue("t"));
}
if (commandLine.hasOption("d")){
arguments.deadLetterQueueName = commandLine.getOptionValue("d");
}
if (commandLine.hasOption("k")){
arguments.keepAliveTimeInMin = Long.parseLong(commandLine.getOptionValue("k"));
}
return arguments;
}
private static void sendMsgToDeadLetterQueue(String deadLetterQueueUrl, IndexProcessor indexProcessor, AmazonSQS sqsClient){
Map<String, MessageAttributeValue> messageAttributes = indexProcessor.message.getMessageAttributes();
MessageAttributeValue exceptionAttribute = new MessageAttributeValue()
.withDataType("String");
String exceptionMessage = indexProcessor.exception.getMessage();
if(exceptionMessage == null){
exceptionMessage = "Empty";
}
StringWriter sw = new StringWriter();
indexProcessor.exception.printStackTrace(new PrintWriter(sw));
String exceptionAsString = String.format("Exception message: %s. Exception stacktrace: %s", exceptionMessage, sw.toString());
exceptionAttribute.setStringValue(exceptionAsString);
messageAttributes.put("Exception", exceptionAttribute);
SendMessageRequest send_msg_request = new SendMessageRequest()
.withQueueUrl(deadLetterQueueUrl)
.withMessageBody(indexProcessor.message.getBody())
.withMessageAttributes(messageAttributes);
sqsClient.sendMessage(send_msg_request);
}
}
package org.opengroup.osdu.indexerqueue.aws.api;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is rejected");
}
}
package org.opengroup.osdu.indexerqueue.aws.api;
import java.util.Map;
public class SQSMessageBody {
public String Message;
public String Type;
public String MessageId;
public String Timestamp;
public Map<String, Object> MessageAttributes;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment