Commit 6846ae4f authored by Nikhil Singh[MicroSoft]'s avatar Nikhil Singh[MicroSoft] Committed by Komal Makkar
Browse files

Service Bus Implementation

parent 8b26c48c
This diff is collapsed.
......@@ -86,4 +86,20 @@ spec:
- name: partition_service_endpoint
value: http://partition/api/partition/v1
- name: maxCacheSize
value: "20"
\ No newline at end of file
value: "20"
- name: max_concurrent_calls
value: "3"
- name: executor_n_threads
value: "32"
- name: max_lock_renew_duration_seconds
value: "2000"
- name: initial_subscription_manager_delay_seconds
value: "0"
- name: consecutive_subscription_manager_delay_seconds
value: "1800"
- name: service_bus_enabled
value: "false"
- name: event_grid_to_service_bus_enabled
value: "false"
- name: event_grid_enabled
value: "true"
\ No newline at end of file
......@@ -25,7 +25,7 @@
<java.version>8</java.version>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.compiler.source>${java.version}</maven.compiler.source>
<os-core-common.version>0.10.0</os-core-common.version>
<os-core-common.version>0.11.0-rc3</os-core-common.version>
</properties>
<licenses>
......
......@@ -39,7 +39,7 @@
<springframework.version>4.3.0.RELEASE</springframework.version>
<reactor.netty.version>0.10.0.RELEASE</reactor.netty.version>
<reactor.core.version>3.3.0.RELEASE</reactor.core.version>
<osdu.corelibazure.version>0.10.1</osdu.corelibazure.version>
<osdu.corelibazure.version>0.11.0-rc3</osdu.corelibazure.version>
<junit.version>5.6.0</junit.version>
<jjwt.version>3.8.1</jjwt.version>
<mockito.version>2.23.0</mockito.version>
......@@ -214,7 +214,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.2</version>
<configuration>
<useSystemClassLoader>false</useSystemClassLoader>
<threadCount>1</threadCount>
......
......@@ -14,18 +14,48 @@
package org.opengroup.osdu.notification.provider.azure;
import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.ISubscriptionManager;
import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadScopeBeanFactoryPostProcessor;
import org.opengroup.osdu.notification.provider.azure.util.AzureServiceBusConfig;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@ComponentScan({"org.opengroup.osdu"})
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(new Class[]{Application.class}, args);
ApplicationContext context = SpringApplication.run(new Class[]{Application.class}, args);
// Subscribe To Notification Event for Service Bus Notification Processing
AzureServiceBusConfig azureServiceBusConfig = context.getBean(AzureServiceBusConfig.class);
if (Boolean.parseBoolean(azureServiceBusConfig.getServiceBusEnabled())) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
ISubscriptionManager subscriptionManager = context.getBean(ISubscriptionManager.class);
/*
Here the initialSubscriptionManagerDelay is used to have a delay before the first execution.
Every consecutive execution will take place after a delay of consecutiveSubscriptionManagerDelay.
If any of the execution exceeds the time consecutiveSubscriptionManagerDelay then next execution
will begin immediately after the current execution is completed.
*/
executorService.scheduleAtFixedRate(subscriptionManager, Integer.parseUnsignedInt(azureServiceBusConfig.getInitialSubscriptionManagerDelay()),
Integer.parseUnsignedInt(azureServiceBusConfig.getConsecutiveSubscriptionManagerDelay()), TimeUnit.SECONDS);
}
}
@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
return new ThreadScopeBeanFactoryPostProcessor();
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.messageBus;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.SubscriptionClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
public class MessageHandler implements IMessageHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
private final SubscriptionClient receiveClient;
private ProcessNotification processNotification;
public MessageHandler(SubscriptionClient client, ProcessNotification processNotification) {
this.receiveClient = client;
this.processNotification = processNotification;
}
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
try {
this.processNotification.performNotification(message, receiveClient.getSubscriptionName());
return this.receiveClient.completeAsync(message.getLockToken());
} catch (Exception e) {
LOGGER.error("Unable to process the Notification : " + e);
return this.receiveClient.abandonAsync(message.getLockToken());
}
}
@Override
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
LOGGER.error("{} - {}", exceptionPhase, throwable.getMessage());
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.messageBus;
import com.microsoft.azure.servicebus.IMessage;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadScopeContextHolder;
import org.opengroup.osdu.notification.provider.azure.models.NotificationContent;
import org.opengroup.osdu.notification.provider.azure.messageBus.extractor.RequestBodyAdapter;
import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadDpsHeaders;
import org.opengroup.osdu.notification.provider.azure.util.MDCContextMap;
import org.opengroup.osdu.notification.service.NotificationHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}")
public class ProcessNotification {
private final String NOT_ACKNOWLEDGE = "message not acknowledged by client";
private final static Logger LOGGER = LoggerFactory.getLogger(ProcessNotification.class);
@Autowired
private NotificationHandler notificationHandler;
@Autowired
private RequestBodyAdapter requestBodyAdapter;
@Autowired
private ThreadDpsHeaders dpsHeaders;
@Autowired
private MDCContextMap mdcContextMap;
public void performNotification(IMessage message, String subscriptionName) throws Exception {
try {
NotificationContent notificationContent = requestBodyAdapter.extractNotificationContent(message, subscriptionName);
String dataPartitionId = notificationContent.getExtractAttributes().get(DpsHeaders.DATA_PARTITION_ID);
String correlationId = notificationContent.getExtractAttributes().get(DpsHeaders.CORRELATION_ID);
MDC.setContextMap(mdcContextMap.getContextMap(correlationId, dataPartitionId));
dpsHeaders.setThreadContext(dataPartitionId, correlationId);
LOGGER.info("Notification process started for message with id: {}", message.getMessageId());
HttpResponse response = notificationHandler.notifySubscriber(notificationContent.getNotificationId(),
notificationContent.getData(), notificationContent.getExtractAttributes());
if (!response.isSuccessCode()) {
throw new Exception(NOT_ACKNOWLEDGE);
}
} catch (Exception e) {
LOGGER.error(String.format("An error occurred performing Notification for message with ID: ", message.getMessageId()), e);
throw e;
} finally {
ThreadScopeContextHolder.getContext().clear();
MDC.clear();
}
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.messageBus;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.opengroup.osdu.azure.servicebus.ISubscriptionClientFactory;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}")
public class SubscriptionClientFactImpl {
private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionClientFactImpl.class);
@Autowired
private ISubscriptionClientFactory subscriptionClientFactory;
public SubscriptionClient getSubscriptionClient(String dataPartition, String sbTopic, String sbSubscription) throws ServiceBusException, InterruptedException {
try {
return subscriptionClientFactory.getClient(dataPartition, sbTopic, sbSubscription);
} catch (ServiceBusException | InterruptedException e) {
LOGGER.error("Unexpected error creating Subscription Client", e);
throw new AppException(500, "Server Error", "Unexpected error creating Subscription Client", e);
}
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.messageBus;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.management.ManagementClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.opengroup.osdu.azure.cosmosdb.CosmosStore;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.azure.serviceBusManager.IManagementClientFactory;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.ISubscriptionManager;
import org.opengroup.osdu.notification.provider.azure.messageBus.models.TopicSubscriptions;
import org.opengroup.osdu.notification.provider.azure.util.AzureServiceBusConfig;
import org.opengroup.osdu.notification.provider.azure.util.AzureCosmosProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Component
@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}")
public class SubscriptionManagerImpl implements ISubscriptionManager {
private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionManagerImpl.class);
@Autowired
private TopicSubscriptions topicSubscriptions;
@Autowired
private SubscriptionClientFactImpl subscriptionClientFactory;
@Autowired
private ProcessNotification processNotification;
@Autowired
private ITenantFactory tenantFactory;
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private PartitionServiceClient partitionService;
@Autowired
private CosmosStore cosmosStore;
@Autowired
private AzureCosmosProperties azureCosmosProperties;
@Autowired
private AzureServiceBusConfig azureServiceBusConfig;
@Autowired
private IManagementClientFactory factory;
@Override
public void subscribeNotificationsEvent() {
List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId)
.collect(Collectors.toList());
ExecutorService executorService = Executors
.newFixedThreadPool(Integer.parseUnsignedInt(azureServiceBusConfig.getNThreads()));
for (String partition : tenantList) {
try {
List<Subscription> subscriptionsList = cosmosStore.findAllItems(partition, azureCosmosProperties.cosmosDBName(),
azureCosmosProperties.registerSubscriptionContainerName(), Subscription.class);
ManagementClient managementClient = factory.getManager(partition);
for (Subscription subscription : subscriptionsList) {
// To check if its a not new subscription.
if (!this.topicSubscriptions.checkIfNewTopicSubscription(partition, subscription.getTopic(), subscription.getNotificationId())) {
// Update existing subscriptions and skip registration
this.topicSubscriptions.updateCurrentTopicSubscriptions(partition, subscription.getTopic(), subscription.getNotificationId());
} else {
/* This check is added if a Cosmos subscription is created but the corresponding service bus
subscription is still not created or creation is in progress.We do not register message handler
with the subscription client as it will throw entity not found exception and unregistering is not supported.
Check if its a new Subscription Client */
if (managementClient.topicExists(subscription.getTopic()) && managementClient.subscriptionExists(subscription.getTopic(), subscription.getNotificationId())) {
try {
SubscriptionClient subscriptionClient = this.subscriptionClientFactory
.getSubscriptionClient(partition, subscription.getTopic(), subscription.getNotificationId());
registerMessageHandler(subscriptionClient, executorService);
this.topicSubscriptions.updateCurrentTopicSubscriptions(partition, subscription.getTopic(), subscription.getNotificationId());
} catch (InterruptedException | ServiceBusException e) {
LOGGER.error("Error while creating or registering subscription client {}", e.getMessage(), e);
} catch (Exception e) {
LOGGER.error("Unknown exception occurred while creating or registering subscription client: ", e);
}
}
}
}
} catch (AppException e) {
LOGGER.error("Error creating Cosmos Client {}", e.getMessage(), e);
} catch (Exception e) {
LOGGER.error("An exception occurred while subscribing to Notification Event : ", e);
}
}
this.topicSubscriptions.clearTopicSubscriptions();
}
private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) throws ServiceBusException, InterruptedException {
MessageHandler messageHandler = new MessageHandler(subscriptionClient, processNotification);
subscriptionClient.registerMessageHandler(
messageHandler,
new MessageHandlerOptions(Integer.parseUnsignedInt(azureServiceBusConfig.getMaxConcurrentCalls()),
false,
Duration.ofSeconds(Integer.parseUnsignedInt(azureServiceBusConfig.getMaxLockRenewDurationInSeconds())),
Duration.ofSeconds(1)
),
executorService);
}
@Override
public void run() {
subscribeNotificationsEvent();
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.messageBus.extractor;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.microsoft.azure.servicebus.IMessage;
import lombok.SneakyThrows;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.provider.azure.models.NotificationEventGridServiceBusRequest;
import org.opengroup.osdu.notification.provider.azure.models.NotificationRecordsChangedData;
import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.IPullRequestBodyExtractor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
@Component
@ConditionalOnProperty(value = "azure.eventGridToServiceBus.enabled", havingValue = "true", matchIfMissing = false)
public class EventGridServiceBusRequestBodyExtractor implements IPullRequestBodyExtractor {
private static final Gson GSON = new Gson();
private NotificationEventGridServiceBusRequest notificationRequest;
private NotificationRecordsChangedData notificationRecordsChangedData;
private IMessage message;
public void InitializeExtractor(IMessage message) {
this.message = message;
this.notificationRequest = extractNotificationRequestFromMessageBody();
}
public Map<String, String> extractAttributesFromRequestBody() {
Map<String, String> attributes = new HashMap<>();
attributes.put("correlation-id", this.notificationRecordsChangedData.getCorrelationId());
attributes.put("data-partition-id", this.notificationRecordsChangedData.getDataPartitionId());
attributes.put("account-id", this.notificationRecordsChangedData.getAccountId());
return attributes;
}
public String extractDataFromRequestBody() {
return notificationRecordsChangedData.getData().toString();
}
@SneakyThrows
private NotificationEventGridServiceBusRequest extractNotificationRequestFromMessageBody() {
NotificationEventGridServiceBusRequest notificationRequest = null;
try {
String requestBody = new String(message.getMessageBody().getBinaryData().get(0), UTF_8);
NotificationEventGridServiceBusRequest[] notificationRequestArray = GSON.fromJson(requestBody, NotificationEventGridServiceBusRequest[].class);
notificationRequest = notificationRequestArray[0];
extractNotificationData(notificationRequest);
} catch (Exception e) {
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error",
"Unable to parse request payload.", "Request contents are null or empty", e);
}
return notificationRequest;
}
private void extractNotificationData(NotificationEventGridServiceBusRequest notificationRequest) {
String notifData = notificationRequest.getData().toString();
NotificationRecordsChangedData notificationRecordsChangedData = GSON.fromJson(notifData, NotificationRecordsChangedData.class);
verifyNotificationData(notificationRecordsChangedData);
this.notificationRecordsChangedData = notificationRecordsChangedData;
}
private void verifyNotificationData(NotificationRecordsChangedData notificationRecordsChangedData) {
Preconditions.checkNotNull(notificationRecordsChangedData, "Request payload parsing error");
Preconditions.checkNotNull(notificationRecordsChangedData.getData(), "Request payload parsing error");
Preconditions.checkNotNull(notificationRecordsChangedData.getCorrelationId(), "Request payload parsing error");
Preconditions.checkNotNull(notificationRecordsChangedData.getDataPartitionId(), "Request payload parsing error");
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.messageBus.extractor;
import com.microsoft.azure.servicebus.IMessage;
import org.opengroup.osdu.notification.provider.azure.models.NotificationContent;
import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.IPullRequestBodyExtractor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}")
public class RequestBodyAdapter {
@Autowired
IPullRequestBodyExtractor extractor;