From 264dd189e0db3756f2845ab2857faec258c6bab3 Mon Sep 17 00:00:00 2001
From: Aman Verma <amaverma@microsoft.com>
Date: Fri, 5 Mar 2021 13:47:20 +0530
Subject: [PATCH] using header directly

---
 .../util/IndexerQueueTaskBuilderAzure.java    | 41 +++++++++----------
 1 file changed, 20 insertions(+), 21 deletions(-)

diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java
index c43967d3e..7b4a1fd0a 100644
--- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java
+++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java
@@ -74,18 +74,16 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
     @Override
     public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
         headers.addCorrelationIdIfMissing();
-        createTask(payload, headers.getPartitionIdWithFallbackToAccountId(), headers.getCorrelationId());
+        createTask(payload, headers);
     }
 
     @Override
     public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
         headers.addCorrelationIdIfMissing();
-        String dataPartitionId = headers.getPartitionIdWithFallbackToAccountId();
-        String correlationId = headers.getCorrelationId();
-        publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId);
+        publishAllRecordsToServiceBus(payload, headers);
     }
 
-    private Boolean publishAllRecordsToServiceBus(String payload, String dataPartitionId, String correlationId) {
+    private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) {
         // fetch all the remaining records
         Gson gson = new Gson();
         RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
@@ -104,26 +102,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
                             .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
 
                     Map<String, String> attributes = new HashMap<>();
-                    attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
-                    attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
-                    attributes.put(DpsHeaders.CORRELATION_ID, correlationId);
+                    attributes.put(DpsHeaders.ACCOUNT_ID,  headers.getPartitionIdWithFallbackToAccountId());
+                    attributes.put(DpsHeaders.DATA_PARTITION_ID,  headers.getPartitionIdWithFallbackToAccountId());
+                    attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
 
                     RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
                     String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
-                    createTask(recordChangedMessagePayload, dataPartitionId, correlationId);
+                    createTask(recordChangedMessagePayload, headers);
                 }
             } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
-            
+
         } catch (AppException e) {
             throw e;
         } catch (Exception e) {
             throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
         }
-
-        return true;
     }
 
-    private void createTask(String payload, String dataPartitionId, String correlationId) {
+    private void createTask(String payload, DpsHeaders headers) {
         Gson gson = new Gson();
         RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
 
@@ -131,9 +127,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
         Map<String, Object> properties = new HashMap<>();
 
         // properties
-        properties.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
-        properties.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
-        properties.put(DpsHeaders.CORRELATION_ID, dataPartitionId);
+        properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
+        properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
+        headers.addCorrelationIdIfMissing();
+        properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
         message.setProperties(properties);
 
         // data
@@ -142,9 +139,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
         // add all to body {"message": {"data":[], "id":...}}
         JsonObject jo = new JsonObject();
         jo.add("data", gson.toJsonTree(recordInfos));
-        jo.addProperty(DpsHeaders.ACCOUNT_ID, dataPartitionId);
-        jo.addProperty(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
-        jo.addProperty(DpsHeaders.CORRELATION_ID, correlationId);
+        jo.addProperty(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
+        jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
+        jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
         JsonObject jomsg = new JsonObject();
         jomsg.add("message", jo);
 
@@ -152,13 +149,15 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
         message.setContentType("application/json");
 
         try {
-            logger.info("Indexer publishes message to Service Bus " + correlationId);
-            topicClientFactory.getClient(dataPartitionId, serviceBusTopic).send(message);
+            logger.info("Indexer publishes message to Service Bus " + headers.getCorrelationId());
+            topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message);
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
     }
 
+
+
     private List<RecordInfo> parseRecordsAsJSON(String inputPayload) {
         Gson gson = new Gson();
         Type type = new TypeToken<List<RecordInfo>>(){}.getType();
-- 
GitLab