Commit d9758c06 authored by harshit aggarwal's avatar harshit aggarwal
Browse files

Adding CosmosStore class and UT's

parent d2e06e76
Pipeline #3073 passed with stages
in 5 minutes and 55 seconds
......@@ -21,7 +21,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.0.9</version>
<version>0.0.10</version>
<name>core-lib-azure</name>
<properties>
......
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosItem;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.CosmosItemRequestOptions;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.NotFoundException;
import com.azure.cosmos.SqlQuerySpec;
import com.azure.cosmos.internal.AsyncDocumentClient;
import com.azure.cosmos.internal.Document;
import org.opengroup.osdu.core.common.model.http.AppException;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A simpler interface for interacting with CosmosDB.
* Usage Examples:
* <pre>
* {@code
* @Inject
* private CosmosContainer container;
*
* @Inject
* private CosmosStore cosmosStore;
*
* void findItemExample() {
* Optional<MyObject> myItem = cosmosStore.findItem(container, "id", "partition-key", MyObject.class);
* myItem.isPresent(); // true if found, false otherwise
* }
*
* void findAllItemsExample() {
* List<MyObject> objects = cosmosStore.findAllItems(container, MyObject.class);
* }
*
* void queryItemsExample() {
* SqlQuerySpec query = new SqlQuerySpec()
* .setQueryText("SELECT * FROM c WHERE c.isFoo = @isFoo")
* .setParameters(new SqlParameterList(new SqlParameter("@isFoo", true)));
* FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
*
* List<MyObject> objects = cosmosStore.queryItems(container, query, options, MyObject.class);
* }
* }
* </pre>
*/
public final class CosmosStore {
private static final Logger LOGGER = Logger.getLogger(CosmosStore.class.getName());
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
*/
public void deleteItem(
final CosmosContainer cosmos,
final String id,
final String partitionKey) {
try {
findItem(cosmos, id, partitionKey).delete(new CosmosItemRequestOptions(partitionKey));
} catch (NotFoundException e) {
String errorMessage = "Item was unexpectedly not found";
LOGGER.log(Level.WARNING, errorMessage, e);
throw new AppException(404, errorMessage, e.getMessage(), e);
} catch (CosmosClientException e) {
String errorMessage = "Unexpectedly failed to delete item from CosmosDB";
LOGGER.log(Level.WARNING, errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
* @param clazz Class to serialize results into
* @param <T> Type to return
* @return The item that was found based on the IDs provided
*/
public <T> Optional<T> findItem(
final CosmosContainer cosmos,
final String id,
final String partitionKey,
final Class<T> clazz) {
try {
T item = findItem(cosmos, id, partitionKey)
.read(new CosmosItemRequestOptions(partitionKey))
.getProperties()
.getObject(clazz);
return Optional.ofNullable(item);
} catch (NotFoundException e) {
LOGGER.info(String.format("Unable to find item with ID=%s and PK=%s", id, partitionKey));
return Optional.empty();
} catch (IOException e) {
LOGGER.warning(String.format("Malformed document for item with ID=%s and PK=%s", id, partitionKey));
return Optional.empty();
} catch (CosmosClientException e) {
String errorMessage = "Unexpectedly encountered error calling CosmosDB";
LOGGER.log(Level.WARNING, errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
/**
* @param container Container to query
* @param clazz Class type of response
* @param <T> Type of response
* @return List of items found in container
*/
public <T> List<T> findAllItems(final CosmosContainer container, final Class<T> clazz) {
FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
return queryItems(container, new SqlQuerySpec("SELECT * FROM c"), options, clazz);
}
/**
* @param container Container to query
* @param clazz Class type of response
* @param query {@link SqlQuerySpec} to execute
* @param options Query options
* @param <T> Type of response
* @return List of items found in container
*/
public <T> List<T> queryItems(final CosmosContainer container, final SqlQuerySpec query, final FeedOptions options, final Class<T> clazz) {
ArrayList<T> results = new ArrayList<>();
Iterator<FeedResponse<CosmosItemProperties>> paginatedResponse = container.queryItems(query, options);
while (paginatedResponse.hasNext()) {
for (CosmosItemProperties properties : paginatedResponse.next().getResults()) {
try {
results.add(properties.getObject(clazz));
} catch (IOException e) {
String errorMessage = String.format("Malformed document for item with ID=%s", properties.getId());
LOGGER.log(Level.WARNING, errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
}
return results;
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @return List of items found on specific page in container
*/
public <T> List<T> findAllItems(
final AsyncDocumentClient client,
final String dbName,
final String container,
final Class<T> clazz,
final short pageSize,
final int pageNum) {
return queryItems(client, dbName, container, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum);
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @return List of items found on specific page in container
*/
public <T> List<T> queryItems(
final AsyncDocumentClient client,
final String dbName,
final String container,
final SqlQuerySpec query,
final Class<T> clazz,
final short pageSize,
final int pageNum) {
String continuationToken = null;
int currentPage = 0;
HashMap<String, List<T>> results;
do {
String nextContinuationToken = "";
results = returnItemsWithToken(client, dbName, container, query, clazz, pageSize, continuationToken);
for (Map.Entry<String, List<T>> entry : results.entrySet()) {
nextContinuationToken = entry.getKey();
}
continuationToken = nextContinuationToken;
currentPage++;
} while (currentPage < pageNum && continuationToken != null);
return results.get(continuationToken);
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param continuationToken Token used to continue the enumeration
* @return Continuation Token and list of documents in container
*/
private static <T> HashMap<String, List<T>> returnItemsWithToken(
final AsyncDocumentClient client,
final String dbName,
final String container,
final SqlQuerySpec query,
final Class<T> clazz,
final short pageSize,
final String continuationToken) {
HashMap<String, List<T>> map = new HashMap<>();
List<T> items = new ArrayList<T>();
FeedOptions feedOptions = new FeedOptions()
.maxItemCount((int) pageSize)
.setEnableCrossPartitionQuery(true)
.requestContinuation(continuationToken);
String collectionLink = String.format("/dbs/%s/colls/%s", dbName, container);
Flux<FeedResponse<Document>> queryFlux = client.queryDocuments(collectionLink, query, feedOptions);
Iterator<FeedResponse<Document>> it = queryFlux.toIterable().iterator();
FeedResponse<Document> page = it.next();
List<Document> results = page.getResults();
for (Document doc : results) {
T obj = doc.toObject(clazz);
items.add(obj);
}
map.put(page.getContinuationToken(), items);
return map;
}
/**
* @param container Container to query
* @param item Data object to store
* @param <T> Type of response
*/
public <T> void upsertItem(final CosmosContainer container, final T item) {
try {
container.upsertItem(item);
} catch (CosmosClientException e) {
String errorMessage = "Unexpectedly failed to put item into CosmosDB";
LOGGER.log(Level.WARNING, errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
* @return The item. It may not exist - the caller must check
*/
private static CosmosItem findItem(
final CosmosContainer cosmos,
final String id,
final String partitionKey) {
return cosmos.getItem(id, partitionKey);
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure;
import com.azure.cosmos.*;
import com.azure.cosmos.internal.AsyncDocumentClient;
import com.azure.cosmos.internal.Document;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.core.common.model.http.AppException;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class CosmosStoreTest {
private static final String ID = "id";
private static final String PARTITION_KEY = "pk";
private static final String COSMOS_DB = "cosmosdb";
private static final String CONTAINER = "container";
private static final String COLLECTION_LINK = "/dbs/cosmosdb/colls/container";
@Mock
private AsyncDocumentClient documentClient;
@Mock
private CosmosContainer container;
@Mock
private CosmosItem cosmosItem;
@Mock
private CosmosItemProperties cosmosItemProperties;
@Mock
private CosmosItemResponse cosmosResponse;
@Mock
private Iterator<FeedResponse<CosmosItemProperties>> queryResponse;
@InjectMocks
private CosmosStore cosmosStore;
@BeforeEach
void init() throws CosmosClientException {
// mock the common cosmos request/response pattern that most tests need. because
// not all tests will leverage these, we make the mocks lenient.
lenient().doReturn(cosmosItem).when(container).getItem(ID, PARTITION_KEY);
lenient().doReturn(cosmosResponse).when(cosmosItem).read(any());
lenient().doReturn(cosmosItemProperties).when(cosmosResponse).getProperties();
}
@Test
void delete_throws404_ifNotFound() throws CosmosClientException {
doThrow(NotFoundException.class).when(cosmosItem).delete(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.deleteItem(container, ID, PARTITION_KEY);
});
assertEquals(404, exception.getError().getCode());
}
@Test
void delete_throws500_ifUnknownError() throws CosmosClientException {
doThrow(CosmosClientException.class).when(cosmosItem).delete(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.deleteItem(container, ID, PARTITION_KEY);
});
assertEquals(500, exception.getError().getCode());
}
@Test
void findItem_returnsEmpty_ifNotFound() throws CosmosClientException {
doThrow(NotFoundException.class).when(cosmosItem).read(any());
assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent());
}
@Test
void findItem_returnsEmpty_ifMalformedDocument() throws IOException {
doThrow(IOException.class).when(cosmosItemProperties).getObject(any());
assertFalse(cosmosStore.findItem(container, ID, PARTITION_KEY, String.class).isPresent());
}
@Test
void findItem_throws500_ifUnknownError() throws CosmosClientException {
doThrow(CosmosClientException.class).when(cosmosItem).read(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.findItem(container, ID, PARTITION_KEY, String.class);
});
assertEquals(500, exception.getError().getCode());
}
@Test
void upsertItem_throws500_ifUnknownError() throws CosmosClientException {
doThrow(CosmosClientException.class).when(container).upsertItem(any());
AppException exception = assertThrows(AppException.class, () -> {
cosmosStore.upsertItem(container, "some-data");
});
assertEquals(500, exception.getError().getCode());
}
@Test
void findAllItems_executesCorrectQuery() throws IOException {
mockQueryResponse("s1");
cosmosStore.findAllItems(container, String.class);
ArgumentCaptor<SqlQuerySpec> query = ArgumentCaptor.forClass(SqlQuerySpec.class);
ArgumentCaptor<FeedOptions> feedOptions = ArgumentCaptor.forClass(FeedOptions.class);
verify(container).queryItems(query.capture(), feedOptions.capture());
assertEquals("SELECT * FROM c", query.getValue().getQueryText());
assertTrue(feedOptions.getValue().getEnableCrossPartitionQuery());
}
@Test
void findAllItems_pagesCorrectly() throws IOException {
mockQueryResponse("s1", "s2", "s3");
List<String> results = cosmosStore.findAllItems(container, String.class);
assertEquals(3, results.size());
assertTrue(results.contains("s1"));
assertTrue(results.contains("s2"));
assertTrue(results.contains("s3"));
}
@Test
void findAllItems_byPageNumber() {
mockPaginatedQueryResponse(2, 2, "s1", "s2", "s3", "s4", "s5");
List<String> results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER,
String.class, (short)2, 2);
assertEquals(2, results.size());
assertTrue(results.contains("s3"));
assertTrue(results.contains("s4"));
mockPaginatedQueryResponse(3, 2, "T1", "T2", "T3", "T4", "T5");
results = cosmosStore.findAllItems(documentClient, COSMOS_DB, CONTAINER,
String.class, (short)3, 2);
assertEquals(2, results.size());
assertTrue(results.contains("T4"));
assertTrue(results.contains("T5"));
}
@Test
void queryItems_byPageNumber() throws IOException {
mockPaginatedQueryResponse(3, 1, "W1", "W2", "W3", "W4", "W5");
List<String> results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER,
new SqlQuerySpec("SELECT * FROM c"), String.class, (short)3, 1);
assertEquals(3, results.size());
assertTrue(results.contains("W1"));
assertTrue(results.contains("W2"));
assertTrue(results.contains("W3"));
mockPaginatedQueryResponse(2, 3, "Z1", "Z2", "Z3", "Z4", "Z5");
results = cosmosStore.queryItems(documentClient, COSMOS_DB, CONTAINER,
new SqlQuerySpec("SELECT * FROM c"), String.class, (short)2, 3);
assertEquals(1, results.size());
assertTrue(results.contains("Z5"));
}
private void mockQueryResponse(String... responses) throws IOException {
ArrayList<FeedResponse<CosmosItemProperties>> paginatedResponse = new ArrayList<>();
for (String response : responses) {
@SuppressWarnings("unchecked")
FeedResponse<CosmosItemProperties> pageResponse = (FeedResponse<CosmosItemProperties>) mock(FeedResponse.class);
CosmosItemProperties properties = mock(CosmosItemProperties.class);
doReturn(Collections.singletonList(properties)).when(pageResponse).getResults();
doReturn(response).when(properties).getObject(any());
paginatedResponse.add(pageResponse);
}
doReturn(paginatedResponse.iterator()).when(container).queryItems(any(SqlQuerySpec.class), any());
}
private void mockPaginatedQueryResponse(int pageSize, int pageNum, String... responses) {
List<Document> resp = new ArrayList<>();
FeedResponse<Document> pageResponse = (FeedResponse<Document>) mock(FeedResponse.class);
for (String response : responses) {
Document doc = mock(Document.class);
resp.add(doc);
lenient().doReturn(Collections.singletonList(doc)).when(pageResponse).getResults();
lenient().doReturn(response).when(doc).toObject(any());
}
when(pageResponse.getResults()).thenReturn(currentPage(resp,pageSize,pageNum));
doReturn(Flux.just(pageResponse))
.when(documentClient)
.queryDocuments(eq(COLLECTION_LINK), any((SqlQuerySpec.class)), any());
}
private static List<Document> currentPage (List<Document> dataList, int pageSize, int pageNum) {
List<Document> currentPageList = new ArrayList<>();
if (dataList != null && dataList.size() > 0) {
int currIdx = (pageNum > 1 ? (pageNum - 1) * pageSize : 0);
for (int i = 0; i < pageSize && i < dataList.size() - currIdx; i++) {
currentPageList.add(dataList.get(currIdx + i));
}
}
return currentPageList;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment