diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCore.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCore.java index c34001666b0c993dae73df6d792caa8d93e6067a..39f5a76b950163f1b283ccd9d8c5adc96a1e5b79 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCore.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCore.java @@ -20,6 +20,7 @@ import javassist.NotFoundException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.indexer.schema.converter.exeption.SchemaProcessingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; @@ -74,6 +75,12 @@ public class GlobalExceptionMapperCore extends ResponseEntityExceptionHandler { } } + @ExceptionHandler(SchemaProcessingException.class) + public ResponseEntity<Object> handleSchemaProcessingException(SchemaProcessingException e) { + return this.getErrorResponse( + new AppException(HttpStatus.BAD_REQUEST.value(), "Error processing schema.", e.getMessage(), e)); + } + @ExceptionHandler(Exception.class) protected ResponseEntity<Object> handleGeneralException(Exception e) { return this.getErrorResponse( diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index e5dcc4125396915ad214bb656dd299cca27dff4c..c3517a676f63ffd2d08a848b8bb02a6f31383a48 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -366,7 +366,7 @@ public class IndexerServiceImpl implements IndexerService { String index = this.elasticIndexNameResolver.getIndexNameFromKind(schema.getKind()); // check if index exist and sync meta attribute schema if required - if (this.indicesService.isIndexExist(restClient, index)) { + if (this.indicesService.isIndexReady(restClient, index)) { this.mappingService.syncIndexMappingIfRequired(restClient, index); continue; } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesService.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesService.java index c09ff3e5744cb5ad52016499abdb8e55aff0b271..48fa3cfe46adf304beb772415b95b2b19c933450 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesService.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesService.java @@ -36,4 +36,6 @@ public interface IndicesService { boolean deleteIndex(String index) throws ElasticsearchException, IOException, AppException; List<IndexInfo> getIndexInfo(RestHighLevelClient client, String indexPattern) throws IOException; + + boolean isIndexReady(RestHighLevelClient client, String index) throws IOException; } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java index ad1d20f43d6c72f8371331e70779f1e7dd18a791..2027796610526b85510a9a9404fdccd45ca39ba8 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndicesServiceImpl.java @@ -22,6 +22,8 @@ import org.apache.http.HttpStatus; import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Request; @@ -92,7 +94,7 @@ public class IndicesServiceImpl implements IndicesService { request.settings(settings != null ? settings : DEFAULT_INDEX_SETTINGS); if (mapping != null) { String mappingJsonString = new Gson().toJson(mapping, Map.class); - request.mapping(mappingJsonString,XContentType.JSON); + request.mapping(mappingJsonString, XContentType.JSON); } request.setTimeout(REQUEST_TIMEOUT); CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); @@ -121,13 +123,7 @@ public class IndicesServiceImpl implements IndicesService { */ public boolean isIndexExist(RestHighLevelClient client, String index) throws IOException { try { - try { - Boolean isIndexExist = (Boolean) this.indexCache.get(index); - if (isIndexExist != null && isIndexExist) return true; - } catch (RedisException ex) { - //In case the format of cache changes then clean the cache - this.indexCache.delete(index); - } + if (this.indexExistInCache(index)) return true; GetIndexRequest request = new GetIndexRequest(index); boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); if (exists) this.indexCache.put(index, true); @@ -142,6 +138,50 @@ public class IndicesServiceImpl implements IndicesService { } } + /** + * Check if an index ready for indexing + * + * @param index Index name + * @return index details if index already exists + * @throws IOException if request cannot be processed + */ + public boolean isIndexReady(RestHighLevelClient client, String index) throws IOException { + try { + if (this.indexExistInCache(index)) return true; + GetIndexRequest request = new GetIndexRequest(index); + boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); + if (!exists) return false; + ClusterHealthRequest indexHealthRequest = new ClusterHealthRequest(); + indexHealthRequest.indices(index); + indexHealthRequest.timeout(REQUEST_TIMEOUT); + indexHealthRequest.waitForYellowStatus(); + ClusterHealthResponse healthResponse = client.cluster().health(indexHealthRequest, RequestOptions.DEFAULT); + if (healthResponse.status() == RestStatus.OK) { + this.indexCache.put(index, true); + return true; + } + return false; + } catch (ElasticsearchException exception) { + if (exception.status() == RestStatus.NOT_FOUND) return false; + throw new AppException( + exception.status().getStatus(), + exception.getMessage(), + String.format("Error getting index: %s status", index), + exception); + } + } + + private boolean indexExistInCache(String index) { + try { + Boolean isIndexExist = (Boolean) this.indexCache.get(index); + if (isIndexExist != null && isIndexExist) return true; + } catch (RedisException ex) { + //In case the format of cache changes then clean the cache + this.indexCache.delete(index); + } + return false; + } + /** * Deletes index if user has required role: search.admin * diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCoreTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCoreTest.java index 992c0a01fea9b1892e20eaaa31b5674028741a85..1877df26e55b971991b6f222cc28a49926426c0f 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCoreTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCoreTest.java @@ -23,6 +23,7 @@ import org.mockito.Mock; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.RequestStatus; +import org.opengroup.osdu.indexer.schema.converter.exeption.SchemaProcessingException; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.http.ResponseEntity; import org.springframework.security.access.AccessDeniedException; @@ -107,4 +108,12 @@ public class GlobalExceptionMapperCoreTest { ResponseEntity response = this.sut.handleIOException(ioException); assertEquals(HttpStatus.SC_SERVICE_UNAVAILABLE, response.getStatusCodeValue()); } + + @Test + public void should_returnAppException_when_SchemaProcessingExceptionIsThrown() { + SchemaProcessingException schemaProcessingException = new SchemaProcessingException("error processing schema"); + + ResponseEntity response = this.sut.handleSchemaProcessingException(schemaProcessingException); + assertEquals(HttpStatus.SC_BAD_REQUEST, response.getStatusCodeValue()); + } } \ No newline at end of file diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java index 38c7a4f682e39182beaac68bac21ebd8ee34d3d9..17bce5783d76f793e848268d9f6a68de7f105a6a 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndicesServiceTest.java @@ -22,8 +22,11 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.rest.RestStatus; import org.junit.Before; import org.junit.Test; @@ -51,7 +54,7 @@ import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; @RunWith(SpringRunner.class) -@PrepareForTest({RestHighLevelClient.class, IndicesClient.class, EntityUtils.class}) +@PrepareForTest({RestHighLevelClient.class, IndicesClient.class, ClusterClient.class, EntityUtils.class}) public class IndicesServiceTest { @Mock private ElasticClientHandler elasticClientHandler; @@ -73,11 +76,13 @@ public class IndicesServiceTest { private RestHighLevelClient restHighLevelClient; private IndicesClient indicesClient; + private ClusterClient clusterClient; @Before public void setup() { initMocks(this); indicesClient = PowerMockito.mock(IndicesClient.class); + clusterClient = PowerMockito.mock(ClusterClient.class); restHighLevelClient = PowerMockito.mock(RestHighLevelClient.class); } @@ -218,4 +223,54 @@ public class IndicesServiceTest { assertEquals("1", indicesList.get(0).getDocumentCount()); assertEquals("1551996907769", indicesList.get(0).getCreationDate()); } + + @Test + public void should_returnTrue_indexExistInCache() throws IOException { + when(this.indicesExistCache.get("anyIndex")).thenReturn(true); + + boolean result = this.sut.isIndexExist(any(RestHighLevelClient.class), "anyIndex"); + + assertTrue(result); + } + + @Test + public void should_getIndexExist_whenIndexNotInCache() throws IOException { + when(this.indicesExistCache.get("anyIndex")).thenReturn(false); + + doReturn(indicesClient).when(restHighLevelClient).indices(); + doReturn(true).when(indicesClient).exists(any(GetIndexRequest.class), any(RequestOptions.class)); + + boolean result = this.sut.isIndexExist(restHighLevelClient, "anyIndex"); + + assertTrue(result); + verify(this.indicesExistCache, times(1)).get("anyIndex"); + verify(this.indicesExistCache, times(1)).put("anyIndex", true); + } + + @Test + public void should_getIndexReadyStatus_whenIndexInCache() throws IOException { + when(this.indicesExistCache.get("anyIndex")).thenReturn(true); + + boolean result = this.sut.isIndexReady(any(RestHighLevelClient.class), "anyIndex"); + + assertTrue(result); + } + + @Test + public void should_getIndexReadyStatus_whenIndexNotInCache() throws IOException { + when(this.indicesExistCache.get("anyIndex")).thenReturn(false); + doReturn(indicesClient).when(restHighLevelClient).indices(); + doReturn(true).when(indicesClient).exists(any(GetIndexRequest.class), any(RequestOptions.class)); + + ClusterHealthResponse healthResponse = mock(ClusterHealthResponse.class); + when(healthResponse.status()).thenReturn(RestStatus.OK); + doReturn(clusterClient).when(restHighLevelClient).cluster(); + doReturn(healthResponse).when(clusterClient).health(any(ClusterHealthRequest.class), any(RequestOptions.class)); + + boolean result = this.sut.isIndexReady(restHighLevelClient, "anyIndex"); + + assertTrue(result); + verify(this.indicesExistCache, times(1)).get("anyIndex"); + verify(this.indicesExistCache, times(1)).put("anyIndex", true); + } } \ No newline at end of file