diff --git a/provider/indexer-aws/indexer-queue-aws/Dockerfile b/provider/indexer-aws/indexer-queue-aws/Dockerfile deleted file mode 100644 index f0427338f114c87c49298b04f1e9c512b254966f..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/Dockerfile +++ /dev/null @@ -1,7 +0,0 @@ -FROM amazoncorretto:8 - -WORKDIR / -COPY provider/indexer-aws/indexer-queue-aws/target/indexer-queue-aws-1.0-SNAPSHOT.jar indexer-queue-aws-1.0-SNAPSHOT.jar -EXPOSE 8080 - -CMD ["java","-jar", "indexer-queue-aws-1.0-SNAPSHOT.jar"] \ No newline at end of file diff --git a/provider/indexer-aws/indexer-queue-aws/pom.xml b/provider/indexer-aws/indexer-queue-aws/pom.xml deleted file mode 100644 index 31ffa4f2929c6615279369fe952a4465b3a678f3..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/pom.xml +++ /dev/null @@ -1,206 +0,0 @@ -<?xml version="1.0"?> -<!-- - Copyright © Amazon Web Services - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <modelVersion>4.0.0</modelVersion> - <groupId>org.opengroup.osdu.indexerqueue.aws</groupId> - <artifactId>indexer-queue-aws</artifactId> - <version>1.0-SNAPSHOT</version> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>3.2.1</version> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <manifestEntries> - <Main-Class> - org.opengroup.osdu.indexerqueue.aws.api.IndexerQueue - </Main-Class> - <X-Compile-Source-JDK>8</X-Compile-Source-JDK> - <X-Compile-Target-JDK>8</X-Compile-Target-JDK> - </manifestEntries> - </transformer> - </transformers> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>8</source> - <target>8</target> - </configuration> - </plugin> - </plugins> - <extensions> - <extension> - <groupId>com.github.seahen</groupId> - <artifactId>maven-s3-wagon</artifactId> - <version>1.3.0</version> - </extension> - </extensions> - </build> - <name>indexer-queue-aws</name> - <packaging>jar</packaging> - - <properties> - <aws.version>1.11.637</aws.version> - <deployment.environment>dev</deployment.environment> - <deployment.repositorybucket>osdu-local-maven-repository</deployment.repositorybucket> - </properties> - - <!-- Internal packages --> - <dependencies> - <dependency> - <groupId>org.opengroup.osdu.core.aws</groupId> - <artifactId>aws-osdu-util</artifactId> - <version>0.0.1</version> - </dependency> - - <!-- AWS managed packages --> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-core</artifactId> - <version>1.11.651</version> - </dependency> - - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk</artifactId> - <version>1.11.327</version> - </dependency> - - <!-- Third party Apache 2.0 license packages --> - <dependency> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.8.1</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <version>2.12.1</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - <version>1.7.28</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-to-slf4j</artifactId> - <version>2.12.1</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <version>2.12.1</version> - </dependency> - <dependency> - <groupId>org.apache.lucene</groupId> - <artifactId>lucene-core</artifactId> - <version>7.6.0</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>javax.inject</groupId> - <artifactId>javax.inject</artifactId> - <version>1</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - <version>1.4</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-collections4</artifactId> - <version>4.1</version> - </dependency> - - <!-- Testing packages --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.12</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <version>1.10.19</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - <version>2.0.2</version> - </dependency> - </dependencies> - - <repositories> - <repository> - <id>maven-s3-release-repo</id> - <name>S3 Maven Release Repository</name> - <url>s3://${deployment.repositorybucket}/release</url> - </repository> - - <repository> - <id>maven-s3-snapshot-repo</id> - <name>S3 Maven Snapshot Repository</name> - <url>s3://${deployment.repositorybucket}/snapshot</url> - </repository> - - <repository> - <id>local.release</id> - <url>file:../../local-release-dir</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>true</enabled> - </snapshots> - </repository> - </repositories> - - <distributionManagement> - <site> - <id>s3.site</id> - <url>s3://${deployment.repositorybucket}/site</url> - </site> - <repository> - <id>s3.release</id> - <url>s3://${deployment.repositorybucket}/release</url> - </repository> - <snapshotRepository> - <id>s3.snapshot</id> - <url>s3://${deployment.repositorybucket}/snapshot</url> - </snapshotRepository> - </distributionManagement> -</project> diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/CallableResult.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/CallableResult.java deleted file mode 100644 index f1ec465afe78386ab2edf42bf63f10eb672145de..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/CallableResult.java +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - -public enum CallableResult { - Pass, - Fail -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/EnvironmentVariables.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/EnvironmentVariables.java deleted file mode 100644 index 98cb1238626042d92fc2c2f4576fd81f2c7a7c00..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/EnvironmentVariables.java +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - -public class EnvironmentVariables { - public String region; - public String queueName; - public String targetURL; - public String deadLetterQueueName; - public int maxBatchRequestCount; - public int maxMessagesAllowed; - public int maxIndexThreads; - public long keepAliveTimeInMin; - public String cognitoClientId; - public String cognitoAuthFlow; - public String cognitoUser; - public String cognitoPassword; - - public EnvironmentVariables() { - - this.region = System.getenv("AWS_REGION") != null ? System.getenv("AWS_REGION") : "us-east-1"; - this.queueName = System.getenv("AWS_QUEUE_INDEXER_NAME") != null ? System.getenv("AWS_QUEUE_INDEXER_NAME") : "dev-osdu-storage-queue"; - this.targetURL = System.getenv("AWS_INDEXER_INDEX_API") != null ? System.getenv("AWS_INDEXER_INDEX_API"): "ECSALB-os-indexer-1927005132.us-east-1.elb.amazonaws.com/api/indexer/v2/_dps/task-handlers/index-worker"; - this.deadLetterQueueName = System.getenv("AWS_DEADLETTER_QUEUE_NAME") != null ? System.getenv("AWS_DEADLETTER_QUEUE_NAME") : "dev-osdu-storage-dead-letter-queue"; - this.maxIndexThreads = System.getenv("MAX_INDEX_THREADS") != null ? Integer.parseInt(System.getenv("MAX_INDEX_THREADS")) : 50; - this.maxBatchRequestCount = System.getenv("MAX_REQUEST_COUNT") != null ? Integer.parseInt(System.getenv("MAX_REQUEST_COUNT")) : 10; - this.maxMessagesAllowed = System.getenv("MAX_MESSAGE_COUNT") != null ? Integer.parseInt(System.getenv("MAX_MESSAGE_COUNT")) : 100000; - this.keepAliveTimeInMin = System.getenv("KEEP_ALIVE_IN_MINUTES") != null ? Long.parseLong(System.getenv("KEEP_ALIVE_IN_MINUTES")) : 9999; - this.cognitoClientId = System.getenv("AWS_COGNITO_CLIENT_ID") != null ? System.getenv("AWS_COGNITO_CLIENT_ID") : "3rmgmg8mup281ttc1mbut1pimc"; - this.cognitoAuthFlow = System.getenv("AWS_COGNITO_AUTH_FLOW") != null ? System.getenv("AWS_COGNITO_AUTH_FLOW") : "USER_PASSWORD_AUTH"; - this.cognitoUser = System.getenv("AWS_COGNITO_AUTH_PARAMS_USER") != null ? System.getenv("AWS_COGNITO_AUTH_PARAMS_USER") : "test-user-with-access@testing.com"; - this.cognitoPassword = System.getenv("AWS_COGNITO_AUTH_PARAMS_PASSWORD") != null ? System.getenv("AWS_COGNITO_AUTH_PARAMS_PASSWORD"): "Password123*"; - }; -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java deleted file mode 100644 index 917abb9b2f3a86c66f3261a4135631cd8dc1bfd2..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessor.java +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - - - -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.xray.model.Http; -import com.fasterxml.jackson.databind.DeserializationConfig; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.*; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.ProtocolException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.Callable; -import com.amazonaws.services.sqs.model.Message; - - -public class IndexProcessor implements Callable<IndexProcessor> { - public CallableResult result; - public Exception exception; - public Message message; - public String messageId; - public String receiptHandle; - public StringBuilder response; - public String targetURL; - public String indexerServiceAccountJWT; - - public IndexProcessor(Message message, String targetUrl, String indexServiceAccountJWT){ - this.message = message; - this.targetURL = targetUrl; - this.receiptHandle = message.getReceiptHandle(); - result = CallableResult.Pass; - this.indexerServiceAccountJWT = indexServiceAccountJWT; - } - - @Override - public IndexProcessor call() { - ObjectMapper mapper = new ObjectMapper(); - - try { - this.messageId = message.getMessageId(); - - RecordChangedMessages convertedMessage = getConvertedMessage(message); - String body = mapper.writeValueAsString(convertedMessage); - - HttpURLConnection connection = getConnection(body, convertedMessage.attributes.get("data-partition-id")); - - sendRequest(connection, body); - - getResponse(connection); - } catch (MalformedURLException e) { - System.out.println(e.getMessage()); - result = CallableResult.Fail; - exception = e; - } catch (ProtocolException e) { - System.out.println(e.getMessage()); - result = CallableResult.Fail; - exception = e; - } catch (IOException e) { - System.out.println(e.getMessage()); - result = CallableResult.Fail; - exception = e; - } catch (Exception e){ - System.out.println(e.getMessage()); - result = CallableResult.Fail; - exception = e; - } - return this; - } - - private RecordChangedMessages getConvertedMessage(Message message){ - RecordChangedMessages convertedMessage = new RecordChangedMessages(); - convertedMessage.data = message.getBody(); - convertedMessage.messageId = message.getMessageId(); - - Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes(); - MessageAttributeValue dataPartitionIdValue = messageAttributes.get("data-partition-id"); - MessageAttributeValue accountIdValue = messageAttributes.get("account-id"); - - Map<String, String> attributes = new HashMap<>(); - attributes.put("data-partition-id", dataPartitionIdValue.getStringValue()); - attributes.put("account-id", accountIdValue.getStringValue()); - - convertedMessage.attributes = attributes; - return convertedMessage; - } - - private HttpURLConnection getConnection(String body, String dataPartitionId) throws IOException { - URL url = new URL(this.targetURL); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Length", - Integer.toString(body.getBytes().length));; - connection.setRequestProperty("Content-Type", - "application/json"); - connection.setRequestProperty("data-partition-id", dataPartitionId); - connection.setRequestProperty("Authorization", "Bearer " + this.indexerServiceAccountJWT); - - connection.setUseCaches(false); - connection.setDoOutput(true); - return connection; - } - - private void sendRequest(HttpURLConnection connection, String body) throws IOException { - DataOutputStream wr = new DataOutputStream ( - connection.getOutputStream()); - wr.writeBytes(body); - wr.close(); - } - - private void getResponse(HttpURLConnection connection) throws IOException { - InputStream is = connection.getInputStream(); - BufferedReader rd = new BufferedReader(new InputStreamReader(is)); - String line; - while ((line = rd.readLine()) != null) { - this.response.append(line); - this.response.append('\r'); - } - rd.close(); - } -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java deleted file mode 100644 index 26ae681b5f610c253526e6e6d73c8ab5f7855253..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - -import com.amazonaws.services.logs.AWSLogs; -import com.amazonaws.services.logs.model.InputLogEvent; -import com.amazonaws.services.logs.model.PutLogEventsRequest; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.*; -import org.opengroup.osdu.core.aws.cognito.AWSCognitoClient; -import org.opengroup.osdu.core.aws.logging.AmazonLogConfig; -import org.opengroup.osdu.core.aws.sqs.AmazonSQSConfig; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.stream.Collectors; - -public class IndexerQueue { - /** - * int maxIndexThreads = 10; - * String region = "us-east-1"; - * String queueName = "dev-osdu-storage-queue"; - * String targetURL = "http://127.0.0.1:8080/api/indexer/v2/_dps/task-handlers/index-worker"; - * @param args - * @throws ExecutionException - * @throws InterruptedException - */ - - public static void main(String[] args) { - EnvironmentVariables environmentVariables = new EnvironmentVariables(); -// AmazonLogConfig logConfig = new AmazonLogConfig(environmentVariables.region); -// AWSLogs log = logConfig.AmazonLog(); -// PutLogEventsRequest logRequest = new PutLogEventsRequest("ECSLogGroup-indexer-queue", "indexer-queue", new ArrayList<>()); - -// InputLogEvent logEvent = new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments"); -// logRequest.getLogEvents().add(logEvent); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT")); - System.out.println(environmentVariables.region); - System.out.println(environmentVariables.maxBatchRequestCount); - System.out.println(environmentVariables.queueName); - System.out.println(environmentVariables.cognitoAuthFlow); - System.out.println(environmentVariables.cognitoClientId); - System.out.println(environmentVariables.cognitoPassword); - System.out.println(environmentVariables.cognitoUser); - System.out.println(environmentVariables.deadLetterQueueName); - System.out.println(environmentVariables.maxIndexThreads); - System.out.println(environmentVariables.maxMessagesAllowed); - System.out.println(environmentVariables.targetURL); - System.out.println(environmentVariables.keepAliveTimeInMin); - - try { -// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments")); - -// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT")); - AWSCognitoClient cognitoClient = new AWSCognitoClient(environmentVariables.cognitoClientId, environmentVariables.cognitoAuthFlow, - environmentVariables.cognitoUser, environmentVariables.cognitoPassword); - String indexerServiceAccountJWT = cognitoClient.getToken(); - if(indexerServiceAccountJWT == null){ -// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Indexer service account not set up correctly")); - } - -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Connecting to the SQS Queue: %s", environmentVariables.queueName))); - AmazonSQSConfig sqsConfig = new AmazonSQSConfig(environmentVariables.region); - AmazonSQS sqsClient = sqsConfig.AmazonSQS(); - -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Creating a thread pool with %s threads", environmentVariables.maxIndexThreads))); - ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(environmentVariables.maxIndexThreads); - - final String deadLetterQueueUrl = sqsClient.getQueueUrl(environmentVariables.deadLetterQueueName).getQueueUrl(); - System.out.println(deadLetterQueueUrl); - List<Message> messages = IndexerQueueService.getMessages(sqsClient, environmentVariables.queueName, environmentVariables.maxBatchRequestCount, environmentVariables.maxMessagesAllowed); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Processing %s messages from storage queue", messages.size()))); - - if (!messages.isEmpty()) { - - List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, environmentVariables.targetURL, executorPool, indexerServiceAccountJWT); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Processed", indexProcessors.size()))); - - List<IndexProcessor> failedProcessors = indexProcessors.stream().filter(indexProcessor -> indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null).collect(Collectors.toList()); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Failed", failedProcessors.size()))); - - List<SendMessageResult> deadLetterResults = IndexerQueueService.sendMsgsToDeadLetterQueue(deadLetterQueueUrl, failedProcessors, sqsClient); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Dead Lettered", deadLetterResults.size()))); - - List<DeleteMessageBatchRequestEntry> deleteEntries = indexProcessors.stream().map(indexProcessor -> new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle)).collect(Collectors.toList()); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Deleting", deleteEntries.size()))); - - final String sqsQueueUrl = sqsClient.getQueueUrl(environmentVariables.queueName).getQueueUrl(); - List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, environmentVariables.maxBatchRequestCount); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Delete Batch Request Created", deleteBatchRequests.size()))); - - List<DeleteMessageBatchResult> deleteMessageBatchResults = IndexerQueueService.deleteMessages(deleteBatchRequests, sqsClient); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Requests Deleted", deleteMessageBatchResults.size()))); - } - - } catch (ExecutionException e) { - System.out.println(e); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage())); - } catch (InterruptedException e) { - System.out.println(e); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage())); - } catch (NullPointerException e) { - System.out.println(e); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage())); - }catch (Exception e) { - System.out.println(e); -// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage())); - } finally { - System.out.println("failing finally"); -// log.putLogEvents(logRequest); - } - } -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java deleted file mode 100644 index 0472d46029c840e61ac6bb4e00b55199433e129e..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.*; -import org.apache.commons.collections4.ListUtils; -import org.apache.logging.log4j.Logger; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.stream.Collectors; - -public class IndexerQueueService { - - public static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool, String indexerServiceAccountJWT) - throws ExecutionException, InterruptedException { - - List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url, indexerServiceAccountJWT); - - CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); - CompletableFuture<List<IndexProcessor>> results = CompletableFuture - .allOf(cfs) - .thenApply(ignored -> futures - .stream() - .map(CompletableFuture::join) - .collect(Collectors.toList())); - - List<IndexProcessor> processed = results.get(); - executorPool.shutdown(); - - return processed; - } - - public static List<DeleteMessageBatchResult> deleteMessages(List<DeleteMessageBatchRequest> deleteBatchRequests, AmazonSQS sqsClient) { - return deleteBatchRequests.stream().map(deleteRequest -> sqsClient.deleteMessageBatch(deleteRequest)).collect(Collectors.toList()); - } - - - public static List<DeleteMessageBatchRequest> createMultipleBatchDeleteRequest(String queueUrl, List<DeleteMessageBatchRequestEntry> deleteEntries, int maxBatchRequest) { - List<List<DeleteMessageBatchRequestEntry>> batchedEntries = ListUtils.partition(deleteEntries, maxBatchRequest); - return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueUrl, entries)).collect(Collectors.toList()); - } - - public static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url, String indexerServiceAccountJWT){ - List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>(); - - for (final Message message : messages) { - System.out.println(message); - System.out.println(url); - System.out.println(indexerServiceAccountJWT); - IndexProcessor processor = new IndexProcessor(message, url, indexerServiceAccountJWT); - CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool); - futures.add(future); - } - return futures; - } - - public static List<Message> getMessages(AmazonSQS sqsClient, String queueName, int numOfmessages, int maxMessageCount){ - final String sqsQueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); - System.out.println("inside get messages"); - System.out.println(sqsQueueUrl); - int numOfMessages = numOfmessages; - List<Message> messages = new ArrayList<>(); - do { - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl); - receiveMessageRequest.setMaxNumberOfMessages(numOfMessages); - receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); - List<Message> retrievedMessages = sqsClient.receiveMessage(receiveMessageRequest).getMessages(); - messages.addAll(retrievedMessages); - numOfMessages = retrievedMessages.size(); - - }while (messages.size() < maxMessageCount && numOfMessages > 0); - - return messages; - } - - public static List<SendMessageResult> sendMsgsToDeadLetterQueue(String deadLetterQueueUrl, List<IndexProcessor> indexProcessors, AmazonSQS sqsClient) { - return indexProcessors.stream().map(indexProcessor -> sendMsgToDeadLetterQueue(deadLetterQueueUrl, indexProcessor, sqsClient)).collect(Collectors.toList()); - } - - private static SendMessageResult sendMsgToDeadLetterQueue(String deadLetterQueueUrl, IndexProcessor indexProcessor, AmazonSQS sqsClient){ - Map<String, MessageAttributeValue> messageAttributes = indexProcessor.message.getMessageAttributes(); - MessageAttributeValue exceptionAttribute = new MessageAttributeValue() - .withDataType("String"); - String exceptionMessage = indexProcessor.exception.getMessage(); - if(exceptionMessage == null){ - exceptionMessage = "Empty"; - } - - StringWriter sw = new StringWriter(); - indexProcessor.exception.printStackTrace(new PrintWriter(sw)); - String exceptionAsString = String.format("Exception message: %s. Exception stacktrace: %s", exceptionMessage, sw.toString()); - exceptionAttribute.setStringValue(exceptionAsString); - messageAttributes.put("Exception", exceptionAttribute); - SendMessageRequest send_msg_request = new SendMessageRequest() - .withQueueUrl(deadLetterQueueUrl) - .withMessageBody(indexProcessor.message.getBody()) - .withMessageAttributes(messageAttributes); - return sqsClient.sendMessage(send_msg_request); - } -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RecordChangedMessages.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RecordChangedMessages.java deleted file mode 100644 index 7be52f258901a6a540eeddc3a3b49d708ce51089..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RecordChangedMessages.java +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - -import java.util.Map; - -// TODO: consolidate this model with core refactor -public class RecordChangedMessages { - public String messageId; - public String publishTime; - public String data; - public Map<String, String> attributes; - - public String getMessageId(){ - return this.messageId; - } -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RejectedExecutionHandlerImpl.java b/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RejectedExecutionHandlerImpl.java deleted file mode 100644 index fcfb70727bc046966e8fe6879c2feaba6c010eaa..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/RejectedExecutionHandlerImpl.java +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; - -public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - System.out.println(r.toString() + " is rejected"); - } -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessorTest.java b/provider/indexer-aws/indexer-queue-aws/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessorTest.java deleted file mode 100644 index ca986213e7221bd564d0ca2a3e41b17adbadf0f4..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessorTest.java +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - -public class IndexProcessorTest { -} diff --git a/provider/indexer-aws/indexer-queue-aws/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueServiceTest.java b/provider/indexer-aws/indexer-queue-aws/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueServiceTest.java deleted file mode 100644 index 69caed9733b5e7d18b0e23d8e3e25fa6cad64125..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/indexer-queue-aws/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueServiceTest.java +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright © Amazon Web Services -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexerqueue.aws.api; - - -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.GetQueueUrlResult; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mock; -import org.mockito.Mockito; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; - -public class IndexerQueueServiceTest { - - @Rule - public ExpectedException exception = ExpectedException.none(); - - @Mock - private AmazonSQS sqsClient = mock(AmazonSQS.class); - - private GetQueueUrlResult queueUrlResult; - - private ReceiveMessageResult receiveResult; - - private String queueName = "testQueue"; - private String queueUrl ="localhost"; - - @Before - public void setUp() { - - queueUrlResult = new GetQueueUrlResult(); - queueUrlResult.setQueueUrl(queueUrl); - Mockito.when(sqsClient.getQueueUrl(queueName)).thenReturn(queueUrlResult); - receiveResult = new ReceiveMessageResult(); - } - - @Test - public void test_getMessages_10Messages_pass() { - // Arrange - int numOfmessages = 10; - int maxMessageCount = 10; - - - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl); - receiveMessageRequest.setMaxNumberOfMessages(numOfmessages); - receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); - - List<Message> messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - Message msg = new Message(); - messages.add(msg); - } - - receiveResult.setMessages(messages); - - Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult); - - // Act - List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, queueName, numOfmessages, maxMessageCount); - - // Assert - assertEquals(messages.get(1), actualMessages.get(1)); - assertEquals(messages.size(), actualMessages.size()); - } - - @Test - public void test_getMessages_0Messages_pass() { - // Arrange - int numOfmessages = 10; - int maxMessageCount = 10; - - - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl); - receiveMessageRequest.setMaxNumberOfMessages(numOfmessages); - receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); - - List<Message> messages = new ArrayList<>(); - - receiveResult.setMessages(messages); - - Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult); - - // Act - List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, queueName, numOfmessages, maxMessageCount); - - // Assert - assertEquals(messages.size(), actualMessages.size()); - } - - @Test(expected = NullPointerException.class) - public void test_getMessages_invalidqueuename_fail() { - // Arrange - int numOfmessages = 10; - int maxMessageCount = 10; - - String invalidQueueName = "invalid"; - - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl); - receiveMessageRequest.setMaxNumberOfMessages(numOfmessages); - receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); - - List<Message> messages = new ArrayList<>(); - - receiveResult.setMessages(messages); - - Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult); - - // Act - List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, invalidQueueName, numOfmessages, maxMessageCount); - } - - -}