Skip to content
Snippets Groups Projects
Commit ce5bd089 authored by Zhibin Mai's avatar Zhibin Mai
Browse files

Always update ElasticSearch schema mapping when an index is re-indexed

parent bed69666
No related branches found
No related tags found
1 merge request!611Always update schema mapping to ElasticSearch when re-index is triggered
Showing
with 35 additions and 20 deletions
......@@ -113,7 +113,7 @@ public class RecordIndexerApi {
public ResponseEntity<?> reindex(
@RequestBody @NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY)
@Valid RecordReindexRequest recordReindexRequest) {
return new ResponseEntity<>(reIndexService.reindexKind(recordReindexRequest, false), HttpStatus.OK);
return new ResponseEntity<>(reIndexService.reindexKind(recordReindexRequest, false, false), HttpStatus.OK);
}
// THIS IS AN INTERNAL USE API ONLY
......
......@@ -104,7 +104,7 @@ public class ReindexApi {
public ResponseEntity<?> reindex(@NotNull @Valid @RequestBody RecordReindexRequest recordReindexRequest,
@Parameter(description = "Force Clean")
@RequestParam(value = "force_clean", defaultValue = "false") boolean forceClean) throws IOException {
this.reIndexService.reindexKind(recordReindexRequest, this.indexSchemaService.isStorageSchemaSyncRequired(recordReindexRequest.getKind(), forceClean));
this.reIndexService.reindexKind(recordReindexRequest, this.indexSchemaService.isStorageSchemaSyncRequired(recordReindexRequest.getKind(), forceClean), true);
this.auditLogger.getReindex(singletonList(recordReindexRequest.getKind()));
return new ResponseEntity<>(org.springframework.http.HttpStatus.OK);
}
......
......@@ -35,6 +35,8 @@ public interface IndexSchemaService {
void processSchemaMessages(Map<String, OperationType> schemaMsgs) throws IOException;
void processSchemaUpsert(String kind) throws AppException;
void processSchemaUpsertEvent(RestHighLevelClient restClient, String kind) throws IOException, ElasticsearchStatusException, URISyntaxException;
void syncIndexMappingWithStorageSchema(String kind) throws ElasticsearchException, IOException, AppException, URISyntaxException;
......
......@@ -106,6 +106,15 @@ public class IndexSchemaServiceImpl implements IndexSchemaService {
}
}
@Override
public void processSchemaUpsert(String kind) throws AppException {
try (RestHighLevelClient restClient = this.elasticClientHandler.createRestClient()) {
processSchemaUpsertEvent(restClient, kind);
} catch (IOException | ElasticsearchStatusException | URISyntaxException e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "unable to process schema update", e.getMessage());
}
}
@Override
public void processSchemaUpsertEvent(RestHighLevelClient restClient, String kind) throws IOException, ElasticsearchStatusException, URISyntaxException {
String index = this.elasticIndexNameResolver.getIndexNameFromKind(kind);
......
......@@ -22,7 +22,7 @@ import java.util.List;
public interface ReindexService {
String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean);
String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean, boolean updateSchemaMapping);
Records reindexRecords(List<String> recordIds);
......
......@@ -52,7 +52,7 @@ public class ReindexServiceImpl implements ReindexService {
@SneakyThrows
@Override
public String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean) {
public String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean, boolean updateSchemaMapping) {
Long initialDelayMillis = 0l;
......@@ -62,6 +62,10 @@ public class ReindexServiceImpl implements ReindexService {
this.indexSchemaService.syncIndexMappingWithStorageSchema(recordReindexRequest.getKind());
initialDelayMillis = 30000l;
}
else if(updateSchemaMapping){
this.indexSchemaService.processSchemaUpsert(recordReindexRequest.getKind());
initialDelayMillis = 15000l;
}
RecordQueryResponse recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
......@@ -113,7 +117,7 @@ public class ReindexServiceImpl implements ReindexService {
}
for (String kind : allKinds) {
try {
reindexKind(new RecordReindexRequest(kind, ""), forceClean);
reindexKind(new RecordReindexRequest(kind, ""), forceClean, true);
} catch (Exception e) {
jaxRsDpsLog.warning(String.format("kind: %s cannot be re-indexed", kind));
continue;
......
......@@ -64,7 +64,7 @@ public class ReindexApiTest {
@Test
public void should_return200_when_valid_kind_provided() throws IOException {
when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenReturn("something");
when(this.reIndexService.reindexKind(recordReindexRequest, false, false)).thenReturn("something");
ResponseEntity<?> response = sut.reindex(recordReindexRequest, false);
......@@ -73,14 +73,14 @@ public class ReindexApiTest {
@Test(expected = AppException.class)
public void should_throwAppException_ifUnknownExceptionCaught_reindexTest() throws IOException {
when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenThrow(new AppException(500, "", ""));
when(this.reIndexService.reindexKind(recordReindexRequest, false, true)).thenThrow(new AppException(500, "", ""));
sut.reindex(recordReindexRequest, false);
}
@Test(expected = NullPointerException.class)
public void should_throwAppException_ifNullPointerExceptionCaught_ReindexTest() throws IOException {
when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenThrow(new NullPointerException(""));
when(this.reIndexService.reindexKind(recordReindexRequest, false, true)).thenThrow(new NullPointerException(""));
sut.reindex(recordReindexRequest, false);
}
......
......@@ -104,7 +104,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(null);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexKind(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -118,7 +118,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(new ArrayList<>());
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexKind(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -138,7 +138,7 @@ public class ReindexServiceTest {
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload);
} catch (Exception e) {
......@@ -154,7 +154,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(results);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"slb-correlation-id\":\"%s\"}}", correlationId), taskQueuePayload);
} catch (Exception e) {
......
......@@ -94,7 +94,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(null);
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String response = sut.reindexKind(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -108,7 +108,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(new ArrayList<>());
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String response = sut.reindexKind(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -128,7 +128,7 @@ public class ReindexServiceTest {
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload);
} catch (Exception e) {
......@@ -144,7 +144,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(results);
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"correlation-id\":\"%s\"}}", correlationId), taskQueuePayload);
} catch (Exception e) {
......
......@@ -88,7 +88,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(null);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexKind(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -102,7 +102,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(new ArrayList<>());
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexKind(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -122,7 +122,7 @@ public class ReindexServiceTest {
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload);
} catch (Exception e) {
......@@ -138,7 +138,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(results);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false, false);
Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"slb-correlation-id\":\"%s\"}}", correlationId), taskQueuePayload);
} catch (Exception e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment