Commit 2430d823 authored by Daniel Scholl's avatar Daniel Scholl
Browse files

Merge branch 'haaggarw/ind/7652848' into 'master'

Added Cosmos Client Factory to support partition based Cosmos client generation

See merge request !9
parents ad6055dc ff3fa178
Pipeline #3761 passed with stages
in 8 minutes and 1 second
package org.opengroup.osdu.azure;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.internal.AsyncDocumentClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* Implementation for ICosmosClientFactory.
*/
@Component
@Lazy
public class CosmosClientFactoryImpl implements ICosmosClientFactory {
@Lazy
@Autowired
private CosmosClient cosmosClient;
@Lazy
@Autowired
private AsyncDocumentClient asyncDocumentClient;
/**
* @param dataPartitionId Data Partition Id
* @return Cosmos Client instance
*/
@Override
public CosmosClient getClient(final String dataPartitionId) {
return cosmosClient;
}
/**
* @param dataPartitionId Data Partition Id
* @return Async Document Client instance
*/
@Override
public AsyncDocumentClient getAsyncClient(final String dataPartitionId) {
return asyncDocumentClient;
}
}
......@@ -26,6 +26,9 @@ import com.azure.cosmos.SqlQuerySpec;
import com.azure.cosmos.internal.AsyncDocumentClient;
import com.azure.cosmos.internal.Document;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import java.io.IOException;
......@@ -50,12 +53,12 @@ import java.util.logging.Logger;
* private CosmosStore cosmosStore;
*
* void findItemExample() {
* Optional<MyObject> myItem = cosmosStore.findItem(container, "id", "partition-key", MyObject.class);
* Optional<MyObject> myItem = cosmosStore.findItem("dataPartitionId", "cosmosDb", "collection", "id", "partition-key", MyObject.class);
* myItem.isPresent(); // true if found, false otherwise
* }
*
* void findAllItemsExample() {
* List<MyObject> objects = cosmosStore.findAllItems(container, MyObject.class);
* List<MyObject> objects = cosmosStore.findAllItems("dataPartitionId", "cosmosDb", "collection", MyObject.class);
* }
*
* void queryItemsExample() {
......@@ -64,26 +67,37 @@ import java.util.logging.Logger;
* .setParameters(new SqlParameterList(new SqlParameter("@isFoo", true)));
* FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
*
* List<MyObject> objects = cosmosStore.queryItems(container, query, options, MyObject.class);
* List<MyObject> objects = cosmosStore.queryItems("dataPartitionId", "cosmosDb", "collection", query, options, MyObject.class);
* }
* }
* </pre>
*/
public final class CosmosStore {
@Component
@Lazy
public class CosmosStore {
private static final Logger LOGGER = Logger.getLogger(CosmosStore.class.getName());
@Autowired
private ICosmosClientFactory cosmosClientFactory;
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param id ID of item
* @param partitionKey Partition key of item
*/
public void deleteItem(
final CosmosContainer cosmos,
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final String id,
final String partitionKey) {
try {
findItem(cosmos, id, partitionKey).delete(new CosmosItemRequestOptions(partitionKey));
CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection);
findItem(cosmosContainer, id, partitionKey).delete(new CosmosItemRequestOptions(partitionKey));
} catch (NotFoundException e) {
String errorMessage = "Item was unexpectedly not found";
LOGGER.log(Level.WARNING, errorMessage, e);
......@@ -96,20 +110,25 @@ public final class CosmosStore {
}
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
* @param clazz Class to serialize results into
* @param <T> Type to return
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param id ID of item
* @param partitionKey Partition key of item
* @param clazz Class to serialize results into
* @param <T> Type to return
* @return The item that was found based on the IDs provided
*/
public <T> Optional<T> findItem(
final CosmosContainer cosmos,
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final String id,
final String partitionKey,
final Class<T> clazz) {
try {
T item = findItem(cosmos, id, partitionKey)
CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection);
T item = findItem(cosmosContainer, id, partitionKey)
.read(new CosmosItemRequestOptions(partitionKey))
.getProperties()
.getObject(clazz);
......@@ -127,28 +146,42 @@ public final class CosmosStore {
}
}
/**
* @param container Container to query
* @param clazz Class type of response
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param clazz Class type of response
* @param <T> Type of response
* @return List of items found in container
*/
public <T> List<T> findAllItems(final CosmosContainer container, final Class<T> clazz) {
public <T> List<T> findAllItems(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final Class<T> clazz) {
FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
return queryItems(container, new SqlQuerySpec("SELECT * FROM c"), options, clazz);
return queryItems(dataPartitionId, cosmosDBName, collection, new SqlQuerySpec("SELECT * FROM c"), options, clazz);
}
/**
* @param container Container to query
* @param clazz Class type of response
* @param query {@link SqlQuerySpec} to execute
* @param options Query options
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param options Query options
* @param clazz Class type of response
* @param <T> Type of response
* @return List of items found in container
*/
public <T> List<T> queryItems(final CosmosContainer container, final SqlQuerySpec query, final FeedOptions options, final Class<T> clazz) {
public <T> List<T> queryItems(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final SqlQuerySpec query,
final FeedOptions options,
final Class<T> clazz) {
ArrayList<T> results = new ArrayList<>();
Iterator<FeedResponse<CosmosItemProperties>> paginatedResponse = container.queryItems(query, options);
CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection);
Iterator<FeedResponse<CosmosItemProperties>> paginatedResponse = cosmosContainer.queryItems(query, options);
while (paginatedResponse.hasNext()) {
for (CosmosItemProperties properties : paginatedResponse.next().getResults()) {
try {
......@@ -163,40 +196,40 @@ public final class CosmosStore {
return results;
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @return List of items found on specific page in container
*/
public <T> List<T> findAllItems(
final AsyncDocumentClient client,
final String dbName,
final String container,
public <T> List<T> findAllItemsAsync(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final Class<T> clazz,
final short pageSize,
final int pageNum) {
return queryItems(client, dbName, container, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum);
return queryItemsAsync(dataPartitionId, cosmosDBName, collection, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum);
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @return List of items found on specific page in container
*/
public <T> List<T> queryItems(
final AsyncDocumentClient client,
final String dbName,
final String container,
public <T> List<T> queryItemsAsync(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final SqlQuerySpec query,
final Class<T> clazz,
final short pageSize,
......@@ -206,8 +239,8 @@ public final class CosmosStore {
HashMap<String, List<T>> results;
do {
String nextContinuationToken = "";
results = returnItemsWithToken(client, dbName, container, query, clazz, pageSize, continuationToken);
AsyncDocumentClient client = cosmosClientFactory.getAsyncClient(dataPartitionId);
results = returnItemsWithToken(client, cosmosDBName, collection, query, clazz, pageSize, continuationToken);
for (Map.Entry<String, List<T>> entry : results.entrySet()) {
nextContinuationToken = entry.getKey();
}
......@@ -262,13 +295,20 @@ public final class CosmosStore {
}
/**
* @param container Container to query
* @param item Data object to store
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param item Data object to store
* @param <T> Type of response
*/
public <T> void upsertItem(final CosmosContainer container, final T item) {
public <T> void upsertItem(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final T item) {
try {
container.upsertItem(item);
CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection);
cosmosContainer.upsertItem(item);
} catch (CosmosClientException e) {
String errorMessage = "Unexpectedly failed to put item into CosmosDB";
LOGGER.log(Level.WARNING, errorMessage, e);
......@@ -288,4 +328,26 @@ public final class CosmosStore {
final String partitionKey) {
return cosmos.getItem(id, partitionKey);
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @return Cosmos container
*/
private CosmosContainer getCosmosContainer(
final String dataPartitionId,
final String cosmosDBName,
final String collection) {
try {
return cosmosClientFactory.getClient(dataPartitionId)
.getDatabase(cosmosDBName)
.getContainer(collection);
} catch (Exception e) {
String errorMessage = "Error creating creating Cosmos Client";
LOGGER.log(Level.WARNING, errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
}
package org.opengroup.osdu.azure;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.internal.AsyncDocumentClient;
/**
* Interface for Cosmos Client Factory to return appropriate cosmos client.
* instances for each tenant based on data partition id
*/
public interface ICosmosClientFactory {
/**
* @param dataPartitionId Data Partition Id
* @return Cosmos client instance
*/
CosmosClient getClient(String dataPartitionId);
/**
* @param dataPartitionId Data Partition Id
* @return Async Document Client instance
*/
AsyncDocumentClient getAsyncClient(String dataPartitionId);
}
......@@ -14,7 +14,17 @@
package org.opengroup.osdu.azure;
import com.azure.cosmos.*;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.CosmosItem;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.CosmosItemResponse;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.NotFoundException;
import com.azure.cosmos.SqlQuerySpec;
import com.azure.cosmos.internal.AsyncDocumentClient;
import com.azure.cosmos.internal.Document;
import org.junit.jupiter.api.BeforeEach;
......@@ -33,9 +43,19 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class CosmosStoreTest {
......@@ -43,8 +63,9 @@ class CosmosStoreTest {
private static final String ID = "id";
private static final String PARTITION_KEY = "pk";
private static final String COSMOS_DB = "cosmosdb";
private static final String CONTAINER = "container";
private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/container";
private static final String COLLECTION = "collection";
private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/collection";
private static final String DATA_PARTITION_ID = "data-partition-id";
@Mock
private AsyncDocumentClient documentClient;
......@@ -64,6 +85,15 @@ class CosmosStoreTest {
@Mock
private Iterator<FeedResponse<CosmosItemProperties>> queryResponse;
@Mock
private ICosmosClientFactory cosmosClientFactory;
@Mock
private CosmosClient cosmosClient;
@Mock
private CosmosDatabase cosmosDatabase;
@InjectMocks
private CosmosStore cosmosStore;
......@@ -74,13 +104,17 @@ class CosmosStoreTest {
lenient().doReturn(cosmosItem).when(container).getItem(ID, PARTITION_KEY);
lenient().doReturn(cosmosResponse).when(cosmosItem).read(any());
lenient().doReturn(cosmosItemProperties).when(cosmosResponse).getProperties();
lenient().doReturn(cosmosClient).when(cosmosClientFactory).getClient(anyString());
lenient().doReturn(cosmosDatabase).when(cosmosClient).getDatabase(any());
lenient().doReturn(container).when(cosmosDatabase).getContainer(anyString());
lenient().doReturn(documentClient).when(cosmosClientFactory).getAsyncClient(anyString());
}
@Test
void delete_throws404_ifNotFound() throws CosmosClientException {
doThrow(NotFoundException.class).when(cosmosItem).delete(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.deleteItem(container, ID, PARTITION_KEY);
cosmosStore.deleteItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY);
});
assertEquals(404, exception.getError().getCode());
}
......@@ -89,7 +123,7 @@ class CosmosStoreTest {
void delete_throws500_ifUnknownError() throws CosmosClientException {
doThrow(CosmosClientException.class).when(cosmosItem).delete(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.deleteItem(container, ID, PARTITION_KEY);
cosmosStore.deleteItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY);
});
assertEquals(500, exception.getError().getCode());
}
......@@ -97,20 +131,20 @@ class CosmosStoreTest {
@Test
void findItem_returnsEmpty_ifNotFound() throws CosmosClientException {
doThrow(NotFoundException.class).when(cosmosItem).read(any());
assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent());
assertFalse(cosmosStore.findItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY, String.class).isPresent());
}
@Test
void findItem_returnsEmpty_ifMalformedDocument() throws IOException {
doThrow(IOException.class).when(cosmosItemProperties).getObject(any());
assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent());
assertFalse(cosmosStore.findItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY, String.class).isPresent());
}
@Test
void findItem_throws500_ifUnknownError() throws CosmosClientException {
doThrow(CosmosClientException.class).when(cosmosItem).read(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.findItem(container, ID, PARTITION_KEY, String.class);
cosmosStore.findItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY, String.class);
});
assertEquals(500, exception.getError().getCode());
}
......@@ -119,7 +153,7 @@ class CosmosStoreTest {
void upsertItem_throws500_ifUnknownError() throws CosmosClientException {
doThrow(CosmosClientException.class).when(container).upsertItem(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.upsertItem(container, "some-data");
cosmosStore.upsertItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, "some-data");
});
assertEquals(500, exception.getError().getCode());
}
......@@ -127,7 +161,7 @@ class CosmosStoreTest {
@Test
void findAllItems_executesCorrectQuery() throws IOException {
mockQueryResponse("s1");
cosmosStore.findAllItems(container, String.class);
cosmosStore.findAllItems(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, String.class);
ArgumentCaptor<SqlQuerySpec> query = ArgumentCaptor.forClass(SqlQuerySpec.class);
ArgumentCaptor<FeedOptions> feedOptions = ArgumentCaptor.forClass(FeedOptions.class);
......@@ -141,7 +175,7 @@ class CosmosStoreTest {
@Test
void findAllItems_pagesCorrectly() throws IOException {
mockQueryResponse("s1", "s2", "s3");
List<String> results = cosmosStore.findAllItems(container, String.class);
List<String> results = cosmosStore.findAllItems(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, String.class);
assertEquals(3, results.size());
assertTrue(results.contains("s1"));
......@@ -153,7 +187,7 @@ class CosmosStoreTest {
void findAllItems_byPageNumber() {
mockPaginatedQueryResponse(2, 2, "s1", "s2", "s3", "s4", "s5");
List<String> results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER,
List<String> results = cosmosStore.findAllItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION,
String.class, (short)2, 2);
assertEquals(2, results.size());
......@@ -161,7 +195,7 @@ class CosmosStoreTest {
assertTrue(results.contains("s4"));
mockPaginatedQueryResponse(3, 2, "T1", "T2", "T3", "T4", "T5");
results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER,
results = cosmosStore.findAllItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION,
String.class, (short)3, 2);
assertEquals(2, results.size());
......@@ -172,7 +206,7 @@ class CosmosStoreTest {
@Test
void queryItems_byPageNumber() throws IOException {
mockPaginatedQueryResponse(3, 1, "W1", "W2", "W3", "W4", "W5");
List<String> results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER,
List<String> results = cosmosStore.queryItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION,
new SqlQuerySpec("SELECT * FROM c"), String.class, (short)3, 1);
assertEquals(3, results.size());
......@@ -181,7 +215,7 @@ class CosmosStoreTest {
assertTrue(results.contains("W3"));
mockPaginatedQueryResponse(2, 3, "Z1", "Z2", "Z3", "Z4", "Z5");
results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER,
results = cosmosStore.queryItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION,
new SqlQuerySpec("SELECT * FROM c"), String.class, (short)2, 3);
assertEquals(1, results.size());
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment