Commit dc3906a0 authored by Paresh Behede's avatar Paresh Behede
Browse files

Merge branch 'schema-notification-slb' into 'master'

Schema Notification

See merge request !103
parents 9da8560f a7697a17
Pipeline #42176 failed with stages
in 76 minutes and 26 seconds
...@@ -106,5 +106,9 @@ spec: ...@@ -106,5 +106,9 @@ spec:
value: {{ .Values.default_tenant}} value: {{ .Values.default_tenant}}
- name: azure_istioauth_enabled - name: azure_istioauth_enabled
value: "true" value: "true"
- name: event_grid_enabled
value: "false"
- name: event_grid_topic
value: "schemachangedtopic"
package org.opengroup.osdu.schema.provider.aws.impl.messagebus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageBusImpl implements IMessageBus{
@Autowired
private JaxRsDpsLog logger;
@Override
public void publishMessage(String schemaId, String eventType) {
// TODO Auto-generated method stub
logger.warning("publish message not implemented yet");
}
}
...@@ -17,6 +17,7 @@ package org.opengroup.osdu.schema.azure; ...@@ -17,6 +17,7 @@ package org.opengroup.osdu.schema.azure;
import org.opengroup.osdu.azure.dependencies.AzureOSDUConfig; import org.opengroup.osdu.azure.dependencies.AzureOSDUConfig;
import org.opengroup.osdu.schema.azure.di.AzureBootstrapConfig; import org.opengroup.osdu.schema.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.schema.azure.di.CosmosContainerConfig; import org.opengroup.osdu.schema.azure.di.CosmosContainerConfig;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
...@@ -31,7 +32,8 @@ public class SchemaApplication { ...@@ -31,7 +32,8 @@ public class SchemaApplication {
SchemaApplication.class, SchemaApplication.class,
AzureBootstrapConfig.class, AzureBootstrapConfig.class,
AzureOSDUConfig.class, AzureOSDUConfig.class,
CosmosContainerConfig.class CosmosContainerConfig.class,
EventGridConfig.class
}; };
SpringApplication.run(sources, args); SpringApplication.run(sources, args);
} }
......
/*
* Copyright 2021 Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.schema.azure.di;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
@Configuration
public class EventGridConfig {
private boolean eventGridEnabled;
private String eventGridCustomTopic;
public boolean isEventGridEnabled() {
return eventGridEnabled;
}
public String getCustomTopicName() {
return eventGridCustomTopic;
}
public EventGridConfig(@Value("#{new Boolean('${azure.eventGrid.enabled:false}')}") boolean publish,
@Value("#{new String('${azure.eventGrid.topicName:schemachangedtopic}')}") String topicName) {
if (publish && StringUtils.isEmpty(topicName)) {
throw new RuntimeException("Missing EventGrid Configuration");
}
this.eventGridEnabled = publish;
this.eventGridCustomTopic = topicName;
}
}
/*
* Copyright 2021 Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.schema.azure.impl.messagebus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.opengroup.osdu.schema.azure.impl.messagebus.model.SchemaPubSubInfo;
import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.logging.AuditLogger;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
@Component
public class MessageBusImpl implements IMessageBus {
@Autowired
private EventGridTopicStore eventGridTopicStore;
@Autowired
private JaxRsDpsLog logger;
@Autowired
private EventGridConfig eventGridConfig;
@Autowired
private AuditLogger auditLogger;
@Autowired
DpsHeaders headers;
private final static String EVENT_DATA_VERSION = "1.0";
@Override
public void publishMessage(String schemaId, String eventType) {
if (eventGridConfig.isEventGridEnabled()) {
logger.info("Generating event of type {}",eventType);
try {
publishToEventGrid(schemaId, eventType);
auditLogger.schemaNotificationSuccess(Collections.singletonList(schemaId));
}catch (AppException ex) {
//We do not want to fail schema creation if notification delivery has failed, hence just logging the exception
auditLogger.schemaNotificationFailure(Collections.singletonList(schemaId));
logger.warning(SchemaConstants.SCHEMA_NOTIFICATION_FAILED);
}
}else {
logger.info(SchemaConstants.SCHEMA_NOTIFICATION_IS_DISABLED);
}
}
private void publishToEventGrid(String schemaId, String eventType) {
String messageId = UUID.randomUUID().toString();
SchemaPubSubInfo[] schemaPubSubMsgs = new SchemaPubSubInfo [1];
schemaPubSubMsgs[0]=new SchemaPubSubInfo(schemaId,eventType);
List<EventGridEvent> eventsList = new ArrayList<>();
HashMap<String, Object> message = new HashMap<>();
message.put("data", schemaPubSubMsgs);
message.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
message.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
message.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
//EventGridEvent supports array of messages to be triggered in a batch but at present we do not support
//schema creation in bulk so generating one event at a time.
EventGridEvent eventGridEvent = new EventGridEvent(
messageId,
SchemaConstants.EVENT_SUBJECT,
message,
eventType,
DateTime.now(),
EVENT_DATA_VERSION
);
eventsList.add(eventGridEvent);
logger.info("Schema event created: " + messageId);
eventGridTopicStore.publishToEventGridTopic(headers.getPartitionId(), eventGridConfig.getCustomTopicName(), eventsList);
logger.info("Schema event generated successfully");
}
}
package org.opengroup.osdu.schema.azure.impl.messagebus.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SchemaPubSubInfo {
private String kind;
private String op;
}
...@@ -66,3 +66,7 @@ azure.activedirectory.app-resource-id=${aad_client_id} ...@@ -66,3 +66,7 @@ azure.activedirectory.app-resource-id=${aad_client_id}
# Use this property to name your shared tenant name # Use this property to name your shared tenant name
# shared.tenant.name=${shared_partition} # shared.tenant.name=${shared_partition}
shared.tenant.name=opendes shared.tenant.name=opendes
# Azure Event Grid Configuration
azure.eventGrid.enabled=${event_grid_enabled}
azure.eventGrid.topicName=${event_grid_topic}
/*
* Copyright 2021 Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.schema.azure.impl.messagebus;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.opengroup.osdu.schema.azure.impl.messagebus.model.SchemaPubSubInfo;
import org.opengroup.osdu.schema.logging.AuditLogger;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
@ExtendWith(MockitoExtension.class)
public class MessageBusImplTest {
private static final String DATA_PARTITION_WITH_FALLBACK_ACCOUNT_ID = "data-partition-account-id";
private static final String CORRELATION_ID = "correlation-id";
private static final String PARTITION_ID = "partition-id";
@Mock
private EventGridTopicStore eventGridTopicStore;
@Mock
private EventGridConfig eventGridConfig;
@Mock
private DpsHeaders dpsHeaders;
@Mock
private JaxRsDpsLog logger;
@Mock
private AuditLogger auditLogger;
@InjectMocks
private MessageBusImpl messageBusImpl;
@Before
public void init() throws ServiceBusException, InterruptedException {
initMocks(this);
doReturn(DATA_PARTITION_WITH_FALLBACK_ACCOUNT_ID).when(dpsHeaders).getPartitionIdWithFallbackToAccountId();
doReturn(PARTITION_ID).when(dpsHeaders).getPartitionId();
doReturn(CORRELATION_ID).when(dpsHeaders).getCorrelationId();
}
@Test
public void should_publishToEventGrid_WhenFlagIsFalse() {
//The schema-notification is turned off
when(this.eventGridConfig.isEventGridEnabled()).thenReturn(false);
//Call publish Message
messageBusImpl.publishMessage("dummy", "dummy");
//Assert that eventGridTopicStore is not called even once
verify(this.eventGridTopicStore, times(0)).publishToEventGridTopic(any(), any(), anyList());
}
@Test
public void should_publishToEventGrid_WhenFlagIsTrue() {
//The schema-notification is turned off
when(this.eventGridConfig.isEventGridEnabled()).thenReturn(true);
//The schema-notification is turned off
when(this.eventGridConfig.getCustomTopicName()).thenReturn("dummy-topic");
//The schema-notification is turned off
doNothing().when(this.eventGridTopicStore).publishToEventGridTopic(anyString(), anyString(), anyList());;
ArgumentCaptor<ArrayList<EventGridEvent>> captorList = ArgumentCaptor.forClass(ArrayList.class);
SchemaPubSubInfo[] schemaPubSubMsgs = new SchemaPubSubInfo [1];
schemaPubSubMsgs[0]=new SchemaPubSubInfo("dummy","schema_create");
HashMap<String, Object> data = new HashMap<>();
data.put("data", schemaPubSubMsgs);
data.put(DpsHeaders.ACCOUNT_ID, DATA_PARTITION_WITH_FALLBACK_ACCOUNT_ID);
data.put(DpsHeaders.DATA_PARTITION_ID, PARTITION_ID);
data.put(DpsHeaders.CORRELATION_ID, CORRELATION_ID);
//Call publish Message
messageBusImpl.publishMessage("dummy", "schema_create");
//Assert that eventGridTopicStore is called once
verify(this.eventGridTopicStore, times(1)).publishToEventGridTopic(anyString(), anyString(), captorList.capture());
ArrayList<EventGridEvent> eventGridList = captorList.getValue();
assertNotNull(eventGridList);
assertThat(eventGridList.size(), is(equalTo(1)));
HashMap<String, Object> outputData = (HashMap<String, Object>)eventGridList.get(0).data();
assertEquals(((SchemaPubSubInfo[])outputData.get("data"))[0].getKind(), "dummy");
assertEquals(((SchemaPubSubInfo[])outputData.get("data"))[0].getOp(), "schema_create");
}
}
/*
* Copyright 2021 Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.schema.provider.azure.di;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(MockitoExtension.class)
public class EventGridConfigTest {
private static String VALID_TOPIC_NAME = "topicname";
private static String INVALID_TOPIC_NAME = "";
@Test
public void configurationValidationTests() {
// Positive Case
EventGridConfig eventGridConfig = new EventGridConfig(true, VALID_TOPIC_NAME);
assertEquals(VALID_TOPIC_NAME, eventGridConfig.getCustomTopicName());
assertEquals(true, eventGridConfig.isEventGridEnabled());
// Positive Case
eventGridConfig = new EventGridConfig(false, VALID_TOPIC_NAME);
assertEquals(VALID_TOPIC_NAME, eventGridConfig.getCustomTopicName());
assertEquals(false, eventGridConfig.isEventGridEnabled());
eventGridConfig = new EventGridConfig(false, INVALID_TOPIC_NAME);
assertEquals(INVALID_TOPIC_NAME, eventGridConfig.getCustomTopicName());
assertEquals(false, eventGridConfig.isEventGridEnabled());
// Negative Cases
RuntimeException runtimeException = Assertions.assertThrows(RuntimeException.class,
() -> new EventGridConfig(true, INVALID_TOPIC_NAME));
assertEquals("Missing EventGrid Configuration", runtimeException.getMessage());
}
}
\ No newline at end of file
package org.opengroup.osdu.schema.impl.messagebus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageBusImpl implements IMessageBus{
@Autowired
private JaxRsDpsLog logger;
@Override
public void publishMessage(String schemaId, String eventType) {
// TODO Auto-generated method stub
logger.warning("publish message not implemented yet");
}
}
\ No newline at end of file
package org.opengroup.osdu.schema.provider.ibm;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageBusImpl implements IMessageBus{
@Autowired
private JaxRsDpsLog logger;
@Override
public void publishMessage(String schemaId, String eventType) {
// TODO Auto-generated method stub
logger.warning("publish message not implemented yet");
}
}
\ No newline at end of file
...@@ -75,6 +75,8 @@ public class SchemaConstants { ...@@ -75,6 +75,8 @@ public class SchemaConstants {
public static final String ENTITY_TYPE_EXISTS_EXCEPTION = "EntityType already registered with Id: {0}"; public static final String ENTITY_TYPE_EXISTS_EXCEPTION = "EntityType already registered with Id: {0}";
public static final String EMPTY_ID = "The id provided is empty"; public static final String EMPTY_ID = "The id provided is empty";
public static final String SCHEMA_CREATION_FAILED = "Schema creation failed"; public static final String SCHEMA_CREATION_FAILED = "Schema creation failed";
public static final String SCHEMA_NOTIFICATION_FAILED = "Failed to publish the schema notification.";
public static final String SCHEMA_NOTIFICATION_IS_DISABLED = "Schema event notification is turned off.";
public static final String SCHEMA_UPDATE_FAILED = "Schema updation failed"; public static final String SCHEMA_UPDATE_FAILED = "Schema updation failed";
public static final String SCHEMA_UPDATE_EXCEPTION = "Only schema in developement stage can be updated"; public static final String SCHEMA_UPDATE_EXCEPTION = "Only schema in developement stage can be updated";
public static final String SCHEMA_PUT_CREATE_EXCEPTION = "Only schema in developement stage can be created through put"; public static final String SCHEMA_PUT_CREATE_EXCEPTION = "Only schema in developement stage can be created through put";
...@@ -107,4 +109,10 @@ public class SchemaConstants { ...@@ -107,4 +109,10 @@ public class SchemaConstants {
public static final String CORRELATION_ID = "correlation-id"; public static final String CORRELATION_ID = "correlation-id";
public static final String APPLICATION_NAME = "Schema Service"; public static final String APPLICATION_NAME = "Schema Service";
//pub-sub message
public final static String EVENT_SUBJECT = "schemachanged";
public final static String SCHEMA_CREATE_EVENT_TYPE = "create";
public final static String SCHEMA_UPDATE_EVENT_TYPE = "update";
public final static String KIND = "kind";
} }
...@@ -50,6 +50,12 @@ public class AuditLogger { ...@@ -50,6 +50,12 @@ public class AuditLogger {
public void schemaUpdatedFailure(List<String> resources){ public void schemaUpdatedFailure(List<String> resources){
this.writeLog(this.getAuditEvents().getSchemaUpdated(AuditStatus.FAILURE, resources)); this.writeLog(this.getAuditEvents().getSchemaUpdated(AuditStatus.FAILURE, resources));
} }
public void schemaNotificationSuccess(List<String> resources){
this.writeLog(this.getAuditEvents().getSchemaUpdated(AuditStatus.SUCCESS, resources));
}
public void schemaNotificationFailure(List<String> resources){
this.writeLog(this.getAuditEvents().getSchemaUpdated(AuditStatus.FAILURE, resources));
}
private void writeLog(AuditPayload log) { private void writeLog(AuditPayload log) {
this.logger.audit(log); this.logger.audit(log);
......
/*
* Copyright 2021 Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.schema.provider.interfaces.messagebus;
public interface IMessageBus {
void publishMessage(String schemaId, String eventType);
}
\ No newline at end of file
...@@ -10,7 +10,6 @@ import java.util.LinkedList; ...@@ -10,7 +10,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;