diff --git a/src/main/java/org/opengroup/osdu/azure/CosmosFacade.java b/src/main/java/org/opengroup/osdu/azure/CosmosFacade.java index f5aa7bfd704977cb65b0594409c14e7d79ff2df9..4c7c656397f1456cf82f4faa4b5d92b23f83cf00 100644 --- a/src/main/java/org/opengroup/osdu/azure/CosmosFacade.java +++ b/src/main/java/org/opengroup/osdu/azure/CosmosFacade.java @@ -23,12 +23,17 @@ import com.azure.cosmos.FeedOptions; import com.azure.cosmos.FeedResponse; import com.azure.cosmos.NotFoundException; import com.azure.cosmos.SqlQuerySpec; +import com.azure.cosmos.internal.AsyncDocumentClient; +import com.azure.cosmos.internal.Document; import org.opengroup.osdu.core.common.model.http.AppException; +import reactor.core.publisher.Flux; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,7 +69,6 @@ import java.util.logging.Logger; public final class CosmosFacade { private static final Logger LOGGER = Logger.getLogger(CosmosFacade.class.getName()); - /** * Private constructor -- this class should never be instantiated. */ @@ -124,7 +128,6 @@ public final class CosmosFacade { throw new AppException(500, errorMessage, e.getMessage(), e); } } - /** * @param container Container to query * @param clazz Class type of response @@ -161,6 +164,104 @@ public final class CosmosFacade { } return results; } + /** + * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service + * @param dbName Cosmos DB name + * @param container Container to query + * @param clazz Class type of response + * @param Type of response + * @param pageSize Number of items returned + * @param pageNum Page number returned + * @return List of items found on specific page in container + */ + public static List findAllItems( + final AsyncDocumentClient client, + final String dbName, + final String container, + final Class clazz, + final short pageSize, + final int pageNum) { + return queryItems(client, dbName, container, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum); + } + + /** + * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service + * @param dbName Cosmos DB name + * @param container Container to query + * @param query {@link SqlQuerySpec} to execute + * @param clazz Class type of response + * @param Type of response + * @param pageSize Number of items returned + * @param pageNum Page number returned + * @return List of items found on specific page in container + */ + public static List queryItems( + final AsyncDocumentClient client, + final String dbName, + final String container, + final SqlQuerySpec query, + final Class clazz, + final short pageSize, + final int pageNum) { + String continuationToken = null; + int currentPage = 0; + HashMap> results; + do { + String nextContinuationToken = ""; + + results = returnItemsWithToken(client, dbName, container, query, clazz, pageSize, continuationToken); + for (Map.Entry> entry : results.entrySet()) { + nextContinuationToken = entry.getKey(); + } + continuationToken = nextContinuationToken; + currentPage++; + + } while (currentPage < pageNum && continuationToken != null); + return results.get(continuationToken); + } + /** + * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service + * @param dbName Cosmos DB name + * @param container Container to query + * @param query {@link SqlQuerySpec} to execute + * @param clazz Class type of response + * @param Type of response + * @param pageSize Number of items returned + * @param continuationToken Token used to continue the enumeration + * @return Continuation Token and list of documents in container + */ + private static HashMap> returnItemsWithToken( + final AsyncDocumentClient client, + final String dbName, + final String container, + final SqlQuerySpec query, + final Class clazz, + final short pageSize, + final String continuationToken) { + + HashMap> map = new HashMap<>(); + List items = new ArrayList(); + + FeedOptions feedOptions = new FeedOptions() + .maxItemCount((int) pageSize) + .setEnableCrossPartitionQuery(true) + .requestContinuation(continuationToken); + + String collectionLink = String.format("/dbs/%s/colls/%s", dbName, container); + Flux> queryFlux = client.queryDocuments(collectionLink, query, feedOptions); + + Iterator> it = queryFlux.toIterable().iterator(); + + FeedResponse page = it.next(); + List results = page.getResults(); + for (Document doc : results) { + T obj = doc.toObject(clazz); + items.add(obj); + } + + map.put(page.getContinuationToken(), items); + return map; + } /** * @param container Container to query @@ -189,5 +290,4 @@ public final class CosmosFacade { final String partitionKey) { return cosmos.getItem(id, partitionKey); } - } diff --git a/src/test/java/org/opengroup/osdu/azure/CosmosFacadeTest.java b/src/test/java/org/opengroup/osdu/azure/CosmosFacadeTest.java index 08807345b7d7cf8705888dc10b22990728f9d845..4090427dc14c92eca64e0d2301d21caae372d762 100644 --- a/src/test/java/org/opengroup/osdu/azure/CosmosFacadeTest.java +++ b/src/test/java/org/opengroup/osdu/azure/CosmosFacadeTest.java @@ -15,14 +15,16 @@ package org.opengroup.osdu.azure; import com.azure.cosmos.*; +import com.azure.cosmos.internal.AsyncDocumentClient; +import com.azure.cosmos.internal.Document; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; - import org.mockito.junit.jupiter.MockitoExtension; import org.opengroup.osdu.core.common.model.http.AppException; +import reactor.core.publisher.Flux; import java.io.IOException; import java.util.ArrayList; @@ -39,7 +41,13 @@ class CosmosFacadeTest { private static final String ID = "id"; private static final String PARTITION_KEY = "pk"; - + private static final String COSMOS_DB = "cosmosdb"; + private static final String CONTAINER = "container"; + private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/container"; + + @Mock + private AsyncDocumentClient documentClient; + @Mock private CosmosContainer container; @@ -54,7 +62,7 @@ class CosmosFacadeTest { @Mock private Iterator> queryResponse; - + @BeforeEach void init() throws CosmosClientException { // mock the common cosmos request/response pattern that most tests need. because @@ -136,6 +144,45 @@ class CosmosFacadeTest { assertTrue(results.contains("s2")); assertTrue(results.contains("s3")); } + + @Test + void findAllItems_byPageNumber() { + mockPaginatedQueryResponse(2, 2, "s1", "s2", "s3", "s4", "s5"); + + List results = CosmosFacade.findAllItems(documentClient, COSMOS_DB, CONTAINER, + String.class, (short)2, 2); + + assertEquals(2, results.size()); + assertTrue(results.contains("s3")); + assertTrue(results.contains("s4")); + + mockPaginatedQueryResponse(3, 2, "T1", "T2", "T3", "T4", "T5"); + results = CosmosFacade.findAllItems(documentClient, COSMOS_DB, CONTAINER, + String.class, (short)3, 2); + + assertEquals(2, results.size()); + assertTrue(results.contains("T4")); + assertTrue(results.contains("T5")); + } + + @Test + void queryItems_byPageNumber() throws IOException { + mockPaginatedQueryResponse(3, 1, "W1", "W2", "W3", "W4", "W5"); + List results = CosmosFacade.queryItems(documentClient, COSMOS_DB, CONTAINER, + new SqlQuerySpec("SELECT * FROM c"), String.class, (short)3, 1); + + assertEquals(3, results.size()); + assertTrue(results.contains("W1")); + assertTrue(results.contains("W2")); + assertTrue(results.contains("W3")); + + mockPaginatedQueryResponse(2, 3, "Z1", "Z2", "Z3", "Z4", "Z5"); + results = CosmosFacade.queryItems(documentClient, COSMOS_DB, CONTAINER, + new SqlQuerySpec("SELECT * FROM c"), String.class, (short)2, 3); + + assertEquals(1, results.size()); + assertTrue(results.contains("Z5")); + } private void mockQueryResponse(String... responses) throws IOException { ArrayList> paginatedResponse = new ArrayList<>(); @@ -152,4 +199,32 @@ class CosmosFacadeTest { doReturn(paginatedResponse.iterator()).when(container).queryItems(any(SqlQuerySpec.class), any()); } + + private void mockPaginatedQueryResponse(int pageSize, int pageNum, String... responses) { + List resp = new ArrayList<>(); + FeedResponse pageResponse = (FeedResponse) mock(FeedResponse.class); + + for (String response : responses) { + Document doc = mock(Document.class); + resp.add(doc); + lenient().doReturn(Collections.singletonList(doc)).when(pageResponse).getResults(); + lenient().doReturn(response).when(doc).toObject(any()); + } + + when(pageResponse.getResults()).thenReturn(currentPage(resp,pageSize,pageNum)); + doReturn(Flux.just(pageResponse)) + .when(documentClient) + .queryDocuments(eq(COLLECTION_LINK), any((SqlQuerySpec.class)), any()); + } + + private static List currentPage (List dataList, int pageSize, int pageNum) { + List currentPageList = new ArrayList<>(); + if (dataList != null && dataList.size() > 0) { + int currIdx = (pageNum > 1 ? (pageNum - 1) * pageSize : 0); + for (int i = 0; i < pageSize && i < dataList.size() - currIdx; i++) { + currentPageList.add(dataList.get(currIdx + i)); + } + } + return currentPageList; + } }