diff --git a/src/main/java/org/opengroup/osdu/azure/CosmosStore.java b/src/main/java/org/opengroup/osdu/azure/CosmosStore.java
new file mode 100644
index 0000000000000000000000000000000000000000..e08ce20b8a914a10fdac129f70343430bb7f06cc
--- /dev/null
+++ b/src/main/java/org/opengroup/osdu/azure/CosmosStore.java
@@ -0,0 +1,291 @@
+// Copyright © Microsoft Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.opengroup.osdu.azure;
+
+import com.azure.cosmos.CosmosClientException;
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosItem;
+import com.azure.cosmos.CosmosItemProperties;
+import com.azure.cosmos.CosmosItemRequestOptions;
+import com.azure.cosmos.FeedOptions;
+import com.azure.cosmos.FeedResponse;
+import com.azure.cosmos.NotFoundException;
+import com.azure.cosmos.SqlQuerySpec;
+import com.azure.cosmos.internal.AsyncDocumentClient;
+import com.azure.cosmos.internal.Document;
+import org.opengroup.osdu.core.common.model.http.AppException;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A simpler interface for interacting with CosmosDB.
+ * Usage Examples:
+ *
+ * {@code
+ * @Inject
+ * private CosmosContainer container;
+ *
+ * @Inject
+ * private CosmosStore cosmosStore;
+ *
+ * void findItemExample() {
+ * Optional myItem = cosmosStore.findItem(container, "id", "partition-key", MyObject.class);
+ * myItem.isPresent(); // true if found, false otherwise
+ * }
+ *
+ * void findAllItemsExample() {
+ * List objects = cosmosStore.findAllItems(container, MyObject.class);
+ * }
+ *
+ * void queryItemsExample() {
+ * SqlQuerySpec query = new SqlQuerySpec()
+ * .setQueryText("SELECT * FROM c WHERE c.isFoo = @isFoo")
+ * .setParameters(new SqlParameterList(new SqlParameter("@isFoo", true)));
+ * FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
+ *
+ * List objects = cosmosStore.queryItems(container, query, options, MyObject.class);
+ * }
+ * }
+ *
+ */
+public final class CosmosStore {
+
+ private static final Logger LOGGER = Logger.getLogger(CosmosStore.class.getName());
+
+ /**
+ * @param cosmos Container to query
+ * @param id ID of item
+ * @param partitionKey Partition key of item
+ */
+ public void deleteItem(
+ final CosmosContainer cosmos,
+ final String id,
+ final String partitionKey) {
+ try {
+ findItem(cosmos, id, partitionKey).delete(new CosmosItemRequestOptions(partitionKey));
+ } catch (NotFoundException e) {
+ String errorMessage = "Item was unexpectedly not found";
+ LOGGER.log(Level.WARNING, errorMessage, e);
+ throw new AppException(404, errorMessage, e.getMessage(), e);
+ } catch (CosmosClientException e) {
+ String errorMessage = "Unexpectedly failed to delete item from CosmosDB";
+ LOGGER.log(Level.WARNING, errorMessage, e);
+ throw new AppException(500, errorMessage, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * @param cosmos Container to query
+ * @param id ID of item
+ * @param partitionKey Partition key of item
+ * @param clazz Class to serialize results into
+ * @param Type to return
+ * @return The item that was found based on the IDs provided
+ */
+ public Optional findItem(
+ final CosmosContainer cosmos,
+ final String id,
+ final String partitionKey,
+ final Class clazz) {
+ try {
+ T item = findItem(cosmos, id, partitionKey)
+ .read(new CosmosItemRequestOptions(partitionKey))
+ .getProperties()
+ .getObject(clazz);
+ return Optional.ofNullable(item);
+ } catch (NotFoundException e) {
+ LOGGER.info(String.format("Unable to find item with ID=%s and PK=%s", id, partitionKey));
+ return Optional.empty();
+ } catch (IOException e) {
+ LOGGER.warning(String.format("Malformed document for item with ID=%s and PK=%s", id, partitionKey));
+ return Optional.empty();
+ } catch (CosmosClientException e) {
+ String errorMessage = "Unexpectedly encountered error calling CosmosDB";
+ LOGGER.log(Level.WARNING, errorMessage, e);
+ throw new AppException(500, errorMessage, e.getMessage(), e);
+ }
+ }
+ /**
+ * @param container Container to query
+ * @param clazz Class type of response
+ * @param Type of response
+ * @return List of items found in container
+ */
+ public List findAllItems(final CosmosContainer container, final Class clazz) {
+ FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
+ return queryItems(container, new SqlQuerySpec("SELECT * FROM c"), options, clazz);
+ }
+
+ /**
+ * @param container Container to query
+ * @param clazz Class type of response
+ * @param query {@link SqlQuerySpec} to execute
+ * @param options Query options
+ * @param Type of response
+ * @return List of items found in container
+ */
+ public List queryItems(final CosmosContainer container, final SqlQuerySpec query, final FeedOptions options, final Class clazz) {
+ ArrayList results = new ArrayList<>();
+ Iterator> paginatedResponse = container.queryItems(query, options);
+
+ while (paginatedResponse.hasNext()) {
+ for (CosmosItemProperties properties : paginatedResponse.next().getResults()) {
+ try {
+ results.add(properties.getObject(clazz));
+ } catch (IOException e) {
+ String errorMessage = String.format("Malformed document for item with ID=%s", properties.getId());
+ LOGGER.log(Level.WARNING, errorMessage, e);
+ throw new AppException(500, errorMessage, e.getMessage(), e);
+ }
+ }
+ }
+ return results;
+ }
+ /**
+ * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
+ * @param dbName Cosmos DB name
+ * @param container Container to query
+ * @param clazz Class type of response
+ * @param Type of response
+ * @param pageSize Number of items returned
+ * @param pageNum Page number returned
+ * @return List of items found on specific page in container
+ */
+ public List findAllItems(
+ final AsyncDocumentClient client,
+ final String dbName,
+ final String container,
+ final Class clazz,
+ final short pageSize,
+ final int pageNum) {
+ return queryItems(client, dbName, container, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum);
+ }
+
+ /**
+ * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
+ * @param dbName Cosmos DB name
+ * @param container Container to query
+ * @param query {@link SqlQuerySpec} to execute
+ * @param clazz Class type of response
+ * @param Type of response
+ * @param pageSize Number of items returned
+ * @param pageNum Page number returned
+ * @return List of items found on specific page in container
+ */
+ public List queryItems(
+ final AsyncDocumentClient client,
+ final String dbName,
+ final String container,
+ final SqlQuerySpec query,
+ final Class clazz,
+ final short pageSize,
+ final int pageNum) {
+ String continuationToken = null;
+ int currentPage = 0;
+ HashMap> results;
+ do {
+ String nextContinuationToken = "";
+
+ results = returnItemsWithToken(client, dbName, container, query, clazz, pageSize, continuationToken);
+ for (Map.Entry> entry : results.entrySet()) {
+ nextContinuationToken = entry.getKey();
+ }
+ continuationToken = nextContinuationToken;
+ currentPage++;
+
+ } while (currentPage < pageNum && continuationToken != null);
+ return results.get(continuationToken);
+ }
+ /**
+ * @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
+ * @param dbName Cosmos DB name
+ * @param container Container to query
+ * @param query {@link SqlQuerySpec} to execute
+ * @param clazz Class type of response
+ * @param Type of response
+ * @param pageSize Number of items returned
+ * @param continuationToken Token used to continue the enumeration
+ * @return Continuation Token and list of documents in container
+ */
+ private static HashMap> returnItemsWithToken(
+ final AsyncDocumentClient client,
+ final String dbName,
+ final String container,
+ final SqlQuerySpec query,
+ final Class clazz,
+ final short pageSize,
+ final String continuationToken) {
+
+ HashMap> map = new HashMap<>();
+ List items = new ArrayList();
+
+ FeedOptions feedOptions = new FeedOptions()
+ .maxItemCount((int) pageSize)
+ .setEnableCrossPartitionQuery(true)
+ .requestContinuation(continuationToken);
+
+ String collectionLink = String.format("/dbs/%s/colls/%s", dbName, container);
+ Flux> queryFlux = client.queryDocuments(collectionLink, query, feedOptions);
+
+ Iterator> it = queryFlux.toIterable().iterator();
+
+ FeedResponse page = it.next();
+ List results = page.getResults();
+ for (Document doc : results) {
+ T obj = doc.toObject(clazz);
+ items.add(obj);
+ }
+
+ map.put(page.getContinuationToken(), items);
+ return map;
+ }
+
+ /**
+ * @param container Container to query
+ * @param item Data object to store
+ * @param Type of response
+ */
+ public void upsertItem(final CosmosContainer container, final T item) {
+ try {
+ container.upsertItem(item);
+ } catch (CosmosClientException e) {
+ String errorMessage = "Unexpectedly failed to put item into CosmosDB";
+ LOGGER.log(Level.WARNING, errorMessage, e);
+ throw new AppException(500, errorMessage, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * @param cosmos Container to query
+ * @param id ID of item
+ * @param partitionKey Partition key of item
+ * @return The item. It may not exist - the caller must check
+ */
+ private static CosmosItem findItem(
+ final CosmosContainer cosmos,
+ final String id,
+ final String partitionKey) {
+ return cosmos.getItem(id, partitionKey);
+ }
+}
diff --git a/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java b/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..d3f2f7c2b660dc7e48f7cac81fb82f28a8d617fa
--- /dev/null
+++ b/src/test/java/org/opengroup/osdu/azure/CosmosStoreTest.java
@@ -0,0 +1,234 @@
+// Copyright © Microsoft Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.opengroup.osdu.azure;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.internal.AsyncDocumentClient;
+import com.azure.cosmos.internal.Document;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opengroup.osdu.core.common.model.http.AppException;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class CosmosStoreTest {
+
+ private static final String ID = "id";
+ private static final String PARTITION_KEY = "pk";
+ private static final String COSMOS_DB = "cosmosdb";
+ private static final String CONTAINER = "container";
+ private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/container";
+
+ @Mock
+ private AsyncDocumentClient documentClient;
+
+ @Mock
+ private CosmosContainer container;
+
+ @Mock
+ private CosmosItem cosmosItem;
+
+ @Mock
+ private CosmosItemProperties cosmosItemProperties;
+
+ @Mock
+ private CosmosItemResponse cosmosResponse;
+
+ @Mock
+ private Iterator> queryResponse;
+
+ @InjectMocks
+ private CosmosStore cosmosStore;
+
+ @BeforeEach
+ void init() throws CosmosClientException {
+ // mock the common cosmos request/response pattern that most tests need. because
+ // not all tests will leverage these, we make the mocks lenient.
+ lenient().doReturn(cosmosItem).when(container).getItem(ID, PARTITION_KEY);
+ lenient().doReturn(cosmosResponse).when(cosmosItem).read(any());
+ lenient().doReturn(cosmosItemProperties).when(cosmosResponse).getProperties();
+ }
+
+ @Test
+ void delete_throws404_ifNotFound() throws CosmosClientException {
+ doThrow(NotFoundException.class).when(cosmosItem).delete(any());
+ AppException exception = assertThrows(AppException.class, () -> {
+ cosmosStore.deleteItem(container, ID, PARTITION_KEY);
+ });
+ assertEquals(404, exception.getError().getCode());
+ }
+
+ @Test
+ void delete_throws500_ifUnknownError() throws CosmosClientException {
+ doThrow(CosmosClientException.class).when(cosmosItem).delete(any());
+ AppException exception = assertThrows(AppException.class, () -> {
+ cosmosStore.deleteItem(container, ID, PARTITION_KEY);
+ });
+ assertEquals(500, exception.getError().getCode());
+ }
+
+ @Test
+ void findItem_returnsEmpty_ifNotFound() throws CosmosClientException {
+ doThrow(NotFoundException.class).when(cosmosItem).read(any());
+ assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent());
+ }
+
+ @Test
+ void findItem_returnsEmpty_ifMalformedDocument() throws IOException {
+ doThrow(IOException.class).when(cosmosItemProperties).getObject(any());
+ assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent());
+ }
+
+ @Test
+ void findItem_throws500_ifUnknownError() throws CosmosClientException {
+ doThrow(CosmosClientException.class).when(cosmosItem).read(any());
+ AppException exception = assertThrows(AppException.class, () -> {
+ cosmosStore.findItem(container, ID, PARTITION_KEY, String.class);
+ });
+ assertEquals(500, exception.getError().getCode());
+ }
+
+ @Test
+ void upsertItem_throws500_ifUnknownError() throws CosmosClientException {
+ doThrow(CosmosClientException.class).when(container).upsertItem(any());
+ AppException exception = assertThrows(AppException.class, () -> {
+ cosmosStore.upsertItem(container, "some-data");
+ });
+ assertEquals(500, exception.getError().getCode());
+ }
+
+ @Test
+ void findAllItems_executesCorrectQuery() throws IOException {
+ mockQueryResponse("s1");
+ cosmosStore.findAllItems(container, String.class);
+
+ ArgumentCaptor query = ArgumentCaptor.forClass(SqlQuerySpec.class);
+ ArgumentCaptor feedOptions = ArgumentCaptor.forClass(FeedOptions.class);
+
+ verify(container).queryItems(query.capture(), feedOptions.capture());
+
+ assertEquals("SELECT * FROM c", query.getValue().getQueryText());
+ assertTrue(feedOptions.getValue().getEnableCrossPartitionQuery());
+ }
+
+ @Test
+ void findAllItems_pagesCorrectly() throws IOException {
+ mockQueryResponse("s1", "s2", "s3");
+ List results = cosmosStore.findAllItems(container, String.class);
+
+ assertEquals(3, results.size());
+ assertTrue(results.contains("s1"));
+ assertTrue(results.contains("s2"));
+ assertTrue(results.contains("s3"));
+ }
+
+ @Test
+ void findAllItems_byPageNumber() {
+ mockPaginatedQueryResponse(2, 2, "s1", "s2", "s3", "s4", "s5");
+
+ List results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER,
+ String.class, (short)2, 2);
+
+ assertEquals(2, results.size());
+ assertTrue(results.contains("s3"));
+ assertTrue(results.contains("s4"));
+
+ mockPaginatedQueryResponse(3, 2, "T1", "T2", "T3", "T4", "T5");
+ results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER,
+ String.class, (short)3, 2);
+
+ assertEquals(2, results.size());
+ assertTrue(results.contains("T4"));
+ assertTrue(results.contains("T5"));
+ }
+
+ @Test
+ void queryItems_byPageNumber() throws IOException {
+ mockPaginatedQueryResponse(3, 1, "W1", "W2", "W3", "W4", "W5");
+ List results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER,
+ new SqlQuerySpec("SELECT * FROM c"), String.class, (short)3, 1);
+
+ assertEquals(3, results.size());
+ assertTrue(results.contains("W1"));
+ assertTrue(results.contains("W2"));
+ assertTrue(results.contains("W3"));
+
+ mockPaginatedQueryResponse(2, 3, "Z1", "Z2", "Z3", "Z4", "Z5");
+ results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER,
+ new SqlQuerySpec("SELECT * FROM c"), String.class, (short)2, 3);
+
+ assertEquals(1, results.size());
+ assertTrue(results.contains("Z5"));
+ }
+
+ private void mockQueryResponse(String... responses) throws IOException {
+ ArrayList> paginatedResponse = new ArrayList<>();
+ for (String response : responses) {
+ @SuppressWarnings("unchecked")
+ FeedResponse pageResponse = (FeedResponse) mock(FeedResponse.class);
+
+ CosmosItemProperties properties = mock(CosmosItemProperties.class);
+ doReturn(Collections.singletonList(properties)).when(pageResponse).getResults();
+ doReturn(response).when(properties).getObject(any());
+
+ paginatedResponse.add(pageResponse);
+ }
+
+ doReturn(paginatedResponse.iterator()).when(container).queryItems(any(SqlQuerySpec.class), any());
+ }
+
+ private void mockPaginatedQueryResponse(int pageSize, int pageNum, String... responses) {
+ List resp = new ArrayList<>();
+ FeedResponse pageResponse = (FeedResponse) mock(FeedResponse.class);
+
+ for (String response : responses) {
+ Document doc = mock(Document.class);
+ resp.add(doc);
+ lenient().doReturn(Collections.singletonList(doc)).when(pageResponse).getResults();
+ lenient().doReturn(response).when(doc).toObject(any());
+ }
+
+ when(pageResponse.getResults()).thenReturn(currentPage(resp,pageSize,pageNum));
+ doReturn(Flux.just(pageResponse))
+ .when(documentClient)
+ .queryDocuments(eq(COLLECTION_LINK), any((SqlQuerySpec.class)), any());
+ }
+
+ private static List currentPage (List dataList, int pageSize, int pageNum) {
+ List currentPageList = new ArrayList<>();
+ if (dataList != null && dataList.size() > 0) {
+ int currIdx = (pageNum > 1 ? (pageNum - 1) * pageSize : 0);
+ for (int i = 0; i < pageSize && i < dataList.size() - currIdx; i++) {
+ currentPageList.add(dataList.get(currIdx + i));
+ }
+ }
+ return currentPageList;
+ }
+}