Skip to content
Snippets Groups Projects
Commit 1673ab2e authored by Jozsef Barcza's avatar Jozsef Barcza
Browse files

integer data type

parent 1dd75c09
No related branches found
No related tags found
3 merge requests!130integer data type,!121integer data type,!120integer data type
Pipeline #294995 passed with warnings
Showing
with 2848 additions and 1053 deletions
......@@ -32,6 +32,11 @@ message LongPoint{
int64 value = 2;
}
message IntegerPoint{
int64 point = 1;
int32 value = 2;
}
message StringPoint{
int64 point = 1;
string value = 2;
......@@ -62,6 +67,11 @@ message TimeSeriesLong{
repeated LongPoint points = 2;
}
message TimeSeriesInteger{
string unit = 1;
repeated IntegerPoint points = 2;
}
message TimeSeriesString{
repeated StringPoint points = 2;
}
......@@ -91,10 +101,11 @@ message WriteBackData {
TimeSeriesBoolean timeSeriesBoolean = 9;
TimeSeriesSetString timeSeriesSetString = 10;
TimeSeriesDateTime timeSeriesDateTime = 11;
string requestId = 12;
int64 requestStartTime = 13;
string batchRequestId = 14;
string sourceId = 15;
TimeSeriesInteger timeSeriesInteger = 12;
string requestId = 13;
int64 requestStartTime = 14;
string batchRequestId = 15;
string sourceId = 16;
}
message WriteBackDataList {
......
......@@ -23,4 +23,5 @@ enum PdiDataType(val value: Int):
case BOOLEAN extends PdiDataType(5)
case STRING extends PdiDataType(6)
case SETSTRING extends PdiDataType(17)
case DATETIME extends PdiDataType(18)
\ No newline at end of file
case DATETIME extends PdiDataType(18)
case INTEGER extends PdiDataType(19)
\ No newline at end of file
......@@ -86,11 +86,13 @@ object WriteBackDataHandler {
writeBackDetails.subscriberDetails.subscriberId,
writeBackDetails.requestDetailsWithTimeSeries.requestDetails.entityKey,
writeBackDetails.requestDetailsWithTimeSeries.requestDetails.propertyDescriptor,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesLong, writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesString,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesLong,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesString,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesDouble,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesBoolean,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesSetString,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesDateTime
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesDateTime,
writeBackDetails.requestDetailsWithTimeSeries.timeSeries.timeSeriesInteger
), writeBackDetails.requestDetailsWithTimeSeries.requestId))
} else {
ServiceResponse(StatusCodes.InternalServerError, ErrorMessageResponse(StatusCodes.InternalServerError.intValue, StatusCodes.InternalServerError.reason, "Error while processing data").toJson)
......
......@@ -24,4 +24,4 @@ case class EntityPropertiesStoreRequest(entityKey: String, properties: Seq[Prope
case class PropertiesStoreBatchRequest(propertyDescriptor: String, timeSeries: TimeSeriesBasicStoreRequest)
case class TimeSeriesBasicStoreRequest(timeSeriesLong: Option[TimeSeriesField], timeSeriesString: Option[TimeSeriesField], timeSeriesDouble: Option[TimeSeriesField], timeSeriesBoolean: Option[TimeSeriesField], timeSeriesSetString: Option[TimeSeriesField],
timeSeriesDateTime: Option[TimeSeriesField])
\ No newline at end of file
timeSeriesDateTime: Option[TimeSeriesField], timeSeriesInteger: Option[TimeSeriesField])
\ No newline at end of file
......@@ -21,17 +21,17 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
case class TimeSeriesStoreRequestNew(source: String, timeSeriesLong: Option[TimeSeriesField], timeSeriesString: Option[TimeSeriesField], timeSeriesDouble: Option[TimeSeriesField],
timeSeriesBoolean: Option[TimeSeriesField], timeSeriesSetString: Option[TimeSeriesField], timeSeriesDateTime: Option[TimeSeriesField]) {
timeSeriesBoolean: Option[TimeSeriesField], timeSeriesSetString: Option[TimeSeriesField], timeSeriesDateTime: Option[TimeSeriesField], timeSeriesInteger: Option[TimeSeriesField]) {
implicit val formats: Formats = Serialization.formats(NoTypeHints)
def toTimeSeriesStoreRequest(entityKey: String, propertyDescriptor: String): TimeSeriesStoreRequest = TimeSeriesStoreRequest(
source = source,
entityKey = entityKey,
propertyDescriptor = propertyDescriptor, timeSeriesLong = timeSeriesLong, timeSeriesString = timeSeriesString, timeSeriesDouble = timeSeriesDouble, timeSeriesBoolean = timeSeriesBoolean, timeSeriesSetString = timeSeriesSetString,
timeSeriesDateTime = timeSeriesDateTime)
timeSeriesDateTime = timeSeriesDateTime, timeSeriesInteger = timeSeriesInteger)
}
case class TimeSeriesStoreRequest(source: String, entityKey: String, propertyDescriptor: String, timeSeriesLong: Option[TimeSeriesField], timeSeriesString: Option[TimeSeriesField], timeSeriesDouble: Option[TimeSeriesField],
timeSeriesBoolean: Option[TimeSeriesField], timeSeriesSetString: Option[TimeSeriesField], timeSeriesDateTime: Option[TimeSeriesField]) {
timeSeriesBoolean: Option[TimeSeriesField], timeSeriesSetString: Option[TimeSeriesField], timeSeriesDateTime: Option[TimeSeriesField], timeSeriesInteger: Option[TimeSeriesField]) {
implicit val formats: Formats = Serialization.formats(NoTypeHints)
}
case class TimeSeriesField(unit: Option[String], points: Seq[Point]) {
......
......@@ -85,7 +85,7 @@ object WriteBackDataBatchRoute extends LogF with Json4sSupport {
timeSeriesRequest.properties.map { timeSeriesProperties =>
val propertyDescriptor = timeSeriesProperties.propertyDescriptor
val timeSeriesStoreRequest = TimeSeriesStoreRequest(source, entityKey, propertyDescriptor, timeSeriesProperties.timeSeries.timeSeriesLong, timeSeriesProperties.timeSeries.timeSeriesString, timeSeriesProperties.timeSeries.timeSeriesDouble, timeSeriesProperties.timeSeries.timeSeriesBoolean,
timeSeriesProperties.timeSeries.timeSeriesSetString, timeSeriesProperties.timeSeries.timeSeriesDateTime)
timeSeriesProperties.timeSeries.timeSeriesSetString, timeSeriesProperties.timeSeries.timeSeriesDateTime, timeSeriesProperties.timeSeries.timeSeriesInteger)
val requestId = WriteBackCommonsUtil.createRequestId(source, entityKey, propertyDescriptor, System.currentTimeMillis())
entitlementCheckFuture(entityKey, httpRequest).inSpan("entitlement-check-hasAcl")(tracer, parentSpan).flatMap {
case true => WriteBackDataRoute.handleTimeSeriesStoreRequest(producer, propertyDescriptor, httpRequest, entityKey, timeSeriesStoreRequest, entityIdCache, requestId, correlationId)(tracer, parentSpan).inSpan("ingestion-batch")(tracer, parentSpan)
......
......@@ -163,7 +163,7 @@ object WriteBackDataRoute extends LogF with Json4sSupport {
Map("requestId" -> requestId, "requestStartTime" -> requestStartTime)))
val writeBackData: WriteBackData = WriteBackData(timeSeriesData.source, entityRecID, propertyDescriptor,
TimeSeriesBasicStoreRequest(timeSeriesData.timeSeriesLong, timeSeriesData.timeSeriesString, timeSeriesData.timeSeriesDouble, timeSeriesData.timeSeriesBoolean, timeSeriesData.timeSeriesSetString,
timeSeriesData.timeSeriesDateTime),
timeSeriesData.timeSeriesDateTime, timeSeriesData.timeSeriesInteger),
osduDataPartitionId, requestId, requestStartTime, authorizationToken, xApiToken)
def entityCache: Future[GetEntityIdResponse] =
......
......@@ -29,7 +29,7 @@ import org.opengroup.osdu.production.entities.others.WriteBackDetails
import org.opengroup.osdu.production.inputRequestJson.TimeSeriesBasicStoreRequest
import org.opengroup.osdu.production.common.LogF
import org.opengroup.osdu.production.common.entities.{EventTypes, LogCategories, StructuredLog}
import org.opengroup.osdu.production.domain.PdiDataType.{DATETIME, SETSTRING}
import org.opengroup.osdu.production.domain.PdiDataType.{DATETIME, INTEGER, SETSTRING}
import org.opengroup.osdu.production.provider.interface.IMessageBusProducer
import org.opengroup.osdu.production.provider.model.IMessage
import org.scalactic.{Bad, Good, Or}
......@@ -137,6 +137,8 @@ object WriteBackDataPublisher extends LogF {
createChunksForTimeSeriesSetString(timeSeries, sourceType, agentId, entityId, propertyDescriptor, propertyId, requestId, requestStartTime, batchRequestId, sourceId).toSeq
case DATETIME =>
createChunksForTimeSeriesDateTime(timeSeries, sourceType, agentId, entityId, propertyDescriptor, propertyId, requestId, requestStartTime, batchRequestId, sourceId).toSeq
case INTEGER =>
createChunksForTimeSeriesInteger(timeSeries, sourceType, agentId, entityId, propertyDescriptor, propertyId, requestId, requestStartTime, batchRequestId, sourceId).toSeq
}
}
......@@ -261,6 +263,62 @@ object WriteBackDataPublisher extends LogF {
writeBackData
}
}
private def createChunksForTimeSeriesInteger(timeSeries: TimeSeriesBasicStoreRequest, sourceType: Int, agentId: Int, entityId: String,
propertyDescriptor: String, propertyId: String, requestId: String, requestStartTime: Long, batchRequestId: String, sourceId: String)(implicit sc: WritebackServiceConfiguration) = {
val timeSeriesField = timeSeries.timeSeriesInteger.get
val timeSeriesBuilder: TimeSeriesInteger.Builder = TimeSeriesInteger.newBuilder()
if (timeSeriesField.unit.isDefined) {
timeSeriesBuilder.setUnit(timeSeriesField.unit.get)
}
var timeSeriesBuilderSeq = new ListBuffer[TimeSeriesInteger.Builder]()
timeSeriesBuilderSeq += timeSeriesBuilder
var chunkSizeInBytes = calculateSizeInBytesForTimeSeriesInformation(entityId, propertyDescriptor, propertyId)
if (timeSeriesField.unit.isDefined) chunkSizeInBytes += timeSeriesField.unit.get.getBytes.length
timeSeriesField.points.foreach {
timeSeriesPoint =>
val dataPoint = IntegerPoint.newBuilder().setPoint(timeSeriesPoint.point).setValue(timeSeriesPoint.value.asInstanceOf[Number].intValue).build()
val dataPointSizeInBytes: Int = dataPoint.toByteString.size()
chunkSizeInBytes = chunkSizeInBytes + dataPointSizeInBytes
if (WriteBackCommonsUtil.checkMessageChunkSize(chunkSizeInBytes))
timeSeriesBuilderSeq.last.addPoints(dataPoint)
else {
val timeSeriesBuilder: TimeSeriesInteger.Builder = TimeSeriesInteger.newBuilder()
if (timeSeriesField.unit.isDefined) {
timeSeriesBuilder.setUnit(timeSeriesField.unit.get)
}
timeSeriesBuilderSeq += timeSeriesBuilder
chunkSizeInBytes = calculateSizeInBytesForTimeSeriesInformation(entityId, propertyDescriptor, propertyId)
if (timeSeriesField.unit.isDefined) {
chunkSizeInBytes += timeSeriesField.unit.get.getBytes.length
}
chunkSizeInBytes = chunkSizeInBytes + dataPointSizeInBytes
timeSeriesBuilderSeq.last.addPoints(dataPoint)
}
}
timeSeriesBuilderSeq.map {
timeSeriesBuilder =>
val writeBackDataBuilder = WriteBackData.newBuilder()
.setSourceType(sourceType)
.setAgentId(agentId)
.setEntityId(entityId)
.setPropertyDescriptor(propertyDescriptor)
.setPropertyId(propertyId)
.setRequestId(requestId)
.setRequestStartTime(requestStartTime)
.setBatchRequestId(batchRequestId)
.setSourceId(sourceId)
val writeBackData = writeBackDataBuilder.setTimeSeriesInteger(timeSeriesBuilder.build()).build()
if (batchRequestId != "")
loggerF.logDebug(_ => StructuredLog(s"Created chunk of size ${writeBackData.toByteString.size}", EventTypes.DataUpdate, "", LogCategories.Asynchronous, "", Map("requestId" -> requestId, "batchRequestId" -> batchRequestId)))
else
loggerF.logDebug(_ => StructuredLog(s"Created chunk of size ${writeBackData.toByteString.size}", EventTypes.DataUpdate, "", LogCategories.Asynchronous, "", Map("requestId" -> requestId)))
writeBackData
}
}
private def createChunksForTimeSeriesString(timeSeries: TimeSeriesBasicStoreRequest, sourceType: Int, agentId: Int, entityId: String,
propertyDescriptor: String, propertyId: String, requestId: String, requestStartTime: Long, batchRequestId: String, sourceId: String)(implicit sc: WritebackServiceConfiguration) = {
val timeSeriesField = timeSeries.timeSeriesString.get
......
......@@ -36,7 +36,7 @@ import org.scalactic.{Bad, ErrorMessage, Good, Or}
import java.time.ZonedDateTime
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.{Failure, Success, Try}
object ValidationUtil extends LogF {
def validateAuthorizationToken(token: Option[String])(implicit sc: WritebackServiceConfiguration): Or[Boolean, ErrorMessage] = {
......@@ -53,8 +53,14 @@ object ValidationUtil extends LogF {
def validateRequestDetailsAndTimeSeriesData(requestId: String, requestStartTime: Long, source: String, entityKey: String, propertyDescriptor: String, timeSeries: TimeSeriesBasicStoreRequest,
elementDetailsResponse: TrieMap[String, ElementDetailsResponse])(implicit serviceConfig: WritebackServiceConfiguration, http: HttpExt, ec: ExecutionContext, mat: Materializer): Future[Or[DerivedRequestDetails, ErrorMessageResponse]] =
(timeSeries.timeSeriesDouble.isEmpty,timeSeries.timeSeriesLong.isEmpty, timeSeries.timeSeriesString.isEmpty, timeSeries.timeSeriesBoolean.isEmpty, timeSeries.timeSeriesSetString.isEmpty, timeSeries.timeSeriesDateTime.isEmpty) match
case (false, true, true,true, true, true) =>
(timeSeries.timeSeriesDouble.isEmpty,
timeSeries.timeSeriesLong.isEmpty,
timeSeries.timeSeriesString.isEmpty,
timeSeries.timeSeriesBoolean.isEmpty,
timeSeries.timeSeriesSetString.isEmpty,
timeSeries.timeSeriesDateTime.isEmpty,
timeSeries.timeSeriesInteger.isEmpty) match
case (false, true, true,true, true, true, true) =>
if (timeSeries.timeSeriesDouble.get.points.isEmpty) {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.emptyData)))
} else {
......@@ -64,7 +70,7 @@ object ValidationUtil extends LogF {
} else Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.createMessage(WriteBackServiceMessages.invalidDataType, invalidDataPoints.map(_.value).mkString(", "), "timeSeriesDouble"))))
}
case (true, false, true, true, true, true) =>
case (true, false, true, true, true, true, true) =>
if (timeSeries.timeSeriesLong.get.points.isEmpty) {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.emptyData)))
} else {
......@@ -75,7 +81,7 @@ object ValidationUtil extends LogF {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.createMessage(WriteBackServiceMessages.invalidDataType, invalidDataPoints.map(_.value).mkString(", "), "timeSeriesLong"))))
}
}
case (true, true, false, true, true, true) =>
case (true, true, false, true, true, true, true) =>
if (timeSeries.timeSeriesString.get.points.isEmpty) {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.emptyData)))
} else {
......@@ -86,7 +92,7 @@ object ValidationUtil extends LogF {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.createMessage(WriteBackServiceMessages.invalidDataType, invalidDataPoints.map(_.value).mkString(", "), "timeSeriesString"))))
}
}
case (true, true, true, false, true, true) =>
case (true, true, true, false, true, true, true) =>
if (timeSeries.timeSeriesBoolean.get.points.isEmpty) {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.emptyData)))
} else {
......@@ -98,7 +104,7 @@ object ValidationUtil extends LogF {
}
}
case (true, true, true, true, false, true) =>
case (true, true, true, true, false, true, true) =>
if (timeSeries.timeSeriesSetString.get.points.isEmpty) {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.emptyData)))
} else {
......@@ -123,7 +129,7 @@ object ValidationUtil extends LogF {
}
case (true, true, true, true, true, false) =>
case (true, true, true, true, true, false, true) =>
if (timeSeries.timeSeriesDateTime.get.points.isEmpty) {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.emptyData)))
} else {
......@@ -147,6 +153,18 @@ object ValidationUtil extends LogF {
}
}
case (true, true, true, true, true, true, false) =>
if (timeSeries.timeSeriesInteger.get.points.isEmpty) {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.emptyData)))
} else {
val invalidDataPoints = validateDataFormat("timeSeriesInteger", timeSeries.timeSeriesInteger.get.points)
if (invalidDataPoints.isEmpty) {
validateAndDeriveRequestDetails(requestId, requestStartTime, source, entityKey, propertyDescriptor, "timeSeriesInteger", timeSeries.timeSeriesInteger.get, PdiDataType.INTEGER, elementDetailsResponse)
} else {
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.createMessage(WriteBackServiceMessages.invalidDataType, invalidDataPoints.map(_.value).mkString(", "), "timeSeriesInteger"))))
}
}
case _ =>
Future(Bad(ErrorMessageResponse(StatusCodes.BadRequest.intValue, StatusCodes.BadRequest.reason, WriteBackServiceMessages.createMessage(WriteBackServiceMessages.multipleDataFields, "timeSeriesDouble, timeSeriesLong, timeSeriesString, timeSeriesBoolean, timeSeriesDateTime"))))
......@@ -314,10 +332,27 @@ object ValidationUtil extends LogF {
case _: Double => "double"
case _: Boolean => "boolean"
case _: List[String] => "setstring"
case _ if Try(point.toString.toLong).isSuccess => "long"
case _ if isInt(point.toString).isSuccess => "integer"
case _ if isLong(point.toString).isSuccess => "long"
case _ => "NotSupportedDataType"
}
def isLong(value: String): Try[Long] =
Try(value.toString.toLong).transform(x =>
Success(x),
err =>
logger.error("Error converting value to Long", err)
Failure(err))
def isInt(value: String): Try[Int] =
Try(value.toString.toInt).transform(x =>
Success(x),
err =>
logger.error("Error converting value to Integer", err)
Failure(err))
def validateSource(source: String, dpsHeaders: DpsHeaders)(implicit ec: ExecutionContext, http: HttpExt, sc: WritebackServiceConfiguration, mat: Materializer): Future[Or[Option[SubscriberDetails], None.type]] = {
val path = String.format("/sources/%s", source)
val url = createUrl(sc.sourceServiceHost, sc.sourceServicePort, sc.sourceServicePrefix, path)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment