Skip to content
Snippets Groups Projects
Commit 7cecaf05 authored by Alok Joshi's avatar Alok Joshi
Browse files

Merge branch 'master' of...

parents 8b7e2d28 b379024b
No related branches found
No related tags found
2 merge requests!346Merge branch 'aws-integration' into 'master',!284Update kind event support
Pipeline #97975 failed
......@@ -19,6 +19,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
......@@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService {
private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1);
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.NOT_FOUND));
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE));
private final Gson gson = new GsonBuilder().serializeNulls().create();
......@@ -430,6 +431,8 @@ public class IndexerServiceImpl implements IndexerService {
List<String> failureRecordIds = new LinkedList<>();
if (bulkRequest.numberOfActions() == 0) return failureRecordIds;
int failedRequestStatus = 500;
Exception failedRequestCause = null;
try {
BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
......@@ -442,11 +445,17 @@ public class IndexerServiceImpl implements IndexerService {
for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
bulkFailures.add(String.format("elasticsearch bulk service status: %s id: %s message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) {
if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId());
if (failedRequestCause == null) {
failedRequestCause = failure.getCause();
failedRequestStatus = failure.getStatus().getStatus();
}
}
failedResponses++;
} else {
succeededResponses++;
......@@ -456,12 +465,18 @@ public class IndexerServiceImpl implements IndexerService {
if (!bulkFailures.isEmpty()) this.jaxRsDpsLog.warning(bulkFailures);
jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses));
// retry entire message if all records are failing
if (bulkRequest.numberOfActions() == failureRecordIds.size()) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause);
} catch (IOException e) {
// throw explicit 504 for IOException
throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e);
} catch (ElasticsearchStatusException e) {
throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e);
} catch (Exception e) {
if (e instanceof AppException) {
throw e;
}
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e);
}
return failureRecordIds;
......@@ -506,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService {
return indexerPayload;
}
private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) {
if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) return true;
if ((bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE || bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE)
&& bulkItemResponse.status() == RestStatus.NOT_FOUND) {
return true;
}
return false;
}
private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException {
jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds));
......
package org.opengroup.osdu.indexer.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.List;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.mockito.Spy;
import org.opengroup.osdu.core.common.http.HeadersUtil;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.entitlements.Acl;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.*;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.storage.ConversionStatus;
import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo;
import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.indexer.logging.AuditLogger;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.springframework.test.context.junit4.SpringRunner;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(SpringRunner.class)
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.net.URISyntaxException;
import java.util.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({RestHighLevelClient.class, BulkResponse.class, Acl.class, HeadersUtil.class})
public class IndexerServiceImplTest {
@InjectMocks
private IndexerServiceImpl indexerService;
@InjectMocks
private IndexerServiceImpl sut;
@Mock
private ElasticClientHandler elasticClientHandler;
@Mock
private ElasticIndexNameResolver elasticIndexNameResolver;
@Mock
private IndicesService indicesService;
@Mock
private StorageService storageService;
@Mock
private StorageIndexerPayloadMapper storageIndexerPayloadMapper;
@InjectMocks
@Spy
private JobStatus jobStatus = new JobStatus();
@Mock
private AuditLogger auditLogger;
@Mock
private BulkResponse bulkResponse;
@Mock
private IRequestInfo requestInfo;
@Mock
private RestHighLevelClient restHighLevelClient;
@Mock
private IndexSchemaService schemaService;
@Mock
private JaxRsDpsLog jaxRsDpsLog;
@Mock
private IMappingService mappingService;
@Mock
private IPublisher progressPublisher;
private List<RecordInfo> recordInfos = new ArrayList<>();
private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," +
"{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
private final String kind1 = "opendes:testindexer1:well:1.0.0";
private final String kind2 = "opendes:testindexer2:well:1.0.0";
private final String recordId1 = "opendes:doc:test1";
private final String recordId2 = "opendes:doc:test2";
private final String failureMassage = "test failure";
private DpsHeaders dpsHeaders;
private RecordChangedMessages recordChangedMessages;
@Before
public void setup() throws IOException {
}
@Test
public void processSchemaMessagesTest() throws Exception {
RecordInfo recordInfo = new RecordInfo();
recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
recordInfo.setKind("opendes:ds:mytest2:1.0.0");
recordInfo.setOp("purge_schema");
this.recordInfos.add(recordInfo);
initMocks(this);
this.sut.processSchemaMessages(recordInfos);
verify(this.elasticClientHandler, times(1)).createRestClient();
verify(this.elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
verify(this.indicesService, times(1)).isIndexExist(any(), any());
}
@Test
public void should_properlyUpdateAuditLogs_givenValidCreateAndUpdateRecords() {
try {
mockStatic(Acl.class);
// setup headers
this.dpsHeaders = new DpsHeaders();
this.dpsHeaders.put(DpsHeaders.AUTHORIZATION, "testAuth");
when(this.requestInfo.getHeaders()).thenReturn(dpsHeaders);
when(this.requestInfo.getHeadersMapWithDwdAuthZ()).thenReturn(dpsHeaders.getHeaders());
// setup message
Type listType = new TypeToken<List<RecordInfo>>() {}.getType();
this.recordInfos = (new Gson()).fromJson(this.pubsubMsg, listType);
Map<String, String> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, "opendes");
this.recordChangedMessages = RecordChangedMessages.builder().attributes(messageAttributes).messageId("xxxx").publishTime("2000-01-02T10:10:44+0000").data("{}").build();
// setup schema
Map<String, Object> schema = createSchema();
indexSchemaServiceMock(kind2, schema);
indexSchemaServiceMock(kind1, null);
// setup storage records
Map<String, Object> storageData = new HashMap<>();
storageData.put("schema1", "test-value");
List<Records.Entity> validRecords = new ArrayList<>();
validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build());
List<ConversionStatus> conversionStatus = new LinkedList<>();
Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build();
when(this.storageService.getStorageRecords(any())).thenReturn(storageRecords);
// setup elastic, index and mapped document
when(this.indicesService.createIndex(any(), any(), any(), any(), any())).thenReturn(true);
when(this.mappingService.getIndexMappingFromRecordSchema(any())).thenReturn(new HashMap<>());
when(this.elasticClientHandler.createRestClient()).thenReturn(this.restHighLevelClient);
when(this.restHighLevelClient.bulk(any(), any(RequestOptions.class))).thenReturn(this.bulkResponse);
Map<String, Object> indexerMappedPayload = new HashMap<>();
indexerMappedPayload.put("id", "keyword");
when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload);
@Mock
private ElasticClientHandler elasticClientHandler;
BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()};
when(this.bulkResponse.getItems()).thenReturn(responses);
@Mock
private ElasticIndexNameResolver elasticIndexNameResolver;
// test
JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos);
@Mock
private IndicesService indicesService;
// validate
assertEquals(2, jobStatus.getStatusesList().size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size());
private List<RecordInfo> recordInfos = new ArrayList<>();
verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)"));
verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)"));
} catch (Exception e) {
fail("Should not throw this exception" + e.getMessage());
}
}
@Before
public void setup() {
RecordInfo recordInfo = new RecordInfo();
recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
recordInfo.setKind("opendes:ds:mytest2:1.0.0");
recordInfo.setOp("purge_schema");
recordInfos.add(recordInfo);
private BulkItemResponse prepareFailedResponse() {
BulkItemResponse responseFail = mock(BulkItemResponse.class);
when(responseFail.isFailed()).thenReturn(true);
when(responseFail.getFailureMessage()).thenReturn(failureMassage);
when(responseFail.getId()).thenReturn(recordId1);
when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception("test failure")));
return responseFail;
}
initMocks(this);
}
private BulkItemResponse prepareSuccessfulResponse() {
BulkItemResponse responseSuccess = mock(BulkItemResponse.class);
when(responseSuccess.getId()).thenReturn(recordId2);
return responseSuccess;
}
@Test
public void processSchemaMessagesTest() throws Exception {
indexerService.processSchemaMessages(recordInfos);
private void indexSchemaServiceMock(String kind, Map<String, Object> schema) throws UnsupportedEncodingException, URISyntaxException {
IndexSchema indexSchema = schema == null
? IndexSchema.builder().kind(kind).dataSchema(new HashMap<>()).build()
: IndexSchema.builder().kind(kind).dataSchema(schema).build();
when(schemaService.getIndexerInputSchema(kind, new ArrayList<>())).thenReturn(indexSchema);
}
verify(elasticClientHandler, times(1)).createRestClient();
verify(elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
verify(indicesService, times(1)).isIndexExist(any(), any());
}
private Map<String, Object> createSchema() {
Map<String, Object> schema = new HashMap<>();
schema.put("schema1", "keyword");
schema.put("schema2", "boolean");
schema.put("schema3", "date");
schema.put("schema6", "object");
return schema;
}
}
......@@ -14,6 +14,14 @@ os-indexer-gcp is a [Spring Boot](https://spring.io/projects/spring-boot) servic
## Getting Started
These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a live system.
# Configuration
## Service Configuration
### Anthos:
[Anthos service configuration ](docs/anthos/README.md)
### GCP:
[Gcp service configuration ](docs/gcp/README.md)
## Mappers
This is a universal solution created using EPAM OQM mappers technology. It allows you to work with various
......
......@@ -12,6 +12,8 @@ Must have:
| `OPENID_PROVIDER_CLIENT_ID` | `*****` | Client id that represents this service and serves to request tokens, example `workload-identity-legal` |yes| - |
| `OPENID_PROVIDER_CLIENT_SECRET` | `*****` | This client secret that serves to request tokens| yes | - |
| `OPENID_PROVIDER_URL` | `https://keycloack.com/auth/realms/master` | URL of OpenID Connect provider, it will be used as `<OpenID URL> + /.well-known/openid-configuration` to auto configure endpoint for token request | no | - |
| `<ELASTICSEARCH_USER_ENV_VARIABLE_NAME>` | ex `user` | Elasticsearch user, name of that variable not defined at the service level, the name will be received through partition service. Each tenant can have it's own ENV name value, and it must be present in ENV of Indexer service, see [Partition properties set](#Properties-set-in-Partition-service) | yes | - |
| `<ELASTICSEARCH_PASSWORD_ENV_VARIABLE_NAME>` | ex `password` | Elasticsearch password, name of that variable not defined at the service level, the name will be received through partition service. Each tenant can have it's own ENV name value, and it must be present in ENV of Indexer service, see [Partition properties set](#Properties-set-in-Partition-service) | false | - |
Defined in default application property file but possible to override:
......@@ -39,10 +41,26 @@ Usage of spring profiles is preferred.
| `OQMDRIVER` | `rabbitmq` or `pubsub` | Oqm driver mode that defines which message broker will be used | no | - |
| `SERVICE_TOKEN_PROVIDER` | `GCP` or `OPENID` |Service account token provider, `GCP` means use Google service account `OPEIND` means use OpenId provider like `Keycloak` | no | - |
## Elasticsearch configuration
### Properties set in Partition service:
Note that properties can be set in Partition as `sensitive` in that case in property `value` should be present not value itself, but ENV variable name.
This variable should be present in environment of service that need that variable.
Example:
```
"elasticsearch.port": {
"sensitive": false, <- value not sensitive
"value": "9243" <- will be used as is.
},
"elasticsearch.password": {
"sensitive": true, <- value is sensitive
"value": "ELASTIC_SEARCH_PASSWORD_OSDU" <- service consumer should have env variable ELASTIC_SEARCH_PASSWORD_OSDU with elastic search password
}
```
## Elasticsearch configuration
**prefix:** `elasticsearch`
It can be overridden by:
......@@ -56,7 +74,8 @@ It can be overridden by:
| --- | --- |
| elasticsearch.host | server URL |
| elasticsearch.port | server port |
| elasticsearch.configuration | username and password |
| elasticsearch.user | username |
| elasticsearch.password | password |
<details><summary>Example of a definition for a single tenant</summary></details>
......@@ -72,9 +91,13 @@ curl -L -X PATCH 'http://partition.com/api/partition/v1/partitions/opendes' -H '
"sensitive": false,
"value": "9243"
},
"elasticsearch.configuration": {
"elasticsearch.user": {
"sensitive": true,
"value": "<ELASTICSEARCH_USER_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
},
"elasticsearch.password": {
"sensitive": true,
"value": "elasticuser:elasticpassword"
"value": "<ELASTICSEARCH_PASSWORD_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
}
}
}'
......
......@@ -10,6 +10,8 @@ Must have:
| --- | --- | --- | --- | --- |
| `GOOGLE_AUDIENCES` | ex `*****.apps.googleusercontent.com` | Client ID for getting access to cloud resources | yes | https://console.cloud.google.com/apis/credentials |
| `SPRING_PROFILES_ACTIVE` | ex `gcp` | Spring profile that activate default configuration for GCP environment | false | - |
| `<ELASTICSEARCH_USER_ENV_VARIABLE_NAME>` | ex `user` | Elasticsearch user, name of that variable not defined at the service level, the name will be received through partition service. Each tenant can have it's own ENV name value, and it must be present in ENV of Indexer service, see [Partition properties set](#Properties-set-in-Partition-service) | yes | - |
| `<ELASTICSEARCH_PASSWORD_ENV_VARIABLE_NAME>` | ex `password` | Elasticsearch password, name of that variable not defined at the service level, the name will be received through partition service. Each tenant can have it's own ENV name value, and it must be present in ENV of Indexer service, see [Partition properties set](#Properties-set-in-Partition-service) | false | - |
Defined in default application property file but possible to override:
......@@ -44,10 +46,27 @@ At Pubsub should be created topic with name:
**name:** `indexing-progress`
## Elasticsearch configuration
### Properties set in Partition service:
Note that properties can be set in Partition as `sensitive` in that case in property `value` should be present not value itself, but ENV variable name.
This variable should be present in environment of service that need that variable.
Example:
```
"elasticsearch.port": {
"sensitive": false, <- value not sensitive
"value": "9243" <- will be used as is.
},
"elasticsearch.password": {
"sensitive": true, <- value is sensitive
"value": "ELASTIC_SEARCH_PASSWORD_OSDU" <- service consumer should have env variable ELASTIC_SEARCH_PASSWORD_OSDU with elastic search password
}
```
There is no hardcode in services, all behaviour defined by sensitivity of property.
## Elasticsearch configuration
**prefix:** `elasticsearch`
It can be overridden by:
......@@ -61,7 +80,8 @@ It can be overridden by:
| --- | --- |
| elasticsearch.host | server URL |
| elasticsearch.port | server port |
| elasticsearch.configuration | username and password |
| elasticsearch.user | username |
| elasticsearch.password | password |
<details><summary>Example of a definition for a single tenant</summary></details>
......@@ -77,9 +97,13 @@ curl -L -X PATCH 'http://partition.com/api/partition/v1/partitions/opendes' -H '
"sensitive": false,
"value": "9243"
},
"elasticsearch.configuration": {
"elasticsearch.user": {
"sensitive": true,
"value": "<USER_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
},
"elasticsearch.password": {
"sensitive": true,
"value": "elasticuser:elasticpassword"
"value": "<PASSWORD_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
}
}
}'
......
......@@ -19,7 +19,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-gcp</artifactId>
<version>0.14.0-rc1</version>
<version>0.14.0-rc2</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu.indexer</groupId>
......
......@@ -20,6 +20,7 @@ package org.opengroup.osdu.indexer.di;
import org.opengroup.osdu.core.common.partition.IPartitionProvider;
import org.opengroup.osdu.core.common.provider.interfaces.IElasticRepository;
import org.opengroup.osdu.core.destination.elastic.ElasticSearchDestinationResolver;
import org.opengroup.osdu.core.destination.util.IPartitionPropertyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -27,7 +28,8 @@ import org.springframework.context.annotation.Configuration;
public class ElasticSearchConfig {
@Bean
public IElasticRepository elasticRepository(ElasticSearchConfigurationProperties properties, IPartitionProvider partitionProvider) {
return new ElasticSearchDestinationResolver(properties.getElasticsearchPropertiesPrefix(), partitionProvider);
public IElasticRepository elasticRepository(ElasticSearchConfigurationProperties properties,
IPartitionProvider partitionProvider, IPartitionPropertyResolver propertyResolver) {
return new ElasticSearchDestinationResolver(properties.getElasticsearchPropertiesPrefix(), partitionProvider, propertyResolver);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment