Skip to content
Snippets Groups Projects
Commit e7090ce5 authored by Aman Verma's avatar Aman Verma
Browse files

adding async logic

parent ee05d60f
No related branches found
No related tags found
1 merge request!112Moving the while loop to iterate over storage records in azure code
Pipeline #30179 passed
......@@ -14,15 +14,23 @@
package org.opengroup.osdu.indexer.azure.util;
import com.google.common.base.Strings;
import com.google.common.reflect.TypeToken;
import com.google.gson.*;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.java.Log;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
import org.opengroup.osdu.indexer.service.StorageService;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
......@@ -31,11 +39,18 @@ import org.springframework.web.context.annotation.RequestScope;
import javax.inject.Inject;
import javax.inject.Named;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.reflect.Type;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
@Log
@Component
......@@ -46,6 +61,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Autowired
private ITopicClientFactory topicClientFactory;
@Inject
private IndexerConfigurationProperties configurationProperties;
@Inject
private JaxRsDpsLog logger;
......@@ -53,17 +71,71 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Named("SERVICE_BUS_TOPIC")
private String serviceBusTopic;
@Inject
private StorageService storageService;
@Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
createTask(payload, headers);
headers.addCorrelationIdIfMissing();
createTask(payload, headers.getPartitionIdWithFallbackToAccountId(), headers.getCorrelationId());
}
@Override
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
createTask(payload, headers);
headers.addCorrelationIdIfMissing();
String dataPartitionId = headers.getPartitionIdWithFallbackToAccountId();
String correlationId = headers.getCorrelationId();
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Future<Boolean> futureTak = executorService.submit(() -> publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId));
executorService.shutdown();
}
private Boolean publishAllRecordsToServiceBus(String payload, String dataPartitionId, String correlationId) {
// fetch all the remaining records
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
final String recordKind = recordReindexRequest.getKind();
RecordQueryResponse recordQueryResponse = null;
List<RecordInfo> recordsAll = new ArrayList<>();
Map<String, String> attributes = new HashMap<>();
try {
attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
attributes.put(DpsHeaders.CORRELATION_ID, correlationId);
do {
if (recordQueryResponse != null) {
recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build();
}
recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
List<RecordInfo> records = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
records.parallelStream()
.collect(Collectors.toCollection(() -> recordsAll));
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, dataPartitionId, correlationId);
}
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
} catch (AppException e) {
throw e;
} catch (Exception e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
}
return true;
}
private void createTask(String payload, DpsHeaders headers) {
private void createTask(String payload, String dataPartitionId, String correlationId) {
Gson gson = new Gson();
RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
......@@ -71,10 +143,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
Map<String, Object> properties = new HashMap<>();
// properties
properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
headers.addCorrelationIdIfMissing();
properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
properties.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
properties.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
properties.put(DpsHeaders.CORRELATION_ID, dataPartitionId);
message.setProperties(properties);
// data
......@@ -83,9 +154,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
// add all to body {"message": {"data":[], "id":...}}
JsonObject jo = new JsonObject();
jo.add("data", gson.toJsonTree(recordInfos));
jo.addProperty(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
jo.addProperty(DpsHeaders.ACCOUNT_ID, dataPartitionId);
jo.addProperty(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
jo.addProperty(DpsHeaders.CORRELATION_ID, correlationId);
JsonObject jomsg = new JsonObject();
jomsg.add("message", jo);
......@@ -93,8 +164,8 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
message.setContentType("application/json");
try {
logger.info("Indexer publishes message to Service Bus " + headers.getCorrelationId());
topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message);
logger.info("Indexer publishes message to Service Bus " + correlationId);
topicClientFactory.getClient(dataPartitionId, serviceBusTopic).send(message);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment