Commit bbd5f6a2 authored by Dmitriy Novikov's avatar Dmitriy Novikov
Browse files

Added IndexerServiceImpl

parent b6b92fcd
package org.opengroup.osdu.indexer.di;
import java.util.List;
import javax.inject.Inject;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.logging.audit.AuditPayload;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.indexer.logging.AuditEvents;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* Prototype spring bean overriding @RequestScope AuditLogger
*/
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Primary
public class AuditLogger {
@Inject
private JaxRsDpsLog logger;
@Inject
private DpsHeaders headers;
private AuditEvents events = null;
private AuditEvents getAuditEvents() {
if (this.events == null) {
this.events = new AuditEvents(this.headers.getUserEmail());
}
return this.events;
}
public void indexCreateRecordSuccess(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexCreateRecordSuccessEvent(resources));
}
public void indexCreateRecordFail(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexCreateRecordFailEvent(resources));
}
public void indexUpdateRecordSuccess(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexUpdateRecordSuccessEvent(resources));
}
public void indexUpdateRecordFail(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexUpdateRecordFailEvent(resources));
}
public void indexDeleteRecordSuccess(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexDeleteRecordSuccessEvent(resources));
}
public void indexDeleteRecordFail(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexDeleteRecordFailEvent(resources));
}
public void indexPurgeRecordSuccess(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexPurgeRecordSuccessEvent(resources));
}
public void indexPurgeRecordFail(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexPurgeRecordFailEvent(resources));
}
public void indexStarted(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexEvent(resources));
}
public void getReindex(List<String> resources) {
this.writeLog(this.getAuditEvents().getReindexEvent(resources));
}
public void copyIndex(List<String> resources) {
this.writeLog(this.getAuditEvents().getCopyIndexEvent(resources));
}
public void getTaskStatus(List<String> resources) {
this.writeLog(this.getAuditEvents().getTaskStatusEvent(resources));
}
public void getIndexCleanUpJobRun(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexCleanUpJobRunEvent(resources));
}
public void indexMappingUpdateSuccess(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexMappingUpdateEvent(resources,true));
}
public void indexMappingUpdateFail(List<String> resources) {
this.writeLog(this.getAuditEvents().getIndexMappingUpdateEvent(resources,false));
}
public void getConfigurePartition(List<String> resources) {
this.writeLog(this.getAuditEvents().getConfigurePartitionEvent(resources));
}
private void writeLog(AuditPayload log) {
this.logger.audit(log);
}
}
\ No newline at end of file
/*
* Copyright 2022 Google LLC
* Copyright 2022 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.indexer.di;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.logging.LogUtils;
import org.opengroup.osdu.core.common.logging.audit.AuditPayload;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.http.Request;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* Prototype spring bean overriding @RequestScope JaxRsDpsLog
*/
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Primary
public class JaxRsDpsLog implements AutoCloseable {
@Value("${LOG_PREFIX}")
private String LOG_PREFIX;
private ILogger log;
private DpsHeaders headers;
@Inject
public JaxRsDpsLog(ILogger log, DpsHeaders headers) {
this.log = log;
this.headers = headers;
}
public void audit(AuditPayload auditPayload) {
this.log.audit(this.LOG_PREFIX + ".audit", auditPayload, this.getLabels());
}
public void audit(String loggerName, AuditPayload auditPayload) {
this.log.audit(loggerName, this.LOG_PREFIX + ".audit", auditPayload, this.getLabels());
}
public void request(Request httpRequest) {
this.log.request(this.LOG_PREFIX + ".request", httpRequest, this.getLabels());
}
public void request(String loggerName, Request httpRequest) {
this.log.request(loggerName, this.LOG_PREFIX + ".request", httpRequest, this.getLabels());
}
public void info(String message) {
this.log.info(this.LOG_PREFIX + ".app", message, this.getLabels());
}
public void info(String loggerName, String message) {
this.log.info(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
}
public void debug(String message) {
this.log.debug(this.LOG_PREFIX + ".app", message, this.getLabels());
}
public void debug(String loggerName, String message) {
this.log.debug(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
}
public void warning(String message) {
this.log.warning(this.LOG_PREFIX + ".app", message, this.getLabels());
}
public void warning(String loggerName, String message) {
this.log.warning(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
}
private String prepareWarningMessage(List<String> messages) {
int sn = 0;
StringBuilder sb = new StringBuilder();
Iterator var4 = messages.iterator();
while(var4.hasNext()) {
String s = (String)var4.next();
sb.append(String.format("%d: %s", sn++, s)).append(System.lineSeparator());
}
return sb.toString();
}
public void warning(List<String> messages) {
if (messages != null && !messages.isEmpty()) {
this.log.warning(this.LOG_PREFIX + ".app", this.prepareWarningMessage(messages), this.getLabels());
}
}
public void warning(String loggerName, List<String> messages) {
if (messages != null && !messages.isEmpty()) {
this.log.warning(loggerName, this.LOG_PREFIX + ".app", this.prepareWarningMessage(messages), this.getLabels());
}
}
public void warning(String message, Exception e) {
this.log.warning(this.LOG_PREFIX + ".app", message, e, this.getLabels());
}
public void warning(String loggerName, String message, Exception e) {
this.log.warning(loggerName, this.LOG_PREFIX + ".app", message, e, this.getLabels());
}
public void error(String message) {
this.log.error(this.LOG_PREFIX + ".app", message, this.getLabels());
}
public void error(String loggerName, String message) {
this.log.error(loggerName, this.LOG_PREFIX + ".app", message, this.getLabels());
}
public void error(String message, Exception e) {
this.log.error(this.LOG_PREFIX + ".app", message, e, this.getLabels());
}
public void error(String loggerName, String message, Exception e) {
this.log.error(loggerName, this.LOG_PREFIX + ".app", message, e, this.getLabels());
}
public void close() throws Exception {
}
private Map<String, String> getLabels() {
if (this.headers != null) {
Map<String, String> out = LogUtils.createStandardLabelsFromMap(this.headers.getHeaders());
if (out.containsKey("X-AppEngine-TaskRetryCount")) {
out.put("X-AppEngine-TaskRetryCount", StringUtils.join(new Serializable[]{(Serializable)out.get("X-AppEngine-TaskRetryCount"), ','}));
}
return out;
} else {
return Collections.emptyMap();
}
}
}
/*
* Copyright 2022 Google LLC
* Copyright 2022 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.indexer.oqm.subscribe;
import java.io.IOException;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.search.CloudTaskRequest;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriber;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.config.IndexerConfigProperties;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.service.IndexerService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
/**
* Runs once on the service start.
* Fetches all tenants' oqm destinations for TOPIC existence.
* If exists - searches for pull SUBSCRIPTION existence.
* Creates SUBSCRIPTION if it doesn't exist. Then subscribe itself on SUBSCRIPTION.
*/
@Service
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(name = "oqmDriver")
public class OqmSubscriberManager {
private static final String SUBSCRIPTION_PREFIX = "indexer-oqm-";
private static final String ACKNOWLEDGE = "message acknowledged by client";
private static final String NOT_ACKNOWLEDGE = "message not acknowledged by client";
private final OqmDriver driver;
private final IndexerConfigProperties properties;
private final ITenantFactory tenantInfoFactory;
private final IndexerService indexerService;
private String subscriptionName;
@PostConstruct
void postConstruct() {
subscriptionName = getSubscriptionName();
log.info("OqmSubscriberManager provisioning STARTED");
tenantInfoFactory.listTenantInfo().forEach(this::createSubscriptionForTenant);
log.info("OqmSubscriberManager provisioning COMPLETED");
}
private void createSubscriptionForTenant(TenantInfo tenantInfo) {
String topicName = properties.getRecordsTopicName();
log.info("OQM: provisioning tenant {}:", tenantInfo.getDataPartitionId());
log.info("OQM: check for topic {} existence:", topicName);
OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo))
.orElse(null);
if (topic == null) {
log.info("OQM: check for topic {} existence: ABSENT. Skipped", topicName);
return;
}
log.info("OQM: check for topic {} existence: PRESENT", topicName);
OqmSubscription subscription = getSubscription(tenantInfo, topic);
if (subscription == null) {
subscription = createSubscription(tenantInfo, topic);
} else {
log.info("OQM: check for subscription {} existence: PRESENT", subscriptionName);
}
registerSubscriber(tenantInfo, subscription);
log.info("OQM: provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId());
}
@Nullable
private OqmSubscription getSubscription(TenantInfo tenantInfo, OqmTopic topic) {
log.info("OQM: check for subscription {} existence:", subscriptionName);
OqmSubscriptionQuery query = OqmSubscriptionQuery.builder()
.namePrefix(subscriptionName)
.subscriberable(true)
.build();
return driver
.listSubscriptions(topic, query, getDestination(tenantInfo)).stream()
.findAny()
.orElse(null);
}
private OqmSubscription createSubscription(TenantInfo tenantInfo, OqmTopic topic) {
log.info("OQM: check for subscription {} existence: ABSENT. Will create.", subscriptionName);
OqmSubscription request = OqmSubscription.builder()
.topic(topic)
.name(subscriptionName)
.build();
return driver.createAndGetSubscription(request, getDestination(tenantInfo));
}
private void registerSubscriber(TenantInfo tenantInfo, OqmSubscription subscription) {
log.info("OQM: registering Subscriber for subscription {}", subscription.getName());
OqmDestination destination = getDestination(tenantInfo);
OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(),
oqmMessage.getAttributes());
try {
ackSendingMessage(oqmMessage, oqmAckReplier);
} catch (Exception e) {
log.debug(NOT_ACKNOWLEDGE, e);
oqmAckReplier.nack();
}
};
OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(subscription)
.messageReceiver(receiver)
.build();
driver.subscribe(subscriber, destination);
log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscription.getName());
}
private void ackSendingMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier) {
DpsHeaders headers = getHeaders(oqmMessage);
if (headers.getPartitionId() == null) {
log.error("Partition id is not set for message: {}", oqmMessage.getId());
oqmAckReplier.ack();
}
indexerService.processRecordChangedMessages();
if (response.isError()) {
log.error(NOT_ACKNOWLEDGE + response.getReasonPhrase());
} else {
log.debug(ACKNOWLEDGE);
oqmAckReplier.ack();
}
}
@NotNull
private DpsHeaders getHeaders(OqmMessage oqmMessage) {
DpsHeaders headers = new DpsHeaders();
headers.getHeaders().put("data-partition-id", oqmMessage.getAttributes().get("data-partition-id"));
return headers;
}
private OqmDestination getDestination(TenantInfo tenantInfo) {
return OqmDestination.builder()
.partitionId(tenantInfo.getDataPartitionId())
.build();
}
private String getSubscriptionName() {
return SUBSCRIPTION_PREFIX + properties.getRecordsTopicName();
}
}
package org.opengroup.osdu.indexer.service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.indexer.IndexingStatus;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.indexer.di.JaxRsDpsLog;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class BulkRequestProcessor {
private static final List<RestStatus> RETRY_ELASTIC_EXCEPTION = Stream.of(
RestStatus.TOO_MANY_REQUESTS,
RestStatus.BAD_GATEWAY,
RestStatus.SERVICE_UNAVAILABLE).collect(Collectors.toList());
private final JaxRsDpsLog jaxRsDpsLog;
private final JobStatus jobStatus;
public List<String> processBulkRequest(RestHighLevelClient restClient, BulkRequest bulkRequest) throws AppException {
List<String> failureRecordIds = new LinkedList<>();
if (bulkRequest.numberOfActions() == 0) {
return failureRecordIds;
}
int failedRequestStatus = 500;
Exception failedRequestCause = null;
try {
BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
// log failed bulk requests
ArrayList<String> bulkFailures = new ArrayList<>();
int succeededResponses = 0;
int failedResponses = 0;
for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
bulkFailures.add(
String.format("elasticsearch bulk service status: %s | id: %s | message: %s",
failure.getStatus(), failure.getId(), failure.getMessage()));
jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.FAIL,
failure.getStatus().getStatus(), bulkItemResponse.getFailureMessage());
if (canIndexerRetry(bulkItemResponse)) {
failureRecordIds.add(bulkItemResponse.getId());
if (failedRequestCause == null) {
failedRequestCause = failure.getCause();
failedRequestStatus = failure.getStatus().getStatus();
}
}
failedResponses++;
} else {
succeededResponses++;
this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.SUCCESS,
HttpStatus.SC_OK, "Indexed Successfully");
}
}
if (!bulkFailures.isEmpty()) {
jaxRsDpsLog.warning(bulkFailures);
}
jaxRsDpsLog.info(String.format(
"records in elasticsearch service bulk request: %s | successful: %s | failed: %s",
bulkRequest.numberOfActions(), succeededResponses, failedResponses));
// retry entire message if all records are failing
if (bulkRequest.numberOfActions() == failureRecordIds.size()) {
throw new AppException(failedRequestStatus, "Elastic error",
failedRequestCause.getMessage(), failedRequestCause);
}
} catch (IOException e) {
// throw explicit 504 for IOException
throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error",
"Request cannot be completed in specified time.", e);
} catch (ElasticsearchStatusException e) {
throw new AppException(e.status().getStatus(), "Elastic error", e.getMessage(), e);
} catch (Exception e) {
if (e instanceof AppException) {
throw e;
}
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Elastic error",
"Error indexing records.", e);
}
return failureRecordIds;
}
private boolean canIndexerRetry(BulkItemResponse bulkItemResponse) {
OpType type = bulkItemResponse.getOpType();
RestStatus status = bulkItemResponse.status();
return RETRY_ELASTIC_EXCEPTION.contains(status)
|| ((type == DocWriteRequest.OpType.CREATE || type