Skip to content
Snippets Groups Projects
Commit 772a3987 authored by Truong Le Quang's avatar Truong Le Quang
Browse files

feat: implement validation csv metadata endpoint

parent a3959be4
No related branches found
No related tags found
2 merge requests!130integer data type,!124Validation CSV metadata
Showing
with 454 additions and 1 deletion
......@@ -69,6 +69,11 @@ lazy val core =
//"org.opengroup.osdu.production" %% "pddms-messaging-connector-interface-lib-azure" % "0.3.0-test",
//"org.opengroup.osdu.production" %% "pddms-messaging-connector-interface-lib" % "0.3.0-test",
"com.google.protobuf" % "protobuf-java" % "3.25.5" % "protobuf",
"org.opengroup.osdu.production" %% "pddms-pekko-stream-commons-lib" % "0.3.0-test-3",
"org.opengroup.osdu.production" %% "pddms-messaging-connector-interface-lib-base" % "0.3.0-test-8",
"org.opengroup.osdu.production" %% "pddms-messaging-connector-interface-lib-azure" % "0.3.0-test-8",
"org.opengroup.osdu.production" %% "pddms-security-commons-lib-common" % "0.3.0-test-5",
"org.opengroup.osdu.production" %% "pddms-security-commons-lib-azure" % "0.3.0-test-5",
//"org.opengroup.osdu.production" %% "pddms-security-commons-lib" % "0.3.0-test-1",
//Newly added
//"org.json4s" %% "json4s-jackson" % "3.6.5",
......
package org.opengroup.osdu.production.common
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.opengroup.osdu.production.common.entities.JsonResponse
import org.scalactic.ErrorMessage
case class MessageResponse(code: Int, reason: String,message: ErrorMessage) extends JsonResponse {
override def toJson: String = {
implicit val formats = Serialization.formats(NoTypeHints)
Serialization.write(this)
}
}
package org.opengroup.osdu.production.common
import org.apache.pekko.http.scaladsl.model.StatusCode
import org.json4s.{CustomSerializer, JInt, JObject, JString}
import org.json4s.JsonDSL.*
import org.json4s.jackson.JsonMethods.parse
object ValidationMessages {
val emptyKind = "Kind in the request is missing or null"
val invalidKind = "Kind is not valid"
val emptyAcl = "Acl in the request is missing or null"
val emptyViewersOrOwners = "Viewers or owners in the [Acl] request is missing or null"
val invalidViewersOrOwners = "Invalid viewers or owners group name"
val emptyData = "Data section in the request is missing or null"
val emptyLegal = "Legal in the request is missing or null"
val invalidEndian = "Invalid endian. Endian is an enumeration with values BIG or LITTLE"
val emptyEndian = "Endian in the request is missing or null"
val emptyDatasetProperties = "Dataset properties section in the request is missing or null"
val emptyFileSourceInfo = "File source info in the request is missing or null"
val emptyFileSource = "File source in the request is missing or null"
val emptyFileSize = "File size in the request is missing or null"
val invalidFileSize = "File size must be a integer"
val emptyLegalTags = "Legal tags in the request is missing or null"
val emptyOtherRelevantDataCountries = "Other relevant data countries in the request is missing or null"
val invalidOtherRelevantDataCountries = "Other relevant data countries just supports only the US region"
val emptyExtensionProperty = "Extension properties section in the request is missing or null"
val emptyFileContentsDetails = "File contents details in the request is missing or null"
val emptyEntities = "Entities in the request is missing or null"
val emptySourceColumn = "Source field of entities in the request is missing or null"
val emptyValuesStorage = "Entities values field in the request is missing or null"
val emptyTimeSeries = "Time Series in the request is missing or null"
val emptyPoints = "Points of time series in the request is missing or null"
val emptyTimeZoneOfSet = "Time zone of set in the request is missing or null"
val invalidTimeZoneOfSet = "Invalid time zone offset. The valid time zone offset eg: ±HH:MM"
val validRequest = "Metadata request is valid"
val reasonFailed = "Failed metadata validation"
val reasonSuccess = "Successfully metadata validation"
val emptyValueType = "Value type field in the request is missing or null"
val emptyValueTypeIfDefineFormat = "Value type field in the request is missing or null if value type field is DATETIME"
val invalidValueType = "Invalid value type. Value type is an enumeration with values DATETIME or TIMESTAMP"
val invalidFormat = "Invalid format. Format must be matches with the format DATETIME patterns"
val emptyValueTimeSeries = "Values section of time series in the request is missing or null"
val invalidValueTypesTimeSeries = "Invalid value type %: %. Value type of values in the request must be matches list of type: %"
def createMessage(msg: String, args: String*): String = {
var message = msg
if (message.endsWith("%")) {
message += " ";
}
val array: Array[String] = message.split("%")
var messageBuffer = array.head
var i = 1
if (array.length - 1 == args.toArray.length) {
for (arg <- args) {
messageBuffer += arg
messageBuffer += array(i)
i = i + 1
}
messageBuffer.toString
} else
message
}
}
package org.opengroup.osdu.production.common
import org.json4s.{ShortTypeHints, native}
import org.opengroup.osdu.production.inputRequestJson.ValidationRequest
object ValidationRequestJsonMarshaller extends BaseMarshaller {
override def classList: List[Class[_]] = classOf[ValidationRequest] :: Nil
implicit val formats = native.Serialization.formats(ShortTypeHints(classList))
def unmarshalEntity(cmd: String): Option[ValidationRequest] = {
resolveEntity[ValidationRequest](cmd)
}
}
\ No newline at end of file
package org.opengroup.osdu.production.common
class ValueTypes {
val listOfValidValueTypes: List[String] = List("DOUBLE", "INTEGER", "LONG", "BOOLEAN", "STRING", "SET-STRING", "DATETIME")
}
package org.opengroup.osdu.production.handlers
import org.apache.pekko.http.scaladsl.model.StatusCodes
import org.opengroup.osdu.production.common.{ErrorMessageResponse, LogF, MessageResponse, ServiceResponse, ValidationMessages}
import org.opengroup.osdu.production.inputRequestJson.ValidationRequest
import org.opengroup.osdu.production.util.validation.MetadataValidator
import org.scalactic.{Bad, Good}
import scala.concurrent.{ExecutionContext, Future}
object ValidationMetadataHandler extends LogF {
def handle[AckType](validationRequest: ValidationRequest)(implicit ec: ExecutionContext): Future[ServiceResponse] = {
val kind = validationRequest.kind
if (kind.nonEmpty) {
MetadataValidator.validateKind(kind.get) match {
case Good(_) =>
MetadataValidator.validateCSVMetadata(validationRequest) match
case Good(r) => Future(ServiceResponse(StatusCodes.OK, MessageResponse(StatusCodes.OK.intValue, ValidationMessages.reasonSuccess, ValidationMessages.validRequest).toJson))
case Bad(msg) => Future(ServiceResponse(StatusCodes.BadRequest, MessageResponse(StatusCodes.BadRequest.intValue, ValidationMessages.reasonFailed, msg).toJson))
case Bad(msg) => Future(ServiceResponse(StatusCodes.BadRequest, MessageResponse(StatusCodes.BadRequest.intValue, ValidationMessages.reasonFailed, msg).toJson))
}
} else {
Future(ServiceResponse(StatusCodes.BadRequest, MessageResponse(StatusCodes.BadRequest.intValue, ValidationMessages.reasonFailed, ValidationMessages.emptyKind).toJson))
}
}
}
package org.opengroup.osdu.production.inputRequestJson
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
case class ValidationRequest(kind: Option[String], acl: Option[Acl], tags: Option[Tag], data: Option[Data], legal: Option[Legal])
case class Acl(viewers: List[String], owners: List[String])
case class Legal(legalTags: Option[List[String]], otherRelevantDataCountries: Option[List[String]], status: Option[String])
case class Tag(source: Option[Map[String, String]])
case class DatasetProperties(fileSourceInfo: Option[FileSourceInfo])
case class FileSourceInfo(name: Option[String], fileSourceInfo: Option[String], fileSource: Option[String], preloadFileCreateUser: Option[String],
preloadFileCreateDate: Option[String], preloadFileModifyUser: Option[String], preloadFileModifyDate: Option[String],
fileSize: Option[String], encodingFormatTypeID: Option[String])
case class Entities(sourceColumn: String, values: List[ValueStorage])
case class ValueStorage(sourceValue: Option[String], targetStorageRecordId: Option[String])
case class PointTimeSeries(sourceColumn: Option[String], valueType: Option[String], format: Option[String], timezoneOffset: Option[String])
case class ValueProperty(sourceColumn: Option[String], sourceUnitOfMeasureId: Option[String],
targetPropertyDescriptor: Option[String], valueType: Option[String])
case class TimeSeries(points: Option[PointTimeSeries], values: Option[List[ValueProperty]])
case class Entity(sourceColumn: Option[String], values: Option[ValueStorage])
case class FileContentsDetail(headerRowIndex: Option[Int], entities: Option[Entity], timeSeries: Option[TimeSeries], targetKind: Option[String],
nestedFieldDelimiter: Option[String], fileType: Option[String])
case class ExtensionProperty(name: Option[String], fileContentsDetails: Option[FileContentsDetail])
case class DatasetProperty(fileSourceInfo: Option[FileSourceInfo])
case class Data(name: Option[String], endian: Option[String], datasetProperties: Option[DatasetProperty], extensionProperties: Option[ExtensionProperty])
\ No newline at end of file
package org.opengroup.osdu.production.routes
import com.github.pjfanning.pekkohttpjson4s.Json4sSupport
import com.typesafe.config.ConfigFactory
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.{Span, Tracer}
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import org.apache.pekko.http.scaladsl.HttpExt
import org.apache.pekko.http.scaladsl.model.MediaTypes.`application/json`
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.server.Directives.*
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.http.scaladsl.model.{HttpEntity, HttpResponse, MediaTypes, StatusCodes}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.util.Timeout
import org.opengroup.osdu.production.common.CorrelationIdDirective.{correlationIdDirective, withCorrelationId}
import org.opengroup.osdu.production.common.{ErrorMessageResponse, LogF, ServiceResponse, WriteBackServiceContext, WritebackServiceConfiguration, MessageResponse, ValidationMessages}
import org.opengroup.osdu.production.handlers.ValidationMetadataHandler
import org.opengroup.osdu.production.inputRequestJson.ValidationRequest
import org.opengroup.osdu.production.common.ValidationRequestJsonMarshaller.*
import org.opengroup.osdu.production.entitlements.{EntitlementServiceAuthorizationFilter, HttpResponseException, EntitlementDataAuthorizationFilter}
import org.opengroup.osdu.production.common.routes.CompletionHelper.completeWithError
import org.opengroup.osdu.production.observability.span
import org.opengroup.osdu.production.observability.span.inSpan
import org.opengroup.osdu.production.provider.interface.IMessageBusProducer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object ValidateRoute extends LogF with Json4sSupport {
def route[AckType](producer: IMessageBusProducer[AckType], entityIdCache: ActorRef)(implicit sc: WritebackServiceConfiguration, timeout: Timeout, ec: ExecutionContext, mat: Materializer, http: HttpExt, context: WriteBackServiceContext, actorSystem: ActorSystem): Route = {
extractRequest { httpRequest =>
val tracer: Tracer = GlobalOpenTelemetry.getTracer("ingestion-tracer")
path("validate" / "csv" / "metadata") {
correlationIdDirective { correlationId =>
respondWithHeaders(
RawHeader("CorrelationId", Option(correlationId).getOrElse(""))
) {
pathEndOrSingleSlash {
span("ingestion-metadata-validations-endpoint")(tracer) { pSpan =>
val parentSpan = pSpan.setAttribute("correlationId", correlationId)
post {
entity(as[ValidationRequest]) { validationRequest =>
val entitlementServiceAuthorizationFilter = new EntitlementServiceAuthorizationFilter(ConfigFactory.load("local"))
val entitlementResponse: Future[Unit] = for {
_ <- entitlementServiceAuthorizationFilter.checkOSDUEntitlement(httpRequest, context.sc.entitlementAdmin, context.sc.entitlementEditor).inSpan("check entitlement")(tracer, parentSpan)
} yield ()
onComplete(entitlementResponse) {
case Success(_) =>
logger.info("starting validate csv metadata: " + System.currentTimeMillis() + "\n\r")
val result: Future[ServiceResponse] = ValidationMetadataHandler.handle(validationRequest)
onComplete(result) {
case Success(r) =>
complete(HttpResponse(status = r.httpCode, entity = HttpEntity(`application/json`, r.content.toString)))
case Failure(ex: HttpResponseException) => completeWithError(ex.statusCode, ex.responseBody)
case _ =>
complete(HttpResponse(status = StatusCodes.NotFound, entity = HttpEntity(`application/json`, ErrorMessageResponse(StatusCodes.NotFound.intValue, ValidationMessages.reasonFailed, "Request Data Validation: Invalid request.").toJson)))
}
case Failure(ex: HttpResponseException) => completeWithError(ex.statusCode, ex.responseBody)
case _ => {
complete(HttpResponse(status = StatusCodes.NotFound, entity = HttpEntity(`application/json`, ErrorMessageResponse(StatusCodes.NotFound.intValue, ValidationMessages.reasonFailed, "Entitlement Data Validation: Invalid entityKey. The record with the given ID is not active").toJson)))
}
}
}
}
}
}
}
}
}
}
}
}
......@@ -118,7 +118,8 @@ case class WriteBackServiceRoutes()(implicit serviceContext: WriteBackServiceCon
WriteBackDataRoute.route(producer, entityIdCacheActor) ~
WriteBackDataBatchRoute.route(producer, entityIdCacheActor) ~
VersionInfoRoute.route(producer, entityIdCacheActor) ~
VersionInfoShortRoute.route(producer, entityIdCacheActor)
VersionInfoShortRoute.route(producer, entityIdCacheActor) ~
ValidateRoute.route(producer, entityIdCacheActor)
}
}
package org.opengroup.osdu.production.util.validation
import org.opengroup.osdu.production.common.{LogF, ValidationMessages, ValueTypes}
import org.opengroup.osdu.production.inputRequestJson.{ValidationRequest, Data, DatasetProperty, Legal, ExtensionProperty}
import org.scalactic.{Bad, ErrorMessage, Good, Or}
import java.lang.IllegalArgumentException
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
object MetadataValidator extends LogF {
def validateKind(kind: String): Or[Boolean, ErrorMessage] = {
val kindRegex = "^[\\w\\-\\.]+:[\\w\\-\\.]+:[\\w\\-\\.]+:[0-9]+.[0-9]+.[0-9]+$"
if (kind.matches(kindRegex)) {
val parts = kind.split(":") // Split the string by ":"
// Check conditions
val hasValidColonCount = parts.length - 1 <= 3 // At most 3 colons
val hasFileKindSource = parts.isDefinedAt(1) && parts(1).contains("wks")
val hasFileKineEntity = parts.isDefinedAt(2) && parts(2).contains("dataset--File.Generic")
if (hasValidColonCount && hasFileKindSource && hasFileKineEntity) {
Good(true)
} else {
Bad(ValidationMessages.invalidKind)
}
} else {
Bad(ValidationMessages.invalidKind)
}
}
def validateCSVMetadata(validationRequest: ValidationRequest): Or[Boolean, ErrorMessage] = {
val acl = validationRequest.acl
if (acl.isEmpty) {
return Bad(ValidationMessages.emptyAcl)
}
val owners = acl.get.owners
val viewers = acl.get.viewers
if (owners.isEmpty || viewers.isEmpty) {
return Bad(ValidationMessages.emptyViewersOrOwners)
}
val emailRegex = "^data\\.[a-zA-Z0-9_+&*-]+(?:\\.[a-zA-Z0-9_+&*-]+)*@(?:[a-zA-Z0-9-]+\\.)+[a-zA-Z]{2,7}$"
val hasValidOwners = owners.forall(_.matches(emailRegex))
val hasValidViewers = viewers.forall(_.matches(emailRegex))
if (!hasValidViewers && !hasValidOwners || !hasValidViewers || !hasValidOwners) {
return Bad(ValidationMessages.invalidViewersOrOwners)
}
if (validationRequest.legal.isEmpty) {
return Bad(ValidationMessages.emptyLegal)
}
this.validateCSVLegal(validationRequest.legal.get) match
case Good(_) =>
if (validationRequest.data.isEmpty) {
return Bad(ValidationMessages.emptyData)
}
this.validateCSVData(validationRequest.data.get)
case Bad(msg) => Bad(msg)
}
def validateCSVLegal(legal: Legal): Or[Boolean, ErrorMessage] = {
val legalTags = legal.legalTags
if (legalTags.isEmpty) {
return Bad(ValidationMessages.emptyLegalTags)
} else if (legalTags.get.isEmpty) {
return Bad(ValidationMessages.emptyLegalTags)
}
val otherRelevantDataCountries = legal.otherRelevantDataCountries
if (otherRelevantDataCountries.isEmpty) {
return Bad(ValidationMessages.emptyOtherRelevantDataCountries)
} else if (otherRelevantDataCountries.get.isEmpty) {
return Bad(ValidationMessages.emptyOtherRelevantDataCountries)
} else if (otherRelevantDataCountries.get.head != "US" && otherRelevantDataCountries.get.length >= 2) {
return Bad(ValidationMessages.invalidOtherRelevantDataCountries)
} else if (otherRelevantDataCountries.get.length >= 2) {
return Bad(ValidationMessages.invalidOtherRelevantDataCountries)
}
Good(true)
}
def validateCSVData(data: Data): Or[Boolean, ErrorMessage] = {
val endian = data.endian
if (endian.isEmpty) {
return Bad(ValidationMessages.emptyEndian)
} else if (!endian.get.equals("BIG") && !endian.get.equals("LITTLE")) {
return Bad(ValidationMessages.invalidEndian)
}
val datasetProperties = data.datasetProperties
if (datasetProperties.isEmpty) {
return Bad(ValidationMessages.emptyDatasetProperties)
}
val hasValidDatasetProperties = this.validateDatasetProperties(datasetProperties.get)
hasValidDatasetProperties match
case Good(_) =>
if (data.extensionProperties.isEmpty) {
return Bad(ValidationMessages.emptyExtensionProperty)
}
this.validationExtensionProperties(data.extensionProperties.get)
case Bad(msg) => Bad(msg)
}
def validateDatasetProperties(datasetProperties: DatasetProperty): Or[Boolean, ErrorMessage] = {
val fileSourceInfo = datasetProperties.fileSourceInfo
if (fileSourceInfo.isEmpty) {
return Bad(ValidationMessages.emptyFileSourceInfo)
}
val fileSource = fileSourceInfo.get.fileSource
if (fileSource.isEmpty) {
return Bad(ValidationMessages.emptyFileSource)
} else if (fileSource.get.isEmpty) {
return Bad(ValidationMessages.emptyFileSource)
}
val fileSize = fileSourceInfo.get.fileSize
if (fileSize.isEmpty) {
return Bad(ValidationMessages.emptyFileSize)
} else if (fileSize.get.isEmpty) {
return Bad(ValidationMessages.emptyFileSize)
}
val isNumber = fileSize.get.forall(_.isDigit)
if (!isNumber) {
return Bad(ValidationMessages.invalidFileSize)
}
Good(true)
}
def validationExtensionProperties(extensionProperty: ExtensionProperty): Or[Boolean, ErrorMessage] = {
if (extensionProperty.fileContentsDetails.isEmpty) {
return Bad(ValidationMessages.emptyFileContentsDetails)
}
val entities = extensionProperty.fileContentsDetails.get.entities
if (entities.isEmpty) {
return Bad(ValidationMessages.emptyEntities)
}
val sourceColumn = entities.get.sourceColumn
if (sourceColumn.isEmpty) {
return Bad(ValidationMessages.emptySourceColumn)
}
val values = entities.get.values
if (values.isEmpty) {
return Bad(ValidationMessages.emptyValuesStorage)
}
val timeSeries = extensionProperty.fileContentsDetails.get.timeSeries
if (timeSeries.isEmpty) {
return Bad(ValidationMessages.emptyTimeSeries)
}
val points = timeSeries.get.points
if (points.isEmpty) {
return Bad(ValidationMessages.emptyPoints)
}
val valueType = points.get.valueType
val format = points.get.format
if (valueType.isEmpty) {
return Bad(ValidationMessages.emptyValueType)
} else if (valueType.get == "DATETIME") {
if (format.isEmpty) {
return Bad(ValidationMessages.emptyValueTypeIfDefineFormat)
}
try {
DateTimeFormatter.ofPattern(format.get)
} catch {
case _: IllegalArgumentException => return Bad(ValidationMessages.invalidFormat)
}
} else if (valueType.get != "TIMESTAMP") {
return Bad(ValidationMessages.invalidValueType)
}
if (format.isDefined) {
val timeZoneOfSet = points.get.timezoneOffset
if (timeZoneOfSet.isEmpty) {
return Bad(ValidationMessages.emptyTimeZoneOfSet)
}
val timezoneOffsetRegex = """^(Z|[+-](0[0-9]|1[0-3]):[0-5][0-9]|[+-]14:00)$"""
if (!timeZoneOfSet.get.matches(timezoneOffsetRegex)) {
return Bad(ValidationMessages.invalidTimeZoneOfSet)
}
}
val valueTimeSeries = timeSeries.get.values
if (valueTimeSeries.isEmpty) {
return Bad(ValidationMessages.emptyValueTimeSeries)
}
val timeSeriesValues = valueTimeSeries.get
for (i <- timeSeriesValues.indices) {
val valueType = timeSeriesValues(i)
if (valueType.valueType.isDefined) {
val validValueTypes = new ValueTypes
if (!validValueTypes.listOfValidValueTypes.exists(_.equals(valueType.valueType.get))) {
return Bad(ValidationMessages.createMessage(ValidationMessages.invalidValueTypesTimeSeries, (i + 1).toString,
valueType.valueType.get, validValueTypes.listOfValidValueTypes.mkString(", ")))
}
}
}
Good(true)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment