Commit 90177af4 authored by Nick Karsky's avatar Nick Karsky
Browse files

fixed caching issues

parent 08013c72
Pipeline #35764 failed with stage
in 26 seconds
...@@ -24,6 +24,7 @@ import com.azure.storage.blob.specialized.BlockBlobAsyncClient; ...@@ -24,6 +24,7 @@ import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import org.opengroup.osdu.core.common.logging.ILogger; import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.AppException;
import org.slf4j.helpers.MessageFormatter; import org.slf4j.helpers.MessageFormatter;
import org.opengroup.osdu.common.Validators;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
...@@ -76,6 +77,10 @@ public class AsyncBlobStore { ...@@ -76,6 +77,10 @@ public class AsyncBlobStore {
final String filePath, final String filePath,
final String content, final String content,
final String containerName) { final String containerName) {
Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId");
Validators.checkNotNullAndNotEmpty(filePath, "filePath");
Validators.checkNotNullAndNotEmpty(content, "content");
Validators.checkNotNullAndNotEmpty(containerName, "containerName");
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
byte[] bytes = content.getBytes(StandardCharsets.UTF_8); byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
int bytesSize = bytes.length; int bytesSize = bytes.length;
...@@ -88,10 +93,10 @@ public class AsyncBlobStore { ...@@ -88,10 +93,10 @@ public class AsyncBlobStore {
try { try {
return blockBlobAsyncClient.upload(data, bytesSize) return blockBlobAsyncClient.upload(data, bytesSize)
.doOnError(error -> { .doOnError(error -> {
throw handleBlobStoreException(500, String.format("Failed to upload data. File name: %s", filePath), error); throw handleBlobStoreException(500, MessageFormatter.format("Failed to upload data. File name: {}", filePath).getMessage(), new Exception(error));
}) })
.doOnSuccess(success -> .doOnSuccess(success ->
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).info("{}", MessageFormatter.format("Done uploading file content to %s", filePath).getMessage()) CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).info("{}", MessageFormatter.format("Done uploading file content to {}", filePath).getMessage())
) )
.doFinally(finish -> { .doFinally(finish -> {
final long timeTaken = System.currentTimeMillis() - start; final long timeTaken = System.currentTimeMillis() - start;
...@@ -133,20 +138,6 @@ public class AsyncBlobStore { ...@@ -133,20 +138,6 @@ public class AsyncBlobStore {
return new AppException(status, errorMessage, ex.getMessage(), ex); return new AppException(status, errorMessage, ex.getMessage(), ex);
} }
/**
* Logs and returns instance of AppException.
*
* @param status Response status code
* @param errorMessage Error message
* @param error Error thrown by async upload
* @return Instance of AppException
*/
private AppException handleBlobStoreException(final int status, final String errorMessage, final Throwable error) {
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).warn(MessageFormatter.format("{}", errorMessage).getMessage(), error);
return new AppException(status, errorMessage, error.getMessage());
}
/** /**
* Log dependency. * Log dependency.
* *
......
...@@ -17,6 +17,7 @@ package org.opengroup.osdu.azure.blobstorage; ...@@ -17,6 +17,7 @@ package org.opengroup.osdu.azure.blobstorage;
import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredential;
import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClientBuilder; import com.azure.storage.blob.BlobServiceClientBuilder;
import org.opengroup.osdu.azure.cache.BlobServiceAsyncClientCache;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure; import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient; import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.common.Validators; import org.opengroup.osdu.common.Validators;
...@@ -27,16 +28,20 @@ import org.opengroup.osdu.common.Validators; ...@@ -27,16 +28,20 @@ import org.opengroup.osdu.common.Validators;
public class BlobServiceAsyncClientFactoryImpl implements IBlobServiceAsyncClientFactory { public class BlobServiceAsyncClientFactoryImpl implements IBlobServiceAsyncClientFactory {
private DefaultAzureCredential defaultAzureCredential; private DefaultAzureCredential defaultAzureCredential;
private PartitionServiceClient partitionService; private PartitionServiceClient partitionService;
private BlobServiceAsyncClientCache clientCache;
/** /**
* Constructor to initialize instance of {@link BlobServiceAsyncClientFactoryImpl}. * Constructor to initialize instance of {@link BlobServiceAsyncClientFactoryImpl}.
* @param credentials Azure Credentials to use * @param credentials Azure Credentials to use
* @param partitionServiceClient Partition service client to use * @param partitionServiceClient Partition service client to use
* @param blobServiceAsyncClientCache async cache to use
*/ */
public BlobServiceAsyncClientFactoryImpl(final DefaultAzureCredential credentials, public BlobServiceAsyncClientFactoryImpl(final DefaultAzureCredential credentials,
final PartitionServiceClient partitionServiceClient) { final PartitionServiceClient partitionServiceClient,
final BlobServiceAsyncClientCache blobServiceAsyncClientCache) {
this.defaultAzureCredential = credentials; this.defaultAzureCredential = credentials;
this.partitionService = partitionServiceClient; this.partitionService = partitionServiceClient;
this.clientCache = blobServiceAsyncClientCache;
} }
/** /**
...@@ -48,6 +53,11 @@ public class BlobServiceAsyncClientFactoryImpl implements IBlobServiceAsyncClien ...@@ -48,6 +53,11 @@ public class BlobServiceAsyncClientFactoryImpl implements IBlobServiceAsyncClien
Validators.checkNotNull(defaultAzureCredential, "Credential"); Validators.checkNotNull(defaultAzureCredential, "Credential");
Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId"); Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId");
String cacheKey = String.format("%s-blobServiceAsyncClient", dataPartitionId);
if (this.clientCache.containsKey(cacheKey)) {
return this.clientCache.get(cacheKey);
}
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId); PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
String endpoint = String.format("https://%s.blob.core.windows.net", pi.getStorageAccountName()); String endpoint = String.format("https://%s.blob.core.windows.net", pi.getStorageAccountName());
...@@ -56,6 +66,8 @@ public class BlobServiceAsyncClientFactoryImpl implements IBlobServiceAsyncClien ...@@ -56,6 +66,8 @@ public class BlobServiceAsyncClientFactoryImpl implements IBlobServiceAsyncClien
.credential(defaultAzureCredential) .credential(defaultAzureCredential)
.buildAsyncClient(); .buildAsyncClient();
this.clientCache.put(cacheKey, blobServiceAsyncClient);
return blobServiceAsyncClient; return blobServiceAsyncClient;
} }
} }
...@@ -2,6 +2,7 @@ package org.opengroup.osdu.azure.blobstorage; ...@@ -2,6 +2,7 @@ package org.opengroup.osdu.azure.blobstorage;
import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredential;
import org.opengroup.osdu.azure.cache.BlobServiceClientCache; import org.opengroup.osdu.azure.cache.BlobServiceClientCache;
import org.opengroup.osdu.azure.cache.BlobServiceAsyncClientCache;
import org.opengroup.osdu.azure.partition.PartitionServiceClient; import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.core.common.logging.ILogger; import org.opengroup.osdu.core.common.logging.ILogger;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
...@@ -32,13 +33,15 @@ public class BlobStoreProvider { ...@@ -32,13 +33,15 @@ public class BlobStoreProvider {
* Creates instance of {@link IBlobServiceAsyncClientFactory}. * Creates instance of {@link IBlobServiceAsyncClientFactory}.
* @param defaultAzureCredential Azure credentials to use. * @param defaultAzureCredential Azure credentials to use.
* @param partitionServiceClient Partition service client to use. * @param partitionServiceClient Partition service client to use.
* @param blobServiceAsyncClientCache cache for async client to use.
* @return instance of {@link BlobServiceClientFactoryImpl} * @return instance of {@link BlobServiceClientFactoryImpl}
*/ */
@Bean @Bean
public IBlobServiceAsyncClientFactory buildBlobAsyncClientFactory( public IBlobServiceAsyncClientFactory buildBlobAsyncClientFactory(
final DefaultAzureCredential defaultAzureCredential, final DefaultAzureCredential defaultAzureCredential,
final PartitionServiceClient partitionServiceClient) { final PartitionServiceClient partitionServiceClient,
return new BlobServiceAsyncClientFactoryImpl(defaultAzureCredential, partitionServiceClient); final BlobServiceAsyncClientCache blobServiceAsyncClientCache) {
return new BlobServiceAsyncClientFactoryImpl(defaultAzureCredential, partitionServiceClient, blobServiceAsyncClientCache);
} }
/** /**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment