Skip to content

Commit

Permalink
Minimal implementation (TCK not passing yet)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Nov 21, 2023
1 parent c6d69ea commit ee8a959
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ object SnapshotCassandra {
MonadThrow[F].fromOption(freeBufferNr, SnapshotStoreError("Could not find a free key")).flatMap { bufferNr =>
val wasApplied = statements.insertRecord(key, segmentNr, bufferNr, snapshot)
wasApplied.flatMap { wasApplied =>
// TODO: consider adding circuit breaker here
if (wasApplied) ().pure[F] else save(key, snapshot)
}
}
Expand All @@ -95,6 +96,7 @@ object SnapshotCassandra {
val (bufferNr, (deleteSnapshot, _)) = oldestSnapshot
val wasApplied = statements.updateRecord(key, segmentNr, bufferNr, insertSnapshot, deleteSnapshot)
wasApplied.flatMap { wasApplied =>
// TODO: consider adding circuit breaker here
if (wasApplied) ().pure[F] else save(key, insertSnapshot)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object SnapshotStatements {
|payload_txt,
|payload_bin,
|metadata)
|VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|IF NOT EXISTS
|""".stripMargin

Expand Down Expand Up @@ -125,7 +125,7 @@ object SnapshotStatements {

val query =
s"""
|UPDATE ${name.toCql} (
|UPDATE ${name.toCql}
|SET seq_nr = :insert_seq_nr,
|timestamp = :timestamp,
|origin = :origin,
Expand All @@ -134,10 +134,10 @@ object SnapshotStatements {
|payload_txt = :payload_txt,
|payload_bin = :payload_bin,
|metadata = :metadata
|WHERE id = :id,
|topic = :topic,
|segment = :segment,
|buffer_nr = :buffer_nr
|WHERE id = :id
|AND topic = :topic
|AND segment = :segment
|AND buffer_nr = :buffer_nr
|IF seq_nr = :delete_seq_nr
|""".stripMargin

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.kafka.journal.util.Fail.implicits._
import scodec.bits.ByteVector

/** Piece of data prepared for convenient storing into Cassandra row.
*
* Usually the data is stored into the structure similar to the following:
* {{{
* payload_type TEXT,
* payload_txt TEXT,
* payload_bin BLOB
* }}}
* Where usage of either `payload_txt` or `payload_bin` column depends on
* the contents of a `payload` field.
*
* The `payloadType` field is used to determine how the contents of `payload`
* should be treated, i.e. if it should be parsed as JSON.
*/
final case class EventualPayloadAndType(
payload: Either[String, ByteVector],
payloadType: PayloadType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import com.evolutiongaming.kafka.journal.util.Fail.implicits._
import com.evolutiongaming.kafka.journal.{JournalError, JsonCodec, Payload, PayloadType}


/** Decode a payload loaded from an eventual storage.
*
* Converts a structure convenient to store to eventual store, i.e. Cassandra, to a structure, which is convenient to
* use for a business logic.
*/
trait EventualRead[F[_], A] {

def apply(payloadAndType: EventualPayloadAndType): F[A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import com.evolutiongaming.kafka.journal.{JournalError, JsonCodec, Payload}
import scodec.bits.ByteVector


/** Prepare payload for storing.
*
* Converts a structure convenient for a business logic to a structure, which is convenient to store to eventual store,
* i.e. Cassandra.
*/
trait EventualWrite[F[_], A] {

def apply(payload: A): F[EventualPayloadAndType]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor =>
.load[KafkaJournalConfig]
.liftTo[IO]

def serializer: Resource[IO, SnapshotSerializer[IO, Payload]] = {
??? // SnapshotSerializer.of[IO](system).toResource
}
def serializer: Resource[IO, SnapshotSerializer[IO, Payload]] =
SnapshotSerializer.of[IO](system).toResource

def snapshotReadWrite(config: KafkaJournalConfig): IO[SnapshotReadWrite[IO, Payload]] =
for {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,129 @@
package akka.persistence.kafka.journal

import akka.actor.ActorSystem
import akka.persistence.{SelectedSnapshot, SnapshotMetadata}
import com.evolutiongaming.kafka.journal.Snapshot
import cats.effect.kernel.Sync
import cats.syntax.all._
import com.evolutiongaming.catshelper.MonadThrowable
import com.evolutiongaming.kafka.journal.FromBytes.implicits._
import com.evolutiongaming.kafka.journal.ToBytes.implicits._
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.kafka.journal.util.Fail.implicits._
import com.evolutiongaming.serialization.SerializedMsg
import play.api.libs.json.{JsString, JsValue, Json}
import scodec.bits.ByteVector

trait SnapshotSerializer[F[_], A] {

def toInternalRepresentation(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]]
def toAkkaRepresentation(persistenceId: PersistenceId, snapshot: Snapshot[A]): F[SelectedSnapshot]

}

object SnapshotSerializer {

def of[F[_]: Sync: FromAttempt: FromJsResult](system: ActorSystem): F[SnapshotSerializer[F, Payload]] =
SerializedMsgSerializer.of[F](system).map(SnapshotSerializer(_))

def apply[F[_]: MonadThrowable: FromAttempt: FromJsResult](
serializer: SerializedMsgSerializer[F]
): SnapshotSerializer[F, Payload] = {

implicit val toBytesSerializedMsg: ToBytes[F, SerializedMsg] = ToBytes.fromEncoder
implicit val fromBytesSerializedMsg: FromBytes[F, SerializedMsg] = FromBytes.fromDecoder

def toSnapshotPayload(payload: Any): F[Payload] = {

def binary(payload: AnyRef) =
// TODO: should we use PersitentBinary?
for {
serializedMsg <- serializer.toMsg(payload)
bytes <- serializedMsg.toBytes[F]
} yield Payload.binary(bytes)

def json(payload: JsValue, payloadType: Option[PayloadType.TextOrJson] = None) = {
// TODO: should we use another structure?
val persistent = PersistentJson(manifest = None, writerUuid = "", payloadType = payloadType, payload = payload)
val json = Json.toJson(persistent)
Payload.json(json)
}

// TODO: what will happen if `payload` is `Any`?
payload match {
case payload: JsValue => json(payload).pure[F]
case payload: String => json(JsString(payload), PayloadType.Text.some).pure[F]
case payload: AnyRef => binary(payload)
}
}

def fromSnapshotPayload(payload: Payload): F[Any] = {

def binary(payload: ByteVector): F[Any] = {
for {
serializedMsg <- payload.fromBytes[F, SerializedMsg]
anyRef <- serializer.fromMsg(serializedMsg)
} yield anyRef
}

def json(payload: JsValue): F[Any] = {
for {
persistent <- FromJsResult[F].apply(payload.validate[PersistentJson[JsValue]])
payloadType = persistent.payloadType.getOrElse(PayloadType.Json)
anyRef <- payloadType match {
case PayloadType.Text => FromJsResult[F].apply(persistent.payload.validate[String])
case PayloadType.Json => persistent.payload.pure[F].widen[AnyRef]
}
} yield anyRef
}

payload match {
case p: Payload.Binary => binary(p.value)
case _: Payload.Text => Fail.lift[F].fail(s"Payload.Text is not supported")
case p: Payload.Json => json(p.value)
}
}

SnapshotSerializer(toSnapshotPayload, fromSnapshotPayload)
}

def apply[F[_]: MonadThrowable, A](
toSnapshotPayload: Any => F[A],
fromSnapshotPayload: A => F[Any]
): SnapshotSerializer[F, A] = new SnapshotSerializer[F, A] {

implicit val fail: Fail[F] = Fail.lift[F]

def toInternalRepresentation(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] = {

val result = for {
payload <- toSnapshotPayload(snapshot)
seqNr <- SeqNr.of[F](metadata.sequenceNr)
} yield Snapshot(seqNr, payload.some)

result.adaptErr { case e =>
SnapshotStoreError(s"ToSnapshot error, persistenceId: ${metadata.persistenceId}: $e", e)
}
}

def toAkkaRepresentation(persistenceId: PersistenceId, snapshot: Snapshot[A]): F[SelectedSnapshot] = {

val payload = snapshot.payload.map(_.pure[F]).getOrElse {
s"Snapshot.payload is not defined".fail[F, A]
}

val result = for {
payload <- payload
persistentPayload <- fromSnapshotPayload(payload)
} yield SelectedSnapshot(
metadata = SnapshotMetadata(persistenceId = persistenceId, sequenceNr = snapshot.seqNr.value),
snapshot = persistentPayload
)

result.adaptErr { case e =>
SnapshotStoreError(s"FromSnapshot error, persistenceId: $persistenceId, snapshot: $snapshot: $e", e)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ object SnapshotStoreAdapter {
for {
maxSeqNr <- SeqNr.of(criteria.maxSequenceNr)
maxTimestamp = Instant.ofEpochMilli(criteria.maxTimestamp)
minSequenceNr <- SeqNr.of(criteria.minSequenceNr)
// this "if"" statement is required, because `0` is sometimes passed by Akka as a value here
minSequenceNr <- if (criteria.minSequenceNr < 1) SeqNr.min.pure else SeqNr.of(criteria.minSequenceNr)
minTimestamp = Instant.ofEpochMilli(criteria.minTimestamp)
} yield journal.SnapshotSelectionCriteria(
maxSeqNr = maxSeqNr,
Expand Down

0 comments on commit ee8a959

Please sign in to comment.