Skip to content
Snippets Groups Projects
Commit dc033375 authored by Neelesh Thakur's avatar Neelesh Thakur
Browse files

Merge branch 'master' into wait-for-active-shards

parents ca86c11a 2abca06c
No related branches found
No related tags found
2 merge requests!346Merge branch 'aws-integration' into 'master',!235Wait for primary shards to be ready before start indexing
Pipeline #97175 failed
...@@ -19,6 +19,7 @@ import com.google.gson.Gson; ...@@ -19,6 +19,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
...@@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService { ...@@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService {
private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1); private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1);
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.NOT_FOUND)); private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE));
private final Gson gson = new GsonBuilder().serializeNulls().create(); private final Gson gson = new GsonBuilder().serializeNulls().create();
...@@ -430,6 +431,8 @@ public class IndexerServiceImpl implements IndexerService { ...@@ -430,6 +431,8 @@ public class IndexerServiceImpl implements IndexerService {
List<String> failureRecordIds = new LinkedList<>(); List<String> failureRecordIds = new LinkedList<>();
if (bulkRequest.numberOfActions() == 0) return failureRecordIds; if (bulkRequest.numberOfActions() == 0) return failureRecordIds;
int failedRequestStatus = 500;
Exception failedRequestCause = null;
try { try {
BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT); BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
...@@ -442,11 +445,17 @@ public class IndexerServiceImpl implements IndexerService { ...@@ -442,11 +445,17 @@ public class IndexerServiceImpl implements IndexerService {
for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) { if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
bulkFailures.add(String.format("elasticsearch bulk service status: %s id: %s message: %s", failure.getStatus(), failure.getId(), failure.getMessage())); bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage()); this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) { if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId()); failureRecordIds.add(bulkItemResponse.getId());
if (failedRequestCause == null) {
failedRequestCause = failure.getCause();
failedRequestStatus = failure.getStatus().getStatus();
}
} }
failedResponses++; failedResponses++;
} else { } else {
succeededResponses++; succeededResponses++;
...@@ -456,12 +465,18 @@ public class IndexerServiceImpl implements IndexerService { ...@@ -456,12 +465,18 @@ public class IndexerServiceImpl implements IndexerService {
if (!bulkFailures.isEmpty()) this.jaxRsDpsLog.warning(bulkFailures); if (!bulkFailures.isEmpty()) this.jaxRsDpsLog.warning(bulkFailures);
jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses)); jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses));
// retry entire message if all records are failing
if (bulkRequest.numberOfActions() == failureRecordIds.size()) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause);
} catch (IOException e) { } catch (IOException e) {
// throw explicit 504 for IOException // throw explicit 504 for IOException
throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e); throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e);
} catch (ElasticsearchStatusException e) { } catch (ElasticsearchStatusException e) {
throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e); throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e);
} catch (Exception e) { } catch (Exception e) {
if (e instanceof AppException) {
throw e;
}
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e); throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e);
} }
return failureRecordIds; return failureRecordIds;
...@@ -506,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService { ...@@ -506,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService {
return indexerPayload; return indexerPayload;
} }
private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) {
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) return true;
if ((bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE || bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE)
&& bulkItemResponse.status() == RestStatus.NOT_FOUND) {
return true;
}
return false;
}
private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException { private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException {
jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds)); jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds));
......
package org.opengroup.osdu.indexer.service; package org.opengroup.osdu.indexer.service;
import static org.mockito.ArgumentMatchers.any; import com.google.common.reflect.TypeToken;
import static org.mockito.Mockito.times; import com.google.gson.Gson;
import static org.mockito.Mockito.verify; import org.elasticsearch.action.bulk.BulkItemResponse;
import static org.mockito.MockitoAnnotations.initMocks; import org.elasticsearch.action.bulk.BulkResponse;
import java.util.ArrayList; import org.elasticsearch.client.RequestOptions;
import java.util.List; import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo; import org.mockito.Spy;
import org.opengroup.osdu.core.common.http.HeadersUtil;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.entitlements.Acl;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.*;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.storage.ConversionStatus;
import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.indexer.logging.AuditLogger;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.util.ElasticClientHandler; import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.springframework.test.context.junit4.SpringRunner; import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(SpringRunner.class) import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.net.URISyntaxException;
import java.util.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({RestHighLevelClient.class, BulkResponse.class, Acl.class, HeadersUtil.class})
public class IndexerServiceImplTest { public class IndexerServiceImplTest {
@InjectMocks @InjectMocks
private IndexerServiceImpl indexerService; private IndexerServiceImpl sut;
@Mock
private ElasticClientHandler elasticClientHandler;
@Mock
private ElasticIndexNameResolver elasticIndexNameResolver;
@Mock
private IndicesService indicesService;
@Mock
private StorageService storageService;
@Mock
private StorageIndexerPayloadMapper storageIndexerPayloadMapper;
@InjectMocks
@Spy
private JobStatus jobStatus = new JobStatus();
@Mock
private AuditLogger auditLogger;
@Mock
private BulkResponse bulkResponse;
@Mock
private IRequestInfo requestInfo;
@Mock
private RestHighLevelClient restHighLevelClient;
@Mock
private IndexSchemaService schemaService;
@Mock
private JaxRsDpsLog jaxRsDpsLog;
@Mock
private IMappingService mappingService;
@Mock
private IPublisher progressPublisher;
private List<RecordInfo> recordInfos = new ArrayList<>();
private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," +
"{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
private final String kind1 = "opendes:testindexer1:well:1.0.0";
private final String kind2 = "opendes:testindexer2:well:1.0.0";
private final String recordId1 = "opendes:doc:test1";
private final String recordId2 = "opendes:doc:test2";
private final String failureMassage = "test failure";
private DpsHeaders dpsHeaders;
private RecordChangedMessages recordChangedMessages;
@Before
public void setup() throws IOException {
}
@Test
public void processSchemaMessagesTest() throws Exception {
RecordInfo recordInfo = new RecordInfo();
recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
recordInfo.setKind("opendes:ds:mytest2:1.0.0");
recordInfo.setOp("purge_schema");
this.recordInfos.add(recordInfo);
initMocks(this);
this.sut.processSchemaMessages(recordInfos);
verify(this.elasticClientHandler, times(1)).createRestClient();
verify(this.elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
verify(this.indicesService, times(1)).isIndexExist(any(), any());
}
@Test
public void should_properlyUpdateAuditLogs_givenValidCreateAndUpdateRecords() {
try {
mockStatic(Acl.class);
// setup headers
this.dpsHeaders = new DpsHeaders();
this.dpsHeaders.put(DpsHeaders.AUTHORIZATION, "testAuth");
when(this.requestInfo.getHeaders()).thenReturn(dpsHeaders);
when(this.requestInfo.getHeadersMapWithDwdAuthZ()).thenReturn(dpsHeaders.getHeaders());
// setup message
Type listType = new TypeToken<List<RecordInfo>>() {}.getType();
this.recordInfos = (new Gson()).fromJson(this.pubsubMsg, listType);
Map<String, String> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, "opendes");
this.recordChangedMessages = RecordChangedMessages.builder().attributes(messageAttributes).messageId("xxxx").publishTime("2000-01-02T10:10:44+0000").data("{}").build();
// setup schema
Map<String, Object> schema = createSchema();
indexSchemaServiceMock(kind2, schema);
indexSchemaServiceMock(kind1, null);
// setup storage records
Map<String, Object> storageData = new HashMap<>();
storageData.put("schema1", "test-value");
List<Records.Entity> validRecords = new ArrayList<>();
validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build());
List<ConversionStatus> conversionStatus = new LinkedList<>();
Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build();
when(this.storageService.getStorageRecords(any())).thenReturn(storageRecords);
// setup elastic, index and mapped document
when(this.indicesService.createIndex(any(), any(), any(), any(), any())).thenReturn(true);
when(this.mappingService.getIndexMappingFromRecordSchema(any())).thenReturn(new HashMap<>());
when(this.elasticClientHandler.createRestClient()).thenReturn(this.restHighLevelClient);
when(this.restHighLevelClient.bulk(any(), any(RequestOptions.class))).thenReturn(this.bulkResponse);
Map<String, Object> indexerMappedPayload = new HashMap<>();
indexerMappedPayload.put("id", "keyword");
when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload);
@Mock BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()};
private ElasticClientHandler elasticClientHandler; when(this.bulkResponse.getItems()).thenReturn(responses);
@Mock // test
private ElasticIndexNameResolver elasticIndexNameResolver; JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos);
@Mock // validate
private IndicesService indicesService; assertEquals(2, jobStatus.getStatusesList().size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size());
private List<RecordInfo> recordInfos = new ArrayList<>(); verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)"));
verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)"));
} catch (Exception e) {
fail("Should not throw this exception" + e.getMessage());
}
}
@Before private BulkItemResponse prepareFailedResponse() {
public void setup() { BulkItemResponse responseFail = mock(BulkItemResponse.class);
RecordInfo recordInfo = new RecordInfo(); when(responseFail.isFailed()).thenReturn(true);
recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf"); when(responseFail.getFailureMessage()).thenReturn(failureMassage);
recordInfo.setKind("opendes:ds:mytest2:1.0.0"); when(responseFail.getId()).thenReturn(recordId1);
recordInfo.setOp("purge_schema"); when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception("test failure")));
recordInfos.add(recordInfo); return responseFail;
}
initMocks(this); private BulkItemResponse prepareSuccessfulResponse() {
} BulkItemResponse responseSuccess = mock(BulkItemResponse.class);
when(responseSuccess.getId()).thenReturn(recordId2);
return responseSuccess;
}
@Test private void indexSchemaServiceMock(String kind, Map<String, Object> schema) throws UnsupportedEncodingException, URISyntaxException {
public void processSchemaMessagesTest() throws Exception { IndexSchema indexSchema = schema == null
indexerService.processSchemaMessages(recordInfos); ? IndexSchema.builder().kind(kind).dataSchema(new HashMap<>()).build()
: IndexSchema.builder().kind(kind).dataSchema(schema).build();
when(schemaService.getIndexerInputSchema(kind, new ArrayList<>())).thenReturn(indexSchema);
}
verify(elasticClientHandler, times(1)).createRestClient(); private Map<String, Object> createSchema() {
verify(elasticIndexNameResolver, times(1)).getIndexNameFromKind(any()); Map<String, Object> schema = new HashMap<>();
verify(indicesService, times(1)).isIndexExist(any(), any()); schema.put("schema1", "keyword");
} schema.put("schema2", "boolean");
schema.put("schema3", "date");
schema.put("schema6", "object");
return schema;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment