Skip to content
Snippets Groups Projects
Commit ce1c185d authored by Riabokon Stanislav(EPAM)[GCP]'s avatar Riabokon Stanislav(EPAM)[GCP]
Browse files

Merge branch 'system_schemas_publishing' into 'master'

Added system schemas publishing

See merge request !429
parents 39d73635 22ae2560
No related branches found
No related tags found
1 merge request!429Added system schemas publishing
Pipeline #158620 failed
......@@ -9,6 +9,7 @@ data:
LOG_LEVEL: {{ .Values.data.logLevel | quote }}
ENTITLEMENTS_HOST: {{ .Values.data.entitlementsHost | quote }}
GCP_SCHEMA_CHANGED_TOPIC_NAME: {{ .Values.data.schemaTopicName | quote }}
GCP_SCHEMA_CHANGED_MESSAGING_ENABLED: {{ .Values.data.schemaChangedMessagingEnabled | quote }}
PARTITION_HOST: {{ .Values.data.partitionHost | quote }}
SHARED_TENANT_NAME: {{ .Values.data.dataPartitionId | quote }}
SPRING_PROFILES_ACTIVE: {{ .Values.data.springProfilesActive | quote }}
......
......@@ -6,6 +6,7 @@ data:
logLevel: "ERROR"
partitionHost: "http://partition"
schemaTopicName: "schema-changed"
schemaChangedMessagingEnabled: true
springProfilesActive: "gcp"
googleAudiences: ""
datastoreKind: "system_schema_osm"
......
......@@ -26,7 +26,7 @@ Defined in default application property file but possible to override:
| `PARTITION_API` | ex `http://localhost:8081/api/partition/v1` | Partition service endpoint | no | - |
| `GOOGLE_APPLICATION_CREDENTIALS` | ex `/path/to/directory/service-key.json` | Service account credentials, you only need this if running locally | yes | https://console.cloud.google.com/iam-admin/serviceaccounts |
| `GCP_SCHEMA_CHANGED_MESSAGING_ENABLED` | `true` OR `false` | Allows to configure message publishing about schemas changes to Pub/Sub | no | - |
| `GCP_SCHEMA_CHANGED_TOPIC_NAME` | `schema_changed` | Topic for schema changes events | no | - |
| `GCP_SCHEMA_CHANGED_TOPIC_NAME` | `schema-changed` | Topic for schema changes events | no | - |
These variables define service behavior, and are used to switch between `anthos` or `gcp` environments, their overriding
and usage in mixed mode was not tested. Usage of spring profiles is preferred.
......
......@@ -23,7 +23,7 @@ Defined in default application property file but possible to override:
| `PARTITION_API` | ex `http://localhost:8081/api/partition/v1` | Partition service endpoint | no | - |
| `GOOGLE_APPLICATION_CREDENTIALS` | ex `/path/to/directory/service-key.json` | Service account credentials, you only need this if running locally | yes | https://console.cloud.google.com/iam-admin/serviceaccounts |
| `GCP_SCHEMA_CHANGED_MESSAGING_ENABLED` | `true` OR `false` | Allows to configure message publishing about schemas changes to Pub/Sub | no | - |
| `GCP_SCHEMA_CHANGED_TOPIC_NAME` | `schema_changed` | Topic for schema changes events | no | - |
| `GCP_SCHEMA_CHANGED_TOPIC_NAME` | `schema-changed` | Topic for schema changes events | no | - |
These variables define service behavior, and are used to switch between `anthos` or `gcp` environments, their overriding
and usage in mixed mode was not tested. Usage of spring profiles is preferred.
......
......@@ -68,18 +68,7 @@ public class MessageBusImpl implements IMessageBus {
this.logger.info(String.format("Generating event of type %s", eventType));
OqmDestination destination = destinationProvider.getDestination(headers.getPartitionId());
if (Objects.isNull(this.oqmTopic)) {
try {
this.oqmTopic = OqmTopic.builder().name(eventMessagingPropertiesConfig.getTopicName())
.build();
} catch (OqmDriverRuntimeException e) {
this.logger.info(SchemaConstants.SCHEMA_NOTIFICATION_FAILED);
this.auditLogger.schemaNotificationFailure(Collections.singletonList(schemaId));
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Internal error",
"A fatal internal error has occurred.", e);
}
}
healthCheckTopic(schemaId, false);
OqmMessage message = createMessage(schemaId, eventType);
this.driver.publish(message, oqmTopic, destination);
......@@ -89,22 +78,39 @@ public class MessageBusImpl implements IMessageBus {
}
}
// TODO stub must be replaced with actual implementation
@Override
public void publishMessageForSystemSchema(String schemaId, String eventType) {
String correlationId = headers.getCorrelationId();
String dataPartitionId = headers.getPartitionId();
this.logger.debug("Status changed messaging disabled, writing message to log.");
this.logger.debug(
DpsHeaders.CORRELATION_ID + " " + correlationId + DpsHeaders.DATA_PARTITION_ID + " "
+ dataPartitionId
+ " schema id: " + schemaId + " event type: " + eventType);
if (this.eventMessagingPropertiesConfig.isMessagingEnabled()) {
OqmDestination destination = destinationProvider.getDestination(headers.getPartitionId());
healthCheckTopic(schemaId, true);
OqmMessage message = createMessage(schemaId, eventType);
this.driver.publish(message, oqmTopic, destination);
this.auditLogger.schemaNotificationSuccess(Collections.singletonList(schemaId));
} else {
this.logger.info(SchemaConstants.SCHEMA_NOTIFICATION_IS_DISABLED);
}
}
private void healthCheckTopic(String schemaId, boolean isSystemSchema) {
if (Objects.isNull(this.oqmTopic)) {
try {
this.oqmTopic = OqmTopic.builder().name(eventMessagingPropertiesConfig.getTopicName()).build();
} catch (OqmDriverRuntimeException e) {
String errorMessage = isSystemSchema ? SchemaConstants.SYSTEM_SCHEMA_NOTIFICATION_FAILED :
SchemaConstants.SCHEMA_NOTIFICATION_FAILED;
this.logger.info(errorMessage);
this.auditLogger.schemaNotificationFailure(Collections.singletonList(schemaId));
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Internal error", errorMessage, e);
}
}
}
private OqmMessage createMessage(String schemaId, String eventType) {
SchemaPubSubInfo schemaPubSubMsg = new SchemaPubSubInfo(schemaId, eventType);
String data = new Gson().toJson(schemaPubSubMsg);
String data = new Gson().toJson(Collections.singletonList(schemaPubSubMsg));
Map<String, String> attributes = new HashMap<>();
......@@ -113,7 +119,7 @@ public class MessageBusImpl implements IMessageBus {
this.headers.getPartitionIdWithFallbackToAccountId());
this.headers.addCorrelationIdIfMissing();
attributes.put(DpsHeaders.CORRELATION_ID, this.headers.getCorrelationId());
attributes.put(SchemaConstants.SERVICE_NAME, SchemaConstants.SCHEMA);
return OqmMessage.builder()
.data(data)
.attributes(attributes)
......
......@@ -45,6 +45,7 @@ public class SchemaConstants {
public static final String ACCOUNT_ID = "account-Id";
//public static final String ACCOUNT_ID_COMMON_PROJECT = "common";
public static final String ON_BEHALF_OF = "on-Behalf-Of";
public static final String SERVICE_NAME = "service";
public static final String ENTITLEMENT_SERVICE_GROUP_VIEWERS = "service.schema-service.viewers";
......@@ -84,6 +85,8 @@ public class SchemaConstants {
public static final String EMPTY_ID = "The id provided is empty";
public static final String SCHEMA_CREATION_FAILED = "Schema creation failed";
public static final String SCHEMA_NOTIFICATION_FAILED = "Failed to publish the schema notification.";
public static final String SYSTEM_SCHEMA_NOTIFICATION_FAILED = "Failed to publish the system schema notification.";
public static final String SCHEMA_NOTIFICATION_IS_DISABLED = "Schema event notification is turned off.";
public static final String SCHEMA_UPDATE_FAILED = "Schema updation failed";
public static final String SCHEMA_UPDATE_EXCEPTION = "Only schema in developement stage can be updated";
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment