Commit 424fd9fd authored by harshit aggarwal's avatar harshit aggarwal
Browse files

init

parent 1fde2cc1
Pipeline #64299 failed with stages
in 62 minutes and 7 seconds
......@@ -12,7 +12,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
......@@ -35,20 +37,36 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override public void subscribeRecordsChangeEvent() {
List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId)
.collect(Collectors.toList());
Set<String> partitions = new HashSet<>();
ExecutorService executorService = Executors
.newFixedThreadPool(Integer.parseUnsignedInt(azureBootstrapConfig.getNThreads()));
for (String partition : tenantList) {
while(true) {
List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId)
.collect(Collectors.toList());
for (String partition : tenantList) {
if(partitions.contains(partition)) {
continue;
}
try {
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient(partition);
registerMessageHandler(subscriptionClient, executorService);
partitions.add(partition);
}
catch (Exception e) {
LOGGER.error("Error while creating or registering subscription client", e);
}
}
try {
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient(partition);
registerMessageHandler(subscriptionClient, executorService);
Thread.sleep(600000);
}
catch (Exception e) {
LOGGER.error("Error while creating or registering subscription client", e);
LOGGER.error("Execption encountered while sleeping the thread", e);
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment