Skip to content

Commit

Permalink
Add unit test for save and load.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Nov 8, 2023
1 parent cb402fc commit ca0f4db
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object SnapshotCassandra {
val segmentNr = SegmentNr.min
statements.selectMetadata(key, segmentNr).flatMap {
case s if s.size < BufferSize => insert(key, segmentNr, s, snapshot)
case s => update(key, segmentNr, s, snapshot)
case s => update(key, segmentNr, s, snapshot)
}
}

Expand All @@ -62,13 +62,21 @@ object SnapshotCassandra {
val sortedSnapshots = savedSnapshots.toList.sortBy { case (_, (seqNr, _)) => seqNr }

val oldestSnapshot = sortedSnapshots.lastOption
MonadThrow[F].fromOption(oldestSnapshot, SnapshotStoreError("Could not find an oldest snapshot")).flatMap { oldestSnapshot =>
val (bufferNr, (_, _)) = oldestSnapshot
statements.insertRecords(key, segmentNr, bufferNr, snapshot)
MonadThrow[F].fromOption(oldestSnapshot, SnapshotStoreError("Could not find an oldest snapshot")).flatMap {
oldestSnapshot =>
val (bufferNr, (_, _)) = oldestSnapshot
statements.insertRecords(key, segmentNr, bufferNr, snapshot)
}
}

def load(key: Key, maxSeqNr: SeqNr, maxTimestamp: Instant, minSeqNr: SeqNr, minTimestamp: Instant): F[Unit] = ???
def load(
key: Key,
maxSeqNr: SeqNr,
maxTimestamp: Instant,
minSeqNr: SeqNr,
minTimestamp: Instant
): F[Option[SnapshotRecord[EventualPayloadAndType]]] = ???

def drop(key: Key, maxSeqNr: SeqNr, maxTimestamp: Instant, minSeqNr: SeqNr, minTimestamp: Instant): F[Unit] = ???
def drop(key: Key, seqNr: SeqNr): F[Unit] = ???

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.effect.IO
import cats.effect.kernel.{Concurrent, Ref}
import cats.syntax.all._
import com.evolutiongaming.kafka.journal.IOSuite._
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
import org.scalatest.funsuite.AnyFunSuite

import java.time.Instant

class SnapshotCassandraSpec extends AnyFunSuite {

type SnaphsotWithPayload = SnapshotRecord[EventualPayloadAndType]

test("save and load") {
val program = for {
statements <- statements[IO]
store = SnapshotCassandra(statements)
key = Key("topic", "id")
_ <- store.save(key, snapshot)
snapshot <- store.load(key, SeqNr.min, Instant.MAX, SeqNr.max, Instant.MAX)
} yield {
assert(snapshot.isDefined)
}
program.run()
}

def statements[F[_]: Concurrent]: F[SnapshotCassandra.Statements[F]] =
for {
state <- Ref[F].of(DatabaseState.empty)
} yield SnapshotCassandra.Statements(
insertRecords = { (_, _, bufferNr, snapshot) =>
state.update(_.insert(bufferNr, snapshot))
},
selectRecords = { (_, _, bufferNr) =>
state.get.map(_.select(bufferNr))
},
selectMetadata = { (_, _) =>
state.get.map(_.metadata)
},
deleteRecords = { (_, _, bufferNr) =>
state.update(_.delete(bufferNr))
}
)

case class DatabaseState(snapshots: Map[BufferNr, SnaphsotWithPayload]) {
def insert(bufferNr: BufferNr, snapshot: SnaphsotWithPayload): DatabaseState =
this.copy(snapshots = snapshots.updated(bufferNr, snapshot))
def select(bufferNr: BufferNr): Option[SnaphsotWithPayload] =
snapshots.get(bufferNr)
def metadata: Map[BufferNr, (SeqNr, Instant)] =
snapshots.fmap { snapshot =>
(snapshot.snapshot.seqNr, snapshot.timestamp)
}
def delete(bufferNr: BufferNr): DatabaseState =
this.copy(snapshots = snapshots - bufferNr)
}
object DatabaseState {
def empty: DatabaseState = DatabaseState(Map.empty)
}

val snapshot = SnapshotRecord(
snapshot = Snapshot(
seqNr = SeqNr.min,
payload = Some(EventualPayloadAndType(payload = Left("payload"), payloadType = PayloadType.Text))
),
timestamp = Instant.MIN,
origin = None,
version = None
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ trait SnapshotStoreFlat[F[_]] {
maxTimestamp: Instant,
minSeqNr: SeqNr,
minTimestamp: Instant
): F[Unit]
): F[Option[SnapshotRecord[EventualPayloadAndType]]]

def drop(
key: Key,
Expand Down

0 comments on commit ca0f4db

Please sign in to comment.