Commit 0a53efee authored by Nikhil Singh[MicroSoft]'s avatar Nikhil Singh[MicroSoft]
Browse files

SB

parent d05fa8f8
......@@ -48,7 +48,10 @@ public class PubsubEndpoint {
@PostMapping("/records-changed")
@PreAuthorize("@authorizationFilter.hasAnyPermission('" + Config.OPS + "', '" + Config.PUBSUB + "')")
public ResponseEntity recordChanged() throws Exception {
NotificationContent notificationContent = this.pubsubRequestBodyExtractor.extractNotificationContent();
String id = pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String data = pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> map = pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
NotificationContent notificationContent = new NotificationContent(id, data, map, false);
try {
HttpResponse response = notificationHandler.notifySubscriber(notificationContent);
if (!response.isSuccessCode()) {
......
package org.opengroup.osdu.notification.models;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import java.util.Map;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NotificationContent {
String NotificationId;
String data;
Map<String, String> extractAttributes;
boolean isHandShakeRequest;
}
package org.opengroup.osdu.notification.models;
public class NotificationPulledContent {
}
......@@ -4,6 +4,13 @@ import org.opengroup.osdu.notification.models.NotificationContent;
import java.util.Map;
public interface IPubsubRequestBodyExtractor extends RequestBodyExtractor {
public interface IPubsubRequestBodyExtractor {
Map<String, String> extractAttributesFromRequestBody();
String extractDataFromRequestBody();
String extractNotificationIdFromRequestBody();
boolean isHandshakeRequest();
}
package org.opengroup.osdu.notification.provider.interfaces;
import org.opengroup.osdu.notification.models.NotificationPulledContent;
public interface IPullRequestBodyExtractor extends RequestBodyExtractor {
void initializeExtractor(NotificationPulledContent notificationPulledContent);
}
package org.opengroup.osdu.notification.provider.interfaces;
import org.opengroup.osdu.notification.models.NotificationContent;
import java.util.Map;
public interface RequestBodyExtractor {
Map<String, String> extractAttributesFromRequestBody();
String extractDataFromRequestBody();
String extractNotificationIdFromRequestBody();
boolean isHandshakeRequest();
NotificationContent extractNotificationContent();
}
......@@ -25,6 +25,8 @@ import org.opengroup.osdu.core.common.model.notification.*;
import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.opengroup.osdu.notification.models.NotificationContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -33,8 +35,9 @@ import java.util.Map;
@Component
public class NotificationHandler {
@Autowired
private JaxRsDpsLog log;
//@Autowired
//private JaxRsDpsLog log;
private final static Logger log = LoggerFactory.getLogger(NotificationHandler.class);
@Autowired
private HttpClient httpClient;
@Autowired
......
......@@ -24,23 +24,28 @@ import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.model.notification.Topic;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.notification.di.SubscriptionCacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
@Component
public class SubscriptionHandler {
private final static Logger log = LoggerFactory.getLogger(SubscriptionHandler.class);
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private SubscriptionCacheFactory subscriptionCacheFactory;
@Autowired
private JaxRsDpsLog log;
// @Autowired
//private JaxRsDpsLog log;
@Autowired
private DpsHeaders headers;
......@@ -48,7 +53,7 @@ public class SubscriptionHandler {
private ObjectMapper objectMapper;
public Subscription getSubscriptionFromCache(String notificationId) throws IOException, SubscriptionException {
String subscriptionString = subscriptionCacheFactory.get(notificationId);
String subscriptionString = "";subscriptionCacheFactory.get(notificationId);
try {
if (Strings.isNullOrEmpty(subscriptionString))
subscriptionString = querySubscriptionAndUpdateCache(notificationId);
......@@ -56,10 +61,10 @@ public class SubscriptionHandler {
Subscription subscription = objectMapper.readValue(subscriptionString, Subscription.class);
return subscription;
} catch (IOException e) {
this.log.warning("Error Parsing subscription String to object.");
this.log.warn("Error Parsing subscription String to object.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", e);
} catch (SubscriptionException se) {
this.log.warning("Error query subscription from registration.");
this.log.warn("Error query subscription from registration.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", se);
}
}
......@@ -69,16 +74,22 @@ public class SubscriptionHandler {
List<Subscription> subscriptionList = service.query(notificationId);
if (subscriptionList == null || subscriptionList.size() == 0) {
this.log.warning(String.format("Subscription with notification ID %s not found in registration", notificationId));
this.log.warn(String.format("Subscription with notification ID %s not found in registration", notificationId));
throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found");
}
Subscription subscription = subscriptionList.get(0);
String jsonSubscription = gson.toJson(subscription);
this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription);
// this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription);
return jsonSubscription;
}
public List<Topic> getTopics() throws Exception {
ISubscriptionService service = subscriptionFactory.create(headers);
return service.getTopics();
}
//unit test purpose
protected ObjectMapper getObjectMapper() {
if (this.objectMapper == null) {
......@@ -86,6 +97,7 @@ public class SubscriptionHandler {
}
return this.objectMapper;
}
//unit test purpose
void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
......
......@@ -88,11 +88,6 @@ public class AwsPubsubRequestBodyExtractor implements IPubsubRequestBodyExtracto
return false;
}
@Override
public NotificationContent extractNotificationContent() {
return null;
}
private MessageContent extractPubsubMessageFromRequestBody() {
if (this.root == null) {
this.root = this.extractRootJsonElementFromRequestBody();
......
......@@ -14,6 +14,7 @@
package org.opengroup.osdu.notification.provider.azure;
import org.opengroup.osdu.notification.provider.azure.servicebus.SubscriptionManagerImpl;
import org.opengroup.osdu.notification.provider.azure.servicebus.interfaces.SubscriptionManager;
import org.opengroup.osdu.notification.provider.azure.servicebus.thread.ThreadScopeBeanFactoryPostProcessor;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
......@@ -25,6 +26,10 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@ComponentScan({"org.opengroup.osdu"})
@EnableAsync
......@@ -33,11 +38,14 @@ public class Application {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(new Class[]{Application.class}, args);
// TODO: Double check with KOMAL/HARSHIT if this is having only one thread running at a time,RISK HERE FOR MEMORY IF NOT TERMINATED
ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
SubscriptionManager subscriptionManager = context.getBean(SubscriptionManager.class);
subscriptionManager.subscribeRecordsChangeEvent();
executorService.scheduleAtFixedRate(subscriptionManager, 0, 30, TimeUnit.SECONDS);
}
@Bean("restTemplate")
@Bean("restTemplate")
public RestTemplate restTemplate() {
return new RestTemplate();
}
......
......@@ -6,6 +6,9 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NotificationServiceBusRequest {
private JsonObject message;
}
package org.opengroup.osdu.notification.provider.azure.models;
import com.microsoft.azure.servicebus.IMessage;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opengroup.osdu.notification.models.NotificationPulledContent;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServiceBusPulledContent extends NotificationPulledContent {
private IMessage message;
}
......@@ -60,8 +60,6 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
private NotificationRecordsChangedData notificationRecordsChangedData;
private HandshakeRequestData handshakeRequestData;
private boolean isHandshakeRequest;
private NotificationContent notificationContent;
@Autowired
public EventGridRequestBodyExtractor(HttpServletRequest httpServletRequest, JaxRsDpsLog log) {
this.httpServletRequest = httpServletRequest;
......@@ -69,17 +67,6 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
this.notificationRequest = extractNotificationRequestFromHttpRequest();
}
/**
* Extracts Notification Data form the request that is filled in by publisher of the message.
*/
public NotificationContent extractNotificationContent() {
if (this.notificationContent == null) {
this.notificationContent = new NotificationContent(extractNotificationIdFromRequestBody(), extractDataFromRequestBody()
, extractAttributesFromRequestBody(), isHandshakeRequest());
}
return this.notificationContent;
}
/**
* Extracts the attributes from the request that are filled in by publisher of the message.
*
......
......@@ -25,7 +25,7 @@ public class MessageHandler implements IMessageHandler {
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
this.processNotification.performNotification(message);
this.processNotification.performNotification(message,receiveClient.getSubscriptionName());
return this.receiveClient.completeAsync(message.getLockToken());
}
......
package org.opengroup.osdu.notification.provider.azure.servicebus;
import com.microsoft.azure.servicebus.IMessage;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.notification.models.NotificationContent;
import org.opengroup.osdu.notification.provider.azure.models.ServiceBusPulledContent;
import org.opengroup.osdu.notification.provider.interfaces.IPullRequestBodyExtractor;
import org.opengroup.osdu.notification.provider.azure.servicebus.extractor.RequestBodyAdapter;
import org.opengroup.osdu.notification.provider.azure.servicebus.thread.ThreadDpsHeaders;
import org.opengroup.osdu.notification.service.NotificationHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ProcessNotification {
private final static Logger LOGGER = LoggerFactory.getLogger(ProcessNotification.class);
@Autowired
private NotificationHandler notificationHandler;
@Autowired
private IPullRequestBodyExtractor extractor;
private RequestBodyAdapter requestBodyAdapter;
@Autowired
private ThreadDpsHeaders dpsHeaders;
public void performNotification(IMessage message) {
public void performNotification(IMessage message, String subscriptionName) {
try {
ServiceBusPulledContent serviceBusContent = new ServiceBusPulledContent(message);
extractor.initializeExtractor(serviceBusContent);
NotificationContent notificationContent = extractor.extractNotificationContent();
NotificationContent notificationContent = requestBodyAdapter.extractNotificationContent(message, subscriptionName);
dpsHeaders.setThreadContext(notificationContent.getExtractAttributes().get(DpsHeaders.DATA_PARTITION_ID),
notificationContent.getExtractAttributes().get(DpsHeaders.CORRELATION_ID));
notificationHandler.notifySubscriber(notificationContent);
} catch (Exception e) {
LOGGER.error("Unable to process the Notification : " + e);
}
}
}
......@@ -12,17 +12,10 @@ import org.springframework.stereotype.Component;
@Component
public class SubscriptionClientFactImpl {
private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionClientFactImpl.class);
@Autowired
private ISubscriptionClientFactory subscriptionClientFactory;
// @Autowired
//AzureBootstrapConfig azureBootstrapConfig;
@Autowired private ISubscriptionClientFactory subscriptionClientFactory;
public SubscriptionClient getSubscriptionClient(String dataPartition) {
// String sbTopic = "recordstopiceg";//azureBootstrapConfig.getServiceBusTopic();
//String sbSubscription = "eg_sb_wkssubscription";//azureBootstrapConfig.getServiceBusTopicSubscription();
String sbTopic = "recordstopic";//azureBootstrapConfig.getServiceBusTopic();
String sbSubscription = "wkssubscription";
public SubscriptionClient getSubscriptionClient(String dataPartition, String sbTopic, String sbSubscription) {
try {
return subscriptionClientFactory.getClient(dataPartition, sbTopic, sbSubscription);
} catch (ServiceBusException | InterruptedException e) {
......
......@@ -2,19 +2,35 @@ package org.opengroup.osdu.notification.provider.azure.servicebus;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.management.ManagementClient;
import com.microsoft.azure.servicebus.management.SubscriptionDescription;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.opengroup.osdu.azure.cosmosdb.CosmosStore;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.model.notification.Topic;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.notification.provider.azure.servicebus.interfaces.SubscriptionManager;
import org.opengroup.osdu.notification.service.NotificationHandler;
import org.opengroup.osdu.notification.provider.azure.servicebus.thread.ThreadDpsHeaders;
import org.opengroup.osdu.notification.provider.azure.util.AzureCosmosProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
......@@ -22,43 +38,79 @@ import java.util.stream.Collectors;
@Component
public class SubscriptionManagerImpl implements SubscriptionManager {
private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionManagerImpl.class);
@Autowired
private SubscriptionClientFactImpl subscriptionClientFactory;
@Autowired
private ProcessNotification processNotification;
@Autowired
private ITenantFactory tenantFactory;
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private ThreadDpsHeaders dpsHeaders;
@Autowired
private PartitionServiceClient partitionService;
@Autowired
private CosmosStore cosmosStore;
@Autowired
private AzureCosmosProperties azureCosmosProperties;
@Override public void subscribeRecordsChangeEvent() {
private Map<String, Map<String, List<String>>> topicSubscriptions = new HashMap<>();
@Override
public void subscribeRecordsChangeEvent() {
List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId)
.collect(Collectors.toList());
// TODO: HAVE A C CONFIG FILE
ExecutorService executorService = Executors
.newFixedThreadPool(Integer.parseUnsignedInt("15"));
for (String partition : tenantList) {
try {
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient(partition);
registerMessageHandler(subscriptionClient, executorService);
}
catch (Exception e) {
// TODO :TOPIC NAME ALIAS MANAGEMENT MIGHT BE REQUIRED IN PARTION SERVICE
List<Subscription> subscriptionsList = cosmosStore.findAllItems(partition, azureCosmosProperties.cosmosDBName(),
azureCosmosProperties.registerSubscriptionContainerName(), Subscription.class);
for (Subscription subscription : subscriptionsList) {
// Check if its a new Subscription Client
if (checkIfNewSubscription(partition, subscription.getTopic(), subscription.getNotificationId())) {
SubscriptionClient subscriptionClient = this.subscriptionClientFactory
.getSubscriptionClient(partition, subscription.getTopic(), subscription.getNotificationId());
if (subscription.getNotificationId().equals("de-55a15b2a-cc26-46d8-b359-5e2d281f404a")) {
registerMessageHandler(subscriptionClient, executorService);
}
updateExistingSubscriptions(partition, subscription.getTopic(), subscription.getNotificationId());
}
}
} catch (Exception e) {
LOGGER.error("Error while creating or registering subscription client", e);
}
}
}
private boolean checkIfNewSubscription(String partition, String sbTopicName, String subscriptionName) {
if (topicSubscriptions.get(partition) == null ||
topicSubscriptions.get(partition).get(sbTopicName) == null ||
!topicSubscriptions.get(partition).get(sbTopicName).contains(subscriptionName))
return true;
return false;
}
private void updateExistingSubscriptions(String partition, String sbTopicName, String subscriptionName) {
// update the active subscriptions
topicSubscriptions.putIfAbsent(partition, new HashMap<String, List<String>>());
topicSubscriptions.get(partition).putIfAbsent(sbTopicName, new ArrayList<>());
topicSubscriptions.get(partition).get(sbTopicName).add(subscriptionName);
}
private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) {
try {
MessageHandler messageHandler = new MessageHandler(subscriptionClient, processNotification);
subscriptionClient.registerMessageHandler(
messageHandler,
new MessageHandlerOptions(Integer.parseUnsignedInt("5"),
new MessageHandlerOptions(Integer.parseUnsignedInt("15"),
false,
Duration.ofSeconds(Integer.parseUnsignedInt("5")),
Duration.ofSeconds(Integer.parseUnsignedInt("15")),
Duration.ofSeconds(1)
),
executorService);
......@@ -67,4 +119,9 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
LOGGER.error("Error registering message handler {}", e.getMessage(), e);
}
}
@Override
public void run() {
subscribeRecordsChangeEvent();
}
}
......@@ -4,26 +4,18 @@ import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.microsoft.azure.servicebus.IMessage;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.models.NotificationContent;
import org.opengroup.osdu.notification.models.NotificationPulledContent;
import org.opengroup.osdu.notification.provider.azure.models.NotificationEventGridServiceBusRequest;
import org.opengroup.osdu.notification.provider.azure.models.NotificationServiceBusRequest;
import org.opengroup.osdu.notification.provider.azure.models.NotificationRecordsChangedData;
import org.opengroup.osdu.notification.provider.azure.models.ServiceBusPulledContent;
import org.opengroup.osdu.notification.provider.interfaces.IPullRequestBodyExtractor;
import org.opengroup.osdu.notification.provider.azure.servicebus.interfaces.IPullRequestBodyExtractor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
@Component
@Primary
//@Component
//@Primary
public class EventGridServiceBusRequestBodyExtractor implements IPullRequestBodyExtractor {
@Autowired
private JaxRsDpsLog logger;
......@@ -32,38 +24,12 @@ public class EventGridServiceBusRequestBodyExtractor implements IPullRequestBody
private NotificationEventGridServiceBusRequest notificationRequest;
private NotificationRecordsChangedData notificationRecordsChangedData;
private IMessage message;
private NotificationContent notificationContent;
public void initializeExtractor(NotificationPulledContent notificationPulledContent) {
ServiceBusPulledContent serviceBusContent1 = ServiceBusPulledContent.class.cast(notificationPulledContent);