Commit 13267703 authored by Dmitriy Novikov's avatar Dmitriy Novikov
Browse files

Added headers` propagation for IndexerService

parent bbd5f6a2
......@@ -61,12 +61,13 @@ public class IndexerServiceImpl implements IndexerService {
private final ElasticIndexNameResolver elasticIndexNameResolver;
private final ElasticClientHandler elasticClientHandler;
private final IndexerQueueTaskBuilder indexerQueueTaskBuilder;
private DpsHeaders headers;
private final DpsHeaders headers;
@Override
public JobStatus processRecordChangedMessages(RecordChangedMessages recordChangedMessages, List<RecordInfo> recordInfos) throws Exception {
String errorMessage = "";
jobStatus.initialize(recordInfos);
putHeadersFromMessages(recordChangedMessages.getAttributes());
try {
auditLogger.indexStarted(recordInfos.stream()
......@@ -113,6 +114,11 @@ public class IndexerServiceImpl implements IndexerService {
}
}
private void putHeadersFromMessages(Map<String, String> attributes) {
headers.put("authorization", attributes.get("authorization"));
headers.put("data-partition-id", attributes.get("data-partition-id"));
}
private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException {
jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds));
List<RecordInfo> retryRecordInfos = new LinkedList<>();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment