diff --git a/devops/azure/chart/templates/deployment.yaml b/devops/azure/chart/templates/deployment.yaml index 6f8dabb677ca1a52f3f95c310c9b2c18acf2f6a4..1f8e620484e28c856b2ddad65a3ef0d1bf202156 100644 --- a/devops/azure/chart/templates/deployment.yaml +++ b/devops/azure/chart/templates/deployment.yaml @@ -56,7 +56,7 @@ spec: readOnly: true env: - name: spring_application_name - value: indexer + value: {{ .Chart.Name }} - name: server.servlet.contextPath value: /api/indexer/v2/ - name: server_port diff --git a/devops/azure/chart/templates/hpa.yaml b/devops/azure/chart/templates/hpa.yaml index d9d00bfc65e3df192ca241337772c937dbc1ff9e..68a00c86912a249472eec7cf49411f38b72322d4 100644 --- a/devops/azure/chart/templates/hpa.yaml +++ b/devops/azure/chart/templates/hpa.yaml @@ -24,7 +24,7 @@ spec: kind: Deployment name: {{ .Chart.Name }} minReplicas: {{ .Values.global.replicaCount }} - maxReplicas: 5 + maxReplicas: 10 metrics: - type: Resource resource: diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCore.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCore.java new file mode 100644 index 0000000000000000000000000000000000000000..6e546e14cf7197a480bf84de084a3cf628373ad0 --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/error/GlobalExceptionMapperCore.java @@ -0,0 +1,88 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.indexer.error; + +import javax.validation.ValidationException; + +import javassist.NotFoundException; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.security.access.AccessDeniedException; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler; +import org.opengroup.osdu.core.common.model.http.AppException; + +@Order(Ordered.HIGHEST_PRECEDENCE - 1) +@ControllerAdvice +public class GlobalExceptionMapperCore extends ResponseEntityExceptionHandler { + + @Autowired + private JaxRsDpsLog logger; + + @ExceptionHandler(AppException.class) + protected ResponseEntity<Object> handleAppException(AppException e) { + return this.getErrorResponse(e); + } + + @ExceptionHandler(ValidationException.class) + protected ResponseEntity<Object> handleValidationException(ValidationException e) { + return this.getErrorResponse( + new AppException(HttpStatus.BAD_REQUEST.value(), "Validation error.", e.getMessage(), e)); + } + + @ExceptionHandler(NotFoundException.class) + protected ResponseEntity<Object> handleNotFoundException(NotFoundException e) { + return this.getErrorResponse( + new AppException(HttpStatus.NOT_FOUND.value(), "Resource not found.", e.getMessage(), e)); + } + + @ExceptionHandler(AccessDeniedException.class) + protected ResponseEntity<Object> handleAccessDeniedException(AccessDeniedException e) { + return this.getErrorResponse( + new AppException(HttpStatus.FORBIDDEN.value(), "Access denied", e.getMessage(), e)); + } + + @ExceptionHandler(Exception.class) + protected ResponseEntity<Object> handleGeneralException(Exception e) { + return this.getErrorResponse( + new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Server error.", + "An unknown error has occurred.", e)); + } + + private ResponseEntity<Object> getErrorResponse(AppException e) { + + String exceptionMsg = e.getOriginalException() != null + ? e.getOriginalException().getMessage() + : e.getError().getMessage(); + + if( e.getCause() instanceof Exception) { + Exception original = (Exception) e.getCause(); + this.logger.error(original.getMessage(), original); + } + + if (e.getError().getCode() > 499) { + this.logger.error(exceptionMsg, e); + } else { + this.logger.warning(exceptionMsg, e); + } + + return new ResponseEntity<Object>(e.getError(), HttpStatus.resolve(e.getError().getCode())); + } +} \ No newline at end of file diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java index 0f3d5c4f18db650813ca096878925dc49a153921..70fa630e6ca33621db37c7b4f1a15344925fb3e1 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/ReindexServiceImpl.java @@ -16,7 +16,10 @@ package org.opengroup.osdu.indexer.service; import com.google.common.base.Strings; import com.google.gson.Gson; + import java.util.Objects; + +import lombok.SneakyThrows; import org.apache.http.HttpStatus; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.AppException; @@ -53,74 +56,70 @@ public class ReindexServiceImpl implements ReindexService { @Inject private JaxRsDpsLog jaxRsDpsLog; + @SneakyThrows @Override public String reindexRecords(RecordReindexRequest recordReindexRequest, boolean forceClean) { Long initialDelayMillis = 0l; - try { - DpsHeaders headers = this.requestInfo.getHeadersWithDwdAuthZ(); - if (forceClean) { - this.indexSchemaService.syncIndexMappingWithStorageSchema(recordReindexRequest.getKind()); - initialDelayMillis = 30000l; - } + DpsHeaders headers = this.requestInfo.getHeadersWithDwdAuthZ(); - RecordQueryResponse recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); + if (forceClean) { + this.indexSchemaService.syncIndexMappingWithStorageSchema(recordReindexRequest.getKind()); + initialDelayMillis = 30000l; + } - if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { + RecordQueryResponse recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); - List<RecordInfo> msgs = recordQueryResponse.getResults().stream() - .map(record -> RecordInfo.builder().id(record).kind(recordReindexRequest.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList()); + if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId()); - attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + List<RecordInfo> msgs = recordQueryResponse.getResults().stream() + .map(record -> RecordInfo.builder().id(record).kind(recordReindexRequest.getKind()).op(OperationType.create.name()).build()).collect(Collectors.toList()); - Gson gson = new Gson(); - RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build(); - String recordChangedMessagePayload = gson.toJson(recordChangedMessages); - this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers); + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId()); + attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); - // don't call reindex-worker endpoint if it's the last batch - // previous storage query result size will be less then requested (limit param) - if (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()) { - String newPayLoad = gson.toJson(RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordReindexRequest.getKind()).build()); - this.indexerQueueTaskBuilder.createReIndexTask(newPayLoad, initialDelayMillis, headers); - return newPayLoad; - } + Gson gson = new Gson(); + RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(msgs)).attributes(attributes).build(); + String recordChangedMessagePayload = gson.toJson(recordChangedMessages); + this.indexerQueueTaskBuilder.createWorkerTask(recordChangedMessagePayload, initialDelayMillis, headers); - return recordChangedMessagePayload; - } else { - jaxRsDpsLog.info(String.format("kind: %s cannot be re-indexed, storage service cannot locate valid records", recordReindexRequest.getKind())); + // don't call reindex-worker endpoint if it's the last batch + // previous storage query result size will be less then requested (limit param) + if (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()) { + String newPayLoad = gson.toJson(RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordReindexRequest.getKind()).build()); + this.indexerQueueTaskBuilder.createReIndexTask(newPayLoad, initialDelayMillis, headers); + return newPayLoad; } - return null; - } catch (AppException e) { - throw e; - } catch (Exception e) { - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e); + + return recordChangedMessagePayload; + } else { + jaxRsDpsLog.info(String.format("kind: %s cannot be re-indexed, storage service cannot locate valid records", recordReindexRequest.getKind())); } + return null; } - @Override - public void fullReindex(boolean forceClean) { - List<String> allKinds = null; - try { - allKinds = storageService.getAllKinds(); - } catch (Exception e) { - jaxRsDpsLog.error("storage service all kinds request failed",e); - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "an unknown error has occurred.", e); - } - if (Objects.isNull(allKinds)){ - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "full reindex failed"); - } - for (String kind : allKinds) { - try { - reindexRecords(new RecordReindexRequest(kind, ""), forceClean); - } catch (Exception e) { - jaxRsDpsLog.warning(String.format("kind: %s cannot be re-indexed", kind)); - continue; - } - } - } + @Override + public void fullReindex(boolean forceClean) { + List<String> allKinds = null; + try { + allKinds = storageService.getAllKinds(); + } catch (Exception e) { + jaxRsDpsLog.error("storage service all kinds request failed", e); + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "an unknown error has occurred.", e); + } + if (Objects.isNull(allKinds)) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "storage service cannot respond with all kinds", "full reindex failed"); + } + for (String kind : allKinds) { + try { + reindexRecords(new RecordReindexRequest(kind, ""), forceClean); + } catch (Exception e) { + jaxRsDpsLog.warning(String.format("kind: %s cannot be re-indexed", kind)); + continue; + } + } + } } \ No newline at end of file diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java index 555dbc3964e0fe7e8308b329e98656310d13d78b..7210b4aaa06bb72b607ae31353835ca4353a19d1 100644 --- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java @@ -14,15 +14,23 @@ package org.opengroup.osdu.indexer.azure.util; +import com.google.common.base.Strings; import com.google.common.reflect.TypeToken; import com.google.gson.*; import com.microsoft.azure.servicebus.Message; import lombok.extern.java.Log; +import org.apache.http.HttpStatus; import org.opengroup.osdu.azure.servicebus.ITopicClientFactory; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.indexer.OperationType; import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; +import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; +import org.opengroup.osdu.indexer.service.StorageService; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Primary; @@ -31,11 +39,14 @@ import org.springframework.web.context.annotation.RequestScope; import javax.inject.Inject; import javax.inject.Named; +import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.lang.reflect.Type; + @Log @Component @@ -46,6 +57,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { @Autowired private ITopicClientFactory topicClientFactory; + @Inject + private IndexerConfigurationProperties configurationProperties; + @Inject private JaxRsDpsLog logger; @@ -53,14 +67,59 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { @Named("SERVICE_BUS_TOPIC") private String serviceBusTopic; + @Inject + private StorageService storageService; + + @Override public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { + headers.addCorrelationIdIfMissing(); createTask(payload, headers); } @Override public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { - createTask(payload, headers); + headers.addCorrelationIdIfMissing(); + publishAllRecordsToServiceBus(payload, headers); + } + + private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) { + // fetch all the remaining records + // This logic is temporary and would be updated to call the storage service async. + // Currently the storage client can't be called out of request scope hence making the + // storage calls sync here + Gson gson = new Gson(); + RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); + final String recordKind = recordReindexRequest.getKind(); + RecordQueryResponse recordQueryResponse = null; + + try { + do { + if (recordQueryResponse != null) { + recordReindexRequest = RecordReindexRequest.builder().cursor(recordQueryResponse.getCursor()).kind(recordKind).build(); + } + recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); + if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { + + List<RecordInfo> records = recordQueryResponse.getResults().stream() + .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); + + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + + RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); + String recordChangedMessagePayload = gson.toJson(recordChangedMessages); + createTask(recordChangedMessagePayload, headers); + } + } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize()); + + } catch (AppException e) { + throw e; + } catch (Exception e) { + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e); + } } private void createTask(String payload, DpsHeaders headers) {