diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala index cceafde22..69285b567 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala @@ -11,6 +11,7 @@ import java.util.UUID import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ +import scala.jdk.DurationConverters._ import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -191,8 +192,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) if (usingOffsetTTL) { val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds - val timestampOffsetExpiry = timestampOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong - timestampOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds + timestampOffsetItem.get(OffsetStoreAttributes.Expiry) shouldBe None // no expiry set on latest-by-slice val seqNrOffsetExpiry = seqNrOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong seqNrOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds } else { @@ -352,8 +352,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) if (usingOffsetTTL) { val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds - val timestampOffsetExpiry = timestampOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong - timestampOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds + timestampOffsetItem.get(OffsetStoreAttributes.Expiry) shouldBe None // no expiry set on latest-by-slice val seqNrOffsetExpiry = seqNrOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong seqNrOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds } else { @@ -558,15 +557,14 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) "accept known sequence numbers and reject unknown" in { val projectionId = genRandomProjectionId() + val offsetStoreClock = TestClock.nowMicros() val eventTimestampQueryClock = TestClock.nowMicros() - val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) + val offsetStore = createOffsetStore( + projectionId, + offsetStoreClock = offsetStoreClock, + eventTimestampQueryClock = eventTimestampQueryClock) - // some validation require the startTimestamp, which is set from readOffset - offsetStore.getState().startTimestampBySlice.size shouldBe 0 - offsetStore.readOffset().futureValue - offsetStore.getState().startTimestampBySlice.values.toSet shouldBe Set(clock.instant()) - - val startTime = TestClock.nowMicros().instant() + val startTime = offsetStoreClock.instant() val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L)) offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue @@ -632,16 +630,20 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7") offsetStore.validate(env7).futureValue shouldBe RejectedSeqNr offsetStore.validate(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr - // but ok when previous is old - eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600)) - val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7") - offsetStore.validate(env8).futureValue shouldBe Accepted - eventTimestampQueryClock.setInstant(startTime) - offsetStore.addInflight(env8) - // and subsequent seqNr is accepted - val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8") - offsetStore.validate(env9).futureValue shouldBe Accepted - offsetStore.addInflight(env9) + if (usingOffsetTTL) { + // but ok when previous is older than expiry window + val now = offsetStoreClock.tick(JDuration.ofSeconds(10)) + val offsetExpiry = settings.timeToLiveSettings.projections.get(projectionId.name).offsetTimeToLive.value + eventTimestampQueryClock.withInstant(now.minusSeconds(offsetExpiry.toSeconds + 1)) { + val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7") + offsetStore.validate(env8).futureValue shouldBe Accepted + offsetStore.addInflight(env8) + } + // and subsequent seqNr is accepted + val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8") + offsetStore.validate(env9).futureValue shouldBe Accepted + offsetStore.addInflight(env9) + } // reject unknown filtered val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7")) @@ -654,14 +656,20 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) .futureValue shouldBe RejectedBacktrackingSeqNr // it's keeping the inflight that are not in the "stored" state - offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L) + if (usingOffsetTTL) { + offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L) + } else { + offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L) + } // and they are removed from inflight once they have been stored offsetStore .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map("p4" -> 2L)), "p4", 2L)) .futureValue - offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L)) - .futureValue + if (usingOffsetTTL) { + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L)) + .futureValue + } offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L) } @@ -1045,14 +1053,17 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) val projectionName = UUID.randomUUID().toString + val offsetStoreClock = TestClock.nowMicros() + val eventTimestampQueryClock = TestClock.nowMicros() + def offsetStore(minSlice: Int, maxSlice: Int) = new DynamoDBOffsetStore( ProjectionId(projectionName, s"$minSlice-$maxSlice"), - Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)), + Some(new TestTimestampSourceProvider(minSlice, maxSlice, eventTimestampQueryClock)), system, settings, client, - clock) + offsetStoreClock) // one projection at lower scale val offsetStore1 = offsetStore(512, 1023) @@ -1064,7 +1075,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) val p2 = "p-6009" // slice 640 val p3 = "p-3039" // slice 832 - val t0 = clock.instant().minusSeconds(100) + val t0 = offsetStoreClock.instant().minusSeconds(100) def time(step: Int) = t0.plusSeconds(step) // starting with 2 projections, testing 512-1023 @@ -1072,26 +1083,51 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue // scaled up to 4 projections, testing 512-767 - offsetStore2.readOffset().futureValue - offsetStore2.getState().startTimestampBySlice(576) shouldBe time(2) - val slice640StartTimestamp = offsetStore2.getState().startTimestampBySlice(640) - slice640StartTimestamp shouldBe clock.instant() val latestTime = time(10) offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue offsetStore2.getState().latestTimestamp shouldBe latestTime - // clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr. - // rejected if timestamp of previous seqNr is after start timestamp minus backtracking window - clock.setInstant(slice640StartTimestamp.minus(settings.backtrackingWindow.minusSeconds(1))) - offsetStore2 - .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) - .futureValue shouldBe RejectedBacktrackingSeqNr - // accepted if timestamp of previous seqNr is before start timestamp minus backtracking window - clock.setInstant(slice640StartTimestamp.minus(settings.timeWindow.plusSeconds(1))) - offsetStore2 - .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) - .futureValue shouldBe Accepted + // note: eventTimestampQueryClock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr + + if (usingOffsetTTL) { + // if offset TTL is configured, use expiry window (from now) to validate old timestamps + + val now = offsetStoreClock.tick(JDuration.ofSeconds(10)) + val offsetExpiry = settings.timeToLiveSettings.projections.get(projectionName).offsetTimeToLive.get.toJava + + // rejected if timestamp of previous seqNr is after expiry timestamp for this slice + eventTimestampQueryClock.withInstant(now.minus(offsetExpiry.minusSeconds(1))) { + offsetStore2 + .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) + .futureValue shouldBe RejectedBacktrackingSeqNr + } + // still rejected if timestamp of previous seqNr is before expiry timestamp for latest (slice not tracked) + eventTimestampQueryClock.withInstant(now.minus(offsetExpiry.plusSeconds(1))) { + offsetStore2 + .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) + .futureValue shouldBe Accepted + } + } else { + // when no offset TTL expiry is configured, then always reject + + val now = offsetStoreClock.tick(JDuration.ofSeconds(10)) + val testOffsetExpiry = JDuration.ofHours(1) + + // rejected if timestamp of previous seqNr is after possible expiry timestamp (but no TTL configured) + eventTimestampQueryClock.withInstant(now.minus(testOffsetExpiry.minusSeconds(1))) { + offsetStore2 + .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) + .futureValue shouldBe RejectedBacktrackingSeqNr + } + + // still rejected if timestamp of previous seqNr is before possible expiry timestamp (but no TTL configured) + eventTimestampQueryClock.withInstant(now.minus(testOffsetExpiry.plusSeconds(1))) { + offsetStore2 + .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) + .futureValue shouldBe RejectedBacktrackingSeqNr + } + } } } diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestClock.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestClock.scala index 3b406a755..59f583300 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestClock.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestClock.scala @@ -40,6 +40,14 @@ object TestClock { def setInstant(newInstant: Instant): Unit = _instant = newInstant.truncatedTo(resolution) + def withInstant[T](instant: Instant)(block: => T): T = { + val restore = _instant + try { + setInstant(instant) + block + } finally setInstant(restore) + } + /** * Increase the clock with this duration (truncated to the resolution) */ diff --git a/akka-projection-dynamodb/src/main/mima-filters/1.6.5.backwards.excludes/dynamodb-offset-store.excludes b/akka-projection-dynamodb/src/main/mima-filters/1.6.5.backwards.excludes/dynamodb-offset-store.excludes new file mode 100644 index 000000000..8ad2bca5b --- /dev/null +++ b/akka-projection-dynamodb/src/main/mima-filters/1.6.5.backwards.excludes/dynamodb-offset-store.excludes @@ -0,0 +1,2 @@ +# internal +ProblemFilters.exclude[Problem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#State*") diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index a44a3ac9e..04efa3f61 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -13,6 +13,7 @@ import scala.annotation.tailrec import scala.collection.immutable.TreeSet import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.jdk.DurationConverters._ import akka.Done import akka.actor.typed.ActorSystem @@ -70,21 +71,17 @@ private[projection] object DynamoDBOffsetStore { fromSnapshot: Boolean) object State { - val empty: State = State(Map.empty, Map.empty, Map.empty, Map.empty) - - def apply(offsetBySlice: Map[Int, TimestampOffset], startTimestampBySlice: Map[Int, Instant]): State = - if (offsetBySlice.isEmpty && startTimestampBySlice.isEmpty) - empty - else - new State(Map.empty, Map.empty, offsetBySlice, startTimestampBySlice) + val empty: State = State(Map.empty, Map.empty, Map.empty) + def apply(offsetBySlice: Map[Int, TimestampOffset]): State = + if (offsetBySlice.isEmpty) empty + else new State(Map.empty, Map.empty, offsetBySlice) } final case class State( byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]], - offsetBySlice: Map[Int, TimestampOffset], - startTimestampBySlice: Map[Int, Instant]) { + offsetBySlice: Map[Int, TimestampOffset]) { def size: Int = byPid.size @@ -221,6 +218,9 @@ private[projection] class DynamoDBOffsetStore( private val dao = new OffsetStoreDao(system, settings, projectionId, client) + private val offsetExpiry = + settings.timeToLiveSettings.projections.get(projectionId.name).offsetTimeToLive.map(_.toJava) + private[projection] implicit val executionContext: ExecutionContext = system.executionContext // The OffsetStore instance is used by a single projectionId and there shouldn't be many concurrent @@ -296,13 +296,7 @@ private[projection] class DynamoDBOffsetStore( }) offsetBySliceFut.map { offsetBySlice => - val now = clock.instant() - val startTimestampBySlice = - (minSlice to maxSlice).map { slice => - slice -> offsetBySlice.get(slice).map(_.timestamp).getOrElse(now) - }.toMap - - val newState = State(offsetBySlice, startTimestampBySlice) + val newState = State(offsetBySlice) if (!state.compareAndSet(oldState, newState)) throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") @@ -622,7 +616,7 @@ private[projection] class DynamoDBOffsetStore( // always accept starting from snapshots when there was no previous event seen FutureAccepted } else { - validateEventTimestamp(currentState, recordWithOffset) + validateEventTimestamp(recordWithOffset) } } else { // strictSeqNr == false is for durable state where each revision might not be visible @@ -644,29 +638,28 @@ private[projection] class DynamoDBOffsetStore( } } - private def validateEventTimestamp(currentState: State, recordWithOffset: RecordWithOffset) = { + private def validateEventTimestamp(recordWithOffset: RecordWithOffset) = { import Validation._ val pid = recordWithOffset.record.pid val seqNr = recordWithOffset.record.seqNr - val slice = recordWithOffset.record.slice - // Haven't seen this pid within the time window. Since events can be missed - // when read at the tail we will only accept it if the event with previous seqNr has timestamp - // before the startTimestamp minus backtracking window + // Haven't seen this pid in the time window (or lazy loaded from the database). + // Only accept if the event with previous seqNr is outside the TTL expiry window (if configured). timestampOf(pid, seqNr - 1).map { case Some(previousTimestamp) => - val acceptBefore = - currentState.startTimestampBySlice(slice).minus(settings.backtrackingWindow) + val acceptBefore = offsetExpiry.map { expiry => + val now = clock.instant() // expiry is from when an offset was written, consider the window from now + now.minus(expiry) + } - if (previousTimestamp.isBefore(acceptBefore)) { + if (acceptBefore.exists(timestamp => previousTimestamp.isBefore(timestamp))) { logger.debug( "Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " + - "is before start timestamp [{}] minus backtracking window [{}].", + "is before TTL expiry window timestamp [{}].", pid, seqNr, previousTimestamp, - currentState.startTimestampBySlice(slice), - settings.backtrackingWindow) + acceptBefore.fold("none")(_.toString)) Accepted } else if (recordWithOffset.fromPubSub) { logger.debug( @@ -679,13 +672,12 @@ private[projection] class DynamoDBOffsetStore( // This will result in projection restart (with normal configuration) logger.warn( "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " + - "is after start timestamp [{}] minus backtracking window [{}].", + "is after TTL expiry window timestamp [{}].", seqNr, pid, recordWithOffset.offset, previousTimestamp, - currentState.startTimestampBySlice(slice), - settings.backtrackingWindow) + acceptBefore.fold("none")(_.toString)) // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled // and SourceProvider supports it. RejectedBacktrackingSeqNr diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala index b61fdb27f..7c99ef5f0 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala @@ -127,10 +127,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = { import OffsetStoreDao.OffsetStoreAttributes._ - val expiry = timeToLiveSettings.offsetTimeToLive.map { timeToLive => - Instant.now().plusSeconds(timeToLive.toSeconds) - } - def writeBatch(offsetsBatch: IndexedSeq[(Int, TimestampOffset)]): Future[Done] = { val writeItems = offsetsBatch.map { @@ -155,10 +151,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse } attributes.put(Seen, AttributeValue.fromM(seen)) - expiry.foreach { timestamp => - attributes.put(Expiry, AttributeValue.fromN(timestamp.getEpochSecond.toString)) - } - WriteRequest.builder .putRequest( PutRequest