Skip to content
Snippets Groups Projects

Synchronized the securitySchemeThreads operations.

Merged Shankar Pattanayak requested to merge fix-connection-thrade-issue into master
2 unresolved threads
1 file
+ 7
24
Compare changes
  • Side-by-side
  • Inline
@@ -272,40 +272,21 @@ public class EdsDmsServiceImpl implements EdsDmsService {
private SecuritySchemeRefreshResult getExternalServiceTokens(List<ExternalDataset> externalDatasets){
Map<SecurityScheme, SecuritySchemeProcessor> securitySchemeCache = new HashMap<>();
List<CompletableFuture<SecuritySchemeProcessor>> securitySchemeThreads = new ArrayList<>();
List<SecuritySchemeProcessor> securitySchemeThreads = new ArrayList<>();
for(ExternalDataset externalDataset : externalDatasets) {
SecurityScheme securityScheme = externalDataset.getSharedProperties().getSecurityScheme();
if(securitySchemeCache.get(securityScheme.getName()) == null) {
if(securitySchemeCache.get(securityScheme) == null) {
SecuritySchemeProcessor securitySchemeProcessor = new SecuritySchemeProcessor(securityScheme, externalDataset);
List<ExternalDataset> externalDatasetsSubset = new ArrayList<>();
externalDatasetsSubset.add(externalDataset);
securitySchemeCache.put(externalDataset.getSharedProperties().getSecurityScheme(), securitySchemeProcessor);
securitySchemeThreads.add(CompletableFuture.supplyAsync(securitySchemeProcessor::call, this.threadPool));
securitySchemeThreads.add(securitySchemeProcessor.call());
} else {
securitySchemeCache.get(securityScheme.getName()).externalDatasets.add(externalDataset);
securitySchemeCache.get(securityScheme).externalDatasets.add(externalDataset);
}
}
CompletableFuture[] cfs = securitySchemeThreads.toArray(new CompletableFuture[0]);
CompletableFuture<List<SecuritySchemeProcessor>> results = CompletableFuture.allOf(cfs)
.thenApply(ignored -> securitySchemeThreads.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
List<SecuritySchemeProcessor> securitySchemeProcessors;
try {
securitySchemeProcessors = results.get();
} catch (InterruptedException e) {
log.error("Couldn't retrieve tokens, got interrupted exception");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Couldn't retrieve tokens", e.getMessage());
} catch (ExecutionException e) {
log.error("Couldn't retrieve tokens, got execution exception");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Couldn't retrieve tokens", e.getMessage());
}
List<ExternalDataset> externalDatasetsWithTokens = new ArrayList<>();
List<String> invalidDatasetIds = new ArrayList<>();
for(SecuritySchemeProcessor securitySchemeProcessor : securitySchemeProcessors){
for(SecuritySchemeProcessor securitySchemeProcessor : securitySchemeThreads){
if(securitySchemeProcessor.result == CallableResult.Pass) {
externalDatasetsWithTokens.addAll(securitySchemeProcessor.externalDatasets);
} else {
@@ -319,4 +300,6 @@ public class EdsDmsServiceImpl implements EdsDmsService {
return new SecuritySchemeRefreshResult(externalDatasetsWithTokens, invalidDatasetIds);
}
}
Loading