diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/Arguments.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/EnvironmentVariables.java similarity index 54% rename from provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/Arguments.java rename to provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/EnvironmentVariables.java index c58f5ca28bb505e7bd68c3f8ead01000700f32ef..06455f24d0d8e523129e18e621800988bd9888eb 100644 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/Arguments.java +++ b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/EnvironmentVariables.java @@ -1,6 +1,6 @@ package org.opengroup.osdu.indexerqueue.aws.api; -public class Arguments { +public class EnvironmentVariables { public String region; public String queueName; public String targetURL; @@ -9,8 +9,14 @@ public class Arguments { public int maxMessagesAllowed; public int maxIndexThreads; public long keepAliveTimeInMin; + public String indexerServiceAccountName; + public String indexerServiceAccountPassword; + public String cognitoClientId; + public String cognitoAuthFlow; + public String cognitoUser; + public String cognitoPassword; - public Arguments() { + public EnvironmentVariables() { this.region = System.getenv("AWS_REGION"); this.queueName = System.getenv("AWS_QUEUE_INDEXER_NAME"); this.targetURL = System.getenv("AWS_INDEXER_INDEX_API"); @@ -19,5 +25,11 @@ public class Arguments { this.maxBatchRequestCount = Integer.parseInt(System.getenv("MAX_REQUEST_COUNT")); this.maxMessagesAllowed = Integer.parseInt(System.getenv("MAX_MESSAGE_COUNT")); this.keepAliveTimeInMin = Long.parseLong(System.getenv("KEEP_ALIVE_IN_MINUTES")); + this.indexerServiceAccountName = System.getenv("INDEXER_SERVICE_ACCOUNT_NAME"); + this.indexerServiceAccountPassword = System.getenv("INDEXER_SERVICE_ACCOUNT_NAME"); + this.cognitoClientId = System.getenv("AWS_COGNITO_CLIENT_ID"); + this.cognitoAuthFlow = System.getenv("AWS_COGNITO_AUTH_FLOW"); + this.cognitoUser = System.getenv("AWS_COGNITO_AUTH_PARAMS_USER"); + this.cognitoPassword = System.getenv("AWS_COGNITO_AUTH_PARAMS_PASSWORD"); }; } diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java index 7f492a2678842735f24a5f389cc8a75f659c5e31..47ae685e44e23deb71e5b47f9b3f88ea967405d0 100644 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java +++ b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java @@ -27,12 +27,14 @@ public class IndexProcessor implements Callable<IndexProcessor> { public String receiptHandle; public StringBuilder response; public String targetURL; + public String indexerServiceAccountJWT; - public IndexProcessor(Message message, String targetUrl){ + public IndexProcessor(Message message, String targetUrl, String indexServiceAccountJWT){ this.message = message; this.targetURL = targetUrl; this.receiptHandle = message.getReceiptHandle(); result = CallableResult.Pass; + this.indexerServiceAccountJWT = indexServiceAccountJWT; } @Override @@ -84,7 +86,6 @@ public class IndexProcessor implements Callable<IndexProcessor> { } private HttpURLConnection getConnection(String body, String dataPartitionId) throws IOException { - // TODO: integrate with entitlements URL url = new URL(this.targetURL); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); @@ -93,10 +94,7 @@ public class IndexProcessor implements Callable<IndexProcessor> { connection.setRequestProperty("Content-Type", "application/json"); connection.setRequestProperty("data-partition-id", dataPartitionId); - connection.setRequestProperty("user", "opendes@byoc.local"); - - String encoded = Base64.getEncoder().encodeToString(("opendes@byoc.local"+":"+"123").getBytes(StandardCharsets.UTF_8)); //Java 8 - connection.setRequestProperty("Authorization", "Basic "+encoded); + connection.setRequestProperty("Authorization", "Bearer " + this.indexerServiceAccountJWT); connection.setUseCaches(false); connection.setDoOutput(true); diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java index 071e44569b6cc8297a0e45c1bc073d753c7ac8e8..a86eb47bccce7484c8f9ccf8205ecd807ecbeb3c 100644 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java +++ b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java @@ -1,10 +1,13 @@ package org.opengroup.osdu.indexerqueue.aws.api; // TODO: rename package -import com.amazonaws.services.sqs.AmazonSQSConfig; + +import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opengroup.osdu.core.aws.sqs.SQSBuilder; +import org.opengroup.osdu.core.aws.cognito.AWSCognitoClient; +import org.opengroup.osdu.core.aws.cognito.CognitoBuilder; +import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; import java.util.List; import java.util.concurrent.ExecutionException; @@ -29,22 +32,30 @@ public class IndexerQueue { try { log.debug("Starting Indexer Queue and obtaining Arguments"); - Arguments arguments = new Arguments(); + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + log.debug("Retrieving indexer service account JWT"); + AWSCognitoClient cognitoClient = new AWSCognitoClient(environmentVariables.cognitoClientId, environmentVariables.cognitoAuthFlow, + environmentVariables.cognitoUser, environmentVariables.cognitoPassword); + String indexerServiceAccountJWT = cognitoClient.getToken(); + if(indexerServiceAccountJWT == null){ + log.error("Indexer service account not set up correctly"); + } - log.debug(String.format("Connecting to the SQS Queue: %s", arguments.queueName)); - AmazonSQSConfig sqsConfig = new AmazonSQSConfig(arguments.region); - sqsClient = sqsConfig.AmazonSQS(); + log.debug(String.format("Connecting to the SQS Queue: %s", environmentVariables.queueName)); + AmazonSQSConfig sqsConfig = new AmazonSQSConfig(environmentVariables.region); + AmazonSQS sqsClient = sqsConfig.AmazonSQS(); - log.debug(String.format("Creating a thread pool with %s threads", arguments.maxIndexThreads)); - ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(arguments.maxIndexThreads); + log.debug(String.format("Creating a thread pool with %s threads", environmentVariables.maxIndexThreads)); + ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(environmentVariables.maxIndexThreads); - final String deadLetterQueueUrl = sqsClient.getQueueUrl(arguments.deadLetterQueueName).getQueueUrl(); - List<Message> messages = IndexerQueueService.getMessages(sqsClient, arguments.queueName, arguments.maxBatchRequestCount, arguments.maxMessagesAllowed); + final String deadLetterQueueUrl = sqsClient.getQueueUrl(environmentVariables.deadLetterQueueName).getQueueUrl(); + List<Message> messages = IndexerQueueService.getMessages(sqsClient, environmentVariables.queueName, environmentVariables.maxBatchRequestCount, environmentVariables.maxMessagesAllowed); log.debug(String.format("Processing %s messages from storage queue", messages.size())); if (!messages.isEmpty()) { - List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, arguments.targetURL, executorPool); + List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, environmentVariables.targetURL, executorPool, indexerServiceAccountJWT); log.debug(String.format("%s Messages Processed", indexProcessors.size())); List<IndexProcessor> failedProcessors = indexProcessors.stream().filter(indexProcessor -> indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null).collect(Collectors.toList()); @@ -56,8 +67,8 @@ public class IndexerQueue { List<DeleteMessageBatchRequestEntry> deleteEntries = indexProcessors.stream().map(indexProcessor -> new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle)).collect(Collectors.toList()); log.debug(String.format("%s Messages Deleting", deleteEntries.size())); - final String sqsQueueUrl = sqsClient.getQueueUrl(arguments.queueName).getQueueUrl(); - List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, arguments.maxBatchRequestCount); + final String sqsQueueUrl = sqsClient.getQueueUrl(environmentVariables.queueName).getQueueUrl(); + List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, environmentVariables.maxBatchRequestCount); log.debug(String.format("%s Delete Batch Request Created", deleteBatchRequests.size())); List<DeleteMessageBatchResult> deleteMessageBatchResults = IndexerQueueService.deleteMessages(deleteBatchRequests, sqsClient); diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java index 5461b1ab04d1eae01ffe55ca6336ed9423077313..2f3a73819c5cf4f0577b6f3c33b93707a3477722 100644 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java +++ b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java @@ -17,10 +17,10 @@ import java.util.stream.Collectors; public class IndexerQueueService { - public static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool) + public static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool, String indexerServiceAccountJWT) throws ExecutionException, InterruptedException { - List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url); + List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url, indexerServiceAccountJWT); CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); CompletableFuture<List<IndexProcessor>> results = CompletableFuture @@ -46,10 +46,10 @@ public class IndexerQueueService { return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueUrl, entries)).collect(Collectors.toList()); } - public static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url){ + public static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url, String indexerServiceAccountJWT){ List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>(); for (final Message message : messages) { - IndexProcessor processor = new IndexProcessor(message, url); + IndexProcessor processor = new IndexProcessor(message, url, indexerServiceAccountJWT); CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool); futures.add(future); } diff --git a/testing/indexer-test-aws/src/test/java/org/opengroup/osdu/util/ElasticUtilsAws.java b/testing/indexer-test-aws/src/test/java/org/opengroup/osdu/util/ElasticUtilsAws.java index 61ce6133a657337e7b405a4df1a012beb9a02866..d66598a99f188fc9d3eae88967a8d8d7d1e0eb7a 100644 --- a/testing/indexer-test-aws/src/test/java/org/opengroup/osdu/util/ElasticUtilsAws.java +++ b/testing/indexer-test-aws/src/test/java/org/opengroup/osdu/util/ElasticUtilsAws.java @@ -19,6 +19,5 @@ public class ElasticUtilsAws extends ElasticUtils { signer.setRegionName(password); HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(username, signer, credentials); return new RestHighLevelClient(RestClient.builder(HttpHost.create(host)).setHttpClientConfigCallback(configCallBack -> configCallBack.addInterceptorLast(interceptor))); - } } diff --git a/testing/indexer-test-core/src/main/java/org/opengroup/osdu/util/Config.java b/testing/indexer-test-core/src/main/java/org/opengroup/osdu/util/Config.java index c76cb4307f74d17257d0c540c8dd5f16be784421..8faa2dc401523d0aa456d9939c27e7f1e78afe24 100644 --- a/testing/indexer-test-core/src/main/java/org/opengroup/osdu/util/Config.java +++ b/testing/indexer-test-core/src/main/java/org/opengroup/osdu/util/Config.java @@ -2,24 +2,24 @@ package org.opengroup.osdu.util; public class Config { - private static final String DEFAULT_ELASTIC_HOST = ""; - private static final String DEFAULT_ELASTIC_USER_NAME = ""; - private static final String DEFAULT_ELASTIC_PASSWORD = ""; + private static final String DEFAULT_ELASTIC_HOST = "https://search-dev-osdu-indexer-i5bpf2gv4iv6ha2xi7rook2rga.us-east-1.es.amazonaws.com"; + private static final String DEFAULT_ELASTIC_USER_NAME = "es"; + private static final String DEFAULT_ELASTIC_PASSWORD = "us-east-1"; static final int PORT = 9243; - private static final String DEFAULT_INDEXER_HOST = ""; - private static final String DEFAULT_SEARCH_HOST = ""; - private static final String DEFAULT_STORAGE_HOST = ""; - private static final String DEFAULT_DATA_PARTITION_ID_TENANT1 = ""; - private static final String DEFAULT_DATA_PARTITION_ID_TENANT2 = ""; - private static final String DEFAULT_SEARCH_INTEGRATION_TESTER = ""; + private static final String DEFAULT_INDEXER_HOST = "http://127.0.0.1:8080/api/indexer/v2/"; + private static final String DEFAULT_SEARCH_HOST = "http://127.0.0.1:8082/api/search/v2/"; + private static final String DEFAULT_STORAGE_HOST = "http://127.0.0.1:8081/api/storage/v2/"; + private static final String DEFAULT_DATA_PARTITION_ID_TENANT1 = "opendes"; + private static final String DEFAULT_DATA_PARTITION_ID_TENANT2 = "common"; + private static final String DEFAULT_SEARCH_INTEGRATION_TESTER = "common"; private static final String DEFAULT_TARGET_AUDIENCE = ""; private static final String DEFAULT_LEGAL_TAG = ""; private static final String DEFAULT_OTHER_RELEVANT_DATA_COUNTRIES = ""; - private static final String DEFAULT_ENTITLEMENTS_DOMAIN = ""; + private static final String DEFAULT_ENTITLEMENTS_DOMAIN = "testing.com"; public static String getOtherRelevantDataCountries() {