Commit 65c36b42 authored by Muskan Srivastava's avatar Muskan Srivastava
Browse files

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/system/lib/cloud/azure/os-core-lib-azure into muskans-corelibazure-retry
parents e5d20632 df6bf488
Pipeline #48193 passed with stages
in 9 minutes and 49 seconds
......@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.10.0-SNAPSHOT</version>
<version>0.11.0-SNAPSHOT</version>
<name>core-lib-azure</name>
<properties>
......
......@@ -15,9 +15,12 @@
package org.opengroup.osdu.azure.blobstorage;
import com.azure.identity.DefaultAzureCredential;
import com.azure.security.keyvault.secrets.SecretClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.policy.RequestRetryOptions;
import org.opengroup.osdu.azure.KeyVaultFacade;
import org.opengroup.osdu.azure.blobstorage.system.config.SystemBlobStoreConfig;
import org.opengroup.osdu.azure.cache.BlobServiceClientCache;
import org.opengroup.osdu.azure.di.BlobStoreRetryConfiguration;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
......@@ -32,10 +35,17 @@ public class BlobServiceClientFactoryImpl implements IBlobServiceClientFactory {
private DefaultAzureCredential defaultAzureCredential;
private PartitionServiceClient partitionService;
private BlobServiceClientCache clientCache;
private static final String SYSTEM_STORAGE_CACHE_KEY = "system_storage";
@Autowired
private BlobStoreRetryConfiguration blobStoreRetryConfiguration;
@Autowired
private SecretClient secretClient;
@Autowired
private SystemBlobStoreConfig systemBlobStoreConfig;
/**
* Constructor to initialize instance of {@link BlobServiceClientFactoryImpl}.
* @param credentials Azure Credentials to use
......@@ -79,4 +89,37 @@ public class BlobServiceClientFactoryImpl implements IBlobServiceClientFactory {
return blobServiceClient;
}
/**
* @return BlobServiceClient for system resources.
*/
@Override
public BlobServiceClient getSystemBlobServiceClient() {
Validators.checkNotNull(defaultAzureCredential, "Credential");
if (this.clientCache.containsKey(SYSTEM_STORAGE_CACHE_KEY)) {
return this.clientCache.get(SYSTEM_STORAGE_CACHE_KEY);
}
String endpoint = String.format("https://%s.blob.core.windows.net", getSecret(systemBlobStoreConfig.getStorageAccountNameKeyName()));
RequestRetryOptions requestRetryOptions = blobStoreRetryConfiguration.getRequestRetryOptions();
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
.endpoint(endpoint)
.credential(defaultAzureCredential)
.retryOptions(requestRetryOptions)
.buildClient();
this.clientCache.put(SYSTEM_STORAGE_CACHE_KEY, blobServiceClient);
return blobServiceClient;
}
/**
* @param keyName Name of the key to be read from key vault.
* @return secret value
*/
private String getSecret(final String keyName) {
return KeyVaultFacade.getSecretWithValidation(secretClient, keyName);
}
}
......@@ -110,30 +110,19 @@ public class BlobStore {
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayOutputStream downloadStream = new ByteArrayOutputStream()) {
blockBlobClient.download(downloadStream);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done reading from {}", filePath).getMessage());
return downloadStream.toString(StandardCharsets.UTF_8.name());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to read specified blob", ex);
} catch (UnsupportedEncodingException ex) {
statusCode = HttpStatus.SC_BAD_REQUEST;
throw handleBlobStoreException(400, MessageFormatter.format("Encoding was not correct for item with name={}", filePath).getMessage(), ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("READ_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
return this.readFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
* @param filePath Path of file to be read.
* @param containerName Name of the storage container
* @return the content of file with provided file path.
*/
public String readFromStorageContainer(
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getSystemBlobContainerClient(containerName);
return this.readFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
......@@ -147,24 +136,19 @@ public class BlobStore {
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try {
blockBlobClient.delete();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done deleting blob at {}", filePath).getMessage());
return true;
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to delete blob", ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("DELETE_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
return this.deleteFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
* @param filePath Path of file to be deleted.
* @param containerName Name of the storage container
* @return boolean indicating whether the deletion of given file was successful or not.
*/
public boolean deleteFromStorageContainer(
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getSystemBlobContainerClient(containerName);
return this.deleteFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
......@@ -178,27 +162,21 @@ public class BlobStore {
final String filePath,
final String content,
final String containerName) {
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
int bytesSize = bytes.length;
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
this.writeToStorageContainerInternal(filePath, content, containerName, blobContainerClient);
}
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayInputStream dataStream = new ByteArrayInputStream(bytes)) {
blockBlobClient.upload(dataStream, bytesSize, true);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done uploading file content to {}", filePath).getMessage());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
throw handleBlobStoreException(500, "Failed to upload file content.", ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.format("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("WRITE_TO_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
/**
* @param filePath Path of file to be written at.
* @param content Content to be written in the file.
* @param containerName Name of the storage container
*/
public void writeToStorageContainer(
final String filePath,
final String content,
final String containerName) {
BlobContainerClient blobContainerClient = getSystemBlobContainerClient(containerName);
this.writeToStorageContainerInternal(filePath, content, containerName, blobContainerClient);
}
/**
......@@ -332,6 +310,105 @@ public class BlobStore {
return blobCopyInfo;
}
/**
* @param filePath Path of file to be read.
* @param containerName Name of the storage container
* @param blobContainerClient Blob container client
* @return the content of file with provided file path.
*/
private String readFromStorageContainerInternal(
final String filePath,
final String containerName,
final BlobContainerClient blobContainerClient) {
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayOutputStream downloadStream = new ByteArrayOutputStream()) {
blockBlobClient.download(downloadStream);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done reading from {}", filePath).getMessage());
return downloadStream.toString(StandardCharsets.UTF_8.name());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to read specified blob", ex);
} catch (UnsupportedEncodingException ex) {
statusCode = HttpStatus.SC_BAD_REQUEST;
throw handleBlobStoreException(400, MessageFormatter.format("Encoding was not correct for item with name={}", filePath).getMessage(), ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}/{}", new String[]{containerName, filePath}).getMessage();
logDependency("READ_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
/**
* @param filePath Path of file to be deleted.
* @param containerName Name of the storage container
* @param blobContainerClient Blob container client
* @return boolean indicating whether the deletion of given file was successful or not.
*/
private boolean deleteFromStorageContainerInternal(
final String filePath,
final String containerName,
final BlobContainerClient blobContainerClient) {
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try {
blockBlobClient.delete();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done deleting blob at {}", filePath).getMessage());
return true;
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to delete blob", ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}/{}", new String[]{containerName, filePath}).getMessage();
logDependency("DELETE_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
/**
* @param filePath Path of file to be written at.
* @param content Content to be written in the file.
* @param containerName Name of the storage container
* @param blobContainerClient Blob container client
*/
private void writeToStorageContainerInternal(
final String filePath,
final String content,
final String containerName,
final BlobContainerClient blobContainerClient) {
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
int bytesSize = bytes.length;
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayInputStream dataStream = new ByteArrayInputStream(bytes)) {
blockBlobClient.upload(dataStream, bytesSize, true);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done uploading file content to {}", filePath).getMessage());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
throw handleBlobStoreException(500, "Failed to upload file content.", ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.format("{}/{}", new String[]{containerName, filePath}).getMessage();
logDependency("WRITE_TO_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
/**
* @param blockBlobClient Blob client
* @param expiryTime Time after which SAS Token expires
......@@ -378,6 +455,21 @@ public class BlobStore {
}
}
/**
* @param containerName Name of storage container.
* @return blob container client corresponding for system resources.
*/
private BlobContainerClient getSystemBlobContainerClient(final String containerName) {
try {
BlobServiceClient serviceClient = blobServiceClientFactory.getSystemBlobServiceClient();
return serviceClient.getBlobContainerClient(containerName);
} catch (AppException ex) {
throw handleBlobStoreException(ex.getError().getCode(), "Error creating creating blob container client.", ex);
} catch (Exception ex) {
throw handleBlobStoreException(500, "Error creating creating blob container client.", ex);
}
}
/**
* Logs and returns instance of AppException.
*
......
......@@ -27,4 +27,10 @@ public interface IBlobServiceClientFactory {
* @return BlobServiceClient corresponding to the given data partition id.
*/
BlobServiceClient getBlobServiceClient(String dataPartitionId);
/**
*
* @return BlobServiceClient for system resources.
*/
BlobServiceClient getSystemBlobServiceClient();
}
package org.opengroup.osdu.azure.blobstorage.system.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* A configuration class to store blob related config for system resources.
*/
@Configuration
@Getter
@Setter
@ConfigurationProperties("osdu.azure.system")
public class SystemBlobStoreConfig {
private String storageAccountNameKeyName;
private String storageKeyKeyName;
}
......@@ -8,6 +8,9 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.security.keyvault.secrets.SecretClient;
import org.opengroup.osdu.azure.KeyVaultFacade;
import org.opengroup.osdu.azure.cosmosdb.system.config.SystemCosmosConfig;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import org.opengroup.osdu.azure.di.CosmosRetryConfiguration;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
......@@ -24,10 +27,18 @@ import org.springframework.stereotype.Component;
@Lazy
public class CosmosClientFactoryImpl implements ICosmosClientFactory {
private static final String LOGGER_NAME = CosmosClientFactoryImpl.class.getName();
private static final String SYSTEM_COSMOS_CACHE_KEY = "system_cosmos";
@Lazy
@Autowired
private PartitionServiceClient partitionService;
@Autowired
private SecretClient secretClient;
@Autowired
private SystemCosmosConfig systemCosmosConfig;
private Map<String, CosmosClient> cosmosClientMap;
@Autowired
......@@ -57,6 +68,20 @@ public class CosmosClientFactoryImpl implements ICosmosClientFactory {
return this.cosmosClientMap.computeIfAbsent(cacheKey, cosmosClient -> createCosmosClient(dataPartitionId));
}
/**
* @return Cosmos client instance for system resources.
*/
@Override
public CosmosClient getSystemClient() {
if (this.cosmosClientMap.containsKey(SYSTEM_COSMOS_CACHE_KEY)) {
return this.cosmosClientMap.get(SYSTEM_COSMOS_CACHE_KEY);
}
return this.cosmosClientMap.computeIfAbsent(
SYSTEM_COSMOS_CACHE_KEY, cosmosClient -> createSystemCosmosClient()
);
}
/**
*
* @param dataPartitionId Data Partition Id
......@@ -76,4 +101,27 @@ public class CosmosClientFactoryImpl implements ICosmosClientFactory {
.info("Created CosmosClient for dataPartition {}.", dataPartitionId);
return cosmosClient;
}
/**
* Method to create the cosmos client for system resources.
* @return cosmos client.
*/
private CosmosClient createSystemCosmosClient() {
CosmosClient cosmosClient = new CosmosClientBuilder()
.endpoint(getSecret(systemCosmosConfig.getCosmosDBAccountKeyName()))
.key(getSecret(systemCosmosConfig.getCosmosPrimaryKeyName()))
.buildClient();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME)
.info("Created CosmosClient for system resources");
return cosmosClient;
}
/**
* @param keyName Name of the key to be read from key vault.
* @return secret value
*/
private String getSecret(final String keyName) {
return KeyVaultFacade.getSecretWithValidation(secretClient, keyName);
}
}
......@@ -14,4 +14,9 @@ public interface ICosmosClientFactory {
*/
CosmosClient getClient(String dataPartitionId);
/**
* @return Cosmos client instance for system resources.
*/
CosmosClient getSystemClient();
}
package org.opengroup.osdu.azure.cosmosdb.system.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* A configuration class to store cosmos db related config for system resources.
*/
@Configuration
@Getter
@Setter
@ConfigurationProperties("osdu.azure.system")
public class SystemCosmosConfig {
private String cosmosDBAccountKeyName;
private String cosmosPrimaryKeyName;
private String cosmosConnectionStringKeyName;
}
......@@ -24,4 +24,11 @@
server.tomcat.mbeanregistry.enabled=true
#Tomcat threads configuration
server.tomcat.min-spare-threads=100
\ No newline at end of file
server.tomcat.min-spare-threads=100
#Name of keys in key-vault for system resources/cosmos db
osdu.azure.system.cosmosDBAccountKeyName=system-cosmos-endpoint
osdu.azure.system.cosmosPrimaryKeyName=system-cosmos-primary-key
osdu.azure.system.cosmosConnectionStringKeyName=system-cosmos-connection
osdu.azure.system.storageAccountNameKeyName=airflow-storage
osdu.azure.system.storageKeyKeyName=airflow-storage-key
\ No newline at end of file
......@@ -15,6 +15,8 @@
package org.opengroup.osdu.azure.blobstorage;
import com.azure.identity.DefaultAzureCredential;
import com.azure.security.keyvault.secrets.SecretClient;
import com.azure.security.keyvault.secrets.models.KeyVaultSecret;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.common.policy.RequestRetryOptions;
import org.junit.jupiter.api.BeforeEach;
......@@ -23,6 +25,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.blobstorage.system.config.SystemBlobStoreConfig;
import org.opengroup.osdu.azure.cache.BlobServiceClientCache;
import org.opengroup.osdu.azure.di.BlobStoreConfiguration;
import org.opengroup.osdu.azure.di.BlobStoreRetryConfiguration;
......@@ -50,16 +53,27 @@ public class BlobServiceClientFactoryImplTest {
private BlobStoreRetryConfiguration blobStoreRetryConfiguration;
@Mock
private ILogger logger;
@Mock
private SystemBlobStoreConfig systemBlobStoreConfig;
@Mock
private SecretClient secretClient;
@Mock
private KeyVaultSecret keyVaultSecret;
@InjectMocks
private BlobServiceClientFactoryImpl sut;
private static final String ACCOUNT_NAME = "testAccount";
private static final String PARTITION_ID = "dataPartitionId";
private static final String SYSTEM_STORAGE_KEY_NAME = "system-storage";
private static final String SYSTEM_STORAGE_KEY_VALUE = "dummyURL";
@BeforeEach
void init() {
initMocks(this);
lenient().doReturn(ACCOUNT_NAME).when(configuration).getStorageAccountName();
lenient().doReturn(SYSTEM_STORAGE_KEY_NAME).when(systemBlobStoreConfig).getStorageAccountNameKeyName();
lenient().doReturn(SYSTEM_STORAGE_KEY_VALUE).when(keyVaultSecret).getValue();
lenient().doReturn(keyVaultSecret).when(secretClient).getSecret(SYSTEM_STORAGE_KEY_NAME);
}
@Test
......@@ -96,6 +110,14 @@ public class BlobServiceClientFactoryImplTest {
assertNotNull(serviceClient);
}
@Test
public void should_return_validContainer_System() {
when(this.blobStoreRetryConfiguration.getRequestRetryOptions()).thenReturn(new RequestRetryOptions());
BlobServiceClient serviceClient = this.sut.getSystemBlobServiceClient();
assertNotNull(serviceClient);
}
@Test
public void should_return_cachedContainer_when_cachedEarlier() {
when(this.partitionService.getPartition(PARTITION_ID)).thenReturn(
......
......@@ -140,6 +140,7 @@ public class BlobStoreTest {
lenient().when(blobContainerClient.getBlobClient(FILE_PATH)).thenReturn(blobClient);
lenient().when(blobServiceClient.getBlobContainerClient(STORAGE_CONTAINER_NAME)).thenReturn(blobContainerClient);
lenient().when(blobServiceClientFactory.getBlobServiceClient(PARTITION_ID)).thenReturn(blobServiceClient);
lenient().when(blobServiceClientFactory.getSystemBlobServiceClient()).thenReturn(blobServiceClient);
lenient().doNothing().when(logger).warning(eq("azure-core-lib"), any(), anyMap());
}
......@@ -160,6 +161,18 @@ public class BlobStoreTest {
}
}
@Test
public void readFromStorageContainer_ErrorCreatingBlobContainerClient_System() {
doThrow(BlobStorageException.class).when(blobServiceClientFactory).getSystemBlobServiceClient();
try {
String content = blobStore.readFromStorageContainer(FILE_PATH, STORAGE_CONTAINER_NAME);
} catch (AppException ex) {
assertEquals(500, ex.getError().getCode());
} catch (Exception ex) {
fail("should not get different error code");
}
}