Skip to content
Snippets Groups Projects
Commit d565801b authored by Alok Joshi's avatar Alok Joshi
Browse files

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/system/indexer-service into schema_exception
parents 9ef8f4b6 2abca06c
No related branches found
No related tags found
2 merge requests!346Merge branch 'aws-integration' into 'master',!293Handle schema processing exception explicitly
Pipeline #97226 failed
......@@ -19,6 +19,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
......@@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService {
private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1);
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.NOT_FOUND));
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE));
private final Gson gson = new GsonBuilder().serializeNulls().create();
......@@ -430,6 +431,8 @@ public class IndexerServiceImpl implements IndexerService {
List<String> failureRecordIds = new LinkedList<>();
if (bulkRequest.numberOfActions() == 0) return failureRecordIds;
int failedRequestStatus = 500;
Exception failedRequestCause = null;
try {
BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
......@@ -442,11 +445,17 @@ public class IndexerServiceImpl implements IndexerService {
for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
bulkFailures.add(String.format("elasticsearch bulk service status: %s id: %s message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) {
if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId());
if (failedRequestCause == null) {
failedRequestCause = failure.getCause();
failedRequestStatus = failure.getStatus().getStatus();
}
}
failedResponses++;
} else {
succeededResponses++;
......@@ -456,12 +465,18 @@ public class IndexerServiceImpl implements IndexerService {
if (!bulkFailures.isEmpty()) this.jaxRsDpsLog.warning(bulkFailures);
jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses));
// retry entire message if all records are failing
if (bulkRequest.numberOfActions() == failureRecordIds.size()) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause);
} catch (IOException e) {
// throw explicit 504 for IOException
throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e);
} catch (ElasticsearchStatusException e) {
throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e);
} catch (Exception e) {
if (e instanceof AppException) {
throw e;
}
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e);
}
return failureRecordIds;
......@@ -506,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService {
return indexerPayload;
}
private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) {
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) return true;
if ((bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE || bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE)
&& bulkItemResponse.status() == RestStatus.NOT_FOUND) {
return true;
}
return false;
}
private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException {
jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds));
......
package org.opengroup.osdu.indexer.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.List;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.mockito.Spy;
import org.opengroup.osdu.core.common.http.HeadersUtil;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.entitlements.Acl;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.*;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.storage.ConversionStatus;
import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.indexer.logging.AuditLogger;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.springframework.test.context.junit4.SpringRunner;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(SpringRunner.class)
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.net.URISyntaxException;
import java.util.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({RestHighLevelClient.class, BulkResponse.class, Acl.class, HeadersUtil.class})
public class IndexerServiceImplTest {
@InjectMocks
private IndexerServiceImpl indexerService;
@InjectMocks
private IndexerServiceImpl sut;
@Mock
private ElasticClientHandler elasticClientHandler;
@Mock
private ElasticIndexNameResolver elasticIndexNameResolver;
@Mock
private IndicesService indicesService;
@Mock
private StorageService storageService;
@Mock
private StorageIndexerPayloadMapper storageIndexerPayloadMapper;
@InjectMocks
@Spy
private JobStatus jobStatus = new JobStatus();
@Mock
private AuditLogger auditLogger;
@Mock
private BulkResponse bulkResponse;
@Mock
private IRequestInfo requestInfo;
@Mock
private RestHighLevelClient restHighLevelClient;
@Mock
private IndexSchemaService schemaService;
@Mock
private JaxRsDpsLog jaxRsDpsLog;
@Mock
private IMappingService mappingService;
@Mock
private IPublisher progressPublisher;
private List<RecordInfo> recordInfos = new ArrayList<>();
private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," +
"{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
private final String kind1 = "opendes:testindexer1:well:1.0.0";
private final String kind2 = "opendes:testindexer2:well:1.0.0";
private final String recordId1 = "opendes:doc:test1";
private final String recordId2 = "opendes:doc:test2";
private final String failureMassage = "test failure";
private DpsHeaders dpsHeaders;
private RecordChangedMessages recordChangedMessages;
@Before
public void setup() throws IOException {
}
@Test
public void processSchemaMessagesTest() throws Exception {
RecordInfo recordInfo = new RecordInfo();
recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
recordInfo.setKind("opendes:ds:mytest2:1.0.0");
recordInfo.setOp("purge_schema");
this.recordInfos.add(recordInfo);
initMocks(this);
this.sut.processSchemaMessages(recordInfos);
verify(this.elasticClientHandler, times(1)).createRestClient();
verify(this.elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
verify(this.indicesService, times(1)).isIndexExist(any(), any());
}
@Test
public void should_properlyUpdateAuditLogs_givenValidCreateAndUpdateRecords() {
try {
mockStatic(Acl.class);
// setup headers
this.dpsHeaders = new DpsHeaders();
this.dpsHeaders.put(DpsHeaders.AUTHORIZATION, "testAuth");
when(this.requestInfo.getHeaders()).thenReturn(dpsHeaders);
when(this.requestInfo.getHeadersMapWithDwdAuthZ()).thenReturn(dpsHeaders.getHeaders());
// setup message
Type listType = new TypeToken<List<RecordInfo>>() {}.getType();
this.recordInfos = (new Gson()).fromJson(this.pubsubMsg, listType);
Map<String, String> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, "opendes");
this.recordChangedMessages = RecordChangedMessages.builder().attributes(messageAttributes).messageId("xxxx").publishTime("2000-01-02T10:10:44+0000").data("{}").build();
// setup schema
Map<String, Object> schema = createSchema();
indexSchemaServiceMock(kind2, schema);
indexSchemaServiceMock(kind1, null);
// setup storage records
Map<String, Object> storageData = new HashMap<>();
storageData.put("schema1", "test-value");
List<Records.Entity> validRecords = new ArrayList<>();
validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build());
List<ConversionStatus> conversionStatus = new LinkedList<>();
Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build();
when(this.storageService.getStorageRecords(any())).thenReturn(storageRecords);
// setup elastic, index and mapped document
when(this.indicesService.createIndex(any(), any(), any(), any(), any())).thenReturn(true);
when(this.mappingService.getIndexMappingFromRecordSchema(any())).thenReturn(new HashMap<>());
when(this.elasticClientHandler.createRestClient()).thenReturn(this.restHighLevelClient);
when(this.restHighLevelClient.bulk(any(), any(RequestOptions.class))).thenReturn(this.bulkResponse);
Map<String, Object> indexerMappedPayload = new HashMap<>();
indexerMappedPayload.put("id", "keyword");
when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload);
@Mock
private ElasticClientHandler elasticClientHandler;
BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()};
when(this.bulkResponse.getItems()).thenReturn(responses);
@Mock
private ElasticIndexNameResolver elasticIndexNameResolver;
// test
JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos);
@Mock
private IndicesService indicesService;
// validate
assertEquals(2, jobStatus.getStatusesList().size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size());
private List<RecordInfo> recordInfos = new ArrayList<>();
verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)"));
verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)"));
} catch (Exception e) {
fail("Should not throw this exception" + e.getMessage());
}
}
@Before
public void setup() {
RecordInfo recordInfo = new RecordInfo();
recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
recordInfo.setKind("opendes:ds:mytest2:1.0.0");
recordInfo.setOp("purge_schema");
recordInfos.add(recordInfo);
private BulkItemResponse prepareFailedResponse() {
BulkItemResponse responseFail = mock(BulkItemResponse.class);
when(responseFail.isFailed()).thenReturn(true);
when(responseFail.getFailureMessage()).thenReturn(failureMassage);
when(responseFail.getId()).thenReturn(recordId1);
when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception("test failure")));
return responseFail;
}
initMocks(this);
}
private BulkItemResponse prepareSuccessfulResponse() {
BulkItemResponse responseSuccess = mock(BulkItemResponse.class);
when(responseSuccess.getId()).thenReturn(recordId2);
return responseSuccess;
}
@Test
public void processSchemaMessagesTest() throws Exception {
indexerService.processSchemaMessages(recordInfos);
private void indexSchemaServiceMock(String kind, Map<String, Object> schema) throws UnsupportedEncodingException, URISyntaxException {
IndexSchema indexSchema = schema == null
? IndexSchema.builder().kind(kind).dataSchema(new HashMap<>()).build()
: IndexSchema.builder().kind(kind).dataSchema(schema).build();
when(schemaService.getIndexerInputSchema(kind, new ArrayList<>())).thenReturn(indexSchema);
}
verify(elasticClientHandler, times(1)).createRestClient();
verify(elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
verify(indicesService, times(1)).isIndexExist(any(), any());
}
private Map<String, Object> createSchema() {
Map<String, Object> schema = new HashMap<>();
schema.put("schema1", "keyword");
schema.put("schema2", "boolean");
schema.put("schema3", "date");
schema.put("schema6", "object");
return schema;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment