From 1befc03f97ea37f8075857d9d25dd5bdb22cba20 Mon Sep 17 00:00:00 2001 From: NThakur4 <nthakur4@slb.com> Date: Wed, 2 Mar 2022 23:29:59 -0600 Subject: [PATCH] only retry on index create failures --- .../indexer/service/IndexerServiceImpl.java | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index c184ad555..b0a565847 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -19,6 +19,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService { private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1); - private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.NOT_FOUND)); + private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE)); private final Gson gson = new GsonBuilder().serializeNulls().create(); @@ -446,13 +447,15 @@ public class IndexerServiceImpl implements IndexerService { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage()); - if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) { + if (canIndexerRetry(bulkItemResponse)) { failureRecordIds.add(bulkItemResponse.getId()); + + if (failedRequestCause == null) { + failedRequestCause = failure.getCause(); + failedRequestStatus = failure.getStatus().getStatus(); + } } - if (failedRequestCause == null) { - failedRequestCause = failure.getCause(); - failedRequestStatus = failure.getStatus().getStatus(); - } + failedResponses++; } else { succeededResponses++; @@ -464,7 +467,7 @@ public class IndexerServiceImpl implements IndexerService { jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses)); // retry entire message if all records are failing - if (bulkRequest.numberOfActions() == failedResponses) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause); + if (bulkRequest.numberOfActions() == failureRecordIds.size()) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause); } catch (IOException e) { // throw explicit 504 for IOException throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e); @@ -518,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService { return indexerPayload; } + private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) { + if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) return true; + + if ((bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE || bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) + && bulkItemResponse.status() == RestStatus.NOT_FOUND) { + return true; + } + + return false; + } + private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException { jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds)); -- GitLab