From 76f53d2375a5bf691f72748aba6fce2e5cbab85a Mon Sep 17 00:00:00 2001 From: Igor Filippov <igor_filippov@epam.com> Date: Tue, 13 Oct 2020 06:22:24 +0000 Subject: [PATCH] GONRG-856: Implement index cleanup API support --- .../osdu/indexer/api/CleanupIndiciesApi.java | 110 ++++++++++++++++++ .../indexer/api/CleanupIndiciesApiTest.java | 100 ++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 indexer-core/src/main/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApi.java create mode 100644 indexer-core/src/test/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApiTest.java diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApi.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApi.java new file mode 100644 index 000000000..a3f7c5249 --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApi.java @@ -0,0 +1,110 @@ +package org.opengroup.osdu.indexer.api; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.JsonParseException; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.List; +import java.util.Map; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.extern.java.Log; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.client.RestHighLevelClient; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.indexer.OperationType; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.model.search.SearchServiceRole; +import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; +import org.opengroup.osdu.core.common.search.IndicesService; +import org.opengroup.osdu.indexer.SwaggerDoc; +import org.opengroup.osdu.indexer.util.ElasticClientHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.annotation.RequestScope; + +@Log +@RestController +@RequestScope +public class CleanupIndiciesApi { + + @Autowired + private ElasticClientHandler elasticClientHandler; + + @Autowired + private ElasticIndexNameResolver elasticIndexNameResolver; + + @Autowired + private IndicesService indicesService; + + @PostMapping(path = "/index-cleanup", consumes = "application/json") + @PreAuthorize("@authorizationFilter.hasRole('" + SearchServiceRole.ADMIN + "')") + public ResponseEntity cleanupIndices(@NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY) + @Valid @RequestBody RecordChangedMessages recordChangedMessages) throws Exception { + + if (recordChangedMessages.missingAccountId()) { + throw new AppException(org.apache.http.HttpStatus.SC_BAD_REQUEST, "Invalid tenant", + String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); + } + try { + if (recordChangedMessages == null) { + log.info("record change messages is null"); + } + + Type listType = new TypeToken<List<RecordInfo>>() { + }.getType(); + List<RecordInfo> recordInfos = new Gson().fromJson(recordChangedMessages.getData(), listType); + + if (recordInfos.size() == 0) { + log.info("none of record-change message can be deserialized"); + return new ResponseEntity(HttpStatus.OK); + } + processSchemaMessages(recordChangedMessages, recordInfos); + return new ResponseEntity(HttpStatus.OK); + } catch (AppException e) { + throw e; + } catch (JsonParseException e) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error", "Unable to parse request payload.", e); + } catch (Exception e) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Unknown error", "An unknown error has occurred.", e); + } + } + + private void processSchemaMessages(RecordChangedMessages message, List<RecordInfo> recordInfos) throws Exception { + Map<String, OperationType> schemaMsgs = RecordInfo.getSchemaMsgs(recordInfos); + if (schemaMsgs != null && !schemaMsgs.isEmpty()) { + try (RestHighLevelClient restClient = elasticClientHandler.createRestClient()) { + schemaMsgs.entrySet().forEach(msg -> { + try { + processSchemaEvents(restClient, msg); + } catch (IOException | ElasticsearchStatusException e) { + throw new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "unable to process schema delete", e.getMessage()); + } + }); + } + } + } + + private void processSchemaEvents(RestHighLevelClient restClient, + Map.Entry<String, OperationType> msg) throws IOException, ElasticsearchStatusException { + String kind = msg.getKey(); + String index = elasticIndexNameResolver.getIndexNameFromKind(kind); + + boolean indexExist = indicesService.isIndexExist(restClient, index); + if (msg.getValue() == OperationType.purge_schema) { + if (indexExist) { + indicesService.deleteIndex(restClient, index); + } else { + log.warning(String.format("Kind: %s not found", kind)); + } + } + } +} diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApiTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApiTest.java new file mode 100644 index 000000000..4ad1376dd --- /dev/null +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApiTest.java @@ -0,0 +1,100 @@ +package org.opengroup.osdu.indexer.api; + +import static org.junit.Assert.fail; +import static org.mockito.MockitoAnnotations.initMocks; +import com.google.gson.Gson; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.opengroup.osdu.core.common.http.HeadersUtil; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.search.Config; +import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; +import org.opengroup.osdu.core.common.search.IndicesService; +import org.opengroup.osdu.indexer.service.IndexerService; +import org.opengroup.osdu.indexer.util.ElasticClientHandler; +import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({HeadersUtil.class, IndexerQueueTaskBuilder.class, DpsHeaders.class, Config.class}) +public class CleanupIndiciesApiTest { + + private final String messageValid = "{\"data\":\"[{\\\"id\\\":\\\"opendes:welldb:wellbore-d9033ae1-fb15-496c-9ba0-880fd1d2b2cf\\\",\\\"kind\\\":\\\"tenant1:welldb:wellbore:1.0.0\\\",\\\"op\\\":\\\"purge_schema\\\"}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}"; + private final String messageEmpty = "{}"; + private final String messageWithEmptyData = "{\"data\":\"[]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}"; + private final String messageWithIncorrectJsonFormat = "{\"data\":\"[{}}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}"; + + private final String ACCOUNT_ID = "any-account"; + private final String DATA_PARTITION_ID = "opendes"; + + @InjectMocks + private CleanupIndiciesApi sut; + + @Mock + private JaxRsDpsLog log; + + @Mock + private ElasticClientHandler elasticClientHandler; + + @Mock + private ElasticIndexNameResolver elasticIndexNameResolver; + + @Mock + private IndicesService indicesService; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void should_return200_given_validMessage_indexCleanupTest() throws Exception { + should_return200_indexerWorkerTest(messageValid); + } + + @Test + public void should_return200_given_emptyData_indexCleanupTest() throws Exception { + should_return200_indexerWorkerTest(messageWithEmptyData); + } + + @Test + public void should_return400_given_emptyMessage_indexCleanupTest() { + should_return400_indexerWorkerTest(messageEmpty, String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); + } + + @Test + public void should_return400_given_incorrectJsonFormatMessage_indexWorkerTest() { + should_return400_indexerWorkerTest(messageWithIncorrectJsonFormat, "Unable to parse request payload."); + } + + private void should_return200_indexerWorkerTest(String message) throws Exception { + ResponseEntity response = this.sut.cleanupIndices(createRecordChangedMessage(message)); + Assert.assertEquals(HttpStatus.OK.value(), response.getStatusCodeValue()); + } + + private void should_return400_indexerWorkerTest(String message, String errorMessage) { + try { + this.sut.cleanupIndices(createRecordChangedMessage(message)); + fail("Should throw exception"); + } catch (AppException e) { + Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), e.getError().getCode()); + Assert.assertEquals(errorMessage, e.getError().getMessage()); + } catch (Exception e) { + fail("Should not throw this exception" + e.getMessage()); + } + } + + private RecordChangedMessages createRecordChangedMessage(String message) { + return (new Gson()).fromJson(message, RecordChangedMessages.class); + } +} -- GitLab