Commit d23fcbda authored by harshit aggarwal's avatar harshit aggarwal
Browse files

Adding more UT's and refactoring IT

parent d61c61ef
Pipeline #5157 failed with stage
in 1 minute and 48 seconds
......@@ -4,7 +4,7 @@
<artifactId>os-wks</artifactId>
<groupId>org.opengroup.osdu</groupId>
<version>0.0.1</version>
<relativePath>../../</relativePath>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>wks-azure</artifactId>
......@@ -12,10 +12,6 @@
<packaging>jar</packaging>
<properties>
<azure.version>2.1.7</azure.version>
<azure.appservice.resourcegroup></azure.appservice.resourcegroup>
<azure.appservice.plan></azure.appservice.plan>
<azure.appservice.appname></azure.appservice.appname>
<azure.appservice.subscription></azure.appservice.subscription>
</properties>
<dependencies>
<dependency>
......@@ -60,7 +56,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<version>0.0.11</version>
<version>0.0.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
......
package org.opengroup.osdu.wks.provider.azure.constants;
public class Constants {
public static final String BEARER= "Bearer ";
public static final String CORRELATION_ID = "correlation-id";
public static final String DATA = "data";
public static final String DATA_PARTITION_ID = "data-partition-id";
public static final String JSON_EXTENSION = ".json";
}
package org.opengroup.osdu.wks.provider.azure.credentials;
import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ClientCredentialParameters;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IAuthenticationResult;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.provider.azure.constants.Constants;
import org.opengroup.osdu.wks.provider.interfaces.UserCredential;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@Configuration
public class AzureServicePrincipal implements UserCredential {
@Value("${azure-client-id}")
private String clientId;
@Value("${azure-client-secret}")
private String clientSecret;
@Value("${azure-tenant-id}")
private String tenantId;
@Value("${azure-app-resource-id}")
private String appResourceId;
@Override
public String getIdToken(TenantInfo tenant) throws ApplicationException {
String authorityUrl = "https://login.microsoftonline.com/" + tenantId;
String scope = appResourceId + "/.default";
try {
ConfidentialClientApplication app = ConfidentialClientApplication.builder(
clientId,
ClientCredentialFactory.create(clientSecret))
.authority(authorityUrl)
.build();
ClientCredentialParameters clientCredentialParam = ClientCredentialParameters.builder(
Collections.singleton(scope))
.build();
CompletableFuture<IAuthenticationResult> future = app.acquireToken(clientCredentialParam);
return Constants.BEARER + future.get().accessToken();
} catch (Exception e) {
throw new ApplicationException(e.getMessage());
}
}
}
package org.opengroup.osdu.wks.provider.azure.credentials;
import org.opengroup.osdu.azure.util.AzureServicePrincipal;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.wks.provider.interfaces.UserCredential;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
@Configuration
public class JwtTokenGenerator implements UserCredential {
......@@ -15,14 +20,14 @@ public class JwtTokenGenerator implements UserCredential {
@Autowired
private AzureBootstrapConfig azureBootstrapConfig;
@Autowired
private AzureServicePrincipal azureServicePrincipal;
@Override
public String getIdToken(TenantInfo tenant) {
@Autowired
private ILogger logger;
if(azureServicePrincipal == null) {
azureServicePrincipal = new AzureServicePrincipal();
}
@Override
public String getIdToken(TenantInfo tenant) throws ApplicationException {
String token = null;
......@@ -34,8 +39,9 @@ public class JwtTokenGenerator implements UserCredential {
azureBootstrapConfig.getAppResourceId());
}
catch (Exception e) {
System.out.println(e.getMessage());
catch (UnsupportedEncodingException e) {
logger.error(azureBootstrapConfig.getLogPrefix(), e.getMessage(), Collections.emptyMap());
throw new ApplicationException(e.getMessage(), e.getCause());
}
return token;
......
......@@ -6,6 +6,8 @@ import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import lombok.Getter;
import org.opengroup.osdu.azure.util.AzureServicePrincipal;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
......@@ -14,6 +16,7 @@ import org.springframework.context.annotation.Configuration;
import javax.inject.Named;
@Configuration
@Getter
public class AzureBootstrapConfig {
@Value("${azure.storage.account-name}")
......@@ -37,6 +40,27 @@ public class AzureBootstrapConfig {
@Value("${azure.cosmosdb.database}")
private String cosmosDBName;
@Value("${azure-client-id}")
private String clientId;
@Value("${azure-client-secret}")
private String clientSecret;
@Value("${azure-tenant-id}")
private String tenantId;
@Value("${azure-app-resource-id}")
private String appResourceId;
@Value("${executor-n-threads}")
private String nThreads;
@Value("${max_concurrent_calls}")
private String maxConcurrentCalls;
@Value("${LOG_PREFIX}")
private String logPrefix;
@Bean
@Named("STORAGE_ACCOUNT_NAME")
public String storageAccount() {
......@@ -93,6 +117,12 @@ public class AzureBootstrapConfig {
return subscriptionClient;
}
@Bean
@Named("AZURE_SERVICE_PRINCIPAL")
public AzureServicePrincipal getAzureServicePrincipal() {
return new AzureServicePrincipal();
}
String getKeyVaultSecret(SecretClient kv, String secretName) {
KeyVaultSecret secret = kv.getSecret(secretName);
if (secret == null) {
......
package org.opengroup.osdu.wks.provider.azure.di;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CosmosContainerConfig {
@Value("${tenantInfo.container.name}")
private String tenantInfoContainerName;
@Bean
public String tenantInfoContainer(){
return tenantInfoContainerName;
}
}
\ No newline at end of file
package org.opengroup.osdu.wks.provider.azure.di;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.wks.provider.azure.constants.Constants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;
@Component
public class TenantFactoryImpl implements ITenantFactory {
@Autowired
private String tenantInfoContainer;
@Autowired
private String cosmosDBName;
@Autowired
private CosmosStore cosmosStore;
private Map<String, TenantInfo> tenants;
public boolean exists(String tenantName)
{
if (this.tenants == null)
initTenants();
return this.tenants.containsKey(tenantName);
}
public TenantInfo getTenantInfo(String tenantName) {
if (this.tenants == null)
initTenants();
return this.tenants.get(tenantName);
}
public Collection<TenantInfo> listTenantInfo() {
if (this.tenants == null)
initTenants();
return this.tenants.values();
}
public <V> ICache<String, V> createCache(String tenantName, String host, int port, int expireTimeSeconds, Class<V> classOfV)
{
return null;
}
public void flushCache() {}
private void initTenants() {
this.tenants = new HashMap<>();
cosmosStore.findAllItems(Constants.DATA_PARTITION_ID, cosmosDBName, tenantInfoContainer, TenantInfoDoc.class).forEach(
tenantInfoDoc -> {
TenantInfo ti = new TenantInfo();
String tenantName = tenantInfoDoc.getId();
ti.setName(tenantName);
this.tenants.put(tenantName, ti);
}
);
}
}
\ No newline at end of file
package org.opengroup.osdu.wks.provider.azure.di;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
public class TenantInfoDoc {
private String id;
private List<String> groups;
}
package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.SubscriptionClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CreateSubscriptionClient {
@Autowired
private SubscriptionClient subscriptionClient;
public SubscriptionClient getSubscriptionClient() {
return subscriptionClient;
}
}
......@@ -4,20 +4,24 @@ import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.SubscriptionClient;
import lombok.extern.java.Log;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@Log
public class MessageHandler implements IMessageHandler {
private final SubscriptionClient receiveClient;
private final ProcessWKSTransform processWKSTransform;
private final ILogger logger;
private final AzureBootstrapConfig azureBootstrapConfig;
public MessageHandler(SubscriptionClient client, ProcessWKSTransform processWKSTransform) {
public MessageHandler(SubscriptionClient client, ProcessWKSTransform processWKSTransform, ILogger logger, AzureBootstrapConfig azureBootstrapConfig) {
this.receiveClient = client;
this.processWKSTransform = processWKSTransform;
this.logger = logger;
this.azureBootstrapConfig = azureBootstrapConfig;
}
@Override
......@@ -34,7 +38,7 @@ public class MessageHandler implements IMessageHandler {
@Override
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
log.severe(exceptionPhase + "-" + throwable.getMessage());
logger.error(azureBootstrapConfig.getLogPrefix(),exceptionPhase + "-" + throwable.getMessage(), Collections.emptyMap());
}
}
......@@ -4,42 +4,48 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.microsoft.azure.servicebus.IMessage;
import lombok.extern.java.Log;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.exceptions.BadRequestException;
import org.opengroup.osdu.wks.model.RawRecordDetails;
import org.opengroup.osdu.wks.provider.azure.constants.Constants;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.wks.service.WKSService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import java.util.logging.Level;
import java.util.Collections;
import static java.nio.charset.StandardCharsets.UTF_8;
@Component
@Log
public class ProcessWKSTransform {
private static final String DATA = "data";
@Autowired
private WKSService wKSService;
@Autowired
private ILogger logger;
@Autowired
private AzureBootstrapConfig azureBootstrapConfig;
public boolean initiateWksTransformation(IMessage message) {
String dataPartitionId = message.getProperties().get(Constants.DATA_PARTITION_ID).toString();
String correlationId = message.getProperties().get(Constants.CORRELATION_ID).toString();
String dataPartitionId = message.getProperties().get(DpsHeaders.DATA_PARTITION_ID).toString();
String correlationId = message.getProperties().get(DpsHeaders.CORRELATION_ID).toString();
try {
RawRecordDetails[] rawRecordDetails = retrieveDataFromMessage(message);
wKSService.transform(rawRecordDetails, dataPartitionId, correlationId);
} catch (BadRequestException e) {
log.severe(e.getErrorMsg());
log.log(Level.SEVERE, String.format("Bad Request Reason: %s, pubsub message id: %s", e.getErrorMsg(),
message.getMessageId()));
logger.error(azureBootstrapConfig.getLogPrefix(), String.format("Bad Request Reason: %s, pubsub message id: %s", e.getErrorMsg(),
message.getMessageId()), Collections.emptyMap());
} catch (ApplicationException e) {
log.severe(e.getErrorMsg());
log.log(Level.SEVERE, String.format("Application Error Reason: %s, pubsub message id: %s", e.getErrorMsg(),
message.getMessageId()));
logger.error(azureBootstrapConfig.getLogPrefix(), String.format("Application Error Reason: %s, pubsub message id: %s", e.getErrorMsg(),
message.getMessageId()), Collections.emptyMap());
return false;
}
return true;
......@@ -54,7 +60,7 @@ public class ProcessWKSTransform {
throw new BadRequestException(HttpStatus.BAD_REQUEST,"Invalid record change message, message object not found");
}
String dataValue = msg.getAsJsonObject().get(Constants.DATA).toString();
String dataValue = msg.getAsJsonObject().get(DATA).toString();
return getRecordsChangeData(dataValue);
}
......
......@@ -3,58 +3,53 @@ package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import lombok.extern.java.Log;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.wks.provider.interfaces.SubscriptionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
@Component
@Log
public class SubscriptionManagerImpl implements SubscriptionManager {
@Autowired
private ITenantFactory tenantFactory;
private ILogger logger;
@Autowired
private CreateSubscriptionClient subscriptionClient;
private SubscriptionClientFactory subscriptionClientFactory;
@Autowired
private ProcessWKSTransform processWKSTransform;
private ExecutorService executorService;
@Autowired
private AzureBootstrapConfig azureBootstrapConfig;
@Override
public void subscribeRecordsChangeEvent() {
// TODO subscribe for every tenant currently subscribing for opendes
SubscriptionClient subscriptionClient = this.subscriptionClient.getSubscriptionClient();
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient();
int nThreads = 2;
executorService = Executors.newFixedThreadPool(nThreads);
registerMessageHandler(subscriptionClient);
ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseUnsignedInt(azureBootstrapConfig.getNThreads()));
registerMessageHandler(subscriptionClient, executorService);
}
public void registerMessageHandler(SubscriptionClient subscriptionClient) {
private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) {
try {
MessageHandler messageHandler = new MessageHandler(subscriptionClient, processWKSTransform);
MessageHandler messageHandler = new MessageHandler(subscriptionClient, processWKSTransform, logger, azureBootstrapConfig);
// The maximum number of concurrent calls to the callback the message pump should initiate
int maxConcurrentCalls = 1;
subscriptionClient.registerMessageHandler(
messageHandler,
new MessageHandlerOptions(maxConcurrentCalls, false, Duration.ofMinutes(2)),
new MessageHandlerOptions(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxConcurrentCalls()), false, Duration.ofMinutes(2)),
executorService);
} catch (InterruptedException | ServiceBusException e) {
log.severe(e.getMessage());
log.log(Level.SEVERE, String.format("Error registering message handler %s", e.getMessage()));
e.printStackTrace();
logger.error(azureBootstrapConfig.getLogPrefix(), String.format("Error registering message handler %s", e.getMessage()), Collections.emptyMap());
}
}
......
package org.opengroup.osdu.wks.provider.azure.storage;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.java.Log;
import org.opengroup.osdu.azure.blobstorage.BlobStore;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.wks.model.MappingsModel;
import org.opengroup.osdu.wks.provider.azure.constants.Constants;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.wks.provider.interfaces.MappingStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.Collections;
@Component
@Log
public class MappingStoreImpl implements MappingStore {
private static final String JSON_EXTENSION = ".json";
@Autowired
private BlobStore blobStore;
@Autowired
private BlobContainerClient blobContainerClient;
private ILogger logger;
@Autowired
private AzureBootstrapConfig azureBootstrapConfig;
@Override
public MappingsModel getMapping(final String fileName) {
MappingsModel mappings = null;
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(fileName + Constants.JSON_EXTENSION).getBlockBlobClient();
try (ByteArrayOutputStream downloadStream = new ByteArrayOutputStream()) {
blockBlobClient.download(downloadStream);
String content = downloadStream.toString(StandardCharsets.UTF_8.name());