Skip to content
Snippets Groups Projects
Commit e7ea17c2 authored by Mingyang Zhu's avatar Mingyang Zhu
Browse files

Merge branch 'reindex-records-api' into 'master'

Implement reindex records API

See merge request !556
parents b17d1e6c 9dcb40a3
No related branches found
No related tags found
1 merge request!556Implement reindex records API
Pipeline #191781 failed
Showing
with 362 additions and 58 deletions
......@@ -657,8 +657,8 @@ The following software have components provided under the terms of this license:
- perfmark:perfmark-api (from https://github.com/perfmark/perfmark)
- proto-google-cloud-logging-v2 (from https://github.com/googleapis/java-logging/proto-google-cloud-logging-v2, https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-logging-v2)
- proto-google-cloud-pubsub-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-pubsub/proto-google-cloud-pubsub-v1)
- proto-google-common-protos (from https://github.com/googleapis/api-client-staging, https://github.com/googleapis/gapic-generator-java, https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-common-protos)
- proto-google-iam-v1 (from https://github.com/googleapis/gapic-generator-java, https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-iam-v1)
- proto-google-common-protos (from https://github.com/googleapis/api-client-staging, https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-common-protos, https://github.com/googleapis/sdk-platform-java)
- proto-google-iam-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-iam-v1, https://github.com/googleapis/sdk-platform-java)
- rank-eval (from https://github.com/elastic/elasticsearch, https://github.com/elastic/elasticsearch.git)
- resilience4j (from https://github.com/resilience4j/resilience4j, https://resilience4j.readme.io, ttps://resilience4j.readme.io)
- rest (from https://github.com/elastic/elasticsearch, https://github.com/elastic/elasticsearch.git)
......
......@@ -115,7 +115,9 @@ This endpoint takes information from files, generated by `spring-boot-maven-plug
### Reindex <a name="reindex"></a>
Reindex API allows users to re-index a `kind` without re-ingesting the records via storage API. Reindexing a kind is an asynchronous operation and when a user calls this API, it will respond with HTTP 200 if it can launch the re-indexing or
#### Reindex a 'kind'
Reindex kind API allows users to re-index a `kind` without re-ingesting the records via storage API. Reindexing a kind is an asynchronous operation and when a user calls this API, it will respond with HTTP 200 if it can launch the re-indexing or
appropriate error code if it cannot. The current status of the indexing can be tracked by calling search API and making query with this particular kind. Please be advised, it may take few seconds to few hours to finish the re-indexing as
multiple factors contribute to latency, such as number of records in the kind, current load at the indexer service etc.
......@@ -159,6 +161,58 @@ will use the same schema and overwrite records with the same ids. Default value
`kind` <br />
&emsp;&emsp;(required, String) Kind to be re-indexed.
#### Reindex given records
Reindex records API allows users to re-index the given records by providing the record ids without re-ingesting the records via storage API. Reindexing a kind is an asynchronous operation and when a user calls this API, it will respond with HTTP 202 if it can launch the re-indexing or
appropriate error code if it cannot. The response body indicates which given records were re-indexed and which ones were not found in storage. It supports up to 1000 records per API call.
#### Request
```http
POST /api/indexer/v2/reindex/records HTTP/1.1
{
"recordIds": ["opendes:work-product-component--WellLog:17763fcc18864f4f8eab62e320f8913d", "opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7"]
}
```
<details><summary>**Curl**</summary>
```bash
curl --request POST \
--url '/api/indexer/v2/reindex/records' \
--header 'accept: application/json' \
--header 'authorization: Bearer <JWT>' \
--header 'content-type: application/json' \
--header 'data-partition-id: opendes' \
--data '{
"recordIds": ["opendes:work-product-component--WellLog:17763fcc18864f4f8eab62e320f8913d", "opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7"]
}'
```
</details><br>
#### Prerequisite
Users must be a member of `users.datalake.admins` or `users.datalake.ops` group.
#### Request body
`recordIds` <br />
&emsp;&emsp;(required, Array of String) Storage records to be re-indexed.
#### Example response
```json
{
"reIndexedRecords": [
"opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7"
],
"notFoundRecords": [
"opendes:work-product-component--WellLog:17763fcc18864f4f8eab62e320f8913d"
]
}
```
[Back to table of contents](#TOC)
## Delete API <a name="delete"></a>
......
......@@ -103,7 +103,7 @@ public class RecordIndexerApi {
public ResponseEntity<?> reindex(
@RequestBody @NotNull(message = SwaggerDoc.REQUEST_VALIDATION_NOT_NULL_BODY)
@Valid RecordReindexRequest recordReindexRequest) {
return new ResponseEntity<>(reIndexService.reindexRecords(recordReindexRequest, false), HttpStatus.OK);
return new ResponseEntity<>(reIndexService.reindexKind(recordReindexRequest, false), HttpStatus.OK);
}
// THIS IS AN INTERNAL USE API ONLY
......
......@@ -24,10 +24,14 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.opengroup.osdu.core.common.model.http.AppError;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.indexer.Records;
import org.opengroup.osdu.core.common.model.search.SearchServiceRole;
import org.opengroup.osdu.indexer.logging.AuditLogger;
import org.opengroup.osdu.indexer.model.ReindexRecordsRequest;
import org.opengroup.osdu.indexer.model.ReindexRecordsResponse;
import org.opengroup.osdu.indexer.service.IndexSchemaService;
import org.opengroup.osdu.indexer.service.ReindexService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PatchMapping;
......@@ -42,6 +46,8 @@ import javax.inject.Inject;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
......@@ -58,6 +64,29 @@ public class ReindexApi {
@Inject
private AuditLogger auditLogger;
@Operation(summary = "${reindexApi.reindexRecords.summary}", description = "${reindexApi.reindexRecords.description}",
security = {@SecurityRequirement(name = "Authorization")}, tags = { "reindex-api" })
@ApiResponses(value = {
@ApiResponse(responseCode = "202", description = "Accepted"),
@ApiResponse(responseCode = "400", description = "Bad Request", content = {@Content(schema = @Schema(implementation = AppError.class))}),
@ApiResponse(responseCode = "401", description = "Unauthorized", content = {@Content(schema = @Schema(implementation = AppError.class))}),
@ApiResponse(responseCode = "403", description = "User not authorized to perform the action", content = {@Content(schema = @Schema(implementation = AppError.class))}),
@ApiResponse(responseCode = "404", description = "Not Found", content = {@Content(schema = @Schema(implementation = AppError.class))}),
@ApiResponse(responseCode = "500", description = "Internal Server Error", content = {@Content(schema = @Schema(implementation = AppError.class))}),
@ApiResponse(responseCode = "502", description = "Bad Gateway", content = {@Content(schema = @Schema(implementation = AppError.class))}),
@ApiResponse(responseCode = "503", description = "Service Unavailable", content = {@Content(schema = @Schema(implementation = AppError.class))})
})
@PreAuthorize("@authorizationFilter.hasPermission('" + SearchServiceRole.ADMIN + "')")
@PostMapping(path = "/records", consumes = "application/json")
public ResponseEntity<?> reindexRecords(@NotNull @Valid @RequestBody ReindexRecordsRequest reindexRecordsRequest) {
Records records = this.reIndexService.reindexRecords(reindexRecordsRequest.getRecordIds());
List<String> reindexedRecords = records.getRecords().stream().map(Records.Entity::getId).collect(Collectors.toList());
if (!reindexedRecords.isEmpty()) {
this.auditLogger.getReindexRecords(reindexedRecords);
}
return new ResponseEntity<>(ReindexRecordsResponse.builder().reIndexedRecords(records.getRecords().stream().map(Records.Entity::getId).collect(Collectors.toList())).notFoundRecords(records.getNotFound()).build(), HttpStatus.ACCEPTED);
}
@Operation(summary = "${reindexApi.reindex.summary}", description = "${reindexApi.reindex.description}",
security = {@SecurityRequirement(name = "Authorization")}, tags = { "reindex-api" })
@ApiResponses(value = {
......@@ -75,7 +104,7 @@ public class ReindexApi {
public ResponseEntity<?> reindex(@NotNull @Valid @RequestBody RecordReindexRequest recordReindexRequest,
@Parameter(description = "Force Clean")
@RequestParam(value = "force_clean", defaultValue = "false") boolean forceClean) throws IOException {
this.reIndexService.reindexRecords(recordReindexRequest, this.indexSchemaService.isStorageSchemaSyncRequired(recordReindexRequest.getKind(), forceClean));
this.reIndexService.reindexKind(recordReindexRequest, this.indexSchemaService.isStorageSchemaSyncRequired(recordReindexRequest.getKind(), forceClean));
this.auditLogger.getReindex(singletonList(recordReindexRequest.getKind()));
return new ResponseEntity<>(org.springframework.http.HttpStatus.OK);
}
......
......@@ -63,6 +63,9 @@ public class AuditEvents {
private static final String INDEX_DELETE_SUCCESS = "Successfully deleted index";
private static final String INDEX_DELETE_FAILURE = "Failed deleting index";
private static final String REINDEX_RECORDS_ACTION_ID = "IN0014";
private static final String REINDEX_RECORDS_OPERATION = "Reindex records";
private final String user;
public AuditEvents(String user) {
......@@ -204,6 +207,17 @@ public class AuditEvents {
.build();
}
public AuditPayload getReindexRecordsEvent(List<String> resources) {
return AuditPayload.builder()
.action(AuditAction.CREATE)
.status(AuditStatus.SUCCESS)
.actionId(REINDEX_RECORDS_ACTION_ID)
.message(REINDEX_RECORDS_OPERATION)
.resources(resources)
.user(this.user)
.build();
}
public AuditPayload getCopyIndexEvent(List<String> resources) {
return AuditPayload.builder()
.action(AuditAction.CREATE)
......@@ -269,4 +283,4 @@ public class AuditEvents {
.build();
}
}
}
\ No newline at end of file
}
......@@ -89,6 +89,10 @@ public class AuditLogger {
this.writeLog(this.getAuditEvents().getReindexEvent(resources));
}
public void getReindexRecords(List<String> resources) {
this.writeLog(this.getAuditEvents().getReindexRecordsEvent(resources));
}
public void copyIndex(List<String> resources) {
this.writeLog(this.getAuditEvents().getCopyIndexEvent(resources));
}
......@@ -116,4 +120,4 @@ public class AuditLogger {
private void writeLog(AuditPayload log) {
this.logger.audit(log);
}
}
\ No newline at end of file
}
// Copyright 2023, SLB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexer.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ReindexRecordsRequest {
@NotNull
@Size(min = 1, max = 1000)
private List<@NotBlank String> recordIds;
}
// Copyright 2023, SLB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexer.model;
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder
public class ReindexRecordsResponse {
private List<String> reIndexedRecords;
private List<String> notFoundRecords;
}
......@@ -16,10 +16,15 @@ package org.opengroup.osdu.indexer.service;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.indexer.Records;
import java.util.List;
public interface ReindexService {
String reindexRecords(RecordReindexRequest recordReindexRequest, boolean forceClean);
String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean);
Records reindexRecords(List<String> recordIds);
void fullReindex(boolean forceClean);
}
\ No newline at end of file
}
......@@ -17,27 +17,21 @@ package org.opengroup.osdu.indexer.service;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import java.util.Objects;
import java.util.*;
import lombok.SneakyThrows;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.indexer.*;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Component
......@@ -58,7 +52,7 @@ public class ReindexServiceImpl implements ReindexService {
@SneakyThrows
@Override
public String reindexRecords(RecordReindexRequest recordReindexRequest, boolean forceClean) {
public String reindexKind(RecordReindexRequest recordReindexRequest, boolean forceClean) {
Long initialDelayMillis = 0l;
......@@ -75,20 +69,12 @@ public class ReindexServiceImpl implements ReindexService {
List<RecordInfo> msgs = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordReindexRequest.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList());
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
Gson gson = new Gson();
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers);
String recordChangedMessagePayload = this.replayReindexMsg(msgs, initialDelayMillis, headers);
// don't call reindex-worker endpoint if it's the last batch
// previous storage query result size will be less then requested (limit param)
if (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize()) {
Gson gson = new Gson();
String newPayLoad = gson.toJson(RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordReindexRequest.getKind()).build());
this.indexerQueueTaskBuilder.createReIndexTask(newPayLoad, initialDelayMillis, headers);
return newPayLoad;
......@@ -101,6 +87,18 @@ public class ReindexServiceImpl implements ReindexService {
return null;
}
@SneakyThrows
@Override
public Records reindexRecords(List<String> recordIds) {
Records records = this.storageService.getStorageRecords(recordIds);
if (records.getRecords().size() > 0) {
List<RecordInfo> msgs = records.getRecords().stream()
.map(record -> RecordInfo.builder().id(record.getId()).kind(record.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList());
this.replayReindexMsg(msgs, 0L, null);
}
return records;
}
@Override
public void fullReindex(boolean forceClean) {
List<String> allKinds = null;
......@@ -115,11 +113,26 @@ public class ReindexServiceImpl implements ReindexService {
}
for (String kind : allKinds) {
try {
reindexRecords(new RecordReindexRequest(kind, ""), forceClean);
reindexKind(new RecordReindexRequest(kind, ""), forceClean);
} catch (Exception e) {
jaxRsDpsLog.warning(String.format("kind: %s cannot be re-indexed", kind));
continue;
}
}
}
}
\ No newline at end of file
private String replayReindexMsg(List<RecordInfo> msgs, Long initialDelayMillis, DpsHeaders headers) {
Map<String, String> attributes = new HashMap<>();
if (headers == null) {
headers = this.requestInfo.getHeadersWithDwdAuthZ();
}
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
Gson gson = new Gson();
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers);
return recordChangedMessagePayload;
}
}
......@@ -28,9 +28,11 @@ public interface StorageService {
Records getStorageRecords(List<String> ids, List<RecordInfo> recordChangedInfos) throws AppException, URISyntaxException;
Records getStorageRecords(List<String> ids) throws URISyntaxException;
RecordQueryResponse getRecordsByKind(RecordReindexRequest request) throws URISyntaxException;
String getStorageSchema(String kind) throws URISyntaxException, UnsupportedEncodingException;
List<String> getAllKinds() throws URISyntaxException;
}
\ No newline at end of file
}
......@@ -24,7 +24,11 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.opengroup.osdu.core.common.http.FetchServiceHttpRequest;
import org.opengroup.osdu.core.common.http.IHttpClientHandler;
import org.opengroup.osdu.core.common.http.IUrlFetchService;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
......@@ -50,11 +54,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import static org.opengroup.osdu.core.common.Constants.SLB_FRAME_OF_REFERENCE_VALUE;
......@@ -70,6 +70,8 @@ public class StorageServiceImpl implements StorageService {
@Inject
private IUrlFetchService urlFetchService;
@Inject
private IHttpClientHandler httpClientHandler;
@Inject
private JobStatus jobStatus;
@Inject
private IRequestInfo requestInfo;
......@@ -98,6 +100,24 @@ public class StorageServiceImpl implements StorageService {
return Records.builder().records(valid).notFound(notFound).conversionStatuses(conversionStatuses).missingRetryRecords(missingRetryRecordIds).build();
}
@Override
public Records getStorageRecords(List<String> ids) throws URISyntaxException {
List<Records.Entity> valid = new ArrayList<>();
List<String> notFound = new ArrayList<>();
List<ConversionStatus> conversionStatuses = new ArrayList<>();
List<String> missingRetryRecordIds = new ArrayList<>();
List<List<String>> batch = Lists.partition(ids, configurationProperties.getStorageRecordsBatchSize());
for (List<String> recordsBatch : batch) {
Records storageOut = this.getRecords(recordsBatch);
valid.addAll(storageOut.getRecords());
notFound.addAll(storageOut.getNotFound());
conversionStatuses.addAll(storageOut.getConversionStatuses());
missingRetryRecordIds.addAll(storageOut.getMissingRetryRecords());
}
return Records.builder().records(valid).notFound(notFound).conversionStatuses(conversionStatuses).missingRetryRecords(missingRetryRecordIds).build();
}
protected Records getRecords(List<String> ids, Map<String, String> recordChangedMap, Map<String, String> validRecordKindPatchMap) throws URISyntaxException {
// e.g. {"records":["test:10"]}
String body = this.gson.toJson(RecordIds.builder().records(ids).build());
......@@ -114,6 +134,27 @@ public class StorageServiceImpl implements StorageService {
return this.validateStorageResponse(response, ids, recordChangedMap, validRecordKindPatchMap);
}
protected Records getRecords(List<String> ids) throws URISyntaxException {
String body = this.gson.toJson(RecordIds.builder().records(ids).build());
DpsHeaders headers = this.requestInfo.getHeaders();
URIBuilder builder = new URIBuilder(configurationProperties.getStorageQueryRecordHost());
HttpPost request = new HttpPost(builder.build());
request.setEntity(new StringEntity(body, StandardCharsets.UTF_8));
// we do not need retry on storage based on not found record
HttpResponse response = httpClientHandler.sendRequest(request, headers);
if (response.getResponseCode() > 299) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Internal Error", response.getBody());
}
try {
Records records = this.objectMapper.readValue(response.getBody(), Records.class);
ids.removeAll(records.getRecords().stream().map(Records.Entity::getId).collect(Collectors.toList()));
records.setNotFound(ids);
return records;
} catch (JsonProcessingException e) {
throw new AppException(RequestStatus.INVALID_RECORD, "Invalid request", "Successful Storage service response with wrong json", e);
}
}
private Records validateStorageResponse(HttpResponse response, List<String> ids, Map<String, String> recordChangedMap, Map<String, String> validRecordKindPatchMap) {
String bulkStorageData = response.getBody();
......
......@@ -31,6 +31,9 @@ partitionSetupApi.provisionPartition.description=Provision partition. Required r
reindexApi.reindex.summary=Re-index given 'kind'
reindexApi.reindex.description=This API allows users to re-index a 'kind' without re-ingesting the records via storage API. \
Required roles: `service.search.admin`
reindexApi.reindexRecords.summary=Re-index given records
reindexApi.reindexRecords.description=This API allows users to re-index the given records by providing record ids without \
re-ingesting the records via storage API. Required roles: `service.search.admin`
reindexApi.fullReindex.summary=Full Re-index by data partition
reindexApi.fullReindex.description=This API allows users to re-index an entire partition without re-ingesting the records via storage API.\
Required roles: `service.search.admin`
......
......@@ -21,7 +21,10 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.indexer.Records;
import org.opengroup.osdu.indexer.logging.AuditLogger;
import org.opengroup.osdu.indexer.model.ReindexRecordsRequest;
import org.opengroup.osdu.indexer.model.ReindexRecordsResponse;
import org.opengroup.osdu.indexer.service.IndexSchemaService;
import org.opengroup.osdu.indexer.service.ReindexService;
import org.springframework.http.HttpStatus;
......@@ -29,14 +32,19 @@ import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@RunWith(SpringRunner.class)
public class ReindexApiTest {
private RecordReindexRequest recordReindexRequest;
private List<String> recordIds;
@Mock
private ReindexService reIndexService;
......@@ -50,11 +58,13 @@ public class ReindexApiTest {
@Before
public void setup() {
recordReindexRequest = RecordReindexRequest.builder().kind("tenant:test:test:1.0.0").cursor("100").build();
recordIds = new ArrayList<>();
recordIds.add("id1");
}
@Test
public void should_return200_when_valid_kind_provided() throws IOException {
when(this.reIndexService.reindexRecords(recordReindexRequest, false)).thenReturn("something");
when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenReturn("something");
ResponseEntity<?> response = sut.reindex(recordReindexRequest, false);
......@@ -63,15 +73,49 @@ public class ReindexApiTest {
@Test(expected = AppException.class)
public void should_throwAppException_ifUnknownExceptionCaught_reindexTest() throws IOException {
when(this.reIndexService.reindexRecords(recordReindexRequest, false)).thenThrow(new AppException(500, "", ""));
when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenThrow(new AppException(500, "", ""));
sut.reindex(recordReindexRequest, false);
}
@Test(expected = NullPointerException.class)
public void should_throwAppException_ifNullPointerExceptionCaught_ReindexTest() throws IOException {
when(this.reIndexService.reindexRecords(recordReindexRequest, false)).thenThrow(new NullPointerException(""));
when(this.reIndexService.reindexKind(recordReindexRequest, false)).thenThrow(new NullPointerException(""));
sut.reindex(recordReindexRequest, false);
}
@Test
public void should_return200_when_valid_record_id_list_provided() {
when(this.reIndexService.reindexRecords(recordIds)).thenReturn(Records.builder().records(new ArrayList<>()).records(Collections.singletonList(Records.Entity.builder().id("id1").build())).notFound(recordIds).build());
ResponseEntity<?> response = sut.reindexRecords(new ReindexRecordsRequest(recordIds));
assertEquals(HttpStatus.ACCEPTED, response.getStatusCode());
verify(auditLogger).getReindexRecords(any());
}
@Test
public void should_notWriteAuditLog_when_no_valid_record_id_list_provided() {
when(this.reIndexService.reindexRecords(recordIds)).thenReturn(Records.builder().records(new ArrayList<>()).records(Collections.emptyList()).notFound(recordIds).build());
ResponseEntity<?> response = sut.reindexRecords(new ReindexRecordsRequest(recordIds));
assertEquals(HttpStatus.ACCEPTED, response.getStatusCode());
verify(auditLogger, never()).getReindex(any());
}
@Test(expected = AppException.class)
public void should_throwAppException_ifUnknownExceptionCaught_reindexRecordsTest() {
when(this.reIndexService.reindexRecords(recordIds)).thenThrow(new AppException(500, "", ""));
sut.reindexRecords(new ReindexRecordsRequest(recordIds));
}
@Test(expected = NullPointerException.class)
public void should_throwAppException_ifNullPointerExceptionCaught_ReindexRecordsTest() {
when(this.reIndexService.reindexRecords(recordIds)).thenThrow(new NullPointerException(""));
sut.reindexRecords(new ReindexRecordsRequest(recordIds));
}
}
......@@ -24,6 +24,7 @@ import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.opengroup.osdu.core.common.http.IHttpClientHandler;
import org.opengroup.osdu.core.common.http.IUrlFetchService;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
......@@ -43,11 +44,7 @@ import org.springframework.test.context.junit4.SpringRunner;
import java.lang.reflect.Type;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
......@@ -64,6 +61,8 @@ public class StorageServiceImplTest {
@Mock
private IUrlFetchService urlFetchService;
@Mock
private IHttpClientHandler httpClientHandler;
@Mock
private JobStatus jobStatus;
@Mock
private JaxRsDpsLog log;
......@@ -188,6 +187,24 @@ public class StorageServiceImplTest {
verify(this.log).warning("stale records found with older kind, skipping indexing | record ids: testid");
}
@Test
public void should_returnStorageRecords_givenRecordIds_getValidStorageRecordsTest() throws URISyntaxException {
String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}]}";
HttpResponse httpResponse = mock(HttpResponse.class);
when(httpResponse.getBody()).thenReturn(validDataFromStorage);
when(configurationProperties.getStorageQueryRecordHost()).thenReturn("storageUrl");
when(this.httpClientHandler.sendRequest(any(), any())).thenReturn(httpResponse);
List<String> idsCopy = new ArrayList<>();
idsCopy.addAll(ids);
Records storageRecords = this.sut.getStorageRecords(idsCopy);
assertEquals(1, storageRecords.getRecords().size());
assertEquals(1, storageRecords.getNotFound().size());
}
@Test
public void should_logMissingRecord_given_storageMissedRecords() throws URISyntaxException {
......@@ -310,4 +327,4 @@ public class StorageServiceImplTest {
fail("Should not throw this exception" + e.getMessage());
}
}
}
\ No newline at end of file
}
......@@ -27,6 +27,7 @@ import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.indexer.Records;
import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
import org.opengroup.osdu.indexer.service.ReindexServiceImpl;
import org.opengroup.osdu.indexer.service.StorageService;
......@@ -36,9 +37,11 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.springframework.test.context.junit4.SpringRunner;
import java.net.URISyntaxException;
import java.util.*;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;
......@@ -95,7 +98,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(null);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexRecords(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -109,7 +112,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(new ArrayList<>());
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexRecords(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -129,7 +132,7 @@ public class ReindexServiceTest {
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload);
} catch (Exception e) {
......@@ -145,11 +148,26 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(results);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"slb-correlation-id\":\"%s\"}}", correlationId), taskQueuePayload);
} catch (Exception e) {
fail("Should not throw exception" + e.getMessage());
}
}
@Test
public void should_createReindexTaskForValidRecords_givenValidRecordIds_reIndexRecordsTest() throws URISyntaxException {
DpsHeaders headers = new DpsHeaders();
when(requestInfo.getHeadersWithDwdAuthZ()).thenReturn(headers);
when(configurationProperties.getStorageRecordsBatchSize()).thenReturn(2);
List<String> recordIds = Arrays.asList("id1", "id2");
when(storageService.getStorageRecords(recordIds)).thenReturn(
Records.builder().records(Collections.singletonList(Records.Entity.builder().id("id1").kind("kind1").build())).notFound(Collections.singletonList("id2")).build()
);
Records records = sut.reindexRecords(recordIds);
Assert.assertEquals(1, records.getRecords().size());
Assert.assertEquals(1, records.getNotFound().size());
verify(indexerQueueTaskBuilder).createWorkerTask("{\"data\":\"[{\\\"id\\\":\\\"id1\\\",\\\"kind\\\":\\\"kind1\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{}}", 0L, headers);
}
}
......@@ -90,7 +90,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(null);
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String response = sut.reindexRecords(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -104,7 +104,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(new ArrayList<>());
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String response = sut.reindexRecords(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -124,7 +124,7 @@ public class ReindexServiceTest {
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload);
} catch (Exception e) {
......@@ -140,7 +140,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(results);
when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"correlation-id\":\"%s\"}}", correlationId), taskQueuePayload);
} catch (Exception e) {
......
......@@ -84,7 +84,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(null);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexRecords(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -98,7 +98,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(new ArrayList<>());
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String response = sut.reindexRecords(recordReindexRequest, false);
String response = sut.reindexKind(recordReindexRequest, false);
Assert.assertNull(response);
} catch (Exception e) {
......@@ -118,7 +118,7 @@ public class ReindexServiceTest {
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
Assert.assertEquals("{\"kind\":\"tenant:test:test:1.0.0\",\"cursor\":\"100\"}", taskQueuePayload);
} catch (Exception e) {
......@@ -134,7 +134,7 @@ public class ReindexServiceTest {
recordQueryResponse.setResults(results);
when(storageService.getRecordsByKind(ArgumentMatchers.any())).thenReturn(recordQueryResponse);
String taskQueuePayload = sut.reindexRecords(recordReindexRequest, false);
String taskQueuePayload = sut.reindexKind(recordReindexRequest, false);
Assert.assertEquals(String.format("{\"data\":\"[{\\\"id\\\":\\\"test1\\\",\\\"kind\\\":\\\"tenant:test:test:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"slb-correlation-id\":\"%s\"}}", correlationId), taskQueuePayload);
} catch (Exception e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment