Skip to content
Snippets Groups Projects
Commit 3423f8e6 authored by Aman Verma's avatar Aman Verma
Browse files

removing the async code

parent e7090ce5
No related branches found
No related tags found
1 merge request!112Moving the while loop to iterate over storage records in azure code
Pipeline #30187 passed
......@@ -41,15 +41,11 @@ import javax.inject.Inject;
import javax.inject.Named;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
@Log
......@@ -86,9 +82,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
headers.addCorrelationIdIfMissing();
String dataPartitionId = headers.getPartitionIdWithFallbackToAccountId();
String correlationId = headers.getCorrelationId();
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Future<Boolean> futureTak = executorService.submit(() -> publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId));
executorService.shutdown();
publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId);
}
private Boolean publishAllRecordsToServiceBus(String payload, String dataPartitionId, String correlationId) {
......@@ -100,12 +94,13 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
List<RecordInfo> recordsAll = new ArrayList<>();
Map<String, String> attributes = new HashMap<>();
int docCount = 0;
try {
attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
attributes.put(DpsHeaders.CORRELATION_ID, correlationId);
do {
if (recordQueryResponse != null) {
recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build();
......@@ -119,6 +114,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
records.parallelStream()
.collect(Collectors.toCollection(() -> recordsAll));
docCount = docCount + records.size();
logger.info(String.format("Current doc count is %d ", docCount));
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, dataPartitionId, correlationId);
......@@ -126,6 +124,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
logger.info(String.format("FINAL doc count is %d ", docCount));
} catch (AppException e) {
throw e;
} catch (Exception e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment