Commit e04b8189 authored by snehal jagtap

initial commit

parent 64976ee6
Pipeline #62035 failed with stages in 3 minutes and 7 seconds
package org.opengroup.osdu.production.backup
import com.google.cloud.pubsub.v1.stub.{ GrpcSubscriberStub, SubscriberStubSettings }
import com.google.common.primitives.Ints
import com.google.pubsub.v1.{ AcknowledgeRequest, PullRequest }
import org.opengroup.osdu.production.backup.BackupGlobalVariables._
import org.opengroup.osdu.production.common.LogF
import org.opengroup.osdu.production.common.entities.{ EventTypes, LogCategories, StructuredLog }
import org.opengroup.osdu.production.domain.generated.DataPointsAddedMessage.StreamDatum
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
object MessageHandler extends LogF {
  val dataOffset = 99999999999999L // 14-digit offset

  // Writes one backup segment for the current checkpoint to backup/<table>/data/<checkpointTime>.
  def writeTableSegment(content: ByteBuffer, tableName: String) = {
    loggerF.logInfo(_ => StructuredLog(s"writing backup segment at version $checkpointTime", EventTypes.DataUpdate, "", LogCategories.Synchronous))
    val channel = StorageHandler.openChannel(StorageHandler.createBlob(s"backup/$tableName/data/$checkpointTime"))
    channel.write(content)
    channel.close()
    loggerF.logInfo(_ => StructuredLog(s"completed writing segment at version $checkpointTime", EventTypes.DataUpdate, "", LogCategories.Synchronous))
  }
  // Concatenates each StreamDatum as a 4-byte big-endian length prefix followed by its serialized bytes.
  def convertToStorageFormat(data: Seq[StreamDatum]) = {
    data.foldLeft(Array.emptyByteArray) { (totalArray, datum) =>
      val content = datum.toByteArray
      totalArray ++ Ints.toByteArray(content.length) ++ content
    }
  }
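  // Illustrative only: a hedged sketch of the inverse of convertToStorageFormat, assuming the
  // length-prefixed layout produced above ([4-byte length][StreamDatum bytes] repeated).
  // The name readFromStorageFormat is hypothetical and not part of the original code.
  def readFromStorageFormat(bytes: Array[Byte]): Seq[StreamDatum] = {
    val result = scala.collection.mutable.Buffer.empty[StreamDatum]
    var pos = 0
    while (pos + 4 <= bytes.length) {
      val length = Ints.fromByteArray(bytes.slice(pos, pos + 4)) // read the 4-byte length prefix
      result += StreamDatum.parseFrom(bytes.slice(pos + 4, pos + 4 + length))
      pos += 4 + length
    }
    result
  }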
  val subscriberStubSettings: SubscriberStubSettings = SubscriberStubSettings.newBuilder()
    .setTransportChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder().build()).build()
  val subscriberStub: GrpcSubscriberStub = GrpcSubscriberStub.create(subscriberStubSettings)

  // Synchronously pulls up to 10000 messages from the backup subscription, pairing each message with its ack id.
  def pubsubMessages = {
    val pullRequest = PullRequest.newBuilder.setMaxMessages(10000)
      .setSubscription(subscriptionName.toString)
      .build
    val pullResponse = subscriberStub.pullCallable().call(pullRequest)
    pullResponse.getReceivedMessagesList.asScala.map(message => (message.getAckId, message.getMessage))
  }

  def acknowledge(ackIds: Seq[String]) = {
    subscriberStub.acknowledgeCallable().call(AcknowledgeRequest.newBuilder().addAllAckIds(ackIds.asJava).setSubscription(subscriptionName.toString).build())
  }
}
package org.opengroup.osdu.production.backup
import com.google.cloud.storage.Storage.CopyRequest
import com.google.cloud.storage.{ BlobId, BlobInfo, StorageOptions }
import org.opengroup.osdu.production.backup.BackupGlobalVariables._
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
object StorageHandler {
  val storageService = StorageOptions.getDefaultInstance.getService

  private def blobIdForVersionMarker(table: String) = {
    BlobId.of(bucketName, s"backup/${table}/lastVersion")
  }

  // Records the current checkpoint time in backup/<table>/lastVersion, keeping the previous
  // value in backup/<table>/lastVersion.bak before overwriting.
  def updateVersion(table: String) = {
    val versionMarkerFile = blobIdForVersionMarker(table)
    val versionBlob = BlobInfo.newBuilder(versionMarkerFile).setContentType("text/plain").build
    if (Option(storageService.get(versionMarkerFile)).isEmpty) {
      storageService.create(versionBlob)
    }
    val versionMarkerFileBackup = BlobId.of(bucketName, s"backup/${table}/lastVersion.bak")
    storageService.delete(versionMarkerFileBackup)
    storageService.copy(CopyRequest.of(versionMarkerFile, versionMarkerFileBackup))
    val writer = storageService.writer(versionBlob)
    writer.write(ByteBuffer.wrap(BackupGlobalVariables.checkpointTime.toString.getBytes(StandardCharsets.UTF_8)))
    writer.close()
  }
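  // Illustrative only: a hedged sketch of how a restore path could read the checkpoint written by
  // updateVersion above, falling back to lastVersion.bak when the primary marker is missing.
  // The name readLastVersion is hypothetical and not part of the original code.
  def readLastVersion(table: String): Option[Long] = {
    val candidates = Seq(blobIdForVersionMarker(table), BlobId.of(bucketName, s"backup/${table}/lastVersion.bak"))
    candidates
      .flatMap(id => Option(storageService.get(id)))
      .headOption
      .map(blob => new String(storageService.readAllBytes(blob.getBlobId), StandardCharsets.UTF_8).trim.toLong)
  }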
  // `name` is expected to be a version-stamped object path, e.g. backup/<table>/data/<checkpointTime>.
  def createBlob(name: String) = {
    val blobId = BlobId.of(bucketName, name)
    val blobInfo = BlobInfo.newBuilder(blobId).setContentType("application/octet-stream").build
    storageService.create(blobInfo)
  }

  def openChannel(blobInfo: BlobInfo) = {
    storageService.writer(blobInfo)
  }
}
package org.opengroup.osdu.production.backup
import com.google.api.gax.rpc.DeadlineExceededException
import com.google.pubsub.v1.PubsubMessage
import org.opengroup.osdu.production.backup.BackupGlobalVariables._
import org.opengroup.osdu.production.common.LogF
import org.opengroup.osdu.production.common.entities.{ EventTypes, LogCategories, StructuredLog }
import org.opengroup.osdu.production.domain.generated.DataPointsAddedMessage.DataPointsAddedEvent
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.util.Try
object TimeSeriesBackup extends LogF {
  lazy val counter = new AtomicLong()

  def triggerBackup(): Unit = {
    var handleMoreMessages = true
    var fullContent = Array.emptyByteArray
    val ackIds = new util.ArrayList[String]()
    var currentBatch = scala.collection.mutable.Buffer.empty[(String, PubsubMessage)]
    while (handleMoreMessages) {
      val pullStartTime = System.currentTimeMillis()
      try {
        currentBatch = MessageHandler.pubsubMessages
      } catch {
        case _: DeadlineExceededException =>
          loggerF.logInfo(_ => StructuredLog(s"No more messages to process in pubsub at this moment. Will move to next step. pullStartTime:$pullStartTime currentTime:${System.currentTimeMillis()}", EventTypes.DataAccess, "", LogCategories.CronJobLogs))
          currentBatch.clear()
      }
      // Stop pulling 240 seconds after the checkpoint time so that the segment write and the acks
      // still fit inside the 5-minute schedule window.
      val ackDeadlineNearExpiry = pullStartTime >= (checkpointTime + 240000L)
      if (currentBatch.nonEmpty && !ackDeadlineNearExpiry) {
        counter.addAndGet(currentBatch.size.toLong)
        // Parse each pulled message, drop the ones that fail to parse, and append their stream data
        // in the length-prefixed storage format.
        fullContent = fullContent ++ currentBatch.map(_._2)
          .flatMap(message => Try(DataPointsAddedEvent.parseFrom(message.getData)).toOption)
          .map(proto => MessageHandler.convertToStorageFormat(proto.getStreamDataList.asScala))
          .foldLeft(Array.emptyByteArray)(_ ++ _)
        ackIds.addAll(currentBatch.map(_._1).asJava)
      }
      handleMoreMessages = currentBatch.nonEmpty && counter.get() < 10000000L && !ackDeadlineNearExpiry
    }
    loggerF.logInfo(_ => StructuredLog(s"Total messages processed in this run ${counter.get()}, backup payload bytes ${fullContent.length}, checkpointTime $checkpointTime", EventTypes.DataAccess, "", LogCategories.CronJobLogs))
    if (fullContent.nonEmpty) {
      MessageHandler.writeTableSegment(ByteBuffer.wrap(fullContent), tableName)
    }
    // Acknowledge requests are batched to at most 1000 ack ids per call.
    ackIds.asScala.grouped(1000).foreach(MessageHandler.acknowledge)
    loggerF.logInfo(_ => StructuredLog(s"Acknowledged all messages count: ${ackIds.size()}", EventTypes.DataProcess, "", LogCategories.CronJobLogs))
    StorageHandler.updateVersion(tableName)
  }
}
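// Illustrative only: a hedged sketch of how triggerBackup could be wired into the same Quartz
// scheduling style used elsewhere in this commit (see ProbesJobScheduler); TimeSeriesBackupJob
// is hypothetical and not part of the original code.
//
//   class TimeSeriesBackupJob extends org.quartz.Job {
//     override def execute(context: org.quartz.JobExecutionContext): Unit =
//       TimeSeriesBackup.triggerBackup()
//   }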
package org.opengroup.osdu.production.common
import com.google.cloud.bigtable.grpc.BigtableSession
import org.opengroup.osdu.production.common.entities.{ EventTypes, LogCategories, StructuredLog }
import org.quartz.{ Job, JobExecutionContext }
import java.io.File
class BigtableConnectJob extends Job with LogF {
  override def execute(context: JobExecutionContext): Unit = {
    try {
      val session = context.getScheduler.getContext.get("bigTableSession").asInstanceOf[BigtableSession]
      val dataTableName = context.getScheduler.getContext.get("tableName")
      val isTablePresent = session.getTableAdminClientWrapper.listTables.contains(dataTableName)
      // The "alive" marker file is the liveness signal: it exists only while the Bigtable
      // connection check succeeds and the data table is present.
      val file = new File("alive")
      if (!isTablePresent) {
        throw new NoSuchElementException
      } else if (!file.exists) {
        file.createNewFile
      }
    } catch {
      case e: Exception =>
        val file = new File("alive")
        if (file.exists) {
          file.delete
        }
        loggerF.logError(_ => StructuredLog(s"BigTable Connection Failed with exception $e", EventTypes.InfrastructureAccess, "", LogCategories.Asynchronous, additionalFields = Map("stackTrace" -> e)))
    }
    ()
  }
}
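// Illustrative only: a hedged sketch of a Kubernetes liveness probe that could consume the
// "alive" marker file maintained by BigtableConnectJob; the exact probe wiring is an assumption,
// not part of this commit.
//
//   livenessProbe:
//     exec:
//       command: ["test", "-f", "alive"]
//     periodSeconds: 60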
package org.opengroup.osdu.production.common
import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }
import org.opengroup.osdu.production.dataOffset
import java.util.concurrent.Executors
import scala.concurrent.{ Future, Promise }
object Extensions {
  // Maps a version timestamp onto a descending scale (dataOffset - versionTime), so that more
  // recent versions produce smaller stored values.
  implicit class offsetValue(vTime: Long) {
    def offsetted: Long = dataOffset - vTime
  }

  val futurePool = Executors.newCachedThreadPool()

  // Adapts a Guava ListenableFuture to a scala.concurrent.Future by bridging its callback.
  implicit class ListenableFutureConverter[T](lf: ListenableFuture[T]) {
    def asScala: Future[T] = {
      val p = Promise[T]()
      Futures.addCallback(lf, new FutureCallback[T] {
        def onFailure(t: Throwable): Unit = if (!p.isCompleted) p failure t
        def onSuccess(result: T): Unit = p success result
      }, futurePool)
      p.future
    }
  }
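  // Usage sketch (illustrative only): with these implicits in scope,
  //   import org.opengroup.osdu.production.common.Extensions._
  //   val f: Future[T] = someListenableFuture.asScala    // someListenableFuture is hypothetical
  //   val storedVersion = System.currentTimeMillis().offsetted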
}
package org.opengroup.osdu.production.common
object Metrics {
  val sli_pss_writeback_api_data_ingestion_latency = "sli_pss_writeback_api_data_ingestion_latency"
}
package org.opengroup.osdu.production.common
import com.google.cloud.bigtable.grpc.BigtableSession
import com.typesafe.config.Config
import org.quartz.CronScheduleBuilder.cronSchedule
import org.quartz.JobBuilder.newJob
import org.quartz.TriggerBuilder.newTrigger
import org.quartz.impl.StdSchedulerFactory
object ProbesJobScheduler {
  // Schedules the recurring BigtableConnectJob probe using the cron expression from configuration
  // and exposes the Bigtable session and data table name through the scheduler context.
  def scheduleJob(conf: Config, bigtableSession: BigtableSession) = {
    val tableName = conf.getString("cloud.bigtable.table.data.name")
    val job = newJob(classOf[BigtableConnectJob]).withIdentity("BigtableConnectJob", "BigtableConnectJobGroup").requestRecovery(true).build
    val exp = conf.getString("quartz.schedules.probeSchedule.expression")
    val trigger = newTrigger().withIdentity("BigtableConnectJobTrigger", "ArchiveJobGroup").withSchedule(cronSchedule(exp)).forJob(job).build
    val scheduler = StdSchedulerFactory.getDefaultScheduler
    scheduler.start()
    scheduler.getContext.put("bigTableSession", bigtableSession)
    scheduler.getContext.put("tableName", tableName)
    scheduler.scheduleJob(job, trigger)
  }
}
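// Usage sketch (illustrative only): the probe is expected to be scheduled once at service
// start-up, e.g.
//   ProbesJobScheduler.scheduleJob(config, bigtableSession)
// after which BigtableConnectJob keeps the "alive" marker file in sync with Bigtable health.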
package org.opengroup.osdu.production.config
import com.google.cloud.bigtable.config.BigtableOptions
import com.google.pubsub.v1.ProjectSubscriptionName
import org.opengroup.osdu.production.common.LogF
import org.opengroup.osdu.production.metric.tracing.MetricsSupport
import com.typesafe.config.Config
import scala.collection.JavaConverters._
case class PublishConfigWithRetry(project: String, topicId: String, publishRetryCount: Int)
object PubSubPublishConfig {
  def apply(conf: Config, outBoundPath: String): PublishConfigWithRetry = PublishConfigWithRetry(
    conf.getString("cloud.global-project.name"),
    conf.getString(s"$outBoundPath.outbound.topic.name"),
    conf.getInt(s"$outBoundPath.publishRetryCount"))
}
object PubSubSourceConfig {
  def apply(conf: Config, prefix: String, inboundPath: String): PubSubConfig = PubSubConfig(
    conf.getString("cloud.global-project.name"),
    conf.getString(s"$inboundPath.subscription.$prefix.name"),
    conf.getString(s"$inboundPath.subscription.$prefix.topic.name"),
    SubscriptionConfig(parallelPullCount = conf.getInt(s"$inboundPath.parallel-pull-size")))
}
case class ServiceConfiguration(
  pubsub: PubSubConfig,
  bigTable: BigTableConfiguration,
  batchConfig: BatchConfig,
  modifyAckDeadlineJobConfig: ModifyAckDeadlineJobConfig,
  cachecoherencePublishConfig: PublishConfigWithRetry,
  cachecoherencePublishConfig2: PublishConfigWithRetry,
  subscriptionName: ProjectSubscriptionName,
  unitCatalogServiceConfiguration: HttpClientConfiguration,
  redisConfig: RedisConfig,
  unitCatalogServiceConfigurationSecret: String,
  googleProjectId: String,
  env: String
) extends MetricsSupport
object ServiceConfiguration {
  def apply(conf: Config): ServiceConfiguration = ServiceConfiguration(
    PubSubConfig(
      conf.getString("cloud.global-project.name"),
      conf.getString("cloud.pubsub.subscription.name"),
      conf.getString("cloud.pubsub.topic.name"),
      SubscriptionConfig(
        ackDeadLineSeconds = conf.getInt("cloud.pubsub.subscription.ack-deadline-seconds"),
        parallelPullCount = conf.getInt("cloud.pubsub.subscription.parallel-pull-count"),
        retainAckedMessages = conf.getBoolean("cloud.pubsub.subscription.retain-acked-messages"))),
    BigTableConfiguration(conf, conf.hasPath("test.environment")),
    BatchConfig(conf),
    ModifyAckDeadlineJobConfig(conf),
    PubSubPublishConfig(conf, "cloud.pubsub.cache-coherence-outbound"),
    PubSubPublishConfig(conf, "cloud.pubsub.cache-coherence-outbound-2"),
    ProjectSubscriptionName.of(conf.getString("cloud.project.name"), conf.getString("cloud.pubsub.subscription.name")),
    UnitCatalogServiceConfiguration(conf),
    RedisConfig(conf),
    conf.getString("unitCatalogService.secret"),
    conf.getString("cloud.global-project.name"),
    conf.getString("env")
  )
}
case class BigTableConfiguration(
  opts: BigtableOptions,
  projectName: String,
  instanceName: String,
  dataTableName: String,
  columnFamilies: Seq[String],
  indexTableName: String,
  changeLogTable: String)
object BigTableConfiguration extends LogF {
  def apply(conf: Config, testConfig: Boolean = false): BigTableConfiguration = {
    BigTableConfiguration(
      createOptions(conf, testConfig),
      conf.getString("cloud.project.name"),
      s"projects/${conf.getString("cloud.project.name")}/instances/${conf.getString("cloud.bigtable.instance.name")}",
      conf.getString("cloud.bigtable.table.data.name"),
      conf.getStringList("cloud.bigtable.columnFamily.names").asScala,
      conf.getString("cloud.bigtable.table.index.name"),
      conf.getString("cloud.bigtable.table.changelog.name"))
  }

  // Note: the testInstance flag is currently not consulted when building the options.
  def createOptions(conf: Config, testInstance: Boolean = false): BigtableOptions = {
    val optionBuilder = BigtableOptions.builder
    optionBuilder.setProjectId(conf.getString("cloud.project.name")).setInstanceId(conf.getString("cloud.bigtable.instance.name")).setUserAgent(conf.getString("cloud.bigtable.user.agent"))
    optionBuilder.build()
  }
}
case class BatchConfig(throttleCost: Int, dataPointsSegmentSize: Int)
object BatchConfig extends LogF {
  def apply(conf: Config): BatchConfig = {
    BatchConfig(conf.getInt("batch.dataPoints.throttleCost"), conf.getInt("batch.dataPoints.segmentSize"))
  }
}
case class ModifyAckDeadlineJobConfig(jobIntervalMilliseconds: Long, ackDeadlineMilliseconds: Int, oldPubSubMessageOffsetMilliseconds: Int)
object ModifyAckDeadlineJobConfig {
  def apply(conf: Config): ModifyAckDeadlineJobConfig = {
    ModifyAckDeadlineJobConfig(
      conf.getLong("modifyAckDeadlineJob.jobIntervalMilliseconds"),
      conf.getInt("modifyAckDeadlineJob.ackDeadlineMilliseconds"),
      conf.getInt("modifyAckDeadlineJob.oldPubSubMessageOffsetMilliseconds"))
  }
}
object UnitCatalogServiceConfiguration {
  def apply(conf: Config): HttpClientConfiguration = HttpClientConfiguration(
    conf.getString("unitCatalogService.hostname"),
    conf.getInt("unitCatalogService.port"),
    conf.getString("unitCatalogService.geturi"))
}
case class RedisConfig(host: String, port: Int, messageChannel: String)
object RedisConfig extends LogF {
  def apply(conf: Config): RedisConfig = {
    RedisConfig(conf.getString("cloud.memorystore.host"), conf.getInt("cloud.memorystore.port"), conf.getString("cloud.memorystore.message-channel"))
  }
}
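// Illustrative only: an abridged, hedged sketch of the HOCON layout these factories read.
// Key paths are taken from the accessors above; all values are placeholders, and the
// cache-coherence outbound and per-prefix subscription paths are omitted.
//
//   cloud {
//     project.name = "my-project"
//     global-project.name = "my-global-project"
//     pubsub {
//       topic.name = "datapoints-topic"
//       subscription {
//         name = "datapoints-subscription"
//         ack-deadline-seconds = 60
//         parallel-pull-count = 4
//         retain-acked-messages = false
//       }
//     }
//     bigtable {
//       instance.name = "bt-instance"
//       user.agent = "pss-writeback"
//       columnFamily.names = ["d"]
//       table { data.name = "data", index.name = "index", changelog.name = "changelog" }
//     }
//     memorystore { host = "127.0.0.1", port = 6379, message-channel = "cache-coherence" }
//   }
//   batch.dataPoints { throttleCost = 10, segmentSize = 1000 }
//   modifyAckDeadlineJob {
//     jobIntervalMilliseconds = 30000
//     ackDeadlineMilliseconds = 60000
//     oldPubSubMessageOffsetMilliseconds = 300000
//   }
//   unitCatalogService { hostname = "localhost", port = 8080, geturi = "/units", secret = "changeme" }
//   env = "dev"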
package org.opengroup.osdu.production.domain
import org.opengroup.osdu.production.domain.DataStatus.DataStatus
import org.opengroup.osdu.production.domain.generated.DataPointsList
case class DataPoint(streamId: Long, physicalTime: Long, versionTime: Long, status: DataStatus, value: Any, dataType: DataType, sourceVersionTime: Long, pubSubDeltaTime: Long, storageSchemaVersion: Int, agentId: Long, unit: String = "")
case class DataPointsWithInfo(dataPoints: Seq[DataPoint], requestId: String, batchRequestId: String)
object DataStatus extends Enumeration {
  type DataStatus = Value
  val Good = Value("Good")
  val Bad = Value("Bad")
  val Missing = Value("Missing")
  val Questionable = Value("Questionable")
  val InvalidType = Value("InvalidType")
  val KnownInvalidType = Value("KnownInvalidType")
  val Uncertain = Value("Uncertain")
  val Substituted = Value("Substituted")
  val BadUnit = Value("BadUnit")
  val IntervalEnd = Value("IntervalEnd")
  val Deleted = Value("Deleted")
}
trait DataType
case object Double extends DataType
case object Single extends DataType
case object Long extends DataType
case object String extends DataType
case object Boolean extends DataType
case object Raw extends DataType
case object Time extends DataType
case object UndefType extends DataType
case class InboundDataPoint(point: DataPointsList.DataPoint, sourceVersion: Long, pubSubDeltaTime: Long, agentId: Long)
case class InboundPointsWithInfo(inboundPoints: Seq[InboundDataPoint], requestId: String, batchRequestId: String)
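// Usage sketch (illustrative only): an InboundDataPoint is typically turned into a storage-level
// DataPoint through ProtocolConverters, e.g.
//   val dp = ProtocolConverters.convertToTypedValue(
//     inbound.point, inbound.agentId, inbound.sourceVersion, inbound.pubSubDeltaTime)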
package org.opengroup.osdu.production.domain
import org.opengroup.osdu.production.RowKeyAndMutation
case class FailedAndSucceededDataPointsWithInfo(succeededDataPoints: Seq[DataPoint], failedDataPoints: Seq[DataPoint], requestId: String, batchRequestId: String)
case class DataAndIndexRowGeneratorsWithDataPoints(dataRowsGenerator: Seq[RowKeyAndMutation], indexRowsGenerator: Seq[RowKeyAndMutation], dataPointsWithInfo: DataPointsWithInfo)
package org.opengroup.osdu.production.domain
import com.google.bigtable.v2.Mutation
import com.google.bigtable.v2.Mutation.SetCell
import com.google.common.primitives.{ Ints, Longs }
import com.google.protobuf.ByteString
import org.opengroup.osdu.production._
import org.opengroup.osdu.production.common.Extensions._
import org.opengroup.osdu.production.common.LogF
import org.opengroup.osdu.production.common.entities.{ EventTypes, LogCategories, StructuredLog }
import org.opengroup.osdu.production.config.BigTableConfiguration
import org.opengroup.osdu.production.domain.DataStatus.DataStatus
import org.opengroup.osdu.production.domain.generated.{ DataPointsList, StorageRecordProtocol }
import org.opengroup.osdu.production.domain.generated.StorageRecordProtocol.{ DataPointValue, StorageRecord }
import scala.util.{ Failure, Success, Try }
object ProtocolConverters extends LogF {
  def convertToTypedValue(point: DataPointsList.DataPoint, agentId: Long, sourceVersionTime: Long = 0, pubSubDeltaTime: Long = 0, versionTime: Long = System.currentTimeMillis().offsetted): DataPoint = {
    logger.trace(s"converting data point protobuf type to domain specific format $point")
    def dataPoint(value: Any, dataType: DataType): DataPoint =
      DataPoint(point.getStreamId, point.getTimeStamp, versionTime, resolveDataStatus(point.getStatus), value, dataType, sourceVersionTime, pubSubDeltaTime, 1, agentId, point.getSourceUnit)
    // The protobuf valueType codes map 1..7 onto Double, Single, Long, Time, Boolean, String and Raw;
    // anything else is treated as an undefined type.
    point.getValueType match {
      case 1 => dataPoint(point.getValue.getDataPointDouble, Double)
      case 2 => dataPoint(point.getValue.getDataPointDouble, Single)
      case 3 => dataPoint(point.getValue.getDataPointLong, Long)
      case 4 => dataPoint(point.getValue.getDataPointDateTime, Time)
      case 5 => dataPoint(point.getValue.getDataPointBoolean, Boolean)
      case 6 => dataPoint(point.getValue.getDataPointString, String)
      case 7 => dataPoint(point.getValue.getDataPointStringBytes, Raw)
      case _ => dataPoint(UndefType, UndefType)
    }
  }
  def resolveDataStatus(status: Int): DataStatus = {
    logger.trace(s"resolving data status from integer $status")
    status match {
      case 100 => DataStatus.Good
      case 200 => DataStatus.Bad
      case 300 => DataStatus.Missing
      case 400 => DataStatus.Questionable
      case 500 => DataStatus.KnownInvalidType
      case 600 => DataStatus.Uncertain
      case 700 => DataStatus.Substituted
      case 800 => DataStatus.BadUnit
      case 900 => DataStatus.IntervalEnd
      case 10000 => DataStatus.Deleted
      case _ => DataStatus.InvalidType
    }
  }
  def resolveDataTypeCode(dataType: DataType): Int = {
    logger.trace(s"resolving integer value from data type $dataType")
    dataType match {
      case Double => 1
      case Single => 2
      case Long => 3
      case Time => 4
      case Boolean => 5
      case String => 6
      case Raw => 7
      case UndefType => 0
    }
  }
  def resolveStatusCode(status: DataStatus): Int = {
    logger.trace(s"resolving integer value from status $status")
    status match {
      case DataStatus.Good => 100
      case DataStatus.Bad => 200
      case DataStatus.Missing => 300
      case DataStatus.Questionable => 400
      case DataStatus.KnownInvalidType => 500
      case DataStatus.Uncertain => 600
      case DataStatus.Substituted => 700
      case DataStatus.BadUnit => 800
      case DataStatus.IntervalEnd => 900
      case DataStatus.Deleted => 10000
      case DataStatus.InvalidType => 0
    }
  }
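  // Observation: resolveStatusCode and resolveDataStatus round-trip for every DataStatus listed
  // above (resolveDataStatus(resolveStatusCode(s)) == s), since the unmapped code 0 falls back to
  // InvalidType.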
  def dataPointGenerator(v: DataPoint, config: BigTableConfiguration): RowKeyAndMutation = {
    val mutationBuilder = Mutation.newBuilder()
    val setCell = SetCell.newBuilder()
      .setColumnQualifier(ByteString.copyFrom(Longs.toByteArray(v.versionTime)))
      .setFamilyName(config.columnFamilies.head)
      .setValue(ByteString.copyFrom(convertValueToBytes(v)))
      .build()
    (ByteString.copyFrom(Longs.toByteArray(v.streamId) ++ Longs.toByteArray(v.physicalTime + dataOffset)), mutationBuilder.setSetCell(setCell).build())
  }
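  // Layout note (derived from dataPointGenerator above): the row key is the 8-byte streamId
  // followed by the 8-byte offset-adjusted physical time (physicalTime + dataOffset); the column
  // qualifier is the 8-byte versionTime and the cell value is the encoded data point.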
  def indexGenerator(v: DataPoint, config: BigTableConfiguration): RowKeyAndMutation = {
    val mutationBuilder = Mutation.newBuilder()
    val setCell = SetCell.newBuilder()
      .setColumnQualifier(ByteString.copyFrom(Longs.toByteArray(v.versionTime)))
      .setFamilyName(config.columnFamilies.head)
      // needed to filter index
      .setValue(ByteString.copyFrom(Ints.toByteArray(resolveStatusCode(v.status))))
      .build()