Skip to content
Snippets Groups Projects
Commit d767aa60 authored by Bhushan Rade's avatar Bhushan Rade
Browse files

failed record sending back to record queue

parent 7bbf9115
No related branches found
No related tags found
No related merge requests found
package org.opengroup.osdu.indexer.ibm.util;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.ibm.messagebus.IMessageFactory;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
@Primary
@Component
public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder {
private static final Logger logger = LoggerFactory.getLogger(IndexerQueueTaskBuilderIbm.class);
@Inject
IMessageFactory mq;
private Gson gson;
private String retryString = "retry";
@Inject
public void init() {
gson =new Gson();
}
@Override
public void createWorkerTask(String payload, DpsHeaders headers) {
createTask(payload, headers);
}
@Override
public void createReIndexTask(String payload, DpsHeaders headers) {
createTask(payload, headers);
}
private void createTask(String payload, DpsHeaders headers) {
//RecordChangedMessages recordChangedMessages = this.gson.fromJson(payload, RecordChangedMessages.class);
/*Map<String, String> message = new HashMap<>();
message.put("data", recordChangedMessages.getData());
message.put(DpsHeaders.DATA_PARTITION_ID, recordChangedMessages.getAttributes().get(DpsHeaders.DATA_PARTITION_ID));
//message.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
headers.addCorrelationIdIfMissing();
//message.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
message.put(DpsHeaders.CORRELATION_ID, recordChangedMessages.getAttributes().get(DpsHeaders.CORRELATION_ID));*/
//mq.sendMessage(IMessageFactory.INDEXER_QUEUE_NAME, gson.toJson(message));
try {
System.out.println("Payload recived :"+payload);
RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
Map<String, String> attributes = receivedPayload.getAttributes();
int retryCount=0;
if (attributes.containsKey(retryString)) {
retryCount = Integer.parseInt(attributes.get(retryString));
retryCount++;
} else {
retryCount = 1;
}
attributes.put(retryString, String.valueOf(retryCount));
receivedPayload.setAttributes(attributes);
mq.sendMessage(IMessageFactory.DEFAULT_QUEUE_NAME, gson.toJson(receivedPayload));
logger.info("Mesge send to queue : "+receivedPayload);
} catch (JsonSyntaxException e) {
logger.error("JsonSyntaxException "+e.toString());
// TODO Auto-generated catch block
e.printStackTrace();
} catch (NumberFormatException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
/*if (retryCount <= 9) {
mq.sendMessage(IMessageFactory.INDEXER_QUEUE_NAME, gson.toJson(message));
} else {
// TODO: add to DLQ
}*/
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment