diff --git a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java index b93d3c94144d859cdf25cde6ff4e1cfcd7ff9358..cb47d53cd17fce4056d18d82a4371f8b1fe2d204 100644 --- a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java +++ b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/IndexerQueueTaskBuilderAws.java @@ -79,15 +79,38 @@ public class IndexerQueueTaskBuilderAws extends IndexerQueueTaskBuilder { } @Override public void createReIndexTask(String payload,DpsHeaders headers) { - this.createTask(payload, headers); + Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); + messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); + messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); + headers.addCorrelationIdIfMissing(); + messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getCorrelationId())); + messageAttributes.put(DpsHeaders.USER_EMAIL, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getUserEmail())); + messageAttributes.put(DpsHeaders.AUTHORIZATION, new MessageAttributeValue() + .withDataType("String") + .withStringValue(headers.getAuthorization())); + messageAttributes.put("ReIndexCursor", new MessageAttributeValue() + .withDataType("String") + .withStringValue("True")); + SendMessageRequest sendMessageRequest = new SendMessageRequest() + .withQueueUrl(storageQueue) + .withMessageBody(payload) + .withMessageAttributes(messageAttributes); + sqsClient.sendMessage(sendMessageRequest); } @Override public void createReIndexTask(String payload, Long countDownMillis, DpsHeaders headers){ - this.createTask(payload, headers); + this.createReIndexTask(payload, headers); } private void createTask(String payload, DpsHeaders headers) { - Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() .withDataType("String")