Commit 54b40722 authored by harshit aggarwal's avatar harshit aggarwal
Browse files

init

parent 424fd9fd
Pipeline #64741 passed with stages
in 19 minutes and 16 seconds
......@@ -43,26 +43,34 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
while(true) {
List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId)
.collect(Collectors.toList());
for (String partition : tenantList) {
if(partitions.contains(partition)) {
continue;
}
try {
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient(partition);
registerMessageHandler(subscriptionClient, executorService);
partitions.add(partition);
}
catch (Exception e) {
LOGGER.error("Error while creating or registering subscription client", e);
try {
List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId)
.collect(Collectors.toList());
LOGGER.info("Total number of partitions " + tenantList.size());
for (String partition : tenantList) {
if(partitions.contains(partition)) {
continue;
}
try {
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient(partition);
registerMessageHandler(subscriptionClient, executorService);
partitions.add(partition);
}
catch (Exception e) {
LOGGER.error("Error while creating or registering subscription client", e);
}
}
}
catch (Exception e) {
LOGGER.error("Exception encountered while fetching partition information", e);
}
try {
Thread.sleep(600000);
LOGGER.info("Sleeping for a minute");
Thread.sleep(60000);
}
catch (Exception e) {
LOGGER.error("Execption encountered while sleeping the thread", e);
......@@ -88,3 +96,76 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
}
//package org.opengroup.osdu.wks.provider.azure.pubsub;
//
//import com.microsoft.azure.servicebus.MessageHandlerOptions;
//import com.microsoft.azure.servicebus.SubscriptionClient;
//import com.microsoft.azure.servicebus.primitives.ServiceBusException;
//import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
//import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
//import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
//import org.opengroup.osdu.wks.provider.interfaces.SubscriptionManager;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//import java.time.Duration;
//import java.util.List;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
//import java.util.stream.Collectors;
//
//@Component
//public class SubscriptionManagerImpl implements SubscriptionManager {
// private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionManagerImpl.class);
//
// @Autowired
// private SubscriptionClientFactory subscriptionClientFactory;
//
// @Autowired
// private ProcessWKSTransform processWKSTransform;
//
// @Autowired
// private AzureBootstrapConfig azureBootstrapConfig;
//
// @Autowired
// private ITenantFactory tenantFactory;
//
// @Override public void subscribeRecordsChangeEvent() {
//
// List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId)
// .collect(Collectors.toList());
//
// ExecutorService executorService = Executors
// .newFixedThreadPool(Integer.parseUnsignedInt(azureBootstrapConfig.getNThreads()));
// for (String partition : tenantList) {
// try {
// SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient(partition);
// registerMessageHandler(subscriptionClient, executorService);
// }
// catch (Exception e) {
// LOGGER.error("Error while creating or registering subscription client", e);
// }
//
// }
// }
//
// private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) {
// try {
// MessageHandler messageHandler = new MessageHandler(subscriptionClient, processWKSTransform);
// subscriptionClient.registerMessageHandler(
// messageHandler,
// new MessageHandlerOptions(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxConcurrentCalls()),
// false,
// Duration.ofSeconds(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxLockRenewDurationInSeconds())),
// Duration.ofSeconds(1)
// ),
// executorService);
//
// } catch (InterruptedException | ServiceBusException e) {
// LOGGER.error("Error registering message handler {}", e.getMessage(), e);
// }
// }
//
//}
\ No newline at end of file
package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SubscriptionManagerImplTest {
private static final String maxLockRenewDuration = "60";
private static final String maxConcurrentCalls = "1";
private static final String nThreads = "2";
private static final String errorMessage = "some-error";
@InjectMocks
private SubscriptionManagerImpl subscriptionManager;
@Mock
private SubscriptionClientFactory subscriptionClientFactory;
@Mock
private ProcessWKSTransform processWKSTransform;
@Mock
private AzureBootstrapConfig azureBootstrapConfig;
@Mock
private SubscriptionClient subscriptionClient;
@Mock
private ITenantFactory tenantFactory;
private static final String dataPartition = "testTenant";
@BeforeEach
public void init() {
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setDataPartitionId(dataPartition);
when(azureBootstrapConfig.getMaxConcurrentCalls()).thenReturn(maxConcurrentCalls);
when(azureBootstrapConfig.getNThreads()).thenReturn(nThreads);
when(azureBootstrapConfig.getMaxLockRenewDurationInSeconds()).thenReturn(maxLockRenewDuration);
when(tenantFactory.listTenantInfo()).thenReturn(Collections.singletonList(tenantInfo));
}
@Test
public void shouldSuccessfullyRegisterMessageHandler() throws ServiceBusException, InterruptedException {
doNothing().when(subscriptionClient).registerMessageHandler(any(), any(), any());
when(subscriptionClientFactory.getSubscriptionClient(dataPartition)).thenReturn(subscriptionClient);
subscriptionManager.subscribeRecordsChangeEvent();
verify(azureBootstrapConfig, times(1)).getMaxConcurrentCalls();
verify(azureBootstrapConfig, times(1)).getNThreads();
verify(azureBootstrapConfig, times(1)).getMaxLockRenewDurationInSeconds();
}
@Test
public void shouldThrowExceptionIfErrorWhileRegisteringMessageHandler() throws ServiceBusException, InterruptedException {
doThrow(new InterruptedException(errorMessage)).when(subscriptionClient).registerMessageHandler(any(), any(), any());
when(subscriptionClientFactory.getSubscriptionClient(dataPartition)).thenReturn(subscriptionClient);
subscriptionManager.subscribeRecordsChangeEvent();
verify(azureBootstrapConfig, times(1)).getMaxConcurrentCalls();
verify(azureBootstrapConfig, times(1)).getNThreads();
verify(azureBootstrapConfig, times(1)).getMaxLockRenewDurationInSeconds();
}
}
//package org.opengroup.osdu.wks.provider.azure.pubsub;
//
//import com.microsoft.azure.servicebus.SubscriptionClient;
//import com.microsoft.azure.servicebus.primitives.ServiceBusException;
//import org.junit.jupiter.api.BeforeEach;
//import org.junit.jupiter.api.Test;
//import org.junit.jupiter.api.extension.ExtendWith;
//import org.mockito.InjectMocks;
//import org.mockito.Mock;
//import org.mockito.junit.jupiter.MockitoExtension;
//import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
//import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
//import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
//
//import java.util.Collections;
//
//import static org.mockito.ArgumentMatchers.any;
//import static org.mockito.Mockito.doNothing;
//import static org.mockito.Mockito.doThrow;
//import static org.mockito.Mockito.times;
//import static org.mockito.Mockito.verify;
//import static org.mockito.Mockito.when;
//
//@ExtendWith(MockitoExtension.class)
//public class SubscriptionManagerImplTest {
//
// private static final String maxLockRenewDuration = "60";
// private static final String maxConcurrentCalls = "1";
// private static final String nThreads = "2";
// private static final String errorMessage = "some-error";
//
// @InjectMocks
// private SubscriptionManagerImpl subscriptionManager;
//
// @Mock
// private SubscriptionClientFactory subscriptionClientFactory;
//
// @Mock
// private ProcessWKSTransform processWKSTransform;
//
// @Mock
// private AzureBootstrapConfig azureBootstrapConfig;
//
// @Mock
// private SubscriptionClient subscriptionClient;
//
// @Mock
// private ITenantFactory tenantFactory;
//
// private static final String dataPartition = "testTenant";
//
// @BeforeEach
// public void init() {
// TenantInfo tenantInfo = new TenantInfo();
// tenantInfo.setDataPartitionId(dataPartition);
//
// when(azureBootstrapConfig.getMaxConcurrentCalls()).thenReturn(maxConcurrentCalls);
// when(azureBootstrapConfig.getNThreads()).thenReturn(nThreads);
// when(azureBootstrapConfig.getMaxLockRenewDurationInSeconds()).thenReturn(maxLockRenewDuration);
// when(tenantFactory.listTenantInfo()).thenReturn(Collections.singletonList(tenantInfo));
// }
//
// @Test
// public void shouldSuccessfullyRegisterMessageHandler() throws ServiceBusException, InterruptedException {
//
// doNothing().when(subscriptionClient).registerMessageHandler(any(), any(), any());
// when(subscriptionClientFactory.getSubscriptionClient(dataPartition)).thenReturn(subscriptionClient);
//
// subscriptionManager.subscribeRecordsChangeEvent();
//
// verify(azureBootstrapConfig, times(1)).getMaxConcurrentCalls();
// verify(azureBootstrapConfig, times(1)).getNThreads();
// verify(azureBootstrapConfig, times(1)).getMaxLockRenewDurationInSeconds();
// }
//
// @Test
// public void shouldThrowExceptionIfErrorWhileRegisteringMessageHandler() throws ServiceBusException, InterruptedException {
//
// doThrow(new InterruptedException(errorMessage)).when(subscriptionClient).registerMessageHandler(any(), any(), any());
// when(subscriptionClientFactory.getSubscriptionClient(dataPartition)).thenReturn(subscriptionClient);
//
// subscriptionManager.subscribeRecordsChangeEvent();
//
// verify(azureBootstrapConfig, times(1)).getMaxConcurrentCalls();
// verify(azureBootstrapConfig, times(1)).getNThreads();
// verify(azureBootstrapConfig, times(1)).getMaxLockRenewDurationInSeconds();
// }
//}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment