Skip to content
Snippets Groups Projects
Commit ad23e937 authored by Spencer Sutton's avatar Spencer Sutton
Browse files

Indexer queue rough start

parent 05a5ca35
No related branches found
No related tags found
1 merge request!6Trusted ibm
Showing
with 309 additions and 5 deletions
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>org.opengroup.osdu.indexerqueue.aws</groupId>
<artifactId>indexer-queue-aws</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<name>indexer-queue-aws</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.11.651</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu.core.aws</groupId>
<artifactId>aws-osdu-util</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.327</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>7.6.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
package org.opengroup.osdu.indexerqueue.aws.api;
public enum CallableResult {
Pass,
Fail
}
package org.opengroup.osdu.indexerqueue.aws.api;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.Callable;
import com.amazonaws.services.sqs.model.Message;
public class IndexProcessor implements Callable<IndexProcessor> {
public CallableResult result;
public Exception exception;
private Message message;
public String messageId;
public String receiptHandle;
public StringBuilder response;
public IndexProcessor(Message message){
this.message = message;
this.receiptHandle = message.getReceiptHandle();
result = CallableResult.Pass;
}
@Override
public IndexProcessor call() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
HttpURLConnection connection = null;
try {
String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker";
URL url = new URL(targetURL);
SQSMessageBody messageBody = mapper.readValue(message.getBody(), SQSMessageBody.class);
RecordChangedMessages convertedMessage = new RecordChangedMessages();
convertedMessage.data = messageBody.Message;
convertedMessage.messageId = messageBody.MessageId;
this.messageId = messageBody.MessageId;
convertedMessage.publishTime = messageBody.Timestamp;
// "MessageAttributes" : {
// "account-id" : {"Type":"String","Value":"common"},
// "correlation-id" : {"Type":"String","Value":"5c43af36-d826-40cd-8e7f-604a5e7b2bc0"},
// "data-partition-id" : {"Type":"String","Value":"common"}
// }
convertedMessage.attributes = new HashMap<>();
convertedMessage.attributes.put("data-partition-id", "common");
convertedMessage.attributes.put("account-id", "common");
// Map<String, Object> messageAttributes = messageBody.MessageAttributes;
// for (Map.Entry<String, Object> entry : messageAttributes.entrySet()) {
// if(entry.getKey() == "data-partition-id"){
// String enStr = entry.getValue().toString();
// Map<String, String> dataPartitionValues = mapper.readValue(entry.getValue().toString(), Map.class);
// convertedMessage.attributes.put(entry.getKey(), dataPartitionValues.get("Value"));
// }
// }
String body = mapper.writeValueAsString(convertedMessage);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Length",
Integer.toString(body.getBytes().length));;
connection.setRequestProperty("Content-Type",
"application/json");
connection.setRequestProperty("data-partition-id", "common");
connection.setRequestProperty("user", "opendes@byoc.local");
String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8
connection.setRequestProperty("Authorization", "Basic "+encoded);
connection.setUseCaches(false);
connection.setDoOutput(true);
//Send request
DataOutputStream wr = new DataOutputStream (
connection.getOutputStream());
wr.writeBytes(body);
wr.close();
//Get Response
// TODO: check for 200 to set to pass
InputStream is = connection.getInputStream();
BufferedReader rd = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = rd.readLine()) != null) {
this.response.append(line);
this.response.append('\r');
}
rd.close();
} catch (MalformedURLException e) {
result = CallableResult.Fail;
exception = e;
} catch (ProtocolException e) {
result = CallableResult.Fail;
exception = e;
} catch (IOException e) {
result = CallableResult.Fail;
exception = e;
} catch (Exception e){
result = CallableResult.Fail;
exception = e;
}
return this;
}
}
package org.opengroup.osdu.indexerqueue.aws.api;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import org.opengroup.osdu.core.aws.sqs.SQSBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class IndexerQueue {
public static void main(String[] args){
int maxIndexThreads = 50;
String region = "us-east-1";
String queueName = "dev-osdu-storage-queue";
AmazonSQS sqsClient = SQSBuilder.generateSqsClient(region);
List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>();
List<Message> messages = getMessages(sqsClient, queueName);
for (final Message message : messages) {
IndexProcessor processor = new IndexProcessor(message);
CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call);
futures.add(future);
}
try {
List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
try {
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]);
CompletableFuture<List<IndexProcessor>> results = CompletableFuture
.allOf(cfs)
.thenApply(ignored -> futures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
List<IndexProcessor> indexProcessors = results.get();
for(IndexProcessor indexProcessor : indexProcessors) {
if(indexProcessor.result == CallableResult.Pass && indexProcessor.exception == null) {
DeleteMessageBatchRequestEntry deleteEntry = new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle);
deleteEntries.add(deleteEntry);
}
}
}catch (ExecutionException | InterruptedException e){
// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records",
// e.getMessage(), e);
}
DeleteMessageBatchRequest deleteBatchRequest = new DeleteMessageBatchRequest(queueName, deleteEntries);
sqsClient.deleteMessageBatch(deleteBatchRequest);
} catch (Exception e) {
// if (e.getCause() instanceof AppException) {
// throw (AppException) e.getCause();
// } else {
// throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error during indexing records",
// e.getMessage(), e);
// }
}
}
private static List<Message> getMessages(AmazonSQS sqsClient, String queueName){
final String sqsQeueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQeueueUrl);
receiveMessageRequest.setMaxNumberOfMessages(10);
List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
// TODO: make number of messages 50
return messages;
}
}
package org.opengroup.osdu.indexerqueue.aws.api;
import java.util.Map;
// TODO: consolidate this model with core refactor
public class RecordChangedMessages {
public String messageId;
public String publishTime;
public String data;
public Map<String, String> attributes;
public String getMessageId(){
return this.messageId;
}
}
package org.opengroup.osdu.indexerqueue.aws.api;
import java.util.Map;
public class SQSMessageBody {
public String Message;
public String Type;
public String MessageId;
public String Timestamp;
public Map<String, Object> MessageAttributes;
}
......@@ -12,7 +12,10 @@
<artifactId>indexer-aws</artifactId>
<version>1.0-SNAPSHOT</version>
<name>indexer-aws</name>
<dependencies>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.opengroup.osdu.indexer</groupId>
<artifactId>indexer-core</artifactId>
......
......@@ -18,12 +18,16 @@ import javax.inject.Inject;
@Log
@RestController
@RequestMapping("/")
@RequestScope
public class RecordIndexerAwsApi {
@Inject
ElasticThrottlePoolImpl elasticThrottlePool;
@Inject
private IndexerService indexerService;
@Value("${aws.region}")
private String region;
......@@ -36,7 +40,7 @@ public class RecordIndexerAwsApi {
String responseMessage = "Currently Running";
ResponseEntity<String> httpResponse = new ResponseEntity(responseMessage, HttpStatus.ACCEPTED);
if(!elasticThrottlePool.isRunning) {
elasticThrottlePool.write();
elasticThrottlePool.write(indexerService);
responseMessage = "Started Indexing";
httpResponse = new ResponseEntity<>(responseMessage, HttpStatus.CREATED);
}
......
......@@ -12,7 +12,10 @@ import org.opengroup.osdu.indexer.service.IndexerService;
import org.opengroup.osdu.is.core.model.RecordChangedMessages;
import org.opengroup.osdu.is.core.util.AppException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
......@@ -36,8 +39,8 @@ public class ElasticThrottlePoolImpl {
private AmazonSQS sqsClient;
@Resource
private IndexerService indexerService;
// @Resource
// private IndexerService indexerService;
public Boolean isRunning = false;
......@@ -46,7 +49,7 @@ public class ElasticThrottlePoolImpl {
this.sqsClient = SQSBuilder.generateSqsClient(region);
}
public void write() {
public void write(IndexerService indexerService) {
isRunning = true;
final Gson gson = new Gson();
final String sqsQeueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
......
......@@ -4,6 +4,7 @@ package org.opengroup.osdu.indexer.aws.util.sqs;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import jdk.nashorn.internal.objects.annotations.Constructor;
import org.opengroup.osdu.indexer.aws.model.CallableResult;
import org.opengroup.osdu.indexer.service.IndexerService;
import org.opengroup.osdu.indexer.util.RecordInfo;
......@@ -11,6 +12,7 @@ import org.opengroup.osdu.is.core.model.RecordChangedMessages;
import org.opengroup.osdu.is.core.util.AppException;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.RequestScope;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment