diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala index 3eb670491..b7505d87e 100644 --- a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala +++ b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala @@ -13,12 +13,45 @@ import com.evolutiongaming.serialization.SerializedMsg import play.api.libs.json.{JsString, JsValue, Json} import scodec.bits.ByteVector -import java.time.Instant - +/** Serialize Akka snapshot to an internal Kafka Journal format. + * + * @tparam A + * Type of serialized payload. At the time of writing it could be either + * [[Payload]] by default, or [[Json]] if `kafka-journal-circe` module is + * used. + */ trait SnapshotSerializer[F[_], A] { - def toInternalRepresentation(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] - def toAkkaRepresentation(persistenceId: PersistenceId, timestamp: Instant, snapshot: Snapshot[A]): F[SelectedSnapshot] + /** Encode Akka snapshot to Kafka Journal specific internal representation. + * + * @param metadata + * Metadata to get the additional information from, i.e. sequence number. + * @param snapshot + * Payload to be serialized to a form writable to eventual storage (i.e. + * Cassandra) and stored into [[Snapshot#payload]]. + * + * The method may raise an error into `F[_]` if it is not possible to + * serialize a snapshot, i.e. for example if [[SnapshotMetadata#sequenceNr]] + * is negative or equals to zero. + * + * @note + * As `snapshot` accepts `Any` as a paramater, it is too easy to pass a + * wrong parameter to this method. The recommendation is to be very careful + * in that area and write a unit test for an affected code. + */ + def encode(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] + + /** Decode Akka snapshot from Kafka Journal specific internal representation. + * + * @param metadata + * Metadata to get the additional information from, i.e. sequence number. + * @param snapshot + * Serialized snapshot to parse the payload from. + * + * The method may raise an error into `F[_]` if parsing of + * [[Snapshot#payload]] fails. + */ + def decode(metadata: SnapshotMetadata, snapshot: Snapshot[A]): F[SelectedSnapshot] } @@ -95,7 +128,7 @@ object SnapshotSerializer { implicit val fail: Fail[F] = Fail.lift[F] - def toInternalRepresentation(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] = { + def encode(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] = { val result = for { payload <- toSnapshotPayload(snapshot) @@ -107,27 +140,14 @@ object SnapshotSerializer { } } - def toAkkaRepresentation( - persistenceId: PersistenceId, - timestamp: Instant, - snapshot: Snapshot[A] - ): F[SelectedSnapshot] = { + def decode(metadata: SnapshotMetadata, snapshot: Snapshot[A]): F[SelectedSnapshot] = { val payload = fromSnapshotPayload(snapshot.payload) - val result = payload.map { payload => - SelectedSnapshot( - metadata = SnapshotMetadata( - persistenceId = persistenceId, - sequenceNr = snapshot.seqNr.value, - timestamp = timestamp.toEpochMilli - ), - snapshot = payload - ) - } + val result = payload.map(SelectedSnapshot(metadata, _)) result.adaptErr { case e => - SnapshotStoreError(s"FromSnapshot error, persistenceId: $persistenceId, snapshot: $snapshot: $e", e) + SnapshotStoreError(s"FromSnapshot error, persistenceId: ${metadata.persistenceId}, snapshot: $snapshot: $e", e) } } } diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala index 636e78dbe..90b91ed72 100644 --- a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala +++ b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala @@ -85,7 +85,7 @@ object SnapshotStoreAdapter { def serializeSnapshot(metadata: SnapshotMetadata, snapshot: Any): F[SnapshotRecord[EventualPayloadAndType]] = { for { seqNr <- SeqNr.of(metadata.sequenceNr) - snapshot <- snapshotSerializer.toInternalRepresentation(metadata, snapshot) + snapshot <- snapshotSerializer.encode(metadata, snapshot) payload <- snapshotReadWrite.eventualWrite(snapshot.payload) record = SnapshotRecord( snapshot = Snapshot(seqNr = seqNr, payload = payload), @@ -103,7 +103,8 @@ object SnapshotStoreAdapter { for { payload <- snapshotReadWrite.eventualRead(record.snapshot.payload) snapshot = record.snapshot.copy(payload = payload) - snapshot <- snapshotSerializer.toAkkaRepresentation(persistenceId, record.timestamp, snapshot) + metadata = SnapshotMetadata(persistenceId, record.snapshot.seqNr.value, record.timestamp.toEpochMilli) + snapshot <- snapshotSerializer.decode(metadata, snapshot) } yield snapshot }