From 3ef95bc6ed9af4fe97131b15c5d2389ddfd1f522 Mon Sep 17 00:00:00 2001 From: wanzhiji <wanzhiji@amazon.com> Date: Thu, 4 Mar 2021 19:27:13 +0000 Subject: [PATCH] Background Async ReIndex --- .../aws/util/IndexerQueueTaskBuilderAws.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java index b93d3c941..cb47d53cd 100644 --- a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java +++ b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java @@ -79,15 +79,38 @@ public class IndexerQueueTaskBuilderAws extends IndexerQueueTaskBuilder { } @Override public void createReIndexTask(String payload,DpsHeaders headers) { - this.createTask(payload, headers); + Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); + messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); + messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); + headers.addCorrelationIdIfMissing(); + messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getCorrelationId())); + messageAttributes.put(DpsHeaders.USER_EMAIL, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getUserEmail())); + messageAttributes.put(DpsHeaders.AUTHORIZATION, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getAuthorization())); + messageAttributes.put("ReIndexCursor", new MessageAttributeValue() + .withDataType("String") + .withStringValue("True")); + SendMessageRequest sendMessageRequest = new SendMessageRequest() + .withQueueUrl(storageQueue) + .withMessageBody(payload) + .withMessageAttributes(messageAttributes); + sqsClient.sendMessage(sendMessageRequest); } @Override public void createReIndexTask(String payload, Long countDownMillis, DpsHeaders headers){ - this.createTask(payload, headers); + this.createReIndexTask(payload, headers); } private void createTask(String payload, DpsHeaders headers) { - Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() .withDataType("String") -- GitLab