diff --git a/provider/indexer-ibm/src/main/java/org/opengroup/osdu/indexer/ibm/util/IndexerQueueTaskBuilderIbm.java b/provider/indexer-ibm/src/main/java/org/opengroup/osdu/indexer/ibm/util/IndexerQueueTaskBuilderIbm.java new file mode 100644 index 0000000000000000000000000000000000000000..86f94177be9c691e4d94a4577aa444acb616249a --- /dev/null +++ b/provider/indexer-ibm/src/main/java/org/opengroup/osdu/indexer/ibm/util/IndexerQueueTaskBuilderIbm.java @@ -0,0 +1,86 @@ +package org.opengroup.osdu.indexer.ibm.util; + +import java.util.HashMap; +import java.util.Map; +import javax.inject.Inject; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.ibm.messagebus.IMessageFactory; +import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +@Primary +@Component +public class IndexerQueueTaskBuilderIbm extends IndexerQueueTaskBuilder { + private static final Logger logger = LoggerFactory.getLogger(IndexerQueueTaskBuilderIbm.class); + @Inject + IMessageFactory mq; + private Gson gson; + private String retryString = "retry"; + + @Inject + public void init() { + gson =new Gson(); + } + + @Override + public void createWorkerTask(String payload, DpsHeaders headers) { + createTask(payload, headers); + } + @Override + public void createReIndexTask(String payload, DpsHeaders headers) { + createTask(payload, headers); + } + private void createTask(String payload, DpsHeaders headers) { + + //RecordChangedMessages recordChangedMessages = this.gson.fromJson(payload, RecordChangedMessages.class); + /*Map<String, String> message = new HashMap<>(); + message.put("data", recordChangedMessages.getData()); + message.put(DpsHeaders.DATA_PARTITION_ID, recordChangedMessages.getAttributes().get(DpsHeaders.DATA_PARTITION_ID)); + //message.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + headers.addCorrelationIdIfMissing(); + //message.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + message.put(DpsHeaders.CORRELATION_ID, recordChangedMessages.getAttributes().get(DpsHeaders.CORRELATION_ID));*/ + + //mq.sendMessage(IMessageFactory.INDEXER_QUEUE_NAME, gson.toJson(message)); + + + try { + System.out.println("Payload recived :"+payload); + RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); + + Map<String, String> attributes = receivedPayload.getAttributes(); + int retryCount=0; + if (attributes.containsKey(retryString)) { + retryCount = Integer.parseInt(attributes.get(retryString)); + retryCount++; + } else { + retryCount = 1; + } + attributes.put(retryString, String.valueOf(retryCount)); + receivedPayload.setAttributes(attributes); + mq.sendMessage(IMessageFactory.DEFAULT_QUEUE_NAME, gson.toJson(receivedPayload)); + logger.info("Mesge send to queue : "+receivedPayload); + } catch (JsonSyntaxException e) { + logger.error("JsonSyntaxException "+e.toString()); + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (NumberFormatException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + + + /*if (retryCount <= 9) { + mq.sendMessage(IMessageFactory.INDEXER_QUEUE_NAME, gson.toJson(message)); + } else { + // TODO: add to DLQ + }*/ + } +} \ No newline at end of file