Skip to content
Snippets Groups Projects
Commit 741b7a61 authored by Zhibin Mai's avatar Zhibin Mai
Browse files

Refactor the codes

parent 926b43f3
No related branches found
No related tags found
1 merge request!597Make sure that the indexing message sent to Azure service bus is not over-size
Pipeline #204295 failed
Pipeline: Indexer

#204296

    ...@@ -58,6 +58,7 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION ...@@ -58,6 +58,7 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION
    @RequestScope @RequestScope
    @Primary @Primary
    public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
    Gson gson = new Gson();
    @Autowired @Autowired
    private ITopicClientFactory topicClientFactory; private ITopicClientFactory topicClientFactory;
    ...@@ -101,23 +102,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -101,23 +102,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
    } }
    private void createWorkerTasks(String payload, DpsHeaders headers) { private void createWorkerTasks(String payload, DpsHeaders headers) {
    headers.addCorrelationIdIfMissing();
    Gson gson = new Gson();
    RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
    List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData()); List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData());
    if(!CollectionUtils.isEmpty(recordInfos)) { if(!CollectionUtils.isEmpty(recordInfos)) {
    Map<String, String> attributes = receivedPayload.getAttributes(); createTasks(recordInfos, receivedPayload.getAttributes(), headers);
    if (attributes == null) {
    attributes = new HashMap<>();
    }
    List<List<RecordInfo>> batch = Lists.partition(recordInfos, publisherConfig.getPubSubBatchSize());
    for (List<RecordInfo> recordsBatch : batch) {
    RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(recordsBatch)).attributes(attributes).build();
    String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
    createTask(recordChangedMessagePayload, headers);
    }
    } }
    } }
    ...@@ -126,8 +114,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -126,8 +114,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
    // This logic is temporary and would be updated to call the storage service async. // This logic is temporary and would be updated to call the storage service async.
    // Currently, the storage client can't be called out of request scope hence making the // Currently, the storage client can't be called out of request scope hence making the
    // storage calls sync here // storage calls sync here
    headers.addCorrelationIdIfMissing();
    Gson gson = new Gson();
    RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
    final String recordKind = recordReindexRequest.getKind(); final String recordKind = recordReindexRequest.getKind();
    RecordQueryResponse recordQueryResponse = null; RecordQueryResponse recordQueryResponse = null;
    ...@@ -141,21 +127,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -141,21 +127,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
    } }
    recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
    if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
    List<String> recordIds = recordQueryResponse.getResults();
    List<List<String>> batch = Lists.partition(recordQueryResponse.getResults(), publisherConfig.getPubSubBatchSize()); List<RecordInfo> recordInfos = recordIds.stream()
    for (List<String> recordsBatch : batch) { .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
    List<RecordInfo> records = recordsBatch.stream() createTasks(recordInfos, new HashMap<>(), headers);
    .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
    Map<String, String> attributes = new HashMap<>();
    attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
    attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
    attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
    RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
    String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
    createTask(recordChangedMessagePayload, headers);
    }
    } }
    } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize()); } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize());
    ...@@ -166,24 +141,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -166,24 +141,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
    } }
    } }
    private void createTask(String payload, DpsHeaders headers) { private void createTasks(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) {
    Gson gson = new Gson(); headers.addCorrelationIdIfMissing();
    RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); List<List<RecordInfo>> batch = Lists.partition(recordInfos, publisherConfig.getPubSubBatchSize());
    for (List<RecordInfo> recordsBatch : batch) {
    createTask(recordsBatch, attributes, headers);
    }
    }
    private void createTask(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) {
    Message message = new Message(); Message message = new Message();
    Map<String, Object> properties = new HashMap<>();
    // properties // properties
    Map<String, Object> properties = new HashMap<>();
    properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
    properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
    headers.addCorrelationIdIfMissing();
    properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
    message.setProperties(properties); message.setProperties(properties);
    // data
    List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData());
    Map<String, String> attributes = receivedPayload.getAttributes();
    // add all to body {"message": {"data":[], "id":...}} // add all to body {"message": {"data":[], "id":...}}
    JsonObject jo = new JsonObject(); JsonObject jo = new JsonObject();
    jo.add("data", gson.toJsonTree(recordInfos)); jo.add("data", gson.toJsonTree(recordInfos));
    ...@@ -211,7 +186,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -211,7 +186,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
    } }
    private List<RecordInfo> parseRecordsAsJSON(String inputPayload) { private List<RecordInfo> parseRecordsAsJSON(String inputPayload) {
    Gson gson = new Gson();
    Type type = new TypeToken<List<RecordInfo>>() {}.getType(); Type type = new TypeToken<List<RecordInfo>>() {}.getType();
    List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type); List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type);
    return recordInfoList; return recordInfoList;
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment