diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java index 5084b13126fc9b73f29b7cadbfc9defc69662a01..c43967d3e365df8760bac3ae857e4fdc1cecf693 100644 --- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java @@ -91,16 +91,8 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); final String recordKind = recordReindexRequest.getKind(); RecordQueryResponse recordQueryResponse = null; - List<RecordInfo> recordsAll = new ArrayList<>(); - Map<String, String> attributes = new HashMap<>(); - - int docCount = 0; try { - attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId); - attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); - attributes.put(DpsHeaders.CORRELATION_ID, correlationId); - do { if (recordQueryResponse != null) { recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build(); @@ -111,20 +103,17 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { List<RecordInfo> records = recordQueryResponse.getResults().stream() .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); - records.parallelStream() - .collect(Collectors.toCollection(() -> recordsAll)); - - docCount = docCount + records.size(); + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId); + attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); + attributes.put(DpsHeaders.CORRELATION_ID, correlationId); - logger.info(String.format("Current doc count is %d ", docCount)); RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); String recordChangedMessagePayload = gson.toJson(recordChangedMessages); createTask(recordChangedMessagePayload, dataPartitionId, correlationId); } - } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()); - - logger.info(String.format("FINAL doc count is %d ", docCount)); + } catch (AppException e) { throw e; } catch (Exception e) {