diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index 8e2895fc2b0807e9ad6fedcc2a09f7b060ab27af..8750a455aa427aac44f41232488719cfa9253339 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -17,6 +17,9 @@ package org.opengroup.osdu.indexer.service; import com.google.common.base.Strings; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import java.util.Collections; +import lombok.AllArgsConstructor; +import lombok.Data; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.DocWriteRequest; @@ -68,6 +71,8 @@ public class IndexerServiceImpl implements IndexerService { private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.FORBIDDEN)); + private static final String FAILED_TO_PARSE_EXCEPTION_MESSAGE = "Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [data.SpatialLocation.Wgs84Coordinates] of type [geo_shape]]"; + private final Gson gson = new GsonBuilder().serializeNulls().create(); // we index a normalized kind (authority + source + entity type + major version) as a tags attribute for all records @@ -363,7 +368,24 @@ public class IndexerServiceImpl implements IndexerService { this.cacheOrCreateElasticMapping(schemas, restClient); // process the records - return this.upsertRecords(recordIndexerPayload.getRecords(), restClient); + List<RecordIndexerPayload.Record> records = recordIndexerPayload.getRecords(); + BulkRequestResult bulkRequestResult = this.upsertRecords(records, restClient); + List<String> failedRecordIds = bulkRequestResult.getFailureRecordIds(); + + List<String> retryUpsertRecordIds = bulkRequestResult.getRetryUpsertRecordIds(); + if (!retryUpsertRecordIds.isEmpty()) { + List<RecordIndexerPayload.Record> retryUpsertRecords = records.stream() + .filter(record -> retryUpsertRecordIds.contains(record.getId())) + .collect(Collectors.toList()); + retryUpsertRecords.forEach(record -> { + record.setData(null); + record.setTags(null); + }); + bulkRequestResult = upsertRecords(retryUpsertRecords, restClient); + failedRecordIds.addAll(bulkRequestResult.getFailureRecordIds()); + } + + return failedRecordIds; } } @@ -386,8 +408,8 @@ public class IndexerServiceImpl implements IndexerService { } } - private List<String> upsertRecords(List<RecordIndexerPayload.Record> records, RestHighLevelClient restClient) throws AppException { - if (records == null || records.isEmpty()) return new LinkedList<>(); + private BulkRequestResult upsertRecords(List<RecordIndexerPayload.Record> records, RestHighLevelClient restClient) throws AppException { + if (records == null || records.isEmpty()) return new BulkRequestResult(Collections.emptyList(), Collections.emptyList()); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout(BULK_REQUEST_TIMEOUT); @@ -423,14 +445,15 @@ public class IndexerServiceImpl implements IndexerService { } try (RestHighLevelClient restClient = this.elasticClientHandler.createRestClient()) { - return processBulkRequest(restClient, bulkRequest); + return processBulkRequest(restClient, bulkRequest).getFailureRecordIds(); } } - private List<String> processBulkRequest(RestHighLevelClient restClient, BulkRequest bulkRequest) throws AppException { + private BulkRequestResult processBulkRequest(RestHighLevelClient restClient, BulkRequest bulkRequest) throws AppException { + if (bulkRequest.numberOfActions() == 0) return new BulkRequestResult(Collections.emptyList(), Collections.emptyList()); List<String> failureRecordIds = new LinkedList<>(); - if (bulkRequest.numberOfActions() == 0) return failureRecordIds; + List<String> retryUpsertRecordIds = new LinkedList<>(); int failedRequestStatus = 500; Exception failedRequestCause = null; @@ -449,7 +472,10 @@ public class IndexerServiceImpl implements IndexerService { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage()); - if (canIndexerRetry(bulkItemResponse)) { + + if (RestStatus.BAD_REQUEST.equals(failure.getStatus()) && failure.getCause().getMessage().equals(FAILED_TO_PARSE_EXCEPTION_MESSAGE)) { + retryUpsertRecordIds.add(bulkItemResponse.getId()); + } else if (canIndexerRetry(bulkItemResponse)) { failureRecordIds.add(bulkItemResponse.getId()); if (failedRequestCause == null) { @@ -481,7 +507,7 @@ public class IndexerServiceImpl implements IndexerService { } throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e); } - return failureRecordIds; + return new BulkRequestResult(failureRecordIds, retryUpsertRecordIds); } private Map<String, Object> getSourceMap(RecordIndexerPayload.Record record) { @@ -575,4 +601,11 @@ public class IndexerServiceImpl implements IndexerService { failedEvent.accept(failedRecords.stream().map(RecordStatus::failedAuditLogMessage).collect(Collectors.toList())); } } + + @Data + @AllArgsConstructor + private static class BulkRequestResult { + private List<String> failureRecordIds; + private List<String> retryUpsertRecordIds; + } } diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java index 194fdab46f77aa9ca67834f640a3ba13337e25c7..b7e24d13458e006224755595796098c8204be545 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java @@ -6,6 +6,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.rest.RestStatus; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -81,12 +82,14 @@ public class IndexerServiceImplTest { private List<RecordInfo> recordInfos = new ArrayList<>(); private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," + - "{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]"; + "{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}, {\"id\":\"opendes:doc:test3\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]"; private final String kind1 = "opendes:testindexer1:well:1.0.0"; private final String kind2 = "opendes:testindexer2:well:1.0.0"; private final String recordId1 = "opendes:doc:test1"; private final String recordId2 = "opendes:doc:test2"; + private final String recordId3 = "opendes:doc:test3"; private final String failureMassage = "test failure"; + private final String badRequestMessage = "Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [data.SpatialLocation.Wgs84Coordinates] of type [geo_shape]]"; private DpsHeaders dpsHeaders; private RecordChangedMessages recordChangedMessages; @@ -140,6 +143,7 @@ public class IndexerServiceImplTest { storageData.put("schema1", "test-value"); List<Records.Entity> validRecords = new ArrayList<>(); validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build()); + validRecords.add(Records.Entity.builder().id(recordId3).kind(kind2).data(storageData).build()); List<ConversionStatus> conversionStatus = new LinkedList<>(); Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build(); when(this.storageService.getStorageRecords(any(), any())).thenReturn(storageRecords); @@ -155,17 +159,18 @@ public class IndexerServiceImplTest { indexerMappedPayload.put("id", "keyword"); when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload); - BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()}; + BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse(), prepare400Response()}; when(this.bulkResponse.getItems()).thenReturn(responses); // test JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos); // validate - assertEquals(2, jobStatus.getStatusesList().size()); - assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size()); + assertEquals(3, jobStatus.getStatusesList().size()); + assertEquals(2, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size()); assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size()); + verify(restHighLevelClient, times(2)).bulk(any(), any()); verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)")); verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)")); } catch (Exception e) { @@ -182,6 +187,14 @@ public class IndexerServiceImplTest { return responseFail; } + private BulkItemResponse prepare400Response() { + BulkItemResponse responseFail = mock(BulkItemResponse.class); + when(responseFail.isFailed()).thenReturn(true); + when(responseFail.getId()).thenReturn(recordId3); + when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception(badRequestMessage), RestStatus.BAD_REQUEST)); + return responseFail; + } + private BulkItemResponse prepareSuccessfulResponse() { BulkItemResponse responseSuccess = mock(BulkItemResponse.class); when(responseSuccess.getId()).thenReturn(recordId2);