Skip to content
Snippets Groups Projects
Commit 04ad5975 authored by Ashwani Pandey's avatar Ashwani Pandey Committed by Shrikant Garg
Browse files

added IBM implementation for reIndexing

parent 04505583
No related branches found
No related tags found
1 merge request!220added IBM implementation for reIndexing
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<os-core-lib-ibm.version>0.9.0</os-core-lib-ibm.version> <os-core-lib-ibm.version>0.12.0-SNAPSHOT</os-core-lib-ibm.version>
</properties> </properties>
<profiles> <profiles>
......
...@@ -4,20 +4,33 @@ ...@@ -4,20 +4,33 @@
package org.opengroup.osdu.indexer.ibm.util; package org.opengroup.osdu.indexer.ibm.util;
import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL; import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL;
import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import javax.inject.Inject; import javax.inject.Inject;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.ibm.messagebus.IMessageFactory; import org.opengroup.osdu.core.ibm.messagebus.IMessageFactory;
import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
import org.opengroup.osdu.indexer.service.StorageService;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.google.common.base.Strings;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException; import com.google.gson.JsonSyntaxException;
...@@ -29,6 +42,15 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder { ...@@ -29,6 +42,15 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder {
@Inject @Inject
IMessageFactory mq; IMessageFactory mq;
@Inject
private StorageService storageService;
@Inject
private RequestInfoImpl requestInfo;
@Inject
private IndexerConfigurationProperties configurationProperties;
private Gson gson; private Gson gson;
private final static String RETRY_STRING = "retry"; private final static String RETRY_STRING = "retry";
private final static String ERROR_CODE = "errorCode"; private final static String ERROR_CODE = "errorCode";
...@@ -49,6 +71,10 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder { ...@@ -49,6 +71,10 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder {
createTask(payload, headers); createTask(payload, headers);
} }
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
publishAllRecordsToSubscriber(payload, headers);
}
//used by reindexer api //used by reindexer api
@Override @Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
...@@ -94,4 +120,40 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder { ...@@ -94,4 +120,40 @@ public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder {
} }
} }
private void publishAllRecordsToSubscriber(String payload, DpsHeaders headers) {
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
final String recordKind = recordReindexRequest.getKind();
RecordQueryResponse recordQueryResponse = null;
try {
do {
headers.put(AUTHORIZATION, this.requestInfo.checkOrGetAuthorizationHeader());
if (recordQueryResponse != null) {
recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build();
}
recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
List<RecordInfo> records = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, headers);
}
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
} catch (AppException e) {
throw e;
} catch (Exception e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
}
}
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment