Skip to content
Snippets Groups Projects
Commit de1d55cd authored by Neelesh Thakur's avatar Neelesh Thakur
Browse files

Merge branch 'master' into change-schema-retrieval-order

parents 9a1a66da c9383019
No related branches found
No related tags found
1 merge request!105change schema retrieval order
Pipeline #30274 failed
......@@ -56,7 +56,7 @@ spec:
readOnly: true
env:
- name: spring_application_name
value: indexer
value: {{ .Chart.Name }}
- name: server.servlet.contextPath
value: /api/indexer/v2/
- name: server_port
......
......@@ -24,7 +24,7 @@ spec:
kind: Deployment
name: {{ .Chart.Name }}
minReplicas: {{ .Values.global.replicaCount }}
maxReplicas: 5
maxReplicas: 10
metrics:
- type: Resource
resource:
......
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexer.error;
import javax.validation.ValidationException;
import javassist.NotFoundException;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
import org.opengroup.osdu.core.common.model.http.AppException;
@Order(Ordered.HIGHEST_PRECEDENCE - 1)
@ControllerAdvice
public class GlobalExceptionMapperCore extends ResponseEntityExceptionHandler {
@Autowired
private JaxRsDpsLog logger;
@ExceptionHandler(AppException.class)
protected ResponseEntity<Object> handleAppException(AppException e) {
return this.getErrorResponse(e);
}
@ExceptionHandler(ValidationException.class)
protected ResponseEntity<Object> handleValidationException(ValidationException e) {
return this.getErrorResponse(
new AppException(HttpStatus.BAD_REQUEST.value(), "Validation error.", e.getMessage(), e));
}
@ExceptionHandler(NotFoundException.class)
protected ResponseEntity<Object> handleNotFoundException(NotFoundException e) {
return this.getErrorResponse(
new AppException(HttpStatus.NOT_FOUND.value(), "Resource not found.", e.getMessage(), e));
}
@ExceptionHandler(AccessDeniedException.class)
protected ResponseEntity<Object> handleAccessDeniedException(AccessDeniedException e) {
return this.getErrorResponse(
new AppException(HttpStatus.FORBIDDEN.value(), "Access denied", e.getMessage(), e));
}
@ExceptionHandler(Exception.class)
protected ResponseEntity<Object> handleGeneralException(Exception e) {
return this.getErrorResponse(
new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Server error.",
"An unknown error has occurred.", e));
}
private ResponseEntity<Object> getErrorResponse(AppException e) {
String exceptionMsg = e.getOriginalException() != null
? e.getOriginalException().getMessage()
: e.getError().getMessage();
if( e.getCause() instanceof Exception) {
Exception original = (Exception) e.getCause();
this.logger.error(original.getMessage(), original);
}
if (e.getError().getCode() > 499) {
this.logger.error(exceptionMsg, e);
} else {
this.logger.warning(exceptionMsg, e);
}
return new ResponseEntity<Object>(e.getError(), HttpStatus.resolve(e.getError().getCode()));
}
}
\ No newline at end of file
......@@ -16,7 +16,10 @@ package org.opengroup.osdu.indexer.service;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import java.util.Objects;
import lombok.SneakyThrows;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.http.AppException;
......@@ -53,74 +56,70 @@ public class ReindexServiceImpl implements ReindexService {
@Inject
private JaxRsDpsLog jaxRsDpsLog;
@SneakyThrows
@Override
public String reindexRecords(RecordReindexRequest recordReindexRequest, boolean forceClean) {
Long initialDelayMillis = 0l;
try {
DpsHeaders headers = this.requestInfo.getHeadersWithDwdAuthZ();
if (forceClean) {
this.indexSchemaService.syncIndexMappingWithStorageSchema(recordReindexRequest.getKind());
initialDelayMillis = 30000l;
}
DpsHeaders headers = this.requestInfo.getHeadersWithDwdAuthZ();
RecordQueryResponse recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
if (forceClean) {
this.indexSchemaService.syncIndexMappingWithStorageSchema(recordReindexRequest.getKind());
initialDelayMillis = 30000l;
}
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
RecordQueryResponse recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
List<RecordInfo> msgs = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordReindexRequest.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList());
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
List<RecordInfo> msgs = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordReindexRequest.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList());
Gson gson = new Gson();
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers);
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
// don't call reindex-worker endpoint if it's the last batch
// previous storage query result size will be less then requested (limit param)
if (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()) {
String newPayLoad = gson.toJson(RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordReindexRequest.getKind()).build());
this.indexerQueueTaskBuilder.createReIndexTask(newPayLoad, initialDelayMillis, headers);
return newPayLoad;
}
Gson gson = new Gson();
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers);
return recordChangedMessagePayload;
} else {
jaxRsDpsLog.info(String.format("kind: %s cannot be re-indexed, storage service cannot locate valid records", recordReindexRequest.getKind()));
// don't call reindex-worker endpoint if it's the last batch
// previous storage query result size will be less then requested (limit param)
if (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()) {
String newPayLoad = gson.toJson(RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordReindexRequest.getKind()).build());
this.indexerQueueTaskBuilder.createReIndexTask(newPayLoad, initialDelayMillis, headers);
return newPayLoad;
}
return null;
} catch (AppException e) {
throw e;
} catch (Exception e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
return recordChangedMessagePayload;
} else {
jaxRsDpsLog.info(String.format("kind: %s cannot be re-indexed, storage service cannot locate valid records", recordReindexRequest.getKind()));
}
return null;
}
@Override
public void fullReindex(boolean forceClean) {
List<String> allKinds = null;
try {
allKinds = storageService.getAllKinds();
} catch (Exception e) {
jaxRsDpsLog.error("storage service all kinds request failed",e);
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "an unknown error has occurred.", e);
}
if (Objects.isNull(allKinds)){
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "full reindex failed");
}
for (String kind : allKinds) {
try {
reindexRecords(new RecordReindexRequest(kind, ""), forceClean);
} catch (Exception e) {
jaxRsDpsLog.warning(String.format("kind: %s cannot be re-indexed", kind));
continue;
}
}
}
@Override
public void fullReindex(boolean forceClean) {
List<String> allKinds = null;
try {
allKinds = storageService.getAllKinds();
} catch (Exception e) {
jaxRsDpsLog.error("storage service all kinds request failed", e);
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "an unknown error has occurred.", e);
}
if (Objects.isNull(allKinds)) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "full reindex failed");
}
for (String kind : allKinds) {
try {
reindexRecords(new RecordReindexRequest(kind, ""), forceClean);
} catch (Exception e) {
jaxRsDpsLog.warning(String.format("kind: %s cannot be re-indexed", kind));
continue;
}
}
}
}
\ No newline at end of file
......@@ -14,15 +14,23 @@
package org.opengroup.osdu.indexer.azure.util;
import com.google.common.base.Strings;
import com.google.common.reflect.TypeToken;
import com.google.gson.*;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.java.Log;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties;
import org.opengroup.osdu.indexer.service.StorageService;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
......@@ -31,11 +39,14 @@ import org.springframework.web.context.annotation.RequestScope;
import javax.inject.Inject;
import javax.inject.Named;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.reflect.Type;
@Log
@Component
......@@ -46,6 +57,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Autowired
private ITopicClientFactory topicClientFactory;
@Inject
private IndexerConfigurationProperties configurationProperties;
@Inject
private JaxRsDpsLog logger;
......@@ -53,14 +67,59 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Named("SERVICE_BUS_TOPIC")
private String serviceBusTopic;
@Inject
private StorageService storageService;
@Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
createTask(payload, headers);
}
@Override
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
createTask(payload, headers);
headers.addCorrelationIdIfMissing();
publishAllRecordsToServiceBus(payload, headers);
}
private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) {
// fetch all the remaining records
// This logic is temporary and would be updated to call the storage service async.
// Currently the storage client can't be called out of request scope hence making the
// storage calls sync here
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
final String recordKind = recordReindexRequest.getKind();
RecordQueryResponse recordQueryResponse = null;
try {
do {
if (recordQueryResponse != null) {
recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build();
}
recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
List<RecordInfo> records = recordQueryResponse.getResults().stream()
.map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, headers);
}
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
} catch (AppException e) {
throw e;
} catch (Exception e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
}
}
private void createTask(String payload, DpsHeaders headers) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment