Skip to content
Snippets Groups Projects
Commit 9487172e authored by Dmitrii Novikov (EPAM)'s avatar Dmitrii Novikov (EPAM) Committed by Riabokon Stanislav(EPAM)[GCP]
Browse files

Added logs for not found records

parent a3170cca
No related branches found
No related tags found
1 merge request!502Added logs for not found records
...@@ -129,6 +129,7 @@ public class StorageServiceImpl implements StorageService { ...@@ -129,6 +129,7 @@ public class StorageServiceImpl implements StorageService {
// no retry possible, update record status as failed -- storage service cannot locate records // no retry possible, update record status as failed -- storage service cannot locate records
if (!records.getNotFound().isEmpty()) { if (!records.getNotFound().isEmpty()) {
jaxRsDpsLog.error(records.getNotFound().size() + " records were not found. Full list: " + records.getNotFound());
this.jobStatus.addOrUpdateRecordStatus(records.getNotFound(), IndexingStatus.FAIL, RequestStatus.INVALID_RECORD, "Storage service records not found", String.format("Storage service records not found: %s", String.join(",", records.getNotFound()))); this.jobStatus.addOrUpdateRecordStatus(records.getNotFound(), IndexingStatus.FAIL, RequestStatus.INVALID_RECORD, "Storage service records not found", String.format("Storage service records not found: %s", String.join(",", records.getNotFound())));
} }
......
/* /*
* Copyright 2020-2022 Google LLC * Copyright 2020-2023 Google LLC
* Copyright 2020-2022 EPAM Systems, Inc * Copyright 2020-2023 EPAM Systems, Inc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; package org.opengroup.osdu.indexer.provider.gcp.indexing.processing;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -42,60 +42,61 @@ public abstract class IndexerOqmMessageReceiver implements OqmMessageReceiver { ...@@ -42,60 +42,61 @@ public abstract class IndexerOqmMessageReceiver implements OqmMessageReceiver {
@Override @Override
public void receiveMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier) { public void receiveMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier) {
log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), oqmMessage.getAttributes());
oqmMessage.getAttributes()); if (!validInput(oqmMessage)) {
boolean acked = false; log.error("Not valid event payload, event will not be processed.");
oqmAckReplier.ack();
return;
}
try { try {
if (!validInput(oqmMessage)) {
log.warn("Not valid event payload, event will not be processed.");
oqmAckReplier.ack();
return;
}
DpsHeaders headers = getHeaders(oqmMessage); DpsHeaders headers = getHeaders(oqmMessage);
// Filling thread context required by the core services. // Filling thread context required by the core services.
dpsHeaders.setThreadContext(headers.getHeaders()); dpsHeaders.setThreadContext(headers.getHeaders());
sendMessage(oqmMessage); sendMessage(oqmMessage);
acked = true; oqmAckReplier.ack();
} catch (AppException appException) { } catch (AppException appException) {
int statusCode = appException.getError().getCode(); int statusCode = appException.getError().getCode();
if (statusCode > 199 && statusCode < 300 && statusCode != RequestStatus.INVALID_RECORD) { if (statusCode > 199 && statusCode < 300 && statusCode != RequestStatus.INVALID_RECORD) {
log.info( skipMessage(oqmMessage, oqmAckReplier, appException);
"Event id : " + oqmMessage.getId() + ", was not processed, and will NOT be rescheduled.",
appException
);
acked = true;
} else { } else {
//It is possible to get both AppException with wrapped in original Exception or the original Exception without any wrapper rescheduleMessage(oqmMessage, oqmAckReplier, getException(appException));
Exception exception = Optional.ofNullable(appException.getOriginalException()).orElse(appException);
log.warn(
"Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.",
exception
);
} }
} catch (Exception exception) { } catch (Exception exception) {
log.error( rescheduleMessage(oqmMessage, oqmAckReplier, exception);
"Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.",
exception
);
} finally { } finally {
if (!acked) {
oqmAckReplier.nack();
} else {
oqmAckReplier.ack();
}
// Cleaning thread context after processing is finished and the thread dies out. // Cleaning thread context after processing is finished and the thread dies out.
ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); ThreadScopeContextHolder.currentThreadScopeAttributes().clear();
} }
} }
private static void skipMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier, AppException appException) {
log.info("Event id : " + oqmMessage.getId() + ", was not processed, and will NOT be rescheduled.", appException);
oqmAckReplier.ack();
}
private static void rescheduleMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier, Exception exception) {
log.error("Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", exception);
oqmAckReplier.nack();
}
/**
* It is possible to get both AppException with wrapped in original Exception
* or the original Exception without any wrapper.
*/
@NotNull
private static Exception getException(AppException appException) {
return Optional.ofNullable(appException.getOriginalException()).orElse(appException);
}
private boolean validInput(OqmMessage oqmMessage) { private boolean validInput(OqmMessage oqmMessage) {
boolean isValid = true; boolean isValid = true;
if (Strings.isNullOrEmpty(oqmMessage.getData()) || oqmMessage.getData().equals("{}")) { if (Strings.isNullOrEmpty(oqmMessage.getData()) || oqmMessage.getData().equals("{}")) {
log.warn("Message body is empty, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); log.error("Message body is empty, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes());
isValid = false; isValid = false;
} }
if (oqmMessage.getAttributes() == null || oqmMessage.getAttributes().size() == 0) { if (oqmMessage.getAttributes() == null || oqmMessage.getAttributes().size() == 0) {
log.warn("Attribute map not found, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); log.error("Attribute map not found, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes());
isValid = false; isValid = false;
} }
return isValid; return isValid;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment