diff --git a/provider/indexer-aws/pom.xml b/provider/indexer-aws/pom.xml index 982e215671c5cef610c03505f69f78f850b88e9e..e57eeeb898a96e514434f4af65cd0b3ee9ea7df0 100644 --- a/provider/indexer-aws/pom.xml +++ b/provider/indexer-aws/pom.xml @@ -72,7 +72,7 @@ <dependency> <groupId>org.opengroup.osdu.core.aws</groupId> <artifactId>os-core-lib-aws</artifactId> - <version>0.21.0</version> + <version>0.23.0-dhudsons-notification-refactor-SNAPSHOT</version> </dependency> <!-- AWS managed packages --> diff --git a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/publish/PublisherImpl.java b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/publish/PublisherImpl.java index fb81abbbbb8100094869e1d442b91d3bb6533ae5..c19b22b1718fc4eb262d748a084338ec6a20e58c 100644 --- a/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/publish/PublisherImpl.java +++ b/provider/indexer-aws/src/main/java/org/opengroup/osdu/indexer/aws/publish/PublisherImpl.java @@ -14,7 +14,6 @@ package org.opengroup.osdu.indexer.aws.publish; -import com.amazonaws.services.sns.model.MessageAttributeValue; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.AmazonSNS; @@ -22,14 +21,13 @@ import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.aws.sns.AmazonSNSConfig; import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder; +import org.opengroup.osdu.core.common.model.indexer.RecordStatus; import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.opengroup.osdu.core.common.model.indexer.JobStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; import javax.inject.Inject; -import java.util.HashMap; -import java.util.Map; @Component public class PublisherImpl implements IPublisher { @@ -40,6 +38,9 @@ public class PublisherImpl implements IPublisher { @Value("${aws.region}") private String amazonSNSRegion; + @Value("${OSDU_TOPIC}") + private String osduIndexerTopic; + @Inject public void init() throws K8sParameterNotFoundException { AmazonSNSConfig snsConfig = new AmazonSNSConfig(amazonSNSRegion); @@ -50,20 +51,10 @@ public class PublisherImpl implements IPublisher { public void publishStatusChangedTagsToTopic(DpsHeaders headers, JobStatus indexerBatchStatus) throws Exception { - // Attributes - Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); - messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); - messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); - headers.addCorrelationIdIfMissing(); - messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getCorrelationId())); + PublishRequestBuilder<RecordStatus> publishRequestBuilder = new PublishRequestBuilder<>(); + publishRequestBuilder.setGeneralParametersFromHeaders(headers); - PublishRequest publishRequest = new PublishRequestBuilder().generatePublishRequest("data", indexerBatchStatus.getStatusesList(), messageAttributes, amazonSNSTopic); + PublishRequest publishRequest = publishRequestBuilder.generatePublishRequest(osduIndexerTopic, amazonSNSTopic, indexerBatchStatus.getStatusesList()); snsClient.publish(publishRequest); } diff --git a/provider/indexer-aws/src/main/resources/application.properties b/provider/indexer-aws/src/main/resources/application.properties index a0d1a419f6a6aa71128963495390ae748e7e9649..677c2536cde4d22eabf9794270c092ede9f7697d 100644 --- a/provider/indexer-aws/src/main/resources/application.properties +++ b/provider/indexer-aws/src/main/resources/application.properties @@ -35,6 +35,9 @@ STORAGE_QUERY_KINDS_HOST=${STORAGE_BASE_URL}/api/storage/v2/query/kinds STORAGE_QUERY_RECORD_FOR_CONVERSION_HOST=${STORAGE_BASE_URL}/api/storage/v2/query/records:batch STORAGE_RECORDS_BATCH_SIZE=20 INDEXER_QUEUE_HOST="" + +OSDU_TOPIC=${OSDU_INDEXER_TOPIC:indexer-status-changed} + ## AWS ElastiCache configuration aws.elasticache.cluster.endpoint=${CACHE_CLUSTER_ENDPOINT:null} aws.elasticache.cluster.port=${CACHE_CLUSTER_PORT:null} diff --git a/provider/indexer-aws/src/test/java/org/opengroup/osdu/indexer/aws/publish/PublisherImplTest.java b/provider/indexer-aws/src/test/java/org/opengroup/osdu/indexer/aws/publish/PublisherImplTest.java index 65d37eeda5d1c3c639a2fc0d7cf6a6c29827ed5e..d74ee6ce52eb3ff755d77e9e401222970f3ecaff 100644 --- a/provider/indexer-aws/src/test/java/org/opengroup/osdu/indexer/aws/publish/PublisherImplTest.java +++ b/provider/indexer-aws/src/test/java/org/opengroup/osdu/indexer/aws/publish/PublisherImplTest.java @@ -28,19 +28,21 @@ import org.mockito.Mockito; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.aws.sns.AmazonSNSConfig; import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder; +import org.opengroup.osdu.core.common.model.indexer.RecordStatus; import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; import org.opengroup.osdu.indexer.aws.IndexerAwsApplication; import org.opengroup.osdu.core.common.model.indexer.JobStatus; import org.springframework.boot.test.context.SpringBootTest; import org.mockito.runners.MockitoJUnitRunner; - -import java.util.HashMap; -import java.util.Map; +import org.springframework.test.util.ReflectionTestUtils; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.HashMap; +import java.util.Map; + @RunWith(MockitoJUnitRunner.class) @SpringBootTest(classes = {IndexerAwsApplication.class}) public class PublisherImplTest { @@ -65,19 +67,10 @@ public class PublisherImplTest { Mockito.when(snsClient.publish(Mockito.any(PublishRequest.class))) .thenReturn(Mockito.any(PublishResult.class)); - Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); - messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); - messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); - headers.addCorrelationIdIfMissing(); - messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getCorrelationId())); - - PublishRequest publishRequest = new PublishRequestBuilder().generatePublishRequest("data", jobStatus.getStatusesList(), messageAttributes,null); + PublishRequestBuilder<RecordStatus> publishRequestBuilder = new PublishRequestBuilder<>(); + publishRequestBuilder.setGeneralParametersFromHeaders(headers); + + PublishRequest publishRequest = publishRequestBuilder.generatePublishRequest(null, null, jobStatus.getStatusesList()); // Act publisher.publishStatusChangedTagsToTopic(headers, jobStatus); @@ -98,26 +91,15 @@ public class PublisherImplTest { })) { publisher.init(); + JobStatus jobStatus = new JobStatus(); // Arrange DpsHeaders headers = new DpsHeaders(); - JobStatus jobStatus = new JobStatus(); - Mockito.when(snsClient.publish(Mockito.any(PublishRequest.class))) - .thenReturn(Mockito.any(PublishResult.class)); - - Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); - messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); - messageAttributes.put(DpsHeaders.DATA_PARTITION_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getPartitionIdWithFallbackToAccountId())); - headers.addCorrelationIdIfMissing(); - messageAttributes.put(DpsHeaders.CORRELATION_ID, new MessageAttributeValue() - .withDataType("String") - .withStringValue(headers.getCorrelationId())); - - PublishRequest publishRequest = new PublishRequestBuilder().generatePublishRequest("data", jobStatus.getStatusesList(), messageAttributes, indexer_sns_topic_arn); + + PublishRequestBuilder<RecordStatus> publishRequestBuilder = new PublishRequestBuilder<>(); + publishRequestBuilder.setGeneralParametersFromHeaders(headers); + + PublishRequest publishRequest = publishRequestBuilder.generatePublishRequest(null, indexer_sns_topic_arn, jobStatus.getStatusesList()); // Act publisher.publishStatusChangedTagsToTopic(headers, jobStatus);