Commit 91521e64 authored by Jason

Merge branch 'implementing-cosmos-bulk-insert' into 'master'

Adding Cosmos Bulk Insert

See merge request !66
parents 5c0537e4 1d9c8df6
Pipeline #21848 passed with stages in 10 minutes and 1 second
@@ -18,6 +18,7 @@ The following software have components provided under the terms of this license:
- ASM based accessors helper used by json-smart (from )
- Adapter: RxJava (from )
- Apache Commons Codec (from http://commons.apache.org/proper/commons-codec/)
- Apache Commons Collections (from http://commons.apache.org/proper/commons-collections/)
- Apache Commons Lang (from http://commons.apache.org/proper/commons-lang/)
- Apache Commons Logging (from http://commons.apache.org/proper/commons-logging/)
- Apache HttpAsyncClient (from http://hc.apache.org/httpcomponents-asyncclient)
@@ -69,6 +70,7 @@ The following software have components provided under the terms of this license:
- Jackson-module-parameter-names (from )
- Java Native Access (from https://github.com/java-native-access/jna)
- Java Native Access Platform (from https://github.com/java-native-access/jna)
- Java UUID Generator (from http://wiki.fasterxml.com/JugHome)
- Javassist (from http://www.javassist.org/)
- Joda-Time (from http://www.joda.org/joda-time/)
- KeePassJava2 :: All (from https://repo1.maven.org/maven2/org/linguafranca/pwdb/KeePassJava2)
@@ -332,6 +334,13 @@ The following software have components provided under the terms of this license:
- Project Lombok (from https://projectlombok.org)
- jakarta.annotation-api (from https://projects.eclipse.org/projects/ee4j.ca)
========================================================================
JSON
========================================================================
The following software have components provided under the terms of this license:
- JSON in Java (from https://github.com/douglascrockford/JSON-java)
========================================================================
LGPL-2.1-only
========================================================================
@@ -406,6 +415,8 @@ The following software have components provided under the terms of this license:
- Spongy Castle (from http://rtyley.github.io/spongycastle/)
- Spring Data for Azure Cosmos DB SQL API (from https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/cosmos/azure-spring-data-cosmos)
- adal4j (from https://github.com/AzureAD/azure-activedirectory-library-for-java)
- azure-documentdb (from https://azure.microsoft.com/en-us/services/cosmos-db/)
- documentdb-bulkexecutor (from http://azure.microsoft.com/en-us/services/documentdb/)
- micrometer-core (from https://github.com/micrometer-metrics/micrometer)
- mockito-junit-jupiter (from https://github.com/mockito/mockito)
- msal4j (from https://github.com/AzureAD/microsoft-authentication-library-for-java)
@@ -473,6 +484,7 @@ The following software have components provided under the terms of this license:
- Project Lombok (from https://projectlombok.org)
- Spring Web (from https://github.com/spring-projects/spring-framework)
- StAX API (from http://stax.codehaus.org/)
- azure-documentdb (from https://azure.microsoft.com/en-us/services/cosmos-db/)
- msal4j (from https://github.com/AzureAD/microsoft-authentication-library-for-java)
- reactive-streams (from http://www.reactive-streams.org/)
@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.0.58</version>
<version>0.0.59</version>
<name>core-lib-azure</name>
<properties>
@@ -54,6 +54,7 @@
<spring-boot-starter-log4j2.version>2.3.4.RELEASE</spring-boot-starter-log4j2.version>
<azure-mgmt-eventgrid.version>1.0.0-beta-3</azure-mgmt-eventgrid.version>
<azure-security-keyvault-keys.version>4.2.3</azure-security-keyvault-keys.version>
<documentdb-bulkexecutor.version>2.12.0</documentdb-bulkexecutor.version>
</properties>
<licenses>
@@ -266,6 +267,11 @@
<artifactId>azure-security-keyvault-keys</artifactId>
<version>${azure-security-keyvault-keys.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>documentdb-bulkexecutor</artifactId>
<version>${documentdb-bulkexecutor.version}</version>
</dependency>
</dependencies>
<repositories>
package org.opengroup.osdu.azure.cache;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.opengroup.osdu.core.common.cache.VmCache;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* Implementation of ICache for DocumentBulkExecutor.
*/
@Component
@Lazy
public class CosmosBulkExecutorCache extends VmCache<String, DocumentBulkExecutor> {
/**
* Default cache constructor.
*/
public CosmosBulkExecutorCache() {
super(60 * 60, 1000); // one-hour expiry, up to 1000 cached executors
}
/**
* @param key cache key
* @return true if found in cache
*/
public boolean containsKey(final String key) {
return this.get(key) != null;
}
}
\ No newline at end of file
package org.opengroup.osdu.azure.cosmosdb;
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.opengroup.osdu.azure.cache.CosmosBulkExecutorCache;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.common.Validators;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* A factory class to generate DocumentBulkExecutor objects to perform bulk operations.
*/
@Component
@Lazy
public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosBulkExecutorFactoryImpl.class.getName());
@Lazy
@Autowired
private PartitionServiceClient partitionService;
@Lazy
@Autowired
private CosmosBulkExecutorCache cosmosBulkExecutorCache;
@Autowired
private int documentClientMaxPoolSize;
@Autowired
private int bulkExecutorMaxRUs;
private final String unformattedCollectionLink = "/dbs/%s/colls/%s";
private final String unformattedCosmosBulkExecutorCacheKey = "%s-%s-%s-cosmosBulkExecutor";
private final String unformattedDocumentClientCacheKey = "%s-documentClient";
/**
*
* @param dataPartitionId name of data partition.
* @param cosmosDBName name of cosmos db.
* @param collectionName name of collection in cosmos.
* @return DocumentBulkExecutor to perform bulk Cosmos operations.
* @throws Exception if there is an error creating the DocumentBulkExecutor object.
*/
public DocumentBulkExecutor getClient(final String dataPartitionId,
final String cosmosDBName,
final String collectionName) {
Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId");
Validators.checkNotNullAndNotEmpty(cosmosDBName, "cosmosDBName");
Validators.checkNotNullAndNotEmpty(collectionName, "collectionName");
String cacheKey = String.format(unformattedCosmosBulkExecutorCacheKey, dataPartitionId, cosmosDBName, collectionName);
if (this.cosmosBulkExecutorCache.containsKey(cacheKey)) {
return this.cosmosBulkExecutorCache.get(cacheKey);
}
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
DocumentClient client = getDocumentClient(pi.getCosmosEndpoint(),
pi.getCosmosPrimaryKey());
String collectionLink = String.format(unformattedCollectionLink, cosmosDBName, collectionName);
try {
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
DocumentBulkExecutor executor = DocumentBulkExecutor.builder().from(
client,
cosmosDBName,
collectionName,
collection.getPartitionKey(),
bulkExecutorMaxRUs
).build();
cosmosBulkExecutorCache.put(cacheKey, executor);
// Set client retry options to 0 because retries are handled by DocumentBulkExecutor class.
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
return executor;
} catch (DocumentClientException e) {
String errorMessage = "Unexpectedly failed to create DocumentCollection object";
LOGGER.warn(errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
} catch (Exception e) {
String errorMessage = "Unexpectedly failed create DocumentBulkExecutor";
LOGGER.warn(errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
/**
*
* @param cosmosEndpoint endpoint to Cosmos db.
* @param cosmosPrimaryKey primary key for connection to Cosmos db.
* @return DocumentClient object.
*/
private DocumentClient getDocumentClient(final String cosmosEndpoint,
final String cosmosPrimaryKey) {
ConnectionPolicy policy = new ConnectionPolicy();
policy.setMaxPoolSize(documentClientMaxPoolSize);
// Initialize with these values for retries. These are overridden once the DocumentBulkExecutor object is created
policy.getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
policy.getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);
DocumentClient client = new DocumentClient(
cosmosEndpoint,
cosmosPrimaryKey,
policy,
ConsistencyLevel.Session
);
return client;
}
}
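For review context, a hedged sketch of consuming the factory directly (every class name and literal below is a placeholder, not part of this MR). The returned executor is cached per partition/database/collection, and at this level documents must already be serialized JSON strings:

package org.opengroup.osdu.azure.example;

import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.bulkexecutor.BulkImportResponse;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.opengroup.osdu.azure.cosmosdb.ICosmosBulkExecutorFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collection;

// Hypothetical consumer; all string literals are placeholders, not values from this MR.
@Component
public class RawBulkImporter {

    @Autowired
    private ICosmosBulkExecutorFactory bulkExecutorFactory;

    // Documents must already be serialized JSON strings when calling importAll directly.
    public BulkImportResponse importAll(final Collection<String> jsonDocuments) throws DocumentClientException {
        DocumentBulkExecutor executor =
                bulkExecutorFactory.getClient("opendes", "osdu-db", "StorageRecord");
        return executor.importAll(jsonDocuments, false, false, 20);
    }
}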
package org.opengroup.osdu.azure.cosmosdb;
import com.google.gson.Gson;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.bulkexecutor.BulkImportResponse;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collection;
/**
* Class to perform bulk Cosmos operations using DocumentBulkExecutor.
*/
@Component
@Lazy
public class CosmosStoreBulkOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosStoreBulkOperations.class.getName());
@Autowired
private ICosmosBulkExecutorFactory bulkExecutorFactory;
/**
*
* Bulk inserts documents into a Cosmos collection, optionally as upserts.
* @param dataPartitionId name of data partition.
* @param cosmosDBName name of Cosmos db.
* @param collectionName name of collection in Cosmos.
* @param documents collection of JSON serializable documents.
* @param isUpsert flag denoting whether existing documents should be replaced rather than treated as conflicts.
* @param disableAutomaticIdGeneration flag denoting if automatic id generation should be disabled in Cosmos.
* @param maxConcurrencyPerPartitionRange The maximum degree of concurrency per partition key range. The default value is 20.
* @param <T> Type of object being bulk inserted.
* @return BulkImportResponse object with the results of the operation.
*/
public final <T> BulkImportResponse bulkInsert(final String dataPartitionId,
final String cosmosDBName,
final String collectionName,
final Collection<T> documents,
final boolean isUpsert,
final boolean disableAutomaticIdGeneration,
final int maxConcurrencyPerPartitionRange) {
Collection<String> serializedDocuments = new ArrayList<>();
Gson gson = new Gson();
// Serialize documents to json strings
for (T item : documents) {
String serializedDocument = gson.toJson(item);
serializedDocuments.add(serializedDocument);
}
try {
DocumentBulkExecutor executor = bulkExecutorFactory.getClient(dataPartitionId, cosmosDBName, collectionName);
BulkImportResponse response = executor.importAll(serializedDocuments, isUpsert, disableAutomaticIdGeneration, maxConcurrencyPerPartitionRange);
if (response.getNumberOfDocumentsImported() != documents.size()) {
LOGGER.warn("Failed to import all documents using DocumentBulkExecutor! Attempted to import " + documents.size() + " documents but only imported " + response.getNumberOfDocumentsImported());
}
return response;
} catch (DocumentClientException e) {
String errorMessage = "Unexpectedly failed to bulk insert documents";
LOGGER.warn(errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
}
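A minimal caller sketch for the class above (illustrative only; the component name, document type parameter, and every literal value are placeholders):

package org.opengroup.osdu.azure.example;

import com.microsoft.azure.documentdb.bulkexecutor.BulkImportResponse;
import org.opengroup.osdu.azure.cosmosdb.CosmosStoreBulkOperations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;

// Hypothetical caller: every literal below is a placeholder, not a value defined by this MR.
@Component
public class RecordBulkWriter {

    @Autowired
    private CosmosStoreBulkOperations bulkOperations;

    public <T> int upsertAll(final List<T> records) {
        BulkImportResponse response = bulkOperations.bulkInsert(
                "opendes",          // data partition id (placeholder)
                "osdu-db",          // Cosmos database name (placeholder)
                "StorageRecord",    // collection name (placeholder)
                records,
                true,               // isUpsert: replace documents that already exist
                false,              // keep automatic id generation enabled
                20);                // documented default concurrency per partition key range
        return response.getNumberOfDocumentsImported();
    }
}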
package org.opengroup.osdu.azure.cosmosdb;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
/**
* Interface for Cosmos Bulk Executor Factory to return appropriate Cosmos Bulk Executor client.
*/
public interface ICosmosBulkExecutorFactory {
/**
*
* @param dataPartitionId name of the data partition.
* @param cosmosDBName name of CosmosDB.
* @param collectionName name of the collection in Cosmos.
* @return the DocumentBulkExecutor.
*/
DocumentBulkExecutor getClient(String dataPartitionId,
String cosmosDBName,
String collectionName);
}
package org.opengroup.osdu.azure.di;
import com.microsoft.azure.documentdb.internal.Utils;
import lombok.Getter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
/**
* Defines configurations for Cosmos bulk executor.
*/
@Configuration
@Getter
@Lazy
public class CosmosBulkExecutorConfiguration {
/**
* @return the connection pool size for the http client used by the document client. Uses the SDK default when the property is not set.
*/
@Bean
public int documentClientMaxPoolSize() {
String prop = System.getProperty("DOCUMENT_CLIENT_MAX_POOL_SIZE");
return prop == null ? Utils.getConcurrencyFactor() * 100 : Integer.parseInt(prop);
}
/**
* @return the number of RUs allocated to the bulk executor.
*/
@Bean
public Integer bulkExecutorMaxRUs() {
return Integer.valueOf(System.getProperty("BULK_EXECUTOR_MAX_RUS", "4000"));
}
}
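Both beans read plain JVM system properties, so the pool size and RU budget can be tuned per deployment without code changes. A minimal sketch of overriding them before the Spring context starts (the helper class and values are illustrative; equivalently pass -DDOCUMENT_CLIENT_MAX_POOL_SIZE and -DBULK_EXECUTOR_MAX_RUS on the JVM command line):

// Illustrative helper, not part of this MR: call before building the application context.
public final class BulkExecutorTuning {

    private BulkExecutorTuning() {
    }

    public static void apply() {
        System.setProperty("DOCUMENT_CLIENT_MAX_POOL_SIZE", "500"); // example value
        System.setProperty("BULK_EXECUTOR_MAX_RUS", "10000");       // example RU budget
    }
}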
@@ -19,4 +19,4 @@ public class CosmosDBConfiguration {
@Value("${azure.cosmosdb.database}")
private String cosmosDBName;
}
}
\ No newline at end of file
package org.opengroup.osdu.azure.cosmosdb;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.cache.CosmosBulkExecutorCache;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
@ExtendWith(MockitoExtension.class)
public class CosmosBulkExecutorImplTest {
@Mock
private CosmosBulkExecutorCache clientCache;
@Mock
private PartitionServiceClient partitionService;
@InjectMocks
private CosmosBulkExecutorFactoryImpl sut;
private static final String PARTITION_ID = "dataPartitionId";
private static final String COSMOS_DB_NAME = "cosmosDBName";
private static final String COSMOS_COLLECTION_NAME = "cosmosCollectionName";
@BeforeEach
void init() {
initMocks(this);
}
@Test
public void should_throwException_given_nullDataPartitionId() {
// getClient must reject a null dataPartitionId; assertThrows also fails if no exception is thrown.
NullPointerException ex = assertThrows(NullPointerException.class,
() -> this.sut.getClient(null, COSMOS_DB_NAME, COSMOS_COLLECTION_NAME));
assertEquals("dataPartitionId cannot be null!", ex.getMessage());
}
@Test
public void should_throwException_given_emptyDataPartitionId() {
// getClient must reject an empty dataPartitionId; assertThrows also fails if no exception is thrown.
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
() -> this.sut.getClient("", COSMOS_DB_NAME, COSMOS_COLLECTION_NAME));
assertEquals("dataPartitionId cannot be empty!", ex.getMessage());
}
@Test
public void should_return_cachedClient_when_cachedEarlier() {
DocumentBulkExecutor cosmosClient = mock(DocumentBulkExecutor.class);
final String cacheKey = String.format("%s-%s-%s-cosmosBulkExecutor", PARTITION_ID, COSMOS_DB_NAME, COSMOS_COLLECTION_NAME);
when(this.clientCache.containsKey(cacheKey)).thenReturn(true);
when(this.clientCache.get(cacheKey)).thenReturn(cosmosClient);
this.sut.getClient(PARTITION_ID, COSMOS_DB_NAME, COSMOS_COLLECTION_NAME);
verify(this.partitionService, never()).getPartition(PARTITION_ID);
}
}
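A possible companion test for CosmosStoreBulkOperations, sketched under the assumption that DocumentBulkExecutor.importAll is mockable the same way DocumentBulkExecutor itself is mocked above; the test is illustrative and not part of this change:

package org.opengroup.osdu.azure.cosmosdb;

import com.microsoft.azure.documentdb.bulkexecutor.BulkImportResponse;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class CosmosStoreBulkOperationsTest {

    @Mock
    private ICosmosBulkExecutorFactory bulkExecutorFactory;

    @InjectMocks
    private CosmosStoreBulkOperations sut;

    @Test
    public void should_serializeAndDelegate_to_bulkExecutor() throws Exception {
        DocumentBulkExecutor executor = mock(DocumentBulkExecutor.class);
        BulkImportResponse response = mock(BulkImportResponse.class);
        when(bulkExecutorFactory.getClient("partition", "db", "coll")).thenReturn(executor);
        when(executor.importAll(anyCollection(), anyBoolean(), anyBoolean(), anyInt())).thenReturn(response);
        when(response.getNumberOfDocumentsImported()).thenReturn(1);

        // One document in, one reported imported: no warning path, response passed through.
        BulkImportResponse actual = sut.bulkInsert("partition", "db", "coll",
                Collections.singletonList("document"), true, false, 20);

        assertEquals(response, actual);
    }
}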