Skip to content
Snippets Groups Projects
Commit 76f53d23 authored by Igor Filippov (EPAM)'s avatar Igor Filippov (EPAM) Committed by Riabokon Stanislav(EPAM)[GCP]
Browse files

GONRG-856: Implement index cleanup API support

parent d28f21b0
No related branches found
No related tags found
2 merge requests!46GCP fix swagger (GONRG-1022),!41GCP implement index cleanup (GONRG-856)
package org.opengroup.osdu.indexer.api;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.extern.java.Log;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.RestHighLevelClient;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.search.SearchServiceRole;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.core.common.search.IndicesService;
import org.opengroup.osdu.indexer.SwaggerDoc;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.annotation.RequestScope;
@Log
@RestController
@RequestScope
public class CleanupIndiciesApi {
@Autowired
private ElasticClientHandler elasticClientHandler;
@Autowired
private ElasticIndexNameResolver elasticIndexNameResolver;
@Autowired
private IndicesService indicesService;
@PostMapping(path = "/index-cleanup", consumes = "application/json")
@PreAuthorize("@authorizationFilter.hasRole('" + SearchServiceRole.ADMIN + "')")
public ResponseEntity cleanupIndices(@NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY)
@Valid @RequestBody RecordChangedMessages recordChangedMessages) throws Exception {
if (recordChangedMessages.missingAccountId()) {
throw new AppException(org.apache.http.HttpStatus.SC_BAD_REQUEST, "Invalid tenant",
String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID));
}
try {
if (recordChangedMessages == null) {
log.info("record change messages is null");
}
Type listType = new TypeToken<List<RecordInfo>>() {
}.getType();
List<RecordInfo> recordInfos = new Gson().fromJson(recordChangedMessages.getData(), listType);
if (recordInfos.size() == 0) {
log.info("none of record-change message can be deserialized");
return new ResponseEntity(HttpStatus.OK);
}
processSchemaMessages(recordChangedMessages, recordInfos);
return new ResponseEntity(HttpStatus.OK);
} catch (AppException e) {
throw e;
} catch (JsonParseException e) {
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error", "Unable to parse request payload.", e);
} catch (Exception e) {
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Unknown error", "An unknown error has occurred.", e);
}
}
private void processSchemaMessages(RecordChangedMessages message, List<RecordInfo> recordInfos) throws Exception {
Map<String, OperationType> schemaMsgs = RecordInfo.getSchemaMsgs(recordInfos);
if (schemaMsgs != null && !schemaMsgs.isEmpty()) {
try (RestHighLevelClient restClient = elasticClientHandler.createRestClient()) {
schemaMsgs.entrySet().forEach(msg -> {
try {
processSchemaEvents(restClient, msg);
} catch (IOException | ElasticsearchStatusException e) {
throw new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "unable to process schema delete", e.getMessage());
}
});
}
}
}
private void processSchemaEvents(RestHighLevelClient restClient,
Map.Entry<String, OperationType> msg) throws IOException, ElasticsearchStatusException {
String kind = msg.getKey();
String index = elasticIndexNameResolver.getIndexNameFromKind(kind);
boolean indexExist = indicesService.isIndexExist(restClient, index);
if (msg.getValue() == OperationType.purge_schema) {
if (indexExist) {
indicesService.deleteIndex(restClient, index);
} else {
log.warning(String.format("Kind: %s not found", kind));
}
}
}
}
package org.opengroup.osdu.indexer.api;
import static org.junit.Assert.fail;
import static org.mockito.MockitoAnnotations.initMocks;
import com.google.gson.Gson;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.opengroup.osdu.core.common.http.HeadersUtil;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.search.Config;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.core.common.search.IndicesService;
import org.opengroup.osdu.indexer.service.IndexerService;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@RunWith(PowerMockRunner.class)
@PrepareForTest({HeadersUtil.class, IndexerQueueTaskBuilder.class, DpsHeaders.class, Config.class})
public class CleanupIndiciesApiTest {
private final String messageValid = "{\"data\":\"[{\\\"id\\\":\\\"opendes:welldb:wellbore-d9033ae1-fb15-496c-9ba0-880fd1d2b2cf\\\",\\\"kind\\\":\\\"tenant1:welldb:wellbore:1.0.0\\\",\\\"op\\\":\\\"purge_schema\\\"}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}";
private final String messageEmpty = "{}";
private final String messageWithEmptyData = "{\"data\":\"[]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}";
private final String messageWithIncorrectJsonFormat = "{\"data\":\"[{}}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}";
private final String ACCOUNT_ID = "any-account";
private final String DATA_PARTITION_ID = "opendes";
@InjectMocks
private CleanupIndiciesApi sut;
@Mock
private JaxRsDpsLog log;
@Mock
private ElasticClientHandler elasticClientHandler;
@Mock
private ElasticIndexNameResolver elasticIndexNameResolver;
@Mock
private IndicesService indicesService;
@Before
public void setup() {
initMocks(this);
}
@Test
public void should_return200_given_validMessage_indexCleanupTest() throws Exception {
should_return200_indexerWorkerTest(messageValid);
}
@Test
public void should_return200_given_emptyData_indexCleanupTest() throws Exception {
should_return200_indexerWorkerTest(messageWithEmptyData);
}
@Test
public void should_return400_given_emptyMessage_indexCleanupTest() {
should_return400_indexerWorkerTest(messageEmpty, String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID));
}
@Test
public void should_return400_given_incorrectJsonFormatMessage_indexWorkerTest() {
should_return400_indexerWorkerTest(messageWithIncorrectJsonFormat, "Unable to parse request payload.");
}
private void should_return200_indexerWorkerTest(String message) throws Exception {
ResponseEntity response = this.sut.cleanupIndices(createRecordChangedMessage(message));
Assert.assertEquals(HttpStatus.OK.value(), response.getStatusCodeValue());
}
private void should_return400_indexerWorkerTest(String message, String errorMessage) {
try {
this.sut.cleanupIndices(createRecordChangedMessage(message));
fail("Should throw exception");
} catch (AppException e) {
Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), e.getError().getCode());
Assert.assertEquals(errorMessage, e.getError().getMessage());
} catch (Exception e) {
fail("Should not throw this exception" + e.getMessage());
}
}
private RecordChangedMessages createRecordChangedMessage(String message) {
return (new Gson()).fromJson(message, RecordChangedMessages.class);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment