Commit a4289eca authored by Aman Verma's avatar Aman Verma
Browse files

refactoring the update method

parent 7736fee2
Pipeline #62107 passed with stages
in 30 minutes and 11 seconds
......@@ -29,6 +29,19 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.4.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.4.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
......
......@@ -221,7 +221,7 @@ public class SchemaService implements ISchemaService {
* updated. This method checks for the presence of schema based on the schema Id
* provided in input Payload, if found updates both the schemaInfo and schema.
*
* @param schemarequest
* @param schemaRequest
* @return schemaInfo.
* @throws IOException
* @throws JsonProcessingException
......@@ -231,45 +231,18 @@ public class SchemaService implements ISchemaService {
*/
@Override
public SchemaInfo updateSchema(SchemaRequest schemaRequest) throws ApplicationException, BadRequestException {
String dataPartitionId = headers.getPartitionId();
String createdSchemaId = createAndSetSchemaId(schemaRequest);
SchemaInfo schemaInfo = null;
try {
schemaInfo = schemaInfoStore.getSchemaInfo(createdSchemaId);
} catch (NotFoundException e) {
log.error(SchemaConstants.INVALID_SCHEMA_UPDATE);
if (!SchemaStatus.DEVELOPMENT.equals(schemaRequest.getSchemaInfo().getStatus()))
throw new BadRequestException(SchemaConstants.SCHEMA_PUT_CREATE_EXCEPTION);
throw new NoSchemaFoundException(SchemaConstants.INVALID_SCHEMA_UPDATE);
}
if (SchemaStatus.DEVELOPMENT.equals(schemaInfo.getStatus())) {
log.info(MessageFormat.format(SchemaConstants.SCHEMA_UPDATION_STARTED, createdSchemaId));
setScope(schemaRequest, false);
String schema = resolveAndCheckBreakingChanges(schemaRequest);
SchemaInfo schInfo = schemaInfoStore.updateSchemaInfo(schemaRequest);
auditLogger.schemaUpdatedSuccess(Collections.singletonList(schemaRequest.toString()));
schemaStore.createSchema(schemaRequest.getSchemaInfo().getSchemaIdentity().getId(), schema);
messageBus.publishMessage(createdSchemaId, SchemaConstants.SCHEMA_UPDATE_EVENT_TYPE);
log.info(SchemaConstants.SCHEMA_UPDATED);
return schInfo;
} else {
auditLogger.schemaUpdatedFailure(Collections.singletonList(schemaRequest.toString()));
log.error(SchemaConstants.SCHEMA_UPDATE_ERROR);
throw new BadRequestException(SchemaConstants.SCHEMA_UPDATE_EXCEPTION);
}
return this.updateSchemaInternal(schemaRequest, false);
}
private SchemaInfo updateSystemSchema(SchemaRequest schemaRequest) throws ApplicationException, BadRequestException {
public SchemaInfo updateSchemaInternal(SchemaRequest schemaRequest, Boolean isSystemSchema) throws ApplicationException, BadRequestException {
String createdSchemaId = createAndSetSchemaId(schemaRequest);
SchemaInfo schemaInfo = null;
try {
schemaInfo = schemaInfoStore.getSystemSchemaInfo(createdSchemaId);
if (isSystemSchema) {
schemaInfo = schemaInfoStore.getSystemSchemaInfo(createdSchemaId);
} else {
schemaInfo = schemaInfoStore.getSchemaInfo(createdSchemaId);
}
} catch (NotFoundException e) {
log.error(SchemaConstants.INVALID_SCHEMA_UPDATE);
......@@ -282,12 +255,19 @@ public class SchemaService implements ISchemaService {
if (SchemaStatus.DEVELOPMENT.equals(schemaInfo.getStatus())) {
log.info(MessageFormat.format(SchemaConstants.SCHEMA_UPDATION_STARTED, createdSchemaId));
setScope(schemaRequest, true);
setScope(schemaRequest, isSystemSchema);
String schema = resolveAndCheckBreakingChanges(schemaRequest);
SchemaInfo schInfo = schemaInfoStore.updateSystemSchemaInfo(schemaRequest);
SchemaInfo schInfo;
if (isSystemSchema) {
schInfo = schemaInfoStore.updateSystemSchemaInfo(schemaRequest);
schemaStore.createSystemSchema(schemaRequest.getSchemaInfo().getSchemaIdentity().getId(), schema);
messageBus.publishMessageForSystemSchema(createdSchemaId, SchemaConstants.SCHEMA_UPDATE_EVENT_TYPE);
} else {
schInfo = schemaInfoStore.updateSchemaInfo(schemaRequest);
schemaStore.createSchema(schemaRequest.getSchemaInfo().getSchemaIdentity().getId(), schema);
messageBus.publishMessage(createdSchemaId, SchemaConstants.SCHEMA_UPDATE_EVENT_TYPE);
}
auditLogger.schemaUpdatedSuccess(Collections.singletonList(schemaRequest.toString()));
schemaStore.createSystemSchema(schemaRequest.getSchemaInfo().getSchemaIdentity().getId(), schema);
messageBus.publishMessage(createdSchemaId, SchemaConstants.SCHEMA_UPDATE_EVENT_TYPE);
log.info(SchemaConstants.SCHEMA_UPDATED);
return schInfo;
} else {
......@@ -295,7 +275,11 @@ public class SchemaService implements ISchemaService {
log.error(SchemaConstants.SCHEMA_UPDATE_ERROR);
throw new BadRequestException(SchemaConstants.SCHEMA_UPDATE_EXCEPTION);
}
}
private SchemaInfo updateSystemSchema(SchemaRequest schemaRequest) throws ApplicationException, BadRequestException {
return this.updateSchemaInternal(schemaRequest, true);
}
private String createSchemaId(SchemaRequest schemaRequest) {
......@@ -371,37 +355,28 @@ public class SchemaService implements ISchemaService {
@Override
public SchemaUpsertResponse upsertSchema(SchemaRequest schemaRequest) throws ApplicationException, BadRequestException {
SchemaInfo response = null;
HttpStatus httpCode = HttpStatus.BAD_REQUEST;
SchemaUpsertResponse.SchemaUpsertResponseBuilder upsertBuilder = SchemaUpsertResponse.builder();
try {
response = updateSchema(schemaRequest);
httpCode = HttpStatus.OK;
} catch (NoSchemaFoundException noSchemaFound) {
try {
response = createSchema(schemaRequest);
httpCode = HttpStatus.CREATED;
}catch (BadRequestException badreqEx) {
//If there is same schema-id for other tenant then throw different error message
if(SchemaConstants.SCHEMA_ID_EXISTS.equals(badreqEx.getMessage()))
throw new BadRequestException(SchemaConstants.INVALID_UPDATE_OPERATION);
throw badreqEx;
}
}
return upsertBuilder.schemaInfo(response).httpCode(httpCode).build();
return upsertSchemaInternal(schemaRequest, false);
}
private SchemaUpsertResponse upsertSystemSchemaInternal(SchemaRequest schemaRequest) throws ApplicationException, BadRequestException {
private SchemaUpsertResponse upsertSchemaInternal(SchemaRequest schemaRequest, Boolean isSystemSchema) throws ApplicationException, BadRequestException {
SchemaInfo response = null;
HttpStatus httpCode = HttpStatus.BAD_REQUEST;
SchemaUpsertResponse.SchemaUpsertResponseBuilder upsertBuilder = SchemaUpsertResponse.builder();
try {
response = updateSystemSchema(schemaRequest);
if (isSystemSchema) {
response = updateSystemSchema(schemaRequest);
} else {
response = updateSchema(schemaRequest);
}
httpCode = HttpStatus.OK;
} catch (NoSchemaFoundException noSchemaFound) {
try {
response = createSystemSchema(schemaRequest);
if (isSystemSchema) {
response = createSystemSchema(schemaRequest);
} else {
response = createSchema(schemaRequest);
}
httpCode = HttpStatus.CREATED;
}catch (BadRequestException badreqEx) {
//If there is same schema-id for other tenant then throw different error message
......@@ -415,13 +390,9 @@ public class SchemaService implements ISchemaService {
}
public SchemaUpsertResponse upsertSystemSchema(SchemaRequest schemaRequest) throws ApplicationException, BadRequestException {
return this.upsertSystemSchemaInternal(schemaRequest);
}
private void updateDataPartitionId() {
headers.put(SchemaConstants.DATA_PARTITION_ID, sharedTenant);
return this.upsertSchemaInternal(schemaRequest, true);
}
private void getSchemaInfos(QueryParams queryParams, List<SchemaInfo> schemaList, String tenant)
throws ApplicationException {
......@@ -444,10 +415,10 @@ public class SchemaService implements ISchemaService {
* Method to set the scope of the schema according to the tenant
*
* @param schemaRequest
* @param dataPartitionId
* @param isSystemSchema
*/
private void setScope(SchemaRequest schemaRequest, Boolean isPublicSchema) {
if (isPublicSchema) {
private void setScope(SchemaRequest schemaRequest, Boolean isSystemSchema) {
if (isSystemSchema) {
schemaRequest.getSchemaInfo().setScope(SchemaScope.SHARED);
} else {
schemaRequest.getSchemaInfo().setScope(SchemaScope.INTERNAL);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment