Commit 62008808 authored by Dmitriy Novikov's avatar Dmitriy Novikov
Browse files

Added thread local headers implementation

parent 06c2aa16
......@@ -2,9 +2,8 @@ package org.opengroup.osdu.indexer.di;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.core.common.logging.audit.AuditPayload;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.indexer.logging.AuditEvents;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Primary;
......@@ -17,18 +16,17 @@ import org.springframework.stereotype.Component;
@Component("gcpAuditLogger")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Primary
@RequiredArgsConstructor
public class AuditLogger {
@Inject
private JaxRsDpsLog logger;
@Inject
private DpsHeaders headers;
private final JaxRsDpsLog logger;
private final ThreadLocalHeaders headers;
private AuditEvents events = null;
private AuditEvents getAuditEvents() {
if (this.events == null) {
String user = Optional.ofNullable(this.headers.getUserEmail())
String user = Optional.ofNullable(headers.getUserEmail().get())
.orElse("storage-user-email");
this.events = new AuditEvents(user);
}
......
package org.opengroup.osdu.indexer.di;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Data;
import org.springframework.stereotype.Component;
@Component
@Data
public class ThreadLocalHeaders {
private AtomicReference<String> dataPartitionId = new AtomicReference<>();
private AtomicReference<String> authorization = new AtomicReference<>();
private AtomicReference<String> userEmail = new AtomicReference<>();
}
......@@ -23,12 +23,14 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.RestHighLevelClient;
import org.jetbrains.annotations.NotNull;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.IndexingStatus;
......@@ -40,6 +42,7 @@ import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver;
import org.opengroup.osdu.indexer.di.AuditLogger;
import org.opengroup.osdu.indexer.di.JaxRsDpsLog;
import org.opengroup.osdu.indexer.di.JobStatus;
import org.opengroup.osdu.indexer.di.ThreadLocalHeaders;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.util.ElasticClientHandler;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
......@@ -61,13 +64,13 @@ public class IndexerServiceImpl implements IndexerService {
private final ElasticIndexNameResolver elasticIndexNameResolver;
private final ElasticClientHandler elasticClientHandler;
private final IndexerQueueTaskBuilder indexerQueueTaskBuilder;
private final DpsHeaders headers;
private final ThreadLocalHeaders headers;
@Override
public org.opengroup.osdu.core.common.model.indexer.JobStatus processRecordChangedMessages(RecordChangedMessages recordChangedMessages, List<RecordInfo> recordInfos) throws Exception {
String errorMessage = "";
jobStatus.initialize(recordInfos);
putHeadersFromMessages(recordChangedMessages.getAttributes());
putAtomicHeadersFromMessages(recordChangedMessages.getAttributes());
try {
auditLogger.indexStarted(recordInfos.stream()
......@@ -94,7 +97,7 @@ public class IndexerServiceImpl implements IndexerService {
} finally {
jobStatus.finalizeRecordStatus(errorMessage);
updateAuditLog();
publisher.publishStatusChangedTagsToTopic(headers, new org.opengroup.osdu.core.common.model.indexer.JobStatus());
publisher.publishStatusChangedTagsToTopic(convertAtomicToDpsHeaders(), new org.opengroup.osdu.core.common.model.indexer.JobStatus());
}
// Unused return object with @Request scope. Left here to match the interface.
......@@ -115,10 +118,19 @@ public class IndexerServiceImpl implements IndexerService {
}
}
private void putHeadersFromMessages(Map<String, String> attributes) {
headers.put("authorization", attributes.get("authorization"));
headers.put("data-partition-id", attributes.get("data-partition-id"));
headers.put("user", attributes.get("user"));
private void putAtomicHeadersFromMessages(Map<String, String> attributes) {
headers.setAuthorization(new AtomicReference<>(attributes.get("authorization")));
headers.setDataPartitionId(new AtomicReference<>(attributes.get("data-partition-id")));
headers.setUserEmail(new AtomicReference<>(attributes.get("user")));
}
@NotNull
private DpsHeaders convertAtomicToDpsHeaders() {
DpsHeaders dpsHeaders = new DpsHeaders();
dpsHeaders.put("authorization", headers.getAuthorization().get());
dpsHeaders.put("data-partition-id", headers.getDataPartitionId().get());
dpsHeaders.put("user", headers.getUserEmail().get());
return dpsHeaders;
}
private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException {
......@@ -139,7 +151,7 @@ public class IndexerServiceImpl implements IndexerService {
.attributes(message.getAttributes()).build();
String payLoad = gson.toJson(newMessage);
indexerQueueTaskBuilder.createWorkerTask(payLoad, headers);
indexerQueueTaskBuilder.createWorkerTask(payLoad, convertAtomicToDpsHeaders());
}
private void processSchemaEvents(RestHighLevelClient restClient,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment