Commit 15063244 authored by Dmitriy Novikov's avatar Dmitriy Novikov
Browse files

Added index/reindex task type handling

parent aa318824
Pipeline #98876 failed with stages
in 1 minute and 56 seconds
......@@ -22,13 +22,17 @@ import com.google.gson.Gson;
import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
......@@ -43,6 +47,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.config.IndexerConfigProperties;
import org.opengroup.osdu.indexer.service.IndexerService;
import org.opengroup.osdu.indexer.service.ReindexService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
......@@ -66,6 +71,7 @@ public class OqmSubscriberManager {
private final IndexerConfigProperties properties;
private final ITenantFactory tenantInfoFactory;
private final IndexerService indexerService;
private final ReindexService reindexService;
private String subscriptionName;
......@@ -155,8 +161,7 @@ public class OqmSubscriberManager {
}
try {
RecordChangedMessages recordChangedMessages = getRecordChangedMessages(oqmMessage);
indexerService.processRecordChangedMessages(recordChangedMessages, getRecordInfos(recordChangedMessages));
resolveTaskType(oqmMessage);
log.debug(ACKNOWLEDGE);
oqmAckReplier.ack();
} catch (Exception e) {
......@@ -165,6 +170,27 @@ public class OqmSubscriberManager {
}
}
private void resolveTaskType(OqmMessage oqmMessage) throws Exception {
String workerUrl = Optional.of(oqmMessage.getAttributes().get("relative-indexer-worker-url"))
.orElseThrow(() -> new AppException(
HttpStatus.SC_INTERNAL_SERVER_ERROR, "Relative indexer worker url is not set",
"Fill relative indexer worker url in IndexerQueue service"));
if (workerUrl.contains("/index-worker")) {
RecordChangedMessages recordChangedMessages = getRecordChangedMessages(oqmMessage);
log.info("Index task from OQM: {}", recordChangedMessages.getData());
indexerService.processRecordChangedMessages(recordChangedMessages, getRecordInfos(recordChangedMessages));
} else if (workerUrl.contains("/reindex-worker")) {
RecordReindexRequest request = getRecordReindexRequest(oqmMessage);
log.info("Reindex task from OQM: {}", request.getKind());
reindexService.reindexRecords(request,false);
} else {
throw new AppException(
HttpStatus.SC_INTERNAL_SERVER_ERROR, "Relative indexer worker url is undefined",
"Fill correct relative indexer worker url in IndexerQueue service");
}
}
private RecordChangedMessages getRecordChangedMessages(OqmMessage oqmMessage) {
return RecordChangedMessages.builder()
.messageId(oqmMessage.getId())
......@@ -174,6 +200,14 @@ public class OqmSubscriberManager {
.build();
}
// TODO:
private RecordReindexRequest getRecordReindexRequest(OqmMessage oqmMessage) {
return RecordReindexRequest.builder()
.kind(oqmMessage.getData())
.cursor(oqmMessage.getData())
.build();
}
@Nullable
private List<RecordInfo> getRecordInfos(RecordChangedMessages recordChangedMessages) {
Type listType = new TypeToken<List<RecordInfo>>() {}.getType();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment