diff --git a/NOTICE b/NOTICE index 638949a065828b4166b053cf2e12538d06ec553f..2d4af556bf276516d0b1e50763c0567b2c9fc727 100644 --- a/NOTICE +++ b/NOTICE @@ -74,26 +74,25 @@ The following software have components provided under the terms of this license: - Byte Buddy (without dependencies) (from https://repo1.maven.org/maven2/net/bytebuddy/byte-buddy) - Byte Buddy agent (from https://repo1.maven.org/maven2/net/bytebuddy/byte-buddy-agent) - ClassMate (from http://github.com/cowtowncoder/java-classmate) -- Cloud Storage JSON API v1-rev20210127-1.32.1 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-storage) +- Cloud Storage JSON API v1-rev20200927-1.30.10 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-storage) - Converter: Jackson (from https://repo1.maven.org/maven2/com/squareup/retrofit2/converter-jackson) - Core functionality for the Reactor Netty library (from https://github.com/reactor/reactor-netty) - Elastic JNA Distribution (from https://github.com/java-native-access/jna) -- Elasticsearch: Core (from https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch) -- Expression Language 3.0 (from http://uel.java.net) - Expression Language 3.0 (from https://projects.eclipse.org/projects/ee4j.el) +- Expression Language 3.0 (from http://uel.java.net) - FindBugs-jsr305 (from http://findbugs.sourceforge.net/) - GSON extensions to the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-gson) - Google APIs Client Library for Java (from https://repo1.maven.org/maven2/com/google/api-client/google-api-client) - Google App Engine extensions to the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-appengine) -- Google Cloud Core (from https://github.com/googleapis/java-core) -- Google Cloud Core HTTP (from https://github.com/googleapis/java-core) +- Google Cloud Core (from https://github.com/googleapis/google-cloud-java/tree/master/google-cloud-clients/google-cloud-core) +- Google Cloud Core HTTP (from https://github.com/googleapis/google-cloud-java/tree/master/google-cloud-clients/google-cloud-core-http) - Google Cloud Core gRPC (from https://github.com/googleapis/google-cloud-java/tree/master/google-cloud-clients/google-cloud-core-grpc) -- Google Cloud Datastore (from https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-datastore) +- Google Cloud Datastore (from https://github.com/googleapis/google-cloud-java/tree/master/google-cloud-clients/google-cloud-datastore) - Google Cloud IAM Service Account Credentials (from https://github.com/googleapis/java-iamcredentials) - Google Cloud Key Management Service (KMS) API v1-rev9-1.22.0 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-cloudkms) -- Google Cloud Logging (from https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-logging) -- Google Cloud Pub/Sub (from https://github.com/googleapis/java-pubsub) -- Google Cloud Storage (from https://github.com/googleapis/java-storage) +- Google Cloud Logging (from https://github.com/googleapis/google-cloud-java/tree/master/google-cloud-clients/google-cloud-logging) +- Google Cloud Pub/Sub (from https://github.com/googleapis/google-cloud-java/tree/master/google-cloud-clients/google-cloud-pubsub) +- Google Cloud Storage (from https://github.com/googleapis/google-cloud-java/tree/master/google-cloud-clients/google-cloud-storage) - Google HTTP Client Library for Java (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client) - Google HTTP Client Library for Java (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client) - Google OAuth Client Library for Java (from https://repo1.maven.org/maven2/com/google/oauth-client/google-oauth-client) @@ -125,21 +124,22 @@ The following software have components provided under the terms of this license: - JSON Web Token support for the JVM (from https://github.com/jwtk/jjwt) - JSON library from Android SDK (from http://developer.android.com/sdk) - JSONassert (from https://github.com/skyscreamer/JSONassert) +- JSR107 API and SPI (from https://github.com/jsr107/jsr107spec) - Jackson (from http://jackson.codehaus.org) - Jackson 2 extensions to the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-jackson2) - Jackson dataformat: CBOR (from http://github.com/FasterXML/jackson-dataformats-binary) +- Jackson dataformat: Smile (from http://github.com/FasterXML/jackson-dataformats-binary) - Jackson datatype: JSR310 (from https://repo1.maven.org/maven2/com/fasterxml/jackson/datatype/jackson-datatype-jsr310) - Jackson datatype: jdk8 (from https://repo1.maven.org/maven2/com/fasterxml/jackson/datatype/jackson-datatype-jdk8) - Jackson extensions to the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-jackson) +- Jackson module: JAXB Annotations (from https://github.com/FasterXML/jackson-modules-base) - Jackson-annotations (from http://github.com/FasterXML/jackson) - Jackson-core (from http://wiki.fasterxml.com/JacksonHome) - Jackson-core (from http://wiki.fasterxml.com/JacksonHome) -- Jackson-dataformat-Smile (from http://github.com/FasterXML/jackson-dataformat-smile) - Jackson-dataformat-XML (from http://wiki.fasterxml.com/JacksonExtensionXmlDataBinding) -- Jackson-dataformat-YAML (from https://github.com/FasterXML/jackson) +- Jackson-dataformat-YAML (from https://github.com/FasterXML/jackson-dataformats-text) - Jackson-datatype-Joda (from http://wiki.fasterxml.com/JacksonModuleJoda) - Jackson-module-Afterburner (from http://wiki.fasterxml.com/JacksonHome) -- Jackson-module-JAXB-annotations (from http://wiki.fasterxml.com/JacksonJAXBAnnotations) - Jackson-module-parameter-names (from https://repo1.maven.org/maven2/com/fasterxml/jackson/module/jackson-module-parameter-names) - Java Native Access (from https://github.com/java-native-access/jna) - Java Native Access Platform (from https://github.com/java-native-access/jna) @@ -157,8 +157,8 @@ The following software have components provided under the terms of this license: - KeePassJava2 :: KDB (from https://repo1.maven.org/maven2/org/linguafranca/pwdb/KeePassJava2-kdb) - KeePassJava2 :: KDBX (from https://repo1.maven.org/maven2/org/linguafranca/pwdb/KeePassJava2-kdbx) - KeePassJava2 :: Simple (from https://repo1.maven.org/maven2/org/linguafranca/pwdb/KeePassJava2-simple) -- Logback Contrib :: JSON :: Classic (from ) -- Logback Contrib :: JSON :: Core (from ) +- Logback Contrib :: JSON :: Classic (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-classic) +- Logback Contrib :: JSON :: Core (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-core) - Logback Contrib :: Jackson (from ) - Lucene Common Analyzers (from https://repo1.maven.org/maven2/org/apache/lucene/lucene-analyzers-common) - Lucene Core (from https://repo1.maven.org/maven2/org/apache/lucene/lucene-core) @@ -236,7 +236,7 @@ The following software have components provided under the terms of this license: - Protocol Buffer extensions to the Google HTTP Client Library for Java. (from https://repo1.maven.org/maven2/com/google/http-client/google-http-client-protobuf) - Proton-J (from https://repo1.maven.org/maven2/org/apache/qpid/proton-j) - QpidJMS Client (from ) -- Reactive Streams Netty driver (from https://github.com/reactor/reactor-netty) +- Reactor Netty with all modules (from https://github.com/reactor/reactor-netty) - Retrofit (from https://github.com/square/retrofit) - SnakeYAML (from http://www.snakeyaml.org) - Spring AOP (from https://github.com/spring-projects/spring-framework) @@ -246,7 +246,6 @@ The following software have components provided under the terms of this license: - Spring Boot Actuator AutoConfigure (from https://projects.spring.io/spring-boot/#/spring-boot-parent/spring-boot-actuator-autoconfigure) - Spring Boot Actuator Starter (from http://projects.spring.io/spring-boot/) - Spring Boot AutoConfigure (from http://projects.spring.io/spring-boot/) -- Spring Boot Json Starter (from https://projects.spring.io/spring-boot/#/spring-boot-parent/spring-boot-starters/spring-boot-starter-json) - Spring Boot Logging Starter (from http://projects.spring.io/spring-boot/) - Spring Boot Reactor Netty Starter (from https://projects.spring.io/spring-boot/#/spring-boot-parent/spring-boot-starters/spring-boot-starter-reactor-netty) - Spring Boot Security Starter (from http://projects.spring.io/spring-boot/) @@ -254,11 +253,8 @@ The following software have components provided under the terms of this license: - Spring Boot Test (from http://projects.spring.io/spring-boot/) - Spring Boot Test Auto-Configure (from http://projects.spring.io/spring-boot/) - Spring Boot Test Starter (from http://projects.spring.io/spring-boot/) -- Spring Boot Tomcat Starter (from http://projects.spring.io/spring-boot/) - Spring Boot Validation Starter (from https://projects.spring.io/spring-boot/#/spring-boot-parent/spring-boot-starters/spring-boot-starter-validation) - Spring Boot Validation Starter (from https://projects.spring.io/spring-boot/#/spring-boot-parent/spring-boot-starters/spring-boot-starter-validation) -- Spring Boot Web Starter (from http://projects.spring.io/spring-boot/) -- Spring Boot Web Starter (from http://projects.spring.io/spring-boot/) - Spring Boot WebFlux Starter (from https://projects.spring.io/spring-boot/#/spring-boot-parent/spring-boot-starters/spring-boot-starter-webflux) - Spring Commons Logging Bridge (from https://github.com/spring-projects/spring-framework) - Spring Context (from https://github.com/spring-projects/spring-framework) @@ -272,7 +268,7 @@ The following software have components provided under the terms of this license: - Spring TestContext Framework (from https://github.com/SpringSource/spring-framework) - Spring Transaction (from https://github.com/spring-projects/spring-framework) - Spring Web (from https://github.com/spring-projects/spring-framework) -- Spring Web MVC (from https://github.com/SpringSource/spring-framework) +- Spring Web MVC (from https://github.com/spring-projects/spring-framework) - Spring WebFlux (from https://github.com/spring-projects/spring-framework) - T-Digest (from https://github.com/tdunning/t-digest) - Undertow Core (from https://repo1.maven.org/maven2/io/undertow/undertow-core) @@ -281,6 +277,8 @@ The following software have components provided under the terms of this license: - Undertow Servlet (from https://repo1.maven.org/maven2/io/undertow/undertow-servlet) - Undertow WebSockets JSR356 implementations (from https://repo1.maven.org/maven2/io/undertow/undertow-websockets-jsr) - Undertow WebSockets JSR356 implementations (from https://repo1.maven.org/maven2/io/undertow/undertow-websockets-jsr) +- Vavr (from http://vavr.io) +- Vavr Match (from http://vavr.io) - WildFly Client Configuration (from ) - Woodstox (from https://github.com/FasterXML/woodstox) - XNIO API (from http://www.jboss.org/xnio) @@ -293,14 +291,18 @@ The following software have components provided under the terms of this license: - aalto-xml (from ) - aggs-matrix-stats (from https://github.com/elastic/elasticsearch) - aws-ssm-java-caching-client (from https://github.com/awslabs/aws-ssm-java-caching-client) -- cli (from https://github.com/elastic/elasticsearch) -- com.google.api.grpc:grpc-google-cloud-pubsub-v1 (from https://github.com/googleapis/googleapis) -- com.google.api.grpc:proto-google-cloud-logging-v2 (from https://github.com/googleapis/googleapis) +- com.google.api.grpc:proto-google-cloud-datastore-v1 (from https://github.com/googleapis/googleapis) +- com.google.api.grpc:proto-google-common-protos (from https://github.com/googleapis/googleapis) +- com.google.api.grpc:proto-google-common-protos (from https://github.com/googleapis/googleapis) - compiler (from http://github.com/spullara/mustache.java) -- datastore-v1-proto-client (from ) +- datastore-v1-proto-client (from https://repo1.maven.org/maven2/com/google/cloud/datastore/datastore-v1-proto-client) +- elasticsearch-cli (from https://github.com/elastic/elasticsearch) - elasticsearch-core (from https://github.com/elastic/elasticsearch) +- elasticsearch-secure-sm (from https://github.com/elastic/elasticsearch) +- elasticsearch-x-content (from https://github.com/elastic/elasticsearch) - error-prone annotations (from https://repo1.maven.org/maven2/com/google/errorprone/error_prone_annotations) - error-prone annotations (from https://repo1.maven.org/maven2/com/google/errorprone/error_prone_annotations) +- grpc-google-cloud-pubsub-v1 (from https://repo1.maven.org/maven2/com/google/api/grpc/grpc-google-cloud-pubsub-v1) - io.grpc:grpc-alts (from https://github.com/grpc/grpc-java) - io.grpc:grpc-api (from https://github.com/grpc/grpc-java) - io.grpc:grpc-auth (from https://github.com/grpc/grpc-java) @@ -340,25 +342,36 @@ The following software have components provided under the terms of this license: - org.apiguardian:apiguardian-api (from https://github.com/apiguardian-team/apiguardian) - org.conscrypt:conscrypt-openjdk-uber (from https://conscrypt.org/) - org.opentest4j:opentest4j (from https://github.com/ota4j-team/opentest4j) -- org.xmlunit:xmlunit-core (from http://www.xmlunit.org/) +- org.xmlunit:xmlunit-core (from https://www.xmlunit.org/) - parent-join (from https://github.com/elastic/elasticsearch) - perfmark:perfmark-api (from https://github.com/perfmark/perfmark) - powermock-module-junit4-common (from https://repo1.maven.org/maven2/org/powermock/powermock-module-junit4-common) -- proto-google-cloud-datastore-v1 (from https://github.com/googleapis/api-client-staging) - proto-google-cloud-iamcredentials-v1 (from https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-iamcredentials-v1) -- proto-google-cloud-pubsub-v1 (from https://github.com/googleapis/java-pubsub/proto-google-cloud-pubsub-v1) -- proto-google-common-protos (from https://github.com/googleapis/java-iam/proto-google-common-protos) -- proto-google-common-protos (from https://github.com/googleapis/java-iam/proto-google-common-protos) +- proto-google-cloud-logging-v2 (from https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-logging-v2) +- proto-google-cloud-pubsub-v1 (from https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-pubsub-v1) - proto-google-iam-v1 (from https://github.com/googleapis/java-iam/proto-google-iam-v1) - rank-eval (from https://github.com/elastic/elasticsearch) +- resilience4j (from https://github.com/resilience4j/resilience4j) +- resilience4j (from https://github.com/resilience4j/resilience4j) +- resilience4j (from https://github.com/resilience4j/resilience4j) +- resilience4j (from https://github.com/resilience4j/resilience4j) +- resilience4j (from https://github.com/resilience4j/resilience4j) +- resilience4j (from https://github.com/resilience4j/resilience4j) +- resilience4j (from https://github.com/resilience4j/resilience4j) +- resilience4j (from https://github.com/resilience4j/resilience4j) - rest (from https://github.com/elastic/elasticsearch) - rest-high-level (from https://github.com/elastic/elasticsearch) - rxjava (from https://github.com/ReactiveX/RxJava) -- secure-sm (from https://github.com/elastic/elasticsearch) +- server (from https://github.com/elastic/elasticsearch) - spring-boot-dependencies (from https://spring.io/projects/spring-boot) +- spring-boot-starter-aop (from https://spring.io/projects/spring-boot) +- spring-boot-starter-json (from https://spring.io/projects/spring-boot) - spring-boot-starter-log4j2 (from https://spring.io/projects/spring-boot) +- spring-boot-starter-tomcat (from https://spring.io/projects/spring-boot) - spring-boot-starter-undertow (from https://spring.io/projects/spring-boot) - spring-boot-starter-undertow (from https://spring.io/projects/spring-boot) +- spring-boot-starter-web (from https://spring.io/projects/spring-boot) +- spring-boot-starter-web (from https://spring.io/projects/spring-boot) - spring-security-config (from http://spring.io/spring-security) - spring-security-config (from http://spring.io/spring-security) - spring-security-core (from http://spring.io/spring-security) @@ -381,16 +394,15 @@ The following software have components provided under the terms of this license: - tomcat-embed-el (from https://tomcat.apache.org/) - tomcat-embed-websocket (from https://tomcat.apache.org/) - wildfly-common (from ) -- x-content (from https://github.com/elastic/elasticsearch) ======================================================================== BSD-2-Clause ======================================================================== The following software have components provided under the terms of this license: -- API Common (from https://github.com/googleapis/api-common-java) -- GAX (Google Api eXtensions) for Java (from https://github.com/googleapis/gax-java) -- GAX (Google Api eXtensions) for Java (from https://github.com/googleapis/gax-java) +- API Common (from https://github.com/googleapis) +- GAX (Google Api eXtensions) (from https://github.com/googleapis) +- GAX (Google Api eXtensions) (from https://github.com/googleapis) - GAX (Google Api eXtensions) for Java (from https://github.com/googleapis/gax-java) - Lucene Common Analyzers (from https://repo1.maven.org/maven2/org/apache/lucene/lucene-analyzers-common) - Lucene Core (from https://repo1.maven.org/maven2/org/apache/lucene/lucene-core) @@ -403,11 +415,11 @@ BSD-3-Clause ======================================================================== The following software have components provided under the terms of this license: -- API Common (from https://github.com/googleapis/api-common-java) +- API Common (from https://github.com/googleapis) - ASM Core (from https://repo1.maven.org/maven2/org/ow2/asm/asm) - Apache Commons Codec (from https://commons.apache.org/proper/commons-codec/) -- GAX (Google Api eXtensions) for Java (from https://github.com/googleapis/gax-java) -- GAX (Google Api eXtensions) for Java (from https://github.com/googleapis/gax-java) +- GAX (Google Api eXtensions) (from https://github.com/googleapis) +- GAX (Google Api eXtensions) (from https://github.com/googleapis) - GAX (Google Api eXtensions) for Java (from https://github.com/googleapis/gax-java) - Google APIs Client Library for Java (from https://repo1.maven.org/maven2/com/google/api-client/google-api-client) - Google Auth Library for Java - Credentials (from https://repo1.maven.org/maven2/com/google/auth/google-auth-library-credentials) @@ -500,6 +512,7 @@ EPL-1.0 ======================================================================== The following software have components provided under the terms of this license: +- AspectJ Weaver (from https://www.eclipse.org/aspectj/) - Common Annotations 1.3 API (from ) - Expression Language 3.0 (from https://projects.eclipse.org/projects/ee4j.el) - JUnit Jupiter (Aggregator) (from https://junit.org/junit5/) @@ -507,11 +520,11 @@ The following software have components provided under the terms of this license: - Java Servlet 4.0 API (from ) - Java Servlet API (from https://projects.eclipse.org/projects/ee4j.servlet) - Java(TM) API for WebSocket (from https://repo1.maven.org/maven2/org/jboss/spec/javax/websocket/jboss-websocket-api_1.1_spec) -- Logback Classic Module (from http://logback.qos.ch) -- Logback Contrib :: JSON :: Classic (from ) -- Logback Contrib :: JSON :: Core (from ) +- Logback Classic Module (from https://repo1.maven.org/maven2/ch/qos/logback/logback-classic) +- Logback Contrib :: JSON :: Classic (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-classic) +- Logback Contrib :: JSON :: Core (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-core) - Logback Contrib :: Jackson (from ) -- Logback Core Module (from http://logback.qos.ch) +- Logback Core Module (from https://repo1.maven.org/maven2/ch/qos/logback/logback-core) - Microsoft Application Insights Java SDK Spring Boot starter (from https://github.com/Microsoft/ApplicationInsights-Java) - Microsoft Application Insights Java SDK Web Module (from https://github.com/Microsoft/ApplicationInsights-Java) - Microsoft Application Insights Log4j 2 Appender (from https://github.com/Microsoft/ApplicationInsights-Java) @@ -626,11 +639,11 @@ The following software have components provided under the terms of this license: - Java Native Access Platform (from https://github.com/java-native-access/jna) - Javassist (from http://www.javassist.org/) - Javassist (from http://www.javassist.org/) -- Logback Classic Module (from http://logback.qos.ch) -- Logback Contrib :: JSON :: Classic (from ) -- Logback Contrib :: JSON :: Core (from ) +- Logback Classic Module (from https://repo1.maven.org/maven2/ch/qos/logback/logback-classic) +- Logback Contrib :: JSON :: Classic (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-classic) +- Logback Contrib :: JSON :: Core (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-core) - Logback Contrib :: Jackson (from ) -- Logback Core Module (from http://logback.qos.ch) +- Logback Core Module (from https://repo1.maven.org/maven2/ch/qos/logback/logback-core) - Microsoft Application Insights Java SDK Spring Boot starter (from https://github.com/Microsoft/ApplicationInsights-Java) - Microsoft Application Insights Java SDK Web Module (from https://github.com/Microsoft/ApplicationInsights-Java) - Microsoft Application Insights Log4j 2 Appender (from https://github.com/Microsoft/ApplicationInsights-Java) diff --git a/devops/azure/chart/templates/deployment.yaml b/devops/azure/chart/templates/deployment.yaml index 15743798b4124847d52737f40c9e89b9fac23e8d..4651e38c7b35cae5c9035d01d44d268153597b44 100644 --- a/devops/azure/chart/templates/deployment.yaml +++ b/devops/azure/chart/templates/deployment.yaml @@ -86,4 +86,20 @@ spec: - name: partition_service_endpoint value: http://partition/api/partition/v1 - name: maxCacheSize - value: "20" \ No newline at end of file + value: "20" + - name: max_concurrent_calls + value: "3" + - name: executor_n_threads + value: "32" + - name: max_lock_renew_duration_seconds + value: "2000" + - name: initial_subscription_manager_delay_seconds + value: "0" + - name: consecutive_subscription_manager_delay_seconds + value: "1800" + - name: service_bus_enabled + value: "false" + - name: event_grid_to_service_bus_enabled + value: "false" + - name: event_grid_enabled + value: "true" \ No newline at end of file diff --git a/pom.xml b/pom.xml index af5e0210e18b18ac7b5203eeb355ee7a0b94da53..11457fcce09cd01280155bdb0e7673d0caa18cba 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ <java.version>8</java.version> <maven.compiler.target>${java.version}</maven.compiler.target> <maven.compiler.source>${java.version}</maven.compiler.source> - <os-core-common.version>0.10.0</os-core-common.version> + <os-core-common.version>0.11.0-rc3</os-core-common.version> </properties> <licenses> diff --git a/provider/notification-azure/pom.xml b/provider/notification-azure/pom.xml index 48d5797b94ab63c53f65ac89186543680fad56d9..1aed122928c7a81dd02556ab5938c8f89d19eec0 100644 --- a/provider/notification-azure/pom.xml +++ b/provider/notification-azure/pom.xml @@ -39,7 +39,7 @@ <springframework.version>4.3.0.RELEASE</springframework.version> <reactor.netty.version>0.10.0.RELEASE</reactor.netty.version> <reactor.core.version>3.3.0.RELEASE</reactor.core.version> - <osdu.corelibazure.version>0.10.1</osdu.corelibazure.version> + <osdu.corelibazure.version>0.11.0-rc3</osdu.corelibazure.version> <junit.version>5.6.0</junit.version> <jjwt.version>3.8.1</jjwt.version> <mockito.version>2.23.0</mockito.version> @@ -214,7 +214,6 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <version>2.4.2</version> <configuration> <useSystemClassLoader>false</useSystemClassLoader> <threadCount>1</threadCount> diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/Application.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/Application.java index 79a9ffdb69a44139abcefd6ad4f0b292256ca62f..c9f2e556dc0a4d03d5b936907a21be12b716267e 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/Application.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/Application.java @@ -14,18 +14,48 @@ package org.opengroup.osdu.notification.provider.azure; +import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.ISubscriptionManager; +import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadScopeBeanFactoryPostProcessor; +import org.opengroup.osdu.notification.provider.azure.util.AzureServiceBusConfig; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + @SpringBootApplication @ComponentScan({"org.opengroup.osdu"}) @EnableAsync public class Application { public static void main(String[] args) { - SpringApplication.run(new Class[]{Application.class}, args); + + ApplicationContext context = SpringApplication.run(new Class[]{Application.class}, args); + // Subscribe To Notification Event for Service Bus Notification Processing + AzureServiceBusConfig azureServiceBusConfig = context.getBean(AzureServiceBusConfig.class); + if (Boolean.parseBoolean(azureServiceBusConfig.getServiceBusEnabled())) { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + ISubscriptionManager subscriptionManager = context.getBean(ISubscriptionManager.class); + /* + Here the initialSubscriptionManagerDelay is used to have a delay before the first execution. + Every consecutive execution will take place after a delay of consecutiveSubscriptionManagerDelay. + If any of the execution exceeds the time consecutiveSubscriptionManagerDelay then next execution + will begin immediately after the current execution is completed. + */ + executorService.scheduleAtFixedRate(subscriptionManager, Integer.parseUnsignedInt(azureServiceBusConfig.getInitialSubscriptionManagerDelay()), + Integer.parseUnsignedInt(azureServiceBusConfig.getConsecutiveSubscriptionManagerDelay()), TimeUnit.SECONDS); + } + } + + @Bean + public static BeanFactoryPostProcessor beanFactoryPostProcessor() { + return new ThreadScopeBeanFactoryPostProcessor(); } } diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..bef7ffd10d50865a05591beaa13e90394ff9c882 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/MessageHandler.java @@ -0,0 +1,52 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus; + +import com.microsoft.azure.servicebus.ExceptionPhase; +import com.microsoft.azure.servicebus.IMessage; +import com.microsoft.azure.servicebus.IMessageHandler; +import com.microsoft.azure.servicebus.SubscriptionClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +public class MessageHandler implements IMessageHandler { + + private final static Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class); + private final SubscriptionClient receiveClient; + private ProcessNotification processNotification; + + public MessageHandler(SubscriptionClient client, ProcessNotification processNotification) { + this.receiveClient = client; + this.processNotification = processNotification; + } + + @Override + public CompletableFuture<Void> onMessageAsync(IMessage message) { + try { + this.processNotification.performNotification(message, receiveClient.getSubscriptionName()); + return this.receiveClient.completeAsync(message.getLockToken()); + } catch (Exception e) { + LOGGER.error("Unable to process the Notification : " + e); + return this.receiveClient.abandonAsync(message.getLockToken()); + } + } + + @Override + public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) { + LOGGER.error("{} - {}", exceptionPhase, throwable.getMessage()); + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/ProcessNotification.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/ProcessNotification.java new file mode 100644 index 0000000000000000000000000000000000000000..e33e641f59ce8c0caa222c77574f5376e124e388 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/ProcessNotification.java @@ -0,0 +1,72 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus; + +import com.microsoft.azure.servicebus.IMessage; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadScopeContextHolder; +import org.opengroup.osdu.notification.provider.azure.models.NotificationContent; +import org.opengroup.osdu.notification.provider.azure.messageBus.extractor.RequestBodyAdapter; +import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadDpsHeaders; +import org.opengroup.osdu.notification.provider.azure.util.MDCContextMap; +import org.opengroup.osdu.notification.service.NotificationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +@Component +@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}") +public class ProcessNotification { + private final String NOT_ACKNOWLEDGE = "message not acknowledged by client"; + private final static Logger LOGGER = LoggerFactory.getLogger(ProcessNotification.class); + @Autowired + private NotificationHandler notificationHandler; + @Autowired + private RequestBodyAdapter requestBodyAdapter; + @Autowired + private ThreadDpsHeaders dpsHeaders; + @Autowired + private MDCContextMap mdcContextMap; + + public void performNotification(IMessage message, String subscriptionName) throws Exception { + try { + NotificationContent notificationContent = requestBodyAdapter.extractNotificationContent(message, subscriptionName); + + String dataPartitionId = notificationContent.getExtractAttributes().get(DpsHeaders.DATA_PARTITION_ID); + String correlationId = notificationContent.getExtractAttributes().get(DpsHeaders.CORRELATION_ID); + + MDC.setContextMap(mdcContextMap.getContextMap(correlationId, dataPartitionId)); + dpsHeaders.setThreadContext(dataPartitionId, correlationId); + + LOGGER.info("Notification process started for message with id: {}", message.getMessageId()); + + HttpResponse response = notificationHandler.notifySubscriber(notificationContent.getNotificationId(), + notificationContent.getData(), notificationContent.getExtractAttributes()); + if (!response.isSuccessCode()) { + throw new Exception(NOT_ACKNOWLEDGE); + } + } catch (Exception e) { + LOGGER.error(String.format("An error occurred performing Notification for message with ID: ", message.getMessageId()), e); + throw e; + } finally { + ThreadScopeContextHolder.getContext().clear(); + MDC.clear(); + } + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionClientFactImpl.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionClientFactImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..a42ca37f435a0864f5c43e5d6a412d077cf7ffbb --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionClientFactImpl.java @@ -0,0 +1,42 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus; + +import com.microsoft.azure.servicebus.SubscriptionClient; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import org.opengroup.osdu.azure.servicebus.ISubscriptionClientFactory; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +@Component +@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}") +public class SubscriptionClientFactImpl { + private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionClientFactImpl.class); + @Autowired + private ISubscriptionClientFactory subscriptionClientFactory; + + public SubscriptionClient getSubscriptionClient(String dataPartition, String sbTopic, String sbSubscription) throws ServiceBusException, InterruptedException { + try { + return subscriptionClientFactory.getClient(dataPartition, sbTopic, sbSubscription); + } catch (ServiceBusException | InterruptedException e) { + LOGGER.error("Unexpected error creating Subscription Client", e); + throw new AppException(500, "Server Error", "Unexpected error creating Subscription Client", e); + } + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..9edeecf9f76f048e289361c2b47a0439d587aa36 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/SubscriptionManagerImpl.java @@ -0,0 +1,139 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus; + +import com.microsoft.azure.servicebus.MessageHandlerOptions; +import com.microsoft.azure.servicebus.SubscriptionClient; +import com.microsoft.azure.servicebus.management.ManagementClient; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import org.opengroup.osdu.azure.cosmosdb.CosmosStore; +import org.opengroup.osdu.azure.partition.PartitionInfoAzure; +import org.opengroup.osdu.azure.partition.PartitionServiceClient; +import org.opengroup.osdu.azure.serviceBusManager.IManagementClientFactory; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.notification.ISubscriptionFactory; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.ISubscriptionManager; +import org.opengroup.osdu.notification.provider.azure.messageBus.models.TopicSubscriptions; +import org.opengroup.osdu.notification.provider.azure.util.AzureServiceBusConfig; +import org.opengroup.osdu.notification.provider.azure.util.AzureCosmosProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +@Component +@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}") +public class SubscriptionManagerImpl implements ISubscriptionManager { + private final static Logger LOGGER = LoggerFactory.getLogger(SubscriptionManagerImpl.class); + @Autowired + private TopicSubscriptions topicSubscriptions; + @Autowired + private SubscriptionClientFactImpl subscriptionClientFactory; + @Autowired + private ProcessNotification processNotification; + @Autowired + private ITenantFactory tenantFactory; + @Autowired + private ISubscriptionFactory subscriptionFactory; + @Autowired + private PartitionServiceClient partitionService; + @Autowired + private CosmosStore cosmosStore; + @Autowired + private AzureCosmosProperties azureCosmosProperties; + @Autowired + private AzureServiceBusConfig azureServiceBusConfig; + @Autowired + private IManagementClientFactory factory; + + @Override + public void subscribeNotificationsEvent() { + + List<String> tenantList = tenantFactory.listTenantInfo().stream().map(TenantInfo::getDataPartitionId) + .collect(Collectors.toList()); + ExecutorService executorService = Executors + .newFixedThreadPool(Integer.parseUnsignedInt(azureServiceBusConfig.getNThreads())); + for (String partition : tenantList) { + try { + List<Subscription> subscriptionsList = cosmosStore.findAllItems(partition, azureCosmosProperties.cosmosDBName(), + azureCosmosProperties.registerSubscriptionContainerName(), Subscription.class); + + ManagementClient managementClient = factory.getManager(partition); + for (Subscription subscription : subscriptionsList) { + // To check if its a not new subscription. + if (!this.topicSubscriptions.checkIfNewTopicSubscription(partition, subscription.getTopic(), subscription.getNotificationId())) { + // Update existing subscriptions and skip registration + this.topicSubscriptions.updateCurrentTopicSubscriptions(partition, subscription.getTopic(), subscription.getNotificationId()); + + } else { + + /* This check is added if a Cosmos subscription is created but the corresponding service bus + subscription is still not created or creation is in progress.We do not register message handler + with the subscription client as it will throw entity not found exception and unregistering is not supported. + Check if its a new Subscription Client */ + if (managementClient.topicExists(subscription.getTopic()) && managementClient.subscriptionExists(subscription.getTopic(), subscription.getNotificationId())) { + try { + SubscriptionClient subscriptionClient = this.subscriptionClientFactory + .getSubscriptionClient(partition, subscription.getTopic(), subscription.getNotificationId()); + registerMessageHandler(subscriptionClient, executorService); + this.topicSubscriptions.updateCurrentTopicSubscriptions(partition, subscription.getTopic(), subscription.getNotificationId()); + } catch (InterruptedException | ServiceBusException e) { + LOGGER.error("Error while creating or registering subscription client {}", e.getMessage(), e); + } catch (Exception e) { + LOGGER.error("Unknown exception occurred while creating or registering subscription client: ", e); + } + } + } + } + + } catch (AppException e) { + LOGGER.error("Error creating Cosmos Client {}", e.getMessage(), e); + } catch (Exception e) { + LOGGER.error("An exception occurred while subscribing to Notification Event : ", e); + } + } + this.topicSubscriptions.clearTopicSubscriptions(); + + } + + private void registerMessageHandler(SubscriptionClient subscriptionClient, ExecutorService executorService) throws ServiceBusException, InterruptedException { + + MessageHandler messageHandler = new MessageHandler(subscriptionClient, processNotification); + subscriptionClient.registerMessageHandler( + messageHandler, + new MessageHandlerOptions(Integer.parseUnsignedInt(azureServiceBusConfig.getMaxConcurrentCalls()), + false, + Duration.ofSeconds(Integer.parseUnsignedInt(azureServiceBusConfig.getMaxLockRenewDurationInSeconds())), + Duration.ofSeconds(1) + ), + executorService); + } + + @Override + public void run() { + subscribeNotificationsEvent(); + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/EventGridServiceBusRequestBodyExtractor.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/EventGridServiceBusRequestBodyExtractor.java new file mode 100644 index 0000000000000000000000000000000000000000..03d2e0c415e05a5f3381aed339a65a84e6471661 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/EventGridServiceBusRequestBodyExtractor.java @@ -0,0 +1,94 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.extractor; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; +import com.microsoft.azure.servicebus.IMessage; +import lombok.SneakyThrows; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.notification.provider.azure.models.NotificationEventGridServiceBusRequest; +import org.opengroup.osdu.notification.provider.azure.models.NotificationRecordsChangedData; +import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.IPullRequestBodyExtractor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +import static java.nio.charset.StandardCharsets.UTF_8; + +@Component +@ConditionalOnProperty(value = "azure.eventGridToServiceBus.enabled", havingValue = "true", matchIfMissing = false) +public class EventGridServiceBusRequestBodyExtractor implements IPullRequestBodyExtractor { + private static final Gson GSON = new Gson(); + private NotificationEventGridServiceBusRequest notificationRequest; + private NotificationRecordsChangedData notificationRecordsChangedData; + private IMessage message; + + public void InitializeExtractor(IMessage message) { + + this.message = message; + this.notificationRequest = extractNotificationRequestFromMessageBody(); + } + + public Map<String, String> extractAttributesFromRequestBody() { + + Map<String, String> attributes = new HashMap<>(); + attributes.put("correlation-id", this.notificationRecordsChangedData.getCorrelationId()); + attributes.put("data-partition-id", this.notificationRecordsChangedData.getDataPartitionId()); + attributes.put("account-id", this.notificationRecordsChangedData.getAccountId()); + return attributes; + } + + public String extractDataFromRequestBody() { + + return notificationRecordsChangedData.getData().toString(); + } + + @SneakyThrows + private NotificationEventGridServiceBusRequest extractNotificationRequestFromMessageBody() { + + NotificationEventGridServiceBusRequest notificationRequest = null; + try { + String requestBody = new String(message.getMessageBody().getBinaryData().get(0), UTF_8); + NotificationEventGridServiceBusRequest[] notificationRequestArray = GSON.fromJson(requestBody, NotificationEventGridServiceBusRequest[].class); + notificationRequest = notificationRequestArray[0]; + extractNotificationData(notificationRequest); + } catch (Exception e) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error", + "Unable to parse request payload.", "Request contents are null or empty", e); + } + return notificationRequest; + } + + private void extractNotificationData(NotificationEventGridServiceBusRequest notificationRequest) { + + String notifData = notificationRequest.getData().toString(); + NotificationRecordsChangedData notificationRecordsChangedData = GSON.fromJson(notifData, NotificationRecordsChangedData.class); + verifyNotificationData(notificationRecordsChangedData); + this.notificationRecordsChangedData = notificationRecordsChangedData; + } + + private void verifyNotificationData(NotificationRecordsChangedData notificationRecordsChangedData) { + + Preconditions.checkNotNull(notificationRecordsChangedData, "Request payload parsing error"); + Preconditions.checkNotNull(notificationRecordsChangedData.getData(), "Request payload parsing error"); + Preconditions.checkNotNull(notificationRecordsChangedData.getCorrelationId(), "Request payload parsing error"); + Preconditions.checkNotNull(notificationRecordsChangedData.getDataPartitionId(), "Request payload parsing error"); + } +} + diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/RequestBodyAdapter.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/RequestBodyAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..4c1d9db4b231966ec112249f1cf8915f2ddb5c51 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/RequestBodyAdapter.java @@ -0,0 +1,40 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.extractor; + +import com.microsoft.azure.servicebus.IMessage; +import org.opengroup.osdu.notification.provider.azure.models.NotificationContent; +import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.IPullRequestBodyExtractor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}") +public class RequestBodyAdapter { + @Autowired + IPullRequestBodyExtractor extractor; + + public NotificationContent extractNotificationContent(IMessage message, String subscriptionName) { + + extractor.InitializeExtractor(message); + String notificationData = extractor.extractDataFromRequestBody(); + Map<String, String> headerAttributes = extractor.extractAttributesFromRequestBody(); + NotificationContent notificationContent = NotificationContent.mapFrom(subscriptionName, notificationData, headerAttributes, false); + return notificationContent; + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/ServiceBusRequestBodyExtractor.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/ServiceBusRequestBodyExtractor.java new file mode 100644 index 0000000000000000000000000000000000000000..aaec1012e29baf29ee75ae7bf2ffe2afa67f2826 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/extractor/ServiceBusRequestBodyExtractor.java @@ -0,0 +1,92 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.extractor; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; +import com.microsoft.azure.servicebus.IMessage; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.notification.provider.azure.models.NotificationRecordsChangedData; +import org.opengroup.osdu.notification.provider.azure.models.NotificationServiceBusRequest; +import org.opengroup.osdu.notification.provider.azure.messageBus.interfaces.IPullRequestBodyExtractor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +import static java.nio.charset.StandardCharsets.UTF_8; + +@Component +@ConditionalOnProperty(value = "azure.serviceBus.enabled", havingValue = "true", matchIfMissing = false) +public class ServiceBusRequestBodyExtractor implements IPullRequestBodyExtractor { + private IMessage message; + private static final Gson GSON = new Gson(); + private NotificationServiceBusRequest notificationRequest; + private NotificationRecordsChangedData notificationRecordsChangedData; + + public void InitializeExtractor(IMessage message) { + + this.message = message; + this.notificationRequest = extractNotificationRequestFromMessageBody(); + } + + public Map<String, String> extractAttributesFromRequestBody() { + + Map<String, String> attributes = new HashMap<>(); + attributes.put("correlation-id", this.notificationRecordsChangedData.getCorrelationId()); + attributes.put("data-partition-id", this.notificationRecordsChangedData.getDataPartitionId()); + attributes.put("account-id", this.notificationRecordsChangedData.getAccountId()); + return attributes; + } + + public String extractDataFromRequestBody() { + return notificationRecordsChangedData.getData().toString(); + } + + private NotificationServiceBusRequest extractNotificationRequestFromMessageBody() { + + NotificationServiceBusRequest notificationRequest = null; + try { + String requestBody = new String(message.getMessageBody().getBinaryData().get(0), UTF_8); + NotificationServiceBusRequest notificationRequestArray = GSON.fromJson(requestBody, NotificationServiceBusRequest.class); + notificationRequest = notificationRequestArray; + extractNotificationData(notificationRequest); + } catch (Exception e) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error", + "Unable to parse request payload.", "Request contents are null or empty", e); + } + return notificationRequest; + + } + + private void extractNotificationData(NotificationServiceBusRequest notificationRequest) { + + String notifData = notificationRequest.getMessage().toString(); + NotificationRecordsChangedData notificationRecordsChangedData = GSON.fromJson(notifData, NotificationRecordsChangedData.class); + verifyNotificationData(notificationRecordsChangedData); + this.notificationRecordsChangedData = notificationRecordsChangedData; + } + + private void verifyNotificationData(NotificationRecordsChangedData notificationRecordsChangedData) { + + Preconditions.checkNotNull(notificationRecordsChangedData, "Request payload parsing error"); + Preconditions.checkNotNull(notificationRecordsChangedData.getData(), "Request payload parsing error"); + Preconditions.checkNotNull(notificationRecordsChangedData.getCorrelationId(), "Request payload parsing error"); + Preconditions.checkNotNull(notificationRecordsChangedData.getDataPartitionId(), "Request payload parsing error"); + } + +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/interfaces/IPullRequestBodyExtractor.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/interfaces/IPullRequestBodyExtractor.java new file mode 100644 index 0000000000000000000000000000000000000000..bea354845a940a7897f4e4f97d311d969c8d7e3c --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/interfaces/IPullRequestBodyExtractor.java @@ -0,0 +1,27 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.interfaces; + +import com.microsoft.azure.servicebus.IMessage; + +import java.util.Map; + +public interface IPullRequestBodyExtractor { + void InitializeExtractor(IMessage message); + + Map<String, String> extractAttributesFromRequestBody(); + + String extractDataFromRequestBody(); +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/interfaces/ISubscriptionManager.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/interfaces/ISubscriptionManager.java new file mode 100644 index 0000000000000000000000000000000000000000..8c8e726c4351a26fc38b0aff842b8b728a9274e9 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/interfaces/ISubscriptionManager.java @@ -0,0 +1,19 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.interfaces; + +public interface ISubscriptionManager extends Runnable { + void subscribeNotificationsEvent(); +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/models/TopicSubscriptions.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/models/TopicSubscriptions.java new file mode 100644 index 0000000000000000000000000000000000000000..92eb9abbaf015c5605d4990eece4ab00b13070f7 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/models/TopicSubscriptions.java @@ -0,0 +1,52 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.models; + +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class TopicSubscriptions { + + private Map<String, Map<String, List<String>>> existingTopicSubscriptions = new HashMap<>(); + private Map<String, Map<String, List<String>>> currentTopicSubscriptions = new HashMap<>(); + + public boolean checkIfNewTopicSubscription(String partition, String sbTopicName, String subscriptionName) { + + if (existingTopicSubscriptions.get(partition) == null || + existingTopicSubscriptions.get(partition).get(sbTopicName) == null || + !existingTopicSubscriptions.get(partition).get(sbTopicName).contains(subscriptionName)) + return true; + return false; + } + + public void updateCurrentTopicSubscriptions(String partition, String sbTopicName, String subscriptionName) { + // Update the active subscriptions + currentTopicSubscriptions.putIfAbsent(partition, new HashMap<String, List<String>>()); + currentTopicSubscriptions.get(partition).putIfAbsent(sbTopicName, new ArrayList<>()); + currentTopicSubscriptions.get(partition).get(sbTopicName).add(subscriptionName); + } + + public void clearTopicSubscriptions() { + // Deletes the old subscriptions for next run + this.existingTopicSubscriptions.clear(); + this.existingTopicSubscriptions.putAll(this.currentTopicSubscriptions); + this.currentTopicSubscriptions.clear(); + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadDpsHeaders.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadDpsHeaders.java new file mode 100644 index 0000000000000000000000000000000000000000..b02fe38df5244618ca5cd0f9657c571780874598 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadDpsHeaders.java @@ -0,0 +1,48 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.thread; + +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Scope; +import org.springframework.context.annotation.ScopedProxyMode; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +@Component +@Scope(value = "ThreadScope", proxyMode = ScopedProxyMode.TARGET_CLASS) +@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}") +@Primary +public class ThreadDpsHeaders extends DpsHeaders { + @Autowired + private IServiceAccountJwtClient serviceAccountJwtClient; + + public void setThreadContext(String dataPartitionId, String correlationId) { + Map<String, String> headers = new HashMap<>(); + headers.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); + headers.put(DpsHeaders.CORRELATION_ID, correlationId); + String authToken = this.serviceAccountJwtClient.getIdToken(dataPartitionId); + headers.put(DpsHeaders.AUTHORIZATION, authToken); + this.addFromMap(headers); + } + +} + + diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScope.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScope.java new file mode 100644 index 0000000000000000000000000000000000000000..b3445775194f3986d7fbb2e7e71696789a4ad0c1 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScope.java @@ -0,0 +1,75 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.thread; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.ObjectFactory; +import org.springframework.beans.factory.config.Scope; + +/** + * Thread scope which allows putting data in thread scope and clearing up afterwards. + */ + +public class ThreadScope implements Scope, DisposableBean { + + /** + * Get bean for given name in the "ThreadScope". + */ + public Object get(String name, ObjectFactory<?> factory) { + ThreadScopeContext context = ThreadScopeContextHolder.getContext(); + + Object result = context.getBean(name); + if (null == result) { + result = factory.getObject(); + context.setBean(name, result); + } + return result; + } + + /** + * Removes bean from scope. + */ + public Object remove(String name) { + ThreadScopeContext context = ThreadScopeContextHolder.getContext(); + return context.remove(name); + } + + public void registerDestructionCallback(String name, Runnable callback) { + ThreadScopeContextHolder.getContext().registerDestructionCallback(name, callback); + } + + /** + * Resolve the contextual object for the given key, if any. E.g. the HttpServletRequest object for key "request". + */ + public Object resolveContextualObject(String key) { + return null; + } + + /** + * Return the conversation ID for the current underlying scope, if any. + * <p/> + * In this case, it returns the thread name. + */ + public String getConversationId() { + return Thread.currentThread().getName(); + } + + @Override + public void destroy() { + ThreadScopeContextHolder.clearContext(); + } +} + + diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeBeanFactoryPostProcessor.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeBeanFactoryPostProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..33ddea59efbc68b1f52d46b9f0ff577c83f075f2 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeBeanFactoryPostProcessor.java @@ -0,0 +1,26 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.thread; + +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; + +public class ThreadScopeBeanFactoryPostProcessor implements BeanFactoryPostProcessor { + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory factory) { + factory.registerScope("ThreadScope", new ThreadScope()); + } +} \ No newline at end of file diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeContext.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeContext.java new file mode 100644 index 0000000000000000000000000000000000000000..74acaebec47dacb75517075842092a40d2d74a07 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeContext.java @@ -0,0 +1,109 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.thread; + +import java.util.HashMap; +import java.util.Map; + +public class ThreadScopeContext { + + protected final Map<String, Bean> beans = new HashMap<>(); + + /** + * Get a bean value from the context. + * + * @param name bean name + * @return bean value or null + */ + public Object getBean(String name) { + Bean bean = beans.get(name); + if (null == bean) { + return null; + } + return bean.object; + } + + /** + * Set a bean in the context. + * + * @param name bean name + * @param object bean value + */ + public void setBean(String name, Object object) { + + Bean bean = beans.computeIfAbsent(name,k-> new Bean()); + bean.object = object; + } + + /** + * Remove a bean from the context, calling the destruction callback if any. + * + * @param name bean name + * @return previous value + */ + public Object remove(String name) { + Bean bean = beans.get(name); + if (null != bean) { + beans.remove(name); + bean.destructionCallback.run(); + return bean.object; + } + return null; + } + + /** + * Register the given callback as to be executed after request completion. + * + * @param name The name of the bean. + * @param callback The callback of the bean to be executed for destruction. + */ + public void registerDestructionCallback(String name, Runnable callback) { + Bean bean = beans.computeIfAbsent(name,k->new Bean()); + bean.destructionCallback = callback; + } + + /** Clear all beans and call the destruction callback. */ + public void clear() { + for (Bean bean : beans.values()) { + if (null != bean.destructionCallback) { + bean.destructionCallback.run(); + } + } + beans.clear(); + } + + /** Private class storing bean name and destructor callback. */ + private class Bean { + + private Object object; + private Runnable destructionCallback; + + public Object getObject() { + return object; + } + + public void setObject(Object object) { + this.object = object; + } + + public Runnable getDestructionCallback() { + return destructionCallback; + } + + public void setDestructionCallback(Runnable destructionCallback) { + this.destructionCallback = destructionCallback; + } + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeContextHolder.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeContextHolder.java new file mode 100644 index 0000000000000000000000000000000000000000..54c43f6f7dc945659873378f70338c5f97d097b3 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadScopeContextHolder.java @@ -0,0 +1,47 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus.thread; + +public final class ThreadScopeContextHolder { + + private static final ThreadLocal<ThreadScopeContext> CONTEXT_HOLDER = ThreadLocal + .withInitial(ThreadScopeContext::new); + + private ThreadScopeContextHolder() { + // utility object, not allowed to create instances + } + + /** + * Get the thread specific context. + * + * @return thread scoped context + */ + public static ThreadScopeContext getContext() { + return CONTEXT_HOLDER.get(); + } + + /** + * Set the thread specific context. + * + * @param context thread scoped context + */ + public static void setContext(ThreadScopeContext context) { + CONTEXT_HOLDER.set(context); + } + + public static void clearContext() { + CONTEXT_HOLDER.remove(); + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadSignatureService.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadSignatureService.java new file mode 100644 index 0000000000000000000000000000000000000000..efadcda499e75ef2479aabcfa2d1034dfa1b7ad0 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/messageBus/thread/ThreadSignatureService.java @@ -0,0 +1,142 @@ +package org.opengroup.osdu.notification.provider.azure.messageBus.thread; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import org.apache.commons.lang3.StringUtils; +import org.opengroup.osdu.core.common.cryptographic.HmacData; +import org.opengroup.osdu.core.common.cryptographic.ISignatureService; +import org.opengroup.osdu.core.common.cryptographic.SignatureServiceException; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Scope; +import org.springframework.context.annotation.ScopedProxyMode; +import org.springframework.stereotype.Component; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.xml.bind.DatatypeConverter; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.Base64; + +@Component +@Scope(value = "ThreadScope", proxyMode = ScopedProxyMode.TARGET_CLASS) +@ConditionalOnExpression("${azure.serviceBus.enabled:true} || ${azure.eventGridToServiceBus.enabled:true}") +@Primary +public class ThreadSignatureService implements ISignatureService { + private static final String HMAC_SHA_256 = "HmacSHA256"; + private static final String DATA_FORMAT = "{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}"; + private static final String NOTIFICATION_SERVICE = "de-notification-service"; + private static final long EXPIRE_DURATION = 30000L; + private static final String INVALID_SIGNATURE = "Invalid signature"; + private static final String ERROR_GENERATING_SIGNATURE = "Error generating the signature"; + private static final String SIGNATURE_EXPIRED = "Signature is expired"; + private static final String MISSING_HMAC_SIGNATURE = "HMAC signature should not be null or empty"; + private static final String MISSING_SECRET_VALUE = "Secret should not be null or empty"; + private static final String MISSING_ATTRIBUTES_IN_SIGNATURE = "Missing url or nonce or expire time in the signature"; + + public ThreadSignatureService() { + } + + public String getSignedSignature(String url, String secret) throws SignatureServiceException { + if (!Strings.isNullOrEmpty(url) && !Strings.isNullOrEmpty(secret)) { + long currentTime = System.currentTimeMillis(); + String expireTime = String.valueOf(currentTime + 30000L); + String timeStamp = String.valueOf(currentTime); + + try { + String nonce = DatatypeConverter.printHexBinary(this.generateRandomBytes(16)).toLowerCase(); + String data = String.format("{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}", expireTime, url, nonce); + byte[] signature = this.getSignature(secret, nonce, timeStamp, data); + byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8); + String dataBytesEncoded = Base64.getEncoder().encodeToString(dataBytes); + StringBuilder output = new StringBuilder(); + output.append(dataBytesEncoded).append(".").append(DatatypeConverter.printHexBinary(signature).toLowerCase()); + return output.toString(); + } catch (Exception var13) { + throw new SignatureServiceException("Error generating the signature", var13); + } + } else { + throw new SignatureServiceException("Error generating the signature"); + } + } + + public String getSignedSignature(String url, String secret, String expireTime, String nonce) throws SignatureServiceException { + if (!Strings.isNullOrEmpty(url) && !Strings.isNullOrEmpty(secret) && StringUtils.isNumeric(expireTime)) { + long expiry = Long.parseLong(expireTime); + if (System.currentTimeMillis() > expiry) { + throw new SignatureServiceException("Signature is expired"); + } else { + String timeStamp = String.valueOf(expiry - 30000L); + String data = String.format("{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}", expireTime, url, nonce); + + try { + byte[] signature = this.getSignature(secret, nonce, timeStamp, data); + return DatatypeConverter.printHexBinary(signature).toLowerCase(); + } catch (Exception var10) { + throw new SignatureServiceException("Error generating the signature", var10); + } + } + } else { + throw new SignatureServiceException("Error generating the signature"); + } + } + + public void verifyHmacSignature(String hmac, String secret) throws SignatureServiceException { + if (Strings.isNullOrEmpty(hmac)) { + throw new SignatureServiceException("HMAC signature should not be null or empty"); + } else if (Strings.isNullOrEmpty(secret)) { + throw new SignatureServiceException("Secret should not be null or empty"); + } else { + String[] tokens = hmac.split("\\."); + if (tokens.length != 2) { + throw new SignatureServiceException("Invalid signature"); + } else { + byte[] dataBytes = Base64.getDecoder().decode(tokens[0]); + String requestSignature = tokens[1]; + String data = new String(dataBytes, StandardCharsets.UTF_8); + HmacData hmacData = (HmacData)(new Gson()).fromJson(data, HmacData.class); + String url = hmacData.getEndpointUrl(); + String nonce = hmacData.getNonce(); + String expireTime = hmacData.getExpireMillisecond(); + if (!Strings.isNullOrEmpty(url) && !Strings.isNullOrEmpty(nonce) && !Strings.isNullOrEmpty(expireTime)) { + String newSignature = this.getSignedSignature(url, secret, expireTime, nonce); + if (!requestSignature.equalsIgnoreCase(newSignature)) { + throw new SignatureServiceException("Invalid signature"); + } + } else { + throw new SignatureServiceException("Missing url or nonce or expire time in the signature"); + } + } + } + } + + private byte[] getSignature(String secret, String nonce, String timeStamp, String data) throws Exception { + byte[] secretBytes = DatatypeConverter.parseHexBinary(secret); + byte[] nonceBytes = DatatypeConverter.parseHexBinary(nonce); + byte[] encryptedNonce = this.computeHmacSha256(nonceBytes, secretBytes); + byte[] encryptedTimestamp = this.computeHmacSha256(timeStamp, encryptedNonce); + byte[] signedKey = this.computeHmacSha256("de-notification-service", encryptedTimestamp); + byte[] signature = this.computeHmacSha256(data, signedKey); + return signature; + } + + private byte[] computeHmacSha256(String data, byte[] key) throws Exception { + Mac mac = Mac.getInstance("HmacSHA256"); + mac.init(new SecretKeySpec(key, "HmacSHA256")); + return mac.doFinal(data.getBytes(StandardCharsets.UTF_8)); + } + + private byte[] computeHmacSha256(byte[] data, byte[] key) throws Exception { + Mac mac = Mac.getInstance("HmacSHA256"); + mac.init(new SecretKeySpec(key, "HmacSHA256")); + return mac.doFinal(data); + } + + private byte[] generateRandomBytes(int size) { + byte[] key = new byte[size]; + SecureRandom secureRandom = new SecureRandom(); + secureRandom.nextBytes(key); + return key; + } +} \ No newline at end of file diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationContent.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationContent.java new file mode 100644 index 0000000000000000000000000000000000000000..d3edc232272b90b1e5ab30dcb89f672eacd49dab --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationContent.java @@ -0,0 +1,40 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.models; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +import java.util.Map; + +@Data +@AllArgsConstructor +@Builder +public class NotificationContent { + String NotificationId; + String data; + Map<String, String> extractAttributes; + boolean isHandShakeRequest; + + public static NotificationContent mapFrom(String notificationId, String pubSubData, Map<String, String> attributes, boolean isHandShake) { + NotificationContentBuilder notificationContentBuilder = NotificationContent.builder() + .NotificationId(notificationId) + .data(pubSubData) + .extractAttributes(attributes) + .isHandShakeRequest(isHandShake); + return notificationContentBuilder.build(); + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationRequest.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationEventGridRequest.java similarity index 95% rename from provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationRequest.java rename to provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationEventGridRequest.java index 32236c6e234268a2e0a92d05aa423661819387ff..1e3445d1526f79ea674c4715e9bd7231256d3aea 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationRequest.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationEventGridRequest.java @@ -23,7 +23,7 @@ import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor(access = AccessLevel.PRIVATE) -public class NotificationRequest { +public class NotificationEventGridRequest { private String id; diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationEventGridServiceBusRequest.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationEventGridServiceBusRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..da13b71ccb3cf94e804198eaff6b87fc70ab57d8 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationEventGridServiceBusRequest.java @@ -0,0 +1,42 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.models; + +import com.google.gson.JsonObject; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class NotificationEventGridServiceBusRequest { + private String id; + + private String eventType; + + private String subject; + + private JsonObject data; + + private String dataVersion; + + private String eventTime; + + private String metadataVersion; + + private String topic; +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationServiceBusRequest.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationServiceBusRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..02f559d11595915c0c4bb82e8c2d3040094b7f57 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/models/NotificationServiceBusRequest.java @@ -0,0 +1,28 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.models; + +import com.google.gson.JsonObject; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class NotificationServiceBusRequest { + private JsonObject message; +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/EventGridRequestBodyExtractor.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/EventGridRequestBodyExtractor.java index b3c4aeb6b55e27936af4e0be2d42d66b4e00cbd0..fb5ba29ef3892508a195680d253c989eea209219 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/EventGridRequestBodyExtractor.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/EventGridRequestBodyExtractor.java @@ -22,14 +22,14 @@ import com.google.gson.JsonObject; import lombok.SneakyThrows; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.notification.provider.azure.models.NotificationContent; import org.opengroup.osdu.notification.provider.azure.models.HandshakeRequestData; import org.opengroup.osdu.notification.provider.azure.models.NotificationRecordsChangedData; -import org.opengroup.osdu.notification.provider.azure.models.NotificationRequest; +import org.opengroup.osdu.notification.provider.azure.models.NotificationEventGridRequest; import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.context.annotation.RequestScope; import javax.servlet.http.HttpServletRequest; @@ -56,10 +56,11 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto private final HttpServletRequest httpServletRequest; private final JaxRsDpsLog logger; - private final NotificationRequest notificationRequest; + private final NotificationEventGridRequest notificationRequest; private NotificationRecordsChangedData notificationRecordsChangedData; private HandshakeRequestData handshakeRequestData; private boolean isHandshakeRequest; + private NotificationContent notificationContent; @Autowired public EventGridRequestBodyExtractor(HttpServletRequest httpServletRequest, JaxRsDpsLog log) { @@ -151,12 +152,12 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto */ @SneakyThrows // TODO : @komakkar sanitize the exceptions to match the SpringExceptionMapper and throw ValidationException - private NotificationRequest extractNotificationRequestFromHttpRequest() { - NotificationRequest notificationRequest = null; + private NotificationEventGridRequest extractNotificationRequestFromHttpRequest() { + NotificationEventGridRequest notificationRequest = null; if (this.notificationRequest == null && this.httpServletRequest.getMethod().equalsIgnoreCase("post")) { try { String requestBody = getBody(this.httpServletRequest); - NotificationRequest[] notificationRequestArray = GSON.fromJson(requestBody, NotificationRequest[].class); + NotificationEventGridRequest[] notificationRequestArray = GSON.fromJson(requestBody, NotificationEventGridRequest[].class); notificationRequest = notificationRequestArray[0]; this.isHandshakeRequest = notificationRequest.getEventType().equals(EVENTGRID_VALIDATION_EVENT); @@ -173,12 +174,12 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto return notificationRequest; } - private void extractHandshakeData(NotificationRequest notificationRequest) { + private void extractHandshakeData(NotificationEventGridRequest notificationRequest) { this.handshakeRequestData = GSON.fromJson(notificationRequest.getData(), HandshakeRequestData.class); Preconditions.checkNotNull(this.handshakeRequestData.getValidationCode(), "Request payload parsing error handshkae"); } - private void extractNotificationData(NotificationRequest notificationRequest) { + private void extractNotificationData(NotificationEventGridRequest notificationRequest) { NotificationRecordsChangedData notificationRecordsChangedData = GSON.fromJson(notificationRequest.getData(), NotificationRecordsChangedData.class); verifyNotificationData(notificationRecordsChangedData); this.notificationRecordsChangedData = notificationRecordsChangedData; diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureCosmosProperties.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureCosmosProperties.java index c606785dabc1d8730bc3426d76dc8c73dab4471a..7a9fef37308bba5e0b088ea09537d7d45280672e 100644 --- a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureCosmosProperties.java +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureCosmosProperties.java @@ -12,6 +12,9 @@ public class AzureCosmosProperties { @Value("${tenantinfo.container.name}") private String tenantInfoContainerName; + @Value("${registerSubscription.container.name}") + private String registerSubscriptionContainerName; + @Value("${azure.cosmosdb.database}") private String cosmosDBName; @@ -21,6 +24,12 @@ public class AzureCosmosProperties { return tenantInfoContainerName; } + @Bean + @Named("COSMOS_CONTAINER_NAME") + public String registerSubscriptionContainerName() { + return registerSubscriptionContainerName; + } + @Bean @Named("COSMOS_DB_NAME") public String cosmosDBName() { diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureServiceBusConfig.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureServiceBusConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..b9df67e2e4f6aec63c1c7c8cbbd818fff2653d05 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/AzureServiceBusConfig.java @@ -0,0 +1,53 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.util; + +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Getter +public class AzureServiceBusConfig { + @Value("${executor-n-threads}") + private String nThreads; + + @Value("${max-concurrent-calls}") + private String maxConcurrentCalls; + + @Value("${max-lock-renew}") + private String maxLockRenewDurationInSeconds; + + @Value("${initial-subscription-manager-delay}") + private String initialSubscriptionManagerDelay; + + @Value("${consecutive-subscription-manager-delay}") + private String consecutiveSubscriptionManagerDelay; + + @Value("${service-bus-enabled}") + private String ServiceBusEnabled; + + @Value("${event-grid-to-service-bus-enabled}") + private String EventGridToServiceBusEnabled; + + @Bean + public MDCContextMap mdcContextMap() { + return new MDCContextMap(); + } + +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/MDCContextMap.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/MDCContextMap.java new file mode 100644 index 0000000000000000000000000000000000000000..57c08cb18613784974dc7260041b3a44e693d6d2 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/util/MDCContextMap.java @@ -0,0 +1,30 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.util; + +import org.opengroup.osdu.core.common.model.http.DpsHeaders; + +import java.util.HashMap; +import java.util.Map; + +public class MDCContextMap { + + public Map<String, String> getContextMap(String correlationId, String dataPartitionId) { + final Map<String, String> contextMap = new HashMap<>(); + contextMap.put(DpsHeaders.CORRELATION_ID, correlationId); + contextMap.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); + return contextMap; + } +} diff --git a/provider/notification-azure/src/main/resources/application.properties b/provider/notification-azure/src/main/resources/application.properties index a5e8bdbbddf801f90fbfef055acc712a647ae505..8855e8376cd707cc0efbc4413111f0119f03cc50 100644 --- a/provider/notification-azure/src/main/resources/application.properties +++ b/provider/notification-azure/src/main/resources/application.properties @@ -1,4 +1,4 @@ -# Copyright © Microsoft Corporation +# Copyright © Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -56,4 +56,31 @@ azure.keyvault.url=${KEYVAULT_URI} #TenantFactory Configuration tenantFactoryImpl.required=true -tenantInfo.container.name=TenantInfo \ No newline at end of file +tenantInfo.container.name=TenantInfo + +#RegisterSubscription Configuration +registerSubscription.container.name=RegisterSubscription + +# Specifies the number of threads to be created on the thread pool +executor-n-threads=${executor_n_threads} + +# Specifies the maximum number of concurrent calls to the callback the message pump should initiate +max-concurrent-calls=${max_concurrent_calls} + +# Specifies the maximum duration in seconds within which the lock will be renewed automatically +max-lock-renew=${max_lock_renew_duration_seconds} + +# Specifies the initial delay before calling subscribeNotificationsEvent +initial-subscription-manager-delay=${initial_subscription_manager_delay_seconds} + +# Specifies the consecutive thread delay for subscribeNotificationsEvent +consecutive-subscription-manager-delay=${consecutive_subscription_manager_delay_seconds} + +# Specifies if Service Bus is enabled +azure.serviceBus.enabled=${service_bus_enabled} + +# Specifies if Event Grid To Service Bus is enabled +azure.eventGridToServiceBus.enabled=${event_grid_to_service_bus_enabled} + +# Specifies if Event Grid is enabled. Used to override the non-request scoped beans +requestScope.enabled=${event_grid_enabled} diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c5ee287a6ff5158a0cc704a5f22ea5156e50c184 --- /dev/null +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/MessageHandlerTest.java @@ -0,0 +1,71 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.messageBus; + +import com.microsoft.azure.servicebus.Message; +import com.microsoft.azure.servicebus.SubscriptionClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opengroup.osdu.notification.provider.azure.messageBus.MessageHandler; +import org.opengroup.osdu.notification.provider.azure.messageBus.ProcessNotification; + +import java.util.UUID; + +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class MessageHandlerTest { + + private static final UUID uuid = UUID.randomUUID(); + private static final String subscriptionName = "TestSubscription"; + + @InjectMocks + private MessageHandler messageHandler; + + @Mock + private ProcessNotification processNotification; + + @Mock + private SubscriptionClient subscriptionClient; + + @Mock + private Message message; + + @BeforeEach + public void init() { + when(message.getLockToken()).thenReturn(uuid); + when(subscriptionClient.getSubscriptionName()).thenReturn(subscriptionName); + } + + @Test + public void shouldInvokeCompleteAsync() throws Exception { + lenient().doNothing().when(processNotification).performNotification(message, subscriptionName); + messageHandler.onMessageAsync(message); + verify(subscriptionClient, times(1)).completeAsync(uuid); + verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName()); + } + + @Test + public void shouldInvokeAbandonAsyncWhenProcessNotificationThrowsException() throws Exception { + doThrow(new Exception()).when(processNotification).performNotification(message, subscriptionName); + messageHandler.onMessageAsync(message); + verify(subscriptionClient, times(1)).abandonAsync(uuid); + verify(processNotification, times(1)).performNotification(message, subscriptionClient.getSubscriptionName()); + } +} diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/ProcessNotificationTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/ProcessNotificationTest.java new file mode 100644 index 0000000000000000000000000000000000000000..927672d6d6d69985f3c459cb9557467d81a579c2 --- /dev/null +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/ProcessNotificationTest.java @@ -0,0 +1,119 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.messageBus; + +import com.microsoft.azure.servicebus.Message; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opengroup.osdu.core.common.http.HttpResponse; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.notification.provider.azure.messageBus.ProcessNotification; +import org.opengroup.osdu.notification.provider.azure.messageBus.extractor.RequestBodyAdapter; +import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadDpsHeaders; +import org.opengroup.osdu.notification.provider.azure.models.NotificationContent; +import org.opengroup.osdu.notification.provider.azure.util.MDCContextMap; +import org.opengroup.osdu.notification.service.NotificationHandler; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class ProcessNotificationTest { + private static final String dataPartitionId = "opendes"; + private static final String correlationId = "908fcf8d-30c5-4c74-a0ae-ab47b48b7a85"; + private static final String notificationData = "[{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"},{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"}]"; + private static final String subscriptionName = "TestSubscription"; + private static final String notificationId = "Notification-Test-Subscription"; + private HttpResponse response = new HttpResponse(); + private static final Map<String, String> requestAttributes = new HashMap(); + private final String NOT_ACKNOWLEDGE = "message not acknowledged by client"; + private final String EXCEPTION_NOT_THROWN = "Should Throw Exception"; + @InjectMocks + private ProcessNotification processNotification; + + @Mock + private NotificationHandler notificationHandler; + @Mock + private RequestBodyAdapter requestBodyAdapter; + @Mock + private Message message; + @Mock + private NotificationContent notificationContent; + @Mock + private MDCContextMap mdcContextMap; + @Spy + private ThreadDpsHeaders dpsHeaders; + + @BeforeEach + public void init() { + requestAttributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId); + requestAttributes.put(DpsHeaders.CORRELATION_ID, correlationId); + lenient().doNothing().when(dpsHeaders).setThreadContext(dataPartitionId, correlationId); + lenient().when(mdcContextMap.getContextMap(dataPartitionId, correlationId)).thenReturn(new HashMap<>()); + lenient().when(dpsHeaders.getHeaders()).thenReturn(requestAttributes); + when(notificationContent.getExtractAttributes()).thenReturn(requestAttributes); + when(notificationContent.getNotificationId()).thenReturn(notificationId); + when(notificationContent.getData()).thenReturn(notificationData); + } + + @Test + public void shouldSuccessfullyPerformNotification() throws Exception { + response.setResponseCode(200); + when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent); + when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes)).thenReturn(response); + processNotification.performNotification(message, subscriptionName); + verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes); + verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName); + } + + @Test + public void shouldThrowExceptionWhenNotifySubscriberFails() throws Exception { + response.setResponseCode(400); + when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent); + when(notificationHandler.notifySubscriber(notificationId, notificationData, requestAttributes)).thenReturn(response); + try { + processNotification.performNotification(message, subscriptionName); + fail(EXCEPTION_NOT_THROWN); + } catch (Exception e) { + verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes); + verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName); + Assert.assertEquals(NOT_ACKNOWLEDGE, e.getMessage()); + } + } + + @Test + public void shouldThrowExceptionWhenNotifySubscriberThrowsException() throws Exception { + response.setResponseCode(400); + when(requestBodyAdapter.extractNotificationContent(message, subscriptionName)).thenReturn(notificationContent); + doThrow(new Exception()).when(notificationHandler).notifySubscriber(notificationId, notificationData, requestAttributes); + try { + processNotification.performNotification(message, subscriptionName); + fail(EXCEPTION_NOT_THROWN); + } catch (Exception e) { + verify(notificationHandler, times(1)).notifySubscriber(notificationId, notificationData, requestAttributes); + verify(requestBodyAdapter, times(1)).extractNotificationContent(message, subscriptionName); + Assert.assertNotNull(e); + } + } +} diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/SubscriptionClientFactoryTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/SubscriptionClientFactoryTest.java new file mode 100644 index 0000000000000000000000000000000000000000..dee2f7f0e5151cbabf538460eb1281d77e46e7a2 --- /dev/null +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/SubscriptionClientFactoryTest.java @@ -0,0 +1,71 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.messageBus; + +import com.microsoft.azure.servicebus.SubscriptionClient; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opengroup.osdu.azure.servicebus.ISubscriptionClientFactory; +import org.opengroup.osdu.notification.provider.azure.messageBus.SubscriptionClientFactImpl; + +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class SubscriptionClientFactoryTest { + @InjectMocks + private SubscriptionClientFactImpl subsClientFactory; + + @Mock + private SubscriptionClient subscriptionClient; + + @Mock + private ISubscriptionClientFactory subscriptionClientFactory; + + private static final String sbTopic = "testTopic"; + private static final String sbSubscription = "testSubscription"; + private static final String dataPartition = "testPartition"; + private final String EXCEPTION_NOT_THROWN = "Should Throw Exception"; + private final String SERVICE_BUS_EXCEPTION = "Unable to retrieve client info from Service Bus"; + + @Test + public void subscriptionClientShouldNotBeNull() throws ServiceBusException, InterruptedException { + when(subscriptionClientFactory.getClient(dataPartition, sbTopic, sbSubscription)) + .thenReturn(subscriptionClient); + SubscriptionClient result = subsClientFactory.getSubscriptionClient(dataPartition, sbTopic, sbSubscription); + assertNotNull(result); + assertEquals(subscriptionClient, result); + } + + @Test + public void shouldThrowExceptionWhenSubscriptionClientThrowsException() throws ServiceBusException, InterruptedException { + when(subscriptionClientFactory.getClient(dataPartition, sbTopic, sbSubscription)) + .thenThrow(new ServiceBusException(false, SERVICE_BUS_EXCEPTION)); + try { + subsClientFactory.getSubscriptionClient(dataPartition, sbTopic, sbSubscription); + fail(EXCEPTION_NOT_THROWN); + } catch (Exception e) { + Assert.assertNotNull(e); + Assert.assertEquals( SERVICE_BUS_EXCEPTION, e.getMessage()); + } + } +} diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/SubscriptionManagerImplTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/SubscriptionManagerImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c1f6216898b1f660f7d8846322537ac83bcd35fc --- /dev/null +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/messageBus/SubscriptionManagerImplTest.java @@ -0,0 +1,182 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.messageBus; + +import com.microsoft.azure.servicebus.SubscriptionClient; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opengroup.osdu.azure.cosmosdb.CosmosStore; +import org.opengroup.osdu.azure.partition.PartitionServiceClient; +import org.opengroup.osdu.core.common.model.notification.HmacSecret; +import org.opengroup.osdu.core.common.model.notification.Subscription; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.notification.ISubscriptionFactory; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.notification.provider.azure.messageBus.ProcessNotification; +import org.opengroup.osdu.notification.provider.azure.messageBus.SubscriptionClientFactImpl; +import org.opengroup.osdu.notification.provider.azure.messageBus.SubscriptionManagerImpl; +import org.opengroup.osdu.notification.provider.azure.messageBus.thread.ThreadDpsHeaders; +import org.opengroup.osdu.notification.provider.azure.util.AzureServiceBusConfig; +import org.opengroup.osdu.notification.provider.azure.util.AzureCosmosProperties; + +import java.util.Collections; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static java.lang.Thread.sleep; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class SubscriptionManagerImplTest { + + private static final String maxLockRenewDuration = "60"; + private static final String maxConcurrentCalls = "1"; + private static final String nThreads = "2"; + private static final String errorMessage = "some-error"; + + @InjectMocks + private SubscriptionManagerImpl subscriptionManager; + @Mock + private SubscriptionClientFactImpl subscriptionClientFactory; + @Mock + private SubscriptionClient subscriptionClient; + @Mock + private ProcessNotification processNotification; + @Mock + private ITenantFactory tenantFactory; + @Mock + private ISubscriptionFactory subscriptionFactory; + @Mock + private ThreadDpsHeaders dpsHeaders; + @Mock + private PartitionServiceClient partitionService; + @Mock + private CosmosStore cosmosStore; + @Mock + private AzureCosmosProperties azureCosmosProperties; + @Mock + private AzureServiceBusConfig azureServiceBusConfig; + + private static final String dataPartition = "testTenant"; + private static final String cosmosDbName = "testDatabase"; + private static final String registerContainerName = "testContainer"; + private static final String sbTopic = "testTopic"; + private static final String sbSubscription = "testSubscription"; + private static final String initial_thread_delay = "0"; + private static final String thread_delay = "1"; + + @BeforeEach + public void init() { + TenantInfo tenantInfo = new TenantInfo(); + tenantInfo.setDataPartitionId(dataPartition); + lenient().when(azureServiceBusConfig.getMaxConcurrentCalls()).thenReturn(maxConcurrentCalls); + lenient().when(azureServiceBusConfig.getNThreads()).thenReturn(nThreads); + lenient().when(azureServiceBusConfig.getMaxLockRenewDurationInSeconds()).thenReturn(maxLockRenewDuration); + lenient().when(azureServiceBusConfig.getInitialSubscriptionManagerDelay()).thenReturn(initial_thread_delay); + lenient().when(azureServiceBusConfig.getConsecutiveSubscriptionManagerDelay()).thenReturn(thread_delay); + lenient().when(azureCosmosProperties.registerSubscriptionContainerName()).thenReturn(registerContainerName); + lenient().when(azureCosmosProperties.cosmosDBName()).thenReturn(cosmosDbName); + lenient().when(tenantFactory.listTenantInfo()).thenReturn(Collections.singletonList(tenantInfo)); + lenient().when(cosmosStore.findAllItems(dataPartition, cosmosDbName, registerContainerName, Subscription.class)). + thenReturn(Collections.singletonList(getHmac_subscription())); + } + + @Test + public void shouldSuccessfullyRegisterMessageHandler() throws ServiceBusException, InterruptedException { + + when(subscriptionClientFactory.getSubscriptionClient(dataPartition, sbTopic, sbSubscription)).thenReturn(subscriptionClient); + doNothing().when(subscriptionClient).registerMessageHandler(any(), any(), any()); + subscriptionManager.subscribeNotificationsEvent(); + + verify(azureServiceBusConfig, times(1)).getMaxConcurrentCalls(); + verify(azureServiceBusConfig, times(1)).getNThreads(); + verify(azureServiceBusConfig, times(1)).getMaxLockRenewDurationInSeconds(); + verify(azureCosmosProperties, times(1)).registerSubscriptionContainerName(); + verify(azureCosmosProperties, times(1)).cosmosDBName(); + verify(subscriptionClientFactory, times(1)).getSubscriptionClient(dataPartition, sbTopic, sbSubscription); + verify(subscriptionClient, times(1)).registerMessageHandler(any(), any(), any()); + + } + + @Test + public void shouldNotRegisterMessageHandlerWhenAlreadyRegisteredInPrevThread() throws ServiceBusException, InterruptedException { + + when(subscriptionClientFactory.getSubscriptionClient(dataPartition, sbTopic, sbSubscription)).thenReturn(subscriptionClient); + doNothing().when(subscriptionClient).registerMessageHandler(any(), any(), any()); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(subscriptionManager, Integer.parseUnsignedInt(azureServiceBusConfig.getInitialSubscriptionManagerDelay()), + Integer.parseUnsignedInt(azureServiceBusConfig.getConsecutiveSubscriptionManagerDelay()), TimeUnit.SECONDS); + sleep(10000); + verify(azureServiceBusConfig, times(1)).getMaxConcurrentCalls(); + verify(azureServiceBusConfig, atLeast(2)).getNThreads(); + verify(azureServiceBusConfig, times(1)).getMaxLockRenewDurationInSeconds(); + verify(azureCosmosProperties, atLeast(2)).registerSubscriptionContainerName(); + verify(azureCosmosProperties, atLeast(2)).cosmosDBName(); + verify(subscriptionClientFactory, times(1)).getSubscriptionClient(dataPartition, sbTopic, sbSubscription); + verify(subscriptionClient, times(1)).registerMessageHandler(any(), any(), any()); + + } + + @Test + public void shouldNotRegisterMessageHandlerIfSSubscriptionClientThrowsException() throws ServiceBusException, InterruptedException { + lenient().doNothing().when(subscriptionClient).registerMessageHandler(any(), any(), any()); + doThrow(new InterruptedException(errorMessage)).when(subscriptionClientFactory).getSubscriptionClient(dataPartition, sbTopic, sbSubscription); + subscriptionManager.subscribeNotificationsEvent(); + + verify(azureServiceBusConfig, times(0)).getMaxConcurrentCalls(); + verify(azureServiceBusConfig, times(1)).getNThreads(); + verify(azureServiceBusConfig, times(0)).getMaxLockRenewDurationInSeconds(); + verify(azureCosmosProperties, times(1)).registerSubscriptionContainerName(); + verify(azureCosmosProperties, times(1)).cosmosDBName(); + verify(subscriptionClientFactory, times(1)).getSubscriptionClient(dataPartition, sbTopic, sbSubscription); + verify(subscriptionClient, times(0)).registerMessageHandler(any(), any(), any()); + } + + @Test + public void shouldThrowExceptionIfErrorWhileRegisteringMessageHandler() throws ServiceBusException, InterruptedException { + + doThrow(new InterruptedException(errorMessage)).when(subscriptionClient).registerMessageHandler(any(), any(), any()); + when(subscriptionClientFactory.getSubscriptionClient(dataPartition, sbTopic, sbSubscription)).thenReturn(subscriptionClient); + subscriptionManager.subscribeNotificationsEvent(); + + verify(azureServiceBusConfig, times(1)).getMaxConcurrentCalls(); + verify(azureServiceBusConfig, times(1)).getNThreads(); + verify(azureServiceBusConfig, times(1)).getMaxLockRenewDurationInSeconds(); + verify(azureCosmosProperties, times(1)).registerSubscriptionContainerName(); + verify(azureCosmosProperties, times(1)).cosmosDBName(); + verify(subscriptionClientFactory, times(1)).getSubscriptionClient(dataPartition, sbTopic, sbSubscription); + verify(subscriptionClient, times(1)).registerMessageHandler(any(), any(), any()); + } + + private static Subscription getHmac_subscription() { + Subscription hmac_subscription = new Subscription(); + hmac_subscription.setName("hamc_test_subscription"); + hmac_subscription.setPushEndpoint("http://challenge"); + hmac_subscription.setDescription("Description"); + hmac_subscription.setTopic(sbTopic); + hmac_subscription.setNotificationId(sbSubscription); + hmac_subscription.setId("id_1"); + hmac_subscription.setCreatedBy("test@test.com"); + HmacSecret secret = new HmacSecret(); + secret.setValue("testsecret"); + hmac_subscription.setSecret(secret); + return hmac_subscription; + } +} diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/provider/azure/messageBus/EventGridServiceBusRequestBodyExtractorTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/provider/azure/messageBus/EventGridServiceBusRequestBodyExtractorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..172cb1a1137012d28b307d4e6b30e918d1cac1db --- /dev/null +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/provider/azure/messageBus/EventGridServiceBusRequestBodyExtractorTest.java @@ -0,0 +1,119 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus; + +import com.microsoft.azure.servicebus.IMessage; +import com.microsoft.azure.servicebus.Message; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.notification.provider.azure.messageBus.extractor.EventGridServiceBusRequestBodyExtractor; +import org.springframework.http.HttpStatus; + +import java.util.Map; + +import static org.junit.Assert.fail; + +@RunWith(MockitoJUnitRunner.class) +public class EventGridServiceBusRequestBodyExtractorTest { + @InjectMocks + private EventGridServiceBusRequestBodyExtractor sut; + private static final String inValidData = "[{\"invalidData\"}]"; + private static final String validData = "[{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"},{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"}]"; + private static final String dataPartitionId = "opendes"; + private static final String correlationId = "908fcf8d-30c5-4c74-a0ae-ab47b48b7a85"; + private static final String accountId = "ab47b48b7a85-30c5"; + + @Test + public void should_throwWhenAttributesAreMissing_extractDataFromRequestBody() { + IMessage message = getInvalidMessage(); + try { + sut.InitializeExtractor(message); + fail("Should Throw Exception"); + } catch (AppException appException) { + Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), appException.getError().getCode()); + Assert.assertEquals("Unable to parse request payload.", appException.getError().getMessage()); + } catch (Exception exception) { + fail("Should Throw AppException"); + } + + } + + @Test + public void shouldReturnNotificationDataAndAttributesWhenValidRequestBody() { + IMessage message = getValidMessage(); + try { + sut.InitializeExtractor(message); + String notificationData = sut.extractDataFromRequestBody(); + Assert.assertEquals(notificationData, validData); + Map<String, String> attributes = sut.extractAttributesFromRequestBody(); + Assert.assertEquals(attributes.get("account-id"), accountId); + Assert.assertEquals(attributes.get("correlation-id"), correlationId); + Assert.assertEquals(attributes.get("data-partition-id"), dataPartitionId); + + } catch (Exception exception) { + fail("Should not Throw AppException"); + } + + } + + private Message getValidMessage() { + + String body = + "[\n" + + " {\n" + + " \"id\": \"2425\",\n" + + " \"eventType\": \"recordInserted\",\n" + + " \"subject\": \"myapp/vehicles/motorcycles\",\n" + + " \"data\": {\n" + + " \"data\":" + validData + ",\n" + + " \"account-id\": \"" + accountId + "\",\n" + + " \"correlation-id\": \"" + correlationId + "\",\n" + + " \"data-partition-id\": \"" + dataPartitionId + "\"\n" + + " },\n" + + " \"dataVersion\": \"1.0\",\n" + + " \"metadataVersion\": \"1\",\n" + + " \"eventTime\": \"2020-08-14T18:04:06\",\n" + + " \"topic\": \"records-changed\"\n" + + " }\n" + + "]"; + return new Message(body); + } + + private Message getInvalidMessage() { + + String body = + "[\n" + + " {\n" + + " \"id\": \"2425\",\n" + + " \"eventType\": \"recordInserted\",\n" + + " \"subject\": \"myapp/vehicles/motorcycles\",\n" + + " \"data\": {\n" + + " \"data\":" + inValidData + ",\n" + + " \"data-partition-id\": \"" + dataPartitionId + "\"\n" + + " },\n" + + " \"dataVersion\": \"1.0\",\n" + + " \"metadataVersion\": \"1\",\n" + + " \"eventTime\": \"2020-08-14T18:04:12+00:00\",\n" + + " \"topic\": \"records-changed\"\n" + + " }\n" + + "]"; + Message message = new Message(body); + return message; + } +} diff --git a/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/provider/azure/messageBus/ServiceBusRequestBodyExtractorTest.java b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/provider/azure/messageBus/ServiceBusRequestBodyExtractorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6f9729ecd25dabd2118d8adf33643928587e1c96 --- /dev/null +++ b/provider/notification-azure/src/test/java/org/opengroup/osdu/notification/provider/azure/messageBus/ServiceBusRequestBodyExtractorTest.java @@ -0,0 +1,101 @@ +// Copyright © Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.notification.provider.azure.messageBus; + +import com.microsoft.azure.servicebus.IMessage; +import com.microsoft.azure.servicebus.Message; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.junit.MockitoJUnitRunner; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.notification.provider.azure.messageBus.extractor.ServiceBusRequestBodyExtractor; +import org.springframework.http.HttpStatus; + +import java.util.Map; + +import static org.junit.Assert.fail; + +@RunWith(MockitoJUnitRunner.class) +public class ServiceBusRequestBodyExtractorTest { + @InjectMocks + private ServiceBusRequestBodyExtractor sut; + private static final String inValidData = "[{\"invalidData\"}]"; + private static final String validData = "[{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"},{\"id\":\"opendes:doc:\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"}]"; + private static final String dataPartitionId = "opendes"; + private static final String correlationId = "908fcf8d-30c5-4c74-a0ae-ab47b48b7a85"; + private static final String accountId = "ab47b48b7a85-30c5"; + + @Test + public void should_throwWhenAttributesAreMissing_extractDataFromRequestBody() { + IMessage message = getInvalidMessage(); + try { + sut.InitializeExtractor(message); + fail("Should Throw Exception"); + } catch (AppException appException) { + Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), appException.getError().getCode()); + Assert.assertEquals("Unable to parse request payload.", appException.getError().getMessage()); + } catch (Exception exception) { + fail("Should Throw AppException"); + } + + } + + @Test + public void shouldReturnNotificationDataAndAttributesWhenValidRequestBody() { + IMessage message = getValidMessage(); + try { + sut.InitializeExtractor(message); + String notificationData = sut.extractDataFromRequestBody(); + Assert.assertEquals(notificationData, validData); + Map<String, String> attributes = sut.extractAttributesFromRequestBody(); + Assert.assertEquals(attributes.get("account-id"), accountId); + Assert.assertEquals(attributes.get("correlation-id"), correlationId); + Assert.assertEquals(attributes.get("data-partition-id"), dataPartitionId); + + } catch (Exception exception) { + fail("Should not Throw AppException"); + } + + } + + private Message getValidMessage() { + + String body = + " {\n" + + " \"message\": {\n" + + " \"data\":" + validData + ",\n" + + " \"account-id\": \"" + accountId + "\",\n" + + " \"correlation-id\": \"" + correlationId + "\",\n" + + " \"data-partition-id\": \"" + dataPartitionId + "\"\n" + + " }\n" + + " }"; + return new Message(body); + } + + private Message getInvalidMessage() { + String body = + " {\n" + + " \"message\": {\n" + + " \"data\":" + inValidData + ",\n" + + " \"correlation-id\": \"" + correlationId + "\",\n" + + " \"data-partition-id\": \"" + dataPartitionId + "\"\n" + + " }\n" + + " }"; + Message message = new Message(body); + return message; + } +}