Skip to content
Snippets Groups Projects
Commit a24dcda0 authored by Spencer Sutton's avatar Spencer Sutton
Browse files

Entitlements Additions to integration tests and queue processor

parent 0750b896
No related branches found
No related tags found
1 merge request!6Trusted ibm
package org.opengroup.osdu.indexerqueue.aws.api; package org.opengroup.osdu.indexerqueue.aws.api;
public class Arguments { public class EnvironmentVariables {
public String region; public String region;
public String queueName; public String queueName;
public String targetURL; public String targetURL;
...@@ -9,8 +9,14 @@ public class Arguments { ...@@ -9,8 +9,14 @@ public class Arguments {
public int maxMessagesAllowed; public int maxMessagesAllowed;
public int maxIndexThreads; public int maxIndexThreads;
public long keepAliveTimeInMin; public long keepAliveTimeInMin;
public String indexerServiceAccountName;
public String indexerServiceAccountPassword;
public String cognitoClientId;
public String cognitoAuthFlow;
public String cognitoUser;
public String cognitoPassword;
public Arguments() { public EnvironmentVariables() {
this.region = System.getenv("AWS_REGION"); this.region = System.getenv("AWS_REGION");
this.queueName = System.getenv("AWS_QUEUE_INDEXER_NAME"); this.queueName = System.getenv("AWS_QUEUE_INDEXER_NAME");
this.targetURL = System.getenv("AWS_INDEXER_INDEX_API"); this.targetURL = System.getenv("AWS_INDEXER_INDEX_API");
...@@ -19,5 +25,11 @@ public class Arguments { ...@@ -19,5 +25,11 @@ public class Arguments {
this.maxBatchRequestCount = Integer.parseInt(System.getenv("MAX_REQUEST_COUNT")); this.maxBatchRequestCount = Integer.parseInt(System.getenv("MAX_REQUEST_COUNT"));
this.maxMessagesAllowed = Integer.parseInt(System.getenv("MAX_MESSAGE_COUNT")); this.maxMessagesAllowed = Integer.parseInt(System.getenv("MAX_MESSAGE_COUNT"));
this.keepAliveTimeInMin = Long.parseLong(System.getenv("KEEP_ALIVE_IN_MINUTES")); this.keepAliveTimeInMin = Long.parseLong(System.getenv("KEEP_ALIVE_IN_MINUTES"));
this.indexerServiceAccountName = System.getenv("INDEXER_SERVICE_ACCOUNT_NAME");
this.indexerServiceAccountPassword = System.getenv("INDEXER_SERVICE_ACCOUNT_NAME");
this.cognitoClientId = System.getenv("AWS_COGNITO_CLIENT_ID");
this.cognitoAuthFlow = System.getenv("AWS_COGNITO_AUTH_FLOW");
this.cognitoUser = System.getenv("AWS_COGNITO_AUTH_PARAMS_USER");
this.cognitoPassword = System.getenv("AWS_COGNITO_AUTH_PARAMS_PASSWORD");
}; };
} }
...@@ -27,12 +27,14 @@ public class IndexProcessor implements Callable<IndexProcessor> { ...@@ -27,12 +27,14 @@ public class IndexProcessor implements Callable<IndexProcessor> {
public String receiptHandle; public String receiptHandle;
public StringBuilder response; public StringBuilder response;
public String targetURL; public String targetURL;
public String indexerServiceAccountJWT;
public IndexProcessor(Message message, String targetUrl){ public IndexProcessor(Message message, String targetUrl, String indexServiceAccountJWT){
this.message = message; this.message = message;
this.targetURL = targetUrl; this.targetURL = targetUrl;
this.receiptHandle = message.getReceiptHandle(); this.receiptHandle = message.getReceiptHandle();
result = CallableResult.Pass; result = CallableResult.Pass;
this.indexerServiceAccountJWT = indexServiceAccountJWT;
} }
@Override @Override
...@@ -84,7 +86,6 @@ public class IndexProcessor implements Callable<IndexProcessor> { ...@@ -84,7 +86,6 @@ public class IndexProcessor implements Callable<IndexProcessor> {
} }
private HttpURLConnection getConnection(String body, String dataPartitionId) throws IOException { private HttpURLConnection getConnection(String body, String dataPartitionId) throws IOException {
// TODO: integrate with entitlements
URL url = new URL(this.targetURL); URL url = new URL(this.targetURL);
HttpURLConnection connection = (HttpURLConnection) url.openConnection(); HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST"); connection.setRequestMethod("POST");
...@@ -93,10 +94,7 @@ public class IndexProcessor implements Callable<IndexProcessor> { ...@@ -93,10 +94,7 @@ public class IndexProcessor implements Callable<IndexProcessor> {
connection.setRequestProperty("Content-Type", connection.setRequestProperty("Content-Type",
"application/json"); "application/json");
connection.setRequestProperty("data-partition-id", dataPartitionId); connection.setRequestProperty("data-partition-id", dataPartitionId);
connection.setRequestProperty("user", "opendes@byoc.local"); connection.setRequestProperty("Authorization", "Bearer " + this.indexerServiceAccountJWT);
String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8
connection.setRequestProperty("Authorization", "Basic "+encoded);
connection.setUseCaches(false); connection.setUseCaches(false);
connection.setDoOutput(true); connection.setDoOutput(true);
......
package org.opengroup.osdu.indexerqueue.aws.api; package org.opengroup.osdu.indexerqueue.aws.api;
// TODO: rename package // TODO: rename package
import com.amazonaws.services.sqs.AmazonSQSConfig;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*; import com.amazonaws.services.sqs.model.*;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.opengroup.osdu.core.aws.sqs.SQSBuilder; import org.opengroup.osdu.core.aws.cognito.AWSCognitoClient;
import org.opengroup.osdu.core.aws.cognito.CognitoBuilder;
import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
...@@ -29,22 +32,30 @@ public class IndexerQueue { ...@@ -29,22 +32,30 @@ public class IndexerQueue {
try { try {
log.debug("Starting Indexer Queue and obtaining Arguments"); log.debug("Starting Indexer Queue and obtaining Arguments");
Arguments arguments = new Arguments(); EnvironmentVariables environmentVariables = new EnvironmentVariables();
log.debug("Retrieving indexer service account JWT");
AWSCognitoClient cognitoClient = new AWSCognitoClient(environmentVariables.cognitoClientId, environmentVariables.cognitoAuthFlow,
environmentVariables.cognitoUser, environmentVariables.cognitoPassword);
String indexerServiceAccountJWT = cognitoClient.getToken();
if(indexerServiceAccountJWT == null){
log.error("Indexer service account not set up correctly");
}
log.debug(String.format("Connecting to the SQS Queue: %s", arguments.queueName)); log.debug(String.format("Connecting to the SQS Queue: %s", environmentVariables.queueName));
AmazonSQSConfig sqsConfig = new AmazonSQSConfig(arguments.region); AmazonSQSConfig sqsConfig = new AmazonSQSConfig(environmentVariables.region);
sqsClient = sqsConfig.AmazonSQS(); AmazonSQS sqsClient = sqsConfig.AmazonSQS();
log.debug(String.format("Creating a thread pool with %s threads", arguments.maxIndexThreads)); log.debug(String.format("Creating a thread pool with %s threads", environmentVariables.maxIndexThreads));
ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(arguments.maxIndexThreads); ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(environmentVariables.maxIndexThreads);
final String deadLetterQueueUrl = sqsClient.getQueueUrl(arguments.deadLetterQueueName).getQueueUrl(); final String deadLetterQueueUrl = sqsClient.getQueueUrl(environmentVariables.deadLetterQueueName).getQueueUrl();
List<Message> messages = IndexerQueueService.getMessages(sqsClient, arguments.queueName, arguments.maxBatchRequestCount, arguments.maxMessagesAllowed); List<Message> messages = IndexerQueueService.getMessages(sqsClient, environmentVariables.queueName, environmentVariables.maxBatchRequestCount, environmentVariables.maxMessagesAllowed);
log.debug(String.format("Processing %s messages from storage queue", messages.size())); log.debug(String.format("Processing %s messages from storage queue", messages.size()));
if (!messages.isEmpty()) { if (!messages.isEmpty()) {
List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, arguments.targetURL, executorPool); List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, environmentVariables.targetURL, executorPool, indexerServiceAccountJWT);
log.debug(String.format("%s Messages Processed", indexProcessors.size())); log.debug(String.format("%s Messages Processed", indexProcessors.size()));
List<IndexProcessor> failedProcessors = indexProcessors.stream().filter(indexProcessor -> indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null).collect(Collectors.toList()); List<IndexProcessor> failedProcessors = indexProcessors.stream().filter(indexProcessor -> indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null).collect(Collectors.toList());
...@@ -56,8 +67,8 @@ public class IndexerQueue { ...@@ -56,8 +67,8 @@ public class IndexerQueue {
List<DeleteMessageBatchRequestEntry> deleteEntries = indexProcessors.stream().map(indexProcessor -> new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle)).collect(Collectors.toList()); List<DeleteMessageBatchRequestEntry> deleteEntries = indexProcessors.stream().map(indexProcessor -> new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle)).collect(Collectors.toList());
log.debug(String.format("%s Messages Deleting", deleteEntries.size())); log.debug(String.format("%s Messages Deleting", deleteEntries.size()));
final String sqsQueueUrl = sqsClient.getQueueUrl(arguments.queueName).getQueueUrl(); final String sqsQueueUrl = sqsClient.getQueueUrl(environmentVariables.queueName).getQueueUrl();
List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, arguments.maxBatchRequestCount); List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, environmentVariables.maxBatchRequestCount);
log.debug(String.format("%s Delete Batch Request Created", deleteBatchRequests.size())); log.debug(String.format("%s Delete Batch Request Created", deleteBatchRequests.size()));
List<DeleteMessageBatchResult> deleteMessageBatchResults = IndexerQueueService.deleteMessages(deleteBatchRequests, sqsClient); List<DeleteMessageBatchResult> deleteMessageBatchResults = IndexerQueueService.deleteMessages(deleteBatchRequests, sqsClient);
......
...@@ -17,10 +17,10 @@ import java.util.stream.Collectors; ...@@ -17,10 +17,10 @@ import java.util.stream.Collectors;
public class IndexerQueueService { public class IndexerQueueService {
public static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool) public static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool, String indexerServiceAccountJWT)
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url); List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url, indexerServiceAccountJWT);
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]);
CompletableFuture<List<IndexProcessor>> results = CompletableFuture CompletableFuture<List<IndexProcessor>> results = CompletableFuture
...@@ -46,10 +46,10 @@ public class IndexerQueueService { ...@@ -46,10 +46,10 @@ public class IndexerQueueService {
return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueUrl, entries)).collect(Collectors.toList()); return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueUrl, entries)).collect(Collectors.toList());
} }
public static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url){ public static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url, String indexerServiceAccountJWT){
List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>(); List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>();
for (final Message message : messages) { for (final Message message : messages) {
IndexProcessor processor = new IndexProcessor(message, url); IndexProcessor processor = new IndexProcessor(message, url, indexerServiceAccountJWT);
CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool); CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool);
futures.add(future); futures.add(future);
} }
......
...@@ -19,6 +19,5 @@ public class ElasticUtilsAws extends ElasticUtils { ...@@ -19,6 +19,5 @@ public class ElasticUtilsAws extends ElasticUtils {
signer.setRegionName(password); signer.setRegionName(password);
HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(username, signer, credentials); HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(username, signer, credentials);
return new RestHighLevelClient(RestClient.builder(HttpHost.create(host)).setHttpClientConfigCallback(configCallBack -> configCallBack.addInterceptorLast(interceptor))); return new RestHighLevelClient(RestClient.builder(HttpHost.create(host)).setHttpClientConfigCallback(configCallBack -> configCallBack.addInterceptorLast(interceptor)));
} }
} }
...@@ -2,24 +2,24 @@ package org.opengroup.osdu.util; ...@@ -2,24 +2,24 @@ package org.opengroup.osdu.util;
public class Config { public class Config {
private static final String DEFAULT_ELASTIC_HOST = ""; private static final String DEFAULT_ELASTIC_HOST = "https://search-dev-osdu-indexer-i5bpf2gv4iv6ha2xi7rook2rga.us-east-1.es.amazonaws.com";
private static final String DEFAULT_ELASTIC_USER_NAME = ""; private static final String DEFAULT_ELASTIC_USER_NAME = "es";
private static final String DEFAULT_ELASTIC_PASSWORD = ""; private static final String DEFAULT_ELASTIC_PASSWORD = "us-east-1";
static final int PORT = 9243; static final int PORT = 9243;
private static final String DEFAULT_INDEXER_HOST = ""; private static final String DEFAULT_INDEXER_HOST = "http://127.0.0.1:8080/api/indexer/v2/";
private static final String DEFAULT_SEARCH_HOST = ""; private static final String DEFAULT_SEARCH_HOST = "http://127.0.0.1:8082/api/search/v2/";
private static final String DEFAULT_STORAGE_HOST = ""; private static final String DEFAULT_STORAGE_HOST = "http://127.0.0.1:8081/api/storage/v2/";
private static final String DEFAULT_DATA_PARTITION_ID_TENANT1 = ""; private static final String DEFAULT_DATA_PARTITION_ID_TENANT1 = "opendes";
private static final String DEFAULT_DATA_PARTITION_ID_TENANT2 = ""; private static final String DEFAULT_DATA_PARTITION_ID_TENANT2 = "common";
private static final String DEFAULT_SEARCH_INTEGRATION_TESTER = ""; private static final String DEFAULT_SEARCH_INTEGRATION_TESTER = "common";
private static final String DEFAULT_TARGET_AUDIENCE = ""; private static final String DEFAULT_TARGET_AUDIENCE = "";
private static final String DEFAULT_LEGAL_TAG = ""; private static final String DEFAULT_LEGAL_TAG = "";
private static final String DEFAULT_OTHER_RELEVANT_DATA_COUNTRIES = ""; private static final String DEFAULT_OTHER_RELEVANT_DATA_COUNTRIES = "";
private static final String DEFAULT_ENTITLEMENTS_DOMAIN = ""; private static final String DEFAULT_ENTITLEMENTS_DOMAIN = "testing.com";
public static String getOtherRelevantDataCountries() { public static String getOtherRelevantDataCountries() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment