Skip to content
Snippets Groups Projects
Commit 4b12723c authored by Derek Hudson's avatar Derek Hudson
Browse files

Refactored out to make use of PublishRequestBuilder changes and added the OSDU topic.

parent bcacf25f
No related branches found
No related tags found
1 merge request!631merge code to gitlab
......@@ -72,7 +72,7 @@
<dependency>
<groupId>org.opengroup.osdu.core.aws</groupId>
<artifactId>os-core-lib-aws</artifactId>
<version>0.21.0</version>
<version>0.23.0-dhudsons-notification-refactor-SNAPSHOT</version>
</dependency>
<!-- AWS managed packages -->
......
......@@ -14,7 +14,6 @@
package org.opengroup.osdu.indexer.aws.publish;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.AmazonSNS;
......@@ -22,14 +21,13 @@ import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.aws.sns.AmazonSNSConfig;
import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder;
import org.opengroup.osdu.core.common.model.indexer.RecordStatus;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
@Component
public class PublisherImpl implements IPublisher {
......@@ -40,6 +38,9 @@ public class PublisherImpl implements IPublisher {
@Value("${aws.region}")
private String amazonSNSRegion;
@Value("${OSDU_TOPIC}")
private String osduIndexerTopic;
@Inject
public void init() throws K8sParameterNotFoundException {
AmazonSNSConfig snsConfig = new AmazonSNSConfig(amazonSNSRegion);
......@@ -50,20 +51,10 @@ public class PublisherImpl implements IPublisher {
public void publishStatusChangedTagsToTopic(DpsHeaders headers, JobStatus indexerBatchStatus) throws Exception
{
// Attributes
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
headers.addCorrelationIdIfMissing();
messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getCorrelationId()));
PublishRequestBuilder<RecordStatus> publishRequestBuilder = new PublishRequestBuilder<>();
publishRequestBuilder.setGeneralParametersFromHeaders(headers);
PublishRequest publishRequest = new PublishRequestBuilder().generatePublishRequest("data", indexerBatchStatus.getStatusesList(), messageAttributes, amazonSNSTopic);
PublishRequest publishRequest = publishRequestBuilder.generatePublishRequest(osduIndexerTopic, amazonSNSTopic, indexerBatchStatus.getStatusesList());
snsClient.publish(publishRequest);
}
......
......@@ -35,6 +35,9 @@ STORAGE_QUERY_KINDS_HOST=${STORAGE_BASE_URL}/api/storage/v2/query/kinds
STORAGE_QUERY_RECORD_FOR_CONVERSION_HOST=${STORAGE_BASE_URL}/api/storage/v2/query/records:batch
STORAGE_RECORDS_BATCH_SIZE=20
INDEXER_QUEUE_HOST=""
OSDU_TOPIC=${OSDU_INDEXER_TOPIC:indexer-status-changed}
## AWS ElastiCache configuration
aws.elasticache.cluster.endpoint=${CACHE_CLUSTER_ENDPOINT:null}
aws.elasticache.cluster.port=${CACHE_CLUSTER_PORT:null}
......
......@@ -28,19 +28,21 @@ import org.mockito.Mockito;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.aws.sns.AmazonSNSConfig;
import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder;
import org.opengroup.osdu.core.common.model.indexer.RecordStatus;
import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider;
import org.opengroup.osdu.indexer.aws.IndexerAwsApplication;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.springframework.boot.test.context.SpringBootTest;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.Map;
import org.springframework.test.util.ReflectionTestUtils;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest(classes = {IndexerAwsApplication.class})
public class PublisherImplTest {
......@@ -65,19 +67,10 @@ public class PublisherImplTest {
Mockito.when(snsClient.publish(Mockito.any(PublishRequest.class)))
.thenReturn(Mockito.any(PublishResult.class));
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
headers.addCorrelationIdIfMissing();
messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getCorrelationId()));
PublishRequest publishRequest = new PublishRequestBuilder().generatePublishRequest("data", jobStatus.getStatusesList(), messageAttributes,null);
PublishRequestBuilder<RecordStatus> publishRequestBuilder = new PublishRequestBuilder<>();
publishRequestBuilder.setGeneralParametersFromHeaders(headers);
PublishRequest publishRequest = publishRequestBuilder.generatePublishRequest(null, null, jobStatus.getStatusesList());
// Act
publisher.publishStatusChangedTagsToTopic(headers, jobStatus);
......@@ -98,26 +91,15 @@ public class PublisherImplTest {
})) {
publisher.init();
JobStatus jobStatus = new JobStatus();
// Arrange
DpsHeaders headers = new DpsHeaders();
JobStatus jobStatus = new JobStatus();
Mockito.when(snsClient.publish(Mockito.any(PublishRequest.class)))
.thenReturn(Mockito.any(PublishResult.class));
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getPartitionIdWithFallbackToAccountId()));
headers.addCorrelationIdIfMissing();
messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue()
.withDataType("String")
.withStringValue(headers.getCorrelationId()));
PublishRequest publishRequest = new PublishRequestBuilder().generatePublishRequest("data", jobStatus.getStatusesList(), messageAttributes, indexer_sns_topic_arn);
PublishRequestBuilder<RecordStatus> publishRequestBuilder = new PublishRequestBuilder<>();
publishRequestBuilder.setGeneralParametersFromHeaders(headers);
PublishRequest publishRequest = publishRequestBuilder.generatePublishRequest(null, indexer_sns_topic_arn, jobStatus.getStatusesList());
// Act
publisher.publishStatusChangedTagsToTopic(headers, jobStatus);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment