Commit 5d6ab124 authored by Krishna Nikhil Vedurumudi's avatar Krishna Nikhil Vedurumudi
Browse files

Merge branch 'cosmosClient' into 'master'

Replace CosmosCache with Map

See merge request osdu/platform/system/lib/cloud/azure/os-core-lib-azure!91
parents 2463b36f 8b18ccdd
package org.opengroup.osdu.azure.cache;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.opengroup.osdu.core.common.cache.VmCache;
import org.opengroup.osdu.core.common.cache.enums.CachingStrategy;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* Implementation of ICache for DocumentBulkExecutor.
*/
@Component
@Lazy
public class CosmosBulkExecutorCache extends VmCache<String, DocumentBulkExecutor> {
/**
* Default cache constructor.
*/
public CosmosBulkExecutorCache() {
super(60 * 60, 1000, CachingStrategy.EXPIRE_AFTER_WRITE);
}
/**
* @param key cache key
* @return true if found in cache
*/
public boolean containsKey(final String key) {
return this.get(key) != null;
}
}
\ No newline at end of file
package org.opengroup.osdu.azure.cache;
import com.azure.cosmos.CosmosClient;
import org.opengroup.osdu.core.common.cache.VmCache;
import org.opengroup.osdu.core.common.cache.enums.CachingStrategy;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* Implementation of ICache for CosmosClient.
*/
@Component
@Lazy
public class CosmosClientCache extends VmCache<String, CosmosClient> {
/**
* Default cache constructor.
*/
public CosmosClientCache() {
super(60 * 60, 1000, CachingStrategy.EXPIRE_AFTER_WRITE);
}
/**
* @param key cache key
* @return true if found in cache
*/
public boolean containsKey(final String key) {
return this.get(key) != null;
}
}
\ No newline at end of file
......@@ -6,7 +6,11 @@ import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.opengroup.osdu.azure.cache.CosmosBulkExecutorCache;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.common.Validators;
......@@ -17,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* A factory class to generate DocumentBulkExecutor objects to perform bulk operations.
*/
......@@ -30,9 +35,7 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
@Autowired
private PartitionServiceClient partitionService;
@Lazy
@Autowired
private CosmosBulkExecutorCache cosmosBulkExecutorCache;
private Map<String, DocumentBulkExecutor> cosmosClientMap;
@Autowired
private int documentClientMaxPoolSize;
......@@ -44,6 +47,14 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
private final String unformattedCosmosBulkExecutorCacheKey = "%s-%s-%s-cosmosBulkExecutor";
private final String unformattedDocumentClientCacheKey = "%s-documentClient";
/**
* Initializes the private variables as required.
*/
@PostConstruct
public void initialize() {
cosmosClientMap = new ConcurrentHashMap<>();
}
/**
*
* @param dataPartitionId name of data partition.
......@@ -60,18 +71,29 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
Validators.checkNotNullAndNotEmpty(collectionName, "collectionName");
String cacheKey = String.format(unformattedCosmosBulkExecutorCacheKey, dataPartitionId, cosmosDBName, collectionName);
if (this.cosmosBulkExecutorCache.containsKey(cacheKey)) {
return this.cosmosBulkExecutorCache.get(cacheKey);
if (this.cosmosClientMap.containsKey(cacheKey)) {
return this.cosmosClientMap.get(cacheKey);
}
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
DocumentClient client = getDocumentClient(pi.getCosmosEndpoint(),
pi.getCosmosPrimaryKey());
return this.cosmosClientMap.computeIfAbsent(cacheKey,
cosmosClient -> createDocumentBulkExecutor(cosmosDBName, collectionName, dataPartitionId));
}
String collectionLink = String.format(unformattedCollectionLink, cosmosDBName, collectionName);
/**
*
* @param cosmosDBName name of the cosmos db
* @param collectionName name of the cosmos collection
* @param dataPartitionId name of the data partition
* @return DocumentBulkExecutor
*/
private DocumentBulkExecutor createDocumentBulkExecutor(final String cosmosDBName, final String collectionName,
final String dataPartitionId) {
try {
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
DocumentClient client = getDocumentClient(pi.getCosmosEndpoint(),
pi.getCosmosPrimaryKey());
String collectionLink = String.format(unformattedCollectionLink, cosmosDBName, collectionName);
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
DocumentBulkExecutor executor = DocumentBulkExecutor.builder().from(
client,
......@@ -80,7 +102,6 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
collection.getPartitionKey(),
bulkExecutorMaxRUs
).build();
cosmosBulkExecutorCache.put(String.format(unformattedCosmosBulkExecutorCacheKey, dataPartitionId, cosmosDBName, collectionName), executor);
// Set client retry options to 0 because retries are handled by DocumentBulkExecutor class.
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
......@@ -123,6 +144,4 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
return client;
}
}
......@@ -2,7 +2,11 @@ package org.opengroup.osdu.azure.cosmosdb;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
import org.opengroup.osdu.azure.cache.CosmosClientCache;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.common.Validators;
......@@ -16,14 +20,20 @@ import org.springframework.stereotype.Component;
@Component
@Lazy
public class CosmosClientFactoryImpl implements ICosmosClientFactory {
private static final String LOGGER_NAME = CosmosClientFactoryImpl.class.getName();
@Lazy
@Autowired
private PartitionServiceClient partitionService;
@Lazy
@Autowired
private CosmosClientCache syncClientCache;
private Map<String, CosmosClient> cosmosClientMap;
/**
* Initializes the private variables as required.
*/
@PostConstruct
public void initialize() {
cosmosClientMap = new ConcurrentHashMap<>();
}
/**
* @param dataPartitionId Data Partition Id
......@@ -34,19 +44,26 @@ public class CosmosClientFactoryImpl implements ICosmosClientFactory {
Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId");
String cacheKey = String.format("%s-cosmosClient", dataPartitionId);
if (this.syncClientCache.containsKey(cacheKey)) {
return this.syncClientCache.get(cacheKey);
if (this.cosmosClientMap.containsKey(cacheKey)) {
return this.cosmosClientMap.get(cacheKey);
}
return this.cosmosClientMap.computeIfAbsent(cacheKey, cosmosClient -> createCosmosClient(dataPartitionId));
}
/**
*
* @param dataPartitionId Data Partition Id
* @return Cosmos Client Instance
*/
private CosmosClient createCosmosClient(final String dataPartitionId) {
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
CosmosClient cosmosClient = new CosmosClientBuilder()
.endpoint(pi.getCosmosEndpoint())
.key(pi.getCosmosPrimaryKey())
.buildClient();
this.syncClientCache.put(cacheKey, cosmosClient);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME)
.info("Created CosmosClient for dataPartition {}.", dataPartitionId);
return cosmosClient;
}
}
......@@ -7,9 +7,10 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.cache.CosmosBulkExecutorCache;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.*;
......@@ -20,7 +21,7 @@ import static org.mockito.MockitoAnnotations.initMocks;
public class ComosBulkExecutorImplTest {
@Mock
private CosmosBulkExecutorCache clientCache;
private Map<String, DocumentBulkExecutor> cosmosClientMap;
@Mock
private PartitionServiceClient partitionService;
@InjectMocks
......@@ -61,8 +62,8 @@ public class ComosBulkExecutorImplTest {
public void should_return_cachedClient_when_cachedEarlier() {
DocumentBulkExecutor cosmosClient = mock(DocumentBulkExecutor.class);
final String cacheKey = String.format("%s-%s-%s-cosmosBulkExecutor", PARTITION_ID, COSMOS_DB_NAME, COSMOS_COLLECTION_NAME);
when(this.clientCache.containsKey(cacheKey)).thenReturn(true);
when(this.clientCache.get(cacheKey)).thenReturn(cosmosClient);
when(this.cosmosClientMap.containsKey(cacheKey)).thenReturn(true);
when(this.cosmosClientMap.get(cacheKey)).thenReturn(cosmosClient);
this.sut.getClient(PARTITION_ID, COSMOS_DB_NAME, COSMOS_COLLECTION_NAME);
verify(this.partitionService, never()).getPartition(PARTITION_ID);
......
......@@ -7,9 +7,10 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.cache.CosmosClientCache;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.*;
......@@ -19,7 +20,7 @@ import static org.mockito.MockitoAnnotations.initMocks;
public class CosmosClientFactoryImplTest {
@Mock
private CosmosClientCache clientCache;
private Map<String, CosmosClient> cosmosClientMap;
@Mock
private PartitionServiceClient partitionService;
@InjectMocks
......@@ -58,8 +59,8 @@ public class CosmosClientFactoryImplTest {
public void should_return_cachedClient_when_cachedEarlier() {
CosmosClient cosmosClient = mock(CosmosClient.class);
final String cacheKey = String.format("%s-cosmosClient", PARTITION_ID);
when(this.clientCache.containsKey(cacheKey)).thenReturn(true);
when(this.clientCache.get(cacheKey)).thenReturn(cosmosClient);
when(this.cosmosClientMap.containsKey(cacheKey)).thenReturn(true);
when(this.cosmosClientMap.get(cacheKey)).thenReturn(cosmosClient);
this.sut.getClient(PARTITION_ID);
verify(this.partitionService, never()).getPartition(PARTITION_ID);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment