Skip to content
Snippets Groups Projects
Commit e726d00e authored by Neelesh Thakur's avatar Neelesh Thakur
Browse files

Merge branch 'retry-upsert' into 'master'

retry upsert when bulk response is 400 and matches error type

See merge request !541
parents 42c39ab8 4b1bbb87
No related branches found
No related tags found
1 merge request!541retry upsert when bulk response is 400 and matches error type
Pipeline #186453 failed
/*
* Copyright © Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.indexer.model;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class BulkRequestResult {
private List<String> failureRecordIds;
private List<String> retryUpsertRecordIds;
}
\ No newline at end of file
......@@ -17,6 +17,9 @@ package org.opengroup.osdu.indexer.service;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.Collections;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
......@@ -38,11 +41,13 @@ import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.http.RequestStatus;
import org.opengroup.osdu.core.common.model.indexer.*;
import org.opengroup.osdu.core.common.model.indexer.RecordIndexerPayload.Record;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.search.RecordMetaAttribute;
import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.indexer.logging.AuditLogger;
import org.opengroup.osdu.indexer.model.BulkRequestResult;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
......@@ -68,6 +73,8 @@ public class IndexerServiceImpl implements IndexerService {
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.FORBIDDEN));
private static final String MAPPER_PARSING_EXCEPTION_TYPE = "type=mapper_parsing_exception";
private final Gson gson = new GsonBuilder().serializeNulls().create();
// we index a normalized kind (authority + source + entity type + major version) as a tags attribute for all records
......@@ -363,7 +370,29 @@ public class IndexerServiceImpl implements IndexerService {
this.cacheOrCreateElasticMapping(schemas, restClient);
// process the records
return this.upsertRecords(recordIndexerPayload.getRecords(), restClient);
List<RecordIndexerPayload.Record> records = recordIndexerPayload.getRecords();
BulkRequestResult bulkRequestResult = this.upsertRecords(records, restClient);
List<String> failedRecordIds = bulkRequestResult.getFailureRecordIds();
processRetryUpsertRecords(restClient, records, bulkRequestResult, failedRecordIds);
return failedRecordIds;
}
}
private void processRetryUpsertRecords(RestHighLevelClient restClient, List<Record> records,
BulkRequestResult bulkRequestResult, List<String> failedRecordIds) {
List<String> retryUpsertRecordIds = bulkRequestResult.getRetryUpsertRecordIds();
if (!retryUpsertRecordIds.isEmpty()) {
List<Record> retryUpsertRecords = records.stream()
.filter(record -> retryUpsertRecordIds.contains(record.getId()))
.collect(Collectors.toList());
retryUpsertRecords.forEach(record -> {
record.setData(Collections.emptyMap());
record.setTags(Collections.emptyMap());
});
bulkRequestResult = upsertRecords(retryUpsertRecords, restClient);
failedRecordIds.addAll(bulkRequestResult.getFailureRecordIds());
}
}
......@@ -386,8 +415,8 @@ public class IndexerServiceImpl implements IndexerService {
}
}
private List<String> upsertRecords(List<RecordIndexerPayload.Record> records, RestHighLevelClient restClient) throws AppException {
if (records == null || records.isEmpty()) return new LinkedList<>();
private BulkRequestResult upsertRecords(List<RecordIndexerPayload.Record> records, RestHighLevelClient restClient) throws AppException {
if (records == null || records.isEmpty()) return new BulkRequestResult(Collections.emptyList(), Collections.emptyList());
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout(BULK_REQUEST_TIMEOUT);
......@@ -423,14 +452,15 @@ public class IndexerServiceImpl implements IndexerService {
}
try (RestHighLevelClient restClient = this.elasticClientHandler.createRestClient()) {
return processBulkRequest(restClient, bulkRequest);
return processBulkRequest(restClient, bulkRequest).getFailureRecordIds();
}
}
private List<String> processBulkRequest(RestHighLevelClient restClient, BulkRequest bulkRequest) throws AppException {
private BulkRequestResult processBulkRequest(RestHighLevelClient restClient, BulkRequest bulkRequest) throws AppException {
if (bulkRequest.numberOfActions() == 0) return new BulkRequestResult(Collections.emptyList(), Collections.emptyList());
List<String> failureRecordIds = new LinkedList<>();
if (bulkRequest.numberOfActions() == 0) return failureRecordIds;
List<String> retryUpsertRecordIds = new LinkedList<>();
int failedRequestStatus = 500;
Exception failedRequestCause = null;
......@@ -449,7 +479,10 @@ public class IndexerServiceImpl implements IndexerService {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (canIndexerRetry(bulkItemResponse)) {
if (RestStatus.BAD_REQUEST.equals(failure.getStatus()) && failure.getCause() != null && failure.getCause().getMessage().contains(MAPPER_PARSING_EXCEPTION_TYPE)) {
retryUpsertRecordIds.add(bulkItemResponse.getId());
} else if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId());
if (failedRequestCause == null) {
......@@ -481,7 +514,7 @@ public class IndexerServiceImpl implements IndexerService {
}
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e);
}
return failureRecordIds;
return new BulkRequestResult(failureRecordIds, retryUpsertRecordIds);
}
private Map<String, Object> getSourceMap(RecordIndexerPayload.Record record) {
......
......@@ -6,6 +6,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -81,12 +82,14 @@ public class IndexerServiceImplTest {
private List<RecordInfo> recordInfos = new ArrayList<>();
private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," +
"{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
"{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}, {\"id\":\"opendes:doc:test3\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
private final String kind1 = "opendes:testindexer1:well:1.0.0";
private final String kind2 = "opendes:testindexer2:well:1.0.0";
private final String recordId1 = "opendes:doc:test1";
private final String recordId2 = "opendes:doc:test2";
private final String recordId3 = "opendes:doc:test3";
private final String failureMassage = "test failure";
private final String badRequestMessage = "Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [data.SpatialLocation.Wgs84Coordinates] of type [geo_shape]]";
private DpsHeaders dpsHeaders;
private RecordChangedMessages recordChangedMessages;
......@@ -140,6 +143,7 @@ public class IndexerServiceImplTest {
storageData.put("schema1", "test-value");
List<Records.Entity> validRecords = new ArrayList<>();
validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build());
validRecords.add(Records.Entity.builder().id(recordId3).kind(kind2).data(storageData).build());
List<ConversionStatus> conversionStatus = new LinkedList<>();
Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build();
when(this.storageService.getStorageRecords(any(), any())).thenReturn(storageRecords);
......@@ -155,17 +159,18 @@ public class IndexerServiceImplTest {
indexerMappedPayload.put("id", "keyword");
when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload);
BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()};
BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse(), prepare400Response()};
when(this.bulkResponse.getItems()).thenReturn(responses);
// test
JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos);
// validate
assertEquals(2, jobStatus.getStatusesList().size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
assertEquals(3, jobStatus.getStatusesList().size());
assertEquals(2, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size());
verify(restHighLevelClient, times(2)).bulk(any(), any());
verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)"));
verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)"));
} catch (Exception e) {
......@@ -182,6 +187,14 @@ public class IndexerServiceImplTest {
return responseFail;
}
private BulkItemResponse prepare400Response() {
BulkItemResponse responseFail = mock(BulkItemResponse.class);
when(responseFail.isFailed()).thenReturn(true);
when(responseFail.getId()).thenReturn(recordId3);
when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception(badRequestMessage), RestStatus.BAD_REQUEST));
return responseFail;
}
private BulkItemResponse prepareSuccessfulResponse() {
BulkItemResponse responseSuccess = mock(BulkItemResponse.class);
when(responseSuccess.getId()).thenReturn(recordId2);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment