Skip to content
Snippets Groups Projects

Update Azure /query/kinds API

Merged Akanksha Prasad requested to merge akprasad/query-all-kinds into master
Files
3
@@ -15,13 +15,19 @@
package org.opengroup.osdu.storage.provider.azure.repository;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.SqlQuerySpec;
import com.google.common.base.Strings;
import com.lambdaworks.redis.RedisException;
import org.apache.http.HttpStatus;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.opengroup.osdu.azure.cosmosdb.CosmosStore;
import org.opengroup.osdu.azure.query.CosmosStorePageRequest;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.DatastoreQueryResult;
import org.opengroup.osdu.core.common.model.storage.RecordState;
import org.opengroup.osdu.core.common.util.Crc32c;
@@ -42,6 +48,9 @@ import java.util.List;
@Repository
public class QueryRepository implements IQueryRepository {
private final String cosmosDBName = "osdu-db";
private final String storageRecordContainer = "StorageRecord";
@Autowired
private RecordMetadataRepository record;
@@ -55,53 +64,34 @@ public class QueryRepository implements IQueryRepository {
@Qualifier("CursorCache")
private ICache<String, String> cursorCache;
@Override
public DatastoreQueryResult getAllKinds(Integer limit, String cursor) {
boolean paginated = false;
@Autowired
private DpsHeaders dpsHeaders;
int numRecords = PAGE_SIZE;
if (limit != null) {
numRecords = limit > 0 ? limit : PAGE_SIZE;
paginated = true;
}
@Autowired
private CosmosStore cosmosStore;
if (cursor != null && !cursor.isEmpty()) {
paginated = true;
}
@Override
public DatastoreQueryResult getAllKinds(Integer limit, String cursor) {
Sort sort = Sort.by(Sort.Direction.ASC, "kind");
DatastoreQueryResult dqr = new DatastoreQueryResult();
List<String> kinds = new ArrayList();
Iterable<SchemaDoc> docs;
List<String> docs = new ArrayList();
try {
if (paginated) {
final Page<SchemaDoc> docPage = schema.findAll(CosmosStorePageRequest.of(0, numRecords, cursor, sort));
Pageable pageable = docPage.getPageable();
String continuation = null;
if (pageable instanceof CosmosStorePageRequest) {
continuation = ((CosmosStorePageRequest) pageable).getRequestContinuation();
}
dqr.setCursor(continuation);
docs = docPage.getContent();
} else {
docs = schema.findAll(sort);
/* TODO: PAGINATION REIMPLEMENTATION NEEDED*/
if (limit != null) {
throw this.getLimitException();
}
docs.forEach(
d -> kinds.add(d.getKind()));
dqr.setResults(kinds);
if (cursor != null) {
throw this.getCursorException();
}
docs = getDistinctKind();
dqr.setResults(docs);
} catch (CosmosException e) {
if (e.getStatusCode() == HttpStatus.SC_BAD_REQUEST && e.getMessage().contains("INVALID JSON in continuation token"))
throw this.getInvalidCursorException();
else
throw e;
throw e;
} catch (Exception e) {
throw e;
}
return dqr;
}
@Override
@@ -139,14 +129,13 @@ public class QueryRepository implements IQueryRepository {
do {
preferredPageSize = numRecords - ids.size();
// Fetch records and set ids
Page<RecordMetadataDoc> docPage = record.findIdsByMetadata_kindAndMetadata_status(kind, status,
CosmosStorePageRequest.of(0, preferredPageSize, continuation));
Page<RecordMetadataDoc> docPage = record.findIdsByMetadata_kindAndMetadata_status(kind, status, CosmosStorePageRequest.of(0, preferredPageSize, continuation));
docs = docPage.getContent();
docs.forEach(d -> ids.add(d.getId()));
if (iteration > 1) {
// cosmosDb did not return the preferredPageSize in previous iteration, so it was queried again.
this.logger.info(String.format("Iteration count of query on cosmosDb: %d, page size returned: %d, remaining page size: %d", iteration, docPage.getContent().size(), numRecords- ids.size()));
this.logger.info(String.format("Iteration count of query on cosmosDb: %d, page size returned: %d, remaining page size: %d", iteration, docPage.getContent().size(), numRecords - ids.size()));
}
// set continuationToken by fetching it from the response
@@ -175,8 +164,7 @@ public class QueryRepository implements IQueryRepository {
} catch (CosmosException e) {
if (e.getStatusCode() == HttpStatus.SC_BAD_REQUEST && e.getMessage().contains("INVALID JSON in continuation token"))
throw this.getInvalidCursorException();
else
throw e;
else throw e;
} catch (Exception e) {
throw e;
}
@@ -184,8 +172,28 @@ public class QueryRepository implements IQueryRepository {
return dqr;
}
private List<String> getDistinctKind() {
List<String> docs;
CosmosQueryRequestOptions storageOptions = new CosmosQueryRequestOptions();
String queryText = String.format("SELECT distinct value c.metadata.kind FROM c");
SqlQuerySpec query = new SqlQuerySpec(queryText);
docs = cosmosStore.queryItems(dpsHeaders.getPartitionId(), cosmosDBName, storageRecordContainer, query, storageOptions, String.class);
return docs;
}
@Contract(" -> new")
@NotNull
private AppException getLimitException() {
throw new AppException(HttpStatus.SC_BAD_REQUEST, "Limit not supported", "The limit is invalid");
}
@Contract(" -> new")
@NotNull
private AppException getCursorException() {
throw new AppException(HttpStatus.SC_BAD_REQUEST, "Cursor not supported", "The cursor is invalid");
}
private AppException getInvalidCursorException() {
return new AppException(HttpStatus.SC_BAD_REQUEST, "Cursor invalid",
"The requested cursor does not exist or is invalid");
}}
return new AppException(HttpStatus.SC_BAD_REQUEST, "Cursor invalid", "The requested cursor does not exist or is invalid");
}
}
Loading