diff --git a/provider/indexer-aws/indexer-aws-queue-service/pom.xml b/provider/indexer-aws/indexer-aws-queue-service/pom.xml index 1ffd0ef61dc491847be28fc07c6dc42834f15236..038fecc6163a96f1bc1c5a75de149427dcec9ff5 100644 --- a/provider/indexer-aws/indexer-aws-queue-service/pom.xml +++ b/provider/indexer-aws/indexer-aws-queue-service/pom.xml @@ -47,23 +47,7 @@ <artifactId>aws-java-sdk</artifactId> <version>1.11.327</version> </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>3.8.1</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <version>1.10.19</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - <version>2.0.2</version> - </dependency> + <dependency> <groupId>org.apache.lucene</groupId> <artifactId>lucene-core</artifactId> @@ -86,5 +70,22 @@ <artifactId>commons-collections4</artifactId> <version>4.1</version> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>2.0.2</version> + </dependency> </dependencies> </project> diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java index 82a9ce2c9011c9897bc9b79239489cc55a3979b2..4a823ea7d31f38dbc7afc166f2f1aec11d7412ed 100644 --- a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueue.java @@ -37,6 +37,7 @@ public class IndexerQueue { Logger log = LogManager.getLogger(IndexerQueue.class); + try { log.debug("Starting Indexer Queue and obtaining Arguments"); Arguments arguments = new Arguments(); @@ -48,27 +49,28 @@ public class IndexerQueue { ThreadPoolExecutor executorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(arguments.maxIndexThreads); final String deadLetterQueueUrl = sqsClient.getQueueUrl(arguments.deadLetterQueueName).getQueueUrl(); - List<Message> messages = getMessages(sqsClient, arguments.queueName, arguments.maxBatchRequestCount, arguments.maxMessagesAllowed); + List<Message> messages = IndexerQueueService.getMessages(sqsClient, arguments.queueName, arguments.maxBatchRequestCount, arguments.maxMessagesAllowed); + log.debug(String.format("Processing %s messages from storage queue", messages.size())); if (!messages.isEmpty()) { - log.debug(String.format("Processing %s messages from storage queue", messages.size())); - List<IndexProcessor> indexProcessors = processQueue(messages, arguments.targetURL, executorPool, log); + List<IndexProcessor> indexProcessors = IndexerQueueService.processQueue(messages, arguments.targetURL, executorPool); log.debug(String.format("%s Messages Processed", indexProcessors.size())); List<IndexProcessor> failedProcessors = indexProcessors.stream().filter(indexProcessor -> indexProcessor.result == CallableResult.Fail || indexProcessor.exception != null).collect(Collectors.toList()); log.debug(String.format("%s Messages Failed", failedProcessors.size())); - List<SendMessageResult> deadLetterResults = sendMsgsToDeadLetterQueue(deadLetterQueueUrl, failedProcessors, sqsClient); + List<SendMessageResult> deadLetterResults = IndexerQueueService.sendMsgsToDeadLetterQueue(deadLetterQueueUrl, failedProcessors, sqsClient); log.debug(String.format("%s Messages Dead Lettered", deadLetterResults.size())); List<DeleteMessageBatchRequestEntry> deleteEntries = indexProcessors.stream().map(indexProcessor -> new DeleteMessageBatchRequestEntry(indexProcessor.messageId, indexProcessor.receiptHandle)).collect(Collectors.toList()); log.debug(String.format("%s Messages Deleting", deleteEntries.size())); - List<DeleteMessageBatchRequest> deleteBatchRequests = createMultipleBatchDeleteRequest(arguments.queueName, deleteEntries); + final String sqsQueueUrl = sqsClient.getQueueUrl(arguments.queueName).getQueueUrl(); + List<DeleteMessageBatchRequest> deleteBatchRequests = IndexerQueueService.createMultipleBatchDeleteRequest(sqsQueueUrl, deleteEntries, arguments.maxBatchRequestCount); log.debug(String.format("%s Delete Batch Request Created", deleteBatchRequests.size())); - List<DeleteMessageBatchResult> deleteMessageBatchResults = deleteMessages(deleteBatchRequests, sqsClient); + List<DeleteMessageBatchResult> deleteMessageBatchResults = IndexerQueueService.deleteMessages(deleteBatchRequests, sqsClient); log.debug(String.format("%s Requests Deleted", deleteMessageBatchResults.size())); } @@ -76,93 +78,12 @@ public class IndexerQueue { log.error(e.getMessage(), e); } catch (InterruptedException e) { log.error(e.getMessage(), e); - } catch (Exception e) { + } catch (NullPointerException e) { + log.error(e.getMessage(), e); + }catch (Exception e) { log.error(e.getMessage(), e); } } - // TODO: break down into smaller methods for cleaner code - private static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool, Logger log) - throws ExecutionException, InterruptedException { - - List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url); - - CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); - CompletableFuture<List<IndexProcessor>> results = CompletableFuture - .allOf(cfs) - .thenApply(ignored -> futures - .stream() - .map(CompletableFuture::join) - .collect(Collectors.toList())); - - List<IndexProcessor> processed = results.get(); - executorPool.shutdown(); - - return processed; - } - - private static List<DeleteMessageBatchResult> deleteMessages(List<DeleteMessageBatchRequest> deleteBatchRequests, AmazonSQS sqsClient) { - return deleteBatchRequests.stream().map(deleteRequest -> sqsClient.deleteMessageBatch(deleteRequest)).collect(Collectors.toList()); - } - - - private static List<DeleteMessageBatchRequest> createMultipleBatchDeleteRequest(String queueName, List<DeleteMessageBatchRequestEntry> deleteEntries) { - - List<List<DeleteMessageBatchRequestEntry>> batchedEntries = ListUtils.partition(deleteEntries, 10); - return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueName, entries)).collect(Collectors.toList()); - - } - - private static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url){ - List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>(); - for (final Message message : messages) { - IndexProcessor processor = new IndexProcessor(message, url); - CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool); - futures.add(future); - } - return futures; - } - - // TODO make the upper bound configurable as a program argument - - private static List<Message> getMessages(AmazonSQS sqsClient, String queueName, int numOfmessages, int maxMessageCount){ - final String sqsQueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); - int numOfMessages = numOfmessages; - List<Message> messages = new ArrayList<>(); - do { - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl); - receiveMessageRequest.setMaxNumberOfMessages(numOfMessages); - List<Message> retrievedMessages = sqsClient.receiveMessage(receiveMessageRequest - .withMessageAttributeNames("data-partition-id", "account-id")).getMessages(); - messages.addAll(retrievedMessages); - numOfMessages = retrievedMessages.size(); - }while (messages.size() < maxMessageCount && numOfMessages > 0); - - return messages; - } - - private static List<SendMessageResult> sendMsgsToDeadLetterQueue(String deadLetterQueueUrl, List<IndexProcessor> indexProcessors, AmazonSQS sqsClient) { - return indexProcessors.stream().map(indexProcessor -> sendMsgToDeadLetterQueue(deadLetterQueueUrl, indexProcessor, sqsClient)).collect(Collectors.toList()); - } - private static SendMessageResult sendMsgToDeadLetterQueue(String deadLetterQueueUrl, IndexProcessor indexProcessor, AmazonSQS sqsClient){ - Map<String, MessageAttributeValue> messageAttributes = indexProcessor.message.getMessageAttributes(); - MessageAttributeValue exceptionAttribute = new MessageAttributeValue() - .withDataType("String"); - String exceptionMessage = indexProcessor.exception.getMessage(); - if(exceptionMessage == null){ - exceptionMessage = "Empty"; - } - - StringWriter sw = new StringWriter(); - indexProcessor.exception.printStackTrace(new PrintWriter(sw)); - String exceptionAsString = String.format("Exception message: %s. Exception stacktrace: %s", exceptionMessage, sw.toString()); - exceptionAttribute.setStringValue(exceptionAsString); - messageAttributes.put("Exception", exceptionAttribute); - SendMessageRequest send_msg_request = new SendMessageRequest() - .withQueueUrl(deadLetterQueueUrl) - .withMessageBody(indexProcessor.message.getBody()) - .withMessageAttributes(messageAttributes); - return sqsClient.sendMessage(send_msg_request); - } } diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java new file mode 100644 index 0000000000000000000000000000000000000000..5461b1ab04d1eae01ffe55ca6336ed9423077313 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/main/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueService.java @@ -0,0 +1,99 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.*; +import org.apache.commons.collections4.ListUtils; +import org.apache.logging.log4j.Logger; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; + +public class IndexerQueueService { + + public static List<IndexProcessor> processQueue(List<Message> messages, String url, ThreadPoolExecutor executorPool) + throws ExecutionException, InterruptedException { + + List<CompletableFuture<IndexProcessor>> futures = createCompletableFutures(messages, executorPool, url); + + CompletableFuture[] cfs = futures.toArray(new CompletableFuture[0]); + CompletableFuture<List<IndexProcessor>> results = CompletableFuture + .allOf(cfs) + .thenApply(ignored -> futures + .stream() + .map(CompletableFuture::join) + .collect(Collectors.toList())); + + List<IndexProcessor> processed = results.get(); + executorPool.shutdown(); + + return processed; + } + + public static List<DeleteMessageBatchResult> deleteMessages(List<DeleteMessageBatchRequest> deleteBatchRequests, AmazonSQS sqsClient) { + return deleteBatchRequests.stream().map(deleteRequest -> sqsClient.deleteMessageBatch(deleteRequest)).collect(Collectors.toList()); + } + + + public static List<DeleteMessageBatchRequest> createMultipleBatchDeleteRequest(String queueUrl, List<DeleteMessageBatchRequestEntry> deleteEntries, int maxBatchRequest) { + List<List<DeleteMessageBatchRequestEntry>> batchedEntries = ListUtils.partition(deleteEntries, maxBatchRequest); + return batchedEntries.stream().map(entries -> new DeleteMessageBatchRequest(queueUrl, entries)).collect(Collectors.toList()); + } + + public static List<CompletableFuture<IndexProcessor>> createCompletableFutures(List<Message> messages, ThreadPoolExecutor executorPool, String url){ + List<CompletableFuture<IndexProcessor>> futures = new ArrayList<>(); + for (final Message message : messages) { + IndexProcessor processor = new IndexProcessor(message, url); + CompletableFuture<IndexProcessor> future = CompletableFuture.supplyAsync(processor::call, executorPool); + futures.add(future); + } + return futures; + } + + public static List<Message> getMessages(AmazonSQS sqsClient, String queueName, int numOfmessages, int maxMessageCount){ + final String sqsQueueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); + int numOfMessages = numOfmessages; + List<Message> messages = new ArrayList<>(); + do { + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsQueueUrl); + receiveMessageRequest.setMaxNumberOfMessages(numOfMessages); + receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); + List<Message> retrievedMessages = sqsClient.receiveMessage(receiveMessageRequest).getMessages(); + messages.addAll(retrievedMessages); + numOfMessages = retrievedMessages.size(); + }while (messages.size() < maxMessageCount && numOfMessages > 0); + + return messages; + } + + public static List<SendMessageResult> sendMsgsToDeadLetterQueue(String deadLetterQueueUrl, List<IndexProcessor> indexProcessors, AmazonSQS sqsClient) { + return indexProcessors.stream().map(indexProcessor -> sendMsgToDeadLetterQueue(deadLetterQueueUrl, indexProcessor, sqsClient)).collect(Collectors.toList()); + } + + private static SendMessageResult sendMsgToDeadLetterQueue(String deadLetterQueueUrl, IndexProcessor indexProcessor, AmazonSQS sqsClient){ + Map<String, MessageAttributeValue> messageAttributes = indexProcessor.message.getMessageAttributes(); + MessageAttributeValue exceptionAttribute = new MessageAttributeValue() + .withDataType("String"); + String exceptionMessage = indexProcessor.exception.getMessage(); + if(exceptionMessage == null){ + exceptionMessage = "Empty"; + } + + StringWriter sw = new StringWriter(); + indexProcessor.exception.printStackTrace(new PrintWriter(sw)); + String exceptionAsString = String.format("Exception message: %s. Exception stacktrace: %s", exceptionMessage, sw.toString()); + exceptionAttribute.setStringValue(exceptionAsString); + messageAttributes.put("Exception", exceptionAttribute); + SendMessageRequest send_msg_request = new SendMessageRequest() + .withQueueUrl(deadLetterQueueUrl) + .withMessageBody(indexProcessor.message.getBody()) + .withMessageAttributes(messageAttributes); + return sqsClient.sendMessage(send_msg_request); + } +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessorTest.java b/provider/indexer-aws/indexer-aws-queue-service/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..31cd51af61cbb7f21017be92e8b09fd5bcf0d539 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexProcessorTest.java @@ -0,0 +1,4 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + +public class IndexProcessorTest { +} diff --git a/provider/indexer-aws/indexer-aws-queue-service/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueServiceTest.java b/provider/indexer-aws/indexer-aws-queue-service/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d9f9b82e50bc44ab8d0e66c5b44179ca225a42c6 --- /dev/null +++ b/provider/indexer-aws/indexer-aws-queue-service/src/test/java/org/opengroup/osdu/indexerqueue/aws/api/IndexerQueueServiceTest.java @@ -0,0 +1,122 @@ +package org.opengroup.osdu.indexerqueue.aws.api; + + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class IndexerQueueServiceTest { + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Mock + private AmazonSQS sqsClient = mock(AmazonSQS.class); + + private GetQueueUrlResult queueUrlResult; + + private ReceiveMessageResult receiveResult; + + private String queueName = "testQueue"; + private String queueUrl ="localhost"; + + @Before + public void setUp() { + + queueUrlResult = new GetQueueUrlResult(); + queueUrlResult.setQueueUrl(queueUrl); + Mockito.when(sqsClient.getQueueUrl(queueName)).thenReturn(queueUrlResult); + receiveResult = new ReceiveMessageResult(); + } + + @Test + public void test_getMessages_10Messages_pass() { + // Arrange + int numOfmessages = 10; + int maxMessageCount = 10; + + + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl); + receiveMessageRequest.setMaxNumberOfMessages(numOfmessages); + receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); + + List<Message> messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + messages.add(msg); + } + + receiveResult.setMessages(messages); + + Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult); + + // Act + List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, queueName, numOfmessages, maxMessageCount); + + // Assert + assertEquals(messages.get(1), actualMessages.get(1)); + assertEquals(messages.size(), actualMessages.size()); + } + + @Test + public void test_getMessages_0Messages_pass() { + // Arrange + int numOfmessages = 10; + int maxMessageCount = 10; + + + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl); + receiveMessageRequest.setMaxNumberOfMessages(numOfmessages); + receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); + + List<Message> messages = new ArrayList<>(); + + receiveResult.setMessages(messages); + + Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult); + + // Act + List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, queueName, numOfmessages, maxMessageCount); + + // Assert + assertEquals(messages.size(), actualMessages.size()); + } + + @Test(expected = NullPointerException.class) + public void test_getMessages_invalidqueuename_fail() { + // Arrange + int numOfmessages = 10; + int maxMessageCount = 10; + + String invalidQueueName = "invalid"; + + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl); + receiveMessageRequest.setMaxNumberOfMessages(numOfmessages); + receiveMessageRequest.withMessageAttributeNames("data-partition-id", "account-id"); + + List<Message> messages = new ArrayList<>(); + + receiveResult.setMessages(messages); + + Mockito.when(sqsClient.receiveMessage(receiveMessageRequest)).thenReturn(receiveResult); + + // Act + List<Message> actualMessages = IndexerQueueService.getMessages(sqsClient, invalidQueueName, numOfmessages, maxMessageCount); + } + + +} diff --git a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/HeadersInfoAwsImpl.java b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/HeadersInfoAwsImpl.java deleted file mode 100644 index 5fc318402c1fe90abe4329a924f915ed25c686a8..0000000000000000000000000000000000000000 --- a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/util/HeadersInfoAwsImpl.java +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright © Microsoft Corporation -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexer.aws.util; - -import lombok.extern.java.Log; -import org.apache.http.HttpStatus; -import org.opengroup.osdu.core.api.DpsHeaders; -import org.opengroup.osdu.is.core.model.SlbHeaders; -import org.opengroup.osdu.is.core.provider.interfaces.util.IHeadersInfo; -import org.opengroup.osdu.is.core.util.AppException; -import org.opengroup.osdu.is.core.util.Preconditions; -import org.springframework.context.annotation.Primary; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; -import org.springframework.web.context.annotation.RequestScope; - -import java.util.HashSet; -import java.util.Map; -import java.util.stream.Collectors; -import javax.inject.Inject; - -@Primary -@Log -@Component -public class HeadersInfoAwsImpl implements IHeadersInfo { - - @Inject - private DpsHeaders headersMap; - - - private static final HashSet<String> FORBIDDEN_FROM_LOGGING = new HashSet<>(); - static { - FORBIDDEN_FROM_LOGGING.add(DpsHeaders.AUTHORIZATION); - FORBIDDEN_FROM_LOGGING.add(DpsHeaders.ON_BEHALF_OF); - } - - /** - * Get list of current headers - * @return DpsHeaders headers - */ - @Override - public DpsHeaders getHeaders() { - if (headersMap == null) { - log.warning("Headers Map DpsHeaders is null"); - // throw to prevent null reference exception below - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Invalid Headers", "Headers Map DpsHeaders is null"); - } - DpsHeaders headers = this.getCoreServiceHeaders(headersMap.getHeaders()); - return headers; - } - - /** - * get current logged in user - * @return userEmail - */ - @Override - public String getUser() { - return getHeaders().getUserEmail(); - } - - /** - * get partition id and fallback to account id - * @return Partition ID - */ - @Override - public String getPartitionId() { - return getHeaders().getPartitionIdWithFallbackToAccountId(); - } - - /** - * get the primary partition id - * @return primaryPartitionID - */ - @Override - public String getPrimaryPartitionId() { - return getHeadersMap().get(SlbHeaders.PRIMARY_PARTITION_ID); - } - - /** - * get map of the current headers - * @return Map<String, String> headers - */ - @Override - public Map<String, String> getHeadersMap() { - return getHeaders().getHeaders(); - } - - /** - * supplement the DPSHeaders with any specific core service headers - * @param input Map<String, String> of core headers - * @return DpsHeaders dpsHeaders - */ - @Override - public DpsHeaders getCoreServiceHeaders(Map<String, String> input) { - Preconditions.checkNotNull(input, "input headers cannot be null"); - - DpsHeaders output = DpsHeaders.createFromMap(input); - - return output; - } - - /** - * create string representing a comma delimited list of the current headers - * @return - */ - @Override - public String toString() { - return this.getHeadersMap().entrySet().stream().filter(map -> !FORBIDDEN_FROM_LOGGING.contains(map.getKey().toLowerCase())).map(Map.Entry::toString).collect(Collectors.joining(" | ")); - } - -} \ No newline at end of file