diff --git a/provider/legal-azure/src/main/java/org/opengroup/osdu/legal/azure/di/EventGridConfig.java b/provider/legal-azure/src/main/java/org/opengroup/osdu/legal/azure/di/EventGridConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..f69dc263fdc5a311db664331f38d2ff9bf05cd6a --- /dev/null +++ b/provider/legal-azure/src/main/java/org/opengroup/osdu/legal/azure/di/EventGridConfig.java @@ -0,0 +1,39 @@ +package org.opengroup.osdu.legal.azure.di; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class EventGridConfig { + private final boolean publishToEventGridEnabled; + + // The Event Grid Event can be a maximum of 1MB. The batch size manipulation will impact the costing. + private final Integer eventGridBatchSize; + + private final String topicName; + + public EventGridConfig(@Value("#{new Boolean('${azure.publishToEventGrid}')}") boolean publish, + @Value("#{new Integer('${azure.eventGridBatchSize}')}") int batchSize, + @Value("${azure.eventGrid.topicName}") String topicName) { + if (publish) { + if ((topicName.isEmpty() || batchSize <= 0)) { + throw new RuntimeException("Missing EventGrid Configuration"); + } + } + this.publishToEventGridEnabled = publish; + this.eventGridBatchSize = batchSize; + this.topicName = topicName; + } + + public boolean isPublishingToEventGridEnabled() { + return publishToEventGridEnabled; + } + + public String getTopicName() { + return topicName; + } + + public int getEventGridBatchSize() { + return eventGridBatchSize; + } +} diff --git a/provider/legal-azure/src/main/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImpl.java b/provider/legal-azure/src/main/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImpl.java index 72bb8c75770e93b6714306c4d559d35ff03554be..a1e22963a645d5d78577dddb2df13ef2d386f14c 100644 --- a/provider/legal-azure/src/main/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImpl.java +++ b/provider/legal-azure/src/main/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImpl.java @@ -16,22 +16,39 @@ package org.opengroup.osdu.legal.azure.jobs; import com.google.gson.Gson; import com.google.gson.JsonObject; +import com.microsoft.azure.eventgrid.models.EventGridEvent; import com.microsoft.azure.servicebus.Message; +import org.joda.time.DateTime; +import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore; import org.opengroup.osdu.azure.servicebus.ITopicClientFactory; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.legal.StatusChangedTags; +import org.opengroup.osdu.legal.azure.di.EventGridConfig; import org.opengroup.osdu.legal.provider.interfaces.ILegalTagPublisher; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.inject.Inject; import javax.inject.Named; import java.nio.charset.StandardCharsets; -import java.util.HashMap; +import java.util.UUID; import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.List; @Component public class LegalTagPublisherImpl implements ILegalTagPublisher { + private final static String LEGAL_TAGS_CHANGE_EVENT_SUBJECT = "legaltagschanged"; + private final static String LEGAL_TAGS_CHANGE_EVENT_TYPE = "legaltagschanged"; + private final static String LEGAL_TAGS_CHANGE_EVENT_DATA_VERSION = "1.0"; + + @Autowired + private EventGridConfig eventGridConfig; + + @Autowired + private EventGridTopicStore eventGridTopicStore; @Inject private ITopicClientFactory topicClientFactory; @@ -45,6 +62,52 @@ public class LegalTagPublisherImpl implements ILegalTagPublisher { @Override public void publish(String projectId, DpsHeaders headers, StatusChangedTags tags) throws Exception { + publishToServiceBus(projectId, headers, tags); + if (eventGridConfig.isPublishingToEventGridEnabled()) { + publishToEventGrid(headers, tags); + } + } + + private void publishToServiceBus(String projectId, DpsHeaders headers, StatusChangedTags tags) { + Message message = createMessage(headers, tags); + try { + logger.info("Storage publishes message " + headers.getCorrelationId()); + topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + private void publishToEventGrid(DpsHeaders headers, StatusChangedTags tags) { + if (tags == null) { + return; + } + HashMap<String, Object> data = new HashMap<>(); + List<EventGridEvent> eventsList = new ArrayList<>(); + data.put("data", tags); + data.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); + data.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + data.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + data.put(DpsHeaders.USER_EMAIL, headers.getUserEmail()); + String messageId = UUID.randomUUID().toString(); + + try { + eventsList.add(new EventGridEvent( + messageId, + LEGAL_TAGS_CHANGE_EVENT_SUBJECT, + data, + LEGAL_TAGS_CHANGE_EVENT_TYPE, + DateTime.now(), + LEGAL_TAGS_CHANGE_EVENT_DATA_VERSION + )); + logger.info("Legal publishes tag changed event: " + data.get(DpsHeaders.CORRELATION_ID)); + eventGridTopicStore.publishToEventGridTopic(headers.getPartitionId(), eventGridConfig.getTopicName(), eventsList); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + private Message createMessage(DpsHeaders headers, StatusChangedTags tags) { Gson gson = new Gson(); Message message = new Message(); Map<String, Object> properties = new HashMap<>(); @@ -71,11 +134,6 @@ public class LegalTagPublisherImpl implements ILegalTagPublisher { message.setBody(jomsg.toString().getBytes(StandardCharsets.UTF_8)); message.setContentType("application/json"); - try { - logger.info("Storage publishes message " + headers.getCorrelationId()); - topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } + return message; } } diff --git a/provider/legal-azure/src/main/resources/application.properties b/provider/legal-azure/src/main/resources/application.properties index c7e8e407e101563f19100e2c760bd72fed48a1a7..8ddea15dc4aa366c24b32321cac9c1ff63954dca 100644 --- a/provider/legal-azure/src/main/resources/application.properties +++ b/provider/legal-azure/src/main/resources/application.properties @@ -47,6 +47,11 @@ azure.blobStore.required=true # Azure Service Bus configuration azure.servicebus.topic-name=${servicebus_topic_name} +# Azure Event Grid Configuration +azure.publishToEventGrid=false +azure.eventGridBatchSize=10 +azure.eventGrid.topicName=legaltagschangedtopic + # Azure KeyVault configuration azure.keyvault.url=${KEYVAULT_URI} diff --git a/provider/legal-azure/src/test/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImplTest.java b/provider/legal-azure/src/test/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImplTest.java index 5f71d4104d2c2b1818f805dc84ac0ed87f19f17b..c6ee72ce0c86e96a9ab4846672b18d12dc895427 100644 --- a/provider/legal-azure/src/test/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImplTest.java +++ b/provider/legal-azure/src/test/java/org/opengroup/osdu/legal/azure/jobs/LegalTagPublisherImplTest.java @@ -16,6 +16,7 @@ package org.opengroup.osdu.legal.azure.jobs; import com.google.gson.Gson; import com.google.gson.JsonObject; +import com.microsoft.azure.eventgrid.models.EventGridEvent; import com.microsoft.azure.servicebus.Message; import com.microsoft.azure.servicebus.MessageBody; import com.microsoft.azure.servicebus.TopicClient; @@ -27,11 +28,14 @@ import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore; import org.opengroup.osdu.azure.servicebus.ITopicClientFactory; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.legal.StatusChangedTags; +import org.opengroup.osdu.legal.azure.di.EventGridConfig; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -51,6 +55,12 @@ public class LegalTagPublisherImplTest { @Mock private ITopicClientFactory topicClientFactory; + @Mock + private EventGridTopicStore eventGridTopicStore; + + @Mock + private EventGridConfig eventGridConfig; + @Mock private TopicClient topicClient; @@ -69,6 +79,28 @@ public class LegalTagPublisherImplTest { doReturn(topicClient).when(topicClientFactory).getClient(eq(PARTITION_ID), any()); } + @Test + public void should_publishToEventGrid_WhenFlagIsSet() throws Exception { + StatusChangedTags tags = new StatusChangedTags(); + + ArgumentCaptor<String> partitionNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> topicNameArgumentCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<List<EventGridEvent>> listEventGridEventArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(this.eventGridTopicStore).publishToEventGridTopic( + partitionNameCaptor.capture(), topicNameArgumentCaptor.capture(), listEventGridEventArgumentCaptor.capture() + ); + when(this.eventGridConfig.isPublishingToEventGridEnabled()).thenReturn(true); + when(this.eventGridConfig.getTopicName()).thenReturn("legaltagschangedtopic"); + + sut.publish("project-id", headers, tags); + + verify(this.eventGridTopicStore, times(1)).publishToEventGridTopic(any(), any(), anyList()); + + assertEquals(1, listEventGridEventArgumentCaptor.getValue().size()); + assertEquals(topicNameArgumentCaptor.getValue(), "legaltagschangedtopic"); + assertEquals(partitionNameCaptor.getValue(), PARTITION_ID); + } + @Test public void testPublishLegalTag() throws Exception { StatusChangedTags tags = new StatusChangedTags();