diff --git a/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala b/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala index 01df52e..48cb18d 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala @@ -169,6 +169,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update } def readHighestSequenceNr(persistenceId: String): Future[Long] = { + readHighestSequenceNrAndTimestamp(persistenceId).map(_._1)(ExecutionContext.parasitic) + } + + def readHighestSequenceNrAndTimestamp(persistenceId: String): Future[(Long, Instant)] = { import JournalAttributes._ val attributeValues = Map(":pid" -> AttributeValue.fromS(persistenceId)) @@ -189,20 +193,22 @@ import software.amazon.awssdk.services.dynamodb.model.Update .consistentRead(true) .keyConditionExpression(s"$Pid = :pid") .expressionAttributeValues((attributeValues ++ filterAttributeValues).asJava) - .projectionExpression(s"$SeqNr") + .projectionExpression(s"$SeqNr, $Timestamp") .scanIndexForward(false) // get last item (highest sequence nr) .limit(1) filterExpression.foreach(requestBuilder.filterExpression) val result = client.query(requestBuilder.build()).asScala.map { response => - response.items().asScala.headOption.fold(0L) { item => - item.get(SeqNr).n().toLong + response.items().asScala.headOption.fold((0L, Instant.EPOCH)) { item => + (item.get(SeqNr).n().toLong, InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong)) } } if (log.isDebugEnabled) - result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr)) + result.foreach { case (seqNr, timestamp) => + log.debug("Highest sequence nr for persistenceId [{}]: [{}] (written at [{}])", persistenceId, seqNr, timestamp) + } result .recoverWith { case c: CompletionException => diff --git a/core/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournal.scala b/core/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournal.scala index b408ce4..c7d01c4 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournal.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournal.scala @@ -229,25 +229,25 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) case None => FutureDone } pendingWrite.flatMap { _ => - if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) { + val highestSeqNrAndTimestamp = if (toSequenceNr == Long.MaxValue && max == Long.MaxValue) { // this is the normal case, highest sequence number from last event query .internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, includeDeleted = true) - .runWith(Sink.fold(0L) { (_, item) => + .runWith(Sink.fold((0L, Instant.EPOCH)) { (_, item) => // payload is empty for deleted item if (item.payload.isDefined) { val repr = deserializeItem(serialization, item) recoveryCallback(repr) } - item.seqNr + (item.seqNr, item.writeTimestamp) }) } else if (toSequenceNr <= 0) { // no replay - journalDao.readHighestSequenceNr(persistenceId) + journalDao.readHighestSequenceNrAndTimestamp(persistenceId) } else { // replay to custom sequence number - val highestSeqNr = journalDao.readHighestSequenceNr(persistenceId) + val highestSeqNrAndTimestamp = journalDao.readHighestSequenceNrAndTimestamp(persistenceId) val effectiveToSequenceNr = if (max == Long.MaxValue) toSequenceNr @@ -264,7 +264,19 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) val repr = deserializeItem(serialization, item) recoveryCallback(repr) }) - .flatMap(_ => highestSeqNr) + .flatMap(_ => highestSeqNrAndTimestamp) + } + highestSeqNrAndTimestamp.map { case (highestSeqNr, timestamp) => + val now = Instant.now() + if (now.isBefore(timestamp)) { + log.warning( + "Detected clock skew when replaying events: persistence id [{}], highest seq nr [{}] written at [{}], current time is [{}]", + persistenceId, + highestSeqNr, + timestamp, + now) + } + highestSeqNr } } } diff --git a/core/src/test/scala/akka/persistence/dynamodb/journal/ClockSkewDetectionSpec.scala b/core/src/test/scala/akka/persistence/dynamodb/journal/ClockSkewDetectionSpec.scala new file mode 100644 index 0000000..8d34463 --- /dev/null +++ b/core/src/test/scala/akka/persistence/dynamodb/journal/ClockSkewDetectionSpec.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024 Lightbend Inc. + */ + +package akka.persistence.dynamodb.journal + +import java.time.temporal.ChronoUnit + +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit } +import akka.actor.typed.ActorSystem +import akka.persistence.JournalProtocol.{ RecoverySuccess, ReplayMessages, ReplayedMessage } +import akka.persistence.dynamodb.internal.InstantFactory +import akka.persistence.dynamodb.query.EventsBySliceSpec +import akka.persistence.dynamodb.{ TestData, TestDbLifecycle } +import akka.testkit.TestProbe +import org.scalatest.wordspec.AnyWordSpecLike + +class ClockSkewDetectionSpec + extends ScalaTestWithActorTestKit(EventsBySliceSpec.config) + with AnyWordSpecLike + with TestDbLifecycle + with TestData + with LogCapturing { + + override def typedSystem: ActorSystem[_] = system + + private val journal = persistenceExt.journalFor(null) + + "DynamoDBJournal" should { + + "detect clock skew on event replay" in { + val entityType = nextEntityType() + val pid = nextPersistenceId(entityType) + val slice = persistenceExt.sliceForPersistenceId(pid.id) + + val now = InstantFactory.now().truncatedTo(ChronoUnit.SECONDS) + + // first 5 events in the past + for (n <- 1 to 5) { + writeEvent(slice, pid, n, now.minusSeconds(10).plusSeconds(n), s"e$n") + } + + // next 5 events over 1 minute in the future + for (n <- 6 to 10) { + writeEvent(slice, pid, n, now.plusSeconds(60).plusSeconds(n), s"e$n") + } + + val replayProbe = TestProbe()(system.classicSystem) + + LoggingTestKit + .warn("Detected clock skew when replaying events:" + + s" persistence id [${pid.id}], highest seq nr [10] written at [${now.plusSeconds(70)}]") + .expect { + journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid.id, replayProbe.ref) + (1 to 10).foreach { _ => replayProbe.expectMsgType[ReplayedMessage] } + replayProbe.expectMsg(RecoverySuccess(highestSequenceNr = 10L)) + } + } + + } +}