diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java
index aa98101468e6e345263e47915a44a2d1a3efe25f..88acc1aa12a805c941c2f2450c691be7bc5281ca 100644
--- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java
+++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java
@@ -19,6 +19,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.http.HttpStatus;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -65,7 +66,7 @@ public class IndexerServiceImpl implements IndexerService {
 
     private static final TimeValue BULK_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(1);
 
-    private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.NOT_FOUND));
+    private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = new ArrayList<>(Arrays.asList(RestStatus.TOO_MANY_REQUESTS, RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE));
 
     private final Gson gson = new GsonBuilder().serializeNulls().create();
 
@@ -430,6 +431,8 @@ public class IndexerServiceImpl implements IndexerService {
         List<String> failureRecordIds = new LinkedList<>();
         if (bulkRequest.numberOfActions() == 0) return failureRecordIds;
 
+        int failedRequestStatus = 500;
+        Exception failedRequestCause = null;
         try {
             BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
@@ -442,11 +445,17 @@ public class IndexerServiceImpl implements IndexerService {
             for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                 if (bulkItemResponse.isFailed()) {
                     BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
-                    bulkFailures.add(String.format("elasticsearch bulk service status: %s id: %s message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
+                    bulkFailures.add(String.format("elasticsearch bulk service status: %s | id: %s | message: %s", failure.getStatus(), failure.getId(), failure.getMessage()));
                     this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL, failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
-                    if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) {
+                    if (canIndexerRetry(bulkItemResponse)) {
                         failureRecordIds.add(bulkItemResponse.getId());
+
+                        if (failedRequestCause == null) {
+                            failedRequestCause = failure.getCause();
+                            failedRequestStatus = failure.getStatus().getStatus();
+                        }
                     }
+
                     failedResponses++;
                 } else {
                     succeededResponses++;
@@ -456,12 +465,18 @@ public class IndexerServiceImpl implements IndexerService {
 
             if (!bulkFailures.isEmpty()) this.jaxRsDpsLog.warning(bulkFailures);
 
             jaxRsDpsLog.info(String.format("records in elasticsearch service bulk request: %s | successful: %s | failed: %s", bulkRequest.numberOfActions(), succeededResponses, failedResponses));
+
+            // retry entire message if all records are failing
+            if (bulkRequest.numberOfActions() == failureRecordIds.size()) throw new AppException(failedRequestStatus, "Elastic error", failedRequestCause.getMessage(), failedRequestCause);
         } catch (IOException e) {
             // throw explicit 504 for IOException
             throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e);
         } catch (ElasticsearchStatusException e) {
             throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e);
         } catch (Exception e) {
+            if (e instanceof AppException) {
+                throw e;
+            }
             throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error", "Error indexing records.", e);
         }
         return failureRecordIds;
@@ -506,6 +521,17 @@ public class IndexerServiceImpl implements IndexerService {
         return indexerPayload;
     }
 
+    private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) {
+        if (RETRY_ELASTIC_EXCEPTION.contains(bulkItemResponse.status())) return true;
+
+        if ((bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE || bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE)
+                && bulkItemResponse.status() == RestStatus.NOT_FOUND) {
+            return true;
+        }
+
+        return false;
+    }
+
     private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException {
 
         jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds));
diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java
index 026ff32cb2e04a3da21b765baefc282d056915bf..285b384a017b4eefd044dc3e58e1434b1ad0543c 100644
--- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java
+++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java
@@ -1,55 +1,206 @@
 package org.opengroup.osdu.indexer.service;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.MockitoAnnotations.initMocks;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
-import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
+import org.mockito.Spy;
+import org.opengroup.osdu.core.common.http.HeadersUtil;
+import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
+import org.opengroup.osdu.core.common.model.entitlements.Acl;
+import org.opengroup.osdu.core.common.model.http.DpsHeaders;
+import org.opengroup.osdu.core.common.model.indexer.*;
+import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
+import org.opengroup.osdu.core.common.model.storage.ConversionStatus;
+import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo;
 import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
+import org.opengroup.osdu.indexer.logging.AuditLogger;
+import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
 import org.opengroup.osdu.indexer.util.ElasticClientHandler;
-import org.springframework.test.context.junit4.SpringRunner;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-@RunWith(SpringRunner.class)
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Type;
+import java.net.URISyntaxException;
+import java.util.*;
+
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RestHighLevelClient.class, BulkResponse.class, Acl.class, HeadersUtil.class})
 public class IndexerServiceImplTest {
 
-    @InjectMocks
-    private IndexerServiceImpl indexerService;
+    @InjectMocks
+    private IndexerServiceImpl sut;
+    @Mock
+    private ElasticClientHandler elasticClientHandler;
+    @Mock
+    private ElasticIndexNameResolver elasticIndexNameResolver;
+    @Mock
+    private IndicesService indicesService;
+    @Mock
+    private StorageService storageService;
+    @Mock
+    private StorageIndexerPayloadMapper storageIndexerPayloadMapper;
+    @InjectMocks
+    @Spy
+    private JobStatus jobStatus = new JobStatus();
+    @Mock
+    private AuditLogger auditLogger;
+    @Mock
+    private BulkResponse bulkResponse;
+    @Mock
+    private IRequestInfo requestInfo;
+    @Mock
+    private RestHighLevelClient restHighLevelClient;
+    @Mock
+    private IndexSchemaService schemaService;
+    @Mock
+    private JaxRsDpsLog jaxRsDpsLog;
+    @Mock
+    private IMappingService mappingService;
+    @Mock
+    private IPublisher progressPublisher;
+
+    private List<RecordInfo> recordInfos = new ArrayList<>();
+
+    private final String pubsubMsg = "[{\"id\":\"opendes:doc:test1\",\"kind\":\"opendes:testindexer1:well:1.0.0\",\"op\":\"update\"}," +
+            "{\"id\":\"opendes:doc:test2\",\"kind\":\"opendes:testindexer2:well:1.0.0\",\"op\":\"create\"}]";
+    private final String kind1 = "opendes:testindexer1:well:1.0.0";
+    private final String kind2 = "opendes:testindexer2:well:1.0.0";
+    private final String recordId1 = "opendes:doc:test1";
+    private final String recordId2 = "opendes:doc:test2";
+    private final String failureMassage = "test failure";
+
+    private DpsHeaders dpsHeaders;
+    private RecordChangedMessages recordChangedMessages;
+
+    @Before
+    public void setup() throws IOException {
+    }
+
+    @Test
+    public void processSchemaMessagesTest() throws Exception {
+        RecordInfo recordInfo = new RecordInfo();
+        recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
+        recordInfo.setKind("opendes:ds:mytest2:1.0.0");
+        recordInfo.setOp("purge_schema");
+        this.recordInfos.add(recordInfo);
+
+        initMocks(this);
+
+        this.sut.processSchemaMessages(recordInfos);
+
+        verify(this.elasticClientHandler, times(1)).createRestClient();
+        verify(this.elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
+        verify(this.indicesService, times(1)).isIndexExist(any(), any());
+    }
+
+    @Test
+    public void should_properlyUpdateAuditLogs_givenValidCreateAndUpdateRecords() {
+        try {
+            mockStatic(Acl.class);
+
+            // setup headers
+            this.dpsHeaders = new DpsHeaders();
+            this.dpsHeaders.put(DpsHeaders.AUTHORIZATION, "testAuth");
+            when(this.requestInfo.getHeaders()).thenReturn(dpsHeaders);
+            when(this.requestInfo.getHeadersMapWithDwdAuthZ()).thenReturn(dpsHeaders.getHeaders());
+
+            // setup message
+            Type listType = new TypeToken<List<RecordInfo>>() {}.getType();
+            this.recordInfos = (new Gson()).fromJson(this.pubsubMsg, listType);
+            Map<String, String> messageAttributes = new HashMap<>();
+            messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, "opendes");
+            this.recordChangedMessages = RecordChangedMessages.builder().attributes(messageAttributes).messageId("xxxx").publishTime("2000-01-02T10:10:44+0000").data("{}").build();
+
+            // setup schema
+            Map<String, Object> schema = createSchema();
+            indexSchemaServiceMock(kind2, schema);
+            indexSchemaServiceMock(kind1, null);
+
+            // setup storage records
+            Map<String, Object> storageData = new HashMap<>();
+            storageData.put("schema1", "test-value");
+            List<Records.Entity> validRecords = new ArrayList<>();
+            validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build());
+            List<ConversionStatus> conversionStatus = new LinkedList<>();
+            Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build();
+            when(this.storageService.getStorageRecords(any())).thenReturn(storageRecords);
+
+            // setup elastic, index and mapped document
+            when(this.indicesService.createIndex(any(), any(), any(), any(), any())).thenReturn(true);
+            when(this.mappingService.getIndexMappingFromRecordSchema(any())).thenReturn(new HashMap<>());
+
+            when(this.elasticClientHandler.createRestClient()).thenReturn(this.restHighLevelClient);
+            when(this.restHighLevelClient.bulk(any(), any(RequestOptions.class))).thenReturn(this.bulkResponse);
+
+            Map<String, Object> indexerMappedPayload = new HashMap<>();
+            indexerMappedPayload.put("id", "keyword");
+            when(this.storageIndexerPayloadMapper.mapDataPayload(any(), any(), any())).thenReturn(indexerMappedPayload);
 
-    @Mock
-    private ElasticClientHandler elasticClientHandler;
+            BulkItemResponse[] responses = new BulkItemResponse[]{prepareFailedResponse(), prepareSuccessfulResponse()};
+            when(this.bulkResponse.getItems()).thenReturn(responses);
 
-    @Mock
-    private ElasticIndexNameResolver elasticIndexNameResolver;
+            // test
+            JobStatus jobStatus = this.sut.processRecordChangedMessages(recordChangedMessages, recordInfos);
 
-    @Mock
-    private IndicesService indicesService;
+            // validate
+            assertEquals(2, jobStatus.getStatusesList().size());
+            assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.FAIL).size());
+            assertEquals(1, jobStatus.getIdsByIndexingStatus(IndexingStatus.SUCCESS).size());
 
-    private List<RecordInfo> recordInfos = new ArrayList<>();
+            verify(this.auditLogger).indexCreateRecordSuccess(singletonList("RecordStatus(id=opendes:doc:test2, kind=opendes:testindexer2:well:1.0.0, operationType=create, status=SUCCESS)"));
+            verify(this.auditLogger).indexUpdateRecordFail(singletonList("RecordStatus(id=opendes:doc:test1, kind=opendes:testindexer1:well:1.0.0, operationType=update, status=FAIL, message=test failure)"));
+        } catch (Exception e) {
+            fail("Should not throw this exception" + e.getMessage());
+        }
+    }
 
-    @Before
-    public void setup() {
-        RecordInfo recordInfo = new RecordInfo();
-        recordInfo.setId("opendes:ds:mytest3-d9033ae1-fb15-496c-9ba0-880fd1d2b2qf");
-        recordInfo.setKind("opendes:ds:mytest2:1.0.0");
-        recordInfo.setOp("purge_schema");
-        recordInfos.add(recordInfo);
+    private BulkItemResponse prepareFailedResponse() {
+        BulkItemResponse responseFail = mock(BulkItemResponse.class);
+        when(responseFail.isFailed()).thenReturn(true);
+        when(responseFail.getFailureMessage()).thenReturn(failureMassage);
+        when(responseFail.getId()).thenReturn(recordId1);
+        when(responseFail.getFailure()).thenReturn(new BulkItemResponse.Failure("failure index", "failure type", "failure id", new Exception("test failure")));
+        return responseFail;
+    }
 
-        initMocks(this);
-    }
+    private BulkItemResponse prepareSuccessfulResponse() {
+        BulkItemResponse responseSuccess = mock(BulkItemResponse.class);
+        when(responseSuccess.getId()).thenReturn(recordId2);
+        return responseSuccess;
+    }
 
-    @Test
-    public void processSchemaMessagesTest() throws Exception {
-        indexerService.processSchemaMessages(recordInfos);
+    private void indexSchemaServiceMock(String kind, Map<String, Object> schema) throws UnsupportedEncodingException, URISyntaxException {
+        IndexSchema indexSchema = schema == null
+                ? IndexSchema.builder().kind(kind).dataSchema(new HashMap<>()).build()
+                : IndexSchema.builder().kind(kind).dataSchema(schema).build();
+        when(schemaService.getIndexerInputSchema(kind, new ArrayList<>())).thenReturn(indexSchema);
+    }
 
-        verify(elasticClientHandler, times(1)).createRestClient();
-        verify(elasticIndexNameResolver, times(1)).getIndexNameFromKind(any());
-        verify(indicesService, times(1)).isIndexExist(any(), any());
-    }
+    private Map<String, Object> createSchema() {
+        Map<String, Object> schema = new HashMap<>();
+        schema.put("schema1", "keyword");
+        schema.put("schema2", "boolean");
+        schema.put("schema3", "date");
+        schema.put("schema6", "object");
+        return schema;
+    }
 }
diff --git a/provider/indexer-gcp/README.md b/provider/indexer-gcp/README.md
index 702ee72d8c365ae2aca410f98ff4cf7389b0fcd0..c7593de2753b57138443b03c4fd1042dc399e4cf 100644
--- a/provider/indexer-gcp/README.md
+++ b/provider/indexer-gcp/README.md
@@ -14,6 +14,14 @@ os-indexer-gcp is a [Spring Boot](https://spring.io/projects/spring-boot) servic
 ## Getting Started
 These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a live system.
 
+# Configuration
+
+## Service Configuration
+### Anthos:
+[Anthos service configuration](docs/anthos/README.md)
+### GCP:
+[GCP service configuration](docs/gcp/README.md)
+
 ## Mappers
 This is a universal solution created using EPAM OQM mappers technology. It allows you to work with various
diff --git a/provider/indexer-gcp/docs/anthos/README.md b/provider/indexer-gcp/docs/anthos/README.md
index 9cffb88ebe038d536a29636ad7a9c7bdea69a93e..880cce5011fff28c9737475189f3f96b8b6d17e3 100644
--- a/provider/indexer-gcp/docs/anthos/README.md
+++ b/provider/indexer-gcp/docs/anthos/README.md
@@ -12,6 +12,8 @@ Must have:
 | `OPENID_PROVIDER_CLIENT_ID` | `*****` | Client id that represents this service and serves to request tokens, example `workload-identity-legal` |yes| - |
 | `OPENID_PROVIDER_CLIENT_SECRET` | `*****` | This client secret that serves to request tokens| yes | - |
 | `OPENID_PROVIDER_URL` | `https://keycloack.com/auth/realms/master` | URL of OpenID Connect provider, it will be used as `<OpenID URL> + /.well-known/openid-configuration` to auto configure endpoint for token request | no | - |
+| `<ELASTICSEARCH_USER_ENV_VARIABLE_NAME>` | ex `user` | Elasticsearch username. The variable name is not fixed at the service level; it is obtained through the Partition service. Each tenant can use its own variable name, and that variable must be present in the Indexer service environment, see [Partition properties set](#Properties-set-in-Partition-service) | yes | - |
+| `<ELASTICSEARCH_PASSWORD_ENV_VARIABLE_NAME>` | ex `password` | Elasticsearch password. The variable name is not fixed at the service level; it is obtained through the Partition service. Each tenant can use its own variable name, and that variable must be present in the Indexer service environment, see [Partition properties set](#Properties-set-in-Partition-service) | no | - |
 
 Defined in default application property file but possible to override:
 
@@ -39,10 +41,26 @@ Usage of spring profiles is preferred.
 | `OQMDRIVER` | `rabbitmq` or `pubsub` | Oqm driver mode that defines which message broker will be used | no | - |
 | `SERVICE_TOKEN_PROVIDER` | `GCP` or `OPENID` |Service account token provider, `GCP` means use Google service account `OPEIND` means use OpenId provider like `Keycloak` | no | - |
 
-## Elasticsearch configuration
-
 ### Properties set in Partition service:
 
+Note that a property can be stored in the Partition service as `sensitive`. In that case its `value` must contain the name of an environment variable rather than the value itself.
+That environment variable must be present in the environment of the service that consumes the property.
+
+Example:
+```
+  "elasticsearch.port": {
+    "sensitive": false, <- value is not sensitive
+    "value": "9243" <- will be used as-is
+  },
+  "elasticsearch.password": {
+    "sensitive": true, <- value is sensitive
+    "value": "ELASTIC_SEARCH_PASSWORD_OSDU" <- the consuming service must have an env variable ELASTIC_SEARCH_PASSWORD_OSDU containing the Elasticsearch password
+  }
+```
+
+
+## Elasticsearch configuration
+
 **prefix:** `elasticsearch`
 
 It can be overridden by:
@@ -56,7 +74,8 @@ It can be overridden by:
 | --- | --- |
 | elasticsearch.host | server URL |
 | elasticsearch.port | server port |
-| elasticsearch.configuration | username and password |
+| elasticsearch.user | username |
+| elasticsearch.password | password |
 
 <details><summary>Example of a definition for a single tenant</summary></details>
 
@@ -72,9 +91,13 @@ curl -L -X PATCH 'http://partition.com/api/partition/v1/partitions/opendes' -H '
       "sensitive": false,
       "value": "9243"
     },
-    "elasticsearch.configuration": {
+    "elasticsearch.user": {
+      "sensitive": true,
+      "value": "<ELASTICSEARCH_USER_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
+    },
+    "elasticsearch.password": {
       "sensitive": true,
-      "value": "elasticuser:elasticpassword"
+      "value": "<ELASTICSEARCH_PASSWORD_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
     }
   }
 }'
diff --git a/provider/indexer-gcp/docs/gcp/README.md b/provider/indexer-gcp/docs/gcp/README.md
index 54e948b320b8053bb9d289f836e041c7814fab4b..d583f831a53e5738cdf913fb0b0669974b8f26a8 100644
--- a/provider/indexer-gcp/docs/gcp/README.md
+++ b/provider/indexer-gcp/docs/gcp/README.md
@@ -10,6 +10,8 @@ Must have:
 | --- | --- | --- | --- | --- |
 | `GOOGLE_AUDIENCES` | ex `*****.apps.googleusercontent.com` | Client ID for getting access to cloud resources | yes | https://console.cloud.google.com/apis/credentials |
 | `SPRING_PROFILES_ACTIVE` | ex `gcp` | Spring profile that activate default configuration for GCP environment | false | - |
+| `<ELASTICSEARCH_USER_ENV_VARIABLE_NAME>` | ex `user` | Elasticsearch username. The variable name is not fixed at the service level; it is obtained through the Partition service. Each tenant can use its own variable name, and that variable must be present in the Indexer service environment, see [Partition properties set](#Properties-set-in-Partition-service) | yes | - |
+| `<ELASTICSEARCH_PASSWORD_ENV_VARIABLE_NAME>` | ex `password` | Elasticsearch password. The variable name is not fixed at the service level; it is obtained through the Partition service. Each tenant can use its own variable name, and that variable must be present in the Indexer service environment, see [Partition properties set](#Properties-set-in-Partition-service) | no | - |
 
 Defined in default application property file but possible to override:
 
@@ -44,10 +46,27 @@ At Pubsub should be created topic with name:
 
 **name:** `indexing-progress`
 
-## Elasticsearch configuration
-
 ### Properties set in Partition service:
 
+Note that a property can be stored in the Partition service as `sensitive`. In that case its `value` must contain the name of an environment variable rather than the value itself.
+That environment variable must be present in the environment of the service that consumes the property.
+
+Example:
+```
+  "elasticsearch.port": {
+    "sensitive": false, <- value is not sensitive
+    "value": "9243" <- will be used as-is
+  },
+  "elasticsearch.password": {
+    "sensitive": true, <- value is sensitive
+    "value": "ELASTIC_SEARCH_PASSWORD_OSDU" <- the consuming service must have an env variable ELASTIC_SEARCH_PASSWORD_OSDU containing the Elasticsearch password
+  }
+```
+
+Nothing is hardcoded in the services; the behaviour is defined entirely by the sensitivity of each property.
+
+## Elasticsearch configuration
+
 **prefix:** `elasticsearch`
 
 It can be overridden by:
@@ -61,7 +80,8 @@ It can be overridden by:
 | --- | --- |
 | elasticsearch.host | server URL |
 | elasticsearch.port | server port |
-| elasticsearch.configuration | username and password |
+| elasticsearch.user | username |
+| elasticsearch.password | password |
 
 <details><summary>Example of a definition for a single tenant</summary></details>
 
@@ -77,9 +97,13 @@ curl -L -X PATCH 'http://partition.com/api/partition/v1/partitions/opendes' -H '
       "sensitive": false,
       "value": "9243"
     },
-    "elasticsearch.configuration": {
+    "elasticsearch.user": {
+      "sensitive": true,
+      "value": "<USER_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
+    },
+    "elasticsearch.password": {
       "sensitive": true,
-      "value": "elasticuser:elasticpassword"
+      "value": "<PASSWORD_ENV_VARIABLE_NAME>" <- (Not actual value, just name of env variable)
     }
   }
 }'
diff --git a/provider/indexer-gcp/pom.xml b/provider/indexer-gcp/pom.xml
index 3f7bb4987478522f50a8d113ae6094c2997c4583..311aa52ab24e9f07619c5db489bf3d1d2a78785f 100644
--- a/provider/indexer-gcp/pom.xml
+++ b/provider/indexer-gcp/pom.xml
@@ -19,7 +19,7 @@
         <dependency>
             <groupId>org.opengroup.osdu</groupId>
             <artifactId>core-lib-gcp</artifactId>
-            <version>0.14.0-rc1</version>
+            <version>0.14.0-rc2</version>
         </dependency>
         <dependency>
             <groupId>org.opengroup.osdu.indexer</groupId>
diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfig.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfig.java
index 72cb1f6a3a1cedfc3f408853d99d10a9c80cca74..0f9cc1f4d3ea4319c729af434fd5710203b30141 100644
--- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfig.java
+++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfig.java
@@ -20,6 +20,7 @@ package org.opengroup.osdu.indexer.di;
 import org.opengroup.osdu.core.common.partition.IPartitionProvider;
 import org.opengroup.osdu.core.common.provider.interfaces.IElasticRepository;
 import org.opengroup.osdu.core.destination.elastic.ElasticSearchDestinationResolver;
+import org.opengroup.osdu.core.destination.util.IPartitionPropertyResolver;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -27,7 +28,8 @@ import org.springframework.context.annotation.Configuration;
 public class ElasticSearchConfig {
 
     @Bean
-    public IElasticRepository elasticRepository(ElasticSearchConfigurationProperties properties, IPartitionProvider partitionProvider) {
-        return new ElasticSearchDestinationResolver(properties.getElasticsearchPropertiesPrefix(), partitionProvider);
+    public IElasticRepository elasticRepository(ElasticSearchConfigurationProperties properties,
+                                                IPartitionProvider partitionProvider, IPartitionPropertyResolver propertyResolver) {
+        return new ElasticSearchDestinationResolver(properties.getElasticsearchPropertiesPrefix(), partitionProvider, propertyResolver);
     }
 }
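
The documentation changes above all rest on one contract: a Partition property with `sensitive: false` is consumed literally, while a property with `sensitive: true` carries the name of an environment variable that must exist in the Indexer service environment. A minimal, hypothetical sketch of that lookup using plain JDK calls (the class and method names below are illustrative only, not the actual `IPartitionPropertyResolver` API wired up in `ElasticSearchConfig`):

```java
// Illustrative only: resolves a Partition property according to the sensitive /
// non-sensitive contract described in the README changes above.
public class SensitivePropertyExample {

    static String resolve(boolean sensitive, String value) {
        if (!sensitive) {
            return value;                              // e.g. "9243" is used as-is
        }
        String secret = System.getenv(value);          // e.g. reads ELASTIC_SEARCH_PASSWORD_OSDU
        if (secret == null) {
            throw new IllegalStateException("Environment variable not set: " + value);
        }
        return secret;
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, "9243"));
        // resolve(true, "ELASTIC_SEARCH_PASSWORD_OSDU") requires that variable to be
        // present in the Indexer service environment.
    }
}
```

With this indirection the Partition service never stores the Elasticsearch credentials themselves, only the names of the environment variables that hold them.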