diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java index cf24317df9db9a6316ec7a399c6fd0b515728175..4aea1306b022d87bbe6496a7353cda5226f842a7 100644 --- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java @@ -21,6 +21,7 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; import com.microsoft.azure.servicebus.Message; import lombok.extern.java.Log; +import org.apache.commons.collections.CollectionUtils; import org.apache.http.HttpStatus; import org.opengroup.osdu.azure.servicebus.ITopicClientFactory; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; @@ -57,6 +58,7 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION @RequestScope @Primary public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { + private Gson gson = new Gson(); @Autowired private ITopicClientFactory topicClientFactory; @@ -81,33 +83,40 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { @Override public void createWorkerTask(String payload, DpsHeaders headers) { - createTask(payload, headers); + createWorkerTasks(payload, headers); } @Override public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { - headers.addCorrelationIdIfMissing(); - createTask(payload, headers); + createWorkerTasks(payload, headers); } @Override public void createReIndexTask(String payload, DpsHeaders headers) { - headers.addCorrelationIdIfMissing(); publishAllRecordsToServiceBus(payload, headers); } @Override public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { - headers.addCorrelationIdIfMissing(); publishAllRecordsToServiceBus(payload, headers); } + private void createWorkerTasks(String payload, DpsHeaders headers) { + RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); + List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData()); + if(!CollectionUtils.isEmpty(recordInfos)) { + Map<String, String> attributes = (receivedPayload.getAttributes() != null) + ? receivedPayload.getAttributes() + : new HashMap<>(); + createTasks(recordInfos, attributes, headers); + } + } + private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) { // fetch all the remaining records // This logic is temporary and would be updated to call the storage service async. // Currently, the storage client can't be called out of request scope hence making the // storage calls sync here - Gson gson = new Gson(); RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); final String recordKind = recordReindexRequest.getKind(); RecordQueryResponse recordQueryResponse = null; @@ -121,21 +130,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { - - List<List<String>> batch = Lists.partition(recordQueryResponse.getResults(), publisherConfig.getPubSubBatchSize()); - for (List<String> recordsBatch : batch) { - List<RecordInfo> records = recordsBatch.stream() - .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); - - Map<String, String> attributes = new HashMap<>(); - attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); - - RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); - String recordChangedMessagePayload = gson.toJson(recordChangedMessages); - createTask(recordChangedMessagePayload, headers); - } + List<String> recordIds = recordQueryResponse.getResults(); + List<RecordInfo> recordInfos = recordIds.stream() + .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); + createTasks(recordInfos, new HashMap<>(), headers); } } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize()); @@ -146,24 +144,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } } - private void createTask(String payload, DpsHeaders headers) { - Gson gson = new Gson(); - RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); + private void createTasks(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) { + headers.addCorrelationIdIfMissing(); + List<List<RecordInfo>> batch = Lists.partition(recordInfos, publisherConfig.getPubSubBatchSize()); + for (List<RecordInfo> recordsBatch : batch) { + createTask(recordsBatch, attributes, headers); + } + } + private void createTask(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) { Message message = new Message(); - Map<String, Object> properties = new HashMap<>(); // properties + Map<String, Object> properties = new HashMap<>(); properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - headers.addCorrelationIdIfMissing(); properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); message.setProperties(properties); - // data - List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData()); - Map<String, String> attributes = receivedPayload.getAttributes(); - // add all to body {"message": {"data":[], "id":...}} JsonObject jo = new JsonObject(); jo.add("data", gson.toJsonTree(recordInfos)); @@ -171,7 +169,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); // Append the ancestry kinds used to prevent circular chasing - if(attributes != null && attributes.containsKey(Constants.ANCESTRY_KINDS)) { + if(attributes.containsKey(Constants.ANCESTRY_KINDS)) { jo.addProperty(Constants.ANCESTRY_KINDS, attributes.get(Constants.ANCESTRY_KINDS)); } JsonObject jomsg = new JsonObject(); @@ -191,7 +189,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } private List<RecordInfo> parseRecordsAsJSON(String inputPayload) { - Gson gson = new Gson(); Type type = new TypeToken<List<RecordInfo>>() {}.getType(); List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type); return recordInfoList; diff --git a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java b/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java index 559fc9875db254a2896cbc804e0e0e4d963922f7..317bbaeae718f2d724d40963a6fe63399aab841a 100644 --- a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java +++ b/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java @@ -31,7 +31,12 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION @RunWith(MockitoJUnitRunner.class) public class IndexerQueueTaskBuilderAzureTest { - private String payload = "{payload : value }"; + private String payload = "{\n" + + " \"data\": \"[{\\\"id\\\":\\\"opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7\\\",\\\"kind\\\":\\\"osdu:wks:work-product-component--WellLog:1.2.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"opendes:work-product-component--WellLog:84958febe54e4908a1703778e1918dae\\\",\\\"kind\\\":\\\"osdu:wks:work-product-component--WellLog:1.2.0\\\",\\\"op\\\":\\\"create\\\"}]\",\n" + + " \"attributes\": {\n" + + " \"data-partition-id\": \"opendes\"\n" + + " }\n" + + "}"; private static String partitionId = "opendes"; private static String correlationId = "correlationId"; private static String serviceBusReindexTopicNameField = "serviceBusReindexTopicName"; @@ -92,7 +97,7 @@ public class IndexerQueueTaskBuilderAzureTest { sut.createWorkerTask(payload, milliseconds, dpsHeaders); - verify(dpsHeaders, times(2)).addCorrelationIdIfMissing(); + verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); verify(dpsHeaders, times(4)).getPartitionIdWithFallbackToAccountId(); verify(dpsHeaders, times(2)).getCorrelationId(); verify(topicClientFactory, times(1)).getClient(partitionId, serviceBusReindexTopicNameValue); @@ -116,7 +121,7 @@ public class IndexerQueueTaskBuilderAzureTest { verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(storageService, times(1)).getRecordsByKind(any()); - verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); + verify(dpsHeaders, times(0)).addCorrelationIdIfMissing(); } @Test @@ -137,9 +142,9 @@ public class IndexerQueueTaskBuilderAzureTest { verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(storageService, times(1)).getRecordsByKind(any()); - verify(dpsHeaders, times(6)).getPartitionIdWithFallbackToAccountId(); - verify(dpsHeaders, times(3)).getCorrelationId(); - verify(dpsHeaders, times(2)).addCorrelationIdIfMissing(); + verify(dpsHeaders, times(4)).getPartitionIdWithFallbackToAccountId(); + verify(dpsHeaders, times(2)).getCorrelationId(); + verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); verify(topicClientFactory, times(1)).getClient(partitionId, serviceBusReindexTopicNameValue); } @@ -161,9 +166,9 @@ public class IndexerQueueTaskBuilderAzureTest { verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(storageService, times(1)).getRecordsByKind(any()); - verify(dpsHeaders, times(120)).getPartitionIdWithFallbackToAccountId(); - verify(dpsHeaders, times(60)).getCorrelationId(); - verify(dpsHeaders, times(21)).addCorrelationIdIfMissing(); + verify(dpsHeaders, times(80)).getPartitionIdWithFallbackToAccountId(); + verify(dpsHeaders, times(40)).getCorrelationId(); + verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); verify(topicClientFactory, times(20)).getClient(partitionId, serviceBusReindexTopicNameValue); } @@ -180,6 +185,6 @@ public class IndexerQueueTaskBuilderAzureTest { verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(storageService, times(1)).getRecordsByKind(any()); - verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); + verify(dpsHeaders, times(0)).addCorrelationIdIfMissing(); } }