Skip to content
Snippets Groups Projects
Commit 56b2d043 authored by Zhibin Mai's avatar Zhibin Mai
Browse files

Make sure that the indexing message sent to Azure service bus is not over-size

parent 5ee4c7b6
Branches
Tags
1 merge request!597Make sure that the indexing message sent to Azure service bus is not over-size
Pipeline #204222 failed
......@@ -81,32 +81,46 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Override
public void createWorkerTask(String payload, DpsHeaders headers) {
createTask(payload, headers);
createWorkerTasks(payload, headers);
}
@Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
createTask(payload, headers);
createWorkerTasks(payload, headers);
}
@Override
public void createReIndexTask(String payload, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
publishAllRecordsToServiceBus(payload, headers);
}
@Override
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
publishAllRecordsToServiceBus(payload, headers);
}
private void createWorkerTasks(String payload, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
Gson gson = new Gson();
RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData());
Map<String, String> attributes = receivedPayload.getAttributes();
List<List<RecordInfo>> batch = Lists.partition(recordInfos, publisherConfig.getPubSubBatchSize());
for (List<RecordInfo> recordsBatch : batch) {
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(recordsBatch)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, headers);
}
}
private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) {
// fetch all the remaining records
// This logic is temporary and would be updated to call the storage service async.
// Currently, the storage client can't be called out of request scope hence making the
// storage calls sync here
headers.addCorrelationIdIfMissing();
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
final String recordKind = recordReindexRequest.getKind();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment