diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java index 93d09b0da1f98686c1bb88bbd0c2157bfa6cc481..c3b38051b4d9e349af0fe832eb4dddb484306bc1 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java @@ -16,7 +16,6 @@ package org.opengroup.osdu.indexer.api; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; - import com.google.gson.JsonParseException; import io.swagger.v3.oas.annotations.Operation; import lombok.extern.java.Log; @@ -28,14 +27,17 @@ import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages; import org.opengroup.osdu.core.common.model.indexer.SchemaInfo; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.indexer.SwaggerDoc; import org.opengroup.osdu.indexer.service.IndexerService; import org.opengroup.osdu.indexer.service.ReindexService; import org.opengroup.osdu.indexer.service.SchemaEventsProcessor; -import org.opengroup.osdu.indexer.service.SchemaService; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.annotation.RequestScope; import javax.inject.Inject; @@ -51,34 +53,37 @@ import java.util.List; @RequestScope public class RecordIndexerApi { + @Inject + private IRequestInfo requestInfo; @Inject private IndexerService indexerService; @Inject private ReindexService reIndexService; @Inject - private SchemaService schemaService; - @Inject private SchemaEventsProcessor eventsProcessingService; // THIS IS AN INTERNAL USE API ONLY // THAT MEANS WE DON'T DOCUMENT IT IN SWAGGER, ACCESS IS LIMITED TO CLOUD TASK QUEUE CALLS ONLY @PostMapping(path = "/index-worker", consumes = "application/json") @Operation(hidden = true, summary = "", description = "") - public ResponseEntity<JobStatus> indexWorker ( + public ResponseEntity<JobStatus> indexWorker( @NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY) @Valid @RequestBody RecordChangedMessages recordChangedMessages) throws Exception { + if (recordChangedMessages == null) { + log.info("record change messages is null"); + return new ResponseEntity(HttpStatus.OK); + } + + if (recordChangedMessages.hasCorrelationId()) { + this.requestInfo.getHeaders().put(DpsHeaders.CORRELATION_ID, recordChangedMessages.getCorrelationId()); + } if (recordChangedMessages.missingAccountId()) { throw new AppException(org.apache.http.HttpStatus.SC_BAD_REQUEST, "Invalid tenant", String.format("Required header: '%s' not found", DpsHeaders.DATA_PARTITION_ID)); } try { - if (recordChangedMessages == null) { - log.info("record change messages is null"); - } - - Type listType = new TypeToken<List<RecordInfo>>() { - }.getType(); + Type listType = new TypeToken<List<RecordInfo>>() {}.getType(); List<RecordInfo> recordInfos = new Gson().fromJson(recordChangedMessages.getData(), listType); if (recordInfos.size() == 0) { diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index 529d9f15a1f81ae20f9ce3767ba8a9c783122d3f..65d8c9f2bfaafe979800468590efa6109a4a6747 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -129,7 +129,7 @@ public class IndexerServiceImpl implements IndexerService { try { auditLogger.indexStarted(recordInfos.stream() - .map(RecordInfo::getKind) + .map(entry -> String.format("id=%s kind=%s operationType=%s", entry.getId(), entry.getKind(), entry.getOp())) .collect(Collectors.toList())); // get upsert records diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java index 4c008d8646ded0eaf6d247d7f55d283a2e90d29e..0787ade6afe3c312c14a039323bf934d75849d89 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/RecordIndexerApiTest.java @@ -17,7 +17,7 @@ package org.opengroup.osdu.indexer.api; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.MockitoAnnotations.initMocks; - +import static org.mockito.Mockito.when; import com.google.gson.Gson; import org.junit.Before; import org.junit.Test; @@ -30,9 +30,9 @@ import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.indexer.service.IndexerService; import org.opengroup.osdu.indexer.service.SchemaEventsProcessor; -import org.opengroup.osdu.indexer.service.SchemaService; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -59,7 +59,7 @@ public class RecordIndexerApiTest { @Mock private IndexerService indexService; @Mock - private SchemaService schemaService; + private IRequestInfo requestInfo; @Mock private SchemaEventsProcessor eventsProcessingService; @@ -72,6 +72,7 @@ public class RecordIndexerApiTest { dpsHeaders.put(DpsHeaders.ACCOUNT_ID, this.ACCOUNT_ID); dpsHeaders.put(DpsHeaders.DATA_PARTITION_ID, this.DATA_PARTITION_ID); + when(this.requestInfo.getHeaders()).thenReturn(dpsHeaders); } @Test