Commit dc8b60ce authored by Aman Verma's avatar Aman Verma
Browse files

Merge branch 'users/amaverma/implementInterfacesForAzure' into 'master'

Implement the new interface method for azure

See merge request !146
parents 28019021 15534c8a
Pipeline #64637 failed with stages
in 1 minute and 45 seconds
......@@ -20,13 +20,17 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.joda.time.DateTime;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.schema.azure.di.EventGridConfig;
import org.opengroup.osdu.schema.azure.di.SystemResourceConfig;
import org.opengroup.osdu.schema.azure.impl.messagebus.model.SchemaPubSubInfo;
import org.opengroup.osdu.schema.constants.SchemaConstants;
import org.opengroup.osdu.schema.logging.AuditLogger;
......@@ -49,15 +53,26 @@ public class MessageBusImpl implements IMessageBus {
private AuditLogger auditLogger;
@Autowired
DpsHeaders headers;
@Autowired
private ITenantFactory tenantFactory;
@Autowired
SystemResourceConfig systemResourceConfig;
private final static String EVENT_DATA_VERSION = "1.0";
@Override
public void publishMessage(String schemaId, String eventType) {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
this.publishMessageForSystemSchema(schemaId, eventType);
return;
}
if (eventGridConfig.isEventGridEnabled()) {
logger.info("Generating event of type {}",eventType);
try {
publishToEventGrid(schemaId, eventType);
publishToEventGrid(schemaId, eventType, headers.getPartitionId());
auditLogger.schemaNotificationSuccess(Collections.singletonList(schemaId));
}catch (AppException ex) {
......@@ -71,7 +86,38 @@ public class MessageBusImpl implements IMessageBus {
}
}
private void publishToEventGrid(String schemaId, String eventType) {
/**
* Method to publish schema create notification for system schemas.
* @param schemaId
* @param eventType
*/
@Override
public void publishMessageForSystemSchema(String schemaId, String eventType) {
if (eventGridConfig.isEventGridEnabled()) {
logger.info("Generating event of type {}",eventType);
try {
// Publish the event for all the tenants.
List<String> privateTenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getName)
.collect(Collectors.toList());
for (String tenant : privateTenantList) {
publishToEventGrid(schemaId, eventType, tenant);
}
auditLogger.schemaNotificationSuccess(Collections.singletonList(schemaId));
}catch (AppException ex) {
//We do not want to fail schema creation if notification delivery has failed, hence just logging the exception
auditLogger.schemaNotificationFailure(Collections.singletonList(schemaId));
logger.warning(SchemaConstants.SCHEMA_NOTIFICATION_FAILED);
}
}else {
logger.info(SchemaConstants.SCHEMA_NOTIFICATION_IS_DISABLED);
}
}
private void publishToEventGrid(String schemaId, String eventType, String dataPartitionId) {
String messageId = UUID.randomUUID().toString();
SchemaPubSubInfo[] schemaPubSubMsgs = new SchemaPubSubInfo [1];
......@@ -79,8 +125,8 @@ public class MessageBusImpl implements IMessageBus {
List<EventGridEvent> eventsList = new ArrayList<>();
HashMap<String, Object> message = new HashMap<>();
message.put("data", schemaPubSubMsgs);
message.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
message.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
message.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
message.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
message.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
//EventGridEvent supports array of messages to be triggered in a batch but at present we do not support
......@@ -95,7 +141,7 @@ public class MessageBusImpl implements IMessageBus {
);
eventsList.add(eventGridEvent);
logger.info("Schema event created: " + messageId);
eventGridTopicStore.publishToEventGridTopic(headers.getPartitionId(), eventGridConfig.getCustomTopicName(), eventsList);
eventGridTopicStore.publishToEventGridTopic(dataPartitionId, eventGridConfig.getCustomTopicName(), eventsList);
logger.info("Schema event generated successfully");
}
......
......@@ -72,17 +72,32 @@ public class AzureAuthorityStore implements IAuthorityStore {
*/
@Override
public Authority get(String authorityId) throws NotFoundException, ApplicationException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.getSystemAuthority(authorityId);
}
String id = headers.getPartitionId() + ":" + authorityId;
AuthorityDoc authorityDoc;
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
authorityDoc = cosmosStore.findItem(systemResourceConfig.getCosmosDatabase(), authorityContainer, id, authorityId, AuthorityDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
} else {
authorityDoc = cosmosStore.findItem(headers.getPartitionId(), cosmosDBName, authorityContainer, id, authorityId, AuthorityDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
}
authorityDoc = cosmosStore.findItem(headers.getPartitionId(), cosmosDBName, authorityContainer, id, authorityId, AuthorityDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
return authorityDoc.getAuthority();
}
/**
* Method to get system Authority
* @param authorityId
* @return
* @throws NotFoundException
* @throws ApplicationException
*/
@Override
public Authority getSystemAuthority(String authorityId) throws NotFoundException, ApplicationException {
AuthorityDoc authorityDoc;
authorityDoc = cosmosStore.findItem(systemResourceConfig.getCosmosDatabase(), authorityContainer, authorityId, authorityId, AuthorityDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
return authorityDoc.getAuthority();
}
......@@ -96,27 +111,52 @@ public class AzureAuthorityStore implements IAuthorityStore {
*/
@Override
public Authority create(Authority authority) throws ApplicationException, BadRequestException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.createSystemAuthority(authority);
}
String id = headers.getPartitionId() + ":" + authority.getAuthorityId();
try {
AuthorityDoc authorityDoc = new AuthorityDoc(id, authority);
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
cosmosStore.createItem(systemResourceConfig.getCosmosDatabase(), authorityContainer, id, authorityDoc);
} else {
cosmosStore.createItem(headers.getPartitionId(), cosmosDBName, authorityContainer, id, authorityDoc);
}
cosmosStore.createItem(headers.getPartitionId(), cosmosDBName, authorityContainer, id, authorityDoc);
} catch (AppException ex) {
if (ex.getError().getCode() == 409) {
log.warning(SchemaConstants.AUTHORITY_EXISTS_ALREADY_REGISTERED);
throw new BadRequestException(MessageFormat.format(SchemaConstants.AUTHORITY_EXISTS_EXCEPTION,
authority.getAuthorityId()));
} else {
log.error(MessageFormat.format(SchemaConstants.OBJECT_INVALID, ex.getMessage()));
throw new ApplicationException(SchemaConstants.INVALID_INPUT);
}
handleAppException(ex, authority);
}
log.info(SchemaConstants.AUTHORITY_CREATED);
return authority;
}
/**
* Method to register a System Authority
* @param authority
* @return
* @throws ApplicationException
* @throws BadRequestException
*/
@Override
public Authority createSystemAuthority(Authority authority) throws ApplicationException, BadRequestException {
try {
AuthorityDoc authorityDoc = new AuthorityDoc(authority.getAuthorityId(), authority);
cosmosStore.createItem(systemResourceConfig.getCosmosDatabase(), authorityContainer, authority.getAuthorityId(), authorityDoc);
} catch (AppException ex) {
handleAppException(ex, authority);
}
log.info(SchemaConstants.AUTHORITY_CREATED);
return authority;
}
private void handleAppException(AppException ex, Authority authority) throws ApplicationException, BadRequestException {
if (ex.getError().getCode() == 409) {
log.warning(SchemaConstants.AUTHORITY_EXISTS_ALREADY_REGISTERED);
throw new BadRequestException(MessageFormat.format(SchemaConstants.AUTHORITY_EXISTS_EXCEPTION,
authority.getAuthorityId()));
} else {
log.error(MessageFormat.format(SchemaConstants.OBJECT_INVALID, ex.getMessage()));
throw new ApplicationException(SchemaConstants.INVALID_INPUT);
}
}
}
......@@ -69,17 +69,31 @@ public class AzureEntityTypeStore implements IEntityTypeStore {
*/
@Override
public EntityType get(String entityTypeId) throws NotFoundException, ApplicationException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.getSystemEntity(entityTypeId);
}
String id = headers.getPartitionId() + ":" + entityTypeId;
EntityTypeDoc entityTypeDoc;
entityTypeDoc = cosmosStore.findItem(headers.getPartitionId(), cosmosDBName, entityTypeContainer, id, entityTypeId, EntityTypeDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
entityTypeDoc = cosmosStore.findItem(systemResourceConfig.getCosmosDatabase(), entityTypeContainer, id, entityTypeId, EntityTypeDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
} else {
entityTypeDoc = cosmosStore.findItem(headers.getPartitionId(), cosmosDBName, entityTypeContainer, id, entityTypeId, EntityTypeDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
}
return entityTypeDoc.getEntityType();
}
/**
* Method to get system Entity
* @param entityTypeId
* @return
* @throws NotFoundException
* @throws ApplicationException
*/
@Override
public EntityType getSystemEntity(String entityTypeId) throws NotFoundException, ApplicationException {
EntityTypeDoc entityTypeDoc;
entityTypeDoc = cosmosStore.findItem(systemResourceConfig.getCosmosDatabase(), entityTypeContainer, entityTypeId, entityTypeId, EntityTypeDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
return entityTypeDoc.getEntityType();
}
......@@ -93,27 +107,51 @@ public class AzureEntityTypeStore implements IEntityTypeStore {
*/
@Override
public EntityType create(EntityType entityType) throws BadRequestException, ApplicationException {
String id = headers.getPartitionId() + ":" + entityType.getEntityTypeId();
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.createSystemEntity(entityType);
}
String id = headers.getPartitionId() + ":" + entityType.getEntityTypeId();
try {
EntityTypeDoc entityTypeDoc = new EntityTypeDoc(id, entityType);
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
cosmosStore.createItem(systemResourceConfig.getCosmosDatabase(), entityTypeContainer, id, entityTypeDoc);
} else {
cosmosStore.createItem(headers.getPartitionId(), cosmosDBName, entityTypeContainer, id, entityTypeDoc);
}
cosmosStore.createItem(headers.getPartitionId(), cosmosDBName, entityTypeContainer, id, entityTypeDoc);
} catch (AppException ex) {
if (ex.getError().getCode() == 409) {
log.warning(SchemaConstants.ENTITY_TYPE_EXISTS);
throw new BadRequestException(MessageFormat.format(SchemaConstants.ENTITY_TYPE_EXISTS_EXCEPTION,
entityType.getEntityTypeId()));
} else {
log.error(MessageFormat.format(SchemaConstants.OBJECT_INVALID, ex.getMessage()));
throw new ApplicationException(SchemaConstants.INVALID_INPUT);
}
handleAppException(ex, entityType);
}
log.info(SchemaConstants.ENTITY_TYPE_CREATED);
return entityType;
}
/**
* Method to create a system Entity
* @param entityType
* @return
* @throws BadRequestException
* @throws ApplicationException
*/
@Override
public EntityType createSystemEntity(EntityType entityType) throws BadRequestException, ApplicationException {
try {
EntityTypeDoc entityTypeDoc = new EntityTypeDoc(entityType.getEntityTypeId(), entityType);
cosmosStore.createItem(systemResourceConfig.getCosmosDatabase(), entityTypeContainer, entityType.getEntityTypeId(), entityTypeDoc);
} catch (AppException ex) {
handleAppException(ex, entityType);
}
log.info(SchemaConstants.ENTITY_TYPE_CREATED);
return entityType;
}
private void handleAppException(AppException ex, EntityType entityType) throws BadRequestException, ApplicationException {
if (ex.getError().getCode() == 409) {
log.warning(SchemaConstants.ENTITY_TYPE_EXISTS);
throw new BadRequestException(MessageFormat.format(SchemaConstants.ENTITY_TYPE_EXISTS_EXCEPTION,
entityType.getEntityTypeId()));
} else {
log.error(MessageFormat.format(SchemaConstants.OBJECT_INVALID, ex.getMessage()));
throw new ApplicationException(SchemaConstants.INVALID_INPUT);
}
}
}
......@@ -67,17 +67,32 @@ public class AzureSourceStore implements ISourceStore {
*/
@Override
public Source get(String sourceId) throws NotFoundException, ApplicationException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.getSystemSource(sourceId);
}
String id = headers.getPartitionId().toString() + ":" + sourceId;
SourceDoc sourceDoc;
sourceDoc = cosmosStore.findItem(headers.getPartitionId(), cosmosDBName, sourceContainer, id, sourceId, SourceDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
sourceDoc = cosmosStore.findItem(systemResourceConfig.getCosmosDatabase(), sourceContainer, id, sourceId, SourceDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
} else {
sourceDoc = cosmosStore.findItem(headers.getPartitionId(), cosmosDBName, sourceContainer, id, sourceId, SourceDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
}
return sourceDoc.getSource();
}
/**
* Method to get system Source
* @param sourceId
* @return
* @throws NotFoundException
* @throws ApplicationException
*/
@Override
public Source getSystemSource(String sourceId) throws NotFoundException, ApplicationException {
SourceDoc sourceDoc;
sourceDoc = cosmosStore.findItem(systemResourceConfig.getCosmosDatabase(), sourceContainer, sourceId, sourceId, SourceDoc.class)
.orElseThrow(() -> new NotFoundException("bad input parameter"));
return sourceDoc.getSource();
}
......@@ -91,27 +106,52 @@ public class AzureSourceStore implements ISourceStore {
*/
@Override
public Source create(Source source) throws BadRequestException, ApplicationException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.createSystemSource(source);
}
String id = headers.getPartitionId() + ":" + source.getSourceId();
try {
SourceDoc sourceDoc = new SourceDoc(id, source);
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
cosmosStore.createItem(systemResourceConfig.getCosmosDatabase(), sourceContainer, id, sourceDoc);
} else {
cosmosStore.createItem(headers.getPartitionId(), cosmosDBName, sourceContainer, id, sourceDoc);
}
cosmosStore.createItem(headers.getPartitionId(), cosmosDBName, sourceContainer, id, sourceDoc);
} catch (AppException ex) {
if (ex.getError().getCode() == 409) {
log.warning(SchemaConstants.SOURCE_EXISTS);
throw new BadRequestException(MessageFormat.format(SchemaConstants.SOURCE_EXISTS_EXCEPTION,
source.getSourceId()));
} else {
log.error(MessageFormat.format(SchemaConstants.OBJECT_INVALID, ex.getMessage()));
throw new ApplicationException(SchemaConstants.INVALID_INPUT);
}
handleAppException(ex, source);
}
log.info(SchemaConstants.SOURCE_CREATED);
return source;
}
/**
* Method tp register a system Source
* @param source
* @return
* @throws BadRequestException
* @throws ApplicationException
*/
@Override
public Source createSystemSource(Source source) throws BadRequestException, ApplicationException {
try {
SourceDoc sourceDoc = new SourceDoc(source.getSourceId(), source);
cosmosStore.createItem(systemResourceConfig.getCosmosDatabase(), sourceContainer, source.getSourceId(), sourceDoc);
} catch (AppException ex) {
handleAppException(ex, source);
}
log.info(SchemaConstants.SOURCE_CREATED);
return source;
}
private void handleAppException(AppException ex, Source source) throws BadRequestException, ApplicationException {
if (ex.getError().getCode() == 409) {
log.warning(SchemaConstants.SOURCE_EXISTS);
throw new BadRequestException(MessageFormat.format(SchemaConstants.SOURCE_EXISTS_EXCEPTION,
source.getSourceId()));
} else {
log.error(MessageFormat.format(SchemaConstants.OBJECT_INVALID, ex.getMessage()));
throw new ApplicationException(SchemaConstants.INVALID_INPUT);
}
}
}
......@@ -62,14 +62,40 @@ public class AzureSchemaStore implements ISchemaStore {
*/
@Override
public String getSchema(String dataPartitionId, String filePath) throws ApplicationException, NotFoundException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(dataPartitionId)) {
return this.getSystemSchema(filePath);
}
filePath = dataPartitionId + ":" + filePath + SchemaConstants.JSON_EXTENSION;
try {
String content = null;
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(dataPartitionId)) {
content = blobStore.readFromStorageContainer(filePath, systemResourceConfig.getStorageContainerName());
} else {
content = blobStore.readFromStorageContainer(dataPartitionId, filePath, config.containerName());
}
content = blobStore.readFromStorageContainer(dataPartitionId, filePath, config.containerName());
if (content != null)
return content;
else
throw new NotFoundException(SchemaConstants.SCHEMA_NOT_PRESENT);
}
catch (Exception ex) {
throw new NotFoundException(SchemaConstants.SCHEMA_NOT_PRESENT);
}
}
/**
* Method to get a system schema
* @param filePath
* @return
* @throws NotFoundException
* @throws ApplicationException
*/
@Override
public String getSystemSchema(String filePath) throws NotFoundException, ApplicationException {
filePath = filePath + SchemaConstants.JSON_EXTENSION;
try {
String content = null;
content = blobStore.readFromStorageContainer(filePath, systemResourceConfig.getStorageContainerName());
if (content != null)
return content;
else
......@@ -90,15 +116,34 @@ public class AzureSchemaStore implements ISchemaStore {
@Override
public String createSchema(String filePath, String content) throws ApplicationException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.createSystemSchema(filePath, content);
}
String dataPartitionId = headers.getPartitionId();
filePath = dataPartitionId + ":" + filePath + SchemaConstants.JSON_EXTENSION;
try {
blobStore.writeToStorageContainer(dataPartitionId, filePath, content, config.containerName());
log.info(SchemaConstants.SCHEMA_CREATED);
return filePath;
} catch (Exception ex) {
throw new ApplicationException(SchemaConstants.INTERNAL_SERVER_ERROR);
}
}
/**
* Method to create a system schema
* @param filePath
* @param content
* @return
* @throws ApplicationException
*/
@Override
public String createSystemSchema(String filePath, String content) throws ApplicationException {
filePath = filePath + SchemaConstants.JSON_EXTENSION;
try {
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
blobStore.writeToStorageContainer(filePath, content, systemResourceConfig.getStorageContainerName());
} else {
blobStore.writeToStorageContainer(dataPartitionId, filePath, content, config.containerName());
}
blobStore.writeToStorageContainer(filePath, content, systemResourceConfig.getStorageContainerName());
log.info(SchemaConstants.SCHEMA_CREATED);
return filePath;
} catch (Exception ex) {
......@@ -114,16 +159,35 @@ public class AzureSchemaStore implements ISchemaStore {
*/
@Override
public boolean cleanSchemaProject(String schemaId) throws ApplicationException {
// This if block will be removed once schema-core starts consuming *System* methods.
if (systemResourceConfig.getSharedTenant().equalsIgnoreCase(headers.getPartitionId())) {
return this.cleanSystemSchemaProject(schemaId);
}
String dataPartitionId = headers.getPartitionId();
String filePath = dataPartitionId + ":" + schemaId + SchemaConstants.JSON_EXTENSION;
try
{
return blobStore.deleteFromStorageContainer(dataPartitionId, filePath, config.containerName());
}
catch (Exception e)
{
throw new ApplicationException(SchemaConstants.INTERNAL_SERVER_ERROR);
}
}
/**
* Method to delete a system schema.
* @param schemaId
* @return
* @throws ApplicationException
*/