Skip to content
Snippets Groups Projects
Commit 54f8d961 authored by Shrikant Garg's avatar Shrikant Garg
Browse files

Merge branch 'IBM-reindex-fixing' into 'master'

added IBM implementation for reIndexing

See merge request !220
parents 04505583 04ad5975
No related branches found
No related tags found
1 merge request!220added IBM implementation for reIndexing
Pipeline #72479 failed
......@@ -31,7 +31,7 @@
<packaging>jar</packaging>
<properties>
<os-core-lib-ibm.version>0.9.0</os-core-lib-ibm.version>
<os-core-lib-ibm.version>0.12.0-SNAPSHOT</os-core-lib-ibm.version>
</properties>
<profiles>
......
......@@ -4,20 +4,33 @@
package org.opengroup.osdu.indexer.ibm.util;
import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL;
import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.ibm.messagebus.IMessageFactory;
import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
import org.opengroup.osdu.indexer.service.StorageService;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
......@@ -29,6 +42,15 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder {
@Inject
IMessageFactory mq;
@Inject
private StorageService storageService;
@Inject
private RequestInfoImpl requestInfo;
@Inject
private IndexerConfigurationProperties configurationProperties;
private Gson gson;
private final static String RETRY_STRING = "retry";
private final static String ERROR_CODE = "errorCode";
......@@ -49,6 +71,10 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder {
createTask(payload, headers);
}
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
publishAllRecordsToSubscriber(payload, headers);
}
//used by reindexer api
@Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
......@@ -94,4 +120,40 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder {
}
}
private void publishAllRecordsToSubscriber(String payload, DpsHeaders headers) {
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
final String recordKind = recordReindexRequest.getKind();
RecordQueryResponse recordQueryResponse = null;
try {
do {
headers.put(AUTHORIZATION, this.requestInfo.checkOrGetAuthorizationHeader());
if (recordQueryResponse != null) {
recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build();
}
recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
List<RecordInfo> records = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, headers);
}
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
} catch (AppException e) {
throw e;
} catch (Exception e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment