Commit 0cc7e8a4 authored by unknown's avatar unknown
Browse files

Made changes for adding cache staleness for the new version of core-lib-azure

parent edf97f4f
Pipeline #48465 passed with stages
in 9 minutes and 47 seconds
......@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.11.0-SNAPSHOT</version>
<version>0.11.0-SNAPSHOT-AB-26.6.21</version>
<name>core-lib-azure</name>
<properties>
......
package org.opengroup.osdu.azure.cosmosdb;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
......@@ -93,8 +94,10 @@ public class CosmosClientFactoryImpl implements ICosmosClientFactory {
ThrottlingRetryOptions throttlingRetryOptions = cosmosRetryConfiguration.getThrottlingRetryOptions();
CosmosClient cosmosClient = new CosmosClientBuilder()
.endpoint(pi.getCosmosEndpoint())
.endpoint(pi.getCosmosGatewayEndpoint())
.key(pi.getCosmosPrimaryKey())
.gatewayMode()
.consistencyLevel(ConsistencyLevel.EVENTUAL)
.throttlingRetryOptions(throttlingRetryOptions)
.buildClient();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME)
......
......@@ -20,9 +20,10 @@ import com.azure.cosmos.implementation.ConflictException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.DedicatedGatewayRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.util.CosmosPagedIterable;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
......@@ -87,6 +88,32 @@ public class CosmosStore {
@Autowired
private ICosmosClientFactory cosmosClientFactory;
/**
* @param duration set MaxIntegratedCacheStaleness
* @return CosmosItemRequestOptions
*/
public CosmosItemRequestOptions itemRequestOptions(final Duration duration) {
DedicatedGatewayRequestOptions dedicatedGatewayRequestOptions = new DedicatedGatewayRequestOptions()
.setMaxIntegratedCacheStaleness(duration);
CosmosItemRequestOptions options = new CosmosItemRequestOptions()
.setDedicatedGatewayRequestOptions(dedicatedGatewayRequestOptions);
return options;
}
/**
* @param duration set MaxIntegratedCacheStaleness
* @return CosmosQueryRequestOptions
*/
public CosmosQueryRequestOptions queryRequestOptions(final Duration duration) {
DedicatedGatewayRequestOptions dedicatedGatewayRequestOptions = new DedicatedGatewayRequestOptions()
.setMaxIntegratedCacheStaleness(duration);
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions()
.setDedicatedGatewayRequestOptions(dedicatedGatewayRequestOptions);
return options;
}
/**
* @param dataPartitionId Data partition id
* @param cosmosDBName Database name
......@@ -216,7 +243,7 @@ public class CosmosStore {
try {
CosmosContainer cosmosContainer = getCosmosContainer(dataPartitionId, cosmosDBName, collection);
PartitionKey key = new PartitionKey(partitionKey);
CosmosItemRequestOptions options = new CosmosItemRequestOptions();
CosmosItemRequestOptions options = itemRequestOptions(Duration.ofMinutes(60));
cosmosContainer.replaceItem(item, id, key, options);
} catch (NotFoundException e) {
statusCode = e.getStatusCode();
......@@ -285,7 +312,7 @@ public class CosmosStore {
final String cosmosDBName,
final String collection,
final Class<T> clazz) {
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
CosmosQueryRequestOptions options = queryRequestOptions(Duration.ofMinutes(60));
return queryItems(dataPartitionId, cosmosDBName, collection, new SqlQuerySpec("SELECT * FROM c"), options, clazz);
}
......@@ -437,7 +464,7 @@ public class CosmosStore {
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try {
CosmosItemRequestOptions options = new CosmosItemRequestOptions();
CosmosItemRequestOptions options = itemRequestOptions(Duration.ofMinutes(60));
PartitionKey key = new PartitionKey(partitionKey);
T item = container.readItem(id, key, options, clazz).getItem();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug(String.format("READ_ITEM with id=%s and partition_key=%s", id, partitionKey));
......@@ -477,7 +504,7 @@ public class CosmosStore {
int statusCode = HttpStatus.SC_OK;
try {
PartitionKey key = new PartitionKey(partitionKey);
CosmosItemRequestOptions options = new CosmosItemRequestOptions();
CosmosItemRequestOptions options = itemRequestOptions(Duration.ofMinutes(60));
container.deleteItem(id, key, options);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug(String.format("DELETE_ITEM with id=%s and partition_key=%s", id, partitionKey));
} catch (NotFoundException e) {
......@@ -516,7 +543,7 @@ public class CosmosStore {
int statusCode = HttpStatus.SC_OK;
try {
PartitionKey key = new PartitionKey(partitionKey);
CosmosItemRequestOptions options = new CosmosItemRequestOptions();
CosmosItemRequestOptions options = itemRequestOptions(Duration.ofMinutes(60));
container.upsertItem(item, key, options);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug(String.format("UPSERT_ITEM with partition_key=%s", partitionKey));
} catch (CosmosException e) {
......@@ -550,7 +577,7 @@ public class CosmosStore {
int statusCode = HttpStatus.SC_OK;
try {
PartitionKey key = new PartitionKey(partitionKey);
CosmosItemRequestOptions options = new CosmosItemRequestOptions();
CosmosItemRequestOptions options = itemRequestOptions(Duration.ofMinutes(60));
container.createItem(item, key, options);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug(String.format("CREATE_ITEM with partition_key=%s", partitionKey));
} catch (ConflictException e) {
......
......@@ -95,7 +95,11 @@ public class PartitionInfoAzure {
@SerializedName("airflow-password")
private Property airflowPasswordConfig;
@SerializedName("cosmos-gateway-connection")
private Property cosmosGatewayConnectionConfig;
@SerializedName("cosmos-gateway-endpoint")
private Property cosmosGatewayEndpointConfig;
private Property azureSubscriptionIdConfig = Property.builder().value("subscription-id").sensitive(true).build();
private Property servicePrincipalAppIdConfig = Property.builder().value("app-dev-sp-username").sensitive(true).build();
......@@ -227,6 +231,16 @@ public class PartitionInfoAzure {
return String.valueOf(this.getCosmosConnectionConfig().getValue());
}
/**
* @return partition cosmosdb Gateway connection
*/
public String getCosmosGatewayConnection() {
if (this.getCosmosGatewayConnectionConfig().isSensitive()) {
return getSecret(this.getCosmosGatewayConnectionConfig());
}
return String.valueOf(this.getCosmosGatewayConnectionConfig().getValue());
}
/**
* @return partition cosmosdb endpoint
*/
......@@ -237,6 +251,16 @@ public class PartitionInfoAzure {
return String.valueOf(this.getCosmosEndpointConfig().getValue());
}
/**
* @return partition cosmosdb Gateway endpoint
*/
public String getCosmosGatewayEndpoint() {
if (this.getCosmosGatewayEndpointConfig().isSensitive()) {
return getSecret(this.getCosmosGatewayEndpointConfig());
}
return String.valueOf(this.getCosmosGatewayEndpointConfig().getValue());
}
/**
* @return partition cosmosdb primary-key
*/
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment