Skip to content

Commit

Permalink
WIP: Snapshot statements.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Nov 1, 2023
1 parent b7ce030 commit dd3a372
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 1 deletion.
1 change: 1 addition & 0 deletions .scalafix.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
OrganizeImports.preset = INTELLIJ_2020_3
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lazy val root = (project in file(".")
aggregate(
`scalatest-io`,
journal,
snapshot,
persistence,
`tests`,
replicator,
Expand Down Expand Up @@ -85,6 +86,11 @@ lazy val journal = (project in file("journal")
Logback.core % Test,
Logback.classic % Test)))

lazy val snapshot = (project in file("snapshot")
settings (name := "kafka-journal-snapshot")
settings commonSettings
dependsOn (journal))

lazy val persistence = (project in file("persistence")
settings (name := "kafka-journal-persistence")
settings commonSettings
Expand Down Expand Up @@ -135,7 +141,7 @@ lazy val replicator = (Project("replicator", file("replicator"))
lazy val `eventual-cassandra` = (project in file("eventual-cassandra")
settings (name := "kafka-journal-eventual-cassandra")
settings commonSettings
dependsOn (journal % "test->test;compile->compile")
dependsOn (journal % "test->test;compile->compile", snapshot % "test->test;compile->compile")
settings (libraryDependencies ++= Seq(scache, scassandra)))

lazy val `journal-circe` = (project in file("circe/core")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.Monad
import cats.syntax.all._
import com.datastax.driver.core.Row
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper._
import com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandraConfig.ConsistencyConfig
import com.evolutiongaming.kafka.journal.eventual.cassandra.HeadersHelper._
import com.evolutiongaming.scassandra.syntax._
import com.evolutiongaming.scassandra.{DecodeByName, EncodeByName, TableName}
import scodec.bits.ByteVector

import java.time.Instant
import scala.util.Try

object SnapshotStatements {

def createTable(name: TableName): String = {
s"""
|CREATE TABLE IF NOT EXISTS ${ name.toCql } (
|id TEXT,
|topic TEXT,
|segment BIGINT,
|buffer_nr INT,
|seq_nr BIGINT,
|partition INT,
|offset BIGINT,
|timestamp TIMESTAMP,
|origin TEXT,
|version TEXT,
|tags SET<TEXT>,
|metadata TEXT,
|payload_type TEXT,
|payload_txt TEXT,
|payload_bin BLOB,
|headers MAP<TEXT, TEXT>,
|status TEXT
|PRIMARY KEY ((id, topic, segment), buffer_nr))
|""".stripMargin
}

trait InsertRecord[F[_]] {
def apply(key: Key, segment: SegmentNr, bufferNr: BufferNr, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Unit]
}

object InsertRecord {

def of[F[_] : Monad : CassandraSession : ToTry : JsonCodec.Encode](
name: TableName,
consistencyConfig: ConsistencyConfig.Write
): F[InsertRecord[F]] = {

implicit val encodeTry: JsonCodec.Encode[Try] = JsonCodec.Encode.summon[F].mapK(ToTry.functionK)

implicit val encodeByNameByteVector: EncodeByName[ByteVector] = EncodeByName[Array[Byte]]
.contramap { _.toArray }

val encodeByNameRecordMetadata = EncodeByName[RecordMetadata]

val query =
s"""
|INSERT INTO ${ name.toCql } (
|id,
|topic,
|segment,
|buffer_nr,
|seq_nr,
|partition,
|offset,
|timestamp,
|origin,
|version,
|tags,
|payload_type,
|payload_txt,
|payload_bin,
|metadata,
|headers,
|status)
|VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin

for {
prepared <- query.prepare
} yield {
(key: Key, segment: SegmentNr, bufferNr: BufferNr, snapshot: SnapshotRecord[EventualPayloadAndType]) =>

def statementOf(record: SnapshotRecord[EventualPayloadAndType]) = {
val snapshot = record.snapshot
val (payloadType, txt, bin) = snapshot.payload.map { payloadAndType =>
val (text, bytes) = payloadAndType.payload.fold(
str => (str.some, none[ByteVector]),
bytes => (none[String], bytes.some)
)
(payloadAndType.payloadType.some, text, bytes)
} getOrElse {
(None, None, None)
}

prepared
.bind()
.encode(key)
.encode(segment)
.encode(bufferNr)
.encode(snapshot.seqNr)
.encode(record.partitionOffset)
.encode("timestamp", record.timestamp)
.encodeSome(record.origin)
.encodeSome(record.version)
.encode("tags", snapshot.tags)
.encodeSome("payload_type", payloadType)
.encodeSome("payload_txt", txt)
.encodeSome("payload_bin", bin)
.encode("metadata", record.metadata)(encodeByNameRecordMetadata)
.encode(record.headers)
.encode(record.status)
.setConsistencyLevel(consistencyConfig.value)
}

val statement = statementOf(snapshot)
statement.setConsistencyLevel(consistencyConfig.value).first.void
}
}
}


trait SelectRecord[F[_]] {

def apply(key: Key, segment: SegmentNr, bufferNr: BufferNr): F[Option[SnapshotRecord[EventualPayloadAndType]]]
}

object SelectRecord {

def of[F[_] : Monad : CassandraSession : ToTry : JsonCodec.Decode](
name: TableName,
consistencyConfig: ConsistencyConfig.Read): F[SelectRecord[F]] = {

implicit val encodeTry: JsonCodec.Decode[Try] = JsonCodec.Decode.summon[F].mapK(ToTry.functionK)
implicit val decodeByNameByteVector: DecodeByName[ByteVector] = DecodeByName[Array[Byte]]
.map { a => ByteVector.view(a) }

val query =
s"""
|SELECT
|seq_nr,
|partition,
|offset,
|timestamp,
|origin,
|version,
|tags,
|payload_type,
|payload_txt,
|payload_bin,
|metadata,
|headers,
|status FROM ${ name.toCql }
|WHERE id = ?
|AND topic = ?
|AND segment = ?
|AND buffer_nr = ?
|""".stripMargin

for {
prepared <- query.prepare
} yield {
new SelectRecord[F] {

def apply(key: Key, segment: SegmentNr, bufferNr: BufferNr) = {

def readPayload(row: Row): Option[EventualPayloadAndType] = {
val payloadType = row.decode[Option[PayloadType]]("payload_type")
val payloadTxt = row.decode[Option[String]]("payload_txt")
val payloadBin = row.decode[Option[ByteVector]]("payload_bin") getOrElse ByteVector.empty

payloadType
.map(EventualPayloadAndType(payloadTxt.toLeft(payloadBin), _))
}

val bound = prepared
.bind()
.encode(key)
.encode(segment)
.encodeAt(3, bufferNr)
.setConsistencyLevel(consistencyConfig.value)

val rows = for {
row <- bound.execute
} yield {
val partitionOffset = row.decode[PartitionOffset]

val payload = readPayload(row)

val seqNr = row.decode[SeqNr]
val snapshot = Snapshot(
seqNr = seqNr,
tags = row.decode[Tags]("tags"),
payload = payload)

val metadata = row.decode[Option[RecordMetadata]]("metadata") getOrElse RecordMetadata.empty

val headers = row.decode[Headers]

val status = row.decode[SnapshotStatus]

SnapshotRecord(
snapshot = snapshot,
timestamp = row.decode[Instant]("timestamp"),
origin = row.decode[Option[Origin]],
version = row.decode[Option[Version]],
partitionOffset = partitionOffset,
metadata = metadata,
headers = headers,
status = status)
}

rows.first
}
}
}
}
}


trait UpdateStatus[F[_]] {

def apply(key: Key, segmentNr: SegmentNr, bufferNr: BufferNr, status: SnapshotStatus): F[Unit]
}

object UpdateStatus {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: ConsistencyConfig.Write
): F[UpdateStatus[F]] = {

val query =
s"""
|UPDATE ${ name.toCql }
|SET status = ?
|WHERE id = ?
|AND topic = ?
|AND segment = ?
|AND buffer_nr = ?
|""".stripMargin

for {
prepared <- query.prepare
} yield {
(key: Key, segmentNr: SegmentNr, bufferNr: BufferNr, status: SnapshotStatus) =>
prepared
.bind()
.encode(status)
.encode(key)
.encode(segmentNr)
.encode(bufferNr)
.setConsistencyLevel(consistencyConfig.value)
.first
.void
}
}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.evolutiongaming.kafka.journal

import com.evolutiongaming.scassandra.DecodeByIdx
import com.evolutiongaming.scassandra.DecodeByName
import com.evolutiongaming.scassandra.DecodeRow
import com.evolutiongaming.scassandra.EncodeByIdx
import com.evolutiongaming.scassandra.EncodeByName
import com.evolutiongaming.scassandra.EncodeRow

sealed abstract case class BufferNr(value: Int) {
override def toString: String = value.toString
}

object BufferNr {

def fromIntUnsafe(value: Int): BufferNr =
new BufferNr(value) {}

implicit val encodeByNameBufferNr: EncodeByName[BufferNr] =
EncodeByName[Int].contramap(_.value)
implicit val decodeByNameBufferNr: DecodeByName[BufferNr] =
DecodeByName[Int].map(fromIntUnsafe)

implicit val encodeByIdxBufferNr: EncodeByIdx[BufferNr] =
EncodeByIdx[Int].contramap(_.value)
implicit val decodeByIdxBufferNr: DecodeByIdx[BufferNr] =
DecodeByIdx[Int].map(fromIntUnsafe)

implicit val encodeRowSeqNr: EncodeRow[BufferNr] =
EncodeRow[BufferNr]("buffer_nr")
implicit val decodeRowSeqNr: DecodeRow[BufferNr] =
DecodeRow[BufferNr]("buffer_nr")

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.evolutiongaming.kafka.journal

final case class Snapshot[A](
seqNr: SeqNr,
tags: Tags = Tags.empty,
payload: Option[A] = None
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.evolutiongaming.kafka.journal

import java.time.Instant

final case class SnapshotRecord[A](
snapshot: Snapshot[A],
timestamp: Instant,
partitionOffset: PartitionOffset,
origin: Option[Origin],
version: Option[Version],
metadata: RecordMetadata,
headers: Headers,
status: SnapshotStatus
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.evolutiongaming.kafka.journal

import com.evolutiongaming.scassandra.EncodeByName
import com.evolutiongaming.scassandra.DecodeByName
import com.evolutiongaming.scassandra.EncodeByIdx
import com.evolutiongaming.scassandra.DecodeByIdx
import com.evolutiongaming.scassandra.EncodeRow
import com.evolutiongaming.scassandra.DecodeRow

sealed abstract class SnapshotStatus {
def name: String
}

object SnapshotStatus {

val values: List[SnapshotStatus] = List(Active, Deleted)

def fromStringUnsafe(status: String): SnapshotStatus =
values.find(_.name == status).getOrElse(Active)

case object Active extends SnapshotStatus { def name = "active" }
case object Deleted extends SnapshotStatus { def name = "deleted" }

implicit val encodeByNameSnapshotStatus: EncodeByName[SnapshotStatus] =
EncodeByName[String].contramap(_.name)
implicit val decodeByNameSnapshotStatus: DecodeByName[SnapshotStatus] =
DecodeByName[String].map(fromStringUnsafe)

implicit val encodeByIdxSnapshotStatus: EncodeByIdx[SnapshotStatus] =
EncodeByIdx[String].contramap(_.name)
implicit val decodeByIdxSnapshotStatus: DecodeByIdx[SnapshotStatus] =
DecodeByIdx[String].map(fromStringUnsafe)

implicit val encodeRowSeqNr: EncodeRow[SnapshotStatus] =
EncodeRow[SnapshotStatus]("status")
implicit val decodeRowSeqNr: DecodeRow[SnapshotStatus] =
DecodeRow[SnapshotStatus]("status")
}

0 comments on commit dd3a372

Please sign in to comment.