From 3423f8e629d7060cca3c4c7ed650a865fc2a27da Mon Sep 17 00:00:00 2001 From: Aman Verma <amaverma@microsoft.com> Date: Fri, 5 Mar 2021 13:18:33 +0530 Subject: [PATCH] removing the async code --- .../azure/util/IndexerQueueTaskBuilderAzure.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java index 35997fa93..5084b1312 100644 --- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java @@ -41,15 +41,11 @@ import javax.inject.Inject; import javax.inject.Named; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; @Log @@ -86,9 +82,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { headers.addCorrelationIdIfMissing(); String dataPartitionId = headers.getPartitionIdWithFallbackToAccountId(); String correlationId = headers.getCorrelationId(); - ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - Future<Boolean> futureTak = executorService.submit(() -> publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId)); - executorService.shutdown(); + publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId); } private Boolean publishAllRecordsToServiceBus(String payload, String dataPartitionId, String correlationId) { @@ -100,12 +94,13 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { List<RecordInfo> recordsAll = new ArrayList<>(); Map<String, String> attributes = new HashMap<>(); + int docCount = 0; + try { attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId); attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); attributes.put(DpsHeaders.CORRELATION_ID, correlationId); - do { if (recordQueryResponse != null) { recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build(); @@ -119,6 +114,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { records.parallelStream() .collect(Collectors.toCollection(() -> recordsAll)); + docCount = docCount + records.size(); + + logger.info(String.format("Current doc count is %d ", docCount)); RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); String recordChangedMessagePayload = gson.toJson(recordChangedMessages); createTask(recordChangedMessagePayload, dataPartitionId, correlationId); @@ -126,6 +124,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()); + logger.info(String.format("FINAL doc count is %d ", docCount)); } catch (AppException e) { throw e; } catch (Exception e) { -- GitLab