Commit 104e4343 authored by Erik Leckner's avatar Erik Leckner
Browse files

new cosmosstore method

parent dec78f83
Pipeline #9966 failed with stage
in 1 minute and 52 seconds
......@@ -105,18 +105,4 @@ public class AzureBootstrapConfig {
return secretValue;
}
@Bean
public AsyncDocumentClient asyncDocumentClient(final @Named("COSMOS_ENDPOINT") String endpoint, final @Named("COSMOS_KEY") String key) {
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.setConnectionMode(ConnectionMode.DIRECT);
return new AsyncDocumentClient.Builder()
.withServiceEndpoint(endpoint)
.withMasterKeyOrResourceToken(key)
.withConnectionPolicy(connectionPolicy)
.build();
}
}
\ No newline at end of file
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.storage.provider.azure.repository;
import com.azure.cosmos.ConflictException;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosItem;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.CosmosItemRequestOptions;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.NotFoundException;
import com.azure.cosmos.SqlQuerySpec;
import com.azure.cosmos.*;
import com.azure.cosmos.internal.AsyncDocumentClient;
import com.azure.cosmos.internal.Document;
import org.opengroup.osdu.azure.ICosmosClientFactory;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.storage.provider.azure.query.CosmosStorePageRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* A simpler interface for interacting with CosmosDB.
......@@ -81,11 +78,11 @@ public class AdvancedCosmosStore {
private ICosmosClientFactory cosmosClientFactory;
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param id ID of item
* @param partitionKey Partition key of item
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param id ID of item
* @param partitionKey Partition key of item
*/
public void deleteItem(
final String dataPartitionId,
......@@ -108,13 +105,13 @@ public class AdvancedCosmosStore {
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param id ID of item
* @param partitionKey Partition key of item
* @param clazz Class to serialize results into
* @param <T> Type to return
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param id ID of item
* @param partitionKey Partition key of item
* @param clazz Class to serialize results into
* @param <T> Type to return
* @return The item that was found based on the IDs provided
*/
public <T> Optional<T> findItem(
......@@ -145,13 +142,12 @@ public class AdvancedCosmosStore {
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param clazz Class type of response
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param clazz Class type of response
* @param <T> Type of response
* @return List of items found in container
*/
public <T> List<T> findAllItems(
......@@ -164,13 +160,13 @@ public class AdvancedCosmosStore {
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param options Query options
* @param clazz Class type of response
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param options Query options
* @param clazz Class type of response
* @param <T> Type of response
* @return List of items found in container
*/
public <T> List<T> queryItems(
......@@ -196,15 +192,14 @@ public class AdvancedCosmosStore {
}
return results;
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @return List of items found on specific page in container
*/
public <T> List<T> findAllItemsAsync(
......@@ -218,34 +213,14 @@ public class AdvancedCosmosStore {
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @return Specific page found in container
*/
public <T> Page<T> findAllItemsAsyncPage(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final Class<T> clazz,
final short pageSize,
final int pageNum) {
return queryItemsAsyncPage(dataPartitionId, cosmosDBName, collection, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, pageNum);
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @return List of items found on specific page in container
*/
public <T> List<T> queryItemsAsync(
......@@ -272,124 +247,6 @@ public class AdvancedCosmosStore {
} while (currentPage < pageNum && continuationToken != null);
return results.get(continuationToken);
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @param <T> Type of response
* @return Specific Page of items found on specific page in container
*/
public <T> Page<T> queryItemsAsyncPage(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final SqlQuerySpec query,
final Class<T> clazz,
final short pageSize,
final int pageNum) {
String continuationToken = null;
int currentPage = 0;
HashMap<String, List<T>> results;
do {
String nextContinuationToken = "";
AsyncDocumentClient client = cosmosClientFactory.getAsyncClient(dataPartitionId);
results = returnItemsWithToken(client, cosmosDBName, collection, query, clazz, pageSize, continuationToken);
for (Map.Entry<String, List<T>> entry : results.entrySet()) {
nextContinuationToken = entry.getKey();
}
continuationToken = nextContinuationToken;
currentPage++;
} while (currentPage < pageNum && continuationToken != null);
CosmosStorePageRequest pageRequest = CosmosStorePageRequest.of(pageNum, pageSize, continuationToken);
return new PageImpl(results.get(continuationToken), pageRequest, 0);
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param <T> Type of response
* @return Specific Page of items found in container
*/
public <T> Page<T> queryItemsAsyncPage(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final SqlQuerySpec query,
final Class<T> clazz,
final int pageSize,
final String continuationToken) {
HashMap<String, List<T>> results;
String internalContinuationToken = continuationToken;
String nextContinuationToken = null;
AsyncDocumentClient client = cosmosClientFactory.getAsyncClient(dataPartitionId);
results = returnItemsWithContinuationToken(client, cosmosDBName, collection, query, clazz, pageSize, internalContinuationToken);
for (Map.Entry<String, List<T>> entry : results.entrySet()) {
nextContinuationToken = entry.getKey();
}
internalContinuationToken = nextContinuationToken;
CosmosStorePageRequest pageRequest = CosmosStorePageRequest.of(0, pageSize, internalContinuationToken);
return new PageImpl(results.get(internalContinuationToken), pageRequest, 0);
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param continuationToken Token used to continue the enumeration
* @return Continuation Token and list of documents in container
*/
private static <T> HashMap<String, List<T>> returnItemsWithContinuationToken(
final AsyncDocumentClient client,
final String dbName,
final String container,
final SqlQuerySpec query,
final Class<T> clazz,
final int pageSize,
final String continuationToken) {
HashMap<String, List<T>> map = new HashMap<>();
List<T> items = new ArrayList<T>();
FeedOptions feedOptions = new FeedOptions()
.maxItemCount((int) pageSize)
.setEnableCrossPartitionQuery(true)
.requestContinuation(continuationToken);
String collectionLink = String.format("/dbs/%s/colls/%s", dbName, container);
Flux<FeedResponse<Document>> queryFlux = client.queryDocuments(collectionLink, query, feedOptions);
Iterator<FeedResponse<Document>> it = queryFlux.toIterable().iterator();
FeedResponse<Document> page = it.next();
List<Document> results = page.getResults();
for (Document doc : results) {
T obj = doc.toObject(clazz);
items.add(obj);
}
map.put(page.getContinuationToken(), items);
return map;
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
......@@ -435,11 +292,11 @@ public class AdvancedCosmosStore {
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param item Data object to store
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param item Data object to store
* @param <T> Type of response
*/
public <T> void upsertItem(
final String dataPartitionId,
......@@ -457,11 +314,11 @@ public class AdvancedCosmosStore {
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param item Data object to store
* @param <T> Type of response
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param item Data object to store
* @param <T> Type of response
*/
public <T> void createItem(
final String dataPartitionId,
......@@ -496,9 +353,9 @@ public class AdvancedCosmosStore {
}
/**
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @param dataPartitionId Data partition id to fetch appropriate cosmos client for each partition
* @param cosmosDBName Database to be used
* @param collection Collection to be used
* @return Cosmos container
*/
private CosmosContainer getCosmosContainer(
......@@ -516,4 +373,47 @@ public class AdvancedCosmosStore {
}
}
public <T> Page<T> findAllItemsPage(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final Class<T> clazz,
final int pageSize,
final String continuationToken) {
return queryItemsPage(dataPartitionId, cosmosDBName, collection, new SqlQuerySpec("SELECT * FROM c"), clazz, pageSize, continuationToken);
}
public <T> Page<T> queryItemsPage(
final String dataPartitionId,
final String cosmosDBName,
final String collection,
final SqlQuerySpec query,
final Class<T> clazz,
final int pageSize,
final String continuationToken) {
String internalContinuationToken = continuationToken;
FeedOptions options = new FeedOptions()
.maxItemCount((int) pageSize)
.setEnableCrossPartitionQuery(true)
.requestContinuation(continuationToken);
List<T> results = new ArrayList<>();
CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection);
Iterator<FeedResponse<CosmosItemProperties>> paginatedResponse = cosmosContainer.queryItems(query, options);
if (paginatedResponse.hasNext()) {
FeedResponse<CosmosItemProperties> response = paginatedResponse.next();
internalContinuationToken = response.getContinuationToken();
for (CosmosItemProperties properties : response.getResults()) {
try {
results.add(properties.getObject(clazz));
} catch (IOException e) {
String errorMessage = String.format("Malformed document for item with ID=%s", properties.getId());
LOGGER.warn(errorMessage, e);
throw new AppException(500, errorMessage, e.getMessage(), e);
}
}
}
CosmosStorePageRequest pageRequest = CosmosStorePageRequest.of(0, pageSize, internalContinuationToken);
return new PageImpl(results, pageRequest, 0);
}
}
......@@ -16,7 +16,6 @@ package org.opengroup.osdu.storage.provider.azure.repository;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.SqlQuerySpec;
import org.opengroup.osdu.storage.provider.azure.generator.FindQuerySpecGenerator;
import org.opengroup.osdu.storage.provider.azure.query.CosmosStorePageRequest;
import org.opengroup.osdu.storage.provider.azure.query.CosmosStoreQuery;
......@@ -29,7 +28,9 @@ import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class SimpleCosmosStoreRepository<T> implements CosmosStoreRepository<T> {
......@@ -76,27 +77,12 @@ public class SimpleCosmosStoreRepository<T> implements CosmosStoreRepository<T>
return this.operation.queryItems(dataPartitionId, cosmosDBName, collection, query, options, this.getDomainClass());
}
public List<T> findAllItemsAsync(String dataPartitionId, String cosmosDBName, String collection, short pageSize, int pageNum) {
return this.operation.findAllItemsAsync(dataPartitionId, cosmosDBName, collection, this.getDomainClass(), pageSize, pageNum);
}
@Deprecated
public Page<T> findAllItemsAsyncPage(String dataPartitionId, String cosmosDBName, String collection, short pageSize, int pageNum) {
return this.operation.findAllItemsAsyncPage(dataPartitionId, cosmosDBName, collection, this.getDomainClass(), pageSize, pageNum);
}
@Deprecated
public List<T> queryItemsAsync(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec query, short pageSize, int pageNum) {
return this.operation.queryItemsAsync(dataPartitionId, cosmosDBName, collection, query, this.getDomainClass(), pageSize, pageNum);
}
@Deprecated
public Page<T> queryItemsAsyncPage(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec query, short pageSize, int pageNum) {
return this.operation.queryItemsAsyncPage(dataPartitionId, cosmosDBName, collection, query, this.getDomainClass(), pageSize, pageNum);
public Page<T> findAllItemsPage(String dataPartitionId, String cosmosDBName, String collection, int pageSize, String continuationToken) {
return this.operation.findAllItemsPage(dataPartitionId, cosmosDBName, collection, this.getDomainClass(), pageSize, continuationToken);
}
public Page<T> queryItemsAsyncPage(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec query, int pageSize, String coninuationToken) {
return this.operation.queryItemsAsyncPage(dataPartitionId, cosmosDBName, collection, query, this.getDomainClass(), pageSize, coninuationToken);
public Page<T> queryItemsPage(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec query, int pageSize, String coninuationToken) {
return this.operation.queryItemsPage(dataPartitionId, cosmosDBName, collection, query, this.getDomainClass(), pageSize, coninuationToken);
}
public void upsertItem(String dataPartitionId, String cosmosDBName, String collection, @NonNull T item) {
......@@ -286,15 +272,11 @@ public class SimpleCosmosStoreRepository<T> implements CosmosStoreRepository<T>
public Page<T> paginationQuery(Pageable pageable, SqlQuerySpec query, Class<T> domainClass, String dataPartitionId, String cosmosDBName, String collectionName) {
Assert.isTrue(pageable.getPageSize() > 0, "pageable should have page size larger than 0");
Assert.hasText(collectionName, "collection should not be null, empty or only whitespaces");
FeedOptions feedOptions = new FeedOptions();
String continuationToken = null;
if (pageable instanceof CosmosStorePageRequest) {
continuationToken = ((CosmosStorePageRequest)pageable).getRequestContinuation();
}
// feedOptions.maxItemCount(pageable.getPageSize());
// feedOptions.setEnableCrossPartitionQuery(true);
// int pageNum = pageable.getPageNumber();
int pageSize = pageable.getPageSize();
return this.queryItemsAsyncPage(dataPartitionId, cosmosDBName, collectionName, query, pageSize, continuationToken);
return this.queryItemsPage(dataPartitionId, cosmosDBName, collectionName, query, pageSize, continuationToken);
}
}
......@@ -4,6 +4,7 @@ import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.SqlQuerySpec;
import org.springframework.data.domain.Page;
import org.springframework.lang.NonNull;
import java.util.List;
import java.util.Optional;
......@@ -35,23 +36,11 @@ public interface CosmosStoreRepository<T> extends PagingAndSortingRepository<T>
List<T> queryItems(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec
query, FeedOptions options);
@Deprecated
List<T> findAllItemsAsync(String dataPartitionId, String cosmosDBName, String collection, short pageSize,
int pageNum);
@Deprecated
List<T> queryItemsAsync(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec query,
short pageSize, int pageNum);
void upsertItem(String dataPartitionId, String cosmosDBName, String collection, T item);
void createItem(String dataPartitionId, String cosmosDBName, String collection, T item);
@Deprecated
Page<T> findAllItemsAsyncPage(String dataPartitionId, String cosmosDBName, String collection, short pageSize,
int pageNum);
@Deprecated
Page<T> queryItemsAsyncPage(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec query,
short pageSize, int pageNum);
public Page<T> findAllItemsPage(String dataPartitionId, String cosmosDBName, String collection, int pageSize, String continuationToken);
public Page<T> queryItemsPage(String dataPartitionId, String cosmosDBName, String collection, SqlQuerySpec query, int pageSize, String coninuationToken);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment