Commit 04ffc1fd authored by Komal Makkar's avatar Komal Makkar Committed by Kishore Battula
Browse files

The subscription Client creation is extracted to a new method so that UTs can...

The subscription Client creation is extracted to a new method so that UTs can have the subscription client creation mocked.
parent 891c6537
......@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.6.0-SNAPSHOT</version>
<version>0.6.1</version>
<name>core-lib-azure</name>
<properties>
......@@ -334,6 +334,4 @@
</plugin>
</plugins>
</build>
</project>
......@@ -18,9 +18,12 @@ import com.microsoft.azure.eventgrid.EventGridClient;
import com.microsoft.azure.eventgrid.TopicCredentials;
import com.microsoft.azure.eventgrid.implementation.EventGridClientImpl;
import org.opengroup.osdu.azure.cache.EventGridTopicClientCache;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.azure.partition.EventGridTopicPartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceEventGridClient;
import org.opengroup.osdu.common.Validators;
import org.opengroup.osdu.core.common.partition.PartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -29,20 +32,23 @@ import org.springframework.stereotype.Component;
*/
@Component
public class EventGridTopicClientFactoryImpl implements IEventGridTopicClientFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(EventGridTopicClientFactoryImpl.class.getName());
@Autowired
private PartitionServiceClient partitionService;
private PartitionServiceEventGridClient partitionService;
@Autowired
private EventGridTopicClientCache clientCache;
/**
*
* @param dataPartitionId Data partition id
* @param topicName Topic Name
* @param topicName Topic name
* @return EventGridClient
* @throws PartitionException partitionException
*/
@Override
public EventGridClient getClient(final String dataPartitionId, final TopicName topicName) {
public EventGridClient getClient(final String dataPartitionId, final String topicName) throws PartitionException {
Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId");
Validators.checkNotNull(topicName, "topicName");
......@@ -50,16 +56,15 @@ public class EventGridTopicClientFactoryImpl implements IEventGridTopicClientFac
if (this.clientCache.containsKey(cacheKey)) {
return this.clientCache.get(cacheKey);
}
EventGridTopicPartitionInfoAzure eventGridTopicPartitionInfoAzure =
this.partitionService.getEventGridTopicInPartition(dataPartitionId, topicName);
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
TopicCredentials topicCredentials = null;
if (topicName == TopicName.RECORDS_CHANGED) {
topicCredentials = new TopicCredentials(pi.getEventGridRecordsTopicAccessKey());
}
TopicCredentials topicCredentials =
new TopicCredentials(eventGridTopicPartitionInfoAzure.getTopicAccessKey());
EventGridClient eventGridClient = new EventGridClientImpl(topicCredentials);
this.clientCache.put(cacheKey, eventGridClient);
return eventGridClient;
}
}
}
\ No newline at end of file
......@@ -16,10 +16,14 @@ package org.opengroup.osdu.azure.eventgrid;
import com.microsoft.azure.eventgrid.EventGridClient;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import lombok.SneakyThrows;
import org.opengroup.osdu.azure.cosmosdb.CosmosStoreBulkOperations;
import org.opengroup.osdu.azure.partition.EventGridTopicPartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceEventGridClient;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -55,33 +59,32 @@ import java.util.List;
@Component
public class EventGridTopicStore {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosStoreBulkOperations.class.getName());
@Autowired
private IEventGridTopicClientFactory eventGridTopicClientFactory;
@Autowired
private ILogger logger;
@Autowired
private PartitionServiceClient partitionService;
private PartitionServiceEventGridClient eventGridPartitionClient;
/**
* @param dataPartitionId Data partition id
* @param topicName Topic name
* @param eventsList List of Event Grid Events
*/
public void publishToEventGridTopic(final String dataPartitionId, final TopicName topicName, final List<EventGridEvent> eventsList) {
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
@SneakyThrows
public void publishToEventGridTopic(final String dataPartitionId, final String topicName, final List<EventGridEvent> eventsList) {
EventGridTopicPartitionInfoAzure eventGridTopicPartitionInfoAzure = this.eventGridPartitionClient.getEventGridTopicInPartition(dataPartitionId, topicName);
String endpoint = "";
if (topicName == TopicName.RECORDS_CHANGED) {
try {
endpoint = String.format("https://%s/", new URI(pi.getEventGridRecordsTopicEndpoint()).getHost());
} catch (URISyntaxException e) {
throw new AppException(500, "Invalid Event Grid endpoint URI", "PartitionInfo for eventgrid-recordstopic " + pi.getEventGridRecordsTopicEndpoint(), e);
}
String endpoint;
try {
endpoint = String.format("https://%s/", new URI(eventGridTopicPartitionInfoAzure.getTopicName()).getHost());
} catch (URISyntaxException e) {
throw new AppException(500, "Invalid Event Grid endpoint URI", "PartitionInfo for Event Grid Topic " + topicName, e);
}
EventGridClient eventGridClient = eventGridTopicClientFactory.getClient(dataPartitionId, topicName);
eventGridClient.publishEvents(endpoint, eventsList);
}
}
......@@ -15,6 +15,7 @@
package org.opengroup.osdu.azure.eventgrid;
import com.microsoft.azure.eventgrid.EventGridClient;
import org.opengroup.osdu.core.common.partition.PartitionException;
/**
* Interface for Event Grid Topic client factory to return appropriate
......@@ -23,9 +24,11 @@ import com.microsoft.azure.eventgrid.EventGridClient;
public interface IEventGridTopicClientFactory {
/**
*
* @param dataPartitionId Data partition id
* @param topicName Topic name
* @return EventGridClient
* @throws PartitionException partitionException
*/
EventGridClient getClient(String dataPartitionId, TopicName topicName);
EventGridClient getClient(String dataPartitionId, String topicName) throws PartitionException;
}
......@@ -12,12 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.eventgrid;
package org.opengroup.osdu.azure.partition;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Topic Name Enum.
* The Topic Stores and client are controlled by the same.
* Azure event grid topic data partition variables.
*/
public enum TopicName {
RECORDS_CHANGED
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventGridTopicPartitionInfoAzure {
private String topicName;
private String topicAccessKey;
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.partition;
import com.azure.security.keyvault.secrets.SecretClient;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.KeyVaultFacade;
import org.opengroup.osdu.azure.util.AzureServicePrincipleTokenService;
import org.opengroup.osdu.common.Validators;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.partition.IPartitionFactory;
import org.opengroup.osdu.core.common.partition.IPartitionProvider;
import org.opengroup.osdu.core.common.partition.PartitionException;
import org.opengroup.osdu.core.common.partition.PartitionInfo;
import org.opengroup.osdu.core.common.partition.Property;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
/**
* Partition service client for Event Grid implementation.
*/
@Service
@Lazy
public class PartitionServiceEventGridClient {
private static final String ACCESS_KEY_REGEX = "^eventgrid-([a-zA-Z0-9]*)topic-accesskey$";
private static final String TOPIC_NAME_REGEX = "^eventgrid-([a-zA-Z0-9]*)topic$";
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionServiceEventGridClient.class.getName());
private final Gson gson = new Gson();
@Autowired
private IPartitionFactory partitionFactory;
@Autowired
private SecretClient secretClient;
@Autowired
private AzureServicePrincipleTokenService tokenService;
@Autowired
private DpsHeaders headers;
/**
* Get TopicInfo for a given topic.
*
* @param partitionId partitionId
* @param topicName topicName
* @return EventGridTopicPartitionInfoAzure
* @throws AppException exception from the configuration
* @throws PartitionException AppException Exception thrown by {@link IPartitionFactory}
*/
public EventGridTopicPartitionInfoAzure getEventGridTopicInPartition(final String partitionId, final String topicName) throws AppException, PartitionException {
Validators.checkNotNullAndNotEmpty(partitionId, "partitionId");
Validators.checkNotNullAndNotEmpty(topicName, "topicName");
Map<String, EventGridTopicPartitionInfoAzure> eventGridTopicPartitionInfoAzure = getAllEventGridTopicsInPartition(partitionId);
if (!eventGridTopicPartitionInfoAzure.containsKey(topicName)) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Invalid EventGrid Partition configuration for the partition " + partitionId, "Please refer to wiki here <>");
}
return eventGridTopicPartitionInfoAzure.get(topicName);
}
/**
* Get partition info.
*
* @param partitionId Partition Id
* @return Partition info
* @throws AppException Exception thrown by {@link IPartitionFactory}
* @throws PartitionException Exception thrown by {@link IPartitionFactory}
*/
Map<String, EventGridTopicPartitionInfoAzure> getAllEventGridTopicsInPartition(final String partitionId) throws AppException, PartitionException {
PartitionInfo partitionInfo = getPartitionInfo(partitionId);
Map<String, Property> propertyMap = partitionInfo.getProperties();
Map<String, EventGridTopicPartitionInfoAzure> topics = new HashMap<>();
for (Map.Entry<String, Property> property : propertyMap.entrySet()) {
if (isEventGridProperty(property)) {
StringTokenizer stringTokenizer = new StringTokenizer(property.getKey(), "-");
if (stringTokenizer.countTokens() == 2) {
addEventGridTopicName(topics, property, stringTokenizer);
} else if (stringTokenizer.countTokens() == 3) {
addEventGridAccessKey(topics, property, stringTokenizer);
} else {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Invalid EventGrid Partition configuration for the partition " + partitionId, "Please reconfigure the partition service");
}
}
}
return topics;
}
/**
* @param partitionId partitionId
* @return PartitionInfo
* @throws PartitionException Exception thrown by {@link IPartitionFactory}
*/
PartitionInfo getPartitionInfo(final String partitionId) throws PartitionException {
IPartitionProvider serviceClient = getServiceClient();
return serviceClient.get(partitionId);
}
/**
* Util to identify if property is for event grid topic.
*
* @param property property
* @return isEventGridProperty
*/
private boolean isEventGridProperty(final Map.Entry<String, Property> property) {
return property.getKey().matches((ACCESS_KEY_REGEX)) || property.getKey().matches((TOPIC_NAME_REGEX));
}
/**
* Populate the map.
*
* @param topics topics
* @param property properties
* @param stringTokenizer tokenizer util
*/
private void addEventGridAccessKey(final Map<String, EventGridTopicPartitionInfoAzure> topics,
final Map.Entry<String, Property> property,
final StringTokenizer stringTokenizer) {
stringTokenizer.nextToken();
String secret = getSecretValue(property);
String key = stringTokenizer.nextToken();
if (topics.containsKey(key)) {
EventGridTopicPartitionInfoAzure topic = topics.get(key);
topic.setTopicAccessKey(secret);
} else {
EventGridTopicPartitionInfoAzure topic = new EventGridTopicPartitionInfoAzure();
topic.setTopicAccessKey(secret);
topics.put(key, topic);
}
}
/**
* Populate the map.
*
* @param topics topics
* @param property properties
* @param stringTokenizer tokenizer util
*/
private void addEventGridTopicName(final Map<String, EventGridTopicPartitionInfoAzure> topics,
final Map.Entry<String, Property> property,
final StringTokenizer stringTokenizer) {
stringTokenizer.nextToken();
String key = stringTokenizer.nextToken();
String secretValue = getSecretValue(property);
if (topics.containsKey(key)) {
EventGridTopicPartitionInfoAzure topic = topics.get(key);
topic.setTopicName(secretValue);
} else {
EventGridTopicPartitionInfoAzure topic = new EventGridTopicPartitionInfoAzure();
topic.setTopicName(secretValue);
topics.put(key, topic);
}
}
/**
* Get the secret value from KeyVault.
* @param property property
* @return secret
*/
private String getSecretValue(final Map.Entry<String, Property> property) {
JsonElement jsonElement = gson.toJsonTree(property.getValue());
Property p = gson.fromJson(jsonElement, Property.class);
if (p.isSensitive()) {
return KeyVaultFacade.getSecretWithValidation(this.secretClient, String.valueOf(p.getValue()));
}
return String.valueOf(p.getValue());
}
/**
* Get Service client for Partition Service.
*
* @return PartitionServiceClient
*/
private IPartitionProvider getServiceClient() {
this.headers.put(DpsHeaders.AUTHORIZATION, "Bearer " + this.tokenService.getAuthorizationToken());
return this.partitionFactory.create(headers);
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
......@@ -22,22 +23,25 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.cache.EventGridTopicClientCache;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.azure.partition.EventGridTopicPartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceEventGridClient;
import org.opengroup.osdu.core.common.partition.PartitionException;
import org.opengroup.osdu.core.common.partition.Property;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class EventGridTopicClientFactoryImplTest {
private static final String VALID_TOPIC_NAME = "RecordsChanged";
private static final String VALID_TOPIC_NAME = "eventgrid-validtopic";
private static final String VALID_TOPICKEY_NAME = "eventgrid-validtopic-accesskey";
private static final String VALID_DATA_PARTIION_ID = "validDataPartitionId";
@Mock
private PartitionServiceClient partitionService;
private PartitionServiceEventGridClient partitionService;
@InjectMocks
private EventGridTopicClientFactoryImpl sut;
......@@ -49,7 +53,7 @@ class EventGridTopicClientFactoryImplTest {
public void should_throwException_given_nullDataPartitionId() {
NullPointerException nullPointerException = Assertions.assertThrows(NullPointerException.class,
() -> this.sut.getClient(null, TopicName.RECORDS_CHANGED));
() -> this.sut.getClient(null, "recordsChanged"));
assertEquals("dataPartitionId cannot be null!", nullPointerException.getMessage());
}
......@@ -57,7 +61,7 @@ class EventGridTopicClientFactoryImplTest {
public void should_throwException_given_emptyDataPartitionId() {
IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class,
() -> this.sut.getClient("", TopicName.RECORDS_CHANGED));
() -> this.sut.getClient("", "recordsChanged"));
assertEquals("dataPartitionId cannot be empty!", illegalArgumentException.getMessage());
}
......@@ -70,20 +74,21 @@ class EventGridTopicClientFactoryImplTest {
}
@Test
public void should_return_validClient_given_validPartitionId() {
public void should_return_validClient_given_validPartitionId() throws PartitionException {
// Setup
when(this.partitionService.getPartition(VALID_DATA_PARTIION_ID)).thenReturn(
PartitionInfoAzure.builder()
.idConfig(Property.builder().value(VALID_DATA_PARTIION_ID).build())
.eventGridRecordsTopicAccessKeyConfig(Property.builder().value(VALID_TOPIC_NAME).build()).build());
when(this.partitionService.getEventGridTopicInPartition(VALID_DATA_PARTIION_ID, "validtopic")).thenReturn(
EventGridTopicPartitionInfoAzure.builder()
.topicName(VALID_TOPIC_NAME)
.topicAccessKey(VALID_TOPICKEY_NAME).build());
when(this.clientCache.containsKey(any())).thenReturn(false);
// Act
EventGridClient eventGridClient = this.sut.getClient(VALID_DATA_PARTIION_ID, TopicName.RECORDS_CHANGED);
EventGridClient eventGridClient = this.sut.getClient(VALID_DATA_PARTIION_ID, "validtopic");
// Assert
assertNotNull(eventGridClient);
verify(this.clientCache, times(1)).put(any(), any());
}
}
\ No newline at end of file
}
......@@ -22,11 +22,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.azure.partition.EventGridTopicPartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceEventGridClient;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.partition.Property;
import org.opengroup.osdu.core.common.partition.PartitionException;
import java.util.ArrayList;
......@@ -37,23 +37,20 @@ import static org.mockito.MockitoAnnotations.initMocks;
@ExtendWith(MockitoExtension.class)
class EventGridTopicStoreTest {
private static final String VALID_DATA_PARTIION_ID = "validDataPartitionId";
private static final String VALID_TOPIC = "validTopic";
private static final String INVALID_URI = "://invalidUri";
private static final String VALID_KEY = "validkey";
@Mock
EventGridTopicClientFactoryImpl eventGridTopicClientFactory;
@Mock
EventGridClient eventGridClient;
@InjectMocks
EventGridTopicStore sut;
@Mock
private ILogger logger;
@Mock
private PartitionServiceClient partitionService;
@InjectMocks
EventGridTopicStore sut;
private static final String VALID_DATA_PARTIION_ID = "validDataPartitionId";
private static final String INVALID_URI = "://invalidUri";
private PartitionServiceEventGridClient partitionService;
@BeforeEach
void setUp() {
......@@ -61,28 +58,31 @@ class EventGridTopicStoreTest {
}
@Test
public void should_throwException_given_invalidURI() {
when(this.partitionService.getPartition(VALID_DATA_PARTIION_ID)).thenReturn(
PartitionInfoAzure.builder()
.idConfig(Property.builder().value(VALID_DATA_PARTIION_ID).build())
.eventGridRecordsTopicEndpointConfig(Property.builder().value(INVALID_URI).build()).build());
public void should_throwException_given_invalidURI() throws PartitionException {
doReturn(EventGridTopicPartitionInfoAzure.builder()
.topicName(INVALID_URI)
.topicAccessKey(VALID_KEY).build())
.when(this.partitionService).getEventGridTopicInPartition(anyString(), anyString());
AppException appException = Assertions.assertThrows(AppException.class,
() -> this.sut.publishToEventGridTopic(VALID_DATA_PARTIION_ID, TopicName.RECORDS_CHANGED, new ArrayList<>()));
assertEquals("PartitionInfo for eventgrid-recordstopic ://invalidUri", appException.getError().getMessage());
() -> this.sut.publishToEventGridTopic(VALID_DATA_PARTIION_ID, VALID_TOPIC, new ArrayList<>()));
assertEquals("PartitionInfo for Event Grid Topic " + VALID_TOPIC, appException.getError().getMessage());
verify(this.eventGridClient, times(0)).publishEvents(any(), any());
}
@Test
public void should_should_invoke_publishEvents() {
when(this.partitionService.getPartition(VALID_DATA_PARTIION_ID)).thenReturn(
PartitionInfoAzure.builder()
.idConfig(Property.builder().value(VALID_DATA_PARTIION_ID).build())
.eventGridRecordsTopicEndpointConfig(Property.builder().value("validURL").build()).build());
when(this.eventGridTopicClientFactory.getClient(VALID_DATA_PARTIION_ID, TopicName.RECORDS_CHANGED)).thenReturn(this.eventGridClient);
public void should_should_invoke_publishEvents() throws PartitionException {
doReturn(EventGridTopicPartitionInfoAzure.builder()
.topicName(VALID_TOPIC)
.topicAccessKey(VALID_KEY).build())
.when(this.partitionService).getEventGridTopicInPartition(anyString(), anyString());
when(this.eventGridTopicClientFactory.getClient(VALID_DATA_PARTIION_ID, "validTopic")).thenReturn(this.eventGridClient);
this.sut.publishToEventGridTopic(VALID_DATA_PARTIION_ID, TopicName.RECORDS_CHANGED, new ArrayList<>());
this.sut.publishToEventGridTopic(VALID_DATA_PARTIION_ID, "validTopic", new ArrayList<>());
verify(this.eventGridClient, times(1)).publishEvents(any(), any());
}
}
\ No newline at end of file
}