diff --git a/pom.xml b/pom.xml index 7e6379c31929377214376da3349d9e26602d9d57..46b526d6fd9c27369506e8b17e782cacc058187a 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ org.opengroup.osdu core-lib-azure jar - 0.0.9 + 0.0.10 core-lib-azure diff --git a/src/main/java/org/opengroup/osdu/azure/CosmosStore.java b/src/main/java/org/opengroup/osdu/azure/CosmosStore.java new file mode 100644 index 0000000000000000000000000000000000000000..e08ce20b8a914a10fdac129f70343430bb7f06cc --- /dev/null +++ b/src/main/java/org/opengroup/osdu/azure/CosmosStore.java @@ -0,0 +1,291 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.azure; + +import com.azure.cosmos.CosmosClientException; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosItem; +import com.azure.cosmos.CosmosItemProperties; +import com.azure.cosmos.CosmosItemRequestOptions; +import com.azure.cosmos.FeedOptions; +import com.azure.cosmos.FeedResponse; +import com.azure.cosmos.NotFoundException; +import com.azure.cosmos.SqlQuerySpec; +import com.azure.cosmos.internal.AsyncDocumentClient; +import com.azure.cosmos.internal.Document; +import org.opengroup.osdu.core.common.model.http.AppException; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A simpler interface for interacting with CosmosDB. + * Usage Examples: + *
+ * {@code
+ *      @Inject
+ *      private CosmosContainer container;
+ *
+ *      @Inject
+ *      private CosmosStore cosmosStore;
+ *
+ *      void findItemExample() {
+ *          Optional myItem = cosmosStore.findItem(container, "id", "partition-key", MyObject.class);
+ *          myItem.isPresent(); // true if found, false otherwise
+ *      }
+ *
+ *      void findAllItemsExample() {
+ *          List objects = cosmosStore.findAllItems(container, MyObject.class);
+ *      }
+ *
+ *      void queryItemsExample() {
+ *          SqlQuerySpec query = new SqlQuerySpec()
+ *                 .setQueryText("SELECT * FROM c WHERE c.isFoo = @isFoo")
+ *                 .setParameters(new SqlParameterList(new SqlParameter("@isFoo", true)));
+ *         FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
+ *
+ *         List objects = cosmosStore.queryItems(container, query, options, MyObject.class);
+ *      }
+ * }
+ * 
+ */ +public final class CosmosStore { + + private static final Logger LOGGER = Logger.getLogger(CosmosStore.class.getName()); + + /** + * @param cosmos Container to query + * @param id ID of item + * @param partitionKey Partition key of item + */ + public void deleteItem( + final CosmosContainer cosmos, + final String id, + final String partitionKey) { + try { + findItem(cosmos, id, partitionKey).delete(new CosmosItemRequestOptions(partitionKey)); + } catch (NotFoundException e) { + String errorMessage = "Item was unexpectedly not found"; + LOGGER.log(Level.WARNING, errorMessage, e); + throw new AppException(404, errorMessage, e.getMessage(), e); + } catch (CosmosClientException e) { + String errorMessage = "Unexpectedly failed to delete item from CosmosDB"; + LOGGER.log(Level.WARNING, errorMessage, e); + throw new AppException(500, errorMessage, e.getMessage(), e); + } + } + + /** + * @param cosmos Container to query + * @param id ID of item + * @param partitionKey Partition key of item + * @param clazz Class to serialize results into + * @param Type to return + * @return The item that was found based on the IDs provided + */ + public Optional findItem( + final CosmosContainer cosmos, + final String id, + final String partitionKey, + final Class clazz) { + try { + T item = findItem(cosmos, id, partitionKey) + .read(new CosmosItemRequestOptions(partitionKey)) + .getProperties() + .getObject(clazz); + return Optional.ofNullable(item); + } catch (NotFoundException e) { + LOGGER.info(String.format("Unable to find item with ID=%s and PK=%s", id, partitionKey)); + return Optional.empty(); + } catch (IOException e) { + LOGGER.warning(String.format("Malformed document for item with ID=%s and PK=%s", id, partitionKey)); + return Optional.empty(); + } catch (CosmosClientException e) { + String errorMessage = "Unexpectedly encountered error calling CosmosDB"; + LOGGER.log(Level.WARNING, errorMessage, e); + throw new AppException(500, errorMessage, e.getMessage(), e); + } + } + /** + * @param container Container to query + * @param clazz Class type of response + * @param Type of response + * @return List of items found in container + */ + public List findAllItems(final CosmosContainer container, final Class clazz) { + FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true); + return queryItems(container, new SqlQuerySpec("SELECT * FROM c"), options, clazz); + } + + /** + * @param container Container to query + * @param clazz Class type of response + * @param query {@link SqlQuerySpec} to execute + * @param options Query options + * @param Type of response + * @return List of items found in container + */ + public List queryItems(final CosmosContainer container, final SqlQuerySpec query, final FeedOptions options, final Class clazz) { + ArrayList results = new ArrayList<>(); + Iterator> paginatedResponse = container.queryItems(query, options); + + while (paginatedResponse.hasNext()) { + for (CosmosItemProperties properties : paginatedResponse.next().getResults()) { + try { + results.add(properties.getObject(clazz)); + } catch (IOException e) { + String errorMessage = String.format("Malformed document for item with ID=%s", properties.getId()); + LOGGER.log(Level.WARNING, errorMessage, e); + throw new AppException(500, errorMessage, e.getMessage(), e); + } + } + } + return results; + } + /** + * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service + * @param dbName Cosmos DB name + * @param container Container to query + * @param clazz Class type of response + * @param Type of response + * @param pageSize Number of items returned + * @param pageNum Page number returned + * @return List of items found on specific page in container + */ + public List findAllItems( + final AsyncDocumentClient client, + final String dbName, + final String container, + final Class clazz, + final short pageSize, + final int pageNum) { + return queryItems(client, dbName, container, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum); + } + + /** + * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service + * @param dbName Cosmos DB name + * @param container Container to query + * @param query {@link SqlQuerySpec} to execute + * @param clazz Class type of response + * @param Type of response + * @param pageSize Number of items returned + * @param pageNum Page number returned + * @return List of items found on specific page in container + */ + public List queryItems( + final AsyncDocumentClient client, + final String dbName, + final String container, + final SqlQuerySpec query, + final Class clazz, + final short pageSize, + final int pageNum) { + String continuationToken = null; + int currentPage = 0; + HashMap> results; + do { + String nextContinuationToken = ""; + + results = returnItemsWithToken(client, dbName, container, query, clazz, pageSize, continuationToken); + for (Map.Entry> entry : results.entrySet()) { + nextContinuationToken = entry.getKey(); + } + continuationToken = nextContinuationToken; + currentPage++; + + } while (currentPage < pageNum && continuationToken != null); + return results.get(continuationToken); + } + /** + * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service + * @param dbName Cosmos DB name + * @param container Container to query + * @param query {@link SqlQuerySpec} to execute + * @param clazz Class type of response + * @param Type of response + * @param pageSize Number of items returned + * @param continuationToken Token used to continue the enumeration + * @return Continuation Token and list of documents in container + */ + private static HashMap> returnItemsWithToken( + final AsyncDocumentClient client, + final String dbName, + final String container, + final SqlQuerySpec query, + final Class clazz, + final short pageSize, + final String continuationToken) { + + HashMap> map = new HashMap<>(); + List items = new ArrayList(); + + FeedOptions feedOptions = new FeedOptions() + .maxItemCount((int) pageSize) + .setEnableCrossPartitionQuery(true) + .requestContinuation(continuationToken); + + String collectionLink = String.format("/dbs/%s/colls/%s", dbName, container); + Flux> queryFlux = client.queryDocuments(collectionLink, query, feedOptions); + + Iterator> it = queryFlux.toIterable().iterator(); + + FeedResponse page = it.next(); + List results = page.getResults(); + for (Document doc : results) { + T obj = doc.toObject(clazz); + items.add(obj); + } + + map.put(page.getContinuationToken(), items); + return map; + } + + /** + * @param container Container to query + * @param item Data object to store + * @param Type of response + */ + public void upsertItem(final CosmosContainer container, final T item) { + try { + container.upsertItem(item); + } catch (CosmosClientException e) { + String errorMessage = "Unexpectedly failed to put item into CosmosDB"; + LOGGER.log(Level.WARNING, errorMessage, e); + throw new AppException(500, errorMessage, e.getMessage(), e); + } + } + + /** + * @param cosmos Container to query + * @param id ID of item + * @param partitionKey Partition key of item + * @return The item. It may not exist - the caller must check + */ + private static CosmosItem findItem( + final CosmosContainer cosmos, + final String id, + final String partitionKey) { + return cosmos.getItem(id, partitionKey); + } +} diff --git a/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java b/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d3f2f7c2b660dc7e48f7cac81fb82f28a8d617fa --- /dev/null +++ b/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java @@ -0,0 +1,234 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.azure; + +import com.azure.cosmos.*; +import com.azure.cosmos.internal.AsyncDocumentClient; +import com.azure.cosmos.internal.Document; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opengroup.osdu.core.common.model.http.AppException; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class CosmosStoreTest { + + private static final String ID = "id"; + private static final String PARTITION_KEY = "pk"; + private static final String COSMOS_DB = "cosmosdb"; + private static final String CONTAINER = "container"; + private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/container"; + + @Mock + private AsyncDocumentClient documentClient; + + @Mock + private CosmosContainer container; + + @Mock + private CosmosItem cosmosItem; + + @Mock + private CosmosItemProperties cosmosItemProperties; + + @Mock + private CosmosItemResponse cosmosResponse; + + @Mock + private Iterator> queryResponse; + + @InjectMocks + private CosmosStore cosmosStore; + + @BeforeEach + void init() throws CosmosClientException { + // mock the common cosmos request/response pattern that most tests need. because + // not all tests will leverage these, we make the mocks lenient. + lenient().doReturn(cosmosItem).when(container).getItem(ID, PARTITION_KEY); + lenient().doReturn(cosmosResponse).when(cosmosItem).read(any()); + lenient().doReturn(cosmosItemProperties).when(cosmosResponse).getProperties(); + } + + @Test + void delete_throws404_ifNotFound() throws CosmosClientException { + doThrow(NotFoundException.class).when(cosmosItem).delete(any()); + AppException exception = assertThrows(AppException.class, () -> { + cosmosStore.deleteItem(container, ID, PARTITION_KEY); + }); + assertEquals(404, exception.getError().getCode()); + } + + @Test + void delete_throws500_ifUnknownError() throws CosmosClientException { + doThrow(CosmosClientException.class).when(cosmosItem).delete(any()); + AppException exception = assertThrows(AppException.class, () -> { + cosmosStore.deleteItem(container, ID, PARTITION_KEY); + }); + assertEquals(500, exception.getError().getCode()); + } + + @Test + void findItem_returnsEmpty_ifNotFound() throws CosmosClientException { + doThrow(NotFoundException.class).when(cosmosItem).read(any()); + assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent()); + } + + @Test + void findItem_returnsEmpty_ifMalformedDocument() throws IOException { + doThrow(IOException.class).when(cosmosItemProperties).getObject(any()); + assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent()); + } + + @Test + void findItem_throws500_ifUnknownError() throws CosmosClientException { + doThrow(CosmosClientException.class).when(cosmosItem).read(any()); + AppException exception = assertThrows(AppException.class, () -> { + cosmosStore.findItem(container, ID, PARTITION_KEY, String.class); + }); + assertEquals(500, exception.getError().getCode()); + } + + @Test + void upsertItem_throws500_ifUnknownError() throws CosmosClientException { + doThrow(CosmosClientException.class).when(container).upsertItem(any()); + AppException exception = assertThrows(AppException.class, () -> { + cosmosStore.upsertItem(container, "some-data"); + }); + assertEquals(500, exception.getError().getCode()); + } + + @Test + void findAllItems_executesCorrectQuery() throws IOException { + mockQueryResponse("s1"); + cosmosStore.findAllItems(container, String.class); + + ArgumentCaptor query = ArgumentCaptor.forClass(SqlQuerySpec.class); + ArgumentCaptor feedOptions = ArgumentCaptor.forClass(FeedOptions.class); + + verify(container).queryItems(query.capture(), feedOptions.capture()); + + assertEquals("SELECT * FROM c", query.getValue().getQueryText()); + assertTrue(feedOptions.getValue().getEnableCrossPartitionQuery()); + } + + @Test + void findAllItems_pagesCorrectly() throws IOException { + mockQueryResponse("s1", "s2", "s3"); + List results = cosmosStore.findAllItems(container, String.class); + + assertEquals(3, results.size()); + assertTrue(results.contains("s1")); + assertTrue(results.contains("s2")); + assertTrue(results.contains("s3")); + } + + @Test + void findAllItems_byPageNumber() { + mockPaginatedQueryResponse(2, 2, "s1", "s2", "s3", "s4", "s5"); + + List results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER, + String.class, (short)2, 2); + + assertEquals(2, results.size()); + assertTrue(results.contains("s3")); + assertTrue(results.contains("s4")); + + mockPaginatedQueryResponse(3, 2, "T1", "T2", "T3", "T4", "T5"); + results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER, + String.class, (short)3, 2); + + assertEquals(2, results.size()); + assertTrue(results.contains("T4")); + assertTrue(results.contains("T5")); + } + + @Test + void queryItems_byPageNumber() throws IOException { + mockPaginatedQueryResponse(3, 1, "W1", "W2", "W3", "W4", "W5"); + List results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER, + new SqlQuerySpec("SELECT * FROM c"), String.class, (short)3, 1); + + assertEquals(3, results.size()); + assertTrue(results.contains("W1")); + assertTrue(results.contains("W2")); + assertTrue(results.contains("W3")); + + mockPaginatedQueryResponse(2, 3, "Z1", "Z2", "Z3", "Z4", "Z5"); + results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER, + new SqlQuerySpec("SELECT * FROM c"), String.class, (short)2, 3); + + assertEquals(1, results.size()); + assertTrue(results.contains("Z5")); + } + + private void mockQueryResponse(String... responses) throws IOException { + ArrayList> paginatedResponse = new ArrayList<>(); + for (String response : responses) { + @SuppressWarnings("unchecked") + FeedResponse pageResponse = (FeedResponse) mock(FeedResponse.class); + + CosmosItemProperties properties = mock(CosmosItemProperties.class); + doReturn(Collections.singletonList(properties)).when(pageResponse).getResults(); + doReturn(response).when(properties).getObject(any()); + + paginatedResponse.add(pageResponse); + } + + doReturn(paginatedResponse.iterator()).when(container).queryItems(any(SqlQuerySpec.class), any()); + } + + private void mockPaginatedQueryResponse(int pageSize, int pageNum, String... responses) { + List resp = new ArrayList<>(); + FeedResponse pageResponse = (FeedResponse) mock(FeedResponse.class); + + for (String response : responses) { + Document doc = mock(Document.class); + resp.add(doc); + lenient().doReturn(Collections.singletonList(doc)).when(pageResponse).getResults(); + lenient().doReturn(response).when(doc).toObject(any()); + } + + when(pageResponse.getResults()).thenReturn(currentPage(resp,pageSize,pageNum)); + doReturn(Flux.just(pageResponse)) + .when(documentClient) + .queryDocuments(eq(COLLECTION_LINK), any((SqlQuerySpec.class)), any()); + } + + private static List currentPage (List dataList, int pageSize, int pageNum) { + List currentPageList = new ArrayList<>(); + if (dataList != null && dataList.size() > 0) { + int currIdx = (pageNum > 1 ? (pageNum - 1) * pageSize : 0); + for (int i = 0; i < pageSize && i < dataList.size() - currIdx; i++) { + currentPageList.add(dataList.get(currIdx + i)); + } + } + return currentPageList; + } +}