Commit df39b292 authored by Dmitriy Novikov's avatar Dmitriy Novikov
Browse files

Fixed job status request scope issue

parent fabd3433
......@@ -17,9 +17,7 @@
package org.opengroup.osdu.indexer.di;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
......@@ -27,9 +25,9 @@ import org.apache.commons.lang3.StringUtils;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.logging.LogUtils;
import org.opengroup.osdu.core.common.logging.audit.AuditPayload;
import org.opengroup.osdu.core.common.model.AppEngineHeaders;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.http.Request;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Primary;
......@@ -46,6 +44,7 @@ public class JaxRsDpsLog implements AutoCloseable {
@Value("${LOG_PREFIX}")
private String LOG_PREFIX;
private ILogger log;
private DpsHeaders headers;
......@@ -56,52 +55,57 @@ public class JaxRsDpsLog implements AutoCloseable {
}
public void audit(AuditPayload auditPayload) {
this.log.audit(this.LOG_PREFIX + ".audit", auditPayload, this.getLabels());
log.audit(LOG_PREFIX + ".audit", auditPayload, this.getLabels());
}
public void audit(String loggerName, AuditPayload auditPayload) {
this.log.audit(loggerName, this.LOG_PREFIX + ".audit", auditPayload, this.getLabels());
public void audit(final String loggerName, final AuditPayload auditPayload) {
log.audit(loggerName, LOG_PREFIX + ".audit", auditPayload, this.getLabels());
}
public void request(Request httpRequest) {
this.log.request(this.LOG_PREFIX + ".request", httpRequest, this.getLabels());
log.request(LOG_PREFIX + ".request", httpRequest, this.getLabels());
}
public void request(String loggerName, Request httpRequest) {
this.log.request(loggerName, this.LOG_PREFIX + ".request", httpRequest, this.getLabels());
public void request(final String loggerName, final Request httpRequest) {
log.request(loggerName, LOG_PREFIX + ".request", httpRequest, this.getLabels());
}
public void info(String message) {
this.log.info(this.LOG_PREFIX + ".app", message, this.getLabels());
log.info(LOG_PREFIX + ".app", message, this.getLabels());
}
public void info(String loggerName, String message) {
this.log.info(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
public void info(final String loggerName, final String message) {
log.info(loggerName, LOG_PREFIX + ".app", message, this.getLabels());
}
public void debug(String message) {
this.log.debug(this.LOG_PREFIX + ".app", message, this.getLabels());
log.debug(LOG_PREFIX + ".app", message, this.getLabels());
}
public void debug(final String loggerName, final String message) {
log.debug(loggerName, LOG_PREFIX + ".app", message, this.getLabels());
}
public void debug(String loggerName, String message) {
this.log.debug(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
public void debug(List<String> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
log.debug(LOG_PREFIX + ".app", prepareLoggingMessage(messages), this.getLabels());
}
public void warning(String message) {
this.log.warning(this.LOG_PREFIX + ".app", message, this.getLabels());
log.warning(LOG_PREFIX + ".app", message, this.getLabels());
}
public void warning(String loggerName, String message) {
this.log.warning(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
public void warning(final String loggerName, final String message) {
log.warning(loggerName, LOG_PREFIX + ".app", message, this.getLabels());
}
private String prepareWarningMessage(List<String> messages) {
private String prepareLoggingMessage(List<String> messages) {
int sn = 0;
StringBuilder sb = new StringBuilder();
Iterator var4 = messages.iterator();
while(var4.hasNext()) {
String s = (String)var4.next();
for (String s : messages) {
sb.append(String.format("%d: %s", sn++, s)).append(System.lineSeparator());
}
......@@ -109,54 +113,59 @@ public class JaxRsDpsLog implements AutoCloseable {
}
public void warning(List<String> messages) {
if (messages != null && !messages.isEmpty()) {
this.log.warning(this.LOG_PREFIX + ".app", this.prepareWarningMessage(messages), this.getLabels());
if (messages == null || messages.isEmpty()) {
return;
}
log.warning(LOG_PREFIX + ".app", prepareLoggingMessage(messages), this.getLabels());
}
public void warning(String loggerName, List<String> messages) {
if (messages != null && !messages.isEmpty()) {
this.log.warning(loggerName, this.LOG_PREFIX + ".app", this.prepareWarningMessage(messages), this.getLabels());
public void warning(final String loggerName, final List<String> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
log.warning(loggerName, LOG_PREFIX + ".app", prepareLoggingMessage(messages), this.getLabels());
}
public void warning(String message, Exception e) {
this.log.warning(this.LOG_PREFIX + ".app", message, e, this.getLabels());
log.warning(LOG_PREFIX + ".app", message, e, this.getLabels());
}
public void warning(String loggerName, String message, Exception e) {
this.log.warning(loggerName, this.LOG_PREFIX + ".app", message, e, this.getLabels());
public void warning(final String loggerName, final String message, final Exception e) {
log.warning(loggerName, LOG_PREFIX + ".app", message, e, this.getLabels());
}
public void error(String message) {
this.log.error(this.LOG_PREFIX + ".app", message, this.getLabels());
log.error(LOG_PREFIX + ".app", message, this.getLabels());
}
public void error(String loggerName, String message) {
this.log.error(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
public void error(final String loggerName, final String message) {
log.error(loggerName, LOG_PREFIX + ".app", message, this.getLabels());
}
public void error(String message, Exception e) {
this.log.error(this.LOG_PREFIX + ".app", message, e, this.getLabels());
log.error(LOG_PREFIX + ".app", message, e, this.getLabels());
}
public void error(String loggerName, String message, Exception e) {
this.log.error(loggerName, this.LOG_PREFIX + ".app", message, e, this.getLabels());
public void error(final String loggerName, final String message, final Exception e) {
log.error(loggerName, LOG_PREFIX + ".app", message, e, this.getLabels());
}
@Override
public void close() throws Exception {
}
private Map<String, String> getLabels() {
if (this.headers != null) {
Map<String, String> out = LogUtils.createStandardLabelsFromMap(this.headers.getHeaders());
if (out.containsKey("X-AppEngine-TaskRetryCount")) {
out.put("X-AppEngine-TaskRetryCount", StringUtils.join(new Serializable[]{(Serializable)out.get("X-AppEngine-TaskRetryCount"), ','}));
Map<String, String> out;
if (headers != null) {
out = LogUtils.createStandardLabelsFromMap(headers.getHeaders());
if (out.containsKey(AppEngineHeaders.TASK_RETRY_COUNT)) {
out.put(AppEngineHeaders.TASK_RETRY_COUNT,
StringUtils.join(out.get(AppEngineHeaders.TASK_RETRY_COUNT), ','));
}
return out;
} else {
return Collections.emptyMap();
}
return Collections.emptyMap();
}
}
/*
* Copyright 2022 Google LLC
* Copyright 2022 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.indexer.di;
import com.google.common.base.Strings;
import lombok.extern.java.Log;
import lombok.Data;
import org.opengroup.osdu.core.common.model.indexer.IndexProgress;
import org.opengroup.osdu.core.common.model.indexer.IndexingStatus;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordStatus;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
/**
* Prototype spring bean overriding @RequestScope JobStatus
*/
@Log
@Data
@Component("gcpJobStatus")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Primary
public class JobStatus {
@Inject
private JaxRsDpsLog jaxRsDpsLog;
private List<RecordStatus> statusesList = new ArrayList<>();
private List<String> debugInfos = new ArrayList<>();
public void initialize(List<RecordInfo> recordInfos) {
if (recordInfos == null || recordInfos.isEmpty()) return;
List<RecordStatus> statuses = recordInfos.stream().map(msg -> RecordStatus.builder()
.id(msg.getId())
.kind(msg.getKind())
.operationType(msg.getOp())
.status(IndexingStatus.PROCESSING)
.indexProgress(
IndexProgress.builder().trace(new Stack<>()).lastUpdateTime(Instant.now().toString()).build())
.build()).collect(Collectors.toList());
this.statusesList.addAll(statuses);
}
public void addOrUpdateRecordStatus(Collection<String> ids, IndexingStatus status, int statusCode, String message, String debugInfo) {
this.debugInfos.add(debugInfo);
addOrUpdateRecordStatus(ids, status, statusCode, message);
}
public void addOrUpdateRecordStatus(String id, IndexingStatus status, int statusCode, String message, String debugInfo) {
this.debugInfos.add(debugInfo);
addOrUpdateRecordStatus(id, status, statusCode, message);
}
public void addOrUpdateRecordStatus(Collection<String> ids, IndexingStatus status, int statusCode, String message) {
if (ids == null || ids.isEmpty()) return;
ids.forEach(id -> addOrUpdateRecordStatus(id, status, statusCode, message));
}
public void addOrUpdateRecordStatus(String id, IndexingStatus status, int statusCode, String message) {
Optional<RecordStatus> queryResult = this.statusesList.stream().filter(s -> s.getId().equalsIgnoreCase(id)).findFirst();
if (queryResult.isPresent()) {
RecordStatus s = queryResult.get();
IndexProgress indexProgress = s.getIndexProgress();
indexProgress.setStatusCode(statusCode);
indexProgress.setLastUpdateTime(Instant.now().toString());
if (!Strings.isNullOrEmpty(message)) {
indexProgress.getTrace().add(message);
}
if (status.isWorseThan(s.getStatus())) {
s.setStatus(status);
}
s.setIndexProgress(indexProgress);
} else {
IndexProgress indexProgress = IndexProgress.builder()
.trace(new Stack<>())
.lastUpdateTime(Instant.now().toString()).build();
indexProgress.getTrace().add(message);
this.statusesList.add(RecordStatus.builder().id(id).status(status).indexProgress(indexProgress).build());
}
}
public List<String> getIdsByIndexingStatus(IndexingStatus indexingStatus) {
return this.statusesList.stream().filter(s -> s.getStatus() == indexingStatus).map(RecordStatus::getId)
.collect(Collectors.toList());
}
public List<String> getIdsByValidUpsertIndexingStatus() {
return this.statusesList.stream().filter(s -> (s.getStatus() == IndexingStatus.PROCESSING) || (s.getStatus() == IndexingStatus.SKIP) || (s.getStatus() == IndexingStatus.WARN)).map(RecordStatus::getId)
.collect(Collectors.toList());
}
public String getRecordKindById(String id) {
Optional<RecordStatus> optionalRecordStatus = this.statusesList.stream().filter(s -> s.getId()
.equalsIgnoreCase(id)).findFirst();
RecordStatus status = optionalRecordStatus.orElse(null);
return status != null ? status.getKind() : null;
}
public RecordStatus getJobStatusByRecordId(String id) {
Optional<RecordStatus> optionalRecordStatus = this.statusesList.stream().filter(s -> s.getId()
.equalsIgnoreCase(id)).findFirst();
return optionalRecordStatus.orElse(null);
}
public List<RecordStatus> getRecordStatuses(IndexingStatus indexingStatus, OperationType operationType) {
return this.statusesList.stream().filter(
s -> s.getStatus() == indexingStatus && s.getOperationType().equalsIgnoreCase(operationType.getValue())).collect(Collectors.toList());
}
/*
* mark all the records as FAIL if for some reason they were not processed
* */
public void finalizeRecordStatus(String errorMessage) {
statusesList.stream().filter(recordStatus -> recordStatus.getStatus() == IndexingStatus.PROCESSING).forEach
(recordStatus -> {
recordStatus.setStatus(IndexingStatus.FAIL);
recordStatus.getIndexProgress().getTrace().add(errorMessage);
});
// dump all debug-info
this.jaxRsDpsLog.debug(this.debugInfos);
}
}
......@@ -19,7 +19,7 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.indexer.IndexingStatus;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.indexer.di.JobStatus;
import org.opengroup.osdu.indexer.di.JaxRsDpsLog;
import org.springframework.stereotype.Service;
......
......@@ -10,7 +10,7 @@ import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.RequestStatus;
import org.opengroup.osdu.core.common.model.indexer.IndexSchema;
import org.opengroup.osdu.core.common.model.indexer.IndexingStatus;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.indexer.di.JobStatus;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordIndexerPayload;
import org.opengroup.osdu.core.common.model.indexer.RecordStatus;
......
......@@ -33,7 +33,7 @@ import org.opengroup.osdu.indexer.di.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.IndexingStatus;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.indexer.di.JobStatus;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.indexer.RecordStatus;
......@@ -64,7 +64,7 @@ public class IndexerServiceImpl implements IndexerService {
private final DpsHeaders headers;
@Override
public JobStatus processRecordChangedMessages(RecordChangedMessages recordChangedMessages, List<RecordInfo> recordInfos) throws Exception {
public org.opengroup.osdu.core.common.model.indexer.JobStatus processRecordChangedMessages(RecordChangedMessages recordChangedMessages, List<RecordInfo> recordInfos) throws Exception {
String errorMessage = "";
jobStatus.initialize(recordInfos);
putHeadersFromMessages(recordChangedMessages.getAttributes());
......@@ -94,10 +94,11 @@ public class IndexerServiceImpl implements IndexerService {
} finally {
jobStatus.finalizeRecordStatus(errorMessage);
updateAuditLog();
publisher.publishStatusChangedTagsToTopic(headers, jobStatus);
publisher.publishStatusChangedTagsToTopic(headers, new org.opengroup.osdu.core.common.model.indexer.JobStatus());
}
return jobStatus;
// Unused return object with @Request scope. Left here to match the interface.
return new org.opengroup.osdu.core.common.model.indexer.JobStatus();
}
@Override
......
......@@ -9,7 +9,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.opengroup.osdu.core.common.model.indexer.IndexSchema;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.indexer.di.JobStatus;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.opengroup.osdu.core.common.model.indexer.RecordIndexerPayload;
import org.opengroup.osdu.core.common.model.indexer.Records;
......
......@@ -9,7 +9,7 @@ import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.indexer.IndexSchema;
import org.opengroup.osdu.core.common.model.indexer.IndexingStatus;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.indexer.di.JobStatus;
import org.opengroup.osdu.core.common.model.indexer.OperationType;
import org.springframework.stereotype.Service;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment