diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index e23773e09db3317ff4d1d303ef7b7a20fe2489f3..b0a565847e31101e905940cfcd4f389c785f38c2 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -19,6 +19,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService { private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1); - private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.NOT_FOUND)); + private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE)); private final Gson gson = new GsonBuilder().serializeNulls().create(); @@ -430,6 +431,8 @@ public class IndexerServiceImpl implements IndexerService { List<String> failureRecordIds = new LinkedList<>(); if (bulkRequest.numberOfActions() == 0) return failureRecordIds; + int failedRequestStatus = 500; + Exception failedRequestCause = null; try { BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT); @@ -442,11 +445,17 @@ public class IndexerServiceImpl implements IndexerService { for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - bulkFailures.add(String.format("elasticsearch bulk service status: %s id: %s message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); + bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage()); - if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) { + if (canIndexerRetry(bulkItemResponse)) { failureRecordIds.add(bulkItemResponse.getId()); + + if (failedRequestCause == null) { + failedRequestCause = failure.getCause(); + failedRequestStatus = failure.getStatus().getStatus(); + } } + failedResponses++; } else { succeededResponses++; @@ -456,12 +465,18 @@ public class IndexerServiceImpl implements IndexerService { if (!bulkFailures.isEmpty()) this.jaxRsDpsLog.warning(bulkFailures); jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses)); + + // retry entire message if all records are failing + if (bulkRequest.numberOfActions() == failureRecordIds.size()) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause); } catch (IOException e) { // throw explicit 504 for IOException throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e); } catch (ElasticsearchStatusException e) { throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e); } catch (Exception e) { + if (e instanceof AppException) { + throw e; + } throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e); } return failureRecordIds; @@ -506,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService { return indexerPayload; } + private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) { + if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) return true; + + if ((bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE || bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) + && bulkItemResponse.status() == RestStatus.NOT_FOUND) { + return true; + } + + return false; + } + private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException { jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds)); diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java index 026ff32cb2e04a3da21b765baefc282d056915bf..285b384a017b4eefd044dc3e58e1434b1ad0543c 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java @@ -1,55 +1,206 @@ package org.opengroup.osdu.indexer.service; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.MockitoAnnotations.initMocks; -import java.util.ArrayList; -import java.util.List; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.mockito.Spy; +import org.opengroup.osdu.core.common.http.HeadersUtil; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.entitlements.Acl; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.indexer.*; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.model.storage.ConversionStatus; +import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; +import org.opengroup.osdu.indexer.logging.AuditLogger; +import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.opengroup.osdu.indexer.util.ElasticClientHandler; -import org.springframework.test.context.junit4.SpringRunner; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; -@RunWith(SpringRunner.class) +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Type; +import java.net.URISyntaxException; +import java.util.*; + +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.powermock.api.mockito.PowerMockito.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({RestHighLevelClient.class, BulkResponse.class, Acl.class, HeadersUtil.class}) public class IndexerServiceImplTest { - @InjectMocks - private IndexerServiceImpl indexerService; + @InjectMocks + private IndexerServiceImpl sut; + @Mock + private ElasticClientHandler elasticClientHandler; + @Mock + private ElasticIndexNameResolver elasticIndexNameResolver; + @Mock + private IndicesService indicesService; + @Mock + private StorageService storageService; + @Mock + private StorageIndexerPayloadMapper storageIndexerPayloadMapper; + @InjectMocks + @Spy + private JobStatus jobStatus = new JobStatus(); + @Mock + private AuditLogger auditLogger; + @Mock + private BulkResponse bulkResponse; + @Mock + private IRequestInfo requestInfo; + @Mock + private RestHighLevelClient restHighLevelClient; + @Mock + private IndexSchemaService schemaService; + @Mock + private JaxRsDpsLog jaxRsDpsLog; + @Mock + private IMappingService mappingService; + @Mock + private IPublisher progressPublisher; + + private List<RecordInfo> recordInfos = new ArrayList<>(); + + private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," + + "{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]"; + private final String kind1 = "opendes:testindexer1:well:1.0.0"; + private final String kind2 = "opendes:testindexer2:well:1.0.0"; + private final String recordId1 = "opendes:doc:test1"; + private final String recordId2 = "opendes:doc:test2"; + private final String failureMassage = "test failure"; + + private DpsHeaders dpsHeaders; + private RecordChangedMessages recordChangedMessages; + + @Before + public void setup() throws IOException { + } + + @Test + public void processSchemaMessagesTest() throws Exception { + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf"); + recordInfo.setKind("opendes:ds:mytest2:1.0.0"); + recordInfo.setOp("purge_schema"); + this.recordInfos.add(recordInfo); + + initMocks(this); + + this.sut.processSchemaMessages(recordInfos); + + verify(this.elasticClientHandler, times(1)).createRestClient(); + verify(this.elasticIndexNameResolver, times(1)).getIndexNameFromKind(any()); + verify(this.indicesService, times(1)).isIndexExist(any(), any()); + } + + @Test + public void should_properlyUpdateAuditLogs_givenValidCreateAndUpdateRecords() { + try { + mockStatic(Acl.class); + + // setup headers + this.dpsHeaders = new DpsHeaders(); + this.dpsHeaders.put(DpsHeaders.AUTHORIZATION, "testAuth"); + when(this.requestInfo.getHeaders()).thenReturn(dpsHeaders); + when(this.requestInfo.getHeadersMapWithDwdAuthZ()).thenReturn(dpsHeaders.getHeaders()); + + // setup message + Type listType = new TypeToken<List<RecordInfo>>() {}.getType(); + this.recordInfos = (new Gson()).fromJson(this.pubsubMsg, listType); + Map<String, String> messageAttributes = new HashMap<>(); + messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, "opendes"); + this.recordChangedMessages = RecordChangedMessages.builder().attributes(messageAttributes).messageId("xxxx").publishTime("2000-01-02T10:10:44+0000").data("{}").build(); + + // setup schema + Map<String, Object> schema = createSchema(); + indexSchemaServiceMock(kind2, schema); + indexSchemaServiceMock(kind1, null); + + // setup storage records + Map<String, Object> storageData = new HashMap<>(); + storageData.put("schema1", "test-value"); + List<Records.Entity> validRecords = new ArrayList<>(); + validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build()); + List<ConversionStatus> conversionStatus = new LinkedList<>(); + Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build(); + when(this.storageService.getStorageRecords(any())).thenReturn(storageRecords); + + // setup elastic, index and mapped document + when(this.indicesService.createIndex(any(), any(), any(), any(), any())).thenReturn(true); + when(this.mappingService.getIndexMappingFromRecordSchema(any())).thenReturn(new HashMap<>()); + + when(this.elasticClientHandler.createRestClient()).thenReturn(this.restHighLevelClient); + when(this.restHighLevelClient.bulk(any(), any(RequestOptions.class))).thenReturn(this.bulkResponse); + + Map<String, Object> indexerMappedPayload = new HashMap<>(); + indexerMappedPayload.put("id", "keyword"); + when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload); - @Mock - private ElasticClientHandler elasticClientHandler; + BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()}; + when(this.bulkResponse.getItems()).thenReturn(responses); - @Mock - private ElasticIndexNameResolver elasticIndexNameResolver; + // test + JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos); - @Mock - private IndicesService indicesService; + // validate + assertEquals(2, jobStatus.getStatusesList().size()); + assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size()); + assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size()); - private List<RecordInfo> recordInfos = new ArrayList<>(); + verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)")); + verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)")); + } catch (Exception e) { + fail("Should not throw this exception" + e.getMessage()); + } + } - @Before - public void setup() { - RecordInfo recordInfo = new RecordInfo(); - recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf"); - recordInfo.setKind("opendes:ds:mytest2:1.0.0"); - recordInfo.setOp("purge_schema"); - recordInfos.add(recordInfo); + private BulkItemResponse prepareFailedResponse() { + BulkItemResponse responseFail = mock(BulkItemResponse.class); + when(responseFail.isFailed()).thenReturn(true); + when(responseFail.getFailureMessage()).thenReturn(failureMassage); + when(responseFail.getId()).thenReturn(recordId1); + when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception("test failure"))); + return responseFail; + } - initMocks(this); - } + private BulkItemResponse prepareSuccessfulResponse() { + BulkItemResponse responseSuccess = mock(BulkItemResponse.class); + when(responseSuccess.getId()).thenReturn(recordId2); + return responseSuccess; + } - @Test - public void processSchemaMessagesTest() throws Exception { - indexerService.processSchemaMessages(recordInfos); + private void indexSchemaServiceMock(String kind, Map<String, Object> schema) throws UnsupportedEncodingException, URISyntaxException { + IndexSchema indexSchema = schema == null + ? IndexSchema.builder().kind(kind).dataSchema(new HashMap<>()).build() + : IndexSchema.builder().kind(kind).dataSchema(schema).build(); + when(schemaService.getIndexerInputSchema(kind, new ArrayList<>())).thenReturn(indexSchema); + } - verify(elasticClientHandler, times(1)).createRestClient(); - verify(elasticIndexNameResolver, times(1)).getIndexNameFromKind(any()); - verify(indicesService, times(1)).isIndexExist(any(), any()); - } + private Map<String, Object> createSchema() { + Map<String, Object> schema = new HashMap<>(); + schema.put("schema1", "keyword"); + schema.put("schema2", "boolean"); + schema.put("schema3", "date"); + schema.put("schema6", "object"); + return schema; + } }