From e7090ce51c828a779899f0511114b257e0f9700b Mon Sep 17 00:00:00 2001
From: Aman Verma <amaverma@microsoft.com>
Date: Fri, 5 Mar 2021 12:30:09 +0530
Subject: [PATCH] adding async logic

---
 .../util/IndexerQueueTaskBuilderAzure.java    | 97 ++++++++++++++++---
 1 file changed, 84 insertions(+), 13 deletions(-)

diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java
index 555dbc396..35997fa93 100644
--- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java
+++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java
@@ -14,15 +14,23 @@
 
 package org.opengroup.osdu.indexer.azure.util;
 
+import com.google.common.base.Strings;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.*;
 import com.microsoft.azure.servicebus.Message;
 import lombok.extern.java.Log;
+import org.apache.http.HttpStatus;
 import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
 import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
+import org.opengroup.osdu.core.common.model.http.AppException;
 import org.opengroup.osdu.core.common.model.http.DpsHeaders;
+import org.opengroup.osdu.core.common.model.indexer.OperationType;
 import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
+import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
+import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
 import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
+import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
+import org.opengroup.osdu.indexer.service.StorageService;
 import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Primary;
@@ -31,11 +39,18 @@ import org.springframework.web.context.annotation.RequestScope;
 
 import javax.inject.Inject;
 import javax.inject.Named;
+import java.lang.reflect.Type;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.lang.reflect.Type;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
 
 @Log
 @Component
@@ -46,6 +61,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
     @Autowired
     private ITopicClientFactory topicClientFactory;
 
+    @Inject
+    private IndexerConfigurationProperties configurationProperties;
+
     @Inject
     private JaxRsDpsLog logger;
 
@@ -53,17 +71,71 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
     @Named("SERVICE_BUS_TOPIC")
     private String serviceBusTopic;
 
+    @Inject
+    private StorageService storageService;
+
+
     @Override
     public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
-        createTask(payload, headers);
+        headers.addCorrelationIdIfMissing();
+        createTask(payload, headers.getPartitionIdWithFallbackToAccountId(), headers.getCorrelationId());
     }
 
     @Override
     public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
-        createTask(payload, headers);
+        headers.addCorrelationIdIfMissing();
+        String dataPartitionId = headers.getPartitionIdWithFallbackToAccountId();
+        String correlationId = headers.getCorrelationId();
+        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        Future<Boolean> futureTak = executorService.submit(() -> publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId));
+        executorService.shutdown();
+    }
+
+    private Boolean publishAllRecordsToServiceBus(String payload, String dataPartitionId, String correlationId) {
+        // fetch all the remaining records
+        Gson gson = new Gson();
+        RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
+        final String recordKind = recordReindexRequest.getKind();
+        RecordQueryResponse recordQueryResponse = null;
+        List<RecordInfo> recordsAll = new ArrayList<>();
+        Map<String, String> attributes = new HashMap<>();
+
+        try {
+            attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
+            attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
+            attributes.put(DpsHeaders.CORRELATION_ID, correlationId);
+
+
+            do {
+                if (recordQueryResponse != null) {
+                    recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build();
+                }
+                recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
+                if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
+
+                    List<RecordInfo> records = recordQueryResponse.getResults().stream()
+                            .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
+
+                    records.parallelStream()
+                            .collect(Collectors.toCollection(() -> recordsAll));
+
+                    RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
+                    String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
+                    createTask(recordChangedMessagePayload, dataPartitionId, correlationId);
+                }
+
+            } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
+
+        } catch (AppException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
+        }
+
+        return true;
     }
 
-    private void createTask(String payload, DpsHeaders headers) {
+    private void createTask(String payload, String dataPartitionId, String correlationId) {
         Gson gson = new Gson();
         RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
 
@@ -71,10 +143,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
         Map<String, Object> properties = new HashMap<>();
 
         // properties
-        properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
-        properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
-        headers.addCorrelationIdIfMissing();
-        properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
+        properties.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
+        properties.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
+        properties.put(DpsHeaders.CORRELATION_ID, dataPartitionId);
         message.setProperties(properties);
 
         // data
@@ -83,9 +154,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
         // add all to body {"message": {"data":[], "id":...}}
         JsonObject jo = new JsonObject();
         jo.add("data", gson.toJsonTree(recordInfos));
-        jo.addProperty(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
-        jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
-        jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
+        jo.addProperty(DpsHeaders.ACCOUNT_ID, dataPartitionId);
+        jo.addProperty(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
+        jo.addProperty(DpsHeaders.CORRELATION_ID, correlationId);
         JsonObject jomsg = new JsonObject();
         jomsg.add("message", jo);
 
@@ -93,8 +164,8 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
         message.setContentType("application/json");
 
         try {
-            logger.info("Indexer publishes message to Service Bus " + headers.getCorrelationId());
-            topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message);
+            logger.info("Indexer publishes message to Service Bus " + correlationId);
+            topicClientFactory.getClient(dataPartitionId, serviceBusTopic).send(message);
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
-- 
GitLab