Commit 8b18ccdd authored by Krishna Nikhil Vedurumudi's avatar Krishna Nikhil Vedurumudi
Browse files

CosmosBulkExecutor Factory to use Compute if absent

parent 49ff4514
Pipeline #40311 passed with stages
in 10 minutes and 14 seconds
......@@ -75,14 +75,25 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
return this.cosmosClientMap.get(cacheKey);
}
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
DocumentClient client = getDocumentClient(pi.getCosmosEndpoint(),
pi.getCosmosPrimaryKey());
return this.cosmosClientMap.computeIfAbsent(cacheKey,
cosmosClient -> createDocumentBulkExecutor(cosmosDBName, collectionName, dataPartitionId));
}
String collectionLink = String.format(unformattedCollectionLink, cosmosDBName, collectionName);
/**
*
* @param cosmosDBName name of the cosmos db
* @param collectionName name of the cosmos collection
* @param dataPartitionId name of the data partition
* @return DocumentBulkExecutor
*/
private DocumentBulkExecutor createDocumentBulkExecutor(final String cosmosDBName, final String collectionName,
final String dataPartitionId) {
try {
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
DocumentClient client = getDocumentClient(pi.getCosmosEndpoint(),
pi.getCosmosPrimaryKey());
String collectionLink = String.format(unformattedCollectionLink, cosmosDBName, collectionName);
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
DocumentBulkExecutor executor = DocumentBulkExecutor.builder().from(
client,
......@@ -92,8 +103,6 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
bulkExecutorMaxRUs
).build();
cosmosClientMap.put(String.format(unformattedCosmosBulkExecutorCacheKey, dataPartitionId, cosmosDBName, collectionName), executor);
// Set client retry options to 0 because retries are handled by DocumentBulkExecutor class.
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
......@@ -135,6 +144,4 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
return client;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment