Commit fa951c1b authored by Nikhil Singh[MicroSoft]'s avatar Nikhil Singh[MicroSoft]
Browse files

Commit 3 Contents:

1-Merge branch 'master'
parents 59904b59 4845fc47
Pipeline #54734 passed with stages
in 25 minutes and 12 seconds
This diff is collapsed.
...@@ -16,29 +16,13 @@ ...@@ -16,29 +16,13 @@
package org.opengroup.osdu.notification.api; package org.opengroup.osdu.notification.api;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.cryptographic.ISignatureService;
import org.opengroup.osdu.core.common.http.HttpClient;
import org.opengroup.osdu.core.common.http.HttpRequest;
import org.opengroup.osdu.core.common.http.HttpResponse; import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.*;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.notification.di.SubscriptionCacheFactory;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubHandshakeHandler;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor; import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor;
import org.opengroup.osdu.notification.service.NotificationHandler;
import org.opengroup.osdu.notification.utils.Config; import org.opengroup.osdu.notification.utils.Config;
import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
...@@ -46,12 +30,8 @@ import org.springframework.web.bind.annotation.RequestMapping; ...@@ -46,12 +30,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.annotation.RequestScope; import org.springframework.web.context.annotation.RequestScope;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
@RestController @RestController
@RequestScope @RequestScope
@RequestMapping("/push-handlers") @RequestMapping("/push-handlers")
...@@ -59,29 +39,12 @@ public class PubsubEndpoint { ...@@ -59,29 +39,12 @@ public class PubsubEndpoint {
@Autowired @Autowired
private IPubsubRequestBodyExtractor pubsubRequestBodyExtractor; private IPubsubRequestBodyExtractor pubsubRequestBodyExtractor;
@Autowired @Autowired
private IPubsubHandshakeHandler pubsubHandshakeHandler; private NotificationHandler notificationHandler;
@Autowired
private ISignatureService signatureService;
@Autowired
private HttpClient httpClient;
@Autowired
private IGoogleServiceAccount gsaTokenProvider;
@Autowired @Autowired
private JaxRsDpsLog log; private JaxRsDpsLog log;
@Autowired
private SubscriptionCacheFactory subscriptionCacheFactory;
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private DpsHeaders headers;
private static final String HMAC_TYPE = "HMAC";
private static final String GSA_TYPE = "GSA";
private final int WAITING_TIME = 30000;
private final String ACKNOWLEDGE = "message acknowledged by client"; private final String ACKNOWLEDGE = "message acknowledged by client";
private final String NOT_ACKNOWLEDGE = "message not acknowledged by client"; private final String NOT_ACKNOWLEDGE = "message not acknowledged by client";
private static final Gson gson = new Gson();
private ObjectMapper objectMapper;
@PostMapping("/records-changed") @PostMapping("/records-changed")
@PreAuthorize("@authorizationFilter.hasAnyPermission('" + Config.OPS + "', '" + Config.PUBSUB + "')") @PreAuthorize("@authorizationFilter.hasAnyPermission('" + Config.OPS + "', '" + Config.PUBSUB + "')")
...@@ -89,92 +52,12 @@ public class PubsubEndpoint { ...@@ -89,92 +52,12 @@ public class PubsubEndpoint {
String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody(); String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody(); String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody(); Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
HttpResponse response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
Subscription subscription = getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<>();
if (secretType.equalsIgnoreCase(HMAC_TYPE)) {
this.log.debug("receiving pubsub message, will send out hmac type request, pubsub message: " + pubsubMessage);
HmacSecret hmacSecret = (HmacSecret) secret;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl = endpoint + "?hmac=" + signedjwt;
} else if (secretType.equalsIgnoreCase(GSA_TYPE)) {
this.log.debug("receiving pubsub message, will send out gsa type request, pubsub message: " + pubsubMessage);
GsaSecret gsaSecret = (GsaSecret) secret;
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
pushUrl = endpoint;
requestHeader.put("Authorization", idToken);
}
this.log.debug("sending out notification to endpoint: " + endpoint);
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
if (!response.isSuccessCode()) { if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE); this.log.error(NOT_ACKNOWLEDGE + response.getBody());
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE); return new ResponseEntity<String>(NOT_ACKNOWLEDGE, HttpStatus.valueOf(response.getResponseCode()));
} }
this.log.debug(ACKNOWLEDGE); this.log.debug(ACKNOWLEDGE);
return ResponseEntity.ok(ACKNOWLEDGE); return new ResponseEntity<String>(ACKNOWLEDGE, HttpStatus.OK);
}
private Subscription getSubscriptionFromCache(String notificationId) throws IOException, SubscriptionException {
String subscriptionString = subscriptionCacheFactory.get(notificationId);
try {
if (Strings.isNullOrEmpty(subscriptionString))
subscriptionString = querySubscriptionAndUpdateCache(notificationId);
ObjectMapper objectMapper = this.getObjectMapper();
Subscription subscription = objectMapper.readValue(subscriptionString, Subscription.class);
return subscription;
} catch (IOException e) {
this.log.warning("Error Parsing subscription String to object.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", e);
} catch (SubscriptionException se) {
this.log.warning("Error query subscription from registration.");
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error in getting subscription for notificationId:" + notificationId, "Unexpected error in pushing message", se);
}
}
private String querySubscriptionAndUpdateCache(String notificationId) throws AppException, SubscriptionException {
ISubscriptionService service = subscriptionFactory.create(headers);
List<Subscription> subscriptionList = service.query(notificationId);
if (subscriptionList == null || subscriptionList.size() == 0) {
this.log.warning(String.format("Subscription with notification ID %s not found in registration", notificationId));
throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found");
}
Subscription subscription = subscriptionList.get(0);
String jsonSubscription = gson.toJson(subscription);
this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription);
return jsonSubscription;
}
//unit test purpose
protected ObjectMapper getObjectMapper() {
if (this.objectMapper == null) {
this.objectMapper = new ObjectMapper();
}
return this.objectMapper;
}
//unit test purpose
void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
} }
} }
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.auth;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.opengroup.osdu.core.common.model.notification.GsaSecret;
import org.opengroup.osdu.core.common.model.notification.GsaSecretValue;
import org.opengroup.osdu.core.common.model.notification.Secret;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.RequestScope;
import java.util.HashMap;
import java.util.Map;
@Component
public class GsaAuth implements SecretAuth {
@Autowired
private IGoogleServiceAccount gsaTokenProvider;
private GsaSecret gsaSecret;
public void setSecret(Secret secret) {
this.gsaSecret = (GsaSecret) secret;
}
public Secret getSecret() {
return this.gsaSecret;
}
public String getPushUrl(String endpoint) throws Exception {
return endpoint;
}
public Map<String, String> getRequestHeaders() {
Map<String, String> requestHeader = new HashMap<>();
if (gsaSecret != null) {
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
requestHeader.put("Authorization", idToken);
}
return requestHeader;
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.auth;
import org.opengroup.osdu.core.common.cryptographic.ISignatureService;
import org.opengroup.osdu.core.common.model.notification.HmacSecret;
import org.opengroup.osdu.core.common.model.notification.Secret;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.RequestScope;
import java.util.HashMap;
import java.util.Map;
@Component
public class HmacAuth implements SecretAuth {
@Autowired
private ISignatureService signatureService;
private HmacSecret hmacSecret;
public void setSecret(Secret secret) {
this.hmacSecret = (HmacSecret) secret;
}
public Secret getSecret() {
return this.hmacSecret;
}
public String getPushUrl(String endpoint) throws Exception {
String pushUrl = endpoint;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl += "?hmac=" + signedjwt;
return pushUrl;
}
public Map<String, String> getRequestHeaders() {
Map<String, String> requestHeader = new HashMap<>();
return requestHeader;
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.auth.factory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.auth.GsaAuth;
import org.opengroup.osdu.notification.auth.HmacAuth;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.RequestScope;
@Component
public class AuthFactory {
private final String HMAC_TYPE = "HMAC";
private final String GSA_TYPE = "GSA";
@Autowired
private HmacAuth hmacAuth;
@Autowired
private GsaAuth gsaAuth;
@Autowired
private JaxRsDpsLog log;
public SecretAuth getSecretAuth(String secretType) {
switch (secretType.toUpperCase()) {
case HMAC_TYPE:
return hmacAuth;
case GSA_TYPE:
return gsaAuth;
default:
throw new AppException(404, "Secret Type Not Found", "Unrecognised secret type encountered :" + secretType);
}
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.auth.interfaces;
import org.opengroup.osdu.core.common.model.notification.Secret;
import java.util.Map;
public interface SecretAuth {
String getPushUrl(String endpoint) throws Exception;
void setSecret(Secret secret);
Secret getSecret();
Map<String, String> getRequestHeaders();
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.service;
import org.opengroup.osdu.core.common.http.HttpClient;
import org.opengroup.osdu.core.common.http.HttpRequest;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.*;
import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.RequestScope;
import java.util.HashMap;
import java.util.Map;
@Component
public class NotificationHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(NotificationHandler.class);
@Autowired
private HttpClient httpClient;
@Autowired
private SubscriptionHandler subscriptionHandler;
@Autowired
private AuthFactory authFactory;
@Value("${app.waitingTime:30000}")
private int WAITING_TIME;
public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception {
Subscription subscription = subscriptionHandler.getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<String, String>();
// Authentication Secret
SecretAuth secretAuth = authFactory.getSecretAuth(secretType);
secretAuth.setSecret(secret);
pushUrl = secretAuth.getPushUrl(endpoint);
requestHeader = secretAuth.getRequestHeaders();
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
this.LOGGER.debug("Sending out notification to endpoint: " + endpoint);
return response;
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.notification.di.SubscriptionCacheFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
@Component
public class SubscriptionHandler {
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private SubscriptionCacheFactory subscriptionCacheFactory;
@Autowired
private JaxRsDpsLog log;
@Autowired
private DpsHeaders headers;
private static final Gson gson = new Gson();
private ObjectMapper objectMapper;
public Subscription getSubscriptionFromCache(String notificationId) throws IOException, SubscriptionException {
String subscriptionString = subscriptionCacheFactory.get(notificationId);
try {
if (Strings.isNullOrEmpty(subscriptionString))
subscriptionString = querySubscriptionAndUpdateCache(notificationId);
ObjectMapper objectMapper = this.getObjectMapper();
Subscription subscription = objectMapper.readValue(subscriptionString, Subscription.class);
return subscription;
} catch (IOException e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error Parsing subscription String to object", "Unexpected error in pushing message", e);
} catch (SubscriptionException se) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error query subscription from registration", "Unexpected error in pushing message", se);
}
}
private String querySubscriptionAndUpdateCache(String notificationId) throws AppException, SubscriptionException {
ISubscriptionService service = subscriptionFactory.create(headers);
List<Subscription> subscriptionList = service.query(notificationId);
if (subscriptionList == null || subscriptionList.size() == 0) {
throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found");
}
Subscription subscription = subscriptionList.get(0);
String jsonSubscription = gson.toJson(subscription);
this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription);
return jsonSubscription;
}
//unit test purpose
protected ObjectMapper getObjectMapper() {