From b12d703f54be2f132b80ac6625212d1431e820b6 Mon Sep 17 00:00:00 2001 From: Ruslans Tarasovs Date: Wed, 8 Nov 2023 15:48:11 +0200 Subject: [PATCH] Some working tests are added. --- .../cassandra/SnapshotCassandra.scala | 19 ++++++++++--- .../cassandra/SnapshotCassandraTest.scala | 27 +++++++++++++++---- .../kafka/journal/SnapshotStoreFlat.scala | 8 +----- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala index 9bc43f825..f27fc9b89 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala @@ -30,9 +30,10 @@ object SnapshotCassandra { def apply[F[_]: MonadThrow](statements: Statements[F]): SnapshotStoreFlat[F] = { new Main with SnapshotStoreFlat[F] { + // we do not use segments for now + val segmentNr = SegmentNr.min + def save(key: Key, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Unit] = { - // we do not use segments for now - val segmentNr = SegmentNr.min statements.selectMetadata(key, segmentNr).flatMap { case s if s.size < BufferSize => insert(key, segmentNr, s, snapshot) case s => update(key, segmentNr, s, snapshot) @@ -75,7 +76,19 @@ object SnapshotCassandra { maxTimestamp: Instant, minSeqNr: SeqNr, minTimestamp: Instant - ): F[Option[SnapshotRecord[EventualPayloadAndType]]] = ??? + ): F[Option[SnapshotRecord[EventualPayloadAndType]]] = for { + savedSnapshots <- statements.selectMetadata(key, segmentNr) + sortedSnapshots = savedSnapshots.toList.sortBy { case (_, (seqNr, _)) => seqNr } + bufferNr = sortedSnapshots.reverse.collectFirst { + case (bufferNr, (seqNr, timestamp)) + if seqNr >= minSeqNr && + seqNr <= maxSeqNr && + timestamp.compareTo(minTimestamp) >= 0 && + timestamp.compareTo(maxTimestamp) <= 0 => + bufferNr + } + snapshot <- bufferNr.flatTraverse(statements.selectRecords(key, segmentNr, _)) + } yield snapshot def drop(key: Key, maxSeqNr: SeqNr, maxTimestamp: Instant, minSeqNr: SeqNr, minTimestamp: Instant): F[Unit] = ??? def drop(key: Key, seqNr: SeqNr): F[Unit] = ??? diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala index 001922c06..922e7094f 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala @@ -6,11 +6,11 @@ import cats.syntax.all._ import com.evolutiongaming.kafka.journal.IOSuite._ import com.evolutiongaming.kafka.journal._ import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.funsuite.AsyncFunSuite import java.time.Instant -class SnapshotCassandraTest extends AnyFunSuite { +class SnapshotCassandraTest extends AsyncFunSuite { type SnaphsotWithPayload = SnapshotRecord[EventualPayloadAndType] @@ -19,14 +19,31 @@ class SnapshotCassandraTest extends AnyFunSuite { statements <- statements[IO] store = SnapshotCassandra(statements) key = Key("topic", "id") - _ <- store.save(key, snapshot) - snapshot <- store.load(key, SeqNr.min, Instant.MAX, SeqNr.max, Instant.MAX) + _ <- store.save(key, record) + snapshot <- store.load(key, SeqNr.max, Instant.MAX, SeqNr.min, Instant.MIN) } yield { assert(snapshot.isDefined) } program.run() } + test("save concurrently") { + val program = for { + statements <- statements[IO] + store = SnapshotCassandra(statements) + key = Key("topic", "id") + snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1)) + snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2)) + save1 = store.save(key, record.copy(snapshot = snapshot1)) + save2 = store.save(key, record.copy(snapshot = snapshot2)) + _ <- IO.both(save1, save2) + snapshot <- store.load(key, SeqNr.max, Instant.MAX, SeqNr.min, Instant.MIN) + } yield { + assert(snapshot.map(_.snapshot.seqNr) == Some(SeqNr.unsafe(2))) + } + program.run() + } + def statements[F[_]: Concurrent]: F[SnapshotCassandra.Statements[F]] = for { state <- Ref[F].of(DatabaseState.empty) @@ -61,7 +78,7 @@ class SnapshotCassandraTest extends AnyFunSuite { def empty: DatabaseState = DatabaseState(Map.empty) } - val snapshot = SnapshotRecord( + val record = SnapshotRecord( snapshot = Snapshot( seqNr = SeqNr.min, payload = Some(EventualPayloadAndType(payload = Left("payload"), payloadType = PayloadType.Text)) diff --git a/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreFlat.scala b/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreFlat.scala index a72a23c97..827ddb461 100644 --- a/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreFlat.scala +++ b/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreFlat.scala @@ -16,13 +16,7 @@ trait SnapshotStoreFlat[F[_]] { minTimestamp: Instant ): F[Option[SnapshotRecord[EventualPayloadAndType]]] - def drop( - key: Key, - maxSeqNr: SeqNr, - maxTimestamp: Instant, - minSeqNr: SeqNr, - minTimestamp: Instant - ): F[Unit] + def drop(key: Key, maxSeqNr: SeqNr, maxTimestamp: Instant, minSeqNr: SeqNr, minTimestamp: Instant): F[Unit] def drop(key: Key, seqNr: SeqNr): F[Unit]