Commit 36484e7d authored by Komal Makkar's avatar Komal Makkar
Browse files

Intermediate changes

parent 0672d731
......@@ -86,60 +86,50 @@ public class PubsubEndpoint {
@PostMapping("/records-changed")
@PreAuthorize("@authorizationFilter.hasAnyPermission('" + Config.OPS + "', '" + Config.PUBSUB + "')")
public ResponseEntity recordChanged() throws Exception {
try {
if (this.pubsubRequestBodyExtractor.isHandshakeRequest()) {
String handshakeResponse = this.pubsubHandshakeHandler.getHandshakeResponse();
return ResponseEntity.ok(handshakeResponse);
}
String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
Subscription subscription = getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<>();
if (secretType.equalsIgnoreCase(HMAC_TYPE)) {
this.log.info("receiving pubsub message, will send out hmac type request, pubsub message: " + pubsubMessage);
HmacSecret hmacSecret = (HmacSecret) secret;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl = endpoint + "?hmac=" + signedjwt;
} else if (secretType.equalsIgnoreCase(GSA_TYPE)) {
this.log.info("receiving pubsub message, will send out gsa type request, pubsub message: " + pubsubMessage);
GsaSecret gsaSecret = (GsaSecret) secret;
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
pushUrl = endpoint;
requestHeader.put("Authorization", idToken);
}
this.log.info("sending out notification to endpoint: " + endpoint);
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE);
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
}
this.log.info(ACKNOWLEDGE);
return ResponseEntity.ok(ACKNOWLEDGE);
} catch (Exception e) {
e.printStackTrace();
String notificationId = this.pubsubRequestBodyExtractor.extractNotificationIdFromRequestBody();
String pubsubMessage = this.pubsubRequestBodyExtractor.extractDataFromRequestBody();
Map<String, String> headerAttributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
Subscription subscription = getSubscriptionFromCache(notificationId);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl = "";
Map<String, String> requestHeader = new HashMap<>();
if (secretType.equalsIgnoreCase(HMAC_TYPE)) {
this.log.info("receiving pubsub message, will send out hmac type request, pubsub message: " + pubsubMessage);
HmacSecret hmacSecret = (HmacSecret) secret;
String signedjwt = this.signatureService.getSignedSignature(endpoint, hmacSecret.getValue());
pushUrl = endpoint + "?hmac=" + signedjwt;
} else if (secretType.equalsIgnoreCase(GSA_TYPE)) {
this.log.info("receiving pubsub message, will send out gsa type request, pubsub message: " + pubsubMessage);
GsaSecret gsaSecret = (GsaSecret) secret;
GsaSecretValue gsaSecretValue = gsaSecret.getValue();
JsonParser jsonParser = new JsonParser();
JsonElement root = jsonParser.parse(gsaSecretValue.getKey());
String keyString = root.getAsJsonObject().toString();
String idToken = this.gsaTokenProvider.getIdToken(keyString, gsaSecretValue.getAudience());
pushUrl = endpoint;
requestHeader.put("Authorization", idToken);
}
this.log.info("sending out notification to endpoint: " + endpoint);
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
if (!response.isSuccessCode()) {
this.log.error(NOT_ACKNOWLEDGE);
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
}
return ResponseEntity.badRequest().body(NOT_ACKNOWLEDGE);
this.log.info(ACKNOWLEDGE);
return ResponseEntity.ok(ACKNOWLEDGE);
}
private Subscription getSubscriptionFromCache(String notificationId) throws Exception {
......
......@@ -15,22 +15,26 @@
package org.opengroup.osdu.notification.provider.azure.models;
import com.google.gson.JsonArray;
import com.google.gson.annotations.SerializedName;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@Data
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NotificationData {
private String data;
private JsonArray data;
private Map<String,String> attributes;
@SerializedName("correlation-id")
private String correlationId;
private String messageId;
}
@SerializedName("account-id")
private String accountId;
@SerializedName("data-partition-id")
private String dataPartitionId;
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import lombok.SneakyThrows;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.notification.provider.azure.models.HandshakeRequestData;
......@@ -32,10 +33,11 @@ import org.springframework.web.context.annotation.RequestScope;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.util.Base64;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
// TODO : Make error messages as constants.
// TODO : Add TracePoints to exception logs so that monitoring is done well.
......@@ -43,6 +45,7 @@ import java.util.stream.Stream;
@Component
@RequestScope
public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtractor {
public static final String AUTHORIZATION = "Authorization";
private static final String INVALID_EVENTGRID_MESSAGE = "Invalid Event Grid Message";
private static final String SUBSCRIPTION_ID = "Aeg-Subscription-Name";
private static final String EVENTGRID_VALIDATION_EVENT = "Microsoft.EventGrid.SubscriptionValidationEvent";
......@@ -76,7 +79,12 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
logger.error("Invalid Event Grid Message. Is a handshake request");
return null;
}
return this.notificationData.getAttributes();
Map<String, String> attributes= new HashMap<>();
attributes.put("correlation-id", this.notificationData.getCorrelationId());
attributes.put("data-partition-id", this.notificationData.getDataPartitionId());
attributes.put("account-id", this.notificationData.getAccountId());
attributes.put("authorization", extractAuthHeaderFromRequestBody());
return attributes;
}
/**
......@@ -90,7 +98,7 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
logger.error("Invalid Event Grid Message. Is a handshake request");
return null;
}
return new String(Base64.getDecoder().decode(notificationData.getData()));
return notificationData.getData().toString();
}
/**
......@@ -105,7 +113,16 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
logger.error("Invalid Event Grid Message. Subscription Id is null or empty");
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Invalid Event Grid Message", "Subscription ID not found");
}
return subscriptionId;
return subscriptionId.substring(0, subscriptionId.length() - 7);
}
private String extractAuthHeaderFromRequestBody() {
String authorization = httpServletRequest.getHeader(AUTHORIZATION);
if (Strings.isNullOrEmpty(authorization)) {
logger.error("Invalid Event Grid Message. Subscription Id is null or empty");
throw new AppException(HttpStatus.BAD_REQUEST.value(), "Invalid Event Grid Message", "Subscription ID not found");
}
return authorization;
}
/**
......@@ -141,13 +158,12 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
* @throws AppException
* @return NotificationRequest Object
*/
@SneakyThrows
private NotificationRequest extractNotificationRequestFromHttpRequest() {
NotificationRequest notificationRequest = null;
if (this.notificationRequest == null) {
try {
BufferedReader reader = httpServletRequest.getReader();
Stream<String> lines = reader.lines();
String requestBody = lines.collect(Collectors.joining("\n"));
String requestBody = getBody(this.httpServletRequest);
NotificationRequest[] notificationRequestArray = GSON.fromJson(requestBody, NotificationRequest[].class);
notificationRequest = notificationRequestArray[0];
......@@ -167,7 +183,7 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
private void extractHandshakeData(NotificationRequest notificationRequest) {
this.handshakeRequestData = GSON.fromJson(notificationRequest.getData(), HandshakeRequestData.class);
Preconditions.checkNotNull(this.handshakeRequestData.getValidationCode(), "Request payload parsing error" );
Preconditions.checkNotNull(this.handshakeRequestData.getValidationCode(), "Request payload parsing error handshkae" );
}
private void extractNotificationData(NotificationRequest notificationRequest) {
......@@ -183,10 +199,40 @@ public class EventGridRequestBodyExtractor implements IPubsubRequestBodyExtracto
// Making Gson use a custom constructor, involves boilerplating, hence adding precondition checks. https://github.com/google/gson/blob/master/UserGuide.md#TOC-Writing-a-Deserializer
// The required condition is not operation as of now.https://github.com/google/gson/issues/61
private void verifyNotificationData(NotificationData notificationData) {
Preconditions.checkNotNull(notificationData, "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getData(), "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getMessageId(), "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getAttributes().get("correlation-id") , "Request payload parsing error" );
Preconditions.checkNotNull(notificationData.getAttributes().get("data-partition-id") , "Request payload parsing error" );
Preconditions.checkNotNull(notificationData, "Request payload parsing error Data" );
Preconditions.checkNotNull(notificationData.getData(), "Request payload parsing error data data" );
Preconditions.checkNotNull(notificationData.getCorrelationId() , "Request payload parsing error coorelation" );
Preconditions.checkNotNull(notificationData.getDataPartitionId() , "Request payload parsing error partition" );
}
private String getBody(HttpServletRequest request) throws IOException {
String body = null;
StringBuilder stringBuilder = new StringBuilder();
BufferedReader bufferedReader = null;
try {
InputStream inputStream = request.getInputStream();
if (inputStream != null) {
bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
char[] charBuffer = new char[128];
int bytesRead = -1;
while ((bytesRead = bufferedReader.read(charBuffer)) > 0) {
stringBuilder.append(charBuffer, 0, bytesRead);
}
} else {
stringBuilder.append("");
}
} catch (IOException ex) {
throw ex;
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (IOException ex) {
throw ex;
}
}
}
body = stringBuilder.toString();
return body;
}
}
......@@ -14,12 +14,6 @@
package org.opengroup.osdu.notification.provider.azure.util;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.http.Request;
import org.opengroup.osdu.notification.di.RequestInfoExt;
import static org.opengroup.osdu.core.common.http.ResponseHeaders.STANDARD_RESPONSE_HEADERS;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubHandshakeHandler;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -32,11 +26,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.time.Duration;
import static org.opengroup.osdu.core.common.http.ResponseHeaders.STANDARD_RESPONSE_HEADERS;
@Component
......@@ -56,7 +48,8 @@ public class HandshakeFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
if(this.requestBodyExtractor.isHandshakeRequest()){
HttpServletRequest request = (HttpServletRequest) servletRequest;
if(request.getServletPath().contains("records-changed") && this.requestBodyExtractor.isHandshakeRequest()){
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
httpServletResponse.setStatus(200);
httpServletResponse.setContentType(MediaType.APPLICATION_JSON_VALUE);
......@@ -74,8 +67,7 @@ public class HandshakeFilter implements Filter {
public void destroy() { }
private void setResponseHeaders(HttpServletResponse httpServletResponse) {
Map<String, List<Object>> standardHeaders = STANDARD_RESPONSE_HEADERS;
for (Map.Entry<String, List<Object>> header : standardHeaders.entrySet()) {
for (Map.Entry<String, List<Object>> header : STANDARD_RESPONSE_HEADERS.entrySet()) {
httpServletResponse.addHeader(header.getKey(), header.getValue().toString());
}
}
......
......@@ -14,90 +14,23 @@
package org.opengroup.osdu.notification.provider.azure.util;
import com.auth0.jwt.JWT;
import com.microsoft.aad.adal4j.AuthenticationContext;
import com.microsoft.aad.adal4j.AuthenticationResult;
import com.microsoft.aad.adal4j.ClientCredential;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.search.IdToken;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.IJwtCache;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient;
import org.opengroup.osdu.notification.provider.interfaces.IPubsubRequestBodyExtractor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.MalformedURLException;
import java.util.concurrent.*;
import java.util.Map;
@Component
public class ServiceAccountJwtAzureClientImpl implements IServiceAccountJwtClient {
@Autowired
private AppProperties config;
@Autowired
private ITenantFactory tenantInfoServiceProvider;
@Autowired
private IJwtCache tenantJwtCache;
private IPubsubRequestBodyExtractor pubsubRequestBodyExtractor;
public String getIdToken(String tenantName) {
TenantInfo tenant = this.tenantInfoServiceProvider.getTenantInfo(tenantName);
if (tenant == null) {
throw new AppException(HttpStatus.SC_BAD_REQUEST, "Invalid tenant Name", "Invalid tenant Name from azure");
}
String ACCESS_TOKEN = "";
ExecutorService service = null;
try {
// TODO : Refactor to move ID token form Common.Core.model.search to Common.core
IdToken cachedToken = (IdToken) this.tenantJwtCache.get(tenant.getName());
if ((cachedToken != null) && !IdToken.refreshToken(cachedToken)) {
return "Bearer " + cachedToken.getTokenValue();
}
// TODO : Control the thread count via config and pool should be created once.
service = Executors.newFixedThreadPool(1);
ACCESS_TOKEN = getAccessToken(service);
IdToken idToken = IdToken.builder().tokenValue(ACCESS_TOKEN).expirationTimeMillis(JWT.decode(ACCESS_TOKEN).getExpiresAt().getTime()).build();
this.tenantJwtCache.put(tenant.getName(), idToken);
} finally {
if(service != null) {
service.shutdown();
}
}
return "Bearer " + ACCESS_TOKEN;
}
// TODO : Refactor for making it test-able.
// THIS METHOD IS PUBLIC ONLY TO ENABLE UNIT TESTING
public String getAccessToken(ExecutorService service) {
AuthenticationContext context = null;
ClientCredential credential = null;
String ACCESS_TOKEN = null;
try {
context = new AuthenticationContext(this.config.getAuthURL(), false, service);
credential = new ClientCredential(this.config.getAuthClientID(), this.config.getAuthClientSecret());
Future<AuthenticationResult> future = context.acquireToken(this.config.getAadClientID(), credential, null);
if (future == null) {
throw new AppException(HttpStatus.SC_FORBIDDEN, "Token not generated", "The user is not authorized to obtain Token From AAD");
}
ACCESS_TOKEN = future.get().getAccessToken();
} catch (MalformedURLException malformedURLException) {
malformedURLException.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return ACCESS_TOKEN;
Map<String, String> attributes = this.pubsubRequestBodyExtractor.extractAttributesFromRequestBody();
return attributes.get(DpsHeaders.AUTHORIZATION);
}
}
/*
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
......@@ -410,3 +411,4 @@ public class EventGridRequestBodyExtractorTest {
}
}
}
*/
/*
// Copyright © Microsoft Corporation
//
......@@ -76,3 +77,4 @@ public class EventGridHandshakeHandlerTest {
}
}
}
*/
/*
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
......@@ -143,3 +144,4 @@ public class ServiceAccountClientImplTest {
}
}
*/
......@@ -36,23 +36,23 @@ public class PubsubEndpointHMACDescriptor extends RestDescriptor {
@Override
public String getValidBody() {
return "[{\n" +
return "[\n" +
" {\n" +
" \"id\": \"2425\",\n" +
" \"eventType\": \"recordInserted\",\n" +
" \"subject\": \"myapp/vehicles/motorcycles\",\n" +
" \"data\": {\n" +
" \"attributes\": {\n" +
" \"correlation-id\": \" "+ UUID.randomUUID()+ "\",\n" +
" \"data-partition-id\": \"" + TestUtils.getOsduTenant() + "\"\n" +
" },\n" +
" \"data\": \"W3sia2luZCI6InRlc3RraW5kIiwiaWQiOiJ0ZXN0aWQiLCJvcGVyYXRpb250eXBlIjoiY3JlYXRlIn0seyJraW5kIjoidGVzdGtpbmQyIiwiaWQiOiJ0ZXN0aWQyIiwib3BlcmF0aW9udHlwZSI6InVwZGF0ZSJ9XQ\",\n" +
" \"messageId\": \"136969346945\"\n" +
" \"data\": {},\n" +
" \"account-id\": \"asdf\",\n" +
" \"correlation-id\": \""+ UUID.randomUUID() +"\",\n" +
" \"data-partition-id\": \""+ TestUtils.getOsduTenant() +"\"\n" +
" },\n" +
" \"dataVersion\": \"1.0\",\n" +
" \"metadataVersion\": \"1\",\n" +
" \"eventTime\": \"2020-08-14T18:04:12+00:00\",\n" +
" \"topic\": \"/subscriptions/c99e2bf3-1777-412b-baba-d823676589c2/resourceGroups/komakkar-OSDU-RG/providers/Microsoft.EventGrid/topics/recordChanged\"\n" +
" }]";
" \"topic\": \"records-changed\"\n" +
" }\n" +
"]";
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment