diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApi.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApi.java new file mode 100644 index 0000000000000000000000000000000000000000..347caf52d5057b0dd8e3b67c1e517f9bbe1ccab9 --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApi.java @@ -0,0 +1,81 @@ +// Copyright 2017-2019, Schlumberger +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.indexer.api; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.JsonParseException; +import java.lang.reflect.Type; +import java.util.List; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.extern.java.Log; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.model.search.SearchServiceRole; +import org.opengroup.osdu.indexer.SwaggerDoc; +import org.opengroup.osdu.indexer.service.IndexerService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.annotation.RequestScope; + +@Log +@RestController +@RequestScope +public class CleanupIndiciesApi { + + @Autowired + private IndexerService indexerService; + + @PostMapping(path = "/index-cleanup", consumes = "application/json") + @PreAuthorize("@authorizationFilter.hasPermission('" + SearchServiceRole.ADMIN + "')") + public ResponseEntity cleanupIndices(@NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY) + @Valid @RequestBody RecordChangedMessages message) { + if (message == null) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request body is null", + SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY); + } + + if (message.missingAccountId()) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Invalid tenant", + String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); + } + try { + Type listType = new TypeToken<List<RecordInfo>>() { + }.getType(); + List<RecordInfo> recordInfos = new Gson().fromJson(message.getData(), listType); + + if (recordInfos.isEmpty()) { + log.info("none of record-change message can be deserialized"); + return new ResponseEntity(HttpStatus.OK); + } + indexerService.processSchemaMessages(recordInfos); + return new ResponseEntity(HttpStatus.OK); + } catch (AppException e) { + throw e; + } catch (JsonParseException e) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error", "Unable to parse request payload.", e); + } catch (Exception e) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Unknown error", "An unknown error has occurred.", e); + } + } +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerService.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerService.java index 4ae323c7656efb92132a5c64c69564f867ed183c..8feadb3d332414480d82f9082f621ccb86a17521 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerService.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerService.java @@ -14,6 +14,7 @@ package org.opengroup.osdu.indexer.service; +import java.io.IOException; import java.util.List; import org.opengroup.osdu.core.common.model.indexer.JobStatus; @@ -24,4 +25,6 @@ public interface IndexerService { JobStatus processRecordChangedMessages(RecordChangedMessages recordChangedMessages, List<RecordInfo> recordInfos) throws Exception; + void processSchemaMessages(List<RecordInfo> recordInfos) throws IOException; + } \ No newline at end of file diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index 0fa06829611148c5ed9525dc71a1c4649f995d43..36f91af2405098e188a4113b009e165dfd4500be 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -153,6 +153,33 @@ public class IndexerServiceImpl implements IndexerService { return jobStatus; } + @Override + public void processSchemaMessages(List<RecordInfo> recordInfos) throws IOException { + Map<String, OperationType> schemaMsgs = RecordInfo.getSchemaMsgs(recordInfos); + if (schemaMsgs != null && !schemaMsgs.isEmpty()) { + try (RestHighLevelClient restClient = elasticClientHandler.createRestClient()) { + schemaMsgs.entrySet().forEach(msg -> { + try { + processSchemaEvents(restClient, msg); + } catch (IOException | ElasticsearchStatusException e) { + throw new AppException(org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR.value(), "unable to process schema delete", e.getMessage()); + } + }); + } + } + } + + private void processSchemaEvents(RestHighLevelClient restClient, + Map.Entry<String, OperationType> msg) throws IOException, ElasticsearchStatusException { + String kind = msg.getKey(); + String index = elasticIndexNameResolver.getIndexNameFromKind(kind); + + boolean indexExist = indicesService.isIndexExist(restClient, index); + if (indexExist && msg.getValue() == OperationType.purge_schema) { + indicesService.deleteIndex(restClient, index); + } + } + private List<String> processUpsertRecords(Map<String, Map<String, OperationType>> upsertRecordMap) throws Exception { // get schema for kind Map<String, IndexSchema> schemas = this.getSchema(upsertRecordMap); diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApiTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApiTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6a9f2ead5cf965f9e05f0e1d1782d97843ff7deb --- /dev/null +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/CleanupIndiciesApiTest.java @@ -0,0 +1,84 @@ +package org.opengroup.osdu.indexer.api; + +import static org.junit.Assert.fail; +import static org.mockito.MockitoAnnotations.initMocks; +import com.google.gson.Gson; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.opengroup.osdu.core.common.http.HeadersUtil; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.search.Config; +import org.opengroup.osdu.indexer.service.IndexerService; +import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({HeadersUtil.class, IndexerQueueTaskBuilder.class, DpsHeaders.class, Config.class}) +public class CleanupIndiciesApiTest { + + private final String messageValid = "{\"data\":\"[{\\\"id\\\":\\\"opendes:welldb:wellbore-d9033ae1-fb15-496c-9ba0-880fd1d2b2cf\\\",\\\"kind\\\":\\\"tenant1:welldb:wellbore:1.0.0\\\",\\\"op\\\":\\\"purge_schema\\\"}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}"; + private final String messageEmpty = "{}"; + private final String messageWithEmptyData = "{\"data\":\"[]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}"; + private final String messageWithIncorrectJsonFormat = "{\"data\":\"[{}}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}"; + + @InjectMocks + private CleanupIndiciesApi sut; + + @Mock + private IndexerService indexerService; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void should_return200_given_validMessage_indexCleanupTest() { + should_return200_indexerWorkerTest(messageValid); + } + + @Test + public void should_return200_given_emptyData_indexCleanupTest() { + should_return200_indexerWorkerTest(messageWithEmptyData); + } + + @Test + public void should_return400_given_emptyMessage_indexCleanupTest() { + should_return400_indexerWorkerTest(messageEmpty, String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); + } + + @Test + public void should_return400_given_incorrectJsonFormatMessage_indexWorkerTest() { + should_return400_indexerWorkerTest(messageWithIncorrectJsonFormat, "Unable to parse request payload."); + } + + private void should_return200_indexerWorkerTest(String message) { + ResponseEntity response = this.sut.cleanupIndices(createRecordChangedMessage(message)); + Assert.assertEquals(HttpStatus.OK.value(), response.getStatusCodeValue()); + } + + private void should_return400_indexerWorkerTest(String message, String errorMessage) { + try { + this.sut.cleanupIndices(createRecordChangedMessage(message)); + fail("Should throw exception"); + } catch (AppException e) { + Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), e.getError().getCode()); + Assert.assertEquals(errorMessage, e.getError().getMessage()); + } catch (Exception e) { + fail("Should not throw this exception" + e.getMessage()); + } + } + + private RecordChangedMessages createRecordChangedMessage(String message) { + return (new Gson()).fromJson(message, RecordChangedMessages.class); + } +} diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c3c3d3a7a09e86a35c96fe224397aabf60ccfcf1 --- /dev/null +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java @@ -0,0 +1,56 @@ +package org.opengroup.osdu.indexer.service; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; +import java.util.ArrayList; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; +import org.opengroup.osdu.core.common.search.IndicesService; +import org.opengroup.osdu.indexer.util.ElasticClientHandler; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +public class IndexerServiceImplTest { + + @InjectMocks + private IndexerServiceImpl indexerService; + + @Mock + private ElasticClientHandler elasticClientHandler; + + @Mock + private ElasticIndexNameResolver elasticIndexNameResolver; + + @Mock + private IndicesService indicesService; + + private List<RecordInfo> recordInfos = new ArrayList<>(); + + @Before + public void setup() { + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf"); + recordInfo.setKind("opendes:ds:mytest2:1.0.0"); + recordInfo.setOp("purge_schema"); + recordInfos.add(recordInfo); + + initMocks(this); + } + + @Test + public void processSchemaMessagesTest() throws Exception { + indexerService.processSchemaMessages(recordInfos); + + verify(elasticClientHandler, times(1)).createRestClient(); + verify(elasticIndexNameResolver, times(1)).getIndexNameFromKind(any()); + verify(indicesService, times(1)).isIndexExist(any(), any()); + } +} diff --git a/provider/indexer-gcp/src/main/appengine/app.yaml b/provider/indexer-gcp/src/main/appengine/app.yaml index 6d89c2e56b566564d1d4e58b3fd4b7a02ceeceab..363b4097e3a611b59391832910b92ca518800872 100644 --- a/provider/indexer-gcp/src/main/appengine/app.yaml +++ b/provider/indexer-gcp/src/main/appengine/app.yaml @@ -40,4 +40,3 @@ env_variables: REGION: "REGION_VAR" SPRING_PROFILES_ACTIVE: 'ENVIRONMENT' SECURITY_HTTPS_CERTIFICATE_TRUST: 'SECURITY_HTTPS_CERTIFICATE_TRUST_VAR' -