Skip to content
Snippets Groups Projects
Commit 002c7fe9 authored by Yauheni Lesnikau's avatar Yauheni Lesnikau
Browse files

upd IndicesService.isIndexReady health check not use to cluster one

parent 5b851d95
No related branches found
No related tags found
1 merge request!863upd IndicesService.isIndexReady health check not use to cluster one
......@@ -16,11 +16,13 @@ package org.opengroup.osdu.indexer.service;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.HealthStatus;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.analysis.*;
import co.elastic.clients.elasticsearch.cluster.HealthRequest;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch._types.analysis.Analyzer;
import co.elastic.clients.elasticsearch._types.analysis.CharFilter;
import co.elastic.clients.elasticsearch._types.analysis.CharFilterDefinition;
import co.elastic.clients.elasticsearch._types.analysis.CustomAnalyzer;
import co.elastic.clients.elasticsearch._types.analysis.MappingCharFilter;
import co.elastic.clients.elasticsearch._types.analysis.PatternReplaceCharFilter;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest.Builder;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
......@@ -29,8 +31,8 @@ import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.elasticsearch.indices.GetIndexRequest;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -59,7 +61,11 @@ import org.springframework.web.context.annotation.RequestScope;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Service
@RequestScope
......@@ -117,7 +123,7 @@ public class IndicesServiceImpl implements IndicesService {
boolean indexStatus = createIndexResponse.acknowledged() && createIndexResponse.shardsAcknowledged();
if (indexStatus) {
this.indexCache.put(index, true);
this.log.info(String.format("Time taken to successfully create new index %s : %d milliseconds", index, stopTime-startTime));
this.log.info(String.format("Time taken to successfully create new index %s : %d milliseconds", index, stopTime - startTime));
// Create alias for index
indexAliasService.createIndexAlias(client, elasticIndexNameResolver.getKindFromIndexName(index));
......@@ -158,10 +164,10 @@ public class IndicesServiceImpl implements IndicesService {
return false;
}
throw new AppException(
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
}
}
......@@ -174,31 +180,30 @@ public class IndicesServiceImpl implements IndicesService {
*/
public boolean isIndexReady(ElasticsearchClient client, String index) throws IOException {
try {
if (this.indexExistInCache(index)){
if (this.indexExistInCache(index)) {
return true;
}
ExistsRequest existsRequest = ExistsRequest.of(builder -> builder.index(index));
BooleanResponse exists = client.indices().exists(existsRequest);
if (!exists.value()){
if (!exists.value()) {
return false;
}
HealthRequest healthRequest = HealthRequest.of(builder -> builder.index(index)
.timeout(REQUEST_TIMEOUT)
.waitForStatus(HealthStatus.Yellow)
);
HealthResponse health = client.cluster().health(healthRequest);
if (health.status() == HealthStatus.Green || health.status() == HealthStatus.Yellow) {
this.indexCache.put(index, true);
return true;
List<IndexInfo> indexHealthInfos = this.getIndexInfo(client, index);
if (indexHealthInfos.isEmpty()) {
return false;
}
if ("red".equalsIgnoreCase(indexHealthInfos.get(0).getHealth())) {
throw new AppException(HttpStatus.SC_SERVICE_UNAVAILABLE, "Index not available for indexing", String.format("Index: %s primary shards are unassigned", index));
}
return false;
this.indexCache.put(index, true);
return true;
} catch (ElasticsearchException exception) {
if (exception.status() == HttpStatus.SC_NOT_FOUND) return false;
throw new AppException(
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
exception.status(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
}
}
......@@ -207,7 +212,7 @@ public class IndicesServiceImpl implements IndicesService {
builder.refreshInterval(Time.of(timeBuilder -> timeBuilder.time("30s")));
builder.numberOfShards("1");
builder.numberOfReplicas("1");
if(customIndexAnalyzerSetting.isEnabled()) {
if (customIndexAnalyzerSetting.isEnabled()) {
IndexSettingsAnalysis analysis = getCustomAnalyzer();
builder.analysis(analysis);
}
......@@ -258,7 +263,7 @@ public class IndicesServiceImpl implements IndicesService {
public boolean deleteIndex(ElasticsearchClient client, String index) throws ElasticsearchException, IOException, AppException {
List<String> indices = this.resolveIndex(client, index);
boolean responseStatus = true;
for (String idx: indices) {
for (String idx : indices) {
responseStatus &= removeIndexInElasticsearch(client, idx);
}
if (responseStatus) {
......@@ -314,26 +319,29 @@ public class IndicesServiceImpl implements IndicesService {
}
/**
* Remove index in Elasticsearch
* Get index information from Elasticsearch
*
* @param client Elasticsearch client
* @param indexPattern Index pattern
* @return List of indices matching indexPattern
* @throws IOException Throws {@link IOException} if elastic cannot complete the request
*/
public List<IndexInfo> getIndexInfo(ElasticsearchClient client, String indexPattern) throws IOException {
Objects.requireNonNull(client, CLIENT_CANNOT_BE_NULL);
String requestUrl = (indexPattern == null || indexPattern.isEmpty())
? "/_cat/indices/*,-.*?h=index,docs.count,creation.date&s=docs.count:asc&format=json"
: String.format("/_cat/indices/%s?h=index,docs.count,creation.date&format=json", indexPattern);
? "/_cat/indices/*,-.*?h=index,health,docs.count,creation.date&s=docs.count:asc&format=json"
: String.format("/_cat/indices/%s?h=index,health,docs.count,creation.date&format=json", indexPattern);
RestClientTransport clientTransport = (RestClientTransport)client._transport();
Request request = new Request("GET", requestUrl);
Response response = clientTransport.restClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
try (RestClientTransport clientTransport = (RestClientTransport) client._transport()) {
Request request = new Request("GET", requestUrl);
Response response = clientTransport.restClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
Type typeOf = new TypeToken<List<IndexInfo>>() {}.getType();
return new Gson().fromJson(responseBody, typeOf);
Type typeOf = new TypeToken<List<IndexInfo>>() {
}.getType();
return new Gson().fromJson(responseBody, typeOf);
}
}
private void clearCacheOnIndexDeletion(String index) {
......
......@@ -376,7 +376,7 @@ public class IndicesServiceTest {
" \"creation.date\": \"1545912860994\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/*,-.*?h=index,docs.count,creation.date&s=docs.count:asc&format=json");
Request request = new Request("GET", "/_cat/indices/*,-.*?h=index,health,docs.count,creation.date&s=docs.count:asc&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
......@@ -402,7 +402,7 @@ public class IndicesServiceTest {
" \"creation.date\": \"1545912868416\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/tenant1-aapg-*?h=index,docs.count,creation.date&format=json");
Request request = new Request("GET", "/_cat/indices/tenant1-aapg-*?h=index,health,docs.count,creation.date&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
......@@ -468,10 +468,20 @@ public class IndicesServiceTest {
BooleanResponse booleanResponse = new BooleanResponse(true);
doReturn(booleanResponse).when(indicesClient).exists(any(ExistsRequest.class));
HealthResponse healthResponse = mock(HealthResponse.class);
when(healthResponse.status()).thenReturn(HealthStatus.Green);
doReturn(clusterClient).when(restHighLevelClient).cluster();
doReturn(healthResponse).when(clusterClient).health(any(HealthRequest.class));
String responseJson = "[" +
" {" +
" \"index\": \"anyIndex\"," +
" \"health\": \"yellow\"," +
" \"docs.count\": \"92\"," +
" \"creation.date\": \"1545912860994\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/anyIndex?h=index,health,docs.count,creation.date&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
when(this.restClient.performRequest(request)).thenReturn(response);
when(this.response.getEntity()).thenReturn(entity);
boolean result = this.sut.isIndexReady(restHighLevelClient, "anyIndex");
......@@ -480,4 +490,30 @@ public class IndicesServiceTest {
verify(this.indicesExistCache, times(1)).put("anyIndex", true);
}
@Test(expected = AppException.class)
public void should_throwIndexUnavailableException_whenIndexNotReady() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(false);
doReturn(indicesClient).when(restHighLevelClient).indices();
BooleanResponse booleanResponse = new BooleanResponse(true);
doReturn(booleanResponse).when(indicesClient).exists(any(ExistsRequest.class));
String responseJson = "[" +
" {" +
" \"index\": \"anyIndex\"," +
" \"health\": \"red\"," +
" \"docs.count\": \"92\"," +
" \"creation.date\": \"1545912860994\"" +
" }" +
"]";
Request request = new Request("GET", "/_cat/indices/anyIndex?h=index,health,docs.count,creation.date&format=json");
StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON);
when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport);
when(this.restClientTransport.restClient()).thenReturn(this.restClient);
when(this.restClient.performRequest(request)).thenReturn(response);
when(this.response.getEntity()).thenReturn(entity);
boolean result = this.sut.isIndexReady(restHighLevelClient, "anyIndex");
}
}
......@@ -12,7 +12,7 @@
<java.version>17</java.version>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<os-core-common.version>3.0.0</os-core-common.version>
<os-core-common.version>3.1.0</os-core-common.version>
<hibernate-validator.version>8.0.0.Final</hibernate-validator.version>
<common-codec.version>1.14</common-codec.version>
<netty.version>4.1.51.Final</netty.version>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment