Skip to content
Snippets Groups Projects
Commit b132680a authored by Truong Le's avatar Truong Le
Browse files

feat: implement vailidation csv file v1

parent 7d3491b7
No related branches found
No related tags found
2 merge requests!130integer data type,!125Add Validate CSV file endpoint
......@@ -42,6 +42,9 @@ object ValidationMessages {
val invalidFormat = "Invalid format. Format must be matches with the format DATETIME patterns"
val emptyValueTimeSeries = "Values section of time series in the request is missing or null"
val invalidValueTypesTimeSeries = "Invalid value type %: %. Value type of values in the request must be matches list of type: %"
// Csv file message
val emptyColumn = "Empty column found: %"
val invalidColumn = "Invalid column found: %"
def createMessage(msg: String, args: String*): String = {
var message = msg
......
package org.opengroup.osdu.production.handlers
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.{Multipart, StatusCodes}
import org.opengroup.osdu.production.common.{LogF, MessageResponse, ServiceResponse, ValidationMessages}
import org.opengroup.osdu.production.inputRequestJson.ValidationRequest
import org.opengroup.osdu.production.util.validation.FileValidator
import org.apache.pekko.util.ByteString
import org.apache.pekko.stream.scaladsl.{Framing, Sink, Source}
import org.scalactic.{Bad, Good}
import scala.concurrent.{ExecutionContext, Future}
object ValidationCSVHandler extends LogF {
def handle[AckType](csvStream: Source[ByteString, Any])(implicit ec: ExecutionContext, system: ActorSystem): Future[ServiceResponse] = {
val delimiter = "\n"
val maxLineLength = 1024
csvStream
.via(Framing.delimiter(ByteString(delimiter), maxLineLength, allowTruncation = true))
.map(_.utf8String.trim)
.drop(1)
.zipWithIndex // Add line numbers for better error reporting
.map { case (line, index) =>
FileValidator.validateLine(line, index) match
case Good(r) => ServiceResponse(StatusCodes.OK, MessageResponse(StatusCodes.OK.intValue, ValidationMessages.reasonSuccess, ValidationMessages.validRequest).toJson)
case Bad(msg) => ServiceResponse(StatusCodes.BadRequest, MessageResponse(StatusCodes.BadRequest.intValue, ValidationMessages.reasonFailed, msg).toJson)
}
.runWith(Sink.head)
// .map { result match
// case Good(r) => ServiceResponse(StatusCodes.OK, MessageResponse(StatusCodes.OK.intValue, ValidationMessages.reasonSuccess, ValidationMessages.validRequest).toJson)
// case Bad(msg) => ServiceResponse(StatusCodes.BadRequest, MessageResponse(StatusCodes.BadRequest.intValue, ValidationMessages.reasonFailed, msg).toJson)
// }
}
}
package org.opengroup.osdu.production.routes
import com.github.pjfanning.pekkohttpjson4s.Json4sSupport
import com.typesafe.config.ConfigFactory
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.{Span, Tracer}
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import org.apache.pekko.http.scaladsl.HttpExt
import org.apache.pekko.http.scaladsl.model.MediaTypes.`application/json`
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.server.Directives.*
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.http.scaladsl.model.{HttpEntity, HttpResponse, MediaTypes, Multipart, StatusCodes}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.{ByteString, Timeout}
import org.opengroup.osdu.production.common.CorrelationIdDirective.{correlationIdDirective, withCorrelationId}
import org.opengroup.osdu.production.common.{ErrorMessageResponse, LogF, MessageResponse, ServiceResponse, ValidationMessages, WriteBackServiceContext, WritebackServiceConfiguration}
import org.opengroup.osdu.production.handlers.ValidationMetadataHandler
import org.opengroup.osdu.production.inputRequestJson.ValidationRequest
import org.opengroup.osdu.production.common.ValidationRequestJsonMarshaller.*
import org.opengroup.osdu.production.entitlements.{EntitlementDataAuthorizationFilter, EntitlementServiceAuthorizationFilter, HttpResponseException}
import org.opengroup.osdu.production.common.routes.CompletionHelper.completeWithError
import org.opengroup.osdu.production.observability.span
import org.opengroup.osdu.production.observability.span.inSpan
import org.opengroup.osdu.production.provider.interface.IMessageBusProducer
import org.opengroup.osdu.production.handlers.ValidationCSVHandler
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object ValidationFileRoute extends LogF {
def route[AckType](producer: IMessageBusProducer[AckType], entityIdCache: ActorRef)(implicit sc: WritebackServiceConfiguration, timeout: Timeout, ec: ExecutionContext, mat: Materializer, http: HttpExt, context: WriteBackServiceContext, actorSystem: ActorSystem): Route = {
extractRequest { httpRequest =>
val tracer: Tracer = GlobalOpenTelemetry.getTracer("ingestion-tracer")
path("validate" / "csv") {
correlationIdDirective { correlationId =>
// respondWithHeaders(
// RawHeader("CorrelationId", Option(correlationId).getOrElse(""))
// ) {
pathEndOrSingleSlash {
span("ingestion-metadata-validations-endpoint")(tracer) { pSpan =>
val parentSpan = pSpan.setAttribute("correlationId", correlationId)
post {
entity(as[Multipart.FormData]) { formData =>
val entitlementServiceAuthorizationFilter = new EntitlementServiceAuthorizationFilter(ConfigFactory.load("local"))
val entitlementResponse: Future[Unit] = for {
_ <- entitlementServiceAuthorizationFilter.checkOSDUEntitlement(httpRequest, context.sc.entitlementAdmin, context.sc.entitlementEditor).inSpan("check entitlement")(tracer, parentSpan)
} yield ()
onComplete(entitlementResponse) {
case Success(_) =>
logger.info("starting validate csv file: " + System.currentTimeMillis() + "\n\r")
val validationResult: Future[ServiceResponse] = formData.parts
.filter(_.name == "file")
.mapAsync(1) { filePart =>
val csvStream: Source[ByteString, Any] = filePart.entity.dataBytes
ValidationCSVHandler.handle(csvStream)
}
.runWith(Sink.head)
onComplete(validationResult) {
case Success(r) =>
complete(HttpResponse(status = r.httpCode, entity = HttpEntity(`application/json`, r.content.toString)))
case Failure(ex: HttpResponseException) => completeWithError(ex.statusCode, ex.responseBody)
case _ =>
complete(HttpResponse(status = StatusCodes.NotFound, entity = HttpEntity(`application/json`, ErrorMessageResponse(StatusCodes.NotFound.intValue, ValidationMessages.reasonFailed, "Request Data Validation: Invalid request.").toJson)))
}
case Failure(ex: HttpResponseException) => completeWithError(ex.statusCode, ex.responseBody)
case _ => complete(HttpResponse(status = StatusCodes.NotFound, entity = HttpEntity(`application/json`, ErrorMessageResponse(StatusCodes.NotFound.intValue, ValidationMessages.reasonFailed, "Entitlement Data Validation: Invalid entityKey. The record with the given ID is not active").toJson)))
}
}
}
}
}
// }
}
}
}
}
}
......@@ -119,7 +119,8 @@ case class WriteBackServiceRoutes()(implicit serviceContext: WriteBackServiceCon
WriteBackDataBatchRoute.route(producer, entityIdCacheActor) ~
VersionInfoRoute.route(producer, entityIdCacheActor) ~
VersionInfoShortRoute.route(producer, entityIdCacheActor) ~
ValidateRoute.route(producer, entityIdCacheActor)
ValidateRoute.route(producer, entityIdCacheActor) ~
ValidationFileRoute.route(producer, entityIdCacheActor)
}
}
package org.opengroup.osdu.production.util.validation
import org.apache.pekko.stream.scaladsl.{Framing, Sink, Source}
import org.apache.pekko.util.ByteString
import org.opengroup.osdu.production.common.{LogF, MessageResponse, ServiceResponse, ValidationMessages}
import org.scalactic.{Bad, ErrorMessage, Good, Or}
object FileValidator extends LogF {
// Function to validate a single CSV line
def validateLine(line: String, lineNumber: Long): Or[Boolean, ErrorMessage] = {
val columns = line.split(",").map(_.trim)
columns.length match
case 3 if columns.forall(_.nonEmpty) => Good(true)
case 3 => Bad(ValidationMessages.createMessage(ValidationMessages.emptyColumn, line))
case _ => Bad(ValidationMessages.createMessage(ValidationMessages.invalidColumn, columns.length.toString))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment