Skip to content

Commit

Permalink
fix(dynamodb): avoid unnecessary rejection given expired offsets (#1295)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter authored Jan 14, 2025
1 parent 4a2eabb commit c755fe1
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.util.UUID
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.DurationConverters._

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand Down Expand Up @@ -191,8 +192,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

if (usingOffsetTTL) {
val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds
val timestampOffsetExpiry = timestampOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
timestampOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
timestampOffsetItem.get(OffsetStoreAttributes.Expiry) shouldBe None // no expiry set on latest-by-slice
val seqNrOffsetExpiry = seqNrOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
seqNrOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
} else {
Expand Down Expand Up @@ -352,8 +352,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

if (usingOffsetTTL) {
val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds
val timestampOffsetExpiry = timestampOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
timestampOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
timestampOffsetItem.get(OffsetStoreAttributes.Expiry) shouldBe None // no expiry set on latest-by-slice
val seqNrOffsetExpiry = seqNrOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
seqNrOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
} else {
Expand Down Expand Up @@ -558,15 +557,14 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

"accept known sequence numbers and reject unknown" in {
val projectionId = genRandomProjectionId()
val offsetStoreClock = TestClock.nowMicros()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
val offsetStore = createOffsetStore(
projectionId,
offsetStoreClock = offsetStoreClock,
eventTimestampQueryClock = eventTimestampQueryClock)

// some validation require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestampBySlice.size shouldBe 0
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestampBySlice.values.toSet shouldBe Set(clock.instant())

val startTime = TestClock.nowMicros().instant()
val startTime = offsetStoreClock.instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
Expand Down Expand Up @@ -632,16 +630,20 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
offsetStore.validate(env7).futureValue shouldBe RejectedSeqNr
offsetStore.validate(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr
// but ok when previous is old
eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
offsetStore.validate(env8).futureValue shouldBe Accepted
eventTimestampQueryClock.setInstant(startTime)
offsetStore.addInflight(env8)
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
offsetStore.validate(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)
if (usingOffsetTTL) {
// but ok when previous is older than expiry window
val now = offsetStoreClock.tick(JDuration.ofSeconds(10))
val offsetExpiry = settings.timeToLiveSettings.projections.get(projectionId.name).offsetTimeToLive.value
eventTimestampQueryClock.withInstant(now.minusSeconds(offsetExpiry.toSeconds + 1)) {
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
offsetStore.validate(env8).futureValue shouldBe Accepted
offsetStore.addInflight(env8)
}
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
offsetStore.validate(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)
}

// reject unknown filtered
val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7"))
Expand All @@ -654,14 +656,20 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
.futureValue shouldBe RejectedBacktrackingSeqNr

// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L)
if (usingOffsetTTL) {
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L)
} else {
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L)
}
// and they are removed from inflight once they have been stored
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map("p4" -> 2L)), "p4", 2L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L))
.futureValue
if (usingOffsetTTL) {
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L))
.futureValue
}
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L)
}

Expand Down Expand Up @@ -1045,14 +1053,17 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

val projectionName = UUID.randomUUID().toString

val offsetStoreClock = TestClock.nowMicros()
val eventTimestampQueryClock = TestClock.nowMicros()

def offsetStore(minSlice: Int, maxSlice: Int) =
new DynamoDBOffsetStore(
ProjectionId(projectionName, s"$minSlice-$maxSlice"),
Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)),
Some(new TestTimestampSourceProvider(minSlice, maxSlice, eventTimestampQueryClock)),
system,
settings,
client,
clock)
offsetStoreClock)

// one projection at lower scale
val offsetStore1 = offsetStore(512, 1023)
Expand All @@ -1064,34 +1075,59 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
val p2 = "p-6009" // slice 640
val p3 = "p-3039" // slice 832

val t0 = clock.instant().minusSeconds(100)
val t0 = offsetStoreClock.instant().minusSeconds(100)
def time(step: Int) = t0.plusSeconds(step)

// starting with 2 projections, testing 512-1023
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue

// scaled up to 4 projections, testing 512-767
offsetStore2.readOffset().futureValue
offsetStore2.getState().startTimestampBySlice(576) shouldBe time(2)
val slice640StartTimestamp = offsetStore2.getState().startTimestampBySlice(640)
slice640StartTimestamp shouldBe clock.instant()
val latestTime = time(10)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime

// clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
// rejected if timestamp of previous seqNr is after start timestamp minus backtracking window
clock.setInstant(slice640StartTimestamp.minus(settings.backtrackingWindow.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
// accepted if timestamp of previous seqNr is before start timestamp minus backtracking window
clock.setInstant(slice640StartTimestamp.minus(settings.timeWindow.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
// note: eventTimestampQueryClock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr

if (usingOffsetTTL) {
// if offset TTL is configured, use expiry window (from now) to validate old timestamps

val now = offsetStoreClock.tick(JDuration.ofSeconds(10))
val offsetExpiry = settings.timeToLiveSettings.projections.get(projectionName).offsetTimeToLive.get.toJava

// rejected if timestamp of previous seqNr is after expiry timestamp for this slice
eventTimestampQueryClock.withInstant(now.minus(offsetExpiry.minusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
}

// still rejected if timestamp of previous seqNr is before expiry timestamp for latest (slice not tracked)
eventTimestampQueryClock.withInstant(now.minus(offsetExpiry.plusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
}
} else {
// when no offset TTL expiry is configured, then always reject

val now = offsetStoreClock.tick(JDuration.ofSeconds(10))
val testOffsetExpiry = JDuration.ofHours(1)

// rejected if timestamp of previous seqNr is after possible expiry timestamp (but no TTL configured)
eventTimestampQueryClock.withInstant(now.minus(testOffsetExpiry.minusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
}

// still rejected if timestamp of previous seqNr is before possible expiry timestamp (but no TTL configured)
eventTimestampQueryClock.withInstant(now.minus(testOffsetExpiry.plusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ object TestClock {
def setInstant(newInstant: Instant): Unit =
_instant = newInstant.truncatedTo(resolution)

def withInstant[T](instant: Instant)(block: => T): T = {
val restore = _instant
try {
setInstant(instant)
block
} finally setInstant(restore)
}

/**
* Increase the clock with this duration (truncated to the resolution)
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[Problem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#State*")
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.TreeSet
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.DurationConverters._

import akka.Done
import akka.actor.typed.ActorSystem
Expand Down Expand Up @@ -70,21 +71,17 @@ private[projection] object DynamoDBOffsetStore {
fromSnapshot: Boolean)

object State {
val empty: State = State(Map.empty, Map.empty, Map.empty, Map.empty)

def apply(offsetBySlice: Map[Int, TimestampOffset], startTimestampBySlice: Map[Int, Instant]): State =
if (offsetBySlice.isEmpty && startTimestampBySlice.isEmpty)
empty
else
new State(Map.empty, Map.empty, offsetBySlice, startTimestampBySlice)
val empty: State = State(Map.empty, Map.empty, Map.empty)

def apply(offsetBySlice: Map[Int, TimestampOffset]): State =
if (offsetBySlice.isEmpty) empty
else new State(Map.empty, Map.empty, offsetBySlice)
}

final case class State(
byPid: Map[Pid, Record],
bySliceSorted: Map[Int, TreeSet[Record]],
offsetBySlice: Map[Int, TimestampOffset],
startTimestampBySlice: Map[Int, Instant]) {
offsetBySlice: Map[Int, TimestampOffset]) {

def size: Int = byPid.size

Expand Down Expand Up @@ -221,6 +218,9 @@ private[projection] class DynamoDBOffsetStore(

private val dao = new OffsetStoreDao(system, settings, projectionId, client)

private val offsetExpiry =
settings.timeToLiveSettings.projections.get(projectionId.name).offsetTimeToLive.map(_.toJava)

private[projection] implicit val executionContext: ExecutionContext = system.executionContext

// The OffsetStore instance is used by a single projectionId and there shouldn't be many concurrent
Expand Down Expand Up @@ -296,13 +296,7 @@ private[projection] class DynamoDBOffsetStore(
})

offsetBySliceFut.map { offsetBySlice =>
val now = clock.instant()
val startTimestampBySlice =
(minSlice to maxSlice).map { slice =>
slice -> offsetBySlice.get(slice).map(_.timestamp).getOrElse(now)
}.toMap

val newState = State(offsetBySlice, startTimestampBySlice)
val newState = State(offsetBySlice)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
Expand Down Expand Up @@ -622,7 +616,7 @@ private[projection] class DynamoDBOffsetStore(
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
validateEventTimestamp(currentState, recordWithOffset)
validateEventTimestamp(recordWithOffset)
}
} else {
// strictSeqNr == false is for durable state where each revision might not be visible
Expand All @@ -644,29 +638,28 @@ private[projection] class DynamoDBOffsetStore(
}
}

private def validateEventTimestamp(currentState: State, recordWithOffset: RecordWithOffset) = {
private def validateEventTimestamp(recordWithOffset: RecordWithOffset) = {
import Validation._
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr
val slice = recordWithOffset.record.slice

// Haven't seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the startTimestamp minus backtracking window
// Haven't seen this pid in the time window (or lazy loaded from the database).
// Only accept if the event with previous seqNr is outside the TTL expiry window (if configured).
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val acceptBefore =
currentState.startTimestampBySlice(slice).minus(settings.backtrackingWindow)
val acceptBefore = offsetExpiry.map { expiry =>
val now = clock.instant() // expiry is from when an offset was written, consider the window from now
now.minus(expiry)
}

if (previousTimestamp.isBefore(acceptBefore)) {
if (acceptBefore.exists(timestamp => previousTimestamp.isBefore(timestamp))) {
logger.debug(
"Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before start timestamp [{}] minus backtracking window [{}].",
"is before TTL expiry window timestamp [{}].",
pid,
seqNr,
previousTimestamp,
currentState.startTimestampBySlice(slice),
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString))
Accepted
} else if (recordWithOffset.fromPubSub) {
logger.debug(
Expand All @@ -679,13 +672,12 @@ private[projection] class DynamoDBOffsetStore(
// This will result in projection restart (with normal configuration)
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
"is after start timestamp [{}] minus backtracking window [{}].",
"is after TTL expiry window timestamp [{}].",
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp,
currentState.startTimestampBySlice(slice),
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString))
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
// and SourceProvider supports it.
RejectedBacktrackingSeqNr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
import OffsetStoreDao.OffsetStoreAttributes._

val expiry = timeToLiveSettings.offsetTimeToLive.map { timeToLive =>
Instant.now().plusSeconds(timeToLive.toSeconds)
}

def writeBatch(offsetsBatch: IndexedSeq[(Int, TimestampOffset)]): Future[Done] = {
val writeItems =
offsetsBatch.map {
Expand All @@ -155,10 +151,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}
attributes.put(Seen, AttributeValue.fromM(seen))

expiry.foreach { timestamp =>
attributes.put(Expiry, AttributeValue.fromN(timestamp.getEpochSecond.toString))
}

WriteRequest.builder
.putRequest(
PutRequest
Expand Down

0 comments on commit c755fe1

Please sign in to comment.