diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java index 35997fa934218b3b49c78a4eb190f07f8d795643..5084b13126fc9b73f29b7cadbfc9defc69662a01 100644 --- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java @@ -41,15 +41,11 @@ import javax.inject.Inject; import javax.inject.Named; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; @Log @@ -86,9 +82,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { headers.addCorrelationIdIfMissing(); String dataPartitionId = headers.getPartitionIdWithFallbackToAccountId(); String correlationId = headers.getCorrelationId(); - ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - Future<Boolean> futureTak = executorService.submit(() -> publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId)); - executorService.shutdown(); + publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId); } private Boolean publishAllRecordsToServiceBus(String payload, String dataPartitionId, String correlationId) { @@ -100,12 +94,13 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { List<RecordInfo> recordsAll = new ArrayList<>(); Map<String, String> attributes = new HashMap<>(); + int docCount = 0; + try { attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId); attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); attributes.put(DpsHeaders.CORRELATION_ID, correlationId); - do { if (recordQueryResponse != null) { recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build(); @@ -119,6 +114,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { records.parallelStream() .collect(Collectors.toCollection(() -> recordsAll)); + docCount = docCount + records.size(); + + logger.info(String.format("Current doc count is %d ", docCount)); RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); String recordChangedMessagePayload = gson.toJson(recordChangedMessages); createTask(recordChangedMessagePayload, dataPartitionId, correlationId); @@ -126,6 +124,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()); + logger.info(String.format("FINAL doc count is %d ", docCount)); } catch (AppException e) { throw e; } catch (Exception e) {