Skip to content

Commit

Permalink
Add scaladoc for SnapshotSerializer.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Feb 21, 2024
1 parent 1ff53f9 commit 4337c0a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,45 @@ import com.evolutiongaming.serialization.SerializedMsg
import play.api.libs.json.{JsString, JsValue, Json}
import scodec.bits.ByteVector

import java.time.Instant

/** Serialize Akka snapshot to an internal Kafka Journal format.
*
* @tparam A
* Type of serialized payload. At the time of writing it could be either
* [[Payload]] by default, or [[Json]] if `kafka-journal-circe` module is
* used.
*/
trait SnapshotSerializer[F[_], A] {

def toInternalRepresentation(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]]
def toAkkaRepresentation(persistenceId: PersistenceId, timestamp: Instant, snapshot: Snapshot[A]): F[SelectedSnapshot]
/** Encode Akka snapshot to Kafka Journal specific internal representation.
*
* @param metadata
* Metadata to get the additional information from, i.e. sequence number.
* @param snapshot
* Payload to be serialized to a form writable to eventual storage (i.e.
* Cassandra) and stored into [[Snapshot#payload]].
*
* The method may raise an error into `F[_]` if it is not possible to
* serialize a snapshot, i.e. for example if [[SnapshotMetadata#sequenceNr]]
* is negative or equals to zero.
*
* @note
* As `snapshot` accepts `Any` as a paramater, it is too easy to pass a
* wrong parameter to this method. The recommendation is to be very careful
* in that area and write a unit test for an affected code.
*/
def encode(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]]

/** Decode Akka snapshot from Kafka Journal specific internal representation.
*
* @param metadata
* Metadata to get the additional information from, i.e. sequence number.
* @param snapshot
* Serialized snapshot to parse the payload from.
*
* The method may raise an error into `F[_]` if parsing of
* [[Snapshot#payload]] fails.
*/
def decode(metadata: SnapshotMetadata, snapshot: Snapshot[A]): F[SelectedSnapshot]

}

Expand Down Expand Up @@ -95,7 +128,7 @@ object SnapshotSerializer {

implicit val fail: Fail[F] = Fail.lift[F]

def toInternalRepresentation(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] = {
def encode(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] = {

val result = for {
payload <- toSnapshotPayload(snapshot)
Expand All @@ -107,27 +140,14 @@ object SnapshotSerializer {
}
}

def toAkkaRepresentation(
persistenceId: PersistenceId,
timestamp: Instant,
snapshot: Snapshot[A]
): F[SelectedSnapshot] = {
def decode(metadata: SnapshotMetadata, snapshot: Snapshot[A]): F[SelectedSnapshot] = {

val payload = fromSnapshotPayload(snapshot.payload)

val result = payload.map { payload =>
SelectedSnapshot(
metadata = SnapshotMetadata(
persistenceId = persistenceId,
sequenceNr = snapshot.seqNr.value,
timestamp = timestamp.toEpochMilli
),
snapshot = payload
)
}
val result = payload.map(SelectedSnapshot(metadata, _))

result.adaptErr { case e =>
SnapshotStoreError(s"FromSnapshot error, persistenceId: $persistenceId, snapshot: $snapshot: $e", e)
SnapshotStoreError(s"FromSnapshot error, persistenceId: ${metadata.persistenceId}, snapshot: $snapshot: $e", e)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object SnapshotStoreAdapter {
def serializeSnapshot(metadata: SnapshotMetadata, snapshot: Any): F[SnapshotRecord[EventualPayloadAndType]] = {
for {
seqNr <- SeqNr.of(metadata.sequenceNr)
snapshot <- snapshotSerializer.toInternalRepresentation(metadata, snapshot)
snapshot <- snapshotSerializer.encode(metadata, snapshot)
payload <- snapshotReadWrite.eventualWrite(snapshot.payload)
record = SnapshotRecord(
snapshot = Snapshot(seqNr = seqNr, payload = payload),
Expand All @@ -103,7 +103,8 @@ object SnapshotStoreAdapter {
for {
payload <- snapshotReadWrite.eventualRead(record.snapshot.payload)
snapshot = record.snapshot.copy(payload = payload)
snapshot <- snapshotSerializer.toAkkaRepresentation(persistenceId, record.timestamp, snapshot)
metadata = SnapshotMetadata(persistenceId, record.snapshot.seqNr.value, record.timestamp.toEpochMilli)
snapshot <- snapshotSerializer.decode(metadata, snapshot)
} yield snapshot
}

Expand Down

0 comments on commit 4337c0a

Please sign in to comment.