Skip to content

Commit

Permalink
feat: detect clock skew on event replay
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Nov 30, 2024
1 parent 92e3889 commit 20b9d98
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}

def readHighestSequenceNr(persistenceId: String): Future[Long] = {
readHighestSequenceNrAndTimestamp(persistenceId).map(_._1)(ExecutionContext.parasitic)
}

def readHighestSequenceNrAndTimestamp(persistenceId: String): Future[(Long, Instant)] = {
import JournalAttributes._

val attributeValues = Map(":pid" -> AttributeValue.fromS(persistenceId))
Expand All @@ -189,20 +193,22 @@ import software.amazon.awssdk.services.dynamodb.model.Update
.consistentRead(true)
.keyConditionExpression(s"$Pid = :pid")
.expressionAttributeValues((attributeValues ++ filterAttributeValues).asJava)
.projectionExpression(s"$SeqNr")
.projectionExpression(s"$SeqNr, $Timestamp")
.scanIndexForward(false) // get last item (highest sequence nr)
.limit(1)

filterExpression.foreach(requestBuilder.filterExpression)

val result = client.query(requestBuilder.build()).asScala.map { response =>
response.items().asScala.headOption.fold(0L) { item =>
item.get(SeqNr).n().toLong
response.items().asScala.headOption.fold((0L, Instant.EPOCH)) { item =>
(item.get(SeqNr).n().toLong, InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong))
}
}

if (log.isDebugEnabled)
result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr))
result.foreach { case (seqNr, timestamp) =>
log.debug("Highest sequence nr for persistenceId [{}]: [{}] (written at [{}])", persistenceId, seqNr, timestamp)
}

result
.recoverWith { case c: CompletionException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,25 +229,25 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
case None => FutureDone
}
pendingWrite.flatMap { _ =>
if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) {
val highestSeqNrAndTimestamp = if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) {
// this is the normal case, highest sequence number from last event
query
.internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, includeDeleted = true)
.runWith(Sink.fold(0L) { (_, item) =>
.runWith(Sink.fold((0L, Instant.EPOCH)) { (_, item) =>
// payload is empty for deleted item
if (item.payload.isDefined) {
val repr = deserializeItem(serialization, item)
recoveryCallback(repr)
}
item.seqNr
(item.seqNr, item.writeTimestamp)
})
} else if (toSequenceNr <= 0) {
// no replay
journalDao.readHighestSequenceNr(persistenceId)
journalDao.readHighestSequenceNrAndTimestamp(persistenceId)
} else {
// replay to custom sequence number

val highestSeqNr = journalDao.readHighestSequenceNr(persistenceId)
val highestSeqNrAndTimestamp = journalDao.readHighestSequenceNrAndTimestamp(persistenceId)

val effectiveToSequenceNr =
if (max == Long.MaxValue) toSequenceNr
Expand All @@ -264,7 +264,19 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
val repr = deserializeItem(serialization, item)
recoveryCallback(repr)
})
.flatMap(_ => highestSeqNr)
.flatMap(_ => highestSeqNrAndTimestamp)
}
highestSeqNrAndTimestamp.map { case (highestSeqNr, timestamp) =>
val now = Instant.now()
if (now.isBefore(timestamp)) {
log.warning(
"Detected clock skew when replaying events: persistence id [{}], highest seq nr [{}] written at [{}], current time is [{}]",
persistenceId,
highestSeqNr,
timestamp,
now)
}
highestSeqNr
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.journal

import java.time.temporal.ChronoUnit

import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit }
import akka.actor.typed.ActorSystem
import akka.persistence.JournalProtocol.{ RecoverySuccess, ReplayMessages, ReplayedMessage }
import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.dynamodb.query.EventsBySliceSpec
import akka.persistence.dynamodb.{ TestData, TestDbLifecycle }
import akka.testkit.TestProbe
import org.scalatest.wordspec.AnyWordSpecLike

class ClockSkewDetectionSpec
extends ScalaTestWithActorTestKit(EventsBySliceSpec.config)
with AnyWordSpecLike
with TestDbLifecycle
with TestData
with LogCapturing {

override def typedSystem: ActorSystem[_] = system

private val journal = persistenceExt.journalFor(null)

"DynamoDBJournal" should {

"detect clock skew on event replay" in {
val entityType = nextEntityType()
val pid = nextPersistenceId(entityType)
val slice = persistenceExt.sliceForPersistenceId(pid.id)

val now = InstantFactory.now().truncatedTo(ChronoUnit.SECONDS)

// first 5 events in the past
for (n <- 1 to 5) {
writeEvent(slice, pid, n, now.minusSeconds(10).plusSeconds(n), s"e$n")
}

// next 5 events over 1 minute in the future
for (n <- 6 to 10) {
writeEvent(slice, pid, n, now.plusSeconds(60).plusSeconds(n), s"e$n")
}

val replayProbe = TestProbe()(system.classicSystem)

LoggingTestKit
.warn("Detected clock skew when replaying events:" +
s" persistence id [${pid.id}], highest seq nr [10] written at [${now.plusSeconds(70)}]")
.expect {
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid.id, replayProbe.ref)
(1 to 10).foreach { _ => replayProbe.expectMsgType[ReplayedMessage] }
replayProbe.expectMsg(RecoverySuccess(highestSequenceNr = 10L))
}
}

}
}

0 comments on commit 20b9d98

Please sign in to comment.