diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index 4c877b1df7555fbd43f9985867d57ee88b631429..c184ad555f3f8ad024c0a6bffcbf65ad35f54922 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -431,7 +431,7 @@ public class IndexerServiceImpl implements IndexerService { List<String> failureRecordIds = new LinkedList<>(); if (bulkRequest.numberOfActions() == 0) return failureRecordIds; int failedRequestStatus = 500; - String failedRequestMessage = ""; + Exception failedRequestCause = null; try { BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT); @@ -444,13 +444,13 @@ public class IndexerServiceImpl implements IndexerService { for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - bulkFailures.add(String.format("elasticsearch bulk service status: %s id: %s message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); + bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage()); if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) { failureRecordIds.add(bulkItemResponse.getId()); } - if (Strings.isNullOrEmpty(failedRequestMessage)) { - failedRequestMessage = failure.getMessage(); + if (failedRequestCause == null) { + failedRequestCause = failure.getCause(); failedRequestStatus = failure.getStatus().getStatus(); } failedResponses++; @@ -464,13 +464,16 @@ public class IndexerServiceImpl implements IndexerService { jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses)); // retry entire message if all records are failing - if (bulkRequest.numberOfActions() == failedResponses) throw new AppException(failedRequestStatus, "Elastic error", failedRequestMessage); + if (bulkRequest.numberOfActions() == failedResponses) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause); } catch (IOException e) { // throw explicit 504 for IOException throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e); } catch (ElasticsearchStatusException e) { throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e); } catch (Exception e) { + if (e instanceof AppException) { + throw e; + } throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e); } return failureRecordIds;