Commit 93d2fc33 authored by Abhishek Kumar's avatar Abhishek Kumar Committed by Abhishek Kumar
Browse files

Corrections based on review comments

parent 47f3d8f1
package org.opengroup.osdu.schema.provider.aws.impl.messagebus; package org.opengroup.osdu.schema.provider.aws.impl.messagebus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus; import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -13,7 +12,7 @@ public class MessageBusImpl implements IMessageBus{ ...@@ -13,7 +12,7 @@ public class MessageBusImpl implements IMessageBus{
private JaxRsDpsLog logger; private JaxRsDpsLog logger;
@Override @Override
public void publishMessage(DpsHeaders headers, String schemaId, String eventType) { public void publishMessage(String schemaId, String eventType) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
logger.warning("publish message not implemented yet"); logger.warning("publish message not implemented yet");
......
package org.opengroup.osdu.schema.azure.di; package org.opengroup.osdu.schema.azure.di;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
@Configuration @Configuration
public class EventGridConfig { public class EventGridConfig {
private boolean eventGridEnabled;
private String eventGridCustomTopic;
public boolean isPublishToEventGridEnabled() { public boolean isEventGridEnabled() {
return publishToEventGridEnabled; return eventGridEnabled;
} }
public String getCustomTopicName() { public String getCustomTopicName() {
return eventGridCustomTopic; return eventGridCustomTopic;
} }
private boolean publishToEventGridEnabled; public EventGridConfig(@Value("#{new Boolean('${azure.eventGrid.enabled:false}')}") boolean publish,
@Value("#{new String('${azure.eventGrid.topicName:schemachangedtopic}')}") String topicName) {
private String eventGridCustomTopic; if (publish && StringUtils.isEmpty(topicName)) {
public EventGridConfig(@Value("#{new Boolean('${azure.publishToEventGrid:false}')}") boolean publish,
@Value("#{new String('${azure.eventGridTopic:schema-change-alert}')}") String topicName) {
if (publish) {
if ((topicName.isEmpty())) {
throw new RuntimeException("Missing EventGrid Configuration"); throw new RuntimeException("Missing EventGrid Configuration");
}
} }
this.publishToEventGridEnabled = publish; this.eventGridEnabled = publish;
this.eventGridCustomTopic = topicName; this.eventGridCustomTopic = topicName;
} }
......
...@@ -5,7 +5,6 @@ import java.util.Collections; ...@@ -5,7 +5,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore; import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
...@@ -33,40 +32,40 @@ public class MessageBusImpl implements IMessageBus { ...@@ -33,40 +32,40 @@ public class MessageBusImpl implements IMessageBus {
@Autowired @Autowired
private AuditLogger auditLogger; private AuditLogger auditLogger;
@Autowired @Autowired
private JaxRsDpsLog log; DpsHeaders headers;
private final static String EVENT_DATA_VERSION = "1.0"; private final static String EVENT_DATA_VERSION = "1.0";
@Override @Override
public void publishMessage(DpsHeaders headers, String schemaId, String eventType) { public void publishMessage(String schemaId, String eventType) {
if (eventGridConfig.isPublishToEventGridEnabled()) { if (eventGridConfig.isEventGridEnabled()) {
logger.info("Generating event of type {}",eventType); logger.info("Generating event of type {}",eventType);
try { try {
publishToEventGrid(headers, schemaId, eventType); publishToEventGrid(schemaId, eventType);
auditLogger.schemaNotificationSuccess(Collections.singletonList(schemaId)); auditLogger.schemaNotificationSuccess(Collections.singletonList(schemaId));
}catch (AppException ex) { }catch (AppException ex) {
//We do not want to fail schema creation if notification delivery has failed, hence just logging the exception //We do not want to fail schema creation if notification delivery has failed, hence just logging the exception
auditLogger.schemaNotificationFailure(Collections.singletonList(schemaId)); auditLogger.schemaNotificationFailure(Collections.singletonList(schemaId));
log.warning(SchemaConstants.SCHEMA_NOTIFICATION_FAILED); logger.warning(SchemaConstants.SCHEMA_NOTIFICATION_FAILED);
} }
}else { }else {
logger.info("Schema event notification is turned off."); logger.info(SchemaConstants.SCHEMA_NOTIFICATION_IS_DISABLED);
} }
} }
private void publishToEventGrid(DpsHeaders headers, String schemaId, String eventType) { private void publishToEventGrid(String schemaId, String eventType) {
List<EventGridEvent> eventsList = new ArrayList<>(); List<EventGridEvent> eventsList = new ArrayList<>();
HashMap<String, Object> data = new HashMap<>(); HashMap<String, Object> data = new HashMap<>();
data.put(SchemaConstants.KIND, schemaId); data.put(SchemaConstants.KIND, schemaId);
data.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); data.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
data.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); data.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionId());
data.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); data.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
HashMap<String, Object> pubsubMessage = new HashMap<>(); HashMap<String, Object> pubsubMessage = new HashMap<>();
......
...@@ -68,5 +68,5 @@ azure.activedirectory.app-resource-id=${aad_client_id} ...@@ -68,5 +68,5 @@ azure.activedirectory.app-resource-id=${aad_client_id}
shared.tenant.name=opendes shared.tenant.name=opendes
# Azure Event Grid Configuration # Azure Event Grid Configuration
azure.publishToEventGrid=${event_grid_enabled} azure.eventGrid.enabled=${event_grid_enabled}
azure.eventGridTopic=${event_grid_topic} azure.eventGrid.topicName=${event_grid_topic}
package org.opengroup.osdu.schema.azure.impl.messagebus; package org.opengroup.osdu.schema.azure.impl.messagebus;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
...@@ -9,10 +14,12 @@ import static org.mockito.Mockito.times; ...@@ -9,10 +14,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks; import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
...@@ -20,8 +27,10 @@ import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore; ...@@ -20,8 +27,10 @@ import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.azure.di.EventGridConfig; import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.logging.AuditLogger; import org.opengroup.osdu.schema.logging.AuditLogger;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import com.microsoft.azure.servicebus.primitives.ServiceBusException; import com.microsoft.azure.servicebus.primitives.ServiceBusException;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
...@@ -31,29 +40,27 @@ public class MessageBusImplTest { ...@@ -31,29 +40,27 @@ public class MessageBusImplTest {
private static final String CORRELATION_ID = "correlation-id"; private static final String CORRELATION_ID = "correlation-id";
private static final String PARTITION_ID = "partition-id"; private static final String PARTITION_ID = "partition-id";
@Mock @Mock
private EventGridTopicStore eventGridTopicStore; private EventGridTopicStore eventGridTopicStore;
@Mock @Mock
private EventGridConfig eventGridConfig; private EventGridConfig eventGridConfig;
@Mock @Mock
private DpsHeaders dpsHeaders; private DpsHeaders dpsHeaders;
@Mock @Mock
private JaxRsDpsLog logger; private JaxRsDpsLog logger;
@Mock @Mock
private AuditLogger auditLogger; private AuditLogger auditLogger;
@InjectMocks @InjectMocks
private MessageBusImpl messageBusImpl; private MessageBusImpl messageBusImpl;
@Before @Before
public void init() throws ServiceBusException, InterruptedException { public void init() throws ServiceBusException, InterruptedException {
initMocks(this); initMocks(this);
doReturn(DATA_PARTITION_WITH_FALLBACK_ACCOUNT_ID).when(dpsHeaders).getPartitionIdWithFallbackToAccountId(); doReturn(DATA_PARTITION_WITH_FALLBACK_ACCOUNT_ID).when(dpsHeaders).getPartitionIdWithFallbackToAccountId();
doReturn(PARTITION_ID).when(dpsHeaders).getPartitionId(); doReturn(PARTITION_ID).when(dpsHeaders).getPartitionId();
doReturn(CORRELATION_ID).when(dpsHeaders).getCorrelationId(); doReturn(CORRELATION_ID).when(dpsHeaders).getCorrelationId();
...@@ -61,32 +68,40 @@ public class MessageBusImplTest { ...@@ -61,32 +68,40 @@ public class MessageBusImplTest {
@Test @Test
public void should_publishToEventGrid_WhenFlagIsFalse() { public void should_publishToEventGrid_WhenFlagIsFalse() {
//The schema-notification is turned off //The schema-notification is turned off
when(this.eventGridConfig.isPublishToEventGridEnabled()).thenReturn(false); when(this.eventGridConfig.isEventGridEnabled()).thenReturn(false);
//Call publish Message //Call publish Message
messageBusImpl.publishMessage(dpsHeaders, "dummy", "dummy"); messageBusImpl.publishMessage("dummy", "dummy");
//Assert that eventGridTopicStore is not called even once //Assert that eventGridTopicStore is not called even once
verify(this.eventGridTopicStore, times(0)).publishToEventGridTopic(any(), any(), anyList()); verify(this.eventGridTopicStore, times(0)).publishToEventGridTopic(any(), any(), anyList());
} }
@Test @Test
public void should_publishToEventGrid_WhenFlagIsTrue() { public void should_publishToEventGrid_WhenFlagIsTrue() {
//The schema-notification is turned off //The schema-notification is turned off
when(this.eventGridConfig.isPublishToEventGridEnabled()).thenReturn(true); when(this.eventGridConfig.isEventGridEnabled()).thenReturn(true);
//The schema-notification is turned off
when(this.eventGridConfig.getCustomTopicName()).thenReturn("dummy-topic");
//The schema-notification is turned off //The schema-notification is turned off
doNothing().when(this.eventGridTopicStore).publishToEventGridTopic(anyString(), anyString(), anyList());; doNothing().when(this.eventGridTopicStore).publishToEventGridTopic(anyString(), anyString(), anyList());;
ArgumentCaptor<ArrayList<EventGridEvent>> captorList = ArgumentCaptor.forClass(ArrayList.class);
//Call publish Message HashMap<String, Object> data = new HashMap<>();
messageBusImpl.publishMessage(dpsHeaders, "dummy", "dummy"); data.put(SchemaConstants.KIND, "dummy");
data.put(DpsHeaders.ACCOUNT_ID, DATA_PARTITION_WITH_FALLBACK_ACCOUNT_ID);
//Assert that eventGridTopicStore is not called even once data.put(DpsHeaders.DATA_PARTITION_ID, PARTITION_ID);
verify(this.eventGridTopicStore, times(1)).publishToEventGridTopic(any(), any(), anyList()); data.put(DpsHeaders.CORRELATION_ID, CORRELATION_ID);
//Call publish Message
messageBusImpl.publishMessage("dummy", "dummy");
//Assert that eventGridTopicStore is called once
verify(this.eventGridTopicStore, times(1)).publishToEventGridTopic(anyString(), anyString(), captorList.capture());
ArrayList<EventGridEvent> eventGridList = captorList.getValue();
assertNotNull(eventGridList);
assertThat(eventGridList.size(), is(equalTo(1)));
assertEquals(eventGridList.get(0).data(), data);
} }
......
...@@ -19,6 +19,17 @@ public class EventGridConfigTest { ...@@ -19,6 +19,17 @@ public class EventGridConfigTest {
// Positive Case // Positive Case
EventGridConfig eventGridConfig = new EventGridConfig(true, VALID_TOPIC_NAME); EventGridConfig eventGridConfig = new EventGridConfig(true, VALID_TOPIC_NAME);
assertEquals(VALID_TOPIC_NAME, eventGridConfig.getCustomTopicName()); assertEquals(VALID_TOPIC_NAME, eventGridConfig.getCustomTopicName());
assertEquals(true, eventGridConfig.isEventGridEnabled());
// Positive Case
eventGridConfig = new EventGridConfig(false, VALID_TOPIC_NAME);
assertEquals(VALID_TOPIC_NAME, eventGridConfig.getCustomTopicName());
assertEquals(false, eventGridConfig.isEventGridEnabled());
eventGridConfig = new EventGridConfig(false, INVALID_TOPIC_NAME);
assertEquals(INVALID_TOPIC_NAME, eventGridConfig.getCustomTopicName());
assertEquals(false, eventGridConfig.isEventGridEnabled());
// Negative Cases // Negative Cases
RuntimeException runtimeException = Assertions.assertThrows(RuntimeException.class, RuntimeException runtimeException = Assertions.assertThrows(RuntimeException.class,
......
package org.opengroup.osdu.schema.impl.messagebus; package org.opengroup.osdu.schema.impl.messagebus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus; import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -13,7 +12,7 @@ public class MessageBusImpl implements IMessageBus{ ...@@ -13,7 +12,7 @@ public class MessageBusImpl implements IMessageBus{
private JaxRsDpsLog logger; private JaxRsDpsLog logger;
@Override @Override
public void publishMessage(DpsHeaders headers, String schemaId, String eventType) { public void publishMessage(String schemaId, String eventType) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
logger.warning("publish message not implemented yet"); logger.warning("publish message not implemented yet");
......
package org.opengroup.osdu.schema.provider.ibm; package org.opengroup.osdu.schema.provider.ibm;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus; import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -13,7 +12,7 @@ public class MessageBusImpl implements IMessageBus{ ...@@ -13,7 +12,7 @@ public class MessageBusImpl implements IMessageBus{
private JaxRsDpsLog logger; private JaxRsDpsLog logger;
@Override @Override
public void publishMessage(DpsHeaders headers, String schemaId, String eventType) { public void publishMessage(String schemaId, String eventType) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
logger.warning("publish message not implemented yet"); logger.warning("publish message not implemented yet");
......
...@@ -76,6 +76,7 @@ public class SchemaConstants { ...@@ -76,6 +76,7 @@ public class SchemaConstants {
public static final String EMPTY_ID = "The id provided is empty"; public static final String EMPTY_ID = "The id provided is empty";
public static final String SCHEMA_CREATION_FAILED = "Schema creation failed"; public static final String SCHEMA_CREATION_FAILED = "Schema creation failed";
public static final String SCHEMA_NOTIFICATION_FAILED = "Failed to publish the schema notification."; public static final String SCHEMA_NOTIFICATION_FAILED = "Failed to publish the schema notification.";
public static final String SCHEMA_NOTIFICATION_IS_DISABLED = "Schema event notification is turned off.";
public static final String SCHEMA_UPDATE_FAILED = "Schema updation failed"; public static final String SCHEMA_UPDATE_FAILED = "Schema updation failed";
public static final String SCHEMA_UPDATE_EXCEPTION = "Only schema in developement stage can be updated"; public static final String SCHEMA_UPDATE_EXCEPTION = "Only schema in developement stage can be updated";
public static final String SCHEMA_PUT_CREATE_EXCEPTION = "Only schema in developement stage can be created through put"; public static final String SCHEMA_PUT_CREATE_EXCEPTION = "Only schema in developement stage can be created through put";
......
package org.opengroup.osdu.schema.provider.interfaces.messagebus; package org.opengroup.osdu.schema.provider.interfaces.messagebus;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
public interface IMessageBus { public interface IMessageBus {
void publishMessage(DpsHeaders headers, String schemaId, String eventType); void publishMessage(String schemaId, String eventType);
} }
\ No newline at end of file
...@@ -10,11 +10,9 @@ import java.util.LinkedList; ...@@ -10,11 +10,9 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.json.JSONException; import org.json.JSONException;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.constants.SchemaConstants; import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.enums.SchemaScope; import org.opengroup.osdu.schema.enums.SchemaScope;
...@@ -43,12 +41,10 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -43,12 +41,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.gson.Gson; import com.google.gson.Gson;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
/** /**
...@@ -156,7 +152,7 @@ public class SchemaService implements ISchemaService { ...@@ -156,7 +152,7 @@ public class SchemaService implements ISchemaService {
SchemaInfo schemaInfo = schemaInfoStore.createSchemaInfo(schemaRequest); SchemaInfo schemaInfo = schemaInfoStore.createSchemaInfo(schemaRequest);
schemaStore.createSchema(schemaId, schema); schemaStore.createSchema(schemaId, schema);
auditLogger.schemaRegisteredSuccess(Collections.singletonList(schemaRequest.toString())); auditLogger.schemaRegisteredSuccess(Collections.singletonList(schemaRequest.toString()));
messageBus.publishMessage(headers, schemaId, SchemaConstants.SCHEMA_CREATE_EVENT_TYPE); messageBus.publishMessage(schemaId, SchemaConstants.SCHEMA_CREATE_EVENT_TYPE);
return schemaInfo; return schemaInfo;
} catch (ApplicationException ex) { } catch (ApplicationException ex) {
auditLogger.schemaRegisteredFailure( auditLogger.schemaRegisteredFailure(
...@@ -214,7 +210,7 @@ public class SchemaService implements ISchemaService { ...@@ -214,7 +210,7 @@ public class SchemaService implements ISchemaService {
SchemaInfo schInfo = schemaInfoStore.updateSchemaInfo(schemaRequest); SchemaInfo schInfo = schemaInfoStore.updateSchemaInfo(schemaRequest);
auditLogger.schemaUpdatedSuccess(Collections.singletonList(schemaRequest.toString())); auditLogger.schemaUpdatedSuccess(Collections.singletonList(schemaRequest.toString()));
schemaStore.createSchema(schemaRequest.getSchemaInfo().getSchemaIdentity().getId(), schema); schemaStore.createSchema(schemaRequest.getSchemaInfo().getSchemaIdentity().getId(), schema);
messageBus.publishMessage(headers, createdSchemaId, SchemaConstants.SCHEMA_UPDATE_EVENT_TYPE); messageBus.publishMessage(createdSchemaId, SchemaConstants.SCHEMA_UPDATE_EVENT_TYPE);
log.info(SchemaConstants.SCHEMA_UPDATED); log.info(SchemaConstants.SCHEMA_UPDATED);
return schInfo; return schInfo;
} else { } else {
......
...@@ -175,7 +175,7 @@ public class SchemaServiceTest { ...@@ -175,7 +175,7 @@ public class SchemaServiceTest {
assertEquals(SchemaStatus.PUBLISHED, assertEquals(SchemaStatus.PUBLISHED,
schemaService.createSchema(schReqPubInt).getStatus()); schemaService.createSchema(schReqPubInt).getStatus());
verify(messageBus, times(1)).publishMessage(any(), anyString(), anyString()); verify(messageBus, times(1)).publishMessage( anyString(), anyString());
} }
@Test @Test
...@@ -205,7 +205,7 @@ public class SchemaServiceTest { ...@@ -205,7 +205,7 @@ public class SchemaServiceTest {
Mockito.when(schemaInfoStore.createSchemaInfo(schReq)) Mockito.when(schemaInfoStore.createSchemaInfo(schReq))
.thenReturn(schInfo); .thenReturn(schInfo);
assertEquals(schInfo, schemaService.createSchema(schReq)); assertEquals(schInfo, schemaService.createSchema(schReq));
verify(messageBus, times(1)).publishMessage(any(), anyString(), anyString()); verify(messageBus, times(1)).publishMessage( anyString(), anyString());
} }