Commit 83222997 authored by Ronak Sakhuja's avatar Ronak Sakhuja
Browse files

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/system/lib/cloud/azure/os-core-lib-azure into rosakhuj/eventgridRetry
parents a39227f8 edf97f4f
Pipeline #48928 failed with stage
in 9 seconds
......@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.10.0-SNAPSHOT</version>
<version>0.11.0-SNAPSHOT</version>
<name>core-lib-azure</name>
<properties>
......
......@@ -15,9 +15,12 @@
package org.opengroup.osdu.azure.blobstorage;
import com.azure.identity.DefaultAzureCredential;
import com.azure.security.keyvault.secrets.SecretClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.policy.RequestRetryOptions;
import org.opengroup.osdu.azure.KeyVaultFacade;
import org.opengroup.osdu.azure.blobstorage.system.config.SystemBlobStoreConfig;
import org.opengroup.osdu.azure.cache.BlobServiceClientCache;
import org.opengroup.osdu.azure.di.BlobStoreRetryConfiguration;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
......@@ -32,10 +35,17 @@ public class BlobServiceClientFactoryImpl implements IBlobServiceClientFactory {
private DefaultAzureCredential defaultAzureCredential;
private PartitionServiceClient partitionService;
private BlobServiceClientCache clientCache;
private static final String SYSTEM_STORAGE_CACHE_KEY = "system_storage";
@Autowired
private BlobStoreRetryConfiguration blobStoreRetryConfiguration;
@Autowired
private SecretClient secretClient;
@Autowired
private SystemBlobStoreConfig systemBlobStoreConfig;
/**
* Constructor to initialize instance of {@link BlobServiceClientFactoryImpl}.
* @param credentials Azure Credentials to use
......@@ -79,4 +89,37 @@ public class BlobServiceClientFactoryImpl implements IBlobServiceClientFactory {
return blobServiceClient;
}
/**
* @return BlobServiceClient for system resources.
*/
@Override
public BlobServiceClient getSystemBlobServiceClient() {
Validators.checkNotNull(defaultAzureCredential, "Credential");
if (this.clientCache.containsKey(SYSTEM_STORAGE_CACHE_KEY)) {
return this.clientCache.get(SYSTEM_STORAGE_CACHE_KEY);
}
String endpoint = String.format("https://%s.blob.core.windows.net", getSecret(systemBlobStoreConfig.getStorageAccountNameKeyName()));
RequestRetryOptions requestRetryOptions = blobStoreRetryConfiguration.getRequestRetryOptions();
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
.endpoint(endpoint)
.credential(defaultAzureCredential)
.retryOptions(requestRetryOptions)
.buildClient();
this.clientCache.put(SYSTEM_STORAGE_CACHE_KEY, blobServiceClient);
return blobServiceClient;
}
/**
* @param keyName Name of the key to be read from key vault.
* @return secret value
*/
private String getSecret(final String keyName) {
return KeyVaultFacade.getSecretWithValidation(secretClient, keyName);
}
}
......@@ -110,30 +110,19 @@ public class BlobStore {
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayOutputStream downloadStream = new ByteArrayOutputStream()) {
blockBlobClient.download(downloadStream);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done reading from {}", filePath).getMessage());
return downloadStream.toString(StandardCharsets.UTF_8.name());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to read specified blob", ex);
} catch (UnsupportedEncodingException ex) {
statusCode = HttpStatus.SC_BAD_REQUEST;
throw handleBlobStoreException(400, MessageFormatter.format("Encoding was not correct for item with name={}", filePath).getMessage(), ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("READ_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
return this.readFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
* @param filePath Path of file to be read.
* @param containerName Name of the storage container
* @return the content of file with provided file path.
*/
public String readFromStorageContainer(
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getSystemBlobContainerClient(containerName);
return this.readFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
......@@ -147,24 +136,19 @@ public class BlobStore {
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try {
blockBlobClient.delete();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done deleting blob at {}", filePath).getMessage());
return true;
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to delete blob", ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("DELETE_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
return this.deleteFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
* @param filePath Path of file to be deleted.
* @param containerName Name of the storage container
* @return boolean indicating whether the deletion of given file was successful or not.
*/
public boolean deleteFromStorageContainer(
final String filePath,
final String containerName) {
BlobContainerClient blobContainerClient = getSystemBlobContainerClient(containerName);
return this.deleteFromStorageContainerInternal(filePath, containerName, blobContainerClient);
}
/**
......@@ -178,27 +162,21 @@ public class BlobStore {
final String filePath,
final String content,
final String containerName) {
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
int bytesSize = bytes.length;
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
this.writeToStorageContainerInternal(filePath, content, containerName, blobContainerClient);
}
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayInputStream dataStream = new ByteArrayInputStream(bytes)) {
blockBlobClient.upload(dataStream, bytesSize, true);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done uploading file content to {}", filePath).getMessage());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
throw handleBlobStoreException(500, "Failed to upload file content.", ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.format("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("WRITE_TO_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
/**
* @param filePath Path of file to be written at.
* @param content Content to be written in the file.
* @param containerName Name of the storage container
*/
public void writeToStorageContainer(
final String filePath,
final String content,
final String containerName) {
BlobContainerClient blobContainerClient = getSystemBlobContainerClient(containerName);
this.writeToStorageContainerInternal(filePath, content, containerName, blobContainerClient);
}
/**
......@@ -332,6 +310,105 @@ public class BlobStore {
return blobCopyInfo;
}
/**
* @param filePath Path of file to be read.
* @param containerName Name of the storage container
* @param blobContainerClient Blob container client
* @return the content of file with provided file path.
*/
private String readFromStorageContainerInternal(
final String filePath,
final String containerName,
final BlobContainerClient blobContainerClient) {
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayOutputStream downloadStream = new ByteArrayOutputStream()) {
blockBlobClient.download(downloadStream);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done reading from {}", filePath).getMessage());
return downloadStream.toString(StandardCharsets.UTF_8.name());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to read specified blob", ex);
} catch (UnsupportedEncodingException ex) {
statusCode = HttpStatus.SC_BAD_REQUEST;
throw handleBlobStoreException(400, MessageFormatter.format("Encoding was not correct for item with name={}", filePath).getMessage(), ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}/{}", new String[]{containerName, filePath}).getMessage();
logDependency("READ_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
/**
* @param filePath Path of file to be deleted.
* @param containerName Name of the storage container
* @param blobContainerClient Blob container client
* @return boolean indicating whether the deletion of given file was successful or not.
*/
private boolean deleteFromStorageContainerInternal(
final String filePath,
final String containerName,
final BlobContainerClient blobContainerClient) {
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try {
blockBlobClient.delete();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done deleting blob at {}", filePath).getMessage());
return true;
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to delete blob", ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}/{}", new String[]{containerName, filePath}).getMessage();
logDependency("DELETE_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
/**
* @param filePath Path of file to be written at.
* @param content Content to be written in the file.
* @param containerName Name of the storage container
* @param blobContainerClient Blob container client
*/
private void writeToStorageContainerInternal(
final String filePath,
final String content,
final String containerName,
final BlobContainerClient blobContainerClient) {
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
int bytesSize = bytes.length;
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayInputStream dataStream = new ByteArrayInputStream(bytes)) {
blockBlobClient.upload(dataStream, bytesSize, true);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).debug("{}", MessageFormatter.format("Done uploading file content to {}", filePath).getMessage());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
throw handleBlobStoreException(500, "Failed to upload file content.", ex);
} catch (IOException ex) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.format("{}/{}", new String[]{containerName, filePath}).getMessage();
logDependency("WRITE_TO_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
/**
* @param blockBlobClient Blob client
* @param expiryTime Time after which SAS Token expires
......@@ -378,6 +455,21 @@ public class BlobStore {
}
}
/**
* @param containerName Name of storage container.
* @return blob container client corresponding for system resources.
*/
private BlobContainerClient getSystemBlobContainerClient(final String containerName) {
try {
BlobServiceClient serviceClient = blobServiceClientFactory.getSystemBlobServiceClient();
return serviceClient.getBlobContainerClient(containerName);
} catch (AppException ex) {
throw handleBlobStoreException(ex.getError().getCode(), "Error creating creating blob container client.", ex);
} catch (Exception ex) {
throw handleBlobStoreException(500, "Error creating creating blob container client.", ex);
}
}
/**
* Logs and returns instance of AppException.
*
......
......@@ -27,4 +27,10 @@ public interface IBlobServiceClientFactory {
* @return BlobServiceClient corresponding to the given data partition id.
*/
BlobServiceClient getBlobServiceClient(String dataPartitionId);
/**
*
* @return BlobServiceClient for system resources.
*/
BlobServiceClient getSystemBlobServiceClient();
}
package org.opengroup.osdu.azure.blobstorage.system.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* A configuration class to store blob related config for system resources.
*/
@Configuration
@Getter
@Setter
@ConfigurationProperties("osdu.azure.system")
public class SystemBlobStoreConfig {
private String storageAccountNameKeyName;
private String storageKeyKeyName;
}
......@@ -8,6 +8,9 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.security.keyvault.secrets.SecretClient;
import org.opengroup.osdu.azure.KeyVaultFacade;
import org.opengroup.osdu.azure.cosmosdb.system.config.SystemCosmosConfig;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import org.opengroup.osdu.azure.di.CosmosRetryConfiguration;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
......@@ -24,10 +27,18 @@ import org.springframework.stereotype.Component;
@Lazy
public class CosmosClientFactoryImpl implements ICosmosClientFactory {
private static final String LOGGER_NAME = CosmosClientFactoryImpl.class.getName();
private static final String SYSTEM_COSMOS_CACHE_KEY = "system_cosmos";
@Lazy
@Autowired
private PartitionServiceClient partitionService;
@Autowired
private SecretClient secretClient;
@Autowired
private SystemCosmosConfig systemCosmosConfig;
private Map<String, CosmosClient> cosmosClientMap;
@Autowired
......@@ -57,6 +68,20 @@ public class CosmosClientFactoryImpl implements ICosmosClientFactory {
return this.cosmosClientMap.computeIfAbsent(cacheKey, cosmosClient -> createCosmosClient(dataPartitionId));
}
/**
* @return Cosmos client instance for system resources.
*/
@Override
public CosmosClient getSystemClient() {
if (this.cosmosClientMap.containsKey(SYSTEM_COSMOS_CACHE_KEY)) {
return this.cosmosClientMap.get(SYSTEM_COSMOS_CACHE_KEY);
}
return this.cosmosClientMap.computeIfAbsent(
SYSTEM_COSMOS_CACHE_KEY, cosmosClient -> createSystemCosmosClient()
);
}
/**
*
* @param dataPartitionId Data Partition Id
......@@ -76,4 +101,27 @@ public class CosmosClientFactoryImpl implements ICosmosClientFactory {
.info("Created CosmosClient for dataPartition {}.", dataPartitionId);
return cosmosClient;
}
/**
* Method to create the cosmos client for system resources.
* @return cosmos client.
*/
private CosmosClient createSystemCosmosClient() {
CosmosClient cosmosClient = new CosmosClientBuilder()
.endpoint(getSecret(systemCosmosConfig.getCosmosDBAccountKeyName()))
.key(getSecret(systemCosmosConfig.getCosmosPrimaryKeyName()))
.buildClient();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME)
.info("Created CosmosClient for system resources");
return cosmosClient;
}
/**
* @param keyName Name of the key to be read from key vault.
* @return secret value
*/
private String getSecret(final String keyName) {
return KeyVaultFacade.getSecretWithValidation(secretClient, keyName);
}
}
......@@ -14,4 +14,9 @@ public interface ICosmosClientFactory {
*/
CosmosClient getClient(String dataPartitionId);
/**
* @return Cosmos client instance for system resources.
*/
CosmosClient getSystemClient();
}
package org.opengroup.osdu.azure.cosmosdb.system.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* A configuration class to store cosmos db related config for system resources.
*/
@Configuration
@Getter
@Setter
@ConfigurationProperties("osdu.azure.system")
public class SystemCosmosConfig {
private String cosmosDBAccountKeyName;
private String cosmosPrimaryKeyName;
private String cosmosConnectionStringKeyName;
}
......@@ -21,6 +21,7 @@ import org.opengroup.osdu.core.common.http.IHttpClient;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.http.UrlFetchServiceImpl;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
......@@ -31,8 +32,10 @@ import java.net.URISyntaxException;
*/
@Primary
@Component
public class HttpClientAzure extends UrlFetchServiceImpl implements IHttpClient {
public class HttpClientAzure implements IHttpClient {
@Autowired
private UrlFetchServiceImpl urlFetchService;
/**
* calls urlfetchservice's send request.
*
......@@ -43,7 +46,7 @@ public class HttpClientAzure extends UrlFetchServiceImpl implements IHttpClient
public HttpResponse send(final HttpRequest httpRequest) {
org.opengroup.osdu.core.common.model.http.HttpResponse response = null;
try {
response = super.sendRequest(FetchServiceHttpRequest.builder()
response = this.urlFetchService.sendRequest(FetchServiceHttpRequest.builder()
.body(httpRequest.getBody())
.httpMethod(httpRequest.getHttpMethod())
.queryParams(httpRequest.getQueryParams())
......
......@@ -82,6 +82,20 @@ public class PartitionInfoAzure {
@SerializedName("policy-service-enabled")
private Property policyServiceConfig = Property.builder().sensitive(false).build();
@Builder.Default
@SerializedName("airflow-enabled")
private Property airflowEnabledConfig = Property.builder().value("false").sensitive(false).build();
@SerializedName("airflow-endpoint")
private Property airflowEndpointConfig;
@SerializedName("airflow-username")
private Property airflowUsernameConfig;
@SerializedName("airflow-password")
private Property airflowPasswordConfig;
private Property azureSubscriptionIdConfig = Property.builder().value("subscription-id").sensitive(true).build();
private Property servicePrincipalAppIdConfig = Property.builder().value("app-dev-sp-username").sensitive(true).build();
......@@ -156,6 +170,43 @@ public class PartitionInfoAzure {
return String.valueOf(this.getElasticPasswordConfig().getValue());
}
/**
* @return partition airflow endpoint
*/
public Boolean getAirflowEnabled() {
return Boolean.parseBoolean((String) this.getAirflowEnabledConfig().getValue());
}
/**
* @return partition airflow endpoint
*/
public String getAirflowEndpoint() {
if (this.getAirflowEndpointConfig().isSensitive()) {
return getSecret(this.getAirflowEndpointConfig());
}
return String.valueOf(this.getAirflowEndpointConfig().getValue());
}
/**
* @return partition airflow username
*/
public String getAirflowUsername() {
if (this.getAirflowUsernameConfig().isSensitive()) {
return getSecret(this.getAirflowUsernameConfig());
}
return String.valueOf(this.getAirflowUsernameConfig().getValue());
}
/**
* @return partition airflow password
*/
public String getAirflowPassword() {
if (this.getAirflowPasswordConfig().isSensitive()) {
return getSecret(this.getAirflowPasswordConfig());
}
return String.valueOf(this.getAirflowPasswordConfig().getValue());
}
/**
* @return partition elastic ssl enabled
*/
......
......@@ -24,4 +24,11 @@
server.tomcat.mbeanregistry.enabled=true