Commit 343cf808 authored by neelesh thakur's avatar neelesh thakur
Browse files

Merge branch 'wait-for-active-shards' into 'master'

Wait for primary shards to be ready before start indexing

See merge request !235
parents b28f0adf 644664b0
Pipeline #100651 failed with stages
in 49 minutes and 15 seconds
......@@ -366,7 +366,7 @@ public class IndexerServiceImpl implements IndexerService {
String index = this.elasticIndexNameResolver.getIndexNameFromKind(schema.getKind());
// check if index exist and sync meta attribute schema if required
if (this.indicesService.isIndexExist(restClient, index)) {
if (this.indicesService.isIndexReady(restClient, index)) {
this.mappingService.syncIndexMappingIfRequired(restClient, index);
continue;
}
......
......@@ -36,4 +36,6 @@ public interface IndicesService {
boolean deleteIndex(String index) throws ElasticsearchException, IOException, AppException;
List<IndexInfo> getIndexInfo(RestHighLevelClient client, String indexPattern) throws IOException;
boolean isIndexReady(RestHighLevelClient client, String index) throws IOException;
}
......@@ -22,6 +22,8 @@ import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Request;
......@@ -92,7 +94,7 @@ public class IndicesServiceImpl implements IndicesService {
request.settings(settings != null ? settings : DEFAULT_INDEX_SETTINGS);
if (mapping != null) {
String mappingJsonString = new Gson().toJson(mapping, Map.class);
request.mapping(mappingJsonString,XContentType.JSON);
request.mapping(mappingJsonString, XContentType.JSON);
}
request.setTimeout(REQUEST_TIMEOUT);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
......@@ -121,13 +123,7 @@ public class IndicesServiceImpl implements IndicesService {
*/
public boolean isIndexExist(RestHighLevelClient client, String index) throws IOException {
try {
try {
Boolean isIndexExist = (Boolean) this.indexCache.get(index);
if (isIndexExist != null && isIndexExist) return true;
} catch (RedisException ex) {
//In case the format of cache changes then clean the cache
this.indexCache.delete(index);
}
if (this.indexExistInCache(index)) return true;
GetIndexRequest request = new GetIndexRequest(index);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if (exists) this.indexCache.put(index, true);
......@@ -142,6 +138,50 @@ public class IndicesServiceImpl implements IndicesService {
}
}
/**
* Check if an index ready for indexing
*
* @param index Index name
* @return index details if index already exists
* @throws IOException if request cannot be processed
*/
public boolean isIndexReady(RestHighLevelClient client, String index) throws IOException {
try {
if (this.indexExistInCache(index)) return true;
GetIndexRequest request = new GetIndexRequest(index);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if (!exists) return false;
ClusterHealthRequest indexHealthRequest = new ClusterHealthRequest();
indexHealthRequest.indices(index);
indexHealthRequest.timeout(REQUEST_TIMEOUT);
indexHealthRequest.waitForYellowStatus();
ClusterHealthResponse healthResponse = client.cluster().health(indexHealthRequest, RequestOptions.DEFAULT);
if (healthResponse.status() == RestStatus.OK) {
this.indexCache.put(index, true);
return true;
}
return false;
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.NOT_FOUND) return false;
throw new AppException(
exception.status().getStatus(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
}
}
private boolean indexExistInCache(String index) {
try {
Boolean isIndexExist = (Boolean) this.indexCache.get(index);
if (isIndexExist != null && isIndexExist) return true;
} catch (RedisException ex) {
//In case the format of cache changes then clean the cache
this.indexCache.delete(index);
}
return false;
}
/**
* Deletes index if user has required role: search.admin
*
......
......@@ -22,8 +22,11 @@ import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
......@@ -51,7 +54,7 @@ import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
@RunWith(SpringRunner.class)
@PrepareForTest({RestHighLevelClient.class, IndicesClient.class, EntityUtils.class})
@PrepareForTest({RestHighLevelClient.class, IndicesClient.class, ClusterClient.class, EntityUtils.class})
public class IndicesServiceTest {
@Mock
private ElasticClientHandler elasticClientHandler;
......@@ -73,11 +76,13 @@ public class IndicesServiceTest {
private RestHighLevelClient restHighLevelClient;
private IndicesClient indicesClient;
private ClusterClient clusterClient;
@Before
public void setup() {
initMocks(this);
indicesClient = PowerMockito.mock(IndicesClient.class);
clusterClient = PowerMockito.mock(ClusterClient.class);
restHighLevelClient = PowerMockito.mock(RestHighLevelClient.class);
}
......@@ -218,4 +223,54 @@ public class IndicesServiceTest {
assertEquals("1", indicesList.get(0).getDocumentCount());
assertEquals("1551996907769", indicesList.get(0).getCreationDate());
}
@Test
public void should_returnTrue_indexExistInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(true);
boolean result = this.sut.isIndexExist(any(RestHighLevelClient.class), "anyIndex");
assertTrue(result);
}
@Test
public void should_getIndexExist_whenIndexNotInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(false);
doReturn(indicesClient).when(restHighLevelClient).indices();
doReturn(true).when(indicesClient).exists(any(GetIndexRequest.class), any(RequestOptions.class));
boolean result = this.sut.isIndexExist(restHighLevelClient, "anyIndex");
assertTrue(result);
verify(this.indicesExistCache, times(1)).get("anyIndex");
verify(this.indicesExistCache, times(1)).put("anyIndex", true);
}
@Test
public void should_getIndexReadyStatus_whenIndexInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(true);
boolean result = this.sut.isIndexReady(any(RestHighLevelClient.class), "anyIndex");
assertTrue(result);
}
@Test
public void should_getIndexReadyStatus_whenIndexNotInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(false);
doReturn(indicesClient).when(restHighLevelClient).indices();
doReturn(true).when(indicesClient).exists(any(GetIndexRequest.class), any(RequestOptions.class));
ClusterHealthResponse healthResponse = mock(ClusterHealthResponse.class);
when(healthResponse.status()).thenReturn(RestStatus.OK);
doReturn(clusterClient).when(restHighLevelClient).cluster();
doReturn(healthResponse).when(clusterClient).health(any(ClusterHealthRequest.class), any(RequestOptions.class));
boolean result = this.sut.isIndexReady(restHighLevelClient, "anyIndex");
assertTrue(result);
verify(this.indicesExistCache, times(1)).get("anyIndex");
verify(this.indicesExistCache, times(1)).put("anyIndex", true);
}
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment