Commit 8af9ff7d authored by harshit aggarwal's avatar harshit aggarwal
Browse files

Initial commit

parent 8a748417
Pipeline #4588 passed with stages
in 12 minutes and 22 seconds
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>os-wks</artifactId>
<groupId>org.opengroup.osdu</groupId>
<version>0.0.1</version>
<relativePath>../../</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>wks-azure</artifactId>
<description>WKS service on Azure</description>
<packaging>jar</packaging>
<properties>
<azure.version>2.1.7</azure.version>
<azure.appservice.resourcegroup></azure.appservice.resourcegroup>
<azure.appservice.plan></azure.appservice.plan>
<azure.appservice.appname></azure.appservice.appname>
<azure.appservice.subscription></azure.appservice.subscription>
</properties>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-active-directory-spring-boot-starter</artifactId>
<version>${azure.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-wks-core</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-core-common</artifactId>
<version>0.0.11</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<version>0.0.11</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--
Override the spring-boot version of these dependencies to the ones
required by the azure-core library. This needs to be done for each
app that depends on this library
-->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>2.23.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>${gitlab-server}</id>
<url>https://community.opengroup.org/api/v4/groups/17/-/packages/maven</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>${gitlab-server}</id>
<url>https://community.opengroup.org/api/v4/projects/44/packages/maven</url>
</repository>
<snapshotRepository>
<id>${gitlab-server}</id>
<url>https://community.opengroup.org/api/v4/projects/44/packages/maven</url>
</snapshotRepository>
</distributionManagement>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<classifier>spring-boot</classifier>
<mainClass>org.opengroup.osdu.wks.WksServiceApplication</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package org.opengroup.osdu.wks.provider.azure.constants;
public class Constants {
public static final String BEARER= "Bearer ";
public static final String CORRELATION_ID = "correlation-id";
public static final String DATA = "data";
public static final String DATA_PARTITION_ID = "data-partition-id";
public static final String JSON_EXTENSION = ".json";
}
package org.opengroup.osdu.wks.provider.azure.credentials;
import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ClientCredentialParameters;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IAuthenticationResult;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.provider.azure.constants.Constants;
import org.opengroup.osdu.wks.provider.interfaces.UserCredential;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@Configuration
public class AzureServicePrincipal implements UserCredential {
@Value("${azure-client-id}")
private String clientId;
@Value("${azure-client-secret}")
private String clientSecret;
@Value("${azure-tenant-id}")
private String tenantId;
@Value("${azure-app-resource-id}")
private String appResourceId;
@Override
public String getIdToken(TenantInfo tenant) throws ApplicationException {
String authorityUrl = "https://login.microsoftonline.com/" + tenantId;
String scope = appResourceId + "/.default";
try {
ConfidentialClientApplication app = ConfidentialClientApplication.builder(
clientId,
ClientCredentialFactory.create(clientSecret))
.authority(authorityUrl)
.build();
ClientCredentialParameters clientCredentialParam = ClientCredentialParameters.builder(
Collections.singleton(scope))
.build();
CompletableFuture<IAuthenticationResult> future = app.acquireToken(clientCredentialParam);
return Constants.BEARER + future.get().accessToken();
} catch (Exception e) {
throw new ApplicationException(e.getMessage());
}
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.wks.provider.azure.di;
import com.azure.security.keyvault.secrets.SecretClient;
import com.azure.security.keyvault.secrets.models.KeyVaultSecret;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.inject.Named;
@Configuration
public class AzureBootstrapConfig {
@Value("${azure.storage.account-name}")
private String storageAccount;
@Value("${azure.storage.container-name}")
private String storageContainer;
@Value("${azure.servicebus.topic-name}")
private String serviceBusTopic;
@Value("${azure.servicebus.connection-string}")
private String serviceBusConnectionString;
@Value("${azure.servicebus.topic-subscription}")
private String serviceBusTopicSubscription;
@Value("${azure.keyvault.url}")
private String keyVaultURL;
@Value("${azure.cosmosdb.database}")
private String cosmosDBName;
@Bean
@Named("STORAGE_ACCOUNT_NAME")
public String storageAccount() {
return storageAccount;
}
@Bean
@Named("STORAGE_CONTAINER_NAME")
public String containerName() {
return storageContainer;
}
@Bean
@Named("KEY_VAULT_URL")
public String keyVaultURL() {
return keyVaultURL;
}
@Bean
@Named("COSMOS_DB_NAME")
public String cosmosDBName() {
return cosmosDBName;
}
@Bean
@Named("COSMOS_ENDPOINT")
public String cosmosEndpoint(SecretClient kv) {
return getKeyVaultSecret(kv, "cosmos-endpoint");
}
@Bean
@Named("COSMOS_KEY")
public String cosmosKey(SecretClient kv) {
return getKeyVaultSecret(kv, "cosmos-primary-key");
}
@Bean
@Named("SUBSCRIPTION_CLIENT")
public SubscriptionClient subscriptionClient() {
String entityPath = serviceBusTopic + "/subscriptions/" + serviceBusTopicSubscription;
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(
serviceBusConnectionString,
entityPath
);
SubscriptionClient subscriptionClient;
try {
subscriptionClient = new SubscriptionClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);
} catch (InterruptedException | ServiceBusException e) {
throw new AppException(500, "Server Error", "Unexpected error creating Subscription Client", e);
}
return subscriptionClient;
}
String getKeyVaultSecret(SecretClient kv, String secretName) {
KeyVaultSecret secret = kv.getSecret(secretName);
if (secret == null) {
throw new IllegalStateException(String.format("No secret found with name %s", secretName));
}
String secretValue = secret.getValue();
if (secretValue == null) {
throw new IllegalStateException(String.format(
"Secret unexpectedly missing from KeyVault response for secret with name %s", secretName));
}
return secretValue;
}
}
package org.opengroup.osdu.wks.provider.azure.di;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CosmosContainerConfig {
@Value("${tenantInfo.container.name}")
private String tenantInfoContainerName;
@Bean
public String tenantInfoContainer(){
return tenantInfoContainerName;
}
}
\ No newline at end of file
package org.opengroup.osdu.wks.provider.azure.di;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.wks.provider.azure.constants.Constants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;
@Component
public class TenantFactoryImpl implements ITenantFactory {
@Autowired
private String tenantInfoContainer;
@Autowired
private String cosmosDBName;
@Autowired
private CosmosStore cosmosStore;
private Map<String, TenantInfo> tenants;
public boolean exists(String tenantName)
{
if (this.tenants == null)
initTenants();
return this.tenants.containsKey(tenantName);
}
public TenantInfo getTenantInfo(String tenantName) {
if (this.tenants == null)
initTenants();
return this.tenants.get(tenantName);
}
public Collection<TenantInfo> listTenantInfo() {
if (this.tenants == null)
initTenants();
return this.tenants.values();
}
public <V> ICache<String, V> createCache(String tenantName, String host, int port, int expireTimeSeconds, Class<V> classOfV)
{
return null;
}
public void flushCache() {}
private void initTenants() {
this.tenants = new HashMap<>();
cosmosStore.findAllItems(Constants.DATA_PARTITION_ID, cosmosDBName, tenantInfoContainer, TenantInfoDoc.class).forEach(
tenantInfoDoc -> {
TenantInfo ti = new TenantInfo();
String tenantName = tenantInfoDoc.getId();
ti.setName(tenantName);
this.tenants.put(tenantName, ti);
}
);
}
}
\ No newline at end of file
package org.opengroup.osdu.wks.provider.azure.di;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
public class TenantInfoDoc {
private String id;
private List<String> groups;
}
package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.SubscriptionClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CreateSubscriptionClient {
@Autowired
private SubscriptionClient subscriptionClient;
public SubscriptionClient getSubscriptionClient() {
return subscriptionClient;
}
}
package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.SubscriptionClient;
import lombok.extern.java.Log;
import java.util.concurrent.CompletableFuture;
@Log
public class MessageHandler implements IMessageHandler {
private final SubscriptionClient receiveClient;
private final ProcessWKSTransform processWKSTransform;
public MessageHandler(SubscriptionClient client, ProcessWKSTransform processWKSTransform) {
this.receiveClient = client;
this.processWKSTransform = processWKSTransform;
}
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
boolean ack = this.processWKSTransform.initiateWksTransformation(message);
if(ack) {
return receiveClient.completeAsync(message.getLockToken());
}
else {
return receiveClient.abandonAsync(message.getLockToken());
}
}
@Override
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
log.severe(exceptionPhase + "-" + throwable.getMessage());
}
}
package org.opengroup.osdu.wks.provider.azure.pubsub;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.microsoft.azure.servicebus.IMessage;
import lombok.extern.java.Log;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.exceptions.BadRequestException;
import org.opengroup.osdu.wks.model.RawRecordDetails;
import org.opengroup.osdu.wks.provider.azure.constants.Constants;
import org.opengroup.osdu.wks.service.WKSService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import java.util.logging.Level;
import static java.nio.charset.StandardCharsets.UTF_8;
@Component
@Log
public class ProcessWKSTransform {
@Autowired
private WKSService wKSService;
public boolean initiateWksTransformation(IMessage message) {
String dataPartitionId = message.getProperties().get(Constants.DATA_PARTITION_ID).toString();
String correlationId = message.getProperties().get(Constants.CORRELATION_ID).toString();
try {
RawRecordDetails[] rawRecordDetails = retrieveDataFromMessage(message);
wKSService.transform(rawRecordDetails, dataPartitionId, correlationId);
} catch (BadRequestException e) {
log.severe(e.getErrorMsg());
log.log(Level.SEVERE, String.format("Bad Request Reason: %s, pubsub message id: %s", e.getErrorMsg(),
message.getMessageId()));
} catch (ApplicationException e) {
log.severe(e.getErrorMsg());
log.log(Level.SEVERE, String.format("Application Error Reason: %s, pubsub message id: %s", e.getErrorMsg(),
message.getMessageId()));
return false;
}
return true;
}
private RawRecordDetails[] retrieveDataFromMessage(IMessage message) throws BadRequestException {
String messageData = new String(message.getMessageBody().getBinaryData().get(0), UTF_8);
JsonElement jsonRoot = JsonParser.parseString(messageData);
JsonElement msg = jsonRoot.getAsJsonObject().get("message");
if (msg == null) {
throw new BadRequestException(HttpStatus.BAD_REQUEST,"Invalid record change message, message object not found");
}
String dataValue = msg.getAsJsonObject().get(Constants.DATA).toString();
return getRecordsChangeData(dataValue);
}
private RawRecordDetails[] getRecordsChangeData(String data) throws BadRequestException {
if (data.equals("[]")) {
throw new BadRequestException(HttpStatus.BAD_REQUEST,"Invalid record change message, message data not found");