Skip to content
Snippets Groups Projects
Commit b7d00e73 authored by Mykyta Savchuk's avatar Mykyta Savchuk
Browse files

retry upsert when bulk response is 400 and matches message

parent 6ad44b4b
No related branches found
No related tags found
1 merge request!541retry upsert when bulk response is 400 and matches error type
......@@ -17,6 +17,9 @@ package org.opengroup.osdu.indexer.service;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.Collections;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
......@@ -68,6 +71,8 @@ public class IndexerServiceImpl implements IndexerService {
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.FORBIDDEN));
private static final String FAILED_TO_PARSE_EXCEPTION_MESSAGE = "Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [data.SpatialLocation.Wgs84Coordinates] of type [geo_shape]]";
private final Gson gson = new GsonBuilder().serializeNulls().create();
// we index a normalized kind (authority + source + entity type + major version) as a tags attribute for all records
......@@ -363,7 +368,24 @@ public class IndexerServiceImpl implements IndexerService {
this.cacheOrCreateElasticMapping(schemas, restClient);
// process the records
return this.upsertRecords(recordIndexerPayload.getRecords(), restClient);
List<RecordIndexerPayload.Record> records = recordIndexerPayload.getRecords();
BulkRequestResult bulkRequestResult = this.upsertRecords(records, restClient);
List<String> failedRecordIds = bulkRequestResult.getFailureRecordIds();
List<String> retryUpsertRecordIds = bulkRequestResult.getRetryUpsertRecordIds();
if (!retryUpsertRecordIds.isEmpty()) {
List<RecordIndexerPayload.Record> retryUpsertRecords = records.stream()
.filter(record -> retryUpsertRecordIds.contains(record.getId()))
.collect(Collectors.toList());
retryUpsertRecords.forEach(record -> {
record.setData(null);
record.setTags(null);
});
bulkRequestResult = upsertRecords(retryUpsertRecords, restClient);
failedRecordIds.addAll(bulkRequestResult.getFailureRecordIds());
}
return failedRecordIds;
}
}
......@@ -386,8 +408,8 @@ public class IndexerServiceImpl implements IndexerService {
}
}
private List<String> upsertRecords(List<RecordIndexerPayload.Record> records, RestHighLevelClient restClient) throws AppException {
if (records == null || records.isEmpty()) return new LinkedList<>();
private BulkRequestResult upsertRecords(List<RecordIndexerPayload.Record> records, RestHighLevelClient restClient) throws AppException {
if (records == null || records.isEmpty()) return new BulkRequestResult(Collections.emptyList(), Collections.emptyList());
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout(BULK_REQUEST_TIMEOUT);
......@@ -423,14 +445,15 @@ public class IndexerServiceImpl implements IndexerService {
}
try (RestHighLevelClient restClient = this.elasticClientHandler.createRestClient()) {
return processBulkRequest(restClient, bulkRequest);
return processBulkRequest(restClient, bulkRequest).getFailureRecordIds();
}
}
private List<String> processBulkRequest(RestHighLevelClient restClient, BulkRequest bulkRequest) throws AppException {
private BulkRequestResult processBulkRequest(RestHighLevelClient restClient, BulkRequest bulkRequest) throws AppException {
if (bulkRequest.numberOfActions() == 0) return new BulkRequestResult(Collections.emptyList(), Collections.emptyList());
List<String> failureRecordIds = new LinkedList<>();
if (bulkRequest.numberOfActions() == 0) return failureRecordIds;
List<String> retryUpsertRecordIds = new LinkedList<>();
int failedRequestStatus = 500;
Exception failedRequestCause = null;
......@@ -449,7 +472,10 @@ public class IndexerServiceImpl implements IndexerService {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (canIndexerRetry(bulkItemResponse)) {
if (RestStatus.BAD_REQUEST.equals(failure.getStatus()) && failure.getCause().getMessage().equals(FAILED_TO_PARSE_EXCEPTION_MESSAGE)) {
retryUpsertRecordIds.add(bulkItemResponse.getId());
} else if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId());
if (failedRequestCause == null) {
......@@ -481,7 +507,7 @@ public class IndexerServiceImpl implements IndexerService {
}
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e);
}
return failureRecordIds;
return new BulkRequestResult(failureRecordIds, retryUpsertRecordIds);
}
private Map<String, Object> getSourceMap(RecordIndexerPayload.Record record) {
......@@ -575,4 +601,11 @@ public class IndexerServiceImpl implements IndexerService {
failedEvent.accept(failedRecords.stream().map(RecordStatus::failedAuditLogMessage).collect(Collectors.toList()));
}
}
@Data
@AllArgsConstructor
private static class BulkRequestResult {
private List<String> failureRecordIds;
private List<String> retryUpsertRecordIds;
}
}
......@@ -6,6 +6,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -81,12 +82,14 @@ public class IndexerServiceImplTest {
private List<RecordInfo> recordInfos = new ArrayList<>();
private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," +
"{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
"{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}, {\"id\":\"opendes:doc:test3\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
private final String kind1 = "opendes:testindexer1:well:1.0.0";
private final String kind2 = "opendes:testindexer2:well:1.0.0";
private final String recordId1 = "opendes:doc:test1";
private final String recordId2 = "opendes:doc:test2";
private final String recordId3 = "opendes:doc:test3";
private final String failureMassage = "test failure";
private final String badRequestMessage = "Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [data.SpatialLocation.Wgs84Coordinates] of type [geo_shape]]";
private DpsHeaders dpsHeaders;
private RecordChangedMessages recordChangedMessages;
......@@ -140,6 +143,7 @@ public class IndexerServiceImplTest {
storageData.put("schema1", "test-value");
List<Records.Entity> validRecords = new ArrayList<>();
validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build());
validRecords.add(Records.Entity.builder().id(recordId3).kind(kind2).data(storageData).build());
List<ConversionStatus> conversionStatus = new LinkedList<>();
Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build();
when(this.storageService.getStorageRecords(any(), any())).thenReturn(storageRecords);
......@@ -155,17 +159,18 @@ public class IndexerServiceImplTest {
indexerMappedPayload.put("id", "keyword");
when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload);
BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()};
BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse(), prepare400Response()};
when(this.bulkResponse.getItems()).thenReturn(responses);
// test
JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos);
// validate
assertEquals(2, jobStatus.getStatusesList().size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
assertEquals(3, jobStatus.getStatusesList().size());
assertEquals(2, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size());
verify(restHighLevelClient, times(2)).bulk(any(), any());
verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)"));
verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)"));
} catch (Exception e) {
......@@ -182,6 +187,14 @@ public class IndexerServiceImplTest {
return responseFail;
}
private BulkItemResponse prepare400Response() {
BulkItemResponse responseFail = mock(BulkItemResponse.class);
when(responseFail.isFailed()).thenReturn(true);
when(responseFail.getId()).thenReturn(recordId3);
when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception(badRequestMessage), RestStatus.BAD_REQUEST));
return responseFail;
}
private BulkItemResponse prepareSuccessfulResponse() {
BulkItemResponse responseSuccess = mock(BulkItemResponse.class);
when(responseSuccess.getId()).thenReturn(recordId2);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment