Commit 72a2a6af authored by Orsu Akhil's avatar Orsu Akhil
Browse files

add multi partition support

parent 67b08617
Pipeline #16630 failed with stages
in 4 minutes and 51 seconds
......@@ -10,6 +10,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<springfox-version>2.7.0</springfox-version>
<osdu.oscorecommon-version>0.3.18</osdu.oscorecommon-version>
</properties>
<licenses>
......
......@@ -8,6 +8,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>wks-azure</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>WKS service on Azure</description>
<packaging>jar</packaging>
<properties>
......@@ -51,12 +52,12 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-core-common</artifactId>
<version>0.0.11</version>
<version>${osdu.oscorecommon-version}</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<version>0.0.23</version>
<version>0.0.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
......@@ -72,16 +73,6 @@
required by the azure-core library. This needs to be done for each
app that depends on this library
-->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
......@@ -96,6 +87,18 @@
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Dysprosium-SR11</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>${gitlab-server}</id>
......
package org.opengroup.osdu.wks.provider.azure.config;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@Primary
@Scope(value = "ThreadScope", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class ThreadDpsHeaders extends DpsHeaders {
public void setThreadContext(String dataPartitionId, String correlationId){
System.out.println("each message");
System.out.println(this.getHeaders());
Map<String,String> headers = new HashMap<>();
headers.put(DpsHeaders.DATA_PARTITION_ID,dataPartitionId);
headers.put(DpsHeaders.CORRELATION_ID,correlationId);
this.addFromMap(headers);
System.out.println(this.getHeaders());
}
}
package org.opengroup.osdu.wks.provider.azure.di;
import com.azure.security.keyvault.secrets.SecretClient;
import com.azure.security.keyvault.secrets.models.KeyVaultSecret;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import lombok.Getter;
import org.opengroup.osdu.azure.util.AzureServicePrincipal;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.wks.provider.azure.utils.MDCContextMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -24,17 +17,12 @@ public class AzureBootstrapConfig {
private final static Logger LOGGER = LoggerFactory.getLogger(AzureBootstrapConfig.class);
@Value("${azure.storage.account-name}")
private String storageAccount;
@Value("${azure.storage.container-name}")
private String storageContainer;
@Value("${azure.servicebus.topic-name}")
private String serviceBusTopic;
private String serviceBusConnectionString;
@Value("${azure.servicebus.topic-subscription}")
private String serviceBusTopicSubscription;
......@@ -62,11 +50,12 @@ public class AzureBootstrapConfig {
@Value("${max-lock-renew}")
private String maxLockRenewDurationInSeconds;
@Value("${shared_tenant}")
private String sharedTenant;
@Bean
@Named("STORAGE_ACCOUNT_NAME")
public String storageAccount() {
return storageAccount;
}
@Named("SHARED_TENANT")
public String sharedTenantName() {return sharedTenant;}
@Bean
@Named("STORAGE_CONTAINER_NAME")
......@@ -80,64 +69,14 @@ public class AzureBootstrapConfig {
return keyVaultURL;
}
@Bean
@Named("COSMOS_ENDPOINT")
public String cosmosEndpoint(SecretClient kv) {
return getKeyVaultSecret(kv, "opendes-cosmos-endpoint");
}
@Bean
@Named("COSMOS_KEY")
public String cosmosKey(SecretClient kv) {
return getKeyVaultSecret(kv, "opendes-cosmos-primary-key");
}
@Bean
public MDCContextMap mdcContextMap() {
return new MDCContextMap();
}
@Bean
public SubscriptionClient subscriptionClient(SecretClient kv) {
String entityPath = serviceBusTopic + "/subscriptions/" + serviceBusTopicSubscription;
serviceBusConnectionString = getKeyVaultSecret(kv, "opendes-sb-connection");
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(
serviceBusConnectionString,
entityPath
);
SubscriptionClient subscriptionClient;
try {
subscriptionClient = new SubscriptionClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);
} catch (InterruptedException | ServiceBusException e) {
LOGGER.error("Unexpected error creating Subscription Client", e);
throw new AppException(500, "Server Error", "Unexpected error creating Subscription Client", e);
}
return subscriptionClient;
}
@Bean
public AzureServicePrincipal getAzureServicePrincipal() {
return new AzureServicePrincipal();
}
String getKeyVaultSecret(SecretClient kv, String secretName) {
KeyVaultSecret secret = kv.getSecret(secretName);
if (secret == null) {
LOGGER.error("No secret found with name {}", secretName);
throw new IllegalStateException(String.format("No secret found with name %s", secretName));
}
String secretValue = secret.getValue();
if (secretValue == null) {
LOGGER.error("Secret unexpectedly missing from KeyVault response for secret with name {}", secretName);
throw new IllegalStateException(String.format(
"Secret unexpectedly missing from KeyVault response for secret with name %s", secretName));
}
return secretValue;
}
}
......@@ -5,9 +5,11 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.microsoft.azure.servicebus.IMessage;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.wks.config.ThreadScopeContextHolder;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.exceptions.BadRequestException;
import org.opengroup.osdu.wks.model.RawRecordDetails;
import org.opengroup.osdu.wks.provider.azure.config.ThreadDpsHeaders;
import org.opengroup.osdu.wks.provider.azure.utils.MDCContextMap;
import org.opengroup.osdu.wks.service.WKSService;
import org.slf4j.Logger;
......@@ -31,11 +33,15 @@ public class ProcessWKSTransform {
@Autowired
private MDCContextMap mdcContextMap;
@Autowired
private ThreadDpsHeaders dpsHeaders;
public void initiateWksTransformation(IMessage message) {
try {
String dataPartitionId = message.getProperties().get(DpsHeaders.DATA_PARTITION_ID).toString();
String correlationId = message.getProperties().get(DpsHeaders.CORRELATION_ID).toString();
MDC.setContextMap(mdcContextMap.getContextMap(correlationId, dataPartitionId));
dpsHeaders.setThreadContext(dataPartitionId,correlationId);
RawRecordDetails[] rawRecordDetails = retrieveDataFromMessage(message);
wKSService.transform(rawRecordDetails, dataPartitionId, correlationId);
......@@ -51,6 +57,7 @@ public class ProcessWKSTransform {
LOGGER.error("Invalid format for message with id: {}", message.getMessageId());
}
ThreadScopeContextHolder.getContext().clear();
MDC.clear();
}
......
package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.opengroup.osdu.azure.servicebus.ISubscriptionClientFactory;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SubscriptionClientFactory {
private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionClientFactory.class);
@Autowired AzureBootstrapConfig azureBootstrapConfig;
@Autowired
private SubscriptionClient subscriptionClient;
private ISubscriptionClientFactory subscriptionClientFactory;
public SubscriptionClient getSubscriptionClient() {
return subscriptionClient;
public SubscriptionClient getSubscriptionClient(String dataPartition) {
String sbTopic = azureBootstrapConfig.getServiceBusTopic();
String sbSubscription = azureBootstrapConfig.getServiceBusTopicSubscription();
try {
return subscriptionClientFactory.getClient(dataPartition,sbTopic,sbSubscription);
} catch (ServiceBusException | InterruptedException e) {
LOGGER.error("Unexpected error creating Subscription Client", e);
throw new AppException(500, "Server Error", "Unexpected error creating Subscription Client", e);
}
}
}
......@@ -3,6 +3,8 @@ package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.wks.provider.interfaces.SubscriptionManager;
import org.slf4j.Logger;
......@@ -10,8 +12,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Component
public class SubscriptionManagerImpl implements SubscriptionManager {
......@@ -26,15 +30,21 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Autowired
private AzureBootstrapConfig azureBootstrapConfig;
@Autowired
private ITenantFactory tenantFactory;
@Override
public void subscribeRecordsChangeEvent() {
// TODO subscribe for every tenant currently subscribing for opendes
List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId).collect(
Collectors.toList());
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient();
for(String partition : tenantList){
SubscriptionClient subscriptionClient = this.subscriptionClientFactory.getSubscriptionClient(partition);
ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseUnsignedInt(azureBootstrapConfig.getNThreads()));
registerMessageHandler(subscriptionClient, executorService);
ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseUnsignedInt(azureBootstrapConfig.getNThreads()));
registerMessageHandler(subscriptionClient, executorService);
}
}
private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) {
......
package org.opengroup.osdu.wks.provider.azure.statusstore;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.azure.cosmosdb.CosmosStore;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.wks.config.RequestIdentity;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
......@@ -35,7 +35,7 @@ public class StatusStoreServiceImpl implements StatusStoreService {
RelationshipStatusDoc doc = new RelationshipStatusDoc(relationshipStatus);
try {
cosmosStore.upsertItem(requestIdentity.getDataPartitionId(), cosmosContainerConfig.getDatabase(), cosmosContainerConfig.getRelationshipStatusContainer(), doc);
cosmosStore.upsertItem(requestIdentity.getDataPartitionId(), cosmosContainerConfig.getDatabase(), cosmosContainerConfig.getRelationshipStatusContainer(), requestIdentity.getDataPartitionId(), doc);
}
catch (AppException exception) {
LOGGER.error("Status update failed for Record ID: {} with exception {}", relationshipStatus.getRawRecordId(), exception.getError().getMessage());
......
......@@ -29,7 +29,7 @@ public class MappingStoreImpl implements MappingStore {
try {
String content = blobStore.readFromStorageContainer(
DpsHeaders.DATA_PARTITION_ID,
config.sharedTenantName(),
fileName + JSON_EXTENSION,
config.containerName());
ObjectMapper mapper = new ObjectMapper();
......
......@@ -18,6 +18,7 @@ azure-client-id=${AZURE_CLIENT_ID}
azure-client-secret=${AZURE_CLIENT_SECRET}
azure-tenant-id=${AZURE_TENANT_ID}
azure-app-resource-id=${AZURE_APP_RESOURCE_ID}
azure.activedirectory.app-resource-id=${AZURE_APP_RESOURCE_ID}
# Azure CosmosDB configuration
wks.azure.cosmosdb.database=${cosmosdb_database}
......@@ -25,15 +26,20 @@ wks.azure.cosmosdb.relationshipStatuscontainer=RelationshipStatus
# Cosmosdb container name
tenantInfo.container.name=TenantInfo
# Adding this because of older version of core-lib-azure(0.0.23) used in wks
azure.cosmosdb.database=${cosmosdb_database}
# TenantFactory to fetch list of partitions service subscribes to
tenantFactoryImpl.required=true
## Adding this because of older version of core-lib-azure(0.0.23) used in wks
#azure.cosmosdb.database=${cosmosdb_database}
# Azure KeyVault configuration
azure.keyvault.url=${KEYVAULT_URI}
# Azure Storage configuration
azure.storage.account-name=${storage_account}
azure.storage.container-name=${storage_container}
azure.storage.enable-https=true
azure.blobStore.required=true
# Azure Service Bus configuration
azure.servicebus.topic-name=${servicebus_topic_name}
......@@ -45,12 +51,18 @@ azure.application-insights.instrumentation-key=${appinsights_key}
# Application name
spring.application.name=wks-azure
# Shared Tenant which contains mapping files
shared_tenant=${default_tenant}
# Storage service
STORAGE_API=${storage_service_endpoint}
# Search service
SEARCH_API=${search_service_endpoint}
# Partition service
PARTITION_API=${partition_service_endpoint}
# Specifies the number of threads to be created on the thread pool
executor-n-threads=${executor_n_threads}
......
......@@ -27,48 +27,6 @@ public class AzureBootstrapConfigTest {
@Mock
private SecretClient kv;
@Test
public void kvSecretChecksForNullResponse() {
doReturn(null).when(kv).getSecret("secret-name");
IllegalStateException exception = assertThrows(IllegalStateException.class, () ->{
bootstrapConfig.getKeyVaultSecret(kv, "secret-name");
});
assertEquals("No secret found with name secret-name", exception.getMessage());
}
@Test
public void kvSecretChecksForNullValueWithinResponse() {
doReturn(null).when(secret).getValue();
doReturn(secret).when(kv).getSecret("secret-name");
IllegalStateException exception = assertThrows(IllegalStateException.class, () ->{
bootstrapConfig.getKeyVaultSecret(kv, "secret-name");
});
assertEquals("Secret unexpectedly missing from KeyVault response for secret with name secret-name", exception.getMessage());
}
@Test
public void configReturnsCorrectSecretCosmosKey() {
doReturn("opendes-cosmos-key-secret").when(secret).getValue();
doReturn(secret).when(kv).getSecret("opendes-cosmos-primary-key");
String secretValue = bootstrapConfig.cosmosKey(kv);
assertEquals( "opendes-cosmos-key-secret", secretValue);
}
@Test
public void configReturnsCorrectSecretCosmosEndpoint() {
doReturn("opendes-cosmos-endpoint-secret").when(secret).getValue();
doReturn(secret).when(kv).getSecret("opendes-cosmos-endpoint");
String secretValue = bootstrapConfig.cosmosEndpoint(kv);
assertEquals( "opendes-cosmos-endpoint-secret", secretValue);
}
@Test
public void azureServicePrincipalShouldNotBeNull() {
......
package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.servicebus.ISubscriptionClientFactory;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SubscriptionClientFactoryTest {
@InjectMocks
private SubscriptionClientFactory subscriptionClientFactory;
private SubscriptionClientFactory subsClientFactory;
@Mock
private SubscriptionClient subscriptionClient;
@Mock
private ISubscriptionClientFactory subscriptionClientFactory;
@Mock
private AzureBootstrapConfig azureBootstrapConfig;
private static final String sbTopic = "testTopic";
private static final String sbSubscription = "testSubscription";
private static final String dataPartition = "testPartition";
@Test
public void subscriptionClientShouldNotBeNull() {
SubscriptionClient result = subscriptionClientFactory.getSubscriptionClient();
public void subscriptionClientShouldNotBeNull() throws ServiceBusException, InterruptedException {
when(azureBootstrapConfig.getServiceBusTopic()).thenReturn(sbTopic);
when(azureBootstrapConfig.getServiceBusTopicSubscription()).thenReturn(sbSubscription);
when(subscriptionClientFactory.getClient(dataPartition,sbTopic,sbSubscription)).thenReturn(subscriptionClient);
SubscriptionClient result = subsClientFactory.getSubscriptionClient(dataPartition);
assertNotNull(result);
assertEquals(subscriptionClient, result);
}
......
......@@ -8,8 +8,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
......@@ -40,18 +44,27 @@ public class SubscriptionManagerImplTest {
@Mock
private SubscriptionClient subscriptionClient;
@Mock
private ITenantFactory tenantFactory;
private static final String dataPartition = "testTenant";
@BeforeEach
public void init() {
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setDataPartitionId(dataPartition);
when(azureBootstrapConfig.getMaxConcurrentCalls()).thenReturn(maxConcurrentCalls);
when(azureBootstrapConfig.getNThreads()).thenReturn(nThreads);
when(azureBootstrapConfig.getMaxLockRenewDurationInSeconds()).thenReturn(maxLockRenewDuration);
when(tenantFactory.listTenantInfo()).thenReturn(Collections.singletonList(tenantInfo));
}
@Test
public void shouldSuccessfullyRegisterMessageHandler() throws ServiceBusException, InterruptedException {
doNothing().when(subscriptionClient).registerMessageHandler(any(), any(), any());
when(subscriptionClientFactory.getSubscriptionClient()).thenReturn(subscriptionClient);
when(subscriptionClientFactory.getSubscriptionClient(dataPartition)).thenReturn(subscriptionClient);
subscriptionManager.subscribeRecordsChangeEvent();
......@@ -64,7 +77,7 @@ public class SubscriptionManagerImplTest {
public void shouldThrowExceptionIfErrorWhileRegisteringMessageHandler() throws ServiceBusException, InterruptedException {
doThrow(new InterruptedException(errorMessage)).when(subscriptionClient).registerMessageHandler(any(), any(), any());
when(subscriptionClientFactory.getSubscriptionClient()).thenReturn(subscriptionClient);
when(subscriptionClientFactory.getSubscriptionClient(dataPartition)).thenReturn(subscriptionClient);
subscriptionManager.subscribeRecordsChangeEvent();
......
......@@ -6,7 +6,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.azure.cosmosdb.CosmosStore;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.wks.config.RequestIdentity;