Skip to content
Snippets Groups Projects
Commit e145394e authored by Bill Wang's avatar Bill Wang
Browse files

Merge branch 'aws-reindex-support' into 'master'

Aws reindex support

See merge request !116
parents b931af50 59d1caba
No related branches found
No related tags found
1 merge request!116Aws reindex support
Pipeline #31698 failed
......@@ -79,15 +79,38 @@ public class IndexerQueueTaskBuilderAws extends IndexerQueueTaskBuilder {
}
@Override
public void createReIndexTask(String payload,DpsHeaders headers) {
this.createTask(payload, headers);
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
headers.addCorrelationIdIfMissing();
messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getCorrelationId()));
messageAttributes.put(DpsHeaders.USER_EMAIL, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getUserEmail()));
messageAttributes.put(DpsHeaders.AUTHORIZATION, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getAuthorization()));
messageAttributes.put("ReIndexCursor", new MessageAttributeValue()
.withDataType("String")
.withStringValue("True"));
SendMessageRequest sendMessageRequest = new SendMessageRequest()
.withQueueUrl(storageQueue)
.withMessageBody(payload)
.withMessageAttributes(messageAttributes);
sqsClient.sendMessage(sendMessageRequest);
}
@Override
public void createReIndexTask(String payload, Long countDownMillis, DpsHeaders headers){
this.createTask(payload, headers);
this.createReIndexTask(payload, headers);
}
private void createTask(String payload, DpsHeaders headers) {
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
.withDataType("String")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment