diff --git a/provider/indexer-ibm/pom.xml b/provider/indexer-ibm/pom.xml index a270931b72639e2bcf278592904a7f726dbe5d0b..84e1fea76ba4b9dec786cc532474e437f5c2dac1 100644 --- a/provider/indexer-ibm/pom.xml +++ b/provider/indexer-ibm/pom.xml @@ -31,7 +31,7 @@ <packaging>jar</packaging> <properties> - <os-core-lib-ibm.version>0.9.0</os-core-lib-ibm.version> + <os-core-lib-ibm.version>0.12.0-SNAPSHOT</os-core-lib-ibm.version> </properties> <profiles> diff --git a/provider/indexer-ibm/src/main/java/org/opengroup/osdu/indexer/ibm/util/IndexerQueueTaskBuilderIbm.java b/provider/indexer-ibm/src/main/java/org/opengroup/osdu/indexer/ibm/util/IndexerQueueTaskBuilderIbm.java index 2b4e88c68de8b4786c4d80b491ef418dfaf0acbc..a3d7805e6aff3ca5ae2ea90a2a26254458528aff 100644 --- a/provider/indexer-ibm/src/main/java/org/opengroup/osdu/indexer/ibm/util/IndexerQueueTaskBuilderIbm.java +++ b/provider/indexer-ibm/src/main/java/org/opengroup/osdu/indexer/ibm/util/IndexerQueueTaskBuilderIbm.java @@ -4,20 +4,33 @@ package org.opengroup.osdu.indexer.ibm.util; import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL; +import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.inject.Inject; +import org.apache.http.HttpStatus; +import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.indexer.OperationType; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; +import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.ibm.messagebus.IMessageFactory; +import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; +import org.opengroup.osdu.indexer.service.StorageService; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; +import com.google.common.base.Strings; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; @@ -29,6 +42,15 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder { @Inject IMessageFactory mq; + @Inject + private StorageService storageService; + + @Inject + private RequestInfoImpl requestInfo; + + @Inject + private IndexerConfigurationProperties configurationProperties; + private Gson gson; private final static String RETRY_STRING = "retry"; private final static String ERROR_CODE = "errorCode"; @@ -49,6 +71,10 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder { createTask(payload, headers); } + public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { + publishAllRecordsToSubscriber(payload, headers); + } + //used by reindexer api @Override public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { @@ -94,4 +120,40 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder { } } + private void publishAllRecordsToSubscriber(String payload, DpsHeaders headers) { + Gson gson = new Gson(); + RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); + final String recordKind = recordReindexRequest.getKind(); + RecordQueryResponse recordQueryResponse = null; + + try { + do { + headers.put(AUTHORIZATION, this.requestInfo.checkOrGetAuthorizationHeader()); + + if (recordQueryResponse != null) { + recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build(); + } + recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); + if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { + + List<RecordInfo> records = recordQueryResponse.getResults().stream() + .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); + + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + + RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); + String recordChangedMessagePayload = gson.toJson(recordChangedMessages); + createTask(recordChangedMessagePayload, headers); + } + } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()); + + } catch (AppException e) { + throw e; + } catch (Exception e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e); + } + } } \ No newline at end of file