Skip to content
Snippets Groups Projects
Commit 1befc03f authored by Neelesh Thakur's avatar Neelesh Thakur
Browse files

only retry on index create failures

parent d39b2aa5
No related branches found
No related tags found
2 merge requests!346Merge branch 'aws-integration' into 'master',!292Fix record retry for deleted events on deleted index
......@@ -19,6 +19,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
......@@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService {
private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1);
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.NOT_FOUND));
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE));
private final Gson gson = new GsonBuilder().serializeNulls().create();
......@@ -446,13 +447,15 @@ public class IndexerServiceImpl implements IndexerService {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) {
if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId());
if (failedRequestCause == null) {
failedRequestCause = failure.getCause();
failedRequestStatus = failure.getStatus().getStatus();
}
}
if (failedRequestCause == null) {
failedRequestCause = failure.getCause();
failedRequestStatus = failure.getStatus().getStatus();
}
failedResponses++;
} else {
succeededResponses++;
......@@ -464,7 +467,7 @@ public class IndexerServiceImpl implements IndexerService {
jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses));
// retry entire message if all records are failing
if (bulkRequest.numberOfActions() == failedResponses) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause);
if (bulkRequest.numberOfActions() == failureRecordIds.size()) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause);
} catch (IOException e) {
// throw explicit 504 for IOException
throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e);
......@@ -518,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService {
return indexerPayload;
}
private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) {
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) return true;
if ((bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE || bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE)
&& bulkItemResponse.status() == RestStatus.NOT_FOUND) {
return true;
}
return false;
}
private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException {
jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment