Commit ea0a87b2 authored by Orsu Akhil's avatar Orsu Akhil Committed by Kishore Battula
Browse files

Implementation to fetch service bus subscription client for partition

parent 56b3702c
......@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.0.41</version>
<version>0.0.42</version>
<name>core-lib-azure</name>
<properties>
......
package org.opengroup.osdu.azure.cache;
import com.microsoft.azure.servicebus.SubscriptionClient;
import org.opengroup.osdu.core.common.cache.VmCache;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* Implementation of ICache for SubscriptionClient.
*/
@Component
@Lazy
public class SubscriptionClientCache extends VmCache<String, SubscriptionClient> {
/**
* Default cache constructor.
*/
public SubscriptionClientCache() {
super(60 * 60, 1000);
}
/**
* @param key cache key
* @return true if found in cache
*/
public boolean containsKey(final String key) {
return this.get(key) != null;
}
}
\ No newline at end of file
package org.opengroup.osdu.azure.servicebus;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
/**
* Interface for Subscription Client Factory to return appropriate subscription client
* instance for each tenant based on data partition, service bus topic and subscription Name.
*/
public interface ISubscriptionClientFactory {
/**
* @param dataPartitionId Data Partition Id
* @param topicName Service Bus Topic
* @param subscriptionName Service Bus Subscription
* @return A client configured to communicate with a Service Bus Subscription
* @throws ServiceBusException Exception thrown by {@link SubscriptionClient}
* @throws InterruptedException Exception thrown by {@link SubscriptionClient}
*/
SubscriptionClient getClient(String dataPartitionId, String topicName, String subscriptionName)
throws ServiceBusException, InterruptedException;
}
\ No newline at end of file
package org.opengroup.osdu.azure.servicebus;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.opengroup.osdu.azure.cache.SubscriptionClientCache;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.common.Validators;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* Implementation for ISubscriptionClientFactory.
*/
@Component
@Lazy
public class SubscriptionClientFactoryImpl implements ISubscriptionClientFactory {
@Autowired
private PartitionServiceClient partitionService;
@Autowired
private SubscriptionClientCache clientCache;
/**
* @param dataPartitionId Data Partition Id
* @param topicName Service Bus Topic Name
* @param subscriptionName Service Bus Subscription Name
* @return A client configured to communicate with a Service Bus Subscription
* @throws ServiceBusException Exception thrown by {@link SubscriptionClient}
* @throws InterruptedException Exception thrown by {@link SubscriptionClient}
*/
@Override
public SubscriptionClient getClient(final String dataPartitionId, final String topicName, final String subscriptionName) throws ServiceBusException, InterruptedException {
Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId");
Validators.checkNotNullAndNotEmpty(topicName, "topicName");
Validators.checkNotNullAndNotEmpty(subscriptionName, "subscriptionName");
String entityPath = String.format("%s/subscriptions/%s", topicName, subscriptionName);
String cacheKey = String.format("%s-%s", dataPartitionId, entityPath);
if (this.clientCache.containsKey(cacheKey)) {
return this.clientCache.get(cacheKey);
}
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
String serviceBusConnectionString = pi.getSbConnection();
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(
serviceBusConnectionString,
entityPath
);
SubscriptionClient subscriptionClient = new SubscriptionClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);
this.clientCache.put(cacheKey, subscriptionClient);
return subscriptionClient;
}
}
\ No newline at end of file
package org.opengroup.osdu.azure.servicebus;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.cache.SubscriptionClientCache;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.core.common.partition.Property;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.never;
import static org.mockito.MockitoAnnotations.initMocks;
@ExtendWith(MockitoExtension.class)
public class SubscriptionClientFactoryImplTest {
@InjectMocks
private SubscriptionClientFactoryImpl subscriptionClientFactory;
@Mock
private PartitionServiceClient partitionServiceClient;
@Mock
private SubscriptionClientCache clientCache;
private static final String PARTITION_ID = "dataPartitionId";
private static final String TOPIC_NAME = "testTopic";
private static final String SUBSCRIPTION_NAME = "testSubscription";
private static final String SB_ENTITY_PATH = "testTopic/subscriptions/testSubscription";
private static final String SB_CONNECTION_STRING = "Endpoint=sb://test-bus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=testKey";
@BeforeEach
void init() {
initMocks(this);
}
@Test
public void should_throwException_given_nullSubscriptionName() {
try {
this.subscriptionClientFactory.getClient(PARTITION_ID, TOPIC_NAME, null);
} catch (NullPointerException ex) {
assertEquals("subscriptionName cannot be null!", ex.getMessage());
} catch (Exception ex) {
fail("Should not get any other exception. Received " + ex.getClass());
}
}
@Test
public void should_throwException_given_emptySubscriptionName() {
try {
this.subscriptionClientFactory.getClient(PARTITION_ID, TOPIC_NAME, "");
} catch (IllegalArgumentException ex) {
assertEquals("subscriptionName cannot be empty!", ex.getMessage());
} catch (Exception ex) {
fail("Should not get any other exception. Received " + ex.getClass());
}
}
@Test
public void should_throwException_given_nullTopicName() {
try {
this.subscriptionClientFactory.getClient(PARTITION_ID, null, SUBSCRIPTION_NAME);
} catch (NullPointerException ex) {
assertEquals("topicName cannot be null!", ex.getMessage());
} catch (Exception ex) {
fail("Should not get any other exception. Received " + ex.getClass());
}
}
@Test
public void should_throwException_given_emptyTopicName() {
try {
this.subscriptionClientFactory.getClient(PARTITION_ID, "", SUBSCRIPTION_NAME);
} catch (IllegalArgumentException ex) {
assertEquals("topicName cannot be empty!", ex.getMessage());
} catch (Exception ex) {
fail("Should not get any other exception. Received " + ex.getClass());
}
}
@Test
public void should_return_cachedClient_when_cachedEarlier() throws ServiceBusException, InterruptedException {
SubscriptionClient subscriptionClient = mock(SubscriptionClient.class);
final String cacheKey = String.format("%s-%s", PARTITION_ID, SB_ENTITY_PATH);
when(this.clientCache.containsKey(cacheKey)).thenReturn(true);
when(this.clientCache.get(cacheKey)).thenReturn(subscriptionClient);
this.subscriptionClientFactory.getClient(PARTITION_ID, TOPIC_NAME, SUBSCRIPTION_NAME);
verify(this.partitionServiceClient, never()).getPartition(PARTITION_ID);
}
@Test
public void should_return_client_when_partition_valid() throws ServiceBusException, InterruptedException {
when(this.partitionServiceClient.getPartition(PARTITION_ID)).thenReturn(
PartitionInfoAzure.builder().sbConnectionConfig(Property.builder().value(SB_CONNECTION_STRING).build())
.build());
assertNotNull(this.subscriptionClientFactory.getClient(PARTITION_ID, TOPIC_NAME, SUBSCRIPTION_NAME));
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment