Skip to content
Snippets Groups Projects
Commit 575d700a authored by Nikhil Singh[MicroSoft]'s avatar Nikhil Singh[MicroSoft]
Browse files

Commit 1 Contents:

1-Service Bus Subscriptions Create/delete implementation
parent a2ba56fd
No related branches found
No related tags found
2 merge requests!102Locking down maven central,!97Service Bus Create and Delete Support for Subscriptions
Pipeline #51437 failed
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.register.provider.azure.subscriber;
import com.microsoft.azure.servicebus.management.ManagementClient;
public interface IManagementClientFactory {
ManagementClient getManager(String var1);
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.register.provider.azure.subscriber;
import com.microsoft.azure.servicebus.management.ManagementClient;
import org.opengroup.osdu.core.common.cache.VmCache;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Lazy
@Component
public class ManagementClientCache extends VmCache<String, ManagementClient> {
public ManagementClientCache() {
super(3600, 1000);
}
public boolean containsKey(String key) {
return this.get(key) != null;
}
}
\ No newline at end of file
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.register.provider.azure.subscriber;
import com.microsoft.azure.servicebus.management.ManagementClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.common.Validators;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ManagementClientFactoryImpl implements IManagementClientFactory {
@Autowired
private PartitionServiceClient partitionService;
@Autowired
private ManagementClientCache clientCache;
@Autowired
private DpsHeaders dpsHeaders;
public ManagementClientFactoryImpl() {
}
public ManagementClient getManager(String topicName) {
Validators.checkNotNullAndNotEmpty(dpsHeaders.getPartitionId(), "partitionId");
String cacheKey = dpsHeaders.getPartitionId() + "-managementClient";
if (this.clientCache.containsKey(cacheKey)) {
return (ManagementClient) this.clientCache.get(cacheKey);
} else {
PartitionInfoAzure pi = this.partitionService.getPartition(dpsHeaders.getPartitionId());
String serviceBusConnectionString = pi.getSbConnection();
String entityPath = String.format("%s", topicName);
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(serviceBusConnectionString, entityPath);
ManagementClient managementClient = new ManagementClient(connectionStringBuilder);
this.clientCache.put(cacheKey, managementClient);
return managementClient;
}
}
}
package org.opengroup.osdu.register.provider.azure.subscriber;
import com.microsoft.azure.CloudException;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class PullSubscription {
// TODO: UPDATE ERROR_MESSAGE FOR SERVICE BUS
private static final String RESOURCE_PROVISIONING_ERROR_MESSAGE = "Resource cannot be updated during provisioning";
@Autowired
private IManagementClientFactory factory;
@Autowired
private DpsHeaders dpsHeaders;
@Autowired
private JaxRsDpsLog logger;
@Autowired
private PartitionServiceClient partitionService;
public void createPullSubscription(String subscriptionId, String topicName) {
try {
PartitionInfoAzure pi = this.partitionService.getPartition(dpsHeaders.getPartitionId());
String serviceBusConnectionString = pi.getSbConnection();
String topicPath = serviceBusConnectionString + "/" + topicName;
factory.getManager(topicName).createSubscription(topicPath, subscriptionId);
} catch (Exception e) {
if (e.getMessage().equals(RESOURCE_PROVISIONING_ERROR_MESSAGE)) {
logger.error("Another request is trying to create the same Pull subscription");
throw new AppException(409, "Conflict", "Another request is trying to create the same Push subscription");
} else {
logger.error("Creating Pull Subscription failed with error: " + e.toString());
throw new AppException(500, "Server Error", "Unexpected error creating Push subscription");
}
}
}
public void deletePullSubscription(String subscriptionId, String topicName) {
try {
PartitionInfoAzure pi = this.partitionService.getPartition(dpsHeaders.getPartitionId());
String serviceBusConnectionString = pi.getSbConnection();
String topicPath = serviceBusConnectionString + "/" + topicName;
factory.getManager(topicName).deleteSubscription(topicPath, subscriptionId);
} catch (Exception e) {
if (e instanceof CloudException) {
CloudException cloudException = (CloudException) e;
logger.error(cloudException.toString());
throw new AppException(cloudException.response().code(), cloudException.body().code(), cloudException.body().message());
} else {
logger.error("Deleting Pull Subscription failed with error: " + e.toString());
throw new AppException(500, "Server Error", "Unexpected error deleting Pull subscription");
}
}
}
public void checkIfPullSubscriptionExists(String subscriptionId, String topicName) {
try {
PartitionInfoAzure pi = this.partitionService.getPartition(dpsHeaders.getPartitionId());
String serviceBusConnectionString = pi.getSbConnection();
String topicPath = serviceBusConnectionString + "/" + topicName;
factory.getManager(topicName).getSubscription(topicPath, subscriptionId);
} catch (Exception e) {
if (e instanceof NullPointerException) {
logger.error(String.format("Pull Subscription with id %s does not exist.", subscriptionId));
throw new AppException(404, "Not found", String.format("Pull Subscription with id %s does not exist.", subscriptionId));
} else {
logger.error("Fetching Pull subscription failed with error: " + e.toString());
throw new AppException(500, "Server Error", "Unexpected error while Fetching Pull subscription");
}
}
}
}
......@@ -70,6 +70,9 @@ public class SubscriptionRepository implements ISubscriptionRepository {
@Autowired
private TopicsRepositoryImpl topicsRepository;
@Autowired
private PullSubscription pullSubscription;
/**
* @param input Subscription Object
* @return Subscription Object
......@@ -116,7 +119,7 @@ public class SubscriptionRepository implements ISubscriptionRepository {
// This will result in an 500 Exception so the user should be able to create the subscription with
// the same topic and pushEndpoint combination again
logger.info(String.format("Record already exists for Subscription with id %s", input.getId()));
SubscriptionDoc output = createPushSubscriptionIfDoesNotExist(doc);
SubscriptionDoc output = createPushAndPullSubscriptionIfDoesNotExist(doc);
input.setNotificationId(output.getNotificationId());
return input;
}
......@@ -129,6 +132,9 @@ public class SubscriptionRepository implements ISubscriptionRepository {
try {
pushSubscription.createPushSubscription(doc.getNotificationId(), doc.getTopic());
logger.info("Push Subscription created with Event Grid ID:" + doc.getNotificationId());
pullSubscription.createPullSubscription(doc.getNotificationId(), doc.getTopic());
logger.info("Pull Subscription created with Service Bus ID:" + doc.getNotificationId());
return input;
}
catch(AppException e) {
......@@ -193,6 +199,9 @@ public class SubscriptionRepository implements ISubscriptionRepository {
try {
pushSubscription.deletePushSubscription(subscription.get().getNotificationId(), subscription.get().getTopic());
logger.info("Push Subscription deleted with Event Grid ID:" + subscription.get().getNotificationId());
pullSubscription.deletePullSubscription(subscription.get().getNotificationId(), subscription.get().getTopic());
logger.info("Pull Subscription deleted with Service Bus ID:" + subscription.get().getNotificationId());
try {
cosmosStore.deleteItem(dpsHeaders.getPartitionId(), azureBootstrapConfig.getCosmosDBName(), cosmosContainerConfig.getSubscriptionContainerName(), id, dpsHeaders.getPartitionId());
logger.info("Record deleted for subscription with ID: " + id);
......@@ -241,7 +250,7 @@ public class SubscriptionRepository implements ISubscriptionRepository {
.collect(Collectors.toList());
}
private SubscriptionDoc createPushSubscriptionIfDoesNotExist(SubscriptionDoc input) {
private SubscriptionDoc createPushAndPullSubscriptionIfDoesNotExist(SubscriptionDoc input) {
try {
// We are fetching the record from cosmos db again because we want to use the original
// notification-id to create the Push Subscription
......@@ -252,6 +261,7 @@ public class SubscriptionRepository implements ISubscriptionRepository {
// We will check if Push Subscription does not exist then we should try creating it again since
// the corresponding record in the Cosmos Db is already present
pushSubscription.checkIfPushSubscriptionExists(input.getNotificationId(), input.getTopic());
pullSubscription.checkIfPullSubscriptionExists(input.getNotificationId(), input.getTopic());
logger.error("A subscriber already exists with the same topic and endpoint combination");
throw new AppException(409, "Conflict", "A subscriber already exists with the same topic and endpoint combination");
}
......@@ -268,13 +278,14 @@ public class SubscriptionRepository implements ISubscriptionRepository {
// Now creating the Push Subscription, if it fails again we should throw server error
// otherwise we can return Success code as response
pushSubscription.createPushSubscription(input.getNotificationId(), input.getTopic());
logger.info("Push Subscription created with Event Grid ID:" + input.getNotificationId());
pullSubscription.createPullSubscription(input.getNotificationId(), input.getTopic());
logger.info("Push and Pull Subscription created with ID's:" + input.getNotificationId());
return input;
}
catch (AppException exception) {
if(exception.getError().getCode() == 409) {
logger.error("Another request is trying to create the same Push subscription");
throw new AppException(409, "Conflict", "Another request is trying to create the same Push subscription");
logger.error("Another request is trying to create the same Push/Pull subscription");
throw new AppException(409, "Conflict", "Another request is trying to create the same Push/Pull subscription");
}
else {
logger.error("Unexpected error creating subscription");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment