Commit ed666aa1 authored by Komal Makkar's avatar Komal Makkar
Browse files

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/system/notification into users/komakkar/EventGridAuth
parents e448b152 3a37042e
Pipeline #24278 failed with stages
in 20 minutes and 5 seconds
......@@ -48,4 +48,5 @@ The port and path for the service endpoint can be configured in ```application.p
```bash
server.servlet.contextPath=/
server.port=8080
```
\ No newline at end of file
```
# new update
......@@ -86,60 +86,50 @@ public class PubsubEndpoint {
@PostMapping("/records-changed")
@PreAuthorize("@authorizationFilter.hasAnyPermission('" + Config.OPS + "', '" + Config.PUBSUB + "')")
public ResponseEntity recordChanged() throws Exception {
try {
if (this.pubsubRequestBodyExtractor.isHandshakeRequest()) {
String handshakeResponse = this.pubsubHandshakeHandler.getHandshakeResponse();
return ResponseEntity.ok(handshakeResponse);
}
String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
Subscription subscription = getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<>();
if (secretType.equalsIgnoreCase(HMAC_TYPE)) {
this.log.info("receiving pubsub message, will send out hmac type request, pubsub message: " + pubsubMessage);
HmacSecret hmacSecret = (HmacSecret) secret;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl = endpoint + "?hmac=" + signedjwt;
} else if (secretType.equalsIgnoreCase(GSA_TYPE)) {
this.log.info("receiving pubsub message, will send out gsa type request, pubsub message: " + pubsubMessage);
GsaSecret gsaSecret = (GsaSecret) secret;
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
pushUrl = endpoint;
requestHeader.put("Authorization", idToken);
}
this.log.info("sending out notification to endpoint: " + endpoint);
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE);
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
}
this.log.info(ACKNOWLEDGE);
return ResponseEntity.ok(ACKNOWLEDGE);
} catch (Exception e) {
e.printStackTrace();
String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
Subscription subscription = getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<>();
if (secretType.equalsIgnoreCase(HMAC_TYPE)) {
this.log.info("receiving pubsub message, will send out hmac type request, pubsub message: " + pubsubMessage);
HmacSecret hmacSecret = (HmacSecret) secret;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl = endpoint + "?hmac=" + signedjwt;
} else if (secretType.equalsIgnoreCase(GSA_TYPE)) {
this.log.info("receiving pubsub message, will send out gsa type request, pubsub message: " + pubsubMessage);
GsaSecret gsaSecret = (GsaSecret) secret;
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
pushUrl = endpoint;
requestHeader.put("Authorization", idToken);
}
this.log.info("sending out notification to endpoint: " + endpoint);
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE);
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
}
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
this.log.info(ACKNOWLEDGE);
return ResponseEntity.ok(ACKNOWLEDGE);
}
private Subscription getSubscriptionFromCache(String notificationId) throws Exception {
......
......@@ -23,7 +23,9 @@ import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import java.util.Map;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.context.annotation.RequestScope;
......@@ -67,11 +69,6 @@ public class CredentialHeadersProvider implements FactoryBean<DpsHeaders> {
}
private DpsHeaders getDpsHeadersForPostPutPatch() throws Exception {
if(this.pubsubRequestBodyExtractor.isHandshakeRequest()) {
// The headers are not needed for the handshake requests.
return new DpsHeaders();
}
Map<String, String> attributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
try {
//extract headers from pubsub message
......@@ -80,7 +77,7 @@ public class CredentialHeadersProvider implements FactoryBean<DpsHeaders> {
attributes.put(DpsHeaders.AUTHORIZATION, authToken);
return DpsHeaders.createFromMap(attributes);
} catch (AppException e) {
throw new Exception("Failed to generate headers for register service.");
throw new Exception("Failed to generate headers for register service.", e);
}
}
}
\ No newline at end of file
......@@ -31,19 +31,4 @@ public class CredentialHeadersProviderTest {
when(httpRequest.getMethod()).thenReturn(RequestMethod.GET.toString());
assertNotNull(headersProvider.getObject());
}
@Test
public void testHandshake() throws Exception {
// set up
when(httpRequest.getMethod()).thenReturn(RequestMethod.GET.toString());
when(pubsubRequestBodyExtractor.isHandshakeRequest()).thenReturn(true);
// Act
DpsHeaders headers = headersProvider.getObject();
// Assert
assertNotNull(headers);
assertNull(headers.getCorrelationId());
assertNull(headers.getPartitionId());
}
}
\ No newline at end of file
......@@ -15,22 +15,28 @@
package org.opengroup.osdu.notification.provider.azure.models;
import com.google.gson.JsonArray;
import com.google.gson.annotations.SerializedName;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
// TODO: Move the contract to core and use it for both publishing and consuming.
@Data
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NotificationData {
public class NotificationRecordsChangedData {
private String data;
private JsonArray data;
private Map<String,String> attributes;
@SerializedName("correlation-id")
private String correlationId;
private String messageId;
}
// TODO: Verify the need of account-id
@SerializedName("account-id")
private String accountId;
@SerializedName("data-partition-id")
private String dataPartitionId;
}
\ No newline at end of file
......@@ -19,23 +19,26 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import lombok.SneakyThrows;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.provider.azure.models.HandshakeRequestData;
import org.opengroup.osdu.notification.provider.azure.models.NotificationData;
import org.opengroup.osdu.notification.provider.azure.models.NotificationRecordsChangedData;
import org.opengroup.osdu.notification.provider.azure.models.NotificationRequest;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.context.annotation.RequestScope;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.util.Base64;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
// TODO : Make error messages as constants.
// TODO : Add TracePoints to exception logs so that monitoring is done well.
......@@ -43,7 +46,7 @@ import java.util.stream.Stream;
@Component
@RequestScope
public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtractor {
private static final String INVALID_EVENTGRID_MESSAGE = "Invalid Event Grid Message";
private static final String SUBSCRIPTION_ID = "Aeg-Subscription-Name";
private static final String EVENTGRID_VALIDATION_EVENT = "Microsoft.EventGrid.SubscriptionValidationEvent";
private static final Gson GSON = new Gson();
......@@ -54,7 +57,7 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
private final JaxRsDpsLog logger;
private final NotificationRequest notificationRequest;
private NotificationData notificationData;
private NotificationRecordsChangedData notificationRecordsChangedData;
private HandshakeRequestData handshakeRequestData;
private boolean isHandshakeRequest;
......@@ -68,36 +71,40 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
/**
* Extracts the attributes from the request that are filled in by publisher of the message.
*
* @throws AppException
* @return Request Attributes Map
* @throws AppException
*/
public Map<String, String> extractAttributesFromRequestBody() {
if(isHandshakeRequest) {
if (isHandshakeRequest) {
logger.error("Invalid Event Grid Message. Is a handshake request");
return null;
}
return this.notificationData.getAttributes();
Map<String, String> attributes = new HashMap<>();
attributes.put("correlation-id", this.notificationRecordsChangedData.getCorrelationId());
attributes.put("data-partition-id", this.notificationRecordsChangedData.getDataPartitionId());
attributes.put("account-id", this.notificationRecordsChangedData.getAccountId());
return attributes;
}
/**
* Extracts the data from the request that are filled in by publisher of the message,
*
* @throws AppException
* @return Request Data String
* @throws AppException
*/
public String extractDataFromRequestBody() {
if(isHandshakeRequest) {
if (isHandshakeRequest) {
logger.error("Invalid Event Grid Message. Is a handshake request");
return null;
}
return new String(Base64.getDecoder().decode(notificationData.getData()));
return notificationRecordsChangedData.getData().toString();
}
/**
* Extracts the notificationId from the request that are filled in by EventGrid.
*
* @throws AppException
* @return Request NotificationId String.
* @throws AppException
*/
public String extractNotificationIdFromRequestBody() {
String subscriptionId = httpServletRequest.getHeader(SUBSCRIPTION_ID);
......@@ -105,27 +112,28 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
logger.error("Invalid Event Grid Message. Subscription Id is null or empty");
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Invalid Event Grid Message", "Subscription ID not found");
}
return subscriptionId;
return subscriptionId.toLowerCase();
}
/**
* Checks if the request is for handshake.
*
* @throws AppException
* @return Request Type Boolean
* @throws AppException
*/
public boolean isHandshakeRequest() {
return this.isHandshakeRequest ;
return this.isHandshakeRequest;
}
/**
* Return ValidationCode
*
* @throws AppException
* @return Request Type Boolean
* @throws AppException
*/
public String getValidationCodeForHandshake() {
if(!isHandshakeRequest) {
if (!isHandshakeRequest) {
logger.error("Invalid Event Grid Message. Is not a handshake request");
return null;
}
......@@ -135,19 +143,19 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
/**
* Utility method that extracts the core content in the request.
* This is what the publisher will publish.
*
* <p>
* The attributes are validated.
*
* @throws AppException
* @return NotificationRequest Object
* @throws AppException
*/
@SneakyThrows
// TODO : @komakkar sanitize the exceptions to match the SpringExceptionMapper and throw ValidationException
private NotificationRequest extractNotificationRequestFromHttpRequest() {
NotificationRequest notificationRequest = null;
if (this.notificationRequest == null) {
if (this.notificationRequest == null && this.httpServletRequest.getMethod().equalsIgnoreCase("post")) {
try {
BufferedReader reader = httpServletRequest.getReader();
Stream<String> lines = reader.lines();
String requestBody = lines.collect(Collectors.joining("\n"));
String requestBody = getBody(this.httpServletRequest);
NotificationRequest[] notificationRequestArray = GSON.fromJson(requestBody, NotificationRequest[].class);
notificationRequest = notificationRequestArray[0];
......@@ -158,9 +166,8 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
extractNotificationData(notificationRequest);
}
} catch (Exception e) {
logger.error("Invalid Event Grid Message. %s", e.getMessage());
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Request payload parsing error",
"Unable to parse request payload.", "Request contents are null or empty");
"Unable to parse request payload.", "Request contents are null or empty", e);
}
}
return notificationRequest;
......@@ -168,13 +175,13 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
private void extractHandshakeData(NotificationRequest notificationRequest) {
this.handshakeRequestData = GSON.fromJson(notificationRequest.getData(), HandshakeRequestData.class);
Preconditions.checkNotNull(this.handshakeRequestData.getValidationCode(), "Request payload parsing error" );
Preconditions.checkNotNull(this.handshakeRequestData.getValidationCode(), "Request payload parsing error handshkae");
}
private void extractNotificationData(NotificationRequest notificationRequest) {
NotificationData notificationData = GSON.fromJson(notificationRequest.getData(), NotificationData.class);
verifyNotificationData(notificationData);
this.notificationData = notificationData;
NotificationRecordsChangedData notificationRecordsChangedData = GSON.fromJson(notificationRequest.getData(), NotificationRecordsChangedData.class);
verifyNotificationData(notificationRecordsChangedData);
this.notificationRecordsChangedData = notificationRecordsChangedData;
}
// TODO: Clean up for using @NonNull Lombok.
......@@ -183,11 +190,35 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
//
// Making Gson use a custom constructor, involves boilerplating, hence adding precondition checks. https://github.com/google/gson/blob/master/UserGuide.md#TOC-Writing-a-Deserializer
// The required condition is not operation as of now.https://github.com/google/gson/issues/61
private void verifyNotificationData(NotificationData notificationData) {
Preconditions.checkNotNull(notificationData, "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getData(), "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getMessageId(), "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getAttributes().get("correlation-id") , "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getAttributes().get("data-partition-id") , "Request payload parsing error" );
private void verifyNotificationData(NotificationRecordsChangedData notificationRecordsChangedData) {
Preconditions.checkNotNull(notificationRecordsChangedData, "Request payload parsing error");
Preconditions.checkNotNull(notificationRecordsChangedData.getData(), "Request payload parsing error");
Preconditions.checkNotNull(notificationRecordsChangedData.getCorrelationId(), "Request payload parsing error");
Preconditions.checkNotNull(notificationRecordsChangedData.getDataPartitionId(), "Request payload parsing error");
}
private String getBody(HttpServletRequest request) throws IOException {
String body;
StringBuilder stringBuilder = new StringBuilder();
BufferedReader bufferedReader = null;
try {
InputStream inputStream = request.getInputStream();
if (inputStream != null) {
bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
char[] charBuffer = new char[128];
int bytesRead = -1;
while ((bytesRead = bufferedReader.read(charBuffer)) > 0) {
stringBuilder.append(charBuffer, 0, bytesRead);
}
} else {
stringBuilder.append("");
}
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
body = stringBuilder.toString();
return body;
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.notification.provider.azure.util;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubHandshakeHandler;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
import static org.opengroup.osdu.core.common.http.ResponseHeaders.STANDARD_RESPONSE_HEADERS;
@Component
@Order(0)
public class HandshakeFilter implements Filter {
@Autowired
private IPubsubRequestBodyExtractor requestBodyExtractor;
@Autowired
private IPubsubHandshakeHandler handshakeHandler;
@Override
public void init(FilterConfig filterConfig) {
//do nothing
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
if(request.getServletPath().contains("records-changed") && this.requestBodyExtractor.isHandshakeRequest()){
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
httpServletResponse.setStatus(200);
httpServletResponse.setContentType(MediaType.APPLICATION_JSON_VALUE);
setResponseHeaders(httpServletResponse);
String handshakeResponse = this.handshakeHandler.getHandshakeResponse();
PrintWriter out = httpServletResponse.getWriter();
out.flush();
out.print(handshakeResponse);
return;
}
filterChain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() { }
private void setResponseHeaders(HttpServletResponse httpServletResponse) {
for (Map.Entry<String, List<Object>> header : STANDARD_RESPONSE_HEADERS.entrySet()) {
httpServletResponse.addHeader(header.getKey(), header.getValue().toString());
}
}
}
/*
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.*/
package org.opengroup.osdu.notification.provider.azure.util;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubHandshakeHandler;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;