Commit 10d9b4e4 authored by Dmitriy Novikov's avatar Dmitriy Novikov
Browse files

Merge remote-tracking branch 'origin/master' into hybrid-lib-integration

parents 3be3b176 cfdbcd6a
This diff is collapsed.
......@@ -16,7 +16,7 @@
<properties>
<commons-beanutils.version>1.9.4</commons-beanutils.version>
<osdu.oscorecommon.version>0.13.0</osdu.oscorecommon.version>
<osdu.oscorecommon.version>0.14.0-rc4</osdu.oscorecommon.version>
<spring-security-web.version>5.4.9</spring-security-web.version>
<gson.version>2.8.9</gson.version>
<netty.version>4.1.70.Final</netty.version>
......
......@@ -20,6 +20,7 @@ import javassist.NotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.indexer.schema.converter.exeption.SchemaProcessingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
......@@ -74,6 +75,12 @@ public class GlobalExceptionMapperCore extends ResponseEntityExceptionHandler {
}
}
@ExceptionHandler(SchemaProcessingException.class)
public ResponseEntity<Object> handleSchemaProcessingException(SchemaProcessingException e) {
return this.getErrorResponse(
new AppException(HttpStatus.BAD_REQUEST.value(), "Error processing schema.", e.getMessage(), e));
}
@ExceptionHandler(Exception.class)
protected ResponseEntity<Object> handleGeneralException(Exception e) {
return this.getErrorResponse(
......
......@@ -366,7 +366,7 @@ public class IndexerServiceImpl implements IndexerService {
String index = this.elasticIndexNameResolver.getIndexNameFromKind(schema.getKind());
// check if index exist and sync meta attribute schema if required
if (this.indicesService.isIndexExist(restClient, index)) {
if (this.indicesService.isIndexReady(restClient, index)) {
this.mappingService.syncIndexMappingIfRequired(restClient, index);
continue;
}
......@@ -400,7 +400,7 @@ public class IndexerServiceImpl implements IndexerService {
IndexRequest indexRequest = new IndexRequest(index).id(record.getId()).source(this.gson.toJson(sourceMap), XContentType.JSON);
bulkRequest.add(indexRequest);
} else if (operation == OperationType.update) {
UpdateRequest updateRequest = new UpdateRequest(index, record.getId()).upsert(this.gson.toJson(sourceMap), XContentType.JSON);
UpdateRequest updateRequest = new UpdateRequest(index, "_doc", record.getId()).doc(this.gson.toJson(sourceMap), XContentType.JSON).docAsUpsert(true);
bulkRequest.add(updateRequest);
}
}
......
......@@ -36,4 +36,6 @@ public interface IndicesService {
boolean deleteIndex(String index) throws ElasticsearchException, IOException, AppException;
List<IndexInfo> getIndexInfo(RestHighLevelClient client, String indexPattern) throws IOException;
boolean isIndexReady(RestHighLevelClient client, String index) throws IOException;
}
......@@ -22,6 +22,8 @@ import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Request;
......@@ -92,7 +94,7 @@ public class IndicesServiceImpl implements IndicesService {
request.settings(settings != null ? settings : DEFAULT_INDEX_SETTINGS);
if (mapping != null) {
String mappingJsonString = new Gson().toJson(mapping, Map.class);
request.mapping(mappingJsonString,XContentType.JSON);
request.mapping(mappingJsonString, XContentType.JSON);
}
request.setTimeout(REQUEST_TIMEOUT);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
......@@ -121,13 +123,7 @@ public class IndicesServiceImpl implements IndicesService {
*/
public boolean isIndexExist(RestHighLevelClient client, String index) throws IOException {
try {
try {
Boolean isIndexExist = (Boolean) this.indexCache.get(index);
if (isIndexExist != null && isIndexExist) return true;
} catch (RedisException ex) {
//In case the format of cache changes then clean the cache
this.indexCache.delete(index);
}
if (this.indexExistInCache(index)) return true;
GetIndexRequest request = new GetIndexRequest(index);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if (exists) this.indexCache.put(index, true);
......@@ -142,6 +138,50 @@ public class IndicesServiceImpl implements IndicesService {
}
}
/**
* Check if an index ready for indexing
*
* @param index Index name
* @return index details if index already exists
* @throws IOException if request cannot be processed
*/
public boolean isIndexReady(RestHighLevelClient client, String index) throws IOException {
try {
if (this.indexExistInCache(index)) return true;
GetIndexRequest request = new GetIndexRequest(index);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if (!exists) return false;
ClusterHealthRequest indexHealthRequest = new ClusterHealthRequest();
indexHealthRequest.indices(index);
indexHealthRequest.timeout(REQUEST_TIMEOUT);
indexHealthRequest.waitForYellowStatus();
ClusterHealthResponse healthResponse = client.cluster().health(indexHealthRequest, RequestOptions.DEFAULT);
if (healthResponse.status() == RestStatus.OK) {
this.indexCache.put(index, true);
return true;
}
return false;
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.NOT_FOUND) return false;
throw new AppException(
exception.status().getStatus(),
exception.getMessage(),
String.format("Error getting index: %s status", index),
exception);
}
}
private boolean indexExistInCache(String index) {
try {
Boolean isIndexExist = (Boolean) this.indexCache.get(index);
if (isIndexExist != null && isIndexExist) return true;
} catch (RedisException ex) {
//In case the format of cache changes then clean the cache
this.indexCache.delete(index);
}
return false;
}
/**
* Deletes index if user has required role: search.admin
*
......
......@@ -23,6 +23,7 @@ import org.mockito.Mock;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.RequestStatus;
import org.opengroup.osdu.indexer.schema.converter.exeption.SchemaProcessingException;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.AccessDeniedException;
......@@ -107,4 +108,12 @@ public class GlobalExceptionMapperCoreTest {
ResponseEntity response = this.sut.handleIOException(ioException);
assertEquals(HttpStatus.SC_SERVICE_UNAVAILABLE, response.getStatusCodeValue());
}
@Test
public void should_returnAppException_when_SchemaProcessingExceptionIsThrown() {
SchemaProcessingException schemaProcessingException = new SchemaProcessingException("error processing schema");
ResponseEntity response = this.sut.handleSchemaProcessingException(schemaProcessingException);
assertEquals(HttpStatus.SC_BAD_REQUEST, response.getStatusCodeValue());
}
}
\ No newline at end of file
......@@ -22,8 +22,11 @@ import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
......@@ -51,7 +54,7 @@ import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
@RunWith(SpringRunner.class)
@PrepareForTest({RestHighLevelClient.class, IndicesClient.class, EntityUtils.class})
@PrepareForTest({RestHighLevelClient.class, IndicesClient.class, ClusterClient.class, EntityUtils.class})
public class IndicesServiceTest {
@Mock
private ElasticClientHandler elasticClientHandler;
......@@ -73,11 +76,13 @@ public class IndicesServiceTest {
private RestHighLevelClient restHighLevelClient;
private IndicesClient indicesClient;
private ClusterClient clusterClient;
@Before
public void setup() {
initMocks(this);
indicesClient = PowerMockito.mock(IndicesClient.class);
clusterClient = PowerMockito.mock(ClusterClient.class);
restHighLevelClient = PowerMockito.mock(RestHighLevelClient.class);
}
......@@ -218,4 +223,54 @@ public class IndicesServiceTest {
assertEquals("1", indicesList.get(0).getDocumentCount());
assertEquals("1551996907769", indicesList.get(0).getCreationDate());
}
@Test
public void should_returnTrue_indexExistInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(true);
boolean result = this.sut.isIndexExist(any(RestHighLevelClient.class), "anyIndex");
assertTrue(result);
}
@Test
public void should_getIndexExist_whenIndexNotInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(false);
doReturn(indicesClient).when(restHighLevelClient).indices();
doReturn(true).when(indicesClient).exists(any(GetIndexRequest.class), any(RequestOptions.class));
boolean result = this.sut.isIndexExist(restHighLevelClient, "anyIndex");
assertTrue(result);
verify(this.indicesExistCache, times(1)).get("anyIndex");
verify(this.indicesExistCache, times(1)).put("anyIndex", true);
}
@Test
public void should_getIndexReadyStatus_whenIndexInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(true);
boolean result = this.sut.isIndexReady(any(RestHighLevelClient.class), "anyIndex");
assertTrue(result);
}
@Test
public void should_getIndexReadyStatus_whenIndexNotInCache() throws IOException {
when(this.indicesExistCache.get("anyIndex")).thenReturn(false);
doReturn(indicesClient).when(restHighLevelClient).indices();
doReturn(true).when(indicesClient).exists(any(GetIndexRequest.class), any(RequestOptions.class));
ClusterHealthResponse healthResponse = mock(ClusterHealthResponse.class);
when(healthResponse.status()).thenReturn(RestStatus.OK);
doReturn(clusterClient).when(restHighLevelClient).cluster();
doReturn(healthResponse).when(clusterClient).health(any(ClusterHealthRequest.class), any(RequestOptions.class));
boolean result = this.sut.isIndexReady(restHighLevelClient, "anyIndex");
assertTrue(result);
verify(this.indicesExistCache, times(1)).get("anyIndex");
verify(this.indicesExistCache, times(1)).put("anyIndex", true);
}
}
\ No newline at end of file
......@@ -13,7 +13,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring-cloud.version>Greenwich.SR2</spring-cloud.version>
<os-core-common.version>0.13.0</os-core-common.version>
<os-core-common.version>0.14.0-rc4</os-core-common.version>
<snakeyaml.version>1.26</snakeyaml.version>
<hibernate-validator.version>6.1.5.Final</hibernate-validator.version>
<jackson.version>2.11.4</jackson.version>
......
......@@ -227,6 +227,14 @@
<version>1.7.0</version>
</dependency>
<!-- Prometheus Dependency -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
......
......@@ -93,6 +93,9 @@ management.server.port=8081
management.health.azure-key-vault.enabled=false
management.health.elasticsearch.enabled=false
management.endpoints.web.exposure.include=${web_exposure_endpoints:health,info}
management.metrics.enable.all=${enable_metrics:false}
#Redis
redis.database=${REDIS_DATABASE}
\ No newline at end of file
redis.database=${REDIS_DATABASE}
......@@ -90,6 +90,11 @@ public class Steps extends SchemaServiceRecordSteps {
super.i_should_get_the_documents_for_the_in_the_Elastic_Search(expectedCount, index);
}
@Then("^I should not get any documents for the \"([^\"]*)\" in the Elastic Search$")
public void i_should_not_get_any_documents_for_the_index_in_the_Elastic_Search(String index) throws Throwable {
super.i_should_not_get_any_documents_for_the_index_in_the_Elastic_Search(index);
}
@Then("^I should get the elastic \"(.*?)\" for the \"([^\"]*)\" and \"([^\"]*)\" in the Elastic Search$")
public void i_should_get_the_elastic_for_the_tenant_testindex_timestamp_well_in_the_Elastic_Search(String expectedMapping, String kind, String index) throws Throwable {
super.i_should_get_the_elastic_for_the_tenant_testindex_timestamp_well_in_the_Elastic_Search(expectedMapping, kind, index);
......
......@@ -36,7 +36,7 @@
<cucumber.version>1.2.5</cucumber.version>
<junit.jupiter.version>5.6.0</junit.jupiter.version>
<elasticsearch.version>7.8.1</elasticsearch.version>
<os-core-common.version>0.12.0-rc3</os-core-common.version>
<os-core-common.version>0.14.0-rc4</os-core-common.version>
</properties>
<dependencies>
......
......@@ -58,6 +58,11 @@ public class Steps extends SchemaServiceRecordSteps {
super.i_should_get_the_documents_for_the_in_the_Elastic_Search(expectedCount, index);
}
@Then("^I should not get any documents for the \"([^\"]*)\" in the Elastic Search$")
public void i_should_not_get_any_documents_for_the_index_in_the_Elastic_Search(String index) throws Throwable {
super.i_should_not_get_any_documents_for_the_index_in_the_Elastic_Search(index);
}
@Then("^I should get the elastic \"(.*?)\" for the \"([^\"]*)\" and \"([^\"]*)\" in the Elastic Search$")
public void i_should_get_the_elastic_for_the_tenant_testindex_timestamp_well_in_the_Elastic_Search(String expectedMapping, String kind, String index) throws Throwable {
super.i_should_get_the_elastic_for_the_tenant_testindex_timestamp_well_in_the_Elastic_Search(expectedMapping, kind, index);
......
......@@ -17,7 +17,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<cucumber.version>1.2.5</cucumber.version>
<os-core-common.version>0.12.0-rc1</os-core-common.version>
<os-core-common.version>0.14.0-rc4</os-core-common.version>
</properties>
<dependencies>
......
......@@ -134,6 +134,11 @@ public class RecordSteps extends TestsBase {
assertEquals(expectedCount, numOfIndexedDocuments);
}
public void i_should_not_get_any_documents_for_the_index_in_the_Elastic_Search(String index) throws Throwable {
index = generateActualName(index, timeStamp);
getRecordsInIndex(index, 0);
}
public void i_should_get_the_elastic_for_the_tenant_testindex_timestamp_well_in_the_Elastic_Search(String expectedMapping, String kind, String index) throws Throwable {
index = generateActualName(index, timeStamp);
Map<String, MappingMetadata> elasticMapping = elasticUtils.getMapping(index);
......@@ -253,6 +258,32 @@ public class RecordSteps extends TestsBase {
return numOfIndexedDocuments;
}
private long getRecordsInIndex(String index, int expectedCount) throws InterruptedException, IOException {
long numOfIndexedDocuments = 0;
int iterator;
// index.refresh_interval is set to default 30s, wait for 40s initially
Thread.sleep(40000);
for (iterator = 0; iterator < 20; iterator++) {
numOfIndexedDocuments = elasticUtils.fetchRecords(index);
if (expectedCount == numOfIndexedDocuments) {
log.info(String.format("index: %s | attempts: %s | documents acknowledged by elastic: %s", index, iterator, numOfIndexedDocuments));
break;
} else {
log.info(String.format("index: %s | documents acknowledged by elastic: %s", index, numOfIndexedDocuments));
Thread.sleep(5000);
}
if ((iterator + 1) % 5 == 0) elasticUtils.refreshIndex(index);
}
if (iterator >= 20) {
fail(String.format("index not created after waiting for %s seconds", ((40000 + iterator * 5000) / 1000)));
}
return numOfIndexedDocuments;
}
private Boolean areJsonEqual(String firstJson, String secondJson) {
Gson gson = new Gson();
Type mapType = new TypeToken<Map<String, Object>>() {}.getType();
......
......@@ -3,13 +3,15 @@ Feature: Indexing of the documents
Background:
Given the schema is created with the following kind
| kind | index | schemaFile |
| tenant1:indexer:test-data--Integration:1.0.1 | tenant1-indexer-test-data--integration-1.0.1 | index_records_1 |
| tenant1:indexer:test-data--Integration:2.0.1 | tenant1-indexer-test-data--integration-2.0.1 | index_records_2 |
| tenant1:indexer:test-data--Integration:3.0.1 | tenant1-indexer-test-data--integration-3.0.1 | index_records_3 |
| tenant1:wks:master-data--Wellbore:2.0.3 | tenant1-wks-master-data--wellbore-2.0.3 | r3-index_record_wks_master |
| tenant1:wks:ArraysOfObjectsTestCollection:4.0.0 | tenant1-wks-arraysofobjectstestcollection-4.0.0 | r3-index_record_arrayofobjects |
| tenant1:indexer:test-mapping--Sync:2.0.0 | tenant1-indexer-test-mapping--sync-2.0.0 | index_record_sync_mapping |
| kind | index | schemaFile |
| tenant1:indexer:test-data--Integration:1.0.1 | tenant1-indexer-test-data--integration-1.0.1 | index_records_1 |
| tenant1:indexer:test-data--Integration:2.0.1 | tenant1-indexer-test-data--integration-2.0.1 | index_records_2 |
| tenant1:indexer:test-data--Integration:3.0.1 | tenant1-indexer-test-data--integration-3.0.1 | index_records_3 |
| tenant1:wks:master-data--Wellbore:2.0.3 | tenant1-wks-master-data--wellbore-2.0.3 | r3-index_record_wks_master |
| tenant1:wks:ArraysOfObjectsTestCollection:4.0.0 | tenant1-wks-arraysofobjectstestcollection-4.0.0 | r3-index_record_arrayofobjects |
| tenant1:indexer:test-mapping--Sync:2.0.0 | tenant1-indexer-test-mapping--sync-2.0.0 | index_record_sync_mapping |
| tenant1:indexer:test-update-data--Integration:1.0.1 | tenant1-indexer-test-update-data--integration-1.0.1 | index_update_records_kind_v1 |
| tenant1:indexer:test-update-data--Integration:2.0.1 | tenant1-indexer-test-update-data--integration-2.0.1 | index_update_records_kind_v2 |
Scenario Outline: Ingest the record and Index in the Elastic Search
When I ingest records with the <recordFile> with <acl> for a given <kind>
......@@ -64,4 +66,15 @@ Feature: Indexing of the documents
Examples:
| kind | index | recordFile | mappingFile | acl |
| "tenant1:indexer:test-mapping--Sync:2.0.0" | "tenant1-indexer-test-mapping--sync-2.0.0" | "index_record_sync_mapping" | "index_record_sync_mapping" | "data.default.viewers@tenant1" |
\ No newline at end of file
| "tenant1:indexer:test-mapping--Sync:2.0.0" | "tenant1-indexer-test-mapping--sync-2.0.0" | "index_record_sync_mapping" | "index_record_sync_mapping" | "data.default.viewers@tenant1" |
Scenario Outline: Ingest the record and Index in the Elastic Search
When I ingest records with the <recordFile> with <acl> for a given <kind_v1>
Then I should get the 1 documents for the <index_v1> in the Elastic Search
Then I ingest records with the <recordFile> with <acl> for a given <kind_v2>
Then I should get the 1 documents for the <index_v2> in the Elastic Search
Then I should not get any documents for the <index_v1> in the Elastic Search
Examples:
| kind_v1 | index_v1 | recordFile | acl | kind_v2 | index_v2 |
| "tenant1:indexer:test-update-data--Integration:1.0.1" | "tenant1-indexer-test-update-data--integration-1.0.1" | "index_update_records_kind_v1" | "data.default.viewers@tenant1" | "tenant1:indexer:test-update-data--Integration:2.0.1" | "tenant1-indexer-test-update-data--integration-2.0.1" |
\ No newline at end of file
[
{
"id": "tenant1:test:testSchemaUpdate<timestamp>",
"data": {
"message": "test record1"
}
}
]
\ No newline at end of file
{
"schemaInfo": {
"schemaIdentity": {
"authority": "tenant1",
"source": "indexer",
"entityType": "test-update-data--Integration",
"schemaVersionMajor": "1",
"schemaVersionMinor": "0",
"schemaVersionPatch": "1"
},
"status": "DEVELOPMENT"
},
"schema": {
"properties": {
"data": {
"allOf": [
{
"type": "object",
"properties": {
"message": {
"type": "string"
}
}
}
]
}
}
}
}
\ No newline at end of file
[
{
"id": "tenant1:test:testSchemaUpdate<timestamp>",
"data": {
"message": "test record2"
}
}
]
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment