Skip to content
Snippets Groups Projects
Commit 5195740a authored by Rostislav Dublin (EPAM)'s avatar Rostislav Dublin (EPAM)
Browse files

Merge branch 'GCP-implement-index-cleanup' into 'master'

GCP implement index cleanup (GONRG-856)

See merge request !41
parents 8c724ba2 cdbca3d8
No related branches found
No related tags found
1 merge request!41GCP implement index cleanup (GONRG-856)
Pipeline #12717 passed
// Copyright 2017-2019, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexer.api;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import java.lang.reflect.Type;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.extern.java.Log;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.search.SearchServiceRole;
import org.opengroup.osdu.indexer.SwaggerDoc;
import org.opengroup.osdu.indexer.service.IndexerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.annotation.RequestScope;
@Log
@RestController
@RequestScope
public class CleanupIndiciesApi {
@Autowired
private IndexerService indexerService;
@PostMapping(path = "/index-cleanup", consumes = "application/json")
@PreAuthorize("@authorizationFilter.hasPermission('" + SearchServiceRole.ADMIN + "')")
public ResponseEntity cleanupIndices(@NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY)
@Valid @RequestBody RecordChangedMessages message) {
if (message == null) {
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request body is null",
SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY);
}
if (message.missingAccountId()) {
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Invalid tenant",
String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID));
}
try {
Type listType = new TypeToken<List<RecordInfo>>() {
}.getType();
List<RecordInfo> recordInfos = new Gson().fromJson(message.getData(), listType);
if (recordInfos.isEmpty()) {
log.info("none of record-change message can be deserialized");
return new ResponseEntity(HttpStatus.OK);
}
indexerService.processSchemaMessages(recordInfos);
return new ResponseEntity(HttpStatus.OK);
} catch (AppException e) {
throw e;
} catch (JsonParseException e) {
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error", "Unable to parse request payload.", e);
} catch (Exception e) {
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Unknown error", "An unknown error has occurred.", e);
}
}
}
......@@ -14,6 +14,7 @@
package org.opengroup.osdu.indexer.service;
import java.io.IOException;
import java.util.List;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
......@@ -24,4 +25,6 @@ public interface IndexerService {
JobStatus processRecordChangedMessages(RecordChangedMessages recordChangedMessages, List<RecordInfo> recordInfos) throws Exception;
void processSchemaMessages(List<RecordInfo> recordInfos) throws IOException;
}
\ No newline at end of file
......@@ -153,6 +153,33 @@ public class IndexerServiceImpl implements IndexerService {
return jobStatus;
}
@Override
public void processSchemaMessages(List<RecordInfo> recordInfos) throws IOException {
Map<String, OperationType> schemaMsgs = RecordInfo.getSchemaMsgs(recordInfos);
if (schemaMsgs != null && !schemaMsgs.isEmpty()) {
try (RestHighLevelClient restClient = elasticClientHandler.createRestClient()) {
schemaMsgs.entrySet().forEach(msg -> {
try {
processSchemaEvents(restClient, msg);
} catch (IOException | ElasticsearchStatusException e) {
throw new AppException(org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR.value(), "unable to process schema delete", e.getMessage());
}
});
}
}
}
private void processSchemaEvents(RestHighLevelClient restClient,
Map.Entry<String, OperationType> msg) throws IOException, ElasticsearchStatusException {
String kind = msg.getKey();
String index = elasticIndexNameResolver.getIndexNameFromKind(kind);
boolean indexExist = indicesService.isIndexExist(restClient, index);
if (indexExist && msg.getValue() == OperationType.purge_schema) {
indicesService.deleteIndex(restClient, index);
}
}
private List<String> processUpsertRecords(Map<String, Map<String, OperationType>> upsertRecordMap) throws Exception {
// get schema for kind
Map<String, IndexSchema> schemas = this.getSchema(upsertRecordMap);
......
package org.opengroup.osdu.indexer.api;
import static org.junit.Assert.fail;
import static org.mockito.MockitoAnnotations.initMocks;
import com.google.gson.Gson;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.opengroup.osdu.core.common.http.HeadersUtil;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.search.Config;
import org.opengroup.osdu.indexer.service.IndexerService;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@RunWith(PowerMockRunner.class)
@PrepareForTest({HeadersUtil.class, IndexerQueueTaskBuilder.class, DpsHeaders.class, Config.class})
public class CleanupIndiciesApiTest {
private final String messageValid = "{\"data\":\"[{\\\"id\\\":\\\"opendes:welldb:wellbore-d9033ae1-fb15-496c-9ba0-880fd1d2b2cf\\\",\\\"kind\\\":\\\"tenant1:welldb:wellbore:1.0.0\\\",\\\"op\\\":\\\"purge_schema\\\"}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}";
private final String messageEmpty = "{}";
private final String messageWithEmptyData = "{\"data\":\"[]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}";
private final String messageWithIncorrectJsonFormat = "{\"data\":\"[{}}]\",\"attributes\":{\"account-id\":\"opendes\",\"correlation-id\":\"b5a281bd-f59d-4db2-9939-b2d85036fc7e\"},\"messageId\":\"75328163778221\",\"publishTime\":\"2018-05-08T21:48:56.131Z\"}";
@InjectMocks
private CleanupIndiciesApi sut;
@Mock
private IndexerService indexerService;
@Before
public void setup() {
initMocks(this);
}
@Test
public void should_return200_given_validMessage_indexCleanupTest() {
should_return200_indexerWorkerTest(messageValid);
}
@Test
public void should_return200_given_emptyData_indexCleanupTest() {
should_return200_indexerWorkerTest(messageWithEmptyData);
}
@Test
public void should_return400_given_emptyMessage_indexCleanupTest() {
should_return400_indexerWorkerTest(messageEmpty, String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID));
}
@Test
public void should_return400_given_incorrectJsonFormatMessage_indexWorkerTest() {
should_return400_indexerWorkerTest(messageWithIncorrectJsonFormat, "Unable to parse request payload.");
}
private void should_return200_indexerWorkerTest(String message) {
ResponseEntity response = this.sut.cleanupIndices(createRecordChangedMessage(message));
Assert.assertEquals(HttpStatus.OK.value(), response.getStatusCodeValue());
}
private void should_return400_indexerWorkerTest(String message, String errorMessage) {
try {
this.sut.cleanupIndices(createRecordChangedMessage(message));
fail("Should throw exception");
} catch (AppException e) {
Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), e.getError().getCode());
Assert.assertEquals(errorMessage, e.getError().getMessage());
} catch (Exception e) {
fail("Should not throw this exception" + e.getMessage());
}
}
private RecordChangedMessages createRecordChangedMessage(String message) {
return (new Gson()).fromJson(message, RecordChangedMessages.class);
}
}
package org.opengroup.osdu.indexer.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.core.common.search.IndicesService;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
public class IndexerServiceImplTest {
@InjectMocks
private IndexerServiceImpl indexerService;
@Mock
private ElasticClientHandler elasticClientHandler;
@Mock
private ElasticIndexNameResolver elasticIndexNameResolver;
@Mock
private IndicesService indicesService;
private List<RecordInfo> recordInfos = new ArrayList<>();
@Before
public void setup() {
RecordInfo recordInfo = new RecordInfo();
recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
recordInfo.setKind("opendes:ds:mytest2:1.0.0");
recordInfo.setOp("purge_schema");
recordInfos.add(recordInfo);
initMocks(this);
}
@Test
public void processSchemaMessagesTest() throws Exception {
indexerService.processSchemaMessages(recordInfos);
verify(elasticClientHandler, times(1)).createRestClient();
verify(elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
verify(indicesService, times(1)).isIndexExist(any(), any());
}
}
......@@ -40,4 +40,3 @@ env_variables:
REGION: "REGION_VAR"
SPRING_PROFILES_ACTIVE: 'ENVIRONMENT'
SECURITY_HTTPS_CERTIFICATE_TRUST: 'SECURITY_HTTPS_CERTIFICATE_TRUST_VAR'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment