diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 36e3ec2..0473da4 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -276,3 +276,12 @@ akka.persistence.dynamodb {
   }
 }
 // #client-settings
+
+akka.persistence.dynamodb {
+  clock-skew-detection {
+    # When the local clock and time in AWS response diverge by more than this duration
+    # a warning is logged. Can be disabled by setting this to "off".
+    # This check only has precision of seconds.
+    warning-tolerance = 2 seconds
+  }
+}
diff --git a/core/src/main/scala/akka/persistence/dynamodb/DynamoDBSettings.scala b/core/src/main/scala/akka/persistence/dynamodb/DynamoDBSettings.scala
index b95b1db..3c42490 100644
--- a/core/src/main/scala/akka/persistence/dynamodb/DynamoDBSettings.scala
+++ b/core/src/main/scala/akka/persistence/dynamodb/DynamoDBSettings.scala
@@ -4,6 +4,7 @@
 
 package akka.persistence.dynamodb
 
+import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 import scala.jdk.DurationConverters._
@@ -55,6 +56,8 @@ object DynamoDBSettings {
       if (indexName.nonEmpty) indexName else snapshotTable + "_slice_idx"
     }
 
+    val clockSkewSettings = new ClockSkewSettings(config)
+
     new DynamoDBSettings(
       journalTable,
       journalPublishEvents,
@@ -63,7 +66,8 @@ object DynamoDBSettings {
       cleanupSettings,
       timeToLiveSettings,
       journalBySliceGsi,
-      snapshotBySliceGsi)
+      snapshotBySliceGsi,
+      clockSkewSettings)
   }
 
   /**
@@ -74,6 +78,9 @@ object DynamoDBSettings {
 
 }
 
+/**
+ * Use `DynamoDBSettings.apply` or `DynamoDBSettings.create` for construction.
+ */
 final class DynamoDBSettings private (
     val journalTable: String,
     val journalPublishEvents: Boolean,
@@ -82,7 +89,8 @@ final class DynamoDBSettings private (
     val cleanupSettings: CleanupSettings,
     val timeToLiveSettings: TimeToLiveSettings,
     val journalBySliceGsi: String,
-    val snapshotBySliceGsi: String)
+    val snapshotBySliceGsi: String,
+    val clockSkewSettings: ClockSkewSettings)
 
 final class QuerySettings(config: Config) {
   val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala
@@ -278,6 +286,25 @@ final class EventSourcedEntityTimeToLiveSettings(config: Config) {
   val snapshotTimeToLive: Option[FiniteDuration] = ConfigHelpers.optDuration(config, "snapshot-time-to-live")
 }
 
+/**
+ * INTERNAL API
+ *
+ * Clock skew detection settings, read from `clock-skew-detection.warning-tolerance`.
+ * The tolerance is `Duration.Zero` when the setting, after `Helpers.toRootLowerCase`, is "off" or "none".
+ */
+@InternalStableApi
+final class ClockSkewSettings(config: Config) {
+  val warningTolerance: FiniteDuration = {
+    val path = "clock-skew-detection.warning-tolerance"
+    Helpers.toRootLowerCase(config.getString(path)) match {
+      case "off" | "none" => Duration.Zero
+      case _              => config.getDuration(path).toScala
+    }
+  }
+
+  override def toString: String = s"ClockSkewSettings($warningTolerance)"
+}
+
 private[akka] object ConfigHelpers {
   def optString(config: Config, path: String): Option[String] = {
     if (config.hasPath(path)) {
diff --git a/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala b/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala
index 48cb18d..59e3aca 100644
--- a/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala
+++ b/core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala
@@ -6,15 +6,19 @@ package akka.persistence.dynamodb.internal
 
 import java.nio.ByteBuffer
 import java.time.Instant
+import java.time.format.DateTimeFormatter
 import java.util.concurrent.CompletionException
 import java.util.Base64
+import java.util.Locale
 import java.util.{ HashMap => JHashMap }
 import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
 import scala.jdk.FutureConverters._
+import scala.util.control.NonFatal
 
 import akka.Done
 import akka.actor.typed.ActorSystem
@@ -25,6 +29,7 @@ import akka.persistence.typed.PersistenceId
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.core.SdkResponse
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue
 import software.amazon.awssdk.services.dynamodb.model.Delete
@@ -58,6 +63,42 @@ import software.amazon.awssdk.services.dynamodb.model.Update
 
   private implicit val ec: ExecutionContext = system.executionContext
 
+  private val dateHeaderLogCounter = new AtomicLong
+  private val dateHeaderParser = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss z", Locale.US)
+  private val clockSkewToleranceMillis = settings.clockSkewSettings.warningTolerance.toMillis
+
+  /**
+   * Rough clock skew check: compares the local clock with the `Date` header of the response and
+   * logs a warning when they differ by more than the configured tolerance. The header is only
+   * inspected on every 1000th call, and not at all when the tolerance is zero. Non-fatal failures
+   * of the check itself are logged and not propagated.
+   */
+  private def checkClockSkew(response: SdkResponse): Unit = {
+    try {
+      if (clockSkewToleranceMillis > 0 &&
+        dateHeaderLogCounter.getAndIncrement() % 1000 == 0) {
+        val dateHeaderOpt = response.sdkHttpResponse().firstMatchingHeader("Date")
+        if (dateHeaderOpt.isPresent) {
+          val dateHeader = dateHeaderOpt.get
+          val awsInstant = Instant.from(dateHeaderParser.parse(dateHeader))
+          val now = Instant.now()
+          // The Date header only has precision of seconds so this is just a rough check.
+          // Strictly greater than, to match the documented "diverge by more than" contract.
+          if (math.abs(java.time.Duration.between(awsInstant, now).toMillis) > clockSkewToleranceMillis) {
+            log.warn(
+              "Possible clock skew, make sure clock synchronization is installed. " +
+              "Local time [{}] vs DynamoDB response time [{}]",
+              now,
+              awsInstant)
+          }
+        }
+      }
+    } catch {
+      case NonFatal(exc) =>
+        log.warn("check clock skew failed", exc)
+    }
+
+  }
+
   def writeEvents(events: Seq[SerializedJournalItem]): Future[Done] = {
     require(events.nonEmpty)
 
@@ -118,7 +159,14 @@ import software.amazon.awssdk.services.dynamodb.model.Update
             response.consumedCapacity.capacityUnits)
         }
       }
-      result.map(_ => Done)(ExecutionContext.parasitic)
+      result
+        .map { response =>
+          checkClockSkew(response)
+          Done
+        }(ExecutionContext.parasitic)
+        .recoverWith { case c: CompletionException =>
+          Future.failed(c.getCause)
+        }(ExecutionContext.parasitic)
     } else {
       val writeItems =
         events.map { item =>
@@ -160,7 +208,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
         }
       }
       result
-        .map(_ => Done)(ExecutionContext.parasitic)
+        .map { response =>
+          checkClockSkew(response)
+          Done
+        }(ExecutionContext.parasitic)
         .recoverWith { case c: CompletionException =>
           Future.failed(c.getCause)
         }(ExecutionContext.parasitic)