Skip to content

Commit

Permalink
Some working tests are added.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Nov 8, 2023
1 parent 3b3bcbd commit b12d703
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ object SnapshotCassandra {
def apply[F[_]: MonadThrow](statements: Statements[F]): SnapshotStoreFlat[F] = {
new Main with SnapshotStoreFlat[F] {

// we do not use segments for now
val segmentNr = SegmentNr.min

def save(key: Key, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Unit] = {
// we do not use segments for now
val segmentNr = SegmentNr.min
statements.selectMetadata(key, segmentNr).flatMap {
case s if s.size < BufferSize => insert(key, segmentNr, s, snapshot)
case s => update(key, segmentNr, s, snapshot)
Expand Down Expand Up @@ -75,7 +76,19 @@ object SnapshotCassandra {
maxTimestamp: Instant,
minSeqNr: SeqNr,
minTimestamp: Instant
): F[Option[SnapshotRecord[EventualPayloadAndType]]] = ???
): F[Option[SnapshotRecord[EventualPayloadAndType]]] = for {
savedSnapshots <- statements.selectMetadata(key, segmentNr)
sortedSnapshots = savedSnapshots.toList.sortBy { case (_, (seqNr, _)) => seqNr }
bufferNr = sortedSnapshots.reverse.collectFirst {
case (bufferNr, (seqNr, timestamp))
if seqNr >= minSeqNr &&
seqNr <= maxSeqNr &&
timestamp.compareTo(minTimestamp) >= 0 &&
timestamp.compareTo(maxTimestamp) <= 0 =>
bufferNr
}
snapshot <- bufferNr.flatTraverse(statements.selectRecords(key, segmentNr, _))
} yield snapshot

def drop(key: Key, maxSeqNr: SeqNr, maxTimestamp: Instant, minSeqNr: SeqNr, minTimestamp: Instant): F[Unit] = ???
def drop(key: Key, seqNr: SeqNr): F[Unit] = ???
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import cats.syntax.all._
import com.evolutiongaming.kafka.journal.IOSuite._
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.funsuite.AsyncFunSuite

import java.time.Instant

class SnapshotCassandraTest extends AnyFunSuite {
class SnapshotCassandraTest extends AsyncFunSuite {

type SnaphsotWithPayload = SnapshotRecord[EventualPayloadAndType]

Expand All @@ -19,14 +19,31 @@ class SnapshotCassandraTest extends AnyFunSuite {
statements <- statements[IO]
store = SnapshotCassandra(statements)
key = Key("topic", "id")
_ <- store.save(key, snapshot)
snapshot <- store.load(key, SeqNr.min, Instant.MAX, SeqNr.max, Instant.MAX)
_ <- store.save(key, record)
snapshot <- store.load(key, SeqNr.max, Instant.MAX, SeqNr.min, Instant.MIN)
} yield {
assert(snapshot.isDefined)
}
program.run()
}

test("save concurrently") {
val program = for {
statements <- statements[IO]
store = SnapshotCassandra(statements)
key = Key("topic", "id")
snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1))
snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2))
save1 = store.save(key, record.copy(snapshot = snapshot1))
save2 = store.save(key, record.copy(snapshot = snapshot2))
_ <- IO.both(save1, save2)
snapshot <- store.load(key, SeqNr.max, Instant.MAX, SeqNr.min, Instant.MIN)
} yield {
assert(snapshot.map(_.snapshot.seqNr) == Some(SeqNr.unsafe(2)))
}
program.run()
}

def statements[F[_]: Concurrent]: F[SnapshotCassandra.Statements[F]] =
for {
state <- Ref[F].of(DatabaseState.empty)
Expand Down Expand Up @@ -61,7 +78,7 @@ class SnapshotCassandraTest extends AnyFunSuite {
def empty: DatabaseState = DatabaseState(Map.empty)
}

val snapshot = SnapshotRecord(
val record = SnapshotRecord(
snapshot = Snapshot(
seqNr = SeqNr.min,
payload = Some(EventualPayloadAndType(payload = Left("payload"), payloadType = PayloadType.Text))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@ trait SnapshotStoreFlat[F[_]] {
minTimestamp: Instant
): F[Option[SnapshotRecord[EventualPayloadAndType]]]

def drop(
key: Key,
maxSeqNr: SeqNr,
maxTimestamp: Instant,
minSeqNr: SeqNr,
minTimestamp: Instant
): F[Unit]
def drop(key: Key, maxSeqNr: SeqNr, maxTimestamp: Instant, minSeqNr: SeqNr, minTimestamp: Instant): F[Unit]

def drop(key: Key, seqNr: SeqNr): F[Unit]

Expand Down

0 comments on commit b12d703

Please sign in to comment.