Skip to content
Snippets Groups Projects
Commit 52ed1614 authored by Yauheni Lesnikau's avatar Yauheni Lesnikau
Browse files

Merge branch 'upd-cluster-health' into 'master'

upd IndicesService.isIndexReady health check not use to cluster one

See merge request !863
parents 2fb22c55 77c62a71
No related branches found
No related tags found
1 merge request!863upd IndicesService.isIndexReady health check not use to cluster one
Pipeline #309922 failed
......@@ -16,11 +16,13 @@ package org.opengroup.osdu.indexer.service;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.HealthStatus;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.analysis.*;
import co.elastic.clients.elasticsearch.cluster.HealthRequest;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch._types.analysis.Analyzer;
import co.elastic.clients.elasticsearch._types.analysis.CharFilter;
import co.elastic.clients.elasticsearch._types.analysis.CharFilterDefinition;
import co.elastic.clients.elasticsearch._types.analysis.CustomAnalyzer;
import co.elastic.clients.elasticsearch._types.analysis.MappingCharFilter;
import co.elastic.clients.elasticsearch._types.analysis.PatternReplaceCharFilter;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest.Builder;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
......@@ -29,8 +31,8 @@ import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.elasticsearch.indices.GetIndexRequest;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -53,13 +55,21 @@ import org.opengroup.osdu.indexer.util.CustomIndexAnalyzerSetting;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.opengroup.osdu.indexer.util.TypeMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static co.elastic.clients.elasticsearch._types.HealthStatus.*;
@Service
@RequestScope
......@@ -85,6 +95,10 @@ public class IndicesServiceImpl implements IndicesService {
private ObjectMapper objectMapper;
@Autowired
private CustomIndexAnalyzerSetting customIndexAnalyzerSetting;
@Value("${index.health.retry.threshold:5}")
private int healthRetryThreshold;
@Value("${index.health.retry.sleepPeriodInMilliseconds:5000}")
private int healthRetrySleepPeriodInMilliseconds;
/**
* Create a new index in Elasticsearch
......@@ -117,7 +131,7 @@ public class IndicesServiceImpl implements IndicesService {
boolean indexStatus = createIndexResponse.acknowledged() && createIndexResponse.shardsAcknowledged();
if (indexStatus) {
this.indexCache.put(index, true);
this.log.info(String.format("Time taken to successfully create new index %s : %d milliseconds", index, stopTime-startTime));
this.log.info(String.format("Time taken to successfully create new index %s : %d milliseconds", index, stopTime - startTime));
// Create alias for index
indexAliasService.createIndexAlias(client, elasticIndexNameResolver.getKindFromIndexName(index));
......@@ -158,10 +172,10 @@ public class IndicesServiceImpl implements IndicesService {
return false;
}
throw new AppException(
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
}
}
......@@ -174,31 +188,54 @@ public class IndicesServiceImpl implements IndicesService {
*/
public boolean isIndexReady(ElasticsearchClient client, String index) throws IOException {
try {
if (this.indexExistInCache(index)){
if (this.indexExistInCache(index)) {
return true;
}
ExistsRequest existsRequest = ExistsRequest.of(builder -> builder.index(index));
BooleanResponse exists = client.indices().exists(existsRequest);
if (!exists.value()){
return false;
}
HealthRequest healthRequest = HealthRequest.of(builder -> builder.index(index)
.timeout(REQUEST_TIMEOUT)
.waitForStatus(HealthStatus.Yellow)
);
HealthResponse health = client.cluster().health(healthRequest);
if (health.status() == HealthStatus.Green || health.status() == HealthStatus.Yellow) {
boolean isHealthy = isIndexHealthy(client, index);
if (isHealthy) {
this.indexCache.put(index, true);
return true;
}
return false;
return isHealthy;
} catch (ElasticsearchException exception) {
if (exception.status() == HttpStatus.SC_NOT_FOUND) return false;
throw new AppException(
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
}
}
private boolean isIndexHealthy(ElasticsearchClient client, String index) throws IOException {
ExistsRequest existsRequest = ExistsRequest.of(builder -> builder.index(index));
BooleanResponse exists = client.indices().exists(existsRequest);
if (!exists.value()) {
return false;
}
String actualHealthStatus = null;
for(int retryCount = 0; retryCount <= healthRetryThreshold; retryCount++) {
List<IndexInfo> indexHealthInfos = this.getIndexInfo(client, index);
if (!indexHealthInfos.isEmpty()) {
actualHealthStatus = indexHealthInfos.get(0).getHealth();
if (Green.jsonValue().equalsIgnoreCase(actualHealthStatus) || Yellow.jsonValue().equalsIgnoreCase(actualHealthStatus)) {
return true;
}
}
sleepInMilliSeconds(healthRetrySleepPeriodInMilliseconds);
}
if (Red.jsonValue().equalsIgnoreCase(actualHealthStatus)) {
throw new AppException(HttpStatus.SC_SERVICE_UNAVAILABLE,
"Index not available for indexing",
String.format("Index: %s primary shards are unassigned", index));
}
return false;
}
private void sleepInMilliSeconds(int milliseconds) {
try {
TimeUnit.MILLISECONDS.sleep(milliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
......@@ -207,7 +244,7 @@ public class IndicesServiceImpl implements IndicesService {
builder.refreshInterval(Time.of(timeBuilder -> timeBuilder.time("30s")));
builder.numberOfShards("1");
builder.numberOfReplicas("1");
if(customIndexAnalyzerSetting.isEnabled()) {
if (customIndexAnalyzerSetting.isEnabled()) {
IndexSettingsAnalysis analysis = getCustomAnalyzer();
builder.analysis(analysis);
}
......@@ -258,7 +295,7 @@ public class IndicesServiceImpl implements IndicesService {
public boolean deleteIndex(ElasticsearchClient client, String index) throws ElasticsearchException, IOException, AppException {
List<String> indices = this.resolveIndex(client, index);
boolean responseStatus = true;
for (String idx: indices) {
for (String idx : indices) {
responseStatus &= removeIndexInElasticsearch(client, idx);
}
if (responseStatus) {
......@@ -314,26 +351,29 @@ public class IndicesServiceImpl implements IndicesService {
}
/**
* Remove index in Elasticsearch
* Get index information from Elasticsearch
*
* @param client Elasticsearch client
* @param indexPattern Index pattern
* @return List of indices matching indexPattern
* @throws IOException Throws {@link IOException} if elastic cannot complete the request
*/
public List<IndexInfo> getIndexInfo(ElasticsearchClient client, String indexPattern) throws IOException {
Objects.requireNonNull(client, CLIENT_CANNOT_BE_NULL);
String requestUrl = (indexPattern == null || indexPattern.isEmpty())
? "/_cat/indices/*,-.*?h=index,docs.count,creation.date&s=docs.count:asc&format=json"
: String.format("/_cat/indices/%s?h=index,docs.count,creation.date&format=json", indexPattern);
? "/_cat/indices/*,-.*?h=index,health,docs.count,creation.date&s=docs.count:asc&format=json"
: String.format("/_cat/indices/%s?h=index,health,docs.count,creation.date&format=json", indexPattern);
RestClientTransport clientTransport = (RestClientTransport)client._transport();
Request request = new Request("GET", requestUrl);
Response response = clientTransport.restClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
try (RestClientTransport clientTransport = (RestClientTransport) client._transport()) {
Request request = new Request("GET", requestUrl);
Response response = clientTransport.restClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
Type typeOf = new TypeToken<List<IndexInfo>>() {}.getType();
return new Gson().fromJson(responseBody, typeOf);
Type typeOf = new TypeToken<List<IndexInfo>>() {
}.getType();
return new Gson().fromJson(responseBody, typeOf);
}
}
private void clearCacheOnIndexDeletion(String index) {
......
......@@ -18,12 +18,8 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.ErrorResponse;
import co.elastic.clients.elasticsearch._types.HealthStatus;
import co.elastic.clients.elasticsearch._types.analysis.Analyzer;
import co.elastic.clients.elasticsearch._types.analysis.CharFilter;
import co.elastic.clients.elasticsearch.cluster.ElasticsearchClusterClient;
import co.elastic.clients.elasticsearch.cluster.HealthRequest;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
......@@ -66,6 +62,7 @@ import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.opengroup.osdu.indexer.testutils.ReflectionTestUtil.setFieldValueForClass;
@RunWith(SpringRunner.class)
public class IndicesServiceTest {
......@@ -95,16 +92,16 @@ public class IndicesServiceTest {
private ElasticsearchClient restHighLevelClient;
private ElasticsearchIndicesClient indicesClient;
private ElasticsearchClusterClient clusterClient;
private RestClientTransport restClientTransport;
@Before
public void setup() {
initMocks(this);
indicesClient = mock(ElasticsearchIndicesClient.class);
clusterClient = mock(ElasticsearchClusterClient.class);
restHighLevelClient = mock(ElasticsearchClient.class);
restClientTransport = mock(RestClientTransport.class);
setFieldValueForClass(sut, "healthRetryThreshold", 1);
setFieldValueForClass(sut, "healthRetrySleepPeriodInMilliseconds", 1);
}
@Test
......@@ -376,7 +373,7 @@ public class IndicesServiceTest {
" \"creation.date\": \"1545912860994\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/*,-.*?h=index,docs.count,creation.date&s=docs.count:asc&format=json");
Request request = new Request("GET", "/_cat/indices/*,-.*?h=index,health,docs.count,creation.date&s=docs.count:asc&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
......@@ -402,7 +399,7 @@ public class IndicesServiceTest {
" \"creation.date\": \"1545912868416\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/tenant1-aapg-*?h=index,docs.count,creation.date&format=json");
Request request = new Request("GET", "/_cat/indices/tenant1-aapg-*?h=index,health,docs.count,creation.date&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
......@@ -468,10 +465,20 @@ public class IndicesServiceTest {
BooleanResponse booleanResponse = new BooleanResponse(true);
doReturn(booleanResponse).when(indicesClient).exists(any(ExistsRequest.class));
HealthResponse healthResponse = mock(HealthResponse.class);
when(healthResponse.status()).thenReturn(HealthStatus.Green);
doReturn(clusterClient).when(restHighLevelClient).cluster();
doReturn(healthResponse).when(clusterClient).health(any(HealthRequest.class));
String responseJson = "[" +
" {" +
" \"index\": \"anyIndex\"," +
" \"health\": \"yellow\"," +
" \"docs.count\": \"92\"," +
" \"creation.date\": \"1545912860994\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/anyIndex?h=index,health,docs.count,creation.date&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
when(this.restClient.performRequest(request)).thenReturn(response);
when(this.response.getEntity()).thenReturn(entity);
boolean result = this.sut.isIndexReady(restHighLevelClient, "anyIndex");
......@@ -480,4 +487,30 @@ public class IndicesServiceTest {
verify(this.indicesExistCache, times(1)).put("anyIndex", true);
}
@Test(expected = AppException.class)
public void should_throwIndexUnavailableException_whenIndexNotReady() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(false);
doReturn(indicesClient).when(restHighLevelClient).indices();
BooleanResponse booleanResponse = new BooleanResponse(true);
doReturn(booleanResponse).when(indicesClient).exists(any(ExistsRequest.class));
String responseJson = "[" +
" {" +
" \"index\": \"anyIndex\"," +
" \"health\": \"red\"," +
" \"docs.count\": \"92\"," +
" \"creation.date\": \"1545912860994\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/anyIndex?h=index,health,docs.count,creation.date&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
when(this.restClient.performRequest(request)).thenReturn(response);
when(this.response.getEntity()).thenReturn(entity);
this.sut.isIndexReady(restHighLevelClient, "anyIndex");
}
}
package org.opengroup.osdu.indexer.testutils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
public class ReflectionTestUtil {
public static <T> void setFieldValueForClass(T targetObject, String fieldName, Object fieldValue) {
Field field = ReflectionUtils.findField(targetObject.getClass(), fieldName);
field.setAccessible(true);
ReflectionUtils.setField(field, targetObject, fieldValue);
}
}
......@@ -12,7 +12,7 @@
<java.version>17</java.version>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<os-core-common.version>3.0.0</os-core-common.version>
<os-core-common.version>3.1.0</os-core-common.version>
<hibernate-validator.version>8.0.0.Final</hibernate-validator.version>
<common-codec.version>1.14</common-codec.version>
<netty.version>4.1.51.Final</netty.version>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment