Skip to content
Snippets Groups Projects
Commit 3ef95bc6 authored by Bill Wang's avatar Bill Wang
Browse files

Background Async ReIndex

parent 09ef494e
No related branches found
No related tags found
1 merge request!116Aws reindex support
...@@ -79,15 +79,38 @@ public class IndexerQueueTaskBuilderAws extends IndexerQueueTaskBuilder { ...@@ -79,15 +79,38 @@ public class IndexerQueueTaskBuilderAws extends IndexerQueueTaskBuilder {
} }
@Override @Override
public void createReIndexTask(String payload,DpsHeaders headers) { public void createReIndexTask(String payload,DpsHeaders headers) {
this.createTask(payload, headers); Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
headers.addCorrelationIdIfMissing();
messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getCorrelationId()));
messageAttributes.put(DpsHeaders.USER_EMAIL, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getUserEmail()));
messageAttributes.put(DpsHeaders.AUTHORIZATION, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getAuthorization()));
messageAttributes.put("ReIndexCursor", new MessageAttributeValue()
.withDataType("String")
.withStringValue("True"));
SendMessageRequest sendMessageRequest = new SendMessageRequest()
.withQueueUrl(storageQueue)
.withMessageBody(payload)
.withMessageAttributes(messageAttributes);
sqsClient.sendMessage(sendMessageRequest);
} }
@Override @Override
public void createReIndexTask(String payload, Long countDownMillis, DpsHeaders headers){ public void createReIndexTask(String payload, Long countDownMillis, DpsHeaders headers){
this.createTask(payload, headers); this.createReIndexTask(payload, headers);
} }
private void createTask(String payload, DpsHeaders headers) { private void createTask(String payload, DpsHeaders headers) {
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
.withDataType("String") .withDataType("String")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment