diff --git a/provider/indexer-aws/indexer-aws-queue-service/pom.xml b/provider/indexer-aws/indexer-aws-queue-service/pom.xml index 42549d4fccefec2402660c0eaeb68c97b887bf8b..484e18c988b6273926426f9f1705e46c9390689e 100644 --- a/provider/indexer-aws/indexer-aws-queue-service/pom.xml +++ b/provider/indexer-aws/indexer-aws-queue-service/pom.xml @@ -66,5 +66,10 @@ <version>1</version> <scope>compile</scope> </dependency> - </dependencies> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.4</version> + </dependency> + </dependencies> </project> diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/Arguments.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/Arguments.java new file mode 100644 index 0000000000000000000000000000000000000000..b495b0f1a177088ab55e5e1b85f6bcd0d16ce229 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/Arguments.java @@ -0,0 +1,10 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +public class Arguments { + public String region; + public String queueName; + public String targetURL; + public int maxIndexThreads; + public String deadLetterQueueName; + public long keepAliveTimeInMin; +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java index 9e63b70e592a87edca4867f6c319adadbfe33fa3..7f492a2678842735f24a5f389cc8a75f659c5e31 100644 --- a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java @@ -2,6 +2,8 @@ package org.opengroup.osdu.indexerqueue.aws.api; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.xray.model.Http; import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -20,13 +22,15 @@ import com.amazonaws.services.sqs.model.Message; public class IndexProcessor implements Callable<IndexProcessor> { public CallableResult result; public Exception exception; - private Message message; + public Message message; public String messageId; public String receiptHandle; public StringBuilder response; + public String targetURL; - public IndexProcessor(Message message){ + public IndexProcessor(Message message, String targetUrl){ this.message = message; + this.targetURL = targetUrl; this.receiptHandle = message.getReceiptHandle(); result = CallableResult.Pass; } @@ -34,69 +38,18 @@ public class IndexProcessor implements Callable<IndexProcessor> { @Override public IndexProcessor call() { ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, - false); - HttpURLConnection connection = null; + try { - String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker"; - URL url = new URL(targetURL); - - SQSMessageBody messageBody = mapper.readValue(message.getBody(), SQSMessageBody.class); - RecordChangedMessages convertedMessage = new RecordChangedMessages(); - convertedMessage.data = messageBody.Message; - convertedMessage.messageId = messageBody.MessageId; - this.messageId = messageBody.MessageId; - convertedMessage.publishTime = messageBody.Timestamp; -// "MessageAttributes" : { -// "account-id" : {"Type":"String","Value":"common"}, -// "correlation-id" : {"Type":"String","Value":"5c43af36-d826-40cd-8e7f-604a5e7b2bc0"}, -// "data-partition-id" : {"Type":"String","Value":"common"} -// } - convertedMessage.attributes = new HashMap<>(); - convertedMessage.attributes.put("data-partition-id", "common"); - convertedMessage.attributes.put("account-id", "common"); -// Map<String, Object> messageAttributes = messageBody.MessageAttributes; -// for (Map.Entry<String, Object> entry : messageAttributes.entrySet()) { -// if(entry.getKey() == "data-partition-id"){ -// String enStr = entry.getValue().toString(); -// Map<String, String> dataPartitionValues = mapper.readValue(entry.getValue().toString(), Map.class); -// convertedMessage.attributes.put(entry.getKey(), dataPartitionValues.get("Value")); -// } -// } + this.messageId = message.getMessageId(); + RecordChangedMessages convertedMessage = getConvertedMessage(message); String body = mapper.writeValueAsString(convertedMessage); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Length", - Integer.toString(body.getBytes().length));; - connection.setRequestProperty("Content-Type", - "application/json"); - connection.setRequestProperty("data-partition-id", "common"); - connection.setRequestProperty("user", "opendes@byoc.local"); - - String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8 - connection.setRequestProperty("Authorization", "Basic "+encoded); - - connection.setUseCaches(false); - connection.setDoOutput(true); - - //Send request - DataOutputStream wr = new DataOutputStream ( - connection.getOutputStream()); - wr.writeBytes(body); - wr.close(); - - //Get Response - // TODO: check for 200 to set to pass - InputStream is = connection.getInputStream(); - BufferedReader rd = new BufferedReader(new InputStreamReader(is)); - String line; - while ((line = rd.readLine()) != null) { - this.response.append(line); - this.response.append('\r'); - } - rd.close(); + HttpURLConnection connection = getConnection(body, convertedMessage.attributes.get("data-partition-id")); + + sendRequest(connection, body); + + getResponse(connection); } catch (MalformedURLException e) { result = CallableResult.Fail; exception = e; @@ -112,4 +65,59 @@ public class IndexProcessor implements Callable<IndexProcessor> { } return this; } + + private RecordChangedMessages getConvertedMessage(Message message){ + RecordChangedMessages convertedMessage = new RecordChangedMessages(); + convertedMessage.data = message.getBody(); + convertedMessage.messageId = message.getMessageId(); + + Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes(); + MessageAttributeValue dataPartitionIdValue = messageAttributes.get("data-partition-id"); + MessageAttributeValue accountIdValue = messageAttributes.get("account-id"); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("data-partition-id", dataPartitionIdValue.getStringValue()); + attributes.put("account-id", accountIdValue.getStringValue()); + + convertedMessage.attributes = attributes; + return convertedMessage; + } + + private HttpURLConnection getConnection(String body, String dataPartitionId) throws IOException { + // TODO: integrate with entitlements + URL url = new URL(this.targetURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Length", + Integer.toString(body.getBytes().length));; + connection.setRequestProperty("Content-Type", + "application/json"); + connection.setRequestProperty("data-partition-id", dataPartitionId); + connection.setRequestProperty("user", "opendes@byoc.local"); + + String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8 + connection.setRequestProperty("Authorization", "Basic "+encoded); + + connection.setUseCaches(false); + connection.setDoOutput(true); + return connection; + } + + private void sendRequest(HttpURLConnection connection, String body) throws IOException { + DataOutputStream wr = new DataOutputStream ( + connection.getOutputStream()); + wr.writeBytes(body); + wr.close(); + } + + private void getResponse(HttpURLConnection connection) throws IOException { + InputStream is = connection.getInputStream(); + BufferedReader rd = new BufferedReader(new InputStreamReader(is)); + String line; + while ((line = rd.readLine()) != null) { + this.response.append(line); + this.response.append('\r'); + } + rd.close(); + } } diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java index 568def5821720a2816907cf89d858e9c9b00c6dd..a1038324908c48fa8acf45e5404df62021070a75 100644 --- a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java @@ -2,74 +2,197 @@ package org.opengroup.osdu.indexerqueue.aws.api; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.*; +import com.sun.org.apache.xpath.internal.Arg; +import org.apache.commons.cli.*; import org.opengroup.osdu.core.aws.sqs.SQSBuilder; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.concurrent.*; import java.util.stream.Collectors; public class IndexerQueue { - public static void main(String[] args){ - int maxIndexThreads = 50; - String region = "us-east-1"; - String queueName = "dev-osdu-storage-queue"; - AmazonSQS sqsClient = SQSBuilder.generateSqsClient(region); + /** + * // TODO: make number of messages 50 + * int maxIndexThreads = 10; + * String region = "us-east-1"; + * String queueName = "dev-osdu-storage-queue"; + * String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker"; + * @param args + * @throws ExecutionException + * @throws InterruptedException + */ + // TODO: what happens when the exception happens here? + public static void main(String[] args) throws ExecutionException, InterruptedException, ParseException { + Arguments arguments = getArguments(args); + // TODO: how long do in flight messages last? + AmazonSQS sqsClient = SQSBuilder.generateSqsClient(arguments.region); + ThreadPoolExecutor executorPool = getExecutor(arguments.maxIndexThreads, arguments.keepAliveTimeInMin); + //TODO: make this into a while loop + recursiveProcessQueue(sqsClient, arguments, executorPool); + } + + // TODO: break down into smaller methods for cleaner code + private static void recursiveProcessQueue(AmazonSQS sqsClient, Arguments arguments, ThreadPoolExecutor executorPool) + throws ExecutionException, InterruptedException { List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>(); - List<Message> messages = getMessages(sqsClient, queueName); - for (final Message message : messages) { - IndexProcessor processor = new IndexProcessor(message); - CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call); - futures.add(future); - } - try { + // TODO: change this to have each thread reach out to SQS + List<Message> messages = getMessages(sqsClient, arguments.queueName, arguments.maxIndexThreads); + + if (!messages.isEmpty()) { + // TODO: better logging to cloudwatch + System.out.println(String.format("Processing %s messages from storage queue", messages.size())); + for (final Message message : messages) { + IndexProcessor processor = new IndexProcessor(message, arguments.targetURL); + CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool); + futures.add(future); + } + List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>(); - try { - CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); - CompletableFuture<List<IndexProcessor>> results = CompletableFuture - .allOf(cfs) - .thenApply(ignored -> futures - .stream() - .map(CompletableFuture::join) - .collect(Collectors.toList())); - - List<IndexProcessor> indexProcessors = results.get(); - - for(IndexProcessor indexProcessor : indexProcessors) { - if(indexProcessor.result == CallableResult.Pass && indexProcessor.exception == null) { - DeleteMessageBatchRequestEntry deleteEntry = new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle); - deleteEntries.add(deleteEntry); - } + CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); + CompletableFuture<List<IndexProcessor>> results = CompletableFuture + .allOf(cfs) + .thenApply(ignored -> futures + .stream() + .map(CompletableFuture::join) + .collect(Collectors.toList())); + + List<IndexProcessor> indexProcessors = results.get(); + + final String deadLetterQueueUrl = sqsClient.getQueueUrl(arguments.deadLetterQueueName).getQueueUrl(); + for (IndexProcessor indexProcessor : indexProcessors) { + DeleteMessageBatchRequestEntry deleteEntry = new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle); + deleteEntries.add(deleteEntry); + if (indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null) { + sendMsgToDeadLetterQueue(deadLetterQueueUrl, indexProcessor, sqsClient); } - }catch (ExecutionException | InterruptedException e){ -// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records", -// e.getMessage(), e); } - DeleteMessageBatchRequest deleteBatchRequest = new DeleteMessageBatchRequest(queueName, deleteEntries); - sqsClient.deleteMessageBatch(deleteBatchRequest); - } catch (Exception e) { -// if (e.getCause() instanceof AppException) { -// throw (AppException) e.getCause(); -// } else { -// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records", -// e.getMessage(), e); -// } + + if(deleteEntries.size() > 0) { + DeleteMessageBatchRequest deleteBatchRequest = new DeleteMessageBatchRequest(arguments.queueName, deleteEntries); + sqsClient.deleteMessageBatch(deleteBatchRequest); + } + recursiveProcessQueue(sqsClient, arguments, executorPool); } + return; + } + + private static ThreadPoolExecutor getExecutor(int maxIndexThreads, long keepAliveTimeInMin){ + //RejectedExecutionHandler implementation + RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); + //Get the ThreadFactory implementation to use + ThreadFactory threadFactory = Executors.defaultThreadFactory(); + Executors.newFixedThreadPool(maxIndexThreads); // what is keep alive? + ThreadPoolExecutor executorPool = new ThreadPoolExecutor(1, maxIndexThreads, keepAliveTimeInMin, + TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxIndexThreads), threadFactory, rejectionHandler); + return executorPool; } - private static List<Message> getMessages(AmazonSQS sqsClient, String queueName){ - final String sqsQeueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQeueueUrl); - receiveMessageRequest.setMaxNumberOfMessages(10); - List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages(); - // TODO: make number of messages 50 + private static List<Message> getMessages(AmazonSQS sqsClient, String queueName, int maxThreads){ + final String sqsQueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl); + receiveMessageRequest.setMaxNumberOfMessages(maxThreads); + List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest + .withMessageAttributeNames("data-partition-id", "account-id")).getMessages(); return messages; } + + private static Arguments getArguments(String[] args) throws ParseException { + CommandLine commandLine; + Option regionOption = Option.builder("r") + .required(true) + .desc("AWS Region") + .hasArg() + .build(); + Option queue_name_option = Option.builder("q") + .required(true) + .desc("Storage Queue Name") + .hasArg() + .build(); + Option url_option = Option.builder("u") + .required(true) + .desc("Url for indexer service") + .hasArg() + .build(); + Option threads_option = Option.builder("t") + .required(true) + .desc("Max amount of threads hitting indexer service") + .hasArg() + .build(); + Option dead_letter_queue_name_option = Option.builder("d") + .required(true) + .desc("Dead Letter Queue Name") + .hasArg() + .build(); + Option keep_alive_time_in_min = Option.builder("k") + .required(true) + .desc("Keep Alive Time in Minutes") + .hasArg() + .build(); + + Options options = new Options(); + options.addOption(regionOption); + options.addOption(queue_name_option); + options.addOption(url_option); + options.addOption(threads_option); + options.addOption(dead_letter_queue_name_option); + options.addOption(keep_alive_time_in_min); + + CommandLineParser parser = new DefaultParser(); + + commandLine = parser.parse(options, args); + + Arguments arguments = new Arguments(); + if (commandLine.hasOption("r")){ + arguments.region = commandLine.getOptionValue("r"); + } + + if (commandLine.hasOption("q")){ + arguments.queueName = commandLine.getOptionValue("q"); + } + + if (commandLine.hasOption("u")){ + arguments.targetURL = commandLine.getOptionValue("u"); + } + + if (commandLine.hasOption("t")){ + arguments.maxIndexThreads = Integer.parseInt(commandLine.getOptionValue("t")); + } + + if (commandLine.hasOption("d")){ + arguments.deadLetterQueueName = commandLine.getOptionValue("d"); + } + + if (commandLine.hasOption("k")){ + arguments.keepAliveTimeInMin = Long.parseLong(commandLine.getOptionValue("k")); + } + return arguments; + } + + private static void sendMsgToDeadLetterQueue(String deadLetterQueueUrl, IndexProcessor indexProcessor, AmazonSQS sqsClient){ + Map<String, MessageAttributeValue> messageAttributes = indexProcessor.message.getMessageAttributes(); + MessageAttributeValue exceptionAttribute = new MessageAttributeValue() + .withDataType("String"); + String exceptionMessage = indexProcessor.exception.getMessage(); + if(exceptionMessage == null){ + exceptionMessage = "Empty"; + } + + StringWriter sw = new StringWriter(); + indexProcessor.exception.printStackTrace(new PrintWriter(sw)); + String exceptionAsString = String.format("Exception message: %s. Exception stacktrace: %s", exceptionMessage, sw.toString()); + exceptionAttribute.setStringValue(exceptionAsString); + messageAttributes.put("Exception", exceptionAttribute); + SendMessageRequest send_msg_request = new SendMessageRequest() + .withQueueUrl(deadLetterQueueUrl) + .withMessageBody(indexProcessor.message.getBody()) + .withMessageAttributes(messageAttributes); + sqsClient.sendMessage(send_msg_request); + } } diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RejectedExecutionHandlerImpl.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RejectedExecutionHandlerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..184e1eb87deb31a9b03b88d2185f977bec0ae23f --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RejectedExecutionHandlerImpl.java @@ -0,0 +1,11 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + System.out.println(r.toString() + " is rejected"); + } +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/SQSMessageBody.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/SQSMessageBody.java deleted file mode 100644 index 2740c8845a7992bca27e022fbf63247d0ab81dc0..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/SQSMessageBody.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.opengroup.osdu.indexerqueue.aws.api; - -import java.util.Map; - -public class SQSMessageBody { - public String Message; - public String Type; - public String MessageId; - public String Timestamp; - public Map<String, Object> MessageAttributes; -}