Commit 4ba198b8 authored by Abhishek Kumar's avatar Abhishek Kumar
Browse files

Adding SchemaPubSubInfo wrapper to aling event massage according to

NotificationService
parent 39f3acfa
Pipeline #41266 failed with stages
in 50 seconds
......@@ -20,17 +20,20 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.opengroup.osdu.schema.azure.impl.messagebus.model.SchemaPubSubInfo;
import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.logging.AuditLogger;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
@Component
......@@ -38,20 +41,15 @@ public class MessageBusImpl implements IMessageBus {
@Autowired
private EventGridTopicStore eventGridTopicStore;
@Autowired
private JaxRsDpsLog logger;
@Autowired
private EventGridConfig eventGridConfig;
@Autowired
private AuditLogger auditLogger;
@Autowired
DpsHeaders headers;
private final static String EVENT_DATA_VERSION = "1.0";
@Override
......@@ -75,31 +73,30 @@ public class MessageBusImpl implements IMessageBus {
private void publishToEventGrid(String schemaId, String eventType) {
String messageId = UUID.randomUUID().toString();
SchemaPubSubInfo[] schemaPubSubMsgs = new SchemaPubSubInfo [1];
schemaPubSubMsgs[0]=new SchemaPubSubInfo(schemaId,eventType);
List<EventGridEvent> eventsList = new ArrayList<>();
HashMap<String, Object> message = new HashMap<>();
message.put("data", schemaPubSubMsgs);
message.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
message.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
message.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
HashMap<String, Object> data = new HashMap<>();
data.put(SchemaConstants.KIND, schemaId);
data.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
data.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionId());
data.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
HashMap<String, Object> pubsubMessage = new HashMap<>();
pubsubMessage.put("data", data);
String messageId = UUID.randomUUID().toString();
//EventGridEvent supports array of messages to be triggered in a batch but at present we do not support
//schema creation in bulk so generating one event at a time.
eventsList.add(new EventGridEvent(
EventGridEvent eventGridEvent = new EventGridEvent(
messageId,
SchemaConstants.EVENT_SUBJECT,
data,
message,
eventType,
DateTime.now(),
EVENT_DATA_VERSION
));
);
eventsList.add(eventGridEvent);
logger.info("Schema event created: " + messageId);
eventGridTopicStore.publishToEventGridTopic(headers.getPartitionId(), eventGridConfig.getCustomTopicName(), eventsList);
logger.info("Event generated: " + messageId);
logger.info("Schema event generated successfully");
}
}
package org.opengroup.osdu.schema.azure.impl.messagebus.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SchemaPubSubInfo {
private String kind;
private String op;
}
......@@ -29,8 +29,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.extension.ExtendWith;
......@@ -42,7 +44,7 @@ import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.azure.impl.messagebus.model.SchemaPubSubInfo;
import org.opengroup.osdu.schema.logging.AuditLogger;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
......@@ -102,21 +104,28 @@ public class MessageBusImplTest {
doNothing().when(this.eventGridTopicStore).publishToEventGridTopic(anyString(), anyString(), anyList());;
ArgumentCaptor<ArrayList<EventGridEvent>> captorList = ArgumentCaptor.forClass(ArrayList.class);
SchemaPubSubInfo[] schemaPubSubMsgs = new SchemaPubSubInfo [1];
schemaPubSubMsgs[0]=new SchemaPubSubInfo("dummy","schema_create");
HashMap<String, Object> data = new HashMap<>();
data.put(SchemaConstants.KIND, "dummy");
data.put("data", schemaPubSubMsgs);
data.put(DpsHeaders.ACCOUNT_ID, DATA_PARTITION_WITH_FALLBACK_ACCOUNT_ID);
data.put(DpsHeaders.DATA_PARTITION_ID, PARTITION_ID);
data.put(DpsHeaders.CORRELATION_ID, CORRELATION_ID);
//Call publish Message
messageBusImpl.publishMessage("dummy", "dummy");
messageBusImpl.publishMessage("dummy", "schema_create");
//Assert that eventGridTopicStore is called once
verify(this.eventGridTopicStore, times(1)).publishToEventGridTopic(anyString(), anyString(), captorList.capture());
ArrayList<EventGridEvent> eventGridList = captorList.getValue();
assertNotNull(eventGridList);
assertThat(eventGridList.size(), is(equalTo(1)));
assertEquals(eventGridList.get(0).data(), data);
HashMap<String, Object> outputData = (HashMap<String, Object>)eventGridList.get(0).data();
assertEquals(((SchemaPubSubInfo[])outputData.get("data"))[0].getKind(), "dummy");
assertEquals(((SchemaPubSubInfo[])outputData.get("data"))[0].getOp(), "schema_create");
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment