From 8e499a5a9faa2a126e9ab847c51e3d3dba39f43d Mon Sep 17 00:00:00 2001 From: Ruslans Tarasovs Date: Wed, 8 May 2024 11:34:12 +0300 Subject: [PATCH] Disable LWTs by default. --- .../cassandra/SnapshotCassandra.scala | 21 +++---- .../cassandra/SnapshotCassandraConfig.scala | 10 +++- .../cassandra/SnapshotStatements.scala | 60 ++++++++++++++----- 3 files changed, 65 insertions(+), 26 deletions(-) diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala index a39d91fdc..71fa16d9f 100644 --- a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala +++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala @@ -3,7 +3,7 @@ package com.evolutiongaming.kafka.journal.snapshot.cassandra import cats.effect.kernel.{Async, Resource, Temporal} import cats.effect.syntax.all._ import cats.syntax.all._ -import cats.{Monad, MonadThrow, Parallel} +import cats.{MonadThrow, Parallel} import com.evolutiongaming.catshelper.LogOf import com.evolutiongaming.kafka.journal._ import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig @@ -22,7 +22,7 @@ object SnapshotCassandra { ): Resource[F, SnapshotStore[F]] = { def store(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) = - of(config.schema, origin, config.consistencyConfig, config.numberOfSnapshots) + of(config.schema, origin, config.consistencyConfig, config.numberOfSnapshots, config.useLWT) for { cassandraCluster <- CassandraCluster.of[F](config.client, cassandraClusterOf, config.retries) @@ -36,11 +36,12 @@ object SnapshotCassandra { schemaConfig: SnapshotSchemaConfig, origin: Option[Origin], consistencyConfig: CassandraConsistencyConfig, - numberOfSnapshots: Int + numberOfSnapshots: Int, + useLWT: Boolean ): F[SnapshotStore[F]] = for { schema <- SetupSnapshotSchema[F](schemaConfig, origin, consistencyConfig) - statements <- Statements.of[F](schema, consistencyConfig) + statements <- Statements.of[F](schema, consistencyConfig, useLWT) } yield SnapshotCassandra(statements, numberOfSnapshots) private sealed abstract class Main @@ -159,13 +160,13 @@ object SnapshotCassandra { ) object Statements { - def of[F[_]: Monad: CassandraSession](schema: SnapshotSchema, consistencyConfig: CassandraConsistencyConfig): F[Statements[F]] = { + def of[F[_]: MonadThrow: CassandraSession](schema: SnapshotSchema, consistencyConfig: CassandraConsistencyConfig, useLWT: Boolean): F[Statements[F]] = { for { - insertRecord <- SnapshotStatements.InsertRecord.of[F](schema.snapshot, consistencyConfig.write) - updateRecord <- SnapshotStatements.UpdateRecord.of[F](schema.snapshot, consistencyConfig.write) - selectRecord <- SnapshotStatements.SelectRecord.of[F](schema.snapshot, consistencyConfig.read) - selectMetadata <- SnapshotStatements.SelectMetadata.of[F](schema.snapshot, consistencyConfig.read) - deleteRecords <- SnapshotStatements.Delete.of[F](schema.snapshot, consistencyConfig.write) + insertRecord <- SnapshotStatements.InsertRecord.of[F](schema.snapshot, consistencyConfig.write, useLWT) + updateRecord <- SnapshotStatements.UpdateRecord.of[F](schema.snapshot, consistencyConfig.write, useLWT) + selectRecord <- SnapshotStatements.SelectRecord.of[F](schema.snapshot, consistencyConfig.read, useLWT) + selectMetadata <- SnapshotStatements.SelectMetadata.of[F](schema.snapshot, consistencyConfig.read, useLWT) + deleteRecords <- SnapshotStatements.Delete.of[F](schema.snapshot, consistencyConfig.write, useLWT) } yield Statements(insertRecord, updateRecord, selectRecord, selectMetadata, deleteRecords) } } diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraConfig.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraConfig.scala index 94b186587..18b7e5294 100644 --- a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraConfig.scala +++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraConfig.scala @@ -9,6 +9,9 @@ import pureconfig.generic.semiauto.deriveReader /** Cassandra-specific configuration used by a plugin. * * Specifies long time storage configuration and Cassandra client parameters. + * + * Note: if `useLWT` is set to `true`, then `consistencyConfig.read` should be set to `ConsistencyLevel.SERIAL` or + * `ConsistencyLevel.LOCAL_SERIAL`. Otherwise, the plugin will throw an exception to prevent data corruption. * * @param retries * Number of retries in [[com.evolutiongaming.scassandra.NextHostRetryPolicy]]. It will retry doing a request on the @@ -25,6 +28,10 @@ import pureconfig.generic.semiauto.deriveReader * @param consistencyConfig * Consistency levels to use for read and for write statements to Cassandra. The main reason one may be interested to * change it, is for integration tests with small number of Cassandra nodes. + * @param useLWT + * Use Cassandra LWTs to ensure the older snapshots of one writer do not overwrite the newer snapshots of another. + * It is recommended to set it to `false` and rely on external mechanism to ensure there is only a single writer + * (such as Akka Persistence). */ final case class SnapshotCassandraConfig( retries: Int = 100, @@ -34,7 +41,8 @@ final case class SnapshotCassandraConfig( query = QueryConfig(consistency = ConsistencyLevel.LOCAL_QUORUM, fetchSize = 1000, defaultIdempotence = true) ), schema: SnapshotSchemaConfig = SnapshotSchemaConfig.default, - consistencyConfig: CassandraConsistencyConfig = CassandraConsistencyConfig.default + consistencyConfig: CassandraConsistencyConfig = CassandraConsistencyConfig.default, + useLWT: Boolean = false ) object SnapshotCassandraConfig { diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala index cef3ff44c..4ce878fc9 100644 --- a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala +++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala @@ -1,7 +1,7 @@ package com.evolutiongaming.kafka.journal.snapshot.cassandra -import cats.Monad import cats.syntax.all._ +import cats.{Monad, MonadThrow} import com.datastax.driver.core.Row import com.evolutiongaming.kafka.journal._ import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig @@ -42,7 +42,8 @@ object SnapshotStatements { def of[F[_]: Monad: CassandraSession]( name: TableName, - consistencyConfig: CassandraConsistencyConfig.Write + consistencyConfig: CassandraConsistencyConfig.Write, + useLWT: Boolean ): F[InsertRecord[F]] = { implicit val encodeByNameByteVector: EncodeByName[ByteVector] = @@ -63,7 +64,7 @@ object SnapshotStatements { |payload_bin, |metadata) |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - |IF NOT EXISTS + |${if (useLWT) "IF NOT EXISTS" else ""} |""".stripMargin for { @@ -90,7 +91,12 @@ object SnapshotStatements { val statement = statementOf(snapshot) val row = statement.first - row.map(_.fold(false)(_.wasApplied)) + + if (useLWT) { + row.map(_.fold(false)(_.wasApplied)) + } else { + row.as(true) + } } } } @@ -108,7 +114,8 @@ object SnapshotStatements { def of[F[_]: Monad: CassandraSession]( name: TableName, - consistencyConfig: CassandraConsistencyConfig.Write + consistencyConfig: CassandraConsistencyConfig.Write, + useLWT: Boolean ): F[UpdateRecord[F]] = { implicit val encodeByNameByteVector: EncodeByName[ByteVector] = @@ -128,7 +135,7 @@ object SnapshotStatements { |WHERE id = :id |AND topic = :topic |AND buffer_idx = :buffer_idx - |IF seq_nr = :delete_seq_nr + |${if (useLWT) "IF seq_nr = :delete_seq_nr" else ""} |""".stripMargin for { @@ -144,7 +151,7 @@ object SnapshotStatements { .encode(key) .encode(bufferNr) .encode("insert_seq_nr", snapshot.seqNr) - .encode("delete_seq_nr", deleteSnapshot) + .encodeSome("delete_seq_nr", Option.when(useLWT)(deleteSnapshot)) .encode("timestamp", record.timestamp) .encodeSome(record.origin) .encodeSome(record.version) @@ -156,7 +163,12 @@ object SnapshotStatements { val statement = statementOf(insertSnapshot) val row = statement.first - row.map(_.fold(false)(_.wasApplied)) + + if (useLWT) { + row.map(_.fold(false)(_.wasApplied)) + } else { + row.as(true) + } } } } @@ -167,9 +179,10 @@ object SnapshotStatements { object SelectMetadata { - def of[F[_]: Monad: CassandraSession]( + def of[F[_]: MonadThrow: CassandraSession]( name: TableName, - consistencyConfig: CassandraConsistencyConfig.Read + consistencyConfig: CassandraConsistencyConfig.Read, + useLWT: Boolean ): F[SelectMetadata[F]] = { val query = @@ -183,6 +196,9 @@ object SnapshotStatements { |""".stripMargin for { + _ <- MonadThrow[F].raiseWhen(useLWT && !consistencyConfig.value.isSerial) { + new IllegalArgumentException("consistencyConfig should be set to SERIAL or LOCAL_SERIAL when useLWT = true") + } prepared <- query.prepare } yield { key => val bound = prepared @@ -212,9 +228,10 @@ object SnapshotStatements { object SelectRecord { - def of[F[_]: Monad: CassandraSession]( + def of[F[_]: MonadThrow: CassandraSession]( name: TableName, - consistencyConfig: CassandraConsistencyConfig.Read + consistencyConfig: CassandraConsistencyConfig.Read, + useLWT: Boolean ): F[SelectRecord[F]] = { implicit val decodeByNameByteVector: DecodeByName[ByteVector] = @@ -237,6 +254,9 @@ object SnapshotStatements { |""".stripMargin for { + _ <- MonadThrow[F].raiseWhen(useLWT && !consistencyConfig.value.isSerial) { + new IllegalArgumentException("consistencyConfig should be set to SERIAL or LOCAL_SERIAL when useLWT = true") + } prepared <- query.prepare } yield { (key, bufferNr) => def readPayload(row: Row): EventualPayloadAndType = { @@ -282,7 +302,11 @@ object SnapshotStatements { object Delete { - def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Delete[F]] = { + def of[F[_]: Monad: CassandraSession]( + name: TableName, + consistencyConfig: CassandraConsistencyConfig.Write, + useLWT: Boolean + ): F[Delete[F]] = { val query = s""" @@ -290,7 +314,8 @@ object SnapshotStatements { |WHERE id = ? |AND topic = ? |AND buffer_idx = ? - |IF EXISTS""".stripMargin + |${if (useLWT) "IF EXISTS" else ""} + |""".stripMargin for { prepared <- query.prepare @@ -301,7 +326,12 @@ object SnapshotStatements { .encode(bufferNr) .setConsistencyLevel(consistencyConfig.value) .first - row.map(_.fold(false)(_.wasApplied)) + + if (useLWT) { + row.map(_.fold(false)(_.wasApplied)) + } else { + row.as(true) + } } } }