Skip to content
Snippets Groups Projects
Commit 5a564dd7 authored by Akansha Rajput[Microsoft]'s avatar Akansha Rajput[Microsoft]
Browse files

Legal tag update event grid

parent 42e62441
No related branches found
No related tags found
2 merge requests!138Lock down maven,!121Legal tag update event grid
package org.opengroup.osdu.legal.azure.di;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EventGridConfig {
private final boolean publishToEventGridEnabled;
// The Event Grid Event can be a maximum of 1MB. The batch size manipulation will impact the costing.
private final Integer eventGridBatchSize;
private final String topicName;
public EventGridConfig(@Value("#{new Boolean('${azure.publishToEventGrid}')}") boolean publish,
@Value("#{new Integer('${azure.eventGridBatchSize}')}") int batchSize,
@Value("${azure.eventGrid.topicName}") String topicName) {
if (publish) {
if ((topicName.isEmpty() || batchSize <= 0)) {
throw new RuntimeException("Missing EventGrid Configuration");
}
}
this.publishToEventGridEnabled = publish;
this.eventGridBatchSize = batchSize;
this.topicName = topicName;
}
public boolean isPublishingToEventGridEnabled() {
return publishToEventGridEnabled;
}
public String getTopicName() {
return topicName;
}
public int getEventGridBatchSize() {
return eventGridBatchSize;
}
}
......@@ -16,22 +16,39 @@ package org.opengroup.osdu.legal.azure.jobs;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import com.microsoft.azure.servicebus.Message;
import org.joda.time.DateTime;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.legal.StatusChangedTags;
import org.opengroup.osdu.legal.azure.di.EventGridConfig;
import org.opengroup.osdu.legal.provider.interfaces.ILegalTagPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.inject.Named;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
@Component
public class LegalTagPublisherImpl implements ILegalTagPublisher {
private final static String LEGAL_TAGS_CHANGE_EVENT_SUBJECT = "legaltagschanged";
private final static String LEGAL_TAGS_CHANGE_EVENT_TYPE = "legaltagschanged";
private final static String LEGAL_TAGS_CHANGE_EVENT_DATA_VERSION = "1.0";
@Autowired
private EventGridConfig eventGridConfig;
@Autowired
private EventGridTopicStore eventGridTopicStore;
@Inject
private ITopicClientFactory topicClientFactory;
......@@ -45,6 +62,52 @@ public class LegalTagPublisherImpl implements ILegalTagPublisher {
@Override
public void publish(String projectId, DpsHeaders headers, StatusChangedTags tags) throws Exception {
publishToServiceBus(projectId, headers, tags);
if (eventGridConfig.isPublishingToEventGridEnabled()) {
publishToEventGrid(headers, tags);
}
}
private void publishToServiceBus(String projectId, DpsHeaders headers, StatusChangedTags tags) {
Message message = createMessage(headers, tags);
try {
logger.info("Storage publishes message " + headers.getCorrelationId());
topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private void publishToEventGrid(DpsHeaders headers, StatusChangedTags tags) {
if (tags == null) {
return;
}
HashMap<String, Object> data = new HashMap<>();
List<EventGridEvent> eventsList = new ArrayList<>();
data.put("data", tags);
data.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
data.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
data.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
data.put(DpsHeaders.USER_EMAIL, headers.getUserEmail());
String messageId = UUID.randomUUID().toString();
try {
eventsList.add(new EventGridEvent(
messageId,
LEGAL_TAGS_CHANGE_EVENT_SUBJECT,
data,
LEGAL_TAGS_CHANGE_EVENT_TYPE,
DateTime.now(),
LEGAL_TAGS_CHANGE_EVENT_DATA_VERSION
));
logger.info("Legal publishes tag changed event: " + data.get(DpsHeaders.CORRELATION_ID));
eventGridTopicStore.publishToEventGridTopic(headers.getPartitionId(), eventGridConfig.getTopicName(), eventsList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private Message createMessage(DpsHeaders headers, StatusChangedTags tags) {
Gson gson = new Gson();
Message message = new Message();
Map<String, Object> properties = new HashMap<>();
......@@ -71,11 +134,6 @@ public class LegalTagPublisherImpl implements ILegalTagPublisher {
message.setBody(jomsg.toString().getBytes(StandardCharsets.UTF_8));
message.setContentType("application/json");
try {
logger.info("Storage publishes message " + headers.getCorrelationId());
topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return message;
}
}
......@@ -47,6 +47,11 @@ azure.blobStore.required=true
# Azure Service Bus configuration
azure.servicebus.topic-name=${servicebus_topic_name}
# Azure Event Grid Configuration
azure.publishToEventGrid=false
azure.eventGridBatchSize=10
azure.eventGrid.topicName=legaltagschangedtopic
# Azure KeyVault configuration
azure.keyvault.url=${KEYVAULT_URI}
......
......@@ -16,6 +16,7 @@ package org.opengroup.osdu.legal.azure.jobs;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.MessageBody;
import com.microsoft.azure.servicebus.TopicClient;
......@@ -27,11 +28,14 @@ import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.legal.StatusChangedTags;
import org.opengroup.osdu.legal.azure.di.EventGridConfig;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
......@@ -51,6 +55,12 @@ public class LegalTagPublisherImplTest {
@Mock
private ITopicClientFactory topicClientFactory;
@Mock
private EventGridTopicStore eventGridTopicStore;
@Mock
private EventGridConfig eventGridConfig;
@Mock
private TopicClient topicClient;
......@@ -69,6 +79,28 @@ public class LegalTagPublisherImplTest {
doReturn(topicClient).when(topicClientFactory).getClient(eq(PARTITION_ID), any());
}
@Test
public void should_publishToEventGrid_WhenFlagIsSet() throws Exception {
StatusChangedTags tags = new StatusChangedTags();
ArgumentCaptor<String> partitionNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> topicNameArgumentCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<List<EventGridEvent>> listEventGridEventArgumentCaptor = ArgumentCaptor.forClass(List.class);
doNothing().when(this.eventGridTopicStore).publishToEventGridTopic(
partitionNameCaptor.capture(), topicNameArgumentCaptor.capture(), listEventGridEventArgumentCaptor.capture()
);
when(this.eventGridConfig.isPublishingToEventGridEnabled()).thenReturn(true);
when(this.eventGridConfig.getTopicName()).thenReturn("legaltagschangedtopic");
sut.publish("project-id", headers, tags);
verify(this.eventGridTopicStore, times(1)).publishToEventGridTopic(any(), any(), anyList());
assertEquals(1, listEventGridEventArgumentCaptor.getValue().size());
assertEquals(topicNameArgumentCaptor.getValue(), "legaltagschangedtopic");
assertEquals(partitionNameCaptor.getValue(), PARTITION_ID);
}
@Test
public void testPublishLegalTag() throws Exception {
StatusChangedTags tags = new StatusChangedTags();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment