diff --git a/NOTICE b/NOTICE index 5a1fa8890554fca7a60e7bc943e26efa1f8d2966..e8ae1ab75f2ef00011a544988d1d35e883779f79 100644 --- a/NOTICE +++ b/NOTICE @@ -657,8 +657,8 @@ The following software have components provided under the terms of this license: - perfmark:perfmark-api (from https://github.com/perfmark/perfmark) - proto-google-cloud-logging-v2 (from https://github.com/googleapis/java-logging/proto-google-cloud-logging-v2, https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-logging-v2) - proto-google-cloud-pubsub-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-pubsub/proto-google-cloud-pubsub-v1) -- proto-google-common-protos (from https://github.com/googleapis/api-client-staging, https://github.com/googleapis/gapic-generator-java, https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-common-protos) -- proto-google-iam-v1 (from https://github.com/googleapis/gapic-generator-java, https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-iam-v1) +- proto-google-common-protos (from https://github.com/googleapis/api-client-staging, https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-common-protos, https://github.com/googleapis/sdk-platform-java) +- proto-google-iam-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-iam-v1, https://github.com/googleapis/sdk-platform-java) - rank-eval (from https://github.com/elastic/elasticsearch, https://github.com/elastic/elasticsearch.git) - resilience4j (from https://github.com/resilience4j/resilience4j, https://resilience4j.readme.io, ttps://resilience4j.readme.io) - rest (from https://github.com/elastic/elasticsearch, https://github.com/elastic/elasticsearch.git) diff --git a/docs/tutorial/IndexerService.md b/docs/tutorial/IndexerService.md index b6d8e858e1e934cc9bab01ac54ef62b619264594..f63d8c72510c6103c97c53bc93566d19030b5449 100644 --- a/docs/tutorial/IndexerService.md +++ b/docs/tutorial/IndexerService.md @@ -115,7 +115,9 @@ This endpoint takes information from files, generated by `spring-boot-maven-plug ### Reindex <a name="reindex"></a> -Reindex API allows users to re-index a `kind` without re-ingesting the records via storage API. Reindexing a kind is an asynchronous operation and when a user calls this API, it will respond with HTTP 200 if it can launch the re-indexing or +#### Reindex a 'kind' + +Reindex kind API allows users to re-index a `kind` without re-ingesting the records via storage API. Reindexing a kind is an asynchronous operation and when a user calls this API, it will respond with HTTP 200 if it can launch the re-indexing or appropriate error code if it cannot. The current status of the indexing can be tracked by calling search API and making query with this particular kind. Please be advised, it may take few seconds to few hours to finish the re-indexing as multiple factors contribute to latency, such as number of records in the kind, current load at the indexer service etc. @@ -159,6 +161,58 @@ will use the same schema and overwrite records with the same ids. Default value `kind` <br />   (required, String) Kind to be re-indexed. +#### Reindex given records + +Reindex records API allows users to re-index the given records by providing the record ids without re-ingesting the records via storage API. Reindexing a kind is an asynchronous operation and when a user calls this API, it will respond with HTTP 202 if it can launch the re-indexing or +appropriate error code if it cannot. The response body indicates which given records were re-indexed and which ones were not found in storage. It supports up to 1000 records per API call. + +#### Request + +```http +POST /api/indexer/v2/reindex/records HTTP/1.1 +{ + "recordIds": ["opendes:work-product-component--WellLog:17763fcc18864f4f8eab62e320f8913d", "opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7"] +} +``` + +<details><summary>**Curl**</summary> + +```bash +curl --request POST \ + --url '/api/indexer/v2/reindex/records' \ + --header 'accept: application/json' \ + --header 'authorization: Bearer <JWT>' \ + --header 'content-type: application/json' \ + --header 'data-partition-id: opendes' \ + --data '{ + "recordIds": ["opendes:work-product-component--WellLog:17763fcc18864f4f8eab62e320f8913d", "opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7"] +}' +``` + +</details><br> + +#### Prerequisite + +Users must be a member of `users.datalake.admins` or `users.datalake.ops` group. + +#### Request body + +`recordIds` <br /> +  (required, Array of String) Storage records to be re-indexed. + +#### Example response + +```json +{ + "reIndexedRecords": [ + "opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7" + ], + "notFoundRecords": [ + "opendes:work-product-component--WellLog:17763fcc18864f4f8eab62e320f8913d" + ] +} +``` + [Back to table of contents](#TOC) ## Delete API <a name="delete"></a> diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java index 6f38cd0644e62c2ade527f2a69d79ede2c2181e6..93d09b0da1f98686c1bb88bbd0c2157bfa6cc481 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/RecordIndexerApi.java @@ -103,7 +103,7 @@ public class RecordIndexerApi { public ResponseEntity<?> reindex( @RequestBody @NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY) @Valid RecordReindexRequest recordReindexRequest) { - return new ResponseEntity<>(reIndexService.reindexRecords(recordReindexRequest, false), HttpStatus.OK); + return new ResponseEntity<>(reIndexService.reindexKind(recordReindexRequest, false), HttpStatus.OK); } // THIS IS AN INTERNAL USE API ONLY diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/ReindexApi.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/ReindexApi.java index 8d719c247bb1a6aa8fcc8f24877703a31aa743b8..c4ea75f991793fcea471a4cee6f51b369be316c1 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/ReindexApi.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/api/ReindexApi.java @@ -24,10 +24,14 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; import org.opengroup.osdu.core.common.model.http.AppError; import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.indexer.Records; import org.opengroup.osdu.core.common.model.search.SearchServiceRole; import org.opengroup.osdu.indexer.logging.AuditLogger; +import org.opengroup.osdu.indexer.model.ReindexRecordsRequest; +import org.opengroup.osdu.indexer.model.ReindexRecordsResponse; import org.opengroup.osdu.indexer.service.IndexSchemaService; import org.opengroup.osdu.indexer.service.ReindexService; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PatchMapping; @@ -42,6 +46,8 @@ import javax.inject.Inject; import javax.validation.Valid; import javax.validation.constraints.NotNull; import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; @@ -58,6 +64,29 @@ public class ReindexApi { @Inject private AuditLogger auditLogger; + @Operation(summary = "${reindexApi.reindexRecords.summary}", description = "${reindexApi.reindexRecords.description}", + security = {@SecurityRequirement(name = "Authorization")}, tags = { "reindex-api" }) + @ApiResponses(value = { + @ApiResponse(responseCode = "202", description = "Accepted"), + @ApiResponse(responseCode = "400", description = "Bad Request", content = {@Content(schema = @Schema(implementation = AppError.class))}), + @ApiResponse(responseCode = "401", description = "Unauthorized", content = {@Content(schema = @Schema(implementation = AppError.class))}), + @ApiResponse(responseCode = "403", description = "User not authorized to perform the action", content = {@Content(schema = @Schema(implementation = AppError.class))}), + @ApiResponse(responseCode = "404", description = "Not Found", content = {@Content(schema = @Schema(implementation = AppError.class))}), + @ApiResponse(responseCode = "500", description = "Internal Server Error", content = {@Content(schema = @Schema(implementation = AppError.class))}), + @ApiResponse(responseCode = "502", description = "Bad Gateway", content = {@Content(schema = @Schema(implementation = AppError.class))}), + @ApiResponse(responseCode = "503", description = "Service Unavailable", content = {@Content(schema = @Schema(implementation = AppError.class))}) + }) + @PreAuthorize("@authorizationFilter.hasPermission('" + SearchServiceRole.ADMIN + "')") + @PostMapping(path = "/records", consumes = "application/json") + public ResponseEntity<?> reindexRecords(@NotNull @Valid @RequestBody ReindexRecordsRequest reindexRecordsRequest) { + Records records = this.reIndexService.reindexRecords(reindexRecordsRequest.getRecordIds()); + List<String> reindexedRecords = records.getRecords().stream().map(Records.Entity::getId).collect(Collectors.toList()); + if (!reindexedRecords.isEmpty()) { + this.auditLogger.getReindexRecords(reindexedRecords); + } + return new ResponseEntity<>(ReindexRecordsResponse.builder().reIndexedRecords(records.getRecords().stream().map(Records.Entity::getId).collect(Collectors.toList())).notFoundRecords(records.getNotFound()).build(), HttpStatus.ACCEPTED); + } + @Operation(summary = "${reindexApi.reindex.summary}", description = "${reindexApi.reindex.description}", security = {@SecurityRequirement(name = "Authorization")}, tags = { "reindex-api" }) @ApiResponses(value = { @@ -75,7 +104,7 @@ public class ReindexApi { public ResponseEntity<?> reindex(@NotNull @Valid @RequestBody RecordReindexRequest recordReindexRequest, @Parameter(description = "Force Clean") @RequestParam(value = "force_clean", defaultValue = "false") boolean forceClean) throws IOException { - this.reIndexService.reindexRecords(recordReindexRequest, this.indexSchemaService.isStorageSchemaSyncRequired(recordReindexRequest.getKind(), forceClean)); + this.reIndexService.reindexKind(recordReindexRequest, this.indexSchemaService.isStorageSchemaSyncRequired(recordReindexRequest.getKind(), forceClean)); this.auditLogger.getReindex(singletonList(recordReindexRequest.getKind())); return new ResponseEntity<>(org.springframework.http.HttpStatus.OK); } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditEvents.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditEvents.java index ad0a6cc98435dd6efbf1e6457617656ac1ded7f0..9a183e80df3e9216481c6d2f1673f3c62e50cf3f 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditEvents.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditEvents.java @@ -63,6 +63,9 @@ public class AuditEvents { private static final String INDEX_DELETE_SUCCESS = "Successfully deleted index"; private static final String INDEX_DELETE_FAILURE = "Failed deleting index"; + private static final String REINDEX_RECORDS_ACTION_ID = "IN0014"; + private static final String REINDEX_RECORDS_OPERATION = "Reindex records"; + private final String user; public AuditEvents(String user) { @@ -204,6 +207,17 @@ public class AuditEvents { .build(); } + public AuditPayload getReindexRecordsEvent(List<String> resources) { + return AuditPayload.builder() + .action(AuditAction.CREATE) + .status(AuditStatus.SUCCESS) + .actionId(REINDEX_RECORDS_ACTION_ID) + .message(REINDEX_RECORDS_OPERATION) + .resources(resources) + .user(this.user) + .build(); + } + public AuditPayload getCopyIndexEvent(List<String> resources) { return AuditPayload.builder() .action(AuditAction.CREATE) @@ -269,4 +283,4 @@ public class AuditEvents { .build(); } } -} \ No newline at end of file +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditLogger.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditLogger.java index e02f7a2ea8ab30659fd943f81567472f13400151..792f46de627e96f2f5162cde7865693712ef5bc0 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditLogger.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/logging/AuditLogger.java @@ -89,6 +89,10 @@ public class AuditLogger { this.writeLog(this.getAuditEvents().getReindexEvent(resources)); } + public void getReindexRecords(List<String> resources) { + this.writeLog(this.getAuditEvents().getReindexRecordsEvent(resources)); + } + public void copyIndex(List<String> resources) { this.writeLog(this.getAuditEvents().getCopyIndexEvent(resources)); } @@ -116,4 +120,4 @@ public class AuditLogger { private void writeLog(AuditPayload log) { this.logger.audit(log); } -} \ No newline at end of file +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/ReindexRecordsRequest.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/ReindexRecordsRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..d11570ec1f0e5402ad8fbb0b3bba2603ccfe391a --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/ReindexRecordsRequest.java @@ -0,0 +1,33 @@ +// Copyright 2023, SLB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.indexer.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; +import java.util.List; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ReindexRecordsRequest { + @NotNull + @Size(min = 1, max = 1000) + private List<@NotBlank String> recordIds; +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/ReindexRecordsResponse.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/ReindexRecordsResponse.java new file mode 100644 index 0000000000000000000000000000000000000000..b0f4eee4ccc29fae4465f04117cf12fea5afb253 --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/ReindexRecordsResponse.java @@ -0,0 +1,27 @@ +// Copyright 2023, SLB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.indexer.model; + +import lombok.Builder; +import lombok.Data; + +import java.util.List; + +@Data +@Builder +public class ReindexRecordsResponse { + private List<String> reIndexedRecords; + private List<String> notFoundRecords; +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexService.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexService.java index 6569743e49fd39a381eec56123e14ea7b96c7232..231cbf2ad089e0839efe8e5141a1ea4ae7078f09 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexService.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexService.java @@ -16,10 +16,15 @@ package org.opengroup.osdu.indexer.service; import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.indexer.Records; + +import java.util.List; public interface ReindexService { - String reindexRecords(RecordReindexRequest recordReindexRequest, boolean forceClean); + String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean); + + Records reindexRecords(List<String> recordIds); void fullReindex(boolean forceClean); -} \ No newline at end of file +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java index af15d671f8f1cbc4ad1907daf99be0cf0e9a0297..062c48da5c31506fd443d9551191d1ac86cf6af0 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java @@ -17,27 +17,21 @@ package org.opengroup.osdu.indexer.service; import com.google.common.base.Strings; import com.google.gson.Gson; -import java.util.Objects; +import java.util.*; import lombok.SneakyThrows; import org.apache.http.HttpStatus; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.indexer.OperationType; -import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; -import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.indexer.*; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; -import org.opengroup.osdu.core.common.model.indexer.RecordInfo; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.stream.Collectors; @Component @@ -58,7 +52,7 @@ public class ReindexServiceImpl implements ReindexService { @SneakyThrows @Override - public String reindexRecords(RecordReindexRequest recordReindexRequest, boolean forceClean) { + public String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean) { Long initialDelayMillis = 0l; @@ -75,20 +69,12 @@ public class ReindexServiceImpl implements ReindexService { List<RecordInfo> msgs = recordQueryResponse.getResults().stream() .map(record -> RecordInfo.builder().id(record).kind(recordReindexRequest.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList()); - - Map<String, String> attributes = new HashMap<>(); - attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId()); - attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); - - Gson gson = new Gson(); - RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build(); - String recordChangedMessagePayload = gson.toJson(recordChangedMessages); - this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers); + String recordChangedMessagePayload = this.replayReindexMsg(msgs, initialDelayMillis, headers); // don't call reindex-worker endpoint if it's the last batch // previous storage query result size will be less then requested (limit param) if (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize()) { + Gson gson = new Gson(); String newPayLoad = gson.toJson(RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordReindexRequest.getKind()).build()); this.indexerQueueTaskBuilder.createReIndexTask(newPayLoad, initialDelayMillis, headers); return newPayLoad; @@ -101,6 +87,18 @@ public class ReindexServiceImpl implements ReindexService { return null; } + @SneakyThrows + @Override + public Records reindexRecords(List<String> recordIds) { + Records records = this.storageService.getStorageRecords(recordIds); + if (records.getRecords().size() > 0) { + List<RecordInfo> msgs = records.getRecords().stream() + .map(record -> RecordInfo.builder().id(record.getId()).kind(record.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList()); + this.replayReindexMsg(msgs, 0L, null); + } + return records; + } + @Override public void fullReindex(boolean forceClean) { List<String> allKinds = null; @@ -115,11 +113,26 @@ public class ReindexServiceImpl implements ReindexService { } for (String kind : allKinds) { try { - reindexRecords(new RecordReindexRequest(kind, ""), forceClean); + reindexKind(new RecordReindexRequest(kind, ""), forceClean); } catch (Exception e) { jaxRsDpsLog.warning(String.format("kind: %s cannot be re-indexed", kind)); continue; } } } -} \ No newline at end of file + + private String replayReindexMsg(List<RecordInfo> msgs, Long initialDelayMillis, DpsHeaders headers) { + Map<String, String> attributes = new HashMap<>(); + if (headers == null) { + headers = this.requestInfo.getHeadersWithDwdAuthZ(); + } + attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId()); + attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + Gson gson = new Gson(); + RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build(); + String recordChangedMessagePayload = gson.toJson(recordChangedMessages); + this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers); + return recordChangedMessagePayload; + } +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java index 54a19e807079d6076b855e89ea2707f88308e038..d2833e9748c3ad8dcb98a0efb913d01ca68c090b 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java @@ -28,9 +28,11 @@ public interface StorageService { Records getStorageRecords(List<String> ids, List<RecordInfo> recordChangedInfos) throws AppException, URISyntaxException; + Records getStorageRecords(List<String> ids) throws URISyntaxException; + RecordQueryResponse getRecordsByKind(RecordReindexRequest request) throws URISyntaxException; String getStorageSchema(String kind) throws URISyntaxException, UnsupportedEncodingException; List<String> getAllKinds() throws URISyntaxException; -} \ No newline at end of file +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java index 1b20cfb14cd9128dcb1595d9a608d6c4ed1da4b1..cfcf8ce96187e5918a133e31800ea6731f609263 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java @@ -24,7 +24,11 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.StringEntity; import org.opengroup.osdu.core.common.http.FetchServiceHttpRequest; +import org.opengroup.osdu.core.common.http.IHttpClientHandler; import org.opengroup.osdu.core.common.http.IUrlFetchService; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppException; @@ -50,11 +54,7 @@ import java.io.UnsupportedEncodingException; import java.net.URISyntaxException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import static org.opengroup.osdu.core.common.Constants.SLB_FRAME_OF_REFERENCE_VALUE; @@ -70,6 +70,8 @@ public class StorageServiceImpl implements StorageService { @Inject private IUrlFetchService urlFetchService; @Inject + private IHttpClientHandler httpClientHandler; + @Inject private JobStatus jobStatus; @Inject private IRequestInfo requestInfo; @@ -98,6 +100,24 @@ public class StorageServiceImpl implements StorageService { return Records.builder().records(valid).notFound(notFound).conversionStatuses(conversionStatuses).missingRetryRecords(missingRetryRecordIds).build(); } + @Override + public Records getStorageRecords(List<String> ids) throws URISyntaxException { + List<Records.Entity> valid = new ArrayList<>(); + List<String> notFound = new ArrayList<>(); + List<ConversionStatus> conversionStatuses = new ArrayList<>(); + List<String> missingRetryRecordIds = new ArrayList<>(); + + List<List<String>> batch = Lists.partition(ids, configurationProperties.getStorageRecordsBatchSize()); + for (List<String> recordsBatch : batch) { + Records storageOut = this.getRecords(recordsBatch); + valid.addAll(storageOut.getRecords()); + notFound.addAll(storageOut.getNotFound()); + conversionStatuses.addAll(storageOut.getConversionStatuses()); + missingRetryRecordIds.addAll(storageOut.getMissingRetryRecords()); + } + return Records.builder().records(valid).notFound(notFound).conversionStatuses(conversionStatuses).missingRetryRecords(missingRetryRecordIds).build(); + } + protected Records getRecords(List<String> ids, Map<String, String> recordChangedMap, Map<String, String> validRecordKindPatchMap) throws URISyntaxException { // e.g. {"records":["test:10"]} String body = this.gson.toJson(RecordIds.builder().records(ids).build()); @@ -114,6 +134,27 @@ public class StorageServiceImpl implements StorageService { return this.validateStorageResponse(response, ids, recordChangedMap, validRecordKindPatchMap); } + protected Records getRecords(List<String> ids) throws URISyntaxException { + String body = this.gson.toJson(RecordIds.builder().records(ids).build()); + DpsHeaders headers = this.requestInfo.getHeaders(); + URIBuilder builder = new URIBuilder(configurationProperties.getStorageQueryRecordHost()); + HttpPost request = new HttpPost(builder.build()); + request.setEntity(new StringEntity(body, StandardCharsets.UTF_8)); + // we do not need retry on storage based on not found record + HttpResponse response = httpClientHandler.sendRequest(request, headers); + if (response.getResponseCode() > 299) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Internal Error", response.getBody()); + } + try { + Records records = this.objectMapper.readValue(response.getBody(), Records.class); + ids.removeAll(records.getRecords().stream().map(Records.Entity::getId).collect(Collectors.toList())); + records.setNotFound(ids); + return records; + } catch (JsonProcessingException e) { + throw new AppException(RequestStatus.INVALID_RECORD, "Invalid request", "Successful Storage service response with wrong json", e); + } + } + private Records validateStorageResponse(HttpResponse response, List<String> ids, Map<String, String> recordChangedMap, Map<String, String> validRecordKindPatchMap) { String bulkStorageData = response.getBody(); diff --git a/indexer-core/src/main/resources/swagger.properties b/indexer-core/src/main/resources/swagger.properties index 12506ebeb2e9aa1af8fdb396baaa543879cbcba0..cdfffe670e8476df7aca32c71f1dea065eea33ac 100644 --- a/indexer-core/src/main/resources/swagger.properties +++ b/indexer-core/src/main/resources/swagger.properties @@ -31,6 +31,9 @@ partitionSetupApi.provisionPartition.description=Provision partition. Required r reindexApi.reindex.summary=Re-index given 'kind' reindexApi.reindex.description=This API allows users to re-index a 'kind' without re-ingesting the records via storage API. \ Required roles: `service.search.admin` +reindexApi.reindexRecords.summary=Re-index given records +reindexApi.reindexRecords.description=This API allows users to re-index the given records by providing record ids without \ +re-ingesting the records via storage API. Required roles: `service.search.admin` reindexApi.fullReindex.summary=Full Re-index by data partition reindexApi.fullReindex.description=This API allows users to re-index an entire partition without re-ingesting the records via storage API.\ Required roles: `service.search.admin` diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/ReindexApiTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/ReindexApiTest.java index 23513bc5d19a682f3d549bbc18485982d3655e8c..f5437fbd963bcaf622406c2ebde8be6b90374ac8 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/ReindexApiTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/api/ReindexApiTest.java @@ -21,7 +21,10 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.indexer.Records; import org.opengroup.osdu.indexer.logging.AuditLogger; +import org.opengroup.osdu.indexer.model.ReindexRecordsRequest; +import org.opengroup.osdu.indexer.model.ReindexRecordsResponse; import org.opengroup.osdu.indexer.service.IndexSchemaService; import org.opengroup.osdu.indexer.service.ReindexService; import org.springframework.http.HttpStatus; @@ -29,14 +32,19 @@ import org.springframework.http.ResponseEntity; import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; @RunWith(SpringRunner.class) public class ReindexApiTest { private RecordReindexRequest recordReindexRequest; + private List<String> recordIds; @Mock private ReindexService reIndexService; @@ -50,11 +58,13 @@ public class ReindexApiTest { @Before public void setup() { recordReindexRequest = RecordReindexRequest.builder().kind("tenant:test:test:1.0.0").cursor("100").build(); + recordIds = new ArrayList<>(); + recordIds.add("id1"); } @Test public void should_return200_when_valid_kind_provided() throws IOException { - when(this.reIndexService.reindexRecords(recordReindexRequest, false)).thenReturn("something"); + when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenReturn("something"); ResponseEntity<?> response = sut.reindex(recordReindexRequest, false); @@ -63,15 +73,49 @@ public class ReindexApiTest { @Test(expected = AppException.class) public void should_throwAppException_ifUnknownExceptionCaught_reindexTest() throws IOException { - when(this.reIndexService.reindexRecords(recordReindexRequest, false)).thenThrow(new AppException(500, "", "")); + when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenThrow(new AppException(500, "", "")); sut.reindex(recordReindexRequest, false); } @Test(expected = NullPointerException.class) public void should_throwAppException_ifNullPointerExceptionCaught_ReindexTest() throws IOException { - when(this.reIndexService.reindexRecords(recordReindexRequest, false)).thenThrow(new NullPointerException("")); + when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenThrow(new NullPointerException("")); sut.reindex(recordReindexRequest, false); } + + @Test + public void should_return200_when_valid_record_id_list_provided() { + when(this.reIndexService.reindexRecords(recordIds)).thenReturn(Records.builder().records(new ArrayList<>()).records(Collections.singletonList(Records.Entity.builder().id("id1").build())).notFound(recordIds).build()); + + ResponseEntity<?> response = sut.reindexRecords(new ReindexRecordsRequest(recordIds)); + + assertEquals(HttpStatus.ACCEPTED, response.getStatusCode()); + verify(auditLogger).getReindexRecords(any()); + } + + @Test + public void should_notWriteAuditLog_when_no_valid_record_id_list_provided() { + when(this.reIndexService.reindexRecords(recordIds)).thenReturn(Records.builder().records(new ArrayList<>()).records(Collections.emptyList()).notFound(recordIds).build()); + + ResponseEntity<?> response = sut.reindexRecords(new ReindexRecordsRequest(recordIds)); + + assertEquals(HttpStatus.ACCEPTED, response.getStatusCode()); + verify(auditLogger, never()).getReindex(any()); + } + + @Test(expected = AppException.class) + public void should_throwAppException_ifUnknownExceptionCaught_reindexRecordsTest() { + when(this.reIndexService.reindexRecords(recordIds)).thenThrow(new AppException(500, "", "")); + + sut.reindexRecords(new ReindexRecordsRequest(recordIds)); + } + + @Test(expected = NullPointerException.class) + public void should_throwAppException_ifNullPointerExceptionCaught_ReindexRecordsTest() { + when(this.reIndexService.reindexRecords(recordIds)).thenThrow(new NullPointerException("")); + + sut.reindexRecords(new ReindexRecordsRequest(recordIds)); + } } diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java index e64574d7a6452f704361f80ea66884232b506c0a..437f8714f53d299f09f6da9d80e5c3169edd77e6 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java @@ -24,6 +24,7 @@ import org.mockito.ArgumentMatchers; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Spy; +import org.opengroup.osdu.core.common.http.IHttpClientHandler; import org.opengroup.osdu.core.common.http.IUrlFetchService; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppException; @@ -43,11 +44,7 @@ import org.springframework.test.context.junit4.SpringRunner; import java.lang.reflect.Type; import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; @@ -64,6 +61,8 @@ public class StorageServiceImplTest { @Mock private IUrlFetchService urlFetchService; @Mock + private IHttpClientHandler httpClientHandler; + @Mock private JobStatus jobStatus; @Mock private JaxRsDpsLog log; @@ -188,6 +187,24 @@ public class StorageServiceImplTest { verify(this.log).warning("stale records found with older kind, skipping indexing | record ids: testid"); } + @Test + public void should_returnStorageRecords_givenRecordIds_getValidStorageRecordsTest() throws URISyntaxException { + + String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}]}"; + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(validDataFromStorage); + + when(configurationProperties.getStorageQueryRecordHost()).thenReturn("storageUrl"); + when(this.httpClientHandler.sendRequest(any(), any())).thenReturn(httpResponse); + List<String> idsCopy = new ArrayList<>(); + idsCopy.addAll(ids); + Records storageRecords = this.sut.getStorageRecords(idsCopy); + + assertEquals(1, storageRecords.getRecords().size()); + assertEquals(1, storageRecords.getNotFound().size()); + } + @Test public void should_logMissingRecord_given_storageMissedRecords() throws URISyntaxException { @@ -310,4 +327,4 @@ public class StorageServiceImplTest { fail("Should not throw this exception" + e.getMessage()); } } -} \ No newline at end of file +} diff --git a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/service/ReindexServiceTest.java b/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/service/ReindexServiceTest.java index 5165084ac11832bc7b60e3faa6fffb8f8b54fa9a..4cca1d431c0374d040ab8945f15ca4fb35b57c2c 100644 --- a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/service/ReindexServiceTest.java +++ b/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/service/ReindexServiceTest.java @@ -27,6 +27,7 @@ import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.indexer.Records; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.opengroup.osdu.indexer.service.ReindexServiceImpl; import org.opengroup.osdu.indexer.service.StorageService; @@ -36,9 +37,11 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunnerDelegate; import org.springframework.test.context.junit4.SpringRunner; +import java.net.URISyntaxException; import java.util.*; import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.when; @@ -95,7 +98,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(null); when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String response = sut.reindexRecords(recordReindexRequest, false); + String response = sut.reindexKind(recordReindexRequest, false); Assert.assertNull(response); } catch (Exception e) { @@ -109,7 +112,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(new ArrayList<>()); when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String response = sut.reindexRecords(recordReindexRequest, false); + String response = sut.reindexKind(recordReindexRequest, false); Assert.assertNull(response); } catch (Exception e) { @@ -129,7 +132,7 @@ public class ReindexServiceTest { when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false); + String taskQueuePayload = sut.reindexKind(recordReindexRequest, false); Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload); } catch (Exception e) { @@ -145,11 +148,26 @@ public class ReindexServiceTest { recordQueryResponse.setResults(results); when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false); + String taskQueuePayload = sut.reindexKind(recordReindexRequest, false); Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"slb-correlation-id\":\"%s\"}}", correlationId), taskQueuePayload); } catch (Exception e) { fail("Should not throw exception" + e.getMessage()); } } + + @Test + public void should_createReindexTaskForValidRecords_givenValidRecordIds_reIndexRecordsTest() throws URISyntaxException { + DpsHeaders headers = new DpsHeaders(); + when(requestInfo.getHeadersWithDwdAuthZ()).thenReturn(headers); + when(configurationProperties.getStorageRecordsBatchSize()).thenReturn(2); + List<String> recordIds = Arrays.asList("id1", "id2"); + when(storageService.getStorageRecords(recordIds)).thenReturn( + Records.builder().records(Collections.singletonList(Records.Entity.builder().id("id1").kind("kind1").build())).notFound(Collections.singletonList("id2")).build() + ); + Records records = sut.reindexRecords(recordIds); + Assert.assertEquals(1, records.getRecords().size()); + Assert.assertEquals(1, records.getNotFound().size()); + verify(indexerQueueTaskBuilder).createWorkerTask("{\"data\":\"[{\\\"id\\\":\\\"id1\\\",\\\"kind\\\":\\\"kind1\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{}}", 0L, headers); + } } diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/service/ReindexServiceTest.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/service/ReindexServiceTest.java index 403199cb3231d84c8b072b6588dee7a1389ab447..aab33b30c1eb11aa41b810022472ac46ba1d2329 100644 --- a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/service/ReindexServiceTest.java +++ b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/service/ReindexServiceTest.java @@ -90,7 +90,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(null); when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse); - String response = sut.reindexRecords(recordReindexRequest, false); + String response = sut.reindexKind(recordReindexRequest, false); Assert.assertNull(response); } catch (Exception e) { @@ -104,7 +104,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(new ArrayList<>()); when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse); - String response = sut.reindexRecords(recordReindexRequest, false); + String response = sut.reindexKind(recordReindexRequest, false); Assert.assertNull(response); } catch (Exception e) { @@ -124,7 +124,7 @@ public class ReindexServiceTest { when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse); - String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false); + String taskQueuePayload = sut.reindexKind(recordReindexRequest, false); Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload); } catch (Exception e) { @@ -140,7 +140,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(results); when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse); - String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false); + String taskQueuePayload = sut.reindexKind(recordReindexRequest, false); Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"correlation-id\":\"%s\"}}", correlationId), taskQueuePayload); } catch (Exception e) { diff --git a/provider/indexer-ibm/src/test/java/org/opengroup/osdu/indexer/ibm/service/ReindexServiceTest.java b/provider/indexer-ibm/src/test/java/org/opengroup/osdu/indexer/ibm/service/ReindexServiceTest.java index acb116a762ba85ce6373ad0e8fa3814de66a4937..2067d75f0a8b4ef7bc5c7cb5351d0ef443e09282 100644 --- a/provider/indexer-ibm/src/test/java/org/opengroup/osdu/indexer/ibm/service/ReindexServiceTest.java +++ b/provider/indexer-ibm/src/test/java/org/opengroup/osdu/indexer/ibm/service/ReindexServiceTest.java @@ -84,7 +84,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(null); when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String response = sut.reindexRecords(recordReindexRequest, false); + String response = sut.reindexKind(recordReindexRequest, false); Assert.assertNull(response); } catch (Exception e) { @@ -98,7 +98,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(new ArrayList<>()); when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String response = sut.reindexRecords(recordReindexRequest, false); + String response = sut.reindexKind(recordReindexRequest, false); Assert.assertNull(response); } catch (Exception e) { @@ -118,7 +118,7 @@ public class ReindexServiceTest { when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false); + String taskQueuePayload = sut.reindexKind(recordReindexRequest, false); Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload); } catch (Exception e) { @@ -134,7 +134,7 @@ public class ReindexServiceTest { recordQueryResponse.setResults(results); when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse); - String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false); + String taskQueuePayload = sut.reindexKind(recordReindexRequest, false); Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"slb-correlation-id\":\"%s\"}}", correlationId), taskQueuePayload); } catch (Exception e) {