From 741b7a618b3633bb632a9440b54ef524436fe084 Mon Sep 17 00:00:00 2001 From: ZMai <zmai@slb.com> Date: Tue, 15 Aug 2023 06:19:24 -0500 Subject: [PATCH] Refactor the codes --- .../util/IndexerQueueTaskBuilderAzure.java | 56 +++++-------------- 1 file changed, 15 insertions(+), 41 deletions(-) diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java index 612002eb4..0ec330cb2 100644 --- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java @@ -58,6 +58,7 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION @RequestScope @Primary public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { + Gson gson = new Gson(); @Autowired private ITopicClientFactory topicClientFactory; @@ -101,23 +102,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } private void createWorkerTasks(String payload, DpsHeaders headers) { - headers.addCorrelationIdIfMissing(); - Gson gson = new Gson(); RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); - List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData()); if(!CollectionUtils.isEmpty(recordInfos)) { - Map<String, String> attributes = receivedPayload.getAttributes(); - if (attributes == null) { - attributes = new HashMap<>(); - } - - List<List<RecordInfo>> batch = Lists.partition(recordInfos, publisherConfig.getPubSubBatchSize()); - for (List<RecordInfo> recordsBatch : batch) { - RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(recordsBatch)).attributes(attributes).build(); - String recordChangedMessagePayload = gson.toJson(recordChangedMessages); - createTask(recordChangedMessagePayload, headers); - } + createTasks(recordInfos, receivedPayload.getAttributes(), headers); } } @@ -126,8 +114,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { // This logic is temporary and would be updated to call the storage service async. // Currently, the storage client can't be called out of request scope hence making the // storage calls sync here - headers.addCorrelationIdIfMissing(); - Gson gson = new Gson(); RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); final String recordKind = recordReindexRequest.getKind(); RecordQueryResponse recordQueryResponse = null; @@ -141,21 +127,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { - - List<List<String>> batch = Lists.partition(recordQueryResponse.getResults(), publisherConfig.getPubSubBatchSize()); - for (List<String> recordsBatch : batch) { - List<RecordInfo> records = recordsBatch.stream() - .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); - - Map<String, String> attributes = new HashMap<>(); - attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); - - RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); - String recordChangedMessagePayload = gson.toJson(recordChangedMessages); - createTask(recordChangedMessagePayload, headers); - } + List<String> recordIds = recordQueryResponse.getResults(); + List<RecordInfo> recordInfos = recordIds.stream() + .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); + createTasks(recordInfos, new HashMap<>(), headers); } } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize()); @@ -166,24 +141,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } } - private void createTask(String payload, DpsHeaders headers) { - Gson gson = new Gson(); - RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); + private void createTasks(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) { + headers.addCorrelationIdIfMissing(); + List<List<RecordInfo>> batch = Lists.partition(recordInfos, publisherConfig.getPubSubBatchSize()); + for (List<RecordInfo> recordsBatch : batch) { + createTask(recordsBatch, attributes, headers); + } + } + private void createTask(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) { Message message = new Message(); - Map<String, Object> properties = new HashMap<>(); // properties + Map<String, Object> properties = new HashMap<>(); properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - headers.addCorrelationIdIfMissing(); properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); message.setProperties(properties); - // data - List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData()); - Map<String, String> attributes = receivedPayload.getAttributes(); - // add all to body {"message": {"data":[], "id":...}} JsonObject jo = new JsonObject(); jo.add("data", gson.toJsonTree(recordInfos)); @@ -211,7 +186,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { } private List<RecordInfo> parseRecordsAsJSON(String inputPayload) { - Gson gson = new Gson(); Type type = new TypeToken<List<RecordInfo>>() {}.getType(); List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type); return recordInfoList; -- GitLab