Commit aa318824 authored by Dmitriy Novikov's avatar Dmitriy Novikov
Browse files

Updated ack logic and added RecordChangedMessages conversion

parent 13267703
......@@ -17,14 +17,19 @@
package org.opengroup.osdu.indexer.oqm.subscribe;
import java.io.IOException;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.util.List;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.search.CloudTaskRequest;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
......@@ -37,10 +42,8 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.config.IndexerConfigProperties;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.service.IndexerService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
/**
......@@ -151,15 +154,32 @@ public class OqmSubscriberManager {
oqmAckReplier.ack();
}
indexerService.processRecordChangedMessages();
if (response.isError()) {
log.error(NOT_ACKNOWLEDGE + response.getReasonPhrase());
} else {
try {
RecordChangedMessages recordChangedMessages = getRecordChangedMessages(oqmMessage);
indexerService.processRecordChangedMessages(recordChangedMessages, getRecordInfos(recordChangedMessages));
log.debug(ACKNOWLEDGE);
oqmAckReplier.ack();
} catch (Exception e) {
log.error(NOT_ACKNOWLEDGE + e);
oqmAckReplier.nack();
}
}
private RecordChangedMessages getRecordChangedMessages(OqmMessage oqmMessage) {
return RecordChangedMessages.builder()
.messageId(oqmMessage.getId())
.data(oqmMessage.getData())
.attributes(oqmMessage.getAttributes())
.publishTime(LocalDateTime.now().toString())
.build();
}
@Nullable
private List<RecordInfo> getRecordInfos(RecordChangedMessages recordChangedMessages) {
Type listType = new TypeToken<List<RecordInfo>>() {}.getType();
return new Gson().fromJson(recordChangedMessages.getData(), listType);
}
@NotNull
private DpsHeaders getHeaders(OqmMessage oqmMessage) {
DpsHeaders headers = new DpsHeaders();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment