Skip to content
Snippets Groups Projects
Commit f37cae4a authored by Mykyta Savchuk's avatar Mykyta Savchuk
Browse files

retry upsert when bulk response is 400 and matches message

updates from MR
parent b7d00e73
No related branches found
No related tags found
1 merge request!541retry upsert when bulk response is 400 and matches error type
Pipeline #185926 failed
package org.opengroup.osdu.indexer.model;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class BulkRequestResult {
private List<String> failureRecordIds;
private List<String> retryUpsertRecordIds;
}
\ No newline at end of file
......@@ -41,11 +41,13 @@ import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.http.RequestStatus;
import org.opengroup.osdu.core.common.model.indexer.*;
import org.opengroup.osdu.core.common.model.indexer.RecordIndexerPayload.Record;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.search.RecordMetaAttribute;
import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.indexer.logging.AuditLogger;
import org.opengroup.osdu.indexer.model.BulkRequestResult;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
......@@ -71,7 +73,7 @@ public class IndexerServiceImpl implements IndexerService {
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.FORBIDDEN));
private static final String FAILED_TO_PARSE_EXCEPTION_MESSAGE = "Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [data.SpatialLocation.Wgs84Coordinates] of type [geo_shape]]";
private static final String MAPPER_PARSING_EXCEPTION_TYPE = "type=mapper_parsing_exception";
private final Gson gson = new GsonBuilder().serializeNulls().create();
......@@ -372,23 +374,28 @@ public class IndexerServiceImpl implements IndexerService {
BulkRequestResult bulkRequestResult = this.upsertRecords(records, restClient);
List<String> failedRecordIds = bulkRequestResult.getFailureRecordIds();
List<String> retryUpsertRecordIds = bulkRequestResult.getRetryUpsertRecordIds();
if (!retryUpsertRecordIds.isEmpty()) {
List<RecordIndexerPayload.Record> retryUpsertRecords = records.stream()
.filter(record -> retryUpsertRecordIds.contains(record.getId()))
.collect(Collectors.toList());
retryUpsertRecords.forEach(record -> {
record.setData(null);
record.setTags(null);
});
bulkRequestResult = upsertRecords(retryUpsertRecords, restClient);
failedRecordIds.addAll(bulkRequestResult.getFailureRecordIds());
}
processRetryUpsertRecords(restClient, records, bulkRequestResult, failedRecordIds);
return failedRecordIds;
}
}
private void processRetryUpsertRecords(RestHighLevelClient restClient, List<Record> records,
BulkRequestResult bulkRequestResult, List<String> failedRecordIds) {
List<String> retryUpsertRecordIds = bulkRequestResult.getRetryUpsertRecordIds();
if (!retryUpsertRecordIds.isEmpty()) {
List<Record> retryUpsertRecords = records.stream()
.filter(record -> retryUpsertRecordIds.contains(record.getId()))
.collect(Collectors.toList());
retryUpsertRecords.forEach(record -> {
record.setData(Collections.emptyMap());
record.setTags(Collections.emptyMap());
});
bulkRequestResult = upsertRecords(retryUpsertRecords, restClient);
failedRecordIds.addAll(bulkRequestResult.getFailureRecordIds());
}
}
private void cacheOrCreateElasticMapping(List<IndexSchema> schemas, RestHighLevelClient restClient) throws Exception {
for (IndexSchema schema : schemas) {
......@@ -473,7 +480,7 @@ public class IndexerServiceImpl implements IndexerService {
bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (RestStatus.BAD_REQUEST.equals(failure.getStatus()) && failure.getCause().getMessage().equals(FAILED_TO_PARSE_EXCEPTION_MESSAGE)) {
if (RestStatus.BAD_REQUEST.equals(failure.getStatus()) && failure.getCause() != null && failure.getCause().getMessage().contains(MAPPER_PARSING_EXCEPTION_TYPE)) {
retryUpsertRecordIds.add(bulkItemResponse.getId());
} else if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId());
......@@ -601,11 +608,4 @@ public class IndexerServiceImpl implements IndexerService {
failedEvent.accept(failedRecords.stream().map(RecordStatus::failedAuditLogMessage).collect(Collectors.toList()));
}
}
@Data
@AllArgsConstructor
private static class BulkRequestResult {
private List<String> failureRecordIds;
private List<String> retryUpsertRecordIds;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment