diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/BulkRequestResult.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/BulkRequestResult.java new file mode 100644 index 0000000000000000000000000000000000000000..d7ce36672795f3170d83f8f699b23a1baf3d2dac --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/BulkRequestResult.java @@ -0,0 +1,12 @@ +package org.opengroup.osdu.indexer.model; + +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class BulkRequestResult { + private List<String> failureRecordIds; + private List<String> retryUpsertRecordIds; +} \ No newline at end of file diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index 8750a455aa427aac44f41232488719cfa9253339..f0e6ad373d6b811a604726819c74a14a0da7b66c 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -41,11 +41,13 @@ import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.RequestStatus; import org.opengroup.osdu.core.common.model.indexer.*; +import org.opengroup.osdu.core.common.model.indexer.RecordIndexerPayload.Record; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.model.search.RecordMetaAttribute; import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; import org.opengroup.osdu.indexer.logging.AuditLogger; +import org.opengroup.osdu.indexer.model.BulkRequestResult; import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.opengroup.osdu.indexer.util.ElasticClientHandler; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; @@ -71,7 +73,7 @@ public class IndexerServiceImpl implements IndexerService { private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.FORBIDDEN)); - private static final String FAILED_TO_PARSE_EXCEPTION_MESSAGE = "Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [data.SpatialLocation.Wgs84Coordinates] of type [geo_shape]]"; + private static final String MAPPER_PARSING_EXCEPTION_TYPE = "type=mapper_parsing_exception"; private final Gson gson = new GsonBuilder().serializeNulls().create(); @@ -372,23 +374,28 @@ public class IndexerServiceImpl implements IndexerService { BulkRequestResult bulkRequestResult = this.upsertRecords(records, restClient); List<String> failedRecordIds = bulkRequestResult.getFailureRecordIds(); - List<String> retryUpsertRecordIds = bulkRequestResult.getRetryUpsertRecordIds(); - if (!retryUpsertRecordIds.isEmpty()) { - List<RecordIndexerPayload.Record> retryUpsertRecords = records.stream() - .filter(record -> retryUpsertRecordIds.contains(record.getId())) - .collect(Collectors.toList()); - retryUpsertRecords.forEach(record -> { - record.setData(null); - record.setTags(null); - }); - bulkRequestResult = upsertRecords(retryUpsertRecords, restClient); - failedRecordIds.addAll(bulkRequestResult.getFailureRecordIds()); - } + processRetryUpsertRecords(restClient, records, bulkRequestResult, failedRecordIds); return failedRecordIds; } } + private void processRetryUpsertRecords(RestHighLevelClient restClient, List<Record> records, + BulkRequestResult bulkRequestResult, List<String> failedRecordIds) { + List<String> retryUpsertRecordIds = bulkRequestResult.getRetryUpsertRecordIds(); + if (!retryUpsertRecordIds.isEmpty()) { + List<Record> retryUpsertRecords = records.stream() + .filter(record -> retryUpsertRecordIds.contains(record.getId())) + .collect(Collectors.toList()); + retryUpsertRecords.forEach(record -> { + record.setData(Collections.emptyMap()); + record.setTags(Collections.emptyMap()); + }); + bulkRequestResult = upsertRecords(retryUpsertRecords, restClient); + failedRecordIds.addAll(bulkRequestResult.getFailureRecordIds()); + } + } + private void cacheOrCreateElasticMapping(List<IndexSchema> schemas, RestHighLevelClient restClient) throws Exception { for (IndexSchema schema : schemas) { @@ -473,7 +480,7 @@ public class IndexerServiceImpl implements IndexerService { bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage()); - if (RestStatus.BAD_REQUEST.equals(failure.getStatus()) && failure.getCause().getMessage().equals(FAILED_TO_PARSE_EXCEPTION_MESSAGE)) { + if (RestStatus.BAD_REQUEST.equals(failure.getStatus()) && failure.getCause() != null && failure.getCause().getMessage().contains(MAPPER_PARSING_EXCEPTION_TYPE)) { retryUpsertRecordIds.add(bulkItemResponse.getId()); } else if (canIndexerRetry(bulkItemResponse)) { failureRecordIds.add(bulkItemResponse.getId()); @@ -601,11 +608,4 @@ public class IndexerServiceImpl implements IndexerService { failedEvent.accept(failedRecords.stream().map(RecordStatus::failedAuditLogMessage).collect(Collectors.toList())); } } - - @Data - @AllArgsConstructor - private static class BulkRequestResult { - private List<String> failureRecordIds; - private List<String> retryUpsertRecordIds; - } }