diff --git a/src/main/java/org/opengroup/osdu/azure/publisherFacade/PubsubConfiguration.java b/src/main/java/org/opengroup/osdu/azure/publisherFacade/PubsubConfiguration.java index e6a6f46764bfc5dd9641dc2ce71dab8d499e1a2b..33ace29f83bb67392e5175d55c9eb85da1819314 100644 --- a/src/main/java/org/opengroup/osdu/azure/publisherFacade/PubsubConfiguration.java +++ b/src/main/java/org/opengroup/osdu/azure/publisherFacade/PubsubConfiguration.java @@ -32,4 +32,6 @@ public class PubsubConfiguration { @Value("${azure.eventGrid.enabled:false}") private String isEventGridEnabled; + @Value("${azure.serviceBus.retrylimit:1}") + private String retryLimit; } diff --git a/src/main/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisher.java b/src/main/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisher.java index 8976c7880e0c676935b5cf088addfe5cff7afd0f..55dff4c4ecbb342fd81aa2dd413612a30a07fa0a 100644 --- a/src/main/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisher.java +++ b/src/main/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisher.java @@ -20,6 +20,7 @@ import org.opengroup.osdu.azure.publisherFacade.models.MessageProperties; import org.opengroup.osdu.azure.publisherFacade.models.PubSubAttributesBuilder; import org.opengroup.osdu.azure.publisherFacade.models.ServiceBusMessageBody; import org.opengroup.osdu.azure.servicebus.ITopicClientFactory; +import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ public class ServiceBusPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusPublisher.class); @Autowired private ITopicClientFactory topicClientFactory; + @Autowired + private PubsubConfiguration pubsubConfiguration; /** * @param publisherInfo Contains Service bus batch and publishing details @@ -47,6 +50,7 @@ public class ServiceBusPublisher { public void publishToServiceBus(final DpsHeaders headers, final PublisherInfo publisherInfo) { Gson gson = new Gson(); Message message = new Message(); + Integer retryCount = Integer.parseInt(pubsubConfiguration.getRetryLimit()); // properties headers.addCorrelationIdIfMissing(); PubSubAttributesBuilder pubSubBuilder = PubSubAttributesBuilder.builder().dpsHeaders(headers).build(); @@ -67,13 +71,21 @@ public class ServiceBusPublisher { .build(); message.setBody(gson.toJson(serviceBusMessageBody).toString().getBytes(StandardCharsets.UTF_8)); message.setContentType("application/json"); - - try { - LOGGER.debug(String.format("Storage publishes message to Service Bus %s", headers.getCorrelationId())); - topicClientFactory.getClient(headers.getPartitionId(), publisherInfo.getServiceBusTopicName()).send(message); - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); + while (retryCount >= 0) { + try { + topicClientFactory.getClient(headers.getPartitionId(), publisherInfo.getServiceBusTopicName()).send(message); + LOGGER.debug("Storage published message to Service Bus {} with message id {}", headers.getCorrelationId(), message.getMessageId()); + break; + } catch (Exception e) { + LOGGER.error("Failed to publish message with message id {} due to exception : {}. Retry count {}. {}.", message.getMessageId(), e.getMessage(), retryCount, e); + retryCount--; + if (retryCount < 0) { + LOGGER.error("Retry limit Exceeded.Unable to publish message with message id {}", message.getMessageId()); + throw new AppException(501,"Internal Server Error" , "Failed to publish message in service bus", e); + } + } } + } } diff --git a/src/main/java/org/opengroup/osdu/azure/servicebus/TopicClientFactoryImpl.java b/src/main/java/org/opengroup/osdu/azure/servicebus/TopicClientFactoryImpl.java index 6aa39c55fdc4ed046329cfc04262d3fc2724e496..b4480de83da59dc0db84d5641a0d5b95574020d1 100644 --- a/src/main/java/org/opengroup/osdu/azure/servicebus/TopicClientFactoryImpl.java +++ b/src/main/java/org/opengroup/osdu/azure/servicebus/TopicClientFactoryImpl.java @@ -7,7 +7,6 @@ import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.ServiceBusException; import org.opengroup.osdu.azure.dependencies.DefaultAzureServiceBusCredential; import org.opengroup.osdu.azure.di.MSIConfiguration; -import org.opengroup.osdu.azure.logging.CoreLoggerFactory; import org.opengroup.osdu.azure.partition.PartitionInfoAzure; import org.opengroup.osdu.azure.partition.PartitionServiceClient; import org.opengroup.osdu.common.Validators; @@ -63,14 +62,11 @@ public class TopicClientFactoryImpl implements ITopicClientFactory { if (this.topicClientMap.containsKey(cacheKey)) { return this.topicClientMap.get(cacheKey); } - return this.topicClientMap.computeIfAbsent(cacheKey, topicClient -> { - try { - return createTopicClient(dataPartitionId, topicName); - } catch (Exception e) { - CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).warn(e.getMessage(), e); - return null; - } - }); + TopicClient topicClient = createTopicClient(dataPartitionId, topicName); + if (topicClient != null) { + topicClientMap.put(cacheKey, topicClient); + } + return topicClient; } /** diff --git a/src/test/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisherTest.java b/src/test/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisherTest.java index 7beb72771168c57a985703e98769f87589a536be..58c8af29b40259ddd4cc57eb5e604ad3be3ff806 100644 --- a/src/test/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisherTest.java +++ b/src/test/java/org/opengroup/osdu/azure/publisherFacade/ServiceBusPublisherTest.java @@ -52,6 +52,8 @@ public class ServiceBusPublisherTest { private TopicClientFactoryImpl topicClientFactory; @Mock private PublisherInfo publisherInfo; + @Mock + private PubsubConfiguration pubsubConfiguration; @InjectMocks private ServiceBusPublisher sut; @@ -73,6 +75,7 @@ public class ServiceBusPublisherTest { try { doReturn(topicClient).when(topicClientFactory).getClient(PARTITION_ID, SERVICE_BUS_TOPIC_NAME); doNothing().when(topicClient).send(message); + doReturn("3").when(pubsubConfiguration).getRetryLimit(); sut.publishToServiceBus(dpsHeaders, publisherInfo); verify(topicClientFactory, times(1)).getClient(PARTITION_ID, SERVICE_BUS_TOPIC_NAME); } catch (Exception e) {