Commit 5fcf4fcc authored by Abhishek Kumar's avatar Abhishek Kumar Committed by Abhishek Kumar
Browse files

Schema notification changes

parent 88b35b1b
......@@ -106,5 +106,9 @@ spec:
value: {{ .Values.default_tenant}}
- name: azure_istioauth_enabled
value: "true"
- name: event_grid_enabled
value: "false"
- name: event_grid_topic
value: "schemachangedtopic"
......@@ -17,6 +17,7 @@ package org.opengroup.osdu.schema.azure;
import org.opengroup.osdu.azure.dependencies.AzureOSDUConfig;
import org.opengroup.osdu.schema.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.schema.azure.di.CosmosContainerConfig;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
......@@ -31,7 +32,8 @@ public class SchemaApplication {
SchemaApplication.class,
AzureBootstrapConfig.class,
AzureOSDUConfig.class,
CosmosContainerConfig.class
CosmosContainerConfig.class,
EventGridConfig.class
};
SpringApplication.run(sources, args);
}
......
package org.opengroup.osdu.schema.azure.di;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EventGridConfig {
public boolean isPublishingToEventGridEnabled() {
return publishToEventGridEnabled;
}
public String getCustomTopicName() {
return eventGridCustomTopic;
}
@Value("#{new Boolean('${azure.publishToEventGrid:false}')}")
private boolean publishToEventGridEnabled;
@Value("#{new String('${azure.eventGridTopic:schema-change-alert}')}")
private String eventGridCustomTopic;
}
\ No newline at end of file
package org.opengroup.osdu.schema.azure.impl.messagebus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.logging.AuditLogger;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.opengroup.osdu.schema.provider.interfaces.schemainfostore.ISchemaInfoStore;
import org.opengroup.osdu.schema.provider.interfaces.schemastore.ISchemaStore;
import org.opengroup.osdu.schema.service.IAuthorityService;
import org.opengroup.osdu.schema.service.IEntityTypeService;
import org.opengroup.osdu.schema.service.ISourceService;
import org.opengroup.osdu.schema.util.SchemaUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import lombok.RequiredArgsConstructor;
@Component
@RequiredArgsConstructor
public class MessageBusImpl implements IMessageBus {
@Autowired
private EventGridTopicStore eventGridTopicStore;
@Autowired
private JaxRsDpsLog logger;
@Autowired
private EventGridConfig eventGridConfig;
private final AuditLogger auditLogger;
final JaxRsDpsLog log;
private final static String EVENT_DATA_VERSION = "1.0";
@Override
public void publishMessage(DpsHeaders headers, String schemaId, String eventType) {
if (eventGridConfig.isPublishingToEventGridEnabled()) {
logger.info("Generating event of type {}",eventType);
try {
publishToEventGrid(headers, schemaId, eventType);
auditLogger.schemaNotificationSuccess(Collections.singletonList(schemaId));
}catch (AppException ex) {
//We do not want to fail schema creation if notification delivery has failed, hence just logging the exception
auditLogger.schemaNotificationFailure(Collections.singletonList(schemaId));
log.warning(SchemaConstants.SCHEMA_NOTIFICATION_FAILED);
}
}else {
logger.info("Schema event notification is turned off.");
}
}
private void publishToEventGrid(DpsHeaders headers, String schemaId, String eventType) {
List<EventGridEvent> eventsList = new ArrayList<>();
HashMap<String, Object> data = new HashMap<>();
data.put("id", schemaId);
data.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
data.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
data.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
String messageId = UUID.randomUUID().toString();
//EventGridEvent supports array of messages to be triggered in a batch but at present we do not support
//schema creation in bulk so generating one event at a time.
eventsList.add(new EventGridEvent(
messageId,
SchemaConstants.EVENT_SUBJECT,
data,
eventType,
DateTime.now(),
EVENT_DATA_VERSION
));
eventGridTopicStore.publishToEventGridTopic(headers.getPartitionId(), eventGridConfig.getCustomTopicName(), eventsList);
logger.info("Event generated: " + messageId);
}
}
......@@ -66,3 +66,7 @@ azure.activedirectory.app-resource-id=${aad_client_id}
# Use this property to name your shared tenant name
# shared.tenant.name=${shared_partition}
shared.tenant.name=opendes
# Azure Event Grid Configuration
azure.publishToEventGrid=${event_grid_enabled}
azure.eventGridTopic=${event_grid_topic}
......@@ -75,6 +75,7 @@ public class SchemaConstants {
public static final String ENTITY_TYPE_EXISTS_EXCEPTION = "EntityType already registered with Id: {0}";
public static final String EMPTY_ID = "The id provided is empty";
public static final String SCHEMA_CREATION_FAILED = "Schema creation failed";
public static final String SCHEMA_NOTIFICATION_FAILED = "Failed to publish the schema notification.";
public static final String SCHEMA_UPDATE_FAILED = "Schema updation failed";
public static final String SCHEMA_UPDATE_EXCEPTION = "Only schema in developement stage can be updated";
public static final String SCHEMA_PUT_CREATE_EXCEPTION = "Only schema in developement stage can be created through put";
......@@ -107,4 +108,9 @@ public class SchemaConstants {
public static final String CORRELATION_ID = "correlation-id";
public static final String APPLICATION_NAME = "Schema Service";
//pub-sub message
public final static String EVENT_SUBJECT = "schemachanged";
public final static String SCHEMA_CREATE_EVENT_TYPE = "create";
public final static String SCHEMA_UPDATE_EVENT_TYPE = "update";
}
......@@ -50,6 +50,12 @@ public class AuditLogger {
public void schemaUpdatedFailure(List<String> resources){
this.writeLog(this.getAuditEvents().getSchemaUpdated(AuditStatus.FAILURE, resources));
}
public void schemaNotificationSuccess(List<String> resources){
this.writeLog(this.getAuditEvents().getSchemaUpdated(AuditStatus.SUCCESS, resources));
}
public void schemaNotificationFailure(List<String> resources){
this.writeLog(this.getAuditEvents().getSchemaUpdated(AuditStatus.FAILURE, resources));
}
private void writeLog(AuditPayload log) {
this.logger.audit(log);
......
package org.opengroup.osdu.schema.provider.interfaces.messagebus;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
public interface IMessageBus {
void publishMessage(DpsHeaders headers, String schemaId, String eventType);
}
\ No newline at end of file
......@@ -10,10 +10,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONException;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.enums.SchemaScope;
......@@ -29,6 +30,7 @@ import org.opengroup.osdu.schema.model.SchemaInfo;
import org.opengroup.osdu.schema.model.SchemaInfoResponse;
import org.opengroup.osdu.schema.model.SchemaRequest;
import org.opengroup.osdu.schema.model.SchemaUpsertResponse;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.opengroup.osdu.schema.provider.interfaces.schemainfostore.ISchemaInfoStore;
import org.opengroup.osdu.schema.provider.interfaces.schemastore.ISchemaStore;
import org.opengroup.osdu.schema.service.IAuthorityService;
......@@ -41,11 +43,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
/**
* Schema Service to register, get and update schema.
*
......@@ -74,6 +79,8 @@ public class SchemaService implements ISchemaService {
private String sharedTenant;
final JaxRsDpsLog log;
private final IMessageBus messageBus;
@Autowired
public void setSchemaResolver(SchemaResolver schemaResolver) {
......@@ -148,8 +155,8 @@ public class SchemaService implements ISchemaService {
try {
SchemaInfo schemaInfo = schemaInfoStore.createSchemaInfo(schemaRequest);
schemaStore.createSchema(schemaId, schema);
auditLogger.schemaRegisteredSuccess(
Collections.singletonList(schemaRequest.toString()));
auditLogger.schemaRegisteredSuccess(Collections.singletonList(schemaRequest.toString()));
messageBus.publishMessage(headers, schemaId, SchemaConstants.SCHEMA_CREATE_EVENT_TYPE);
return schemaInfo;
} catch (ApplicationException ex) {
auditLogger.schemaRegisteredFailure(
......@@ -207,6 +214,7 @@ public class SchemaService implements ISchemaService {
SchemaInfo schInfo = schemaInfoStore.updateSchemaInfo(schemaRequest);
auditLogger.schemaUpdatedSuccess(Collections.singletonList(schemaRequest.toString()));
schemaStore.createSchema(schemaRequest.getSchemaInfo().getSchemaIdentity().getId(), schema);
messageBus.publishMessage(headers, createdSchemaId, SchemaConstants.SCHEMA_UPDATE_EVENT_TYPE);
log.info(SchemaConstants.SCHEMA_UPDATED);
return schInfo;
} else {
......
......@@ -34,6 +34,7 @@ import org.opengroup.osdu.schema.model.QueryParams;
import org.opengroup.osdu.schema.model.SchemaIdentity;
import org.opengroup.osdu.schema.model.SchemaInfo;
import org.opengroup.osdu.schema.model.SchemaRequest;
import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus;
import org.opengroup.osdu.schema.provider.interfaces.schemainfostore.ISchemaInfoStore;
import org.opengroup.osdu.schema.provider.interfaces.schemastore.ISchemaStore;
import org.opengroup.osdu.schema.service.IAuthorityService;
......@@ -88,6 +89,9 @@ public class SchemaServiceTest {
@Value("${shared.tenant.name:common}")
private String sharedTenant;
@Mock
IMessageBus messageBus;
private Date currDate = new Date();
@Rule
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment