diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java index 93d09b0da1f98686c1bb88bbd0c2157bfa6cc481..4f08bbc5489e121e5405b926257522c79516ef54 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java @@ -16,7 +16,6 @@ package org.opengroup.osdu.indexer.api; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; - import com.google.gson.JsonParseException; import io.swagger.v3.oas.annotations.Operation; import lombok.extern.java.Log; @@ -28,14 +27,17 @@ import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages; import org.opengroup.osdu.core.common.model.indexer.SchemaInfo; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.indexer.SwaggerDoc; import org.opengroup.osdu.indexer.service.IndexerService; import org.opengroup.osdu.indexer.service.ReindexService; import org.opengroup.osdu.indexer.service.SchemaEventsProcessor; -import org.opengroup.osdu.indexer.service.SchemaService; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.annotation.RequestScope; import javax.inject.Inject; @@ -51,34 +53,29 @@ import java.util.List; @RequestScope public class RecordIndexerApi { + @Inject + private IRequestInfo requestInfo; @Inject private IndexerService indexerService; @Inject private ReindexService reIndexService; @Inject - private SchemaService schemaService; - @Inject private SchemaEventsProcessor eventsProcessingService; // THIS IS AN INTERNAL USE API ONLY // THAT MEANS WE DON'T DOCUMENT IT IN SWAGGER, ACCESS IS LIMITED TO CLOUD TASK QUEUE CALLS ONLY @PostMapping(path = "/index-worker", consumes = "application/json") @Operation(hidden = true, summary = "", description = "") - public ResponseEntity<JobStatus> indexWorker ( + public ResponseEntity<JobStatus> indexWorker( @NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY) @Valid @RequestBody RecordChangedMessages recordChangedMessages) throws Exception { - if (recordChangedMessages.missingAccountId()) { - throw new AppException(org.apache.http.HttpStatus.SC_BAD_REQUEST, "Invalid tenant", - String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); - } - try { - if (recordChangedMessages == null) { - log.info("record change messages is null"); - } + populateCorrelationIdIfExist(recordChangedMessages); + + verifyDataPartitionId(recordChangedMessages); - Type listType = new TypeToken<List<RecordInfo>>() { - }.getType(); + try { + Type listType = new TypeToken<List<RecordInfo>>() {}.getType(); List<RecordInfo> recordInfos = new Gson().fromJson(recordChangedMessages.getData(), listType); if (recordInfos.size() == 0) { @@ -96,6 +93,19 @@ public class RecordIndexerApi { } } + private void verifyDataPartitionId(RecordChangedMessages recordChangedMessages) { + if (recordChangedMessages.missingAccountId()) { + throw new AppException(org.apache.http.HttpStatus.SC_BAD_REQUEST, "Invalid tenant", + String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); + } + } + + private void populateCorrelationIdIfExist(RecordChangedMessages recordChangedMessages) { + if (recordChangedMessages.hasCorrelationId()) { + this.requestInfo.getHeaders().put(DpsHeaders.CORRELATION_ID, recordChangedMessages.getCorrelationId()); + } + } + // THIS IS AN INTERNAL USE API ONLY // THAT MEANS WE DON'T DOCUMENT IT IN SWAGGER, ACCESS IS LIMITED TO CLOUD TASK QUEUE CALLS ONLY @PostMapping("/reindex-worker") diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index 529d9f15a1f81ae20f9ce3767ba8a9c783122d3f..65d8c9f2bfaafe979800468590efa6109a4a6747 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -129,7 +129,7 @@ public class IndexerServiceImpl implements IndexerService { try { auditLogger.indexStarted(recordInfos.stream() - .map(RecordInfo::getKind) + .map(entry -> String.format("id=%s kind=%s operationType=%s", entry.getId(), entry.getKind(), entry.getOp())) .collect(Collectors.toList())); // get upsert records diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/BooleanFeatureFlagClient.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/BooleanFeatureFlagClient.java index f21e0968d976de22c1f2f35daf4e33b17cbe7406..b21bbb47aec44f2774bc2cec8486a9c4b77bb5cf 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/BooleanFeatureFlagClient.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/BooleanFeatureFlagClient.java @@ -55,7 +55,11 @@ public class BooleanFeatureFlagClient { PartitionInfo partitionInfo = partitionProvider.get(dataPartitionId); return partitionInfo; } catch (PartitionException e) { - logger.error(String.format("Error getting partition info for data-partition: %s", dataPartitionId), e); + if (e.getResponse() != null) { + logger.error(String.format("Error getting partition info for data-partition: %s. Message: %s. ResponseCode: %s.", dataPartitionId, e.getResponse().getBody(), e.getResponse().getResponseCode()), e); + } else { + logger.error(String.format("Error getting partition info for data-partition: %s.", dataPartitionId), e); + } throw e; } } diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java index 4c008d8646ded0eaf6d247d7f55d283a2e90d29e..fd5940d16d90b59844aea1d6d5f4ac64a23405ac 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java @@ -17,22 +17,23 @@ package org.opengroup.osdu.indexer.api; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.MockitoAnnotations.initMocks; - +import static org.mockito.Mockito.when; import com.google.gson.Gson; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Mockito; import org.opengroup.osdu.core.common.http.HeadersUtil; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.indexer.service.IndexerService; import org.opengroup.osdu.indexer.service.SchemaEventsProcessor; -import org.opengroup.osdu.indexer.service.SchemaService; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -59,7 +60,7 @@ public class RecordIndexerApiTest { @Mock private IndexerService indexService; @Mock - private SchemaService schemaService; + private IRequestInfo requestInfo; @Mock private SchemaEventsProcessor eventsProcessingService; @@ -72,6 +73,7 @@ public class RecordIndexerApiTest { dpsHeaders.put(DpsHeaders.ACCOUNT_ID, this.ACCOUNT_ID); dpsHeaders.put(DpsHeaders.DATA_PARTITION_ID, this.DATA_PARTITION_ID); + when(this.requestInfo.getHeaders()).thenReturn(dpsHeaders); } @Test @@ -89,6 +91,12 @@ public class RecordIndexerApiTest { should_return400_indexerWorkerTest(messageEmpty, String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); } + @Test + public void should_addCorrelationIdToHeader_IfExists_indexWorkerTest() throws Exception { + this.sut.indexWorker(createRecordChangedMessage(recordMessageValid)); + Mockito.verify(this.requestInfo.getHeaders()).put("correlation-id", "b5a281bd-f59d-4db2-9939-b2d85036fc7e"); + } + @Test public void should_return400_given_incorrectJsonFormatMessage_indexWorkerTest() { should_return400_indexerWorkerTest(messageWithIncorrectJsonFormat, "Unable to parse request payload.");