diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/PubsubHandshakeHandler.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/PubsubHandshakeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..8907662353338194c8b6b065d9f23b0a9dd191d6 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/PubsubHandshakeHandler.java @@ -0,0 +1,16 @@ +package org.opengroup.osdu.notification.provider.azure.pubsub; + +import org.opengroup.osdu.notification.pubsub.IPubsubHandshakeHandler; +import org.springframework.context.annotation.Lazy; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; + +@Component +@Lazy +public class PubsubHandshakeHandler implements IPubsubHandshakeHandler { + + @Override + public ResponseEntity getHandshakeResponse() { + return null; + } +} diff --git a/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/PubsubRequestBodyExtractor.java b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/PubsubRequestBodyExtractor.java new file mode 100644 index 0000000000000000000000000000000000000000..61ae96440ccdb06676ae1f6fa62652887d8b1b84 --- /dev/null +++ b/provider/notification-azure/src/main/java/org/opengroup/osdu/notification/provider/azure/pubsub/PubsubRequestBodyExtractor.java @@ -0,0 +1,137 @@ +/* + * Copyright 2017-2020, Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.notification.provider.azure.pubsub; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.notification.models.MessageContent; +import org.opengroup.osdu.notification.pubsub.IPubsubRequestBodyExtractor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; +import org.springframework.web.context.annotation.RequestScope; + +import javax.servlet.http.HttpServletRequest; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Component +@RequestScope +public class PubsubRequestBodyExtractor implements IPubsubRequestBodyExtractor { + private static final String INVALID_PUBSUB_MESSAGE = "Invalid pubsub message"; + private static final Gson GSON = new Gson(); + private MessageContent messageContent; + private JsonObject root = null; + + @Autowired + private HttpServletRequest request; + + @Autowired + private JaxRsDpsLog log; + + public Map<String, String> extractAttributesFromRequestBody() { + if (this.messageContent == null) { + this.messageContent = this.extractPubsubMessageFromRequestBody(); + } + return this.messageContent.getAttributes(); + } + + public String extractDataFromRequestBody() { + if (this.messageContent == null) { + this.messageContent = this.extractPubsubMessageFromRequestBody(); + } + return this.messageContent.getData(); + } + + public String extractNotificationIdFromRequestBody() { + if (this.root == null) { + this.root = this.extractRootJsonElementFromRequestBody(); + } + JsonElement subscription = this.root.get("subscription"); + if (subscription == null) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), INVALID_PUBSUB_MESSAGE, "subscription object not found"); + } + + String[] fullNotificationId = subscription.getAsString().split("/"); + return fullNotificationId[fullNotificationId.length - 1]; + } + + public boolean isHandshakeRequest() { + return false; + } + + private MessageContent extractPubsubMessageFromRequestBody() { + if (this.root == null) { + this.root = this.extractRootJsonElementFromRequestBody(); + } + JsonElement message = this.root.get("message"); + if (message == null) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), INVALID_PUBSUB_MESSAGE, "message object not found"); + } + MessageContent content = GSON.fromJson(message.toString(), MessageContent.class); + + Map<String, String> attributes = content.getAttributes(); + if (attributes == null || attributes.isEmpty()) { + log.error("Incorrect Message: " + message.toString() ); + throw new AppException(HttpStatus.BAD_REQUEST.value(), INVALID_PUBSUB_MESSAGE, "attribute map not found"); + } + String data = content.getData(); + if (Strings.isNullOrEmpty(data)) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), INVALID_PUBSUB_MESSAGE, "data field not found"); + } + Map<String, String> lowerCase = new HashMap<>(); + attributes.forEach((key, value) -> lowerCase.put(key.toLowerCase(), value)); + if (Strings.isNullOrEmpty(attributes.get("data-partition-id"))) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), INVALID_PUBSUB_MESSAGE, + "No tenant information from pubsub message."); + } + content.setAttributes(lowerCase); + + String decoded = new String(Base64.getDecoder().decode(data)); + content.setData(decoded); + + return content; + } + + private JsonObject extractRootJsonElementFromRequestBody() { + try { + JsonParser jsonParser = new JsonParser(); + BufferedReader reader = request.getReader(); + Stream<String> lines = reader.lines(); + String requestBody = lines.collect(Collectors.joining("\n")); + JsonElement rootElement = jsonParser.parse(requestBody); + if (!(rootElement instanceof JsonObject)) { + throw new AppException(HttpStatus.BAD_REQUEST.value(), "RequestBody is not JsonObject.", + "Request Body should be JsonObject to be processed."); + } + return rootElement.getAsJsonObject(); + } catch (IOException e) { + throw new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Request payload parsing error", + "Unable to parse request payload.", e); + } + } +}