-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Separate CassandraSnapshotStoreConfig from KafkaJournalConfig.
- Loading branch information
Showing
8 changed files
with
142 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
45 changes: 45 additions & 0 deletions
45
.../scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraConfig.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package com.evolutiongaming.kafka.journal.eventual.cassandra | ||
|
||
import com.datastax.driver.core.ConsistencyLevel | ||
import com.evolutiongaming.scassandra.{CassandraConfig, QueryConfig} | ||
import pureconfig.ConfigReader | ||
import pureconfig.generic.semiauto.deriveReader | ||
|
||
/** Cassandra-specific configuration used by a plugin. | ||
* | ||
* Specifies long time storage configuration and Cassandra client parameters. | ||
* | ||
* @param retries | ||
* Number of retries in [[com.evolutiongaming.scassandra.NextHostRetryPolicy]]. It will retry doing a request on the | ||
* same host if it timed out, or switch to another host if error happened, or the host was not available on a first | ||
* attempt. | ||
* @param numberOfSnapshots | ||
* Maximum number of snapshots to be stored per single persistence id. If the number of snapshots reaches this | ||
* number, but a new snapshot is requsted to be written, then the oldest snapshot will be overwritten. | ||
* @param client | ||
* Cassandra client configuration, see [[CassandraConfig]] for more details. | ||
* @param schema | ||
* Schema of Cassandra database, i.e. keyspace, names of the tables etc. It also contains a flag if schema should be | ||
* automatically created if not present, which is useful for integration testing purposes etc. | ||
* @param consistencyConfig | ||
* Consistency levels to use for read and for write statements to Cassandra. The main reason one may be interested to | ||
* change it, is for integration tests with small number of Cassandra nodes. | ||
*/ | ||
final case class SnapshotCassandraConfig( | ||
retries: Int = 100, | ||
numberOfSnapshots: Int = 10, | ||
client: CassandraConfig = CassandraConfig( | ||
name = "snapshot", | ||
query = QueryConfig(consistency = ConsistencyLevel.LOCAL_QUORUM, fetchSize = 1000, defaultIdempotence = true) | ||
), | ||
schema: SchemaConfig = SchemaConfig.default, | ||
consistencyConfig: EventualCassandraConfig.ConsistencyConfig = EventualCassandraConfig.ConsistencyConfig.default | ||
) | ||
|
||
object SnapshotCassandraConfig { | ||
|
||
implicit val configReaderEventualCassandraConfig: ConfigReader[SnapshotCassandraConfig] = deriveReader | ||
|
||
val default: SnapshotCassandraConfig = SnapshotCassandraConfig() | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
56 changes: 56 additions & 0 deletions
56
persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package akka.persistence.kafka.journal | ||
|
||
import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig | ||
import pureconfig.generic.semiauto.deriveReader | ||
import pureconfig.{ConfigCursor, ConfigReader, ConfigSource} | ||
|
||
import scala.concurrent.duration._ | ||
|
||
/** Configuration for [[CassandraSnapshotStore]]. | ||
* | ||
* This case class specifies configuration that could be set using `application.conf` (see `reference.conf` for an | ||
* example of such configuration). | ||
* | ||
* @param cassandra | ||
* Cassandra-specific configuration used by a plugin. | ||
* @param startTimeout | ||
* The timeout to create a journal adapter. Starting a journal involves some effectful steps, such as creating | ||
* Cassandra session, so, in case of infrastructure or configuration troubles, it could take a longer time. Creating | ||
* the journal will fail with [[TimeoutException]] if it takes longer than `startTimeout`. | ||
* @param stopTimeout | ||
* This is meant to be a counterpart to `startTimeout`, allowing resource release to timeout with an error. This | ||
* parameter is not used, for now, and `startTimeout` is used instead. | ||
* @param jsonCodec | ||
* JSON codec to use for (de)serialization of the events from [[scodec.bits.ByteVector]] to | ||
* [[play.api.libs.json.JsValue]] and vice-versa. This parameter is only relevant if default [[CassandraSnapshotStore]] is | ||
* used, i.e. it is not taken into account if Circe JSON or other custom serialization is used. | ||
* | ||
* @see | ||
* [[KafkaJournal]] for more details. | ||
*/ | ||
final case class CassandraSnapshotStoreConfig( | ||
cassandra: SnapshotCassandraConfig = SnapshotCassandraConfig.default, | ||
startTimeout: FiniteDuration = 1.minute, | ||
stopTimeout: FiniteDuration = 1.minute, | ||
jsonCodec: KafkaJournalConfig.JsonCodec = KafkaJournalConfig.JsonCodec.Default | ||
) | ||
|
||
object CassandraSnapshotStoreConfig { | ||
|
||
val default: CassandraSnapshotStoreConfig = CassandraSnapshotStoreConfig() | ||
|
||
implicit val configReaderKafkaJournalConfig: ConfigReader[CassandraSnapshotStoreConfig] = { | ||
|
||
val configReader = deriveReader[CassandraSnapshotStoreConfig] | ||
|
||
cursor: ConfigCursor => { | ||
for { | ||
cursor <- cursor.asObjectCursor | ||
config = cursor.objValue.toConfig | ||
source = ConfigSource.fromConfig(config) | ||
config <- source.load(configReader) | ||
} yield config | ||
} | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters