diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java index 68d72acfa4644bdf09ceba7f6b640e737ea9f5d0..35d92b5f6168f3516f2e858a1e69d0bf5f5d2207 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java @@ -16,11 +16,13 @@ package org.opengroup.osdu.indexer.service; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.ElasticsearchException; -import co.elastic.clients.elasticsearch._types.HealthStatus; import co.elastic.clients.elasticsearch._types.Time; -import co.elastic.clients.elasticsearch._types.analysis.*; -import co.elastic.clients.elasticsearch.cluster.HealthRequest; -import co.elastic.clients.elasticsearch.cluster.HealthResponse; +import co.elastic.clients.elasticsearch._types.analysis.Analyzer; +import co.elastic.clients.elasticsearch._types.analysis.CharFilter; +import co.elastic.clients.elasticsearch._types.analysis.CharFilterDefinition; +import co.elastic.clients.elasticsearch._types.analysis.CustomAnalyzer; +import co.elastic.clients.elasticsearch._types.analysis.MappingCharFilter; +import co.elastic.clients.elasticsearch._types.analysis.PatternReplaceCharFilter; import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; import co.elastic.clients.elasticsearch.indices.CreateIndexRequest.Builder; import co.elastic.clients.elasticsearch.indices.CreateIndexResponse; @@ -29,8 +31,8 @@ import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse; import co.elastic.clients.elasticsearch.indices.ExistsRequest; import co.elastic.clients.elasticsearch.indices.GetIndexRequest; import co.elastic.clients.elasticsearch.indices.GetIndexResponse; -import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis; import co.elastic.clients.elasticsearch.indices.IndexSettings; +import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis; import co.elastic.clients.transport.endpoints.BooleanResponse; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.fasterxml.jackson.databind.ObjectMapper; @@ -53,13 +55,21 @@ import org.opengroup.osdu.indexer.util.CustomIndexAnalyzerSetting; import org.opengroup.osdu.indexer.util.ElasticClientHandler; import org.opengroup.osdu.indexer.util.TypeMapper; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.web.context.annotation.RequestScope; import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.Type; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static co.elastic.clients.elasticsearch._types.HealthStatus.*; @Service @RequestScope @@ -85,6 +95,10 @@ public class IndicesServiceImpl implements IndicesService { private ObjectMapper objectMapper; @Autowired private CustomIndexAnalyzerSetting customIndexAnalyzerSetting; + @Value("${index.health.retry.threshold:5}") + private int healthRetryThreshold; + @Value("${index.health.retry.sleepPeriodInMilliseconds:5000}") + private int healthRetrySleepPeriodInMilliseconds; /** * Create a new index in Elasticsearch @@ -117,7 +131,7 @@ public class IndicesServiceImpl implements IndicesService { boolean indexStatus = createIndexResponse.acknowledged() && createIndexResponse.shardsAcknowledged(); if (indexStatus) { this.indexCache.put(index, true); - this.log.info(String.format("Time taken to successfully create new index %s : %d milliseconds", index, stopTime-startTime)); + this.log.info(String.format("Time taken to successfully create new index %s : %d milliseconds", index, stopTime - startTime)); // Create alias for index indexAliasService.createIndexAlias(client, elasticIndexNameResolver.getKindFromIndexName(index)); @@ -158,10 +172,10 @@ public class IndicesServiceImpl implements IndicesService { return false; } throw new AppException( - exception.status(), - exception.getMessage(), - String.format("Error getting index: %s status", index), - exception); + exception.status(), + exception.getMessage(), + String.format("Error getting index: %s status", index), + exception); } } @@ -174,31 +188,54 @@ public class IndicesServiceImpl implements IndicesService { */ public boolean isIndexReady(ElasticsearchClient client, String index) throws IOException { try { - if (this.indexExistInCache(index)){ + if (this.indexExistInCache(index)) { return true; } - ExistsRequest existsRequest = ExistsRequest.of(builder -> builder.index(index)); - BooleanResponse exists = client.indices().exists(existsRequest); - if (!exists.value()){ - return false; - } - HealthRequest healthRequest = HealthRequest.of(builder -> builder.index(index) - .timeout(REQUEST_TIMEOUT) - .waitForStatus(HealthStatus.Yellow) - ); - HealthResponse health = client.cluster().health(healthRequest); - if (health.status() == HealthStatus.Green || health.status() == HealthStatus.Yellow) { + boolean isHealthy = isIndexHealthy(client, index); + if (isHealthy) { this.indexCache.put(index, true); - return true; } - return false; + return isHealthy; } catch (ElasticsearchException exception) { if (exception.status() == HttpStatus.SC_NOT_FOUND) return false; throw new AppException( - exception.status(), - exception.getMessage(), - String.format("Error getting index: %s status", index), - exception); + exception.status(), + exception.getMessage(), + String.format("Error getting index: %s status", index), + exception); + } + } + + private boolean isIndexHealthy(ElasticsearchClient client, String index) throws IOException { + ExistsRequest existsRequest = ExistsRequest.of(builder -> builder.index(index)); + BooleanResponse exists = client.indices().exists(existsRequest); + if (!exists.value()) { + return false; + } + String actualHealthStatus = null; + for(int retryCount = 0; retryCount <= healthRetryThreshold; retryCount++) { + List<IndexInfo> indexHealthInfos = this.getIndexInfo(client, index); + if (!indexHealthInfos.isEmpty()) { + actualHealthStatus = indexHealthInfos.get(0).getHealth(); + if (Green.jsonValue().equalsIgnoreCase(actualHealthStatus) || Yellow.jsonValue().equalsIgnoreCase(actualHealthStatus)) { + return true; + } + } + sleepInMilliSeconds(healthRetrySleepPeriodInMilliseconds); + } + if (Red.jsonValue().equalsIgnoreCase(actualHealthStatus)) { + throw new AppException(HttpStatus.SC_SERVICE_UNAVAILABLE, + "Index not available for indexing", + String.format("Index: %s primary shards are unassigned", index)); + } + return false; + } + + private void sleepInMilliSeconds(int milliseconds) { + try { + TimeUnit.MILLISECONDS.sleep(milliseconds); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @@ -207,7 +244,7 @@ public class IndicesServiceImpl implements IndicesService { builder.refreshInterval(Time.of(timeBuilder -> timeBuilder.time("30s"))); builder.numberOfShards("1"); builder.numberOfReplicas("1"); - if(customIndexAnalyzerSetting.isEnabled()) { + if (customIndexAnalyzerSetting.isEnabled()) { IndexSettingsAnalysis analysis = getCustomAnalyzer(); builder.analysis(analysis); } @@ -258,7 +295,7 @@ public class IndicesServiceImpl implements IndicesService { public boolean deleteIndex(ElasticsearchClient client, String index) throws ElasticsearchException, IOException, AppException { List<String> indices = this.resolveIndex(client, index); boolean responseStatus = true; - for (String idx: indices) { + for (String idx : indices) { responseStatus &= removeIndexInElasticsearch(client, idx); } if (responseStatus) { @@ -314,26 +351,29 @@ public class IndicesServiceImpl implements IndicesService { } /** - * Remove index in Elasticsearch + * Get index information from Elasticsearch * * @param client Elasticsearch client * @param indexPattern Index pattern + * @return List of indices matching indexPattern * @throws IOException Throws {@link IOException} if elastic cannot complete the request */ public List<IndexInfo> getIndexInfo(ElasticsearchClient client, String indexPattern) throws IOException { Objects.requireNonNull(client, CLIENT_CANNOT_BE_NULL); String requestUrl = (indexPattern == null || indexPattern.isEmpty()) - ? "/_cat/indices/*,-.*?h=index,docs.count,creation.date&s=docs.count:asc&format=json" - : String.format("/_cat/indices/%s?h=index,docs.count,creation.date&format=json", indexPattern); + ? "/_cat/indices/*,-.*?h=index,health,docs.count,creation.date&s=docs.count:asc&format=json" + : String.format("/_cat/indices/%s?h=index,health,docs.count,creation.date&format=json", indexPattern); - RestClientTransport clientTransport = (RestClientTransport)client._transport(); - Request request = new Request("GET", requestUrl); - Response response = clientTransport.restClient().performRequest(request); - String responseBody = EntityUtils.toString(response.getEntity()); + try (RestClientTransport clientTransport = (RestClientTransport) client._transport()) { + Request request = new Request("GET", requestUrl); + Response response = clientTransport.restClient().performRequest(request); + String responseBody = EntityUtils.toString(response.getEntity()); - Type typeOf = new TypeToken<List<IndexInfo>>() {}.getType(); - return new Gson().fromJson(responseBody, typeOf); + Type typeOf = new TypeToken<List<IndexInfo>>() { + }.getType(); + return new Gson().fromJson(responseBody, typeOf); + } } private void clearCacheOnIndexDeletion(String index) { diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java index 2053f189e8b8898777feb3587c12ff30c3d41067..a70e1ad6f5b07a083213729423d51875a1622cf0 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java @@ -18,12 +18,8 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.ElasticsearchException; import co.elastic.clients.elasticsearch._types.ErrorCause; import co.elastic.clients.elasticsearch._types.ErrorResponse; -import co.elastic.clients.elasticsearch._types.HealthStatus; import co.elastic.clients.elasticsearch._types.analysis.Analyzer; import co.elastic.clients.elasticsearch._types.analysis.CharFilter; -import co.elastic.clients.elasticsearch.cluster.ElasticsearchClusterClient; -import co.elastic.clients.elasticsearch.cluster.HealthRequest; -import co.elastic.clients.elasticsearch.cluster.HealthResponse; import co.elastic.clients.elasticsearch.indices.*; import co.elastic.clients.transport.endpoints.BooleanResponse; import co.elastic.clients.transport.rest_client.RestClientTransport; @@ -66,6 +62,7 @@ import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; +import static org.opengroup.osdu.indexer.testutils.ReflectionTestUtil.setFieldValueForClass; @RunWith(SpringRunner.class) public class IndicesServiceTest { @@ -95,16 +92,16 @@ public class IndicesServiceTest { private ElasticsearchClient restHighLevelClient; private ElasticsearchIndicesClient indicesClient; - private ElasticsearchClusterClient clusterClient; private RestClientTransport restClientTransport; @Before public void setup() { initMocks(this); indicesClient = mock(ElasticsearchIndicesClient.class); - clusterClient = mock(ElasticsearchClusterClient.class); restHighLevelClient = mock(ElasticsearchClient.class); restClientTransport = mock(RestClientTransport.class); + setFieldValueForClass(sut, "healthRetryThreshold", 1); + setFieldValueForClass(sut, "healthRetrySleepPeriodInMilliseconds", 1); } @Test @@ -376,7 +373,7 @@ public class IndicesServiceTest { " \"creation.date\": \"1545912860994\"" + " }" + "]"; - Request request = new Request("GET", "/_cat/indices/*,-.*?h=index,docs.count,creation.date&s=docs.count:asc&format=json"); + Request request = new Request("GET", "/_cat/indices/*,-.*?h=index,health,docs.count,creation.date&s=docs.count:asc&format=json"); StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON); when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport); when(this.restClientTransport.restClient()).thenReturn(this.restClient); @@ -402,7 +399,7 @@ public class IndicesServiceTest { " \"creation.date\": \"1545912868416\"" + " }" + "]"; - Request request = new Request("GET", "/_cat/indices/tenant1-aapg-*?h=index,docs.count,creation.date&format=json"); + Request request = new Request("GET", "/_cat/indices/tenant1-aapg-*?h=index,health,docs.count,creation.date&format=json"); StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON); when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport); when(this.restClientTransport.restClient()).thenReturn(this.restClient); @@ -468,10 +465,20 @@ public class IndicesServiceTest { BooleanResponse booleanResponse = new BooleanResponse(true); doReturn(booleanResponse).when(indicesClient).exists(any(ExistsRequest.class)); - HealthResponse healthResponse = mock(HealthResponse.class); - when(healthResponse.status()).thenReturn(HealthStatus.Green); - doReturn(clusterClient).when(restHighLevelClient).cluster(); - doReturn(healthResponse).when(clusterClient).health(any(HealthRequest.class)); + String responseJson = "[" + + " {" + + " \"index\": \"anyIndex\"," + + " \"health\": \"yellow\"," + + " \"docs.count\": \"92\"," + + " \"creation.date\": \"1545912860994\"" + + " }" + + "]"; + Request request = new Request("GET", "/_cat/indices/anyIndex?h=index,health,docs.count,creation.date&format=json"); + StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON); + when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport); + when(this.restClientTransport.restClient()).thenReturn(this.restClient); + when(this.restClient.performRequest(request)).thenReturn(response); + when(this.response.getEntity()).thenReturn(entity); boolean result = this.sut.isIndexReady(restHighLevelClient, "anyIndex"); @@ -480,4 +487,30 @@ public class IndicesServiceTest { verify(this.indicesExistCache, times(1)).put("anyIndex", true); } + @Test(expected = AppException.class) + public void should_throwIndexUnavailableException_whenIndexNotReady() throws IOException { + when(this.indicesExistCache.get("anyIndex")).thenReturn(false); + doReturn(indicesClient).when(restHighLevelClient).indices(); + + BooleanResponse booleanResponse = new BooleanResponse(true); + doReturn(booleanResponse).when(indicesClient).exists(any(ExistsRequest.class)); + + String responseJson = "[" + + " {" + + " \"index\": \"anyIndex\"," + + " \"health\": \"red\"," + + " \"docs.count\": \"92\"," + + " \"creation.date\": \"1545912860994\"" + + " }" + + "]"; + Request request = new Request("GET", "/_cat/indices/anyIndex?h=index,health,docs.count,creation.date&format=json"); + StringEntity entity = new StringEntity(responseJson, ContentType.APPLICATION_JSON); + when(this.restHighLevelClient._transport()).thenReturn(this.restClientTransport); + when(this.restClientTransport.restClient()).thenReturn(this.restClient); + when(this.restClient.performRequest(request)).thenReturn(response); + when(this.response.getEntity()).thenReturn(entity); + + this.sut.isIndexReady(restHighLevelClient, "anyIndex"); + } + } diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/testutils/ReflectionTestUtil.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/testutils/ReflectionTestUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..51ae2218aeb15b468f62f160f57b84cb92b866f3 --- /dev/null +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/testutils/ReflectionTestUtil.java @@ -0,0 +1,14 @@ +package org.opengroup.osdu.indexer.testutils; + +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Field; + +public class ReflectionTestUtil { + + public static <T> void setFieldValueForClass(T targetObject, String fieldName, Object fieldValue) { + Field field = ReflectionUtils.findField(targetObject.getClass(), fieldName); + field.setAccessible(true); + ReflectionUtils.setField(field, targetObject, fieldValue); + } +} diff --git a/pom.xml b/pom.xml index 46b6e84de0d6f574742ce1181838eadb50fad9fc..357f128fe075acf5a7d721e6b1ec8cf0df82404c 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ <java.version>17</java.version> <maven.compiler.target>17</maven.compiler.target> <maven.compiler.source>17</maven.compiler.source> - <os-core-common.version>3.0.0</os-core-common.version> + <os-core-common.version>3.1.0</os-core-common.version> <hibernate-validator.version>8.0.0.Final</hibernate-validator.version> <common-codec.version>1.14</common-codec.version> <netty.version>4.1.51.Final</netty.version>