diff --git a/provider/schema-aws/src/main/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImpl.java b/provider/schema-aws/src/main/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImpl.java index 8af25afa02154ce3d5dba86731dad447c556d005..8a0116e9be6d7964f8d0bbc18c86b5ae1052ed8e 100644 --- a/provider/schema-aws/src/main/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImpl.java +++ b/provider/schema-aws/src/main/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImpl.java @@ -1,40 +1,113 @@ -/* Copyright © Amazon Web Services - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ package org.opengroup.osdu.schema.provider.aws.impl.messagebus; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.PostConstruct; +import org.opengroup.osdu.core.aws.sns.AmazonSNSConfig; +import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; +import org.opengroup.osdu.core.aws.ssm.K8sParameterNotFoundException; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.schema.constants.SchemaConstants; +import org.opengroup.osdu.schema.provider.aws.impl.messagebus.model.SchemaPubSubMessage; import org.opengroup.osdu.schema.provider.interfaces.messagebus.IMessageBus; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.model.PublishRequest; @Component -public class MessageBusImpl implements IMessageBus{ - - @Autowired - private JaxRsDpsLog logger; - - @Override - public void publishMessage(String schemaId, String eventType) { - // Auto-generated method stub - logger.warning("publish message not implemented yet"); - } - - @Override - public void publishMessageForSystemSchema(String schemaId, String eventType) { - // Auto-generated method stub - logger.warning("publish message not implemented yet"); - } +public class MessageBusImpl implements IMessageBus { + + private String amazonSNSTopic; + private final String schemaSNSTopic = "schema-sns-topic-arn"; + + private AmazonSNS snsClient; + + @Value("${OSDU_TOPIC}") + private String osduSchemaTopic; + + @Value("${AWS.REGION}") + private String amazonSNSRegion; + + @Autowired + private ITenantFactory tenantFactory; + + @Autowired + private DpsHeaders headers; + + @Autowired + private JaxRsDpsLog logger; + + @PostConstruct + public void init() throws K8sParameterNotFoundException { + K8sLocalParameterProvider k8sLocalParameterProvider = new K8sLocalParameterProvider(); + snsClient = new AmazonSNSConfig(amazonSNSRegion).AmazonSNS(); + + amazonSNSTopic = k8sLocalParameterProvider.getParameterAsString(schemaSNSTopic); + } + + @Override + public void publishMessage(String schemaId, String eventType) { + try { + publishSchemaEvent(schemaId, eventType, headers); + } catch (Exception ex) { + logger.warning(SchemaConstants.SCHEMA_NOTIFICATION_FAILED, ex); + } + } + + @Override + public void publishMessageForSystemSchema(String schemaId, String eventType) { + try { + // Publish the event for all the tenants. + List<String> privateTenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getName).collect(Collectors.toList()); + + for (String tenant : privateTenantList) { + HashMap<String, String> headersMap = new HashMap<>(); + headersMap.put(DpsHeaders.ACCOUNT_ID, tenant); + headersMap.put(DpsHeaders.DATA_PARTITION_ID, tenant); + headersMap.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + DpsHeaders headers = DpsHeaders.createFromMap(headersMap); + publishSchemaEvent(schemaId, eventType, headers); + } + + } catch (Exception ex) { + logger.warning(SchemaConstants.SYSTEM_SCHEMA_NOTIFICATION_FAILED, ex); + } + } + + private void publishSchemaEvent(String schemaId, String eventType, DpsHeaders headers) { + PublishRequestBuilder<SchemaPubSubMessage> publishRequestBuilder = new PublishRequestBuilder<>(); + + if (headers != null) { + publishRequestBuilder.setGeneralParametersFromHeaders(headers); + } + + PublishRequest publishRequest = + publishRequestBuilder.generatePublishRequest(osduSchemaTopic, amazonSNSTopic, new SchemaPubSubMessage(schemaId, eventType)); + + snsClient.publish(publishRequest); + logger.info(String.format("Message is published to OSDU topic: %s SNS topic: %s to SNS region: %s", osduSchemaTopic, amazonSNSTopic, + amazonSNSRegion)); + } } diff --git a/provider/schema-aws/src/main/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/model/SchemaPubSubMessage.java b/provider/schema-aws/src/main/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/model/SchemaPubSubMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..c19153cd58edb5545ba0e169ff18fa013f296884 --- /dev/null +++ b/provider/schema-aws/src/main/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/model/SchemaPubSubMessage.java @@ -0,0 +1,27 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.opengroup.osdu.schema.provider.aws.impl.messagebus.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class SchemaPubSubMessage { + private String kind; + private String op; +} diff --git a/provider/schema-aws/src/main/resources/application.properties b/provider/schema-aws/src/main/resources/application.properties index 05b5bf4026231af430850aa45bd5cf36b1c99ed2..e9aed90d330b574204ec4c20742ee1ba168e24db 100644 --- a/provider/schema-aws/src/main/resources/application.properties +++ b/provider/schema-aws/src/main/resources/application.properties @@ -54,10 +54,13 @@ server.ssl.key-alias=${SSL_KEY_ALIAS:osduonaws} server.ssl.key-password=${SSL_KEY_PASSWORD:} server.ssl.key-store-password=${SSL_KEY_STORE_PASSWORD:} + spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.SecurityAutoConfiguration # values mongodb/dynamodb for choose any condition from implementation repository.implementation=${SCHEMA_SERVICE_REPOSITORY_IMPLEMENTATION:dynamodb} #Tomcat limits -server.tomcat.threads.max=${TOMCAT_THREADS_MAX:300} \ No newline at end of file +server.tomcat.threads.max=${TOMCAT_THREADS_MAX:300} + +OSDU_TOPIC=${OSDU_SCHEMA_TOPIC:schema-status-changed} \ No newline at end of file diff --git a/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImplTest.java b/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImplTest.java index a0d80c53252353775fef41c7d3dbc614140d95b4..e2b0246754714a6719fd1711a4e1690ccb6eac59 100644 --- a/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImplTest.java +++ b/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/impl/messagebus/MessageBusImplTest.java @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -14,41 +14,64 @@ package org.opengroup.osdu.schema.provider.aws.impl.messagebus; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; - +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.aws.sns.PublishRequestBuilder; +import org.opengroup.osdu.core.aws.ssm.K8sLocalParameterProvider; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.model.PublishRequest; @RunWith(MockitoJUnitRunner.class) public class MessageBusImplTest { - @InjectMocks - private MessageBusImpl messageBusImpl; + @InjectMocks + private MessageBusImpl messageBusImpl; + + @Mock + private JaxRsDpsLog logger; + + @Mock + private K8sLocalParameterProvider k8sLocalParameterProvider; + + @Mock + private AmazonSNS snsClient; + + @Test + public void publishMessagePublishesMessages() { + + try (MockedConstruction<PublishRequestBuilder> k8sParameterProvider = + Mockito.mockConstruction(PublishRequestBuilder.class, (mock, context) -> { + when(mock.generatePublishRequest(anyString(), anyString(), any())).thenReturn(new PublishRequest()); + })) { + - @Mock - private JaxRsDpsLog logger; + messageBusImpl.publishMessage("schemaId", "eventType_create"); + verify(snsClient, times(1)).publish(any()); + } + } - @Test - public void publishMessage_Success() { - String schemaId = "schemaId"; - String eventType = "eventType"; + @Test + public void publishMessageForSystemSchemaPublishesMessages() { - assertDoesNotThrow(() -> { - messageBusImpl.publishMessage(schemaId, eventType); - }); - } + try (MockedConstruction<PublishRequestBuilder> k8sParameterProvider = + Mockito.mockConstruction(PublishRequestBuilder.class, (mock, context) -> { + when(mock.generatePublishRequest(anyString(), anyString(), any())).thenReturn(new PublishRequest()); + })) { - @Test - public void publishMessageForSystemSchema_Success() { - String schemaId = "schemaId"; - String eventType = "eventType"; - assertDoesNotThrow(() -> { - messageBusImpl.publishMessageForSystemSchema(schemaId, eventType); - }); - } + messageBusImpl.publishMessage("schemaId", "eventType_create"); + verify(snsClient, times(1)).publish(any()); + } + } } diff --git a/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/mongo/config/SchemaTestConfig.java b/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/mongo/config/SchemaTestConfig.java index 0b663e84e2d877375291d5214c21f22ea89e810b..21a453a019acbe182de8d27012b03835de2d6684 100644 --- a/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/mongo/config/SchemaTestConfig.java +++ b/provider/schema-aws/src/test/java/org/opengroup/osdu/schema/provider/aws/mongo/config/SchemaTestConfig.java @@ -18,8 +18,10 @@ import org.mockito.Mockito; import org.opengroup.osdu.core.aws.mongodb.MongoDBSimpleFactory; import org.opengroup.osdu.core.aws.mongodb.MultiClusteredConfigReader; import org.opengroup.osdu.core.aws.mongodb.config.MongoProperties; +import org.opengroup.osdu.schema.provider.aws.impl.messagebus.MessageBusImpl; import org.opengroup.osdu.schema.provider.aws.impl.schemainfostore.mongo.config.MultiClusteredConfigReaderSchema; import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Bean; import org.springframework.data.mongodb.core.MongoTemplate; @@ -45,4 +47,7 @@ public class SchemaTestConfig { return dbSimpleFactory.mongoTemplate(properties); } + + @MockBean + public MessageBusImpl messageBus; } \ No newline at end of file