diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java index 89c13fcb1fbc6b0da27795b5577a491eac8bf3dd..e9e3328eea0cb9df4f2691b901cbf62fabc22165 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java @@ -129,6 +129,7 @@ public class StorageServiceImpl implements StorageService { // no retry possible, update record status as failed -- storage service cannot locate records if (!records.getNotFound().isEmpty()) { + jaxRsDpsLog.error(records.getNotFound().size() + " records were not found. Full list: " + records.getNotFound()); this.jobStatus.addOrUpdateRecordStatus(records.getNotFound(), IndexingStatus.FAIL, RequestStatus.INVALID_RECORD, "Storage service records not found", String.format("Storage service records not found: %s", String.join(",", records.getNotFound()))); } diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java index 973372f431e5d29c333affcd43932736132dbe16..3c73f93c1d893dea4be35ee1191eafee23546946 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java @@ -1,6 +1,6 @@ /* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; import com.google.common.base.Strings; -import java.io.IOException; + import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -42,60 +42,61 @@ public abstract class IndexerOqmMessageReceiver implements OqmMessageReceiver { @Override public void receiveMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier) { - log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), - oqmMessage.getAttributes()); - boolean acked = false; + log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), oqmMessage.getAttributes()); + if (!validInput(oqmMessage)) { + log.error("Not valid event payload, event will not be processed."); + oqmAckReplier.ack(); + return; + } + try { - if (!validInput(oqmMessage)) { - log.warn("Not valid event payload, event will not be processed."); - oqmAckReplier.ack(); - return; - } DpsHeaders headers = getHeaders(oqmMessage); // Filling thread context required by the core services. dpsHeaders.setThreadContext(headers.getHeaders()); sendMessage(oqmMessage); - acked = true; + oqmAckReplier.ack(); } catch (AppException appException) { int statusCode = appException.getError().getCode(); if (statusCode > 199 && statusCode < 300 && statusCode != RequestStatus.INVALID_RECORD) { - log.info( - "Event id : " + oqmMessage.getId() + ", was not processed, and will NOT be rescheduled.", - appException - ); - acked = true; + skipMessage(oqmMessage, oqmAckReplier, appException); } else { - //It is possible to get both AppException with wrapped in original Exception or the original Exception without any wrapper - Exception exception = Optional.ofNullable(appException.getOriginalException()).orElse(appException); - log.warn( - "Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", - exception - ); + rescheduleMessage(oqmMessage, oqmAckReplier, getException(appException)); } } catch (Exception exception) { - log.error( - "Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", - exception - ); + rescheduleMessage(oqmMessage, oqmAckReplier, exception); } finally { - if (!acked) { - oqmAckReplier.nack(); - } else { - oqmAckReplier.ack(); - } // Cleaning thread context after processing is finished and the thread dies out. ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); } } + private static void skipMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier, AppException appException) { + log.info("Event id : " + oqmMessage.getId() + ", was not processed, and will NOT be rescheduled.", appException); + oqmAckReplier.ack(); + } + + private static void rescheduleMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier, Exception exception) { + log.error("Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", exception); + oqmAckReplier.nack(); + } + + /** + * It is possible to get both AppException with wrapped in original Exception + * or the original Exception without any wrapper. + */ + @NotNull + private static Exception getException(AppException appException) { + return Optional.ofNullable(appException.getOriginalException()).orElse(appException); + } + private boolean validInput(OqmMessage oqmMessage) { boolean isValid = true; if (Strings.isNullOrEmpty(oqmMessage.getData()) || oqmMessage.getData().equals("{}")) { - log.warn("Message body is empty, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); + log.error("Message body is empty, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); isValid = false; } if (oqmMessage.getAttributes() == null || oqmMessage.getAttributes().size() == 0) { - log.warn("Attribute map not found, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); + log.error("Attribute map not found, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); isValid = false; } return isValid;