diff --git a/provider/indexer-aws/indexer-aws-queue-service/pom.xml b/provider/indexer-aws/indexer-aws-queue-service/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..42549d4fccefec2402660c0eaeb68c97b887bf8b --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/pom.xml @@ -0,0 +1,70 @@ +<?xml version="1.0"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.opengroup.osdu.indexerqueue.aws</groupId> + <artifactId>indexer-queue-aws</artifactId> + <version>1.0-SNAPSHOT</version> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>8</source> + <target>8</target> + </configuration> + </plugin> + </plugins> + </build> + <name>indexer-queue-aws</name> + <packaging>jar</packaging> + + + <dependencies> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-core</artifactId> + <version>1.11.651</version> + </dependency> + <dependency> + <groupId>org.opengroup.osdu.core.aws</groupId> + <artifactId>aws-osdu-util</artifactId> + <version>0.0.1</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> + <version>1.11.327</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>2.0.2</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-core</artifactId> + <version>7.6.0</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + <version>1</version> + <scope>compile</scope> + </dependency> + </dependencies> +</project> diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/CallableResult.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/CallableResult.java new file mode 100644 index 0000000000000000000000000000000000000000..44c5a4a14368a87c77ec8107956f3ffb39d7e401 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/CallableResult.java @@ -0,0 +1,6 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +public enum CallableResult { + Pass, + Fail +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..9e63b70e592a87edca4867f6c319adadbfe33fa3 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java @@ -0,0 +1,115 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + + + +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.*; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.ProtocolException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.Callable; +import com.amazonaws.services.sqs.model.Message; + + +public class IndexProcessor implements Callable<IndexProcessor> { + public CallableResult result; + public Exception exception; + private Message message; + public String messageId; + public String receiptHandle; + public StringBuilder response; + + public IndexProcessor(Message message){ + this.message = message; + this.receiptHandle = message.getReceiptHandle(); + result = CallableResult.Pass; + } + + @Override + public IndexProcessor call() { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + false); + HttpURLConnection connection = null; + try { + String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker"; + URL url = new URL(targetURL); + + SQSMessageBody messageBody = mapper.readValue(message.getBody(), SQSMessageBody.class); + RecordChangedMessages convertedMessage = new RecordChangedMessages(); + convertedMessage.data = messageBody.Message; + convertedMessage.messageId = messageBody.MessageId; + this.messageId = messageBody.MessageId; + convertedMessage.publishTime = messageBody.Timestamp; +// "MessageAttributes" : { +// "account-id" : {"Type":"String","Value":"common"}, +// "correlation-id" : {"Type":"String","Value":"5c43af36-d826-40cd-8e7f-604a5e7b2bc0"}, +// "data-partition-id" : {"Type":"String","Value":"common"} +// } + convertedMessage.attributes = new HashMap<>(); + convertedMessage.attributes.put("data-partition-id", "common"); + convertedMessage.attributes.put("account-id", "common"); +// Map<String, Object> messageAttributes = messageBody.MessageAttributes; +// for (Map.Entry<String, Object> entry : messageAttributes.entrySet()) { +// if(entry.getKey() == "data-partition-id"){ +// String enStr = entry.getValue().toString(); +// Map<String, String> dataPartitionValues = mapper.readValue(entry.getValue().toString(), Map.class); +// convertedMessage.attributes.put(entry.getKey(), dataPartitionValues.get("Value")); +// } +// } + + String body = mapper.writeValueAsString(convertedMessage); + + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Length", + Integer.toString(body.getBytes().length));; + connection.setRequestProperty("Content-Type", + "application/json"); + connection.setRequestProperty("data-partition-id", "common"); + connection.setRequestProperty("user", "opendes@byoc.local"); + + String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8 + connection.setRequestProperty("Authorization", "Basic "+encoded); + + connection.setUseCaches(false); + connection.setDoOutput(true); + + //Send request + DataOutputStream wr = new DataOutputStream ( + connection.getOutputStream()); + wr.writeBytes(body); + wr.close(); + + //Get Response + // TODO: check for 200 to set to pass + InputStream is = connection.getInputStream(); + BufferedReader rd = new BufferedReader(new InputStreamReader(is)); + String line; + while ((line = rd.readLine()) != null) { + this.response.append(line); + this.response.append('\r'); + } + rd.close(); + } catch (MalformedURLException e) { + result = CallableResult.Fail; + exception = e; + } catch (ProtocolException e) { + result = CallableResult.Fail; + exception = e; + } catch (IOException e) { + result = CallableResult.Fail; + exception = e; + } catch (Exception e){ + result = CallableResult.Fail; + exception = e; + } + return this; + } +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..568def5821720a2816907cf89d858e9c9b00c6dd --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java @@ -0,0 +1,75 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import org.opengroup.osdu.core.aws.sqs.SQSBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +public class IndexerQueue { + public static void main(String[] args){ + int maxIndexThreads = 50; + String region = "us-east-1"; + String queueName = "dev-osdu-storage-queue"; + AmazonSQS sqsClient = SQSBuilder.generateSqsClient(region); + + List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>(); + List<Message> messages = getMessages(sqsClient, queueName); + + for (final Message message : messages) { + IndexProcessor processor = new IndexProcessor(message); + CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call); + futures.add(future); + } + try { + List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>(); + try { + CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); + CompletableFuture<List<IndexProcessor>> results = CompletableFuture + .allOf(cfs) + .thenApply(ignored -> futures + .stream() + .map(CompletableFuture::join) + .collect(Collectors.toList())); + + List<IndexProcessor> indexProcessors = results.get(); + + for(IndexProcessor indexProcessor : indexProcessors) { + if(indexProcessor.result == CallableResult.Pass && indexProcessor.exception == null) { + DeleteMessageBatchRequestEntry deleteEntry = new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle); + deleteEntries.add(deleteEntry); + } + } + }catch (ExecutionException | InterruptedException e){ +// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records", +// e.getMessage(), e); + } + DeleteMessageBatchRequest deleteBatchRequest = new DeleteMessageBatchRequest(queueName, deleteEntries); + sqsClient.deleteMessageBatch(deleteBatchRequest); + } catch (Exception e) { +// if (e.getCause() instanceof AppException) { +// throw (AppException) e.getCause(); +// } else { +// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records", +// e.getMessage(), e); +// } + } + } + + private static List<Message> getMessages(AmazonSQS sqsClient, String queueName){ + final String sqsQeueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQeueueUrl); + receiveMessageRequest.setMaxNumberOfMessages(10); + List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages(); + // TODO: make number of messages 50 + return messages; + } +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RecordChangedMessages.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RecordChangedMessages.java new file mode 100644 index 0000000000000000000000000000000000000000..5d90f423419abbb1ad7cd111c569c25b318868d3 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RecordChangedMessages.java @@ -0,0 +1,15 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +import java.util.Map; + +// TODO: consolidate this model with core refactor +public class RecordChangedMessages { + public String messageId; + public String publishTime; + public String data; + public Map<String, String> attributes; + + public String getMessageId(){ + return this.messageId; + } +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/SQSMessageBody.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/SQSMessageBody.java new file mode 100644 index 0000000000000000000000000000000000000000..2740c8845a7992bca27e022fbf63247d0ab81dc0 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/SQSMessageBody.java @@ -0,0 +1,11 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +import java.util.Map; + +public class SQSMessageBody { + public String Message; + public String Type; + public String MessageId; + public String Timestamp; + public Map<String, Object> MessageAttributes; +} diff --git a/provider/indexer-aws/pom.xml b/provider/indexer-aws/pom.xml index b645226fd74811bfee5a9faf5a529b5e44f0b764..42ee5831a8a4a4d8cf05769a0120cf708a2d80b5 100644 --- a/provider/indexer-aws/pom.xml +++ b/provider/indexer-aws/pom.xml @@ -12,7 +12,10 @@ <artifactId>indexer-aws</artifactId> <version>1.0-SNAPSHOT</version> <name>indexer-aws</name> - <dependencies> + <packaging>jar</packaging> + + + <dependencies> <dependency> <groupId>org.opengroup.osdu.indexer</groupId> <artifactId>indexer-core</artifactId> diff --git a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/api/RecordIndexerAwsApi.java b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/api/RecordIndexerAwsApi.java index 3e3f95e6e838081096713af79628a6528a21f198..4d2b3ba62220b304916a00d8705ac43418f4843d 100644 --- a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/api/RecordIndexerAwsApi.java +++ b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/api/RecordIndexerAwsApi.java @@ -18,12 +18,16 @@ import javax.inject.Inject; @Log @RestController @RequestMapping("/") +@RequestScope public class RecordIndexerAwsApi { @Inject ElasticThrottlePoolImpl elasticThrottlePool; + @Inject + private IndexerService indexerService; + @Value("${aws.region}") private String region; @@ -36,7 +40,7 @@ public class RecordIndexerAwsApi { String responseMessage = "Currently Running"; ResponseEntity<String> httpResponse = new ResponseEntity(responseMessage, HttpStatus.ACCEPTED); if(!elasticThrottlePool.isRunning) { - elasticThrottlePool.write(); + elasticThrottlePool.write(indexerService); responseMessage = "Started Indexing"; httpResponse = new ResponseEntity<>(responseMessage, HttpStatus.CREATED); } diff --git a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/ElasticThrottlePoolImpl.java b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/ElasticThrottlePoolImpl.java index 9ab5739e07151254d64e77b491431f6a45934324..3188b4d5a818f66228d4e73d7abdbc8b138aa46a 100644 --- a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/ElasticThrottlePoolImpl.java +++ b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/ElasticThrottlePoolImpl.java @@ -12,7 +12,10 @@ import org.opengroup.osdu.indexer.service.IndexerService; import org.opengroup.osdu.is.core.model.RecordChangedMessages; import org.opengroup.osdu.is.core.util.AppException; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import org.springframework.web.context.annotation.RequestScope; import javax.annotation.PostConstruct; import javax.annotation.Resource; @@ -36,8 +39,8 @@ public class ElasticThrottlePoolImpl { private AmazonSQS sqsClient; - @Resource - private IndexerService indexerService; +// @Resource +// private IndexerService indexerService; public Boolean isRunning = false; @@ -46,7 +49,7 @@ public class ElasticThrottlePoolImpl { this.sqsClient = SQSBuilder.generateSqsClient(region); } - public void write() { + public void write(IndexerService indexerService) { isRunning = true; final Gson gson = new Gson(); final String sqsQeueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); diff --git a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/sqs/IndexProcessor.java b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/sqs/IndexProcessor.java index 9b1d985e25d8ff4775924cf44a04915bb693cd95..ec97d610f2bb86f5cb9baf2a601badc68b6c79e7 100644 --- a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/sqs/IndexProcessor.java +++ b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/sqs/IndexProcessor.java @@ -4,6 +4,7 @@ package org.opengroup.osdu.indexer.aws.util.sqs; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonParseException; +import jdk.nashorn.internal.objects.annotations.Constructor; import org.opengroup.osdu.indexer.aws.model.CallableResult; import org.opengroup.osdu.indexer.service.IndexerService; import org.opengroup.osdu.indexer.util.RecordInfo; @@ -11,6 +12,7 @@ import org.opengroup.osdu.is.core.model.RecordChangedMessages; import org.opengroup.osdu.is.core.util.AppException; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import org.springframework.web.context.annotation.RequestScope; import javax.annotation.PostConstruct; import javax.inject.Inject;