diff --git a/src/main/java/org/opengroup/osdu/azure/CosmosClientFactoryImpl.java b/src/main/java/org/opengroup/osdu/azure/CosmosClientFactoryImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..75fd1b7cfe02a74a3d22c31ae6f36ff1531b0a00 --- /dev/null +++ b/src/main/java/org/opengroup/osdu/azure/CosmosClientFactoryImpl.java @@ -0,0 +1,41 @@ +package org.opengroup.osdu.azure; + +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.internal.AsyncDocumentClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +/** + * Implementation for ICosmosClientFactory. + */ +@Component +@Lazy +public class CosmosClientFactoryImpl implements ICosmosClientFactory { + + @Lazy + @Autowired + private CosmosClient cosmosClient; + + @Lazy + @Autowired + private AsyncDocumentClient asyncDocumentClient; + + /** + * @param dataPartitionId Data Partition Id + * @return Cosmos Client instance + */ + @Override + public CosmosClient getClient(final String dataPartitionId) { + return cosmosClient; + } + + /** + * @param dataPartitionId Data Partition Id + * @return Async Document Client instance + */ + @Override + public AsyncDocumentClient getAsyncClient(final String dataPartitionId) { + return asyncDocumentClient; + } +} diff --git a/src/main/java/org/opengroup/osdu/azure/CosmosStore.java b/src/main/java/org/opengroup/osdu/azure/CosmosStore.java index e08ce20b8a914a10fdac129f70343430bb7f06cc..a7e166b0e580d49aa8468a4dbd5fe64af338809e 100644 --- a/src/main/java/org/opengroup/osdu/azure/CosmosStore.java +++ b/src/main/java/org/opengroup/osdu/azure/CosmosStore.java @@ -26,6 +26,9 @@ import com.azure.cosmos.SqlQuerySpec; import com.azure.cosmos.internal.AsyncDocumentClient; import com.azure.cosmos.internal.Document; import org.opengroup.osdu.core.common.model.http.AppException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import java.io.IOException; @@ -50,12 +53,12 @@ import java.util.logging.Logger; * private CosmosStore cosmosStore; * * void findItemExample() { - * Optional myItem = cosmosStore.findItem(container, "id", "partition-key", MyObject.class); + * Optional myItem = cosmosStore.findItem("dataPartitionId", "cosmosDb", "collection", "id", "partition-key", MyObject.class); * myItem.isPresent(); // true if found, false otherwise * } * * void findAllItemsExample() { - * List objects = cosmosStore.findAllItems(container, MyObject.class); + * List objects = cosmosStore.findAllItems("dataPartitionId", "cosmosDb", "collection", MyObject.class); * } * * void queryItemsExample() { @@ -64,26 +67,37 @@ import java.util.logging.Logger; * .setParameters(new SqlParameterList(new SqlParameter("@isFoo", true))); * FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true); * - * List objects = cosmosStore.queryItems(container, query, options, MyObject.class); + * List objects = cosmosStore.queryItems("dataPartitionId", "cosmosDb", "collection", query, options, MyObject.class); * } * } * */ -public final class CosmosStore { + +@Component +@Lazy +public class CosmosStore { private static final Logger LOGGER = Logger.getLogger(CosmosStore.class.getName()); + @Autowired + private ICosmosClientFactory cosmosClientFactory; + /** - * @param cosmos Container to query - * @param id ID of item - * @param partitionKey Partition key of item + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @param id ID of item + * @param partitionKey Partition key of item */ public void deleteItem( - final CosmosContainer cosmos, + final String dataPartitionId, + final String cosmosDBName, + final String collection, final String id, final String partitionKey) { try { - findItem(cosmos, id, partitionKey).delete(new CosmosItemRequestOptions(partitionKey)); + CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection); + findItem(cosmosContainer, id, partitionKey).delete(new CosmosItemRequestOptions(partitionKey)); } catch (NotFoundException e) { String errorMessage = "Item was unexpectedly not found"; LOGGER.log(Level.WARNING, errorMessage, e); @@ -96,20 +110,25 @@ public final class CosmosStore { } /** - * @param cosmos Container to query - * @param id ID of item - * @param partitionKey Partition key of item - * @param clazz Class to serialize results into - * @param Type to return + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @param id ID of item + * @param partitionKey Partition key of item + * @param clazz Class to serialize results into + * @param Type to return * @return The item that was found based on the IDs provided */ public Optional findItem( - final CosmosContainer cosmos, + final String dataPartitionId, + final String cosmosDBName, + final String collection, final String id, final String partitionKey, final Class clazz) { try { - T item = findItem(cosmos, id, partitionKey) + CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection); + T item = findItem(cosmosContainer, id, partitionKey) .read(new CosmosItemRequestOptions(partitionKey)) .getProperties() .getObject(clazz); @@ -127,28 +146,42 @@ public final class CosmosStore { } } /** - * @param container Container to query - * @param clazz Class type of response - * @param Type of response + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @param clazz Class type of response + * @param Type of response * @return List of items found in container */ - public List findAllItems(final CosmosContainer container, final Class clazz) { + public List findAllItems( + final String dataPartitionId, + final String cosmosDBName, + final String collection, + final Class clazz) { FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true); - return queryItems(container, new SqlQuerySpec("SELECT * FROM c"), options, clazz); + return queryItems(dataPartitionId, cosmosDBName, collection, new SqlQuerySpec("SELECT * FROM c"), options, clazz); } /** - * @param container Container to query - * @param clazz Class type of response - * @param query {@link SqlQuerySpec} to execute - * @param options Query options - * @param Type of response + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @param query {@link SqlQuerySpec} to execute + * @param options Query options + * @param clazz Class type of response + * @param Type of response * @return List of items found in container */ - public List queryItems(final CosmosContainer container, final SqlQuerySpec query, final FeedOptions options, final Class clazz) { + public List queryItems( + final String dataPartitionId, + final String cosmosDBName, + final String collection, + final SqlQuerySpec query, + final FeedOptions options, + final Class clazz) { ArrayList results = new ArrayList<>(); - Iterator> paginatedResponse = container.queryItems(query, options); - + CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection); + Iterator> paginatedResponse = cosmosContainer.queryItems(query, options); while (paginatedResponse.hasNext()) { for (CosmosItemProperties properties : paginatedResponse.next().getResults()) { try { @@ -163,40 +196,40 @@ public final class CosmosStore { return results; } /** - * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service - * @param dbName Cosmos DB name - * @param container Container to query - * @param clazz Class type of response - * @param Type of response - * @param pageSize Number of items returned - * @param pageNum Page number returned + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @param clazz Class type of response + * @param pageSize Number of items returned + * @param pageNum Page number returned + * @param Type of response * @return List of items found on specific page in container */ - public List findAllItems( - final AsyncDocumentClient client, - final String dbName, - final String container, + public List findAllItemsAsync( + final String dataPartitionId, + final String cosmosDBName, + final String collection, final Class clazz, final short pageSize, final int pageNum) { - return queryItems(client, dbName, container, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum); + return queryItemsAsync(dataPartitionId, cosmosDBName, collection, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum); } /** - * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service - * @param dbName Cosmos DB name - * @param container Container to query - * @param query {@link SqlQuerySpec} to execute - * @param clazz Class type of response - * @param Type of response - * @param pageSize Number of items returned - * @param pageNum Page number returned + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @param query {@link SqlQuerySpec} to execute + * @param clazz Class type of response + * @param pageSize Number of items returned + * @param pageNum Page number returned + * @param Type of response * @return List of items found on specific page in container */ - public List queryItems( - final AsyncDocumentClient client, - final String dbName, - final String container, + public List queryItemsAsync( + final String dataPartitionId, + final String cosmosDBName, + final String collection, final SqlQuerySpec query, final Class clazz, final short pageSize, @@ -206,8 +239,8 @@ public final class CosmosStore { HashMap> results; do { String nextContinuationToken = ""; - - results = returnItemsWithToken(client, dbName, container, query, clazz, pageSize, continuationToken); + AsyncDocumentClient client = cosmosClientFactory.getAsyncClient(dataPartitionId); + results = returnItemsWithToken(client, cosmosDBName, collection, query, clazz, pageSize, continuationToken); for (Map.Entry> entry : results.entrySet()) { nextContinuationToken = entry.getKey(); } @@ -262,13 +295,20 @@ public final class CosmosStore { } /** - * @param container Container to query - * @param item Data object to store - * @param Type of response + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @param item Data object to store + * @param Type of response */ - public void upsertItem(final CosmosContainer container, final T item) { + public void upsertItem( + final String dataPartitionId, + final String cosmosDBName, + final String collection, + final T item) { try { - container.upsertItem(item); + CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection); + cosmosContainer.upsertItem(item); } catch (CosmosClientException e) { String errorMessage = "Unexpectedly failed to put item into CosmosDB"; LOGGER.log(Level.WARNING, errorMessage, e); @@ -288,4 +328,26 @@ public final class CosmosStore { final String partitionKey) { return cosmos.getItem(id, partitionKey); } + + /** + * @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition + * @param cosmosDBName Database to be used + * @param collection Collection to be used + * @return Cosmos container + */ + private CosmosContainer getCosmosContainer( + final String dataPartitionId, + final String cosmosDBName, + final String collection) { + try { + return cosmosClientFactory.getClient(dataPartitionId) + .getDatabase(cosmosDBName) + .getContainer(collection); + } catch (Exception e) { + String errorMessage = "Error creating creating Cosmos Client"; + LOGGER.log(Level.WARNING, errorMessage, e); + throw new AppException(500, errorMessage, e.getMessage(), e); + } + + } } diff --git a/src/main/java/org/opengroup/osdu/azure/ICosmosClientFactory.java b/src/main/java/org/opengroup/osdu/azure/ICosmosClientFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..0c80fadaa454e7e1b87261ad77862cc917910117 --- /dev/null +++ b/src/main/java/org/opengroup/osdu/azure/ICosmosClientFactory.java @@ -0,0 +1,23 @@ +package org.opengroup.osdu.azure; + +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.internal.AsyncDocumentClient; + +/** + * Interface for Cosmos Client Factory to return appropriate cosmos client. + * instances for each tenant based on data partition id + */ +public interface ICosmosClientFactory { + + /** + * @param dataPartitionId Data Partition Id + * @return Cosmos client instance + */ + CosmosClient getClient(String dataPartitionId); + + /** + * @param dataPartitionId Data Partition Id + * @return Async Document Client instance + */ + AsyncDocumentClient getAsyncClient(String dataPartitionId); +} diff --git a/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java b/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java index d3f2f7c2b660dc7e48f7cac81fb82f28a8d617fa..6d78f55049ed5ccba1ece030e6df9fe0ff193c2d 100644 --- a/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java +++ b/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java @@ -14,7 +14,17 @@ package org.opengroup.osdu.azure; -import com.azure.cosmos.*; +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientException; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.CosmosItem; +import com.azure.cosmos.CosmosItemProperties; +import com.azure.cosmos.CosmosItemResponse; +import com.azure.cosmos.FeedOptions; +import com.azure.cosmos.FeedResponse; +import com.azure.cosmos.NotFoundException; +import com.azure.cosmos.SqlQuerySpec; import com.azure.cosmos.internal.AsyncDocumentClient; import com.azure.cosmos.internal.Document; import org.junit.jupiter.api.BeforeEach; @@ -33,9 +43,19 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class CosmosStoreTest { @@ -43,8 +63,9 @@ class CosmosStoreTest { private static final String ID = "id"; private static final String PARTITION_KEY = "pk"; private static final String COSMOS_DB = "cosmosdb"; - private static final String CONTAINER = "container"; - private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/container"; + private static final String COLLECTION = "collection"; + private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/collection"; + private static final String DATA_PARTITION_ID = "data-partition-id"; @Mock private AsyncDocumentClient documentClient; @@ -64,6 +85,15 @@ class CosmosStoreTest { @Mock private Iterator> queryResponse; + @Mock + private ICosmosClientFactory cosmosClientFactory; + + @Mock + private CosmosClient cosmosClient; + + @Mock + private CosmosDatabase cosmosDatabase; + @InjectMocks private CosmosStore cosmosStore; @@ -74,13 +104,17 @@ class CosmosStoreTest { lenient().doReturn(cosmosItem).when(container).getItem(ID, PARTITION_KEY); lenient().doReturn(cosmosResponse).when(cosmosItem).read(any()); lenient().doReturn(cosmosItemProperties).when(cosmosResponse).getProperties(); + lenient().doReturn(cosmosClient).when(cosmosClientFactory).getClient(anyString()); + lenient().doReturn(cosmosDatabase).when(cosmosClient).getDatabase(any()); + lenient().doReturn(container).when(cosmosDatabase).getContainer(anyString()); + lenient().doReturn(documentClient).when(cosmosClientFactory).getAsyncClient(anyString()); } @Test void delete_throws404_ifNotFound() throws CosmosClientException { doThrow(NotFoundException.class).when(cosmosItem).delete(any()); AppException exception = assertThrows(AppException.class, () -> { - cosmosStore.deleteItem(container, ID, PARTITION_KEY); + cosmosStore.deleteItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY); }); assertEquals(404, exception.getError().getCode()); } @@ -89,7 +123,7 @@ class CosmosStoreTest { void delete_throws500_ifUnknownError() throws CosmosClientException { doThrow(CosmosClientException.class).when(cosmosItem).delete(any()); AppException exception = assertThrows(AppException.class, () -> { - cosmosStore.deleteItem(container, ID, PARTITION_KEY); + cosmosStore.deleteItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY); }); assertEquals(500, exception.getError().getCode()); } @@ -97,20 +131,20 @@ class CosmosStoreTest { @Test void findItem_returnsEmpty_ifNotFound() throws CosmosClientException { doThrow(NotFoundException.class).when(cosmosItem).read(any()); - assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent()); + assertFalse(cosmosStore.findItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY, String.class).isPresent()); } @Test void findItem_returnsEmpty_ifMalformedDocument() throws IOException { doThrow(IOException.class).when(cosmosItemProperties).getObject(any()); - assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent()); + assertFalse(cosmosStore.findItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY, String.class).isPresent()); } @Test void findItem_throws500_ifUnknownError() throws CosmosClientException { doThrow(CosmosClientException.class).when(cosmosItem).read(any()); AppException exception = assertThrows(AppException.class, () -> { - cosmosStore.findItem(container, ID, PARTITION_KEY, String.class); + cosmosStore.findItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, ID, PARTITION_KEY, String.class); }); assertEquals(500, exception.getError().getCode()); } @@ -119,7 +153,7 @@ class CosmosStoreTest { void upsertItem_throws500_ifUnknownError() throws CosmosClientException { doThrow(CosmosClientException.class).when(container).upsertItem(any()); AppException exception = assertThrows(AppException.class, () -> { - cosmosStore.upsertItem(container, "some-data"); + cosmosStore.upsertItem(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, "some-data"); }); assertEquals(500, exception.getError().getCode()); } @@ -127,7 +161,7 @@ class CosmosStoreTest { @Test void findAllItems_executesCorrectQuery() throws IOException { mockQueryResponse("s1"); - cosmosStore.findAllItems(container, String.class); + cosmosStore.findAllItems(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, String.class); ArgumentCaptor query = ArgumentCaptor.forClass(SqlQuerySpec.class); ArgumentCaptor feedOptions = ArgumentCaptor.forClass(FeedOptions.class); @@ -141,7 +175,7 @@ class CosmosStoreTest { @Test void findAllItems_pagesCorrectly() throws IOException { mockQueryResponse("s1", "s2", "s3"); - List results = cosmosStore.findAllItems(container, String.class); + List results = cosmosStore.findAllItems(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, String.class); assertEquals(3, results.size()); assertTrue(results.contains("s1")); @@ -153,7 +187,7 @@ class CosmosStoreTest { void findAllItems_byPageNumber() { mockPaginatedQueryResponse(2, 2, "s1", "s2", "s3", "s4", "s5"); - List results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER, + List results = cosmosStore.findAllItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, String.class, (short)2, 2); assertEquals(2, results.size()); @@ -161,7 +195,7 @@ class CosmosStoreTest { assertTrue(results.contains("s4")); mockPaginatedQueryResponse(3, 2, "T1", "T2", "T3", "T4", "T5"); - results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER, + results = cosmosStore.findAllItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, String.class, (short)3, 2); assertEquals(2, results.size()); @@ -172,7 +206,7 @@ class CosmosStoreTest { @Test void queryItems_byPageNumber() throws IOException { mockPaginatedQueryResponse(3, 1, "W1", "W2", "W3", "W4", "W5"); - List results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER, + List results = cosmosStore.queryItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, new SqlQuerySpec("SELECT * FROM c"), String.class, (short)3, 1); assertEquals(3, results.size()); @@ -181,7 +215,7 @@ class CosmosStoreTest { assertTrue(results.contains("W3")); mockPaginatedQueryResponse(2, 3, "Z1", "Z2", "Z3", "Z4", "Z5"); - results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER, + results = cosmosStore.queryItemsAsync(DATA_PARTITION_ID, COSMOS_DB, COLLECTION, new SqlQuerySpec("SELECT * FROM c"), String.class, (short)2, 3); assertEquals(1, results.size());