Skip to content
Snippets Groups Projects
Commit 73b958b6 authored by MIchael Nguyen's avatar MIchael Nguyen
Browse files

updating

parent af125124
No related branches found
No related tags found
1 merge request!6Trusted ibm
......@@ -347,7 +347,7 @@ Resources:
- Name: AWS_ACCESS_KEY_ID
Value: '{{resolve:secretsmanager:dev-IndexerServiceIamCredentials:SecretString:access_key}}'
- Name: AWS_SECRET_KEY
Value: '{{resolve:secretsmanager:dev-IndexerServiceIamCredentials:SecretString:access_key}}'
Value: '{{resolve:secretsmanager:dev-IndexerServiceIamCredentials:SecretString:secret_key}}'
- Name: ENVIRONMENT
Value: dev
- Name: VSTS_FEED_TOKEN
......@@ -380,7 +380,7 @@ Resources:
- Name: AWS_ACCESS_KEY_ID
Value: '{{resolve:secretsmanager:dev-IndexerServiceIamCredentials:SecretString:access_key}}'
- Name: AWS_SECRET_KEY
Value: '{{resolve:secretsmanager:dev-IndexerServiceIamCredentials:SecretString:access_key}}'
Value: '{{resolve:secretsmanager:dev-IndexerServiceIamCredentials:SecretString:secret_key}}'
- Name: ENVIRONMENT
Value: dev
- Name: VSTS_FEED_TOKEN
......
......@@ -334,39 +334,6 @@ Resources:
- Name: QUEUE_IMAGE_REPO_NAME
Type: PLAINTEXT
Value: !Sub ${Environment}-${CodeCommitRepositoryName}-queue-repository
- Name: INDEXER_QUEUE_NAME
Type: PLAINTEXT
Value: dev-osdu-storage-queue
- Name: INDEXER_API
Type: PLAINTEXT
Value: ECSALB-os-indexer-1927005132.us-east-1.elb.amazonaws.com/api/indexer/v2/_dps/task-handlers/index-worker
- Name: AWS_DEADLETTER_QUEUE_NAME
Type: PLAINTEXT
Value: dev-osdu-storage-dead-letter-queue
- Name: MAX_INDEX_THREADS
Type: PLAINTEXT
Value: 50
- Name: MAX_REQUEST_COUNT
Type: PLAINTEXT
Value: 10
- Name: MAX_MESSAGE_COUNT
Type: PLAINTEXT
Value: 100000
- Name: KEEP_ALIVE_IN_MINUTES
Type: PLAINTEXT
Value: 9999
- Name: AWS_COGNITO_CLIENT_ID
Type: PLAINTEXT
Value: 3rmgmg8mup281ttc1mbut1pimc
- Name: AWS_COGNITO_AUTH_FLOW
Type: PLAINTEXT
Value: USER_PASSWORD_AUTH
- Name: AWS_COGNITO_AUTH_PARAMS_USER
Type: PLAINTEXT
Value: test-user-with-access@testing.com
- Name: AWS_COGNITO_AUTH_PARAMS_PASSWORD
Type: PLAINTEXT
Value: Password123*
PrivilegedMode: true
Source:
BuildSpec: ./provider/indexer-aws/buildspec-post-deploy.yml
......
......@@ -43,13 +43,13 @@ public class IndexerQueue {
public static void main(String[] args) {
EnvironmentVariables environmentVariables = new EnvironmentVariables();
// AmazonLogConfig logConfig = new AmazonLogConfig(environmentVariables.region);
// AWSLogs log = logConfig.AmazonLog();
// PutLogEventsRequest logRequest = new PutLogEventsRequest("ECSLogGroup-indexer-queue", "indexer-queue", new ArrayList<>());
AmazonLogConfig logConfig = new AmazonLogConfig(environmentVariables.region);
AWSLogs log = logConfig.AmazonLog();
PutLogEventsRequest logRequest = new PutLogEventsRequest("ECSLogGroup-indexer-queue", "indexer-queue", new ArrayList<>());
// InputLogEvent logEvent = new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments");
// logRequest.getLogEvents().add(logEvent);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT"));
InputLogEvent logEvent = new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments");
logRequest.getLogEvents().add(logEvent);
logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT"));
System.out.println(environmentVariables.region);
System.out.println(environmentVariables.maxBatchRequestCount);
System.out.println(environmentVariables.queueName);
......@@ -64,66 +64,66 @@ public class IndexerQueue {
System.out.println(environmentVariables.keepAliveTimeInMin);
try {
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments"));
logRequest.getLogEvents().add(new InputLogEvent().withMessage("Starting Indexer Queue and obtaining Arguments"));
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT"));
logRequest.getLogEvents().add(new InputLogEvent().withMessage("Retrieving indexer service account JWT"));
AWSCognitoClient cognitoClient = new AWSCognitoClient(environmentVariables.cognitoClientId, environmentVariables.cognitoAuthFlow,
environmentVariables.cognitoUser, environmentVariables.cognitoPassword);
String indexerServiceAccountJWT = cognitoClient.getToken();
if(indexerServiceAccountJWT == null){
// logRequest.getLogEvents().add(new InputLogEvent().withMessage("Indexer service account not set up correctly"));
logRequest.getLogEvents().add(new InputLogEvent().withMessage("Indexer service account not set up correctly"));
}
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Connecting to the SQS Queue: %s", environmentVariables.queueName)));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Connecting to the SQS Queue: %s", environmentVariables.queueName)));
AmazonSQSConfig sqsConfig = new AmazonSQSConfig(environmentVariables.region);
AmazonSQS sqsClient = sqsConfig.AmazonSQS();
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Creating a thread pool with %s threads", environmentVariables.maxIndexThreads)));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Creating a thread pool with %s threads", environmentVariables.maxIndexThreads)));
ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(environmentVariables.maxIndexThreads);
final String deadLetterQueueUrl = sqsClient.getQueueUrl(environmentVariables.deadLetterQueueName).getQueueUrl();
System.out.println(deadLetterQueueUrl);
List<Message> messages = IndexerQueueService.getMessages(sqsClient, environmentVariables.queueName, environmentVariables.maxBatchRequestCount, environmentVariables.maxMessagesAllowed);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Processing %s messages from storage queue", messages.size())));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("Processing %s messages from storage queue", messages.size())));
if (!messages.isEmpty()) {
List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, environmentVariables.targetURL, executorPool, indexerServiceAccountJWT);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Processed", indexProcessors.size())));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Processed", indexProcessors.size())));
List<IndexProcessor> failedProcessors = indexProcessors.stream().filter(indexProcessor -> indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null).collect(Collectors.toList());
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Failed", failedProcessors.size())));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Failed", failedProcessors.size())));
List<SendMessageResult> deadLetterResults = IndexerQueueService.sendMsgsToDeadLetterQueue(deadLetterQueueUrl, failedProcessors, sqsClient);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Dead Lettered", deadLetterResults.size())));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Dead Lettered", deadLetterResults.size())));
List<DeleteMessageBatchRequestEntry> deleteEntries = indexProcessors.stream().map(indexProcessor -> new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle)).collect(Collectors.toList());
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Deleting", deleteEntries.size())));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Messages Deleting", deleteEntries.size())));
final String sqsQueueUrl = sqsClient.getQueueUrl(environmentVariables.queueName).getQueueUrl();
List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, environmentVariables.maxBatchRequestCount);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Delete Batch Request Created", deleteBatchRequests.size())));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Delete Batch Request Created", deleteBatchRequests.size())));
List<DeleteMessageBatchResult> deleteMessageBatchResults = IndexerQueueService.deleteMessages(deleteBatchRequests, sqsClient);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Requests Deleted", deleteMessageBatchResults.size())));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(String.format("%s Requests Deleted", deleteMessageBatchResults.size())));
}
} catch (ExecutionException e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
} catch (InterruptedException e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
} catch (NullPointerException e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
}catch (Exception e) {
System.out.println(e);
// logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
logRequest.getLogEvents().add(new InputLogEvent().withMessage(e.getMessage()));
} finally {
System.out.println("failing finally");
// log.putLogEvents(logRequest);
log.putLogEvents(logRequest);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment