Commit ce8c64f8 authored by Nikhil Singh[MicroSoft]'s avatar Nikhil Singh[MicroSoft]
Browse files

Notification Service Redesign

parent 1e3e19d3
Pipeline #45784 passed with stages
in 19 minutes and 46 seconds
......@@ -17,27 +17,9 @@
package org.opengroup.osdu.notification.api;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cryptographic.ISignatureService;
import org.opengroup.osdu.core.common.http.HttpClient;
import org.opengroup.osdu.core.common.http.HttpRequest;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.*;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.notification.di.SubscriptionCacheFactory;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubHandshakeHandler;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor;
import org.opengroup.osdu.notification.provider.interfaces.*;
import org.opengroup.osdu.notification.service.interfaces.INotificationHandler;
import org.opengroup.osdu.notification.utils.Config;
import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
......@@ -45,13 +27,8 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.annotation.RequestScope;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RestController
@RequestScope
@RequestMapping("/push-handlers")
......@@ -59,122 +36,15 @@ public class PubsubEndpoint {
@Autowired
private IPubsubRequestBodyExtractor pubsubRequestBodyExtractor;
@Autowired
private IPubsubHandshakeHandler pubsubHandshakeHandler;
@Autowired
private ISignatureService signatureService;
@Autowired
private HttpClient httpClient;
@Autowired
private IGoogleServiceAccount gsaTokenProvider;
@Autowired
private JaxRsDpsLog log;
@Autowired
private SubscriptionCacheFactory subscriptionCacheFactory;
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private DpsHeaders headers;
private static final String HMAC_TYPE = "HMAC";
private static final String GSA_TYPE = "GSA";
private final int WAITING_TIME = 30000;
private final String ACKNOWLEDGE = "message acknowledged by client";
private final String NOT_ACKNOWLEDGE = "message not acknowledged by client";
private static final Gson gson = new Gson();
private ObjectMapper objectMapper;
private INotificationHandler notificationHandler;
@PostMapping("/records-changed")
@PreAuthorize("@authorizationFilter.hasAnyPermission('" + Config.OPS + "', '" + Config.PUBSUB + "')")
public ResponseEntity recordChanged() throws Exception {
// TODO: Couple the request body data into a pubsub object
String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
Subscription subscription = getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<>();
if (secretType.equalsIgnoreCase(HMAC_TYPE)) {
this.log.info("receiving pubsub message, will send out hmac type request, pubsub message: " + pubsubMessage);
HmacSecret hmacSecret = (HmacSecret) secret;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl = endpoint + "?hmac=" + signedjwt;
} else if (secretType.equalsIgnoreCase(GSA_TYPE)) {
this.log.info("receiving pubsub message, will send out gsa type request, pubsub message: " + pubsubMessage);
GsaSecret gsaSecret = (GsaSecret) secret;
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
pushUrl = endpoint;
requestHeader.put("Authorization", idToken);
}
this.log.info("sending out notification to endpoint: " + endpoint);
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE);
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
}
this.log.info(ACKNOWLEDGE);
return ResponseEntity.ok(ACKNOWLEDGE);
}
private Subscription getSubscriptionFromCache(String notificationId) throws IOException, SubscriptionException {
String subscriptionString = subscriptionCacheFactory.get(notificationId);
try {
if (Strings.isNullOrEmpty(subscriptionString))
subscriptionString = querySubscriptionAndUpdateCache(notificationId);
ObjectMapper objectMapper = this.getObjectMapper();
Subscription subscription = objectMapper.readValue(subscriptionString, Subscription.class);
return subscription;
} catch (IOException e) {
this.log.warning("Error Parsing subscription String to object.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", e);
} catch (SubscriptionException se) {
this.log.warning("Error query subscription from registration.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", se);
}
}
private String querySubscriptionAndUpdateCache(String notificationId) throws AppException, SubscriptionException {
ISubscriptionService service = subscriptionFactory.create(headers);
List<Subscription> subscriptionList = service.query(notificationId);
if (subscriptionList == null || subscriptionList.size() == 0) {
this.log.warning(String.format("Subscription with notification ID %s not found in registration", notificationId));
throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found");
}
Subscription subscription = subscriptionList.get(0);
String jsonSubscription = gson.toJson(subscription);
this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription);
return jsonSubscription;
}
//unit test purpose
protected ObjectMapper getObjectMapper() {
if (this.objectMapper == null) {
this.objectMapper = new ObjectMapper();
}
return this.objectMapper;
}
//unit test purpose
void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
return notificationHandler.NotifySubscriber(notificationId,pubsubMessage,headerAttributes);
}
}
package org.opengroup.osdu.notification.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.opengroup.osdu.core.common.cryptographic.ISignatureService;
import org.opengroup.osdu.core.common.http.HttpClient;
import org.opengroup.osdu.core.common.http.HttpRequest;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.*;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.notification.di.SubscriptionCacheFactory;
import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount;
import org.opengroup.osdu.notification.service.interfaces.INotificationHandler;
import org.opengroup.osdu.notification.service.interfaces.ISubscriptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import java.util.HashMap;
import java.util.Map;
public class NotificationHandler implements INotificationHandler {
@Autowired
private SubscriptionCacheFactory subscriptionCacheFactory;
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private ISignatureService signatureService;
@Autowired
private IGoogleServiceAccount gsaTokenProvider;
@Autowired
private JaxRsDpsLog log;
@Autowired
private HttpClient httpClient;
@Autowired
private ISubscriptionHandler subscriptionHandler;
private static final String HMAC_TYPE = "HMAC";
private static final String GSA_TYPE = "GSA";
private final int WAITING_TIME = 30000;
private final String ACKNOWLEDGE = "message acknowledged by client";
private final String NOT_ACKNOWLEDGE = "message not acknowledged by client";
public ResponseEntity NotifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes) {
try {
Subscription subscription = subscriptionHandler.getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<>();
// TODO: Extract out logic for multiple authentication codes
if (secretType.equalsIgnoreCase(HMAC_TYPE)) {
this.log.info("receiving pubsub message, will send out hmac type request, pubsub message: " + pubsubMessage);
HmacSecret hmacSecret = (HmacSecret) secret;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl = endpoint + "?hmac=" + signedjwt;
} else if (secretType.equalsIgnoreCase(GSA_TYPE)) {
this.log.info("receiving pubsub message, will send out gsa type request, pubsub message: " + pubsubMessage);
GsaSecret gsaSecret = (GsaSecret) secret;
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
pushUrl = endpoint;
requestHeader.put("Authorization", idToken);
}
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE);
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
}
this.log.info("sending out notification to endpoint: " + endpoint);
}catch(Exception e)
{
this.log.error("An exception occurred: " + e.getMessage());
}
this.log.info(ACKNOWLEDGE);
return ResponseEntity.ok(ACKNOWLEDGE);
}
}
package org.opengroup.osdu.notification.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.notification.di.SubscriptionCacheFactory;
import org.opengroup.osdu.notification.service.interfaces.ISubscriptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.List;
public class SubscriptionHandler implements ISubscriptionHandler {
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private SubscriptionCacheFactory subscriptionCacheFactory;
@Autowired
private JaxRsDpsLog log;
@Autowired
private DpsHeaders headers;
private static final Gson gson = new Gson();
private ObjectMapper objectMapper;
public Subscription getSubscriptionFromCache(String notificationId) throws IOException, SubscriptionException {
String subscriptionString = subscriptionCacheFactory.get(notificationId);
try {
if (Strings.isNullOrEmpty(subscriptionString))
subscriptionString = querySubscriptionAndUpdateCache(notificationId);
ObjectMapper objectMapper = this.getObjectMapper();
Subscription subscription = objectMapper.readValue(subscriptionString, Subscription.class);
return subscription;
} catch (IOException e) {
this.log.warning("Error Parsing subscription String to object.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", e);
} catch (SubscriptionException se) {
this.log.warning("Error query subscription from registration.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", se);
}
}
private String querySubscriptionAndUpdateCache(String notificationId) throws AppException, SubscriptionException {
ISubscriptionService service = subscriptionFactory.create(headers);
List<Subscription> subscriptionList = service.query(notificationId);
if (subscriptionList == null || subscriptionList.size() == 0) {
this.log.warning(String.format("Subscription with notification ID %s not found in registration", notificationId));
throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found");
}
Subscription subscription = subscriptionList.get(0);
String jsonSubscription = gson.toJson(subscription);
this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription);
return jsonSubscription;
}
//unit test purpose
protected ObjectMapper getObjectMapper() {
if (this.objectMapper == null) {
this.objectMapper = new ObjectMapper();
}
return this.objectMapper;
}
}
package org.opengroup.osdu.notification.service.interfaces;
import org.springframework.http.ResponseEntity;
import java.util.Map;
public interface INotificationHandler {
public ResponseEntity NotifySubscriber(String id, String notificationId, Map<String, String> headerAttributes);
}
package org.opengroup.osdu.notification.service.interfaces;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import java.io.IOException;
public interface ISubscriptionHandler {
public Subscription getSubscriptionFromCache(String notificationId) throws IOException, SubscriptionException;
}
......@@ -96,7 +96,6 @@ public class PubsubEndpointTests {
when(this.signatureService.getSignedSignature(any(), any())).thenReturn(SIGNED_SIGNATURE);
when(this.httpClient.send(any())).thenReturn(response);
when(this.subscriptionCacheFactory.get(any())).thenReturn(getHmacSubscription());
sut.setObjectMapper(new ObjectMapper());
ResponseEntity responseEntity = this.sut.recordChanged();
Assert.assertEquals(200, responseEntity.getStatusCode().value());
Assert.assertEquals("message acknowledged by client", responseEntity.getBody().toString());
......@@ -109,7 +108,6 @@ public class PubsubEndpointTests {
when(this.googleIdTokenProducer.getIdToken(any(), any())).thenReturn(GOOGLE_ID_TOKEN);
when(this.httpClient.send(any())).thenReturn(response);
when(this.subscriptionCacheFactory.get(any())).thenReturn(getGsaSubscription());
sut.setObjectMapper(new ObjectMapper());
ResponseEntity responseEntity = this.sut.recordChanged();
Assert.assertEquals(200, responseEntity.getStatusCode().value());
......@@ -123,7 +121,6 @@ public class PubsubEndpointTests {
when(this.signatureService.getSignedSignature(any(), any())).thenReturn(SIGNED_SIGNATURE);
when(this.httpClient.send(any())).thenReturn(response);
when(this.subscriptionCacheFactory.get(any())).thenReturn(getHmacSubscription());
sut.setObjectMapper(new ObjectMapper());
ResponseEntity responseEntity = this.sut.recordChanged();
Assert.assertEquals(400, responseEntity.getStatusCode().value());
......@@ -137,7 +134,6 @@ public class PubsubEndpointTests {
when(this.googleIdTokenProducer.getIdToken(any(), any())).thenReturn(GOOGLE_ID_TOKEN);
when(this.httpClient.send(any())).thenReturn(response);
when(this.subscriptionCacheFactory.get(any())).thenReturn(getGsaSubscription());
sut.setObjectMapper(new ObjectMapper());
ResponseEntity responseEntity = this.sut.recordChanged();
Assert.assertEquals(400, responseEntity.getStatusCode().value());
......@@ -173,7 +169,6 @@ public class PubsubEndpointTests {
when(subscriptionService.query(any())).thenReturn(queryResult);
sut.setObjectMapper(this.objectMapper);
when(this.objectMapper.readValue(anyString(), any(Class.class))).thenThrow(new IOException());
this.sut.recordChanged();
fail("should throw AppException");
......@@ -190,7 +185,6 @@ public class PubsubEndpointTests {
when(this.credentialHeadersProvider.getObject()).thenReturn(new DpsHeaders());
ISubscriptionService subscriptionService = mock(SubscriptionService.class);
when(this.subscriptionFactory.create(any())).thenReturn(subscriptionService);
sut.setObjectMapper(new ObjectMapper());
String jsonSubscription = this.getHmacSubscription();
ObjectMapper objectMapper = new ObjectMapper();
......@@ -210,7 +204,6 @@ public class PubsubEndpointTests {
when(this.subscriptionCacheFactory.get(any())).thenReturn(getHmacSubscription());
when(this.signatureService.getSignedSignature(any(), any())).thenThrow(ex);
sut.setObjectMapper(new ObjectMapper());
this.sut.recordChanged();
}
......
......@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.pubsub;
package org.opengroup.osdu.notification.provider.azure.pubsub.extractor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
......
......@@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.pubsub;
package org.opengroup.osdu.notification.provider.azure.pubsub.utils;
import com.google.gson.JsonObject;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.provider.azure.pubsub.extractor.EventGridRequestBodyExtractor;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubHandshakeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
@Component
......
......@@ -21,6 +21,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.provider.azure.pubsub.extractor.EventGridRequestBodyExtractor;
import org.springframework.http.HttpStatus;
import javax.servlet.ReadListener;
......
......@@ -24,12 +24,10 @@ import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.provider.azure.pubsub.EventGridHandshakeHandler;
import org.opengroup.osdu.notification.provider.azure.pubsub.EventGridRequestBodyExtractor;
import org.opengroup.osdu.notification.provider.azure.pubsub.utils.EventGridHandshakeHandler;
import org.opengroup.osdu.notification.provider.azure.pubsub.extractor.EventGridRequestBodyExtractor;
import org.springframework.http.HttpStatus;
import java.io.IOException;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment