Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • osdu/platform/system/indexer-service
  • schundu/indexer-service
2 results
Show changes
Commits on Source (7)
......@@ -14,15 +14,23 @@
package org.opengroup.osdu.indexer.azure.util;
import com.google.common.base.Strings;
import com.google.common.reflect.TypeToken;
import com.google.gson.*;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.java.Log;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
import org.opengroup.osdu.indexer.service.StorageService;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
......@@ -31,11 +39,14 @@ import org.springframework.web.context.annotation.RequestScope;
import javax.inject.Inject;
import javax.inject.Named;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.reflect.Type;
@Log
@Component
......@@ -46,6 +57,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Autowired
private ITopicClientFactory topicClientFactory;
@Inject
private IndexerConfigurationProperties configurationProperties;
@Inject
private JaxRsDpsLog logger;
......@@ -53,14 +67,59 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Named("SERVICE_BUS_TOPIC")
private String serviceBusTopic;
@Inject
private StorageService storageService;
@Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
createTask(payload, headers);
}
@Override
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
createTask(payload, headers);
headers.addCorrelationIdIfMissing();
publishAllRecordsToServiceBus(payload, headers);
}
private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) {
// fetch all the remaining records
// This logic is temporary and would be updated to call the storage service async.
// Currently the storage client can't be called out of request scope hence making the
// storage calls sync here
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
final String recordKind = recordReindexRequest.getKind();
RecordQueryResponse recordQueryResponse = null;
try {
do {
if (recordQueryResponse != null) {
recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build();
}
recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
List<RecordInfo> records = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, headers);
}
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
} catch (AppException e) {
throw e;
} catch (Exception e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
}
}
private void createTask(String payload, DpsHeaders headers) {
......