Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(r2dbc): avoid unnecessary rejection given deleted offsets #1293

Merged
2 commits merged on Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ private[projection] class DynamoDBOffsetStore(
val seqNr = recordWithOffset.record.seqNr
val slice = recordWithOffset.record.slice

// Haven't see seen this pid within the time window. Since events can be missed
// Haven't seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the startTimestamp minus backtracking window
timestampOf(pid, seqNr - 1).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,13 @@ class EventSourcedEndToEndSpec
}

"accept unknown sequence number if previous is old" in {
val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val pid3 = nextPid(entityType)
val entityType = "test-002"
val pid1 = s"$entityType|p-08071" // slice 992
val pid2 = s"$entityType|p-08192" // slice 992
val pid3 = s"$entityType|p-09160" // slice 992

val startTime = TestClock.nowMicros().instant()
val oldTime = startTime.minus(projectionSettings.timeWindow).minusSeconds(60)
val oldTime = startTime.minus(projectionSettings.deleteAfter).minusSeconds(60)
writeEvent(pid1, 1L, startTime, "e1-1")

val projectionName = UUID.randomUUID().toString
Expand All @@ -306,7 +306,7 @@ class EventSourcedEndToEndSpec
// old event for pid2, seqNr 3. Will not be picked up by backtracking because it is outside the time window
writeEvent(pid2, 3L, oldTime, "e2-3")
// pid2, seqNr 3 is unknown when receiving 4 so will lookup timestamp of 3
// and accept 4 because 3 was older than time window
// and accept 4 because 3 was older than the deletion window (for tracked slice)
writeEvent(pid2, 4L, startTime.plusMillis(1), "e2-4")
processedProbe.receiveMessage().envelope.event shouldBe "e2-4"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,33 +453,35 @@ class R2dbcTimestampOffsetStoreSpec
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

// some validation require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestamp shouldBe Instant.EPOCH
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestamp shouldBe clock.instant()
val p1 = "p-08071" // slice 101
val p2 = "p-08072" // slice 102
val p3 = "p-08073" // slice 103
val p4 = "p-08074" // slice 104
val p5 = "p-08192" // slice 101 (same as p1)
val p6 = "p-08076" // slice 106

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
val offset1 = TimestampOffset(startTime, Map(p1 -> 3L, p2 -> 1L, p3 -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, p1, 3L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, p2, 1L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, p3, 5L)).futureValue

// seqNr 1 is always accepted
val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
val env1 = createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1")
offsetStore.validate(env1).futureValue shouldBe Accepted
offsetStore.validate(backtrackingEnvelope(env1)).futureValue shouldBe Accepted
// but not if already inflight, seqNr 1 was accepted
offsetStore.addInflight(env1)
val env1Later = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
val env1Later = createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1")
offsetStore.validate(env1Later).futureValue shouldBe Duplicate
offsetStore.validate(backtrackingEnvelope(env1Later)).futureValue shouldBe Duplicate
// subsequent seqNr is accepted
val env2 = createEnvelope("p4", 2L, startTime.plusMillis(2), "e4-2")
val env2 = createEnvelope(p4, 2L, startTime.plusMillis(2), "e4-2")
offsetStore.validate(env2).futureValue shouldBe Accepted
offsetStore.validate(backtrackingEnvelope(env2)).futureValue shouldBe Accepted
offsetStore.addInflight(env2)
// but not when gap
val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3), "e4-4")
val envP4SeqNr4 = createEnvelope(p4, 4L, startTime.plusMillis(3), "e4-4")
offsetStore.validate(envP4SeqNr4).futureValue shouldBe RejectedSeqNr
// hard reject when gap from backtracking
offsetStore.validate(backtrackingEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedBacktrackingSeqNr
Expand All @@ -490,17 +492,17 @@ class R2dbcTimestampOffsetStoreSpec
.validate(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4)))
.futureValue shouldBe RejectedBacktrackingSeqNr
// and not if later already inflight, seqNr 2 was accepted
offsetStore.validate(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate

// +1 to known is accepted
val env3 = createEnvelope("p1", 4L, startTime.plusMillis(4), "e1-4")
val env3 = createEnvelope(p1, 4L, startTime.plusMillis(4), "e1-4")
offsetStore.validate(env3).futureValue shouldBe Accepted
// but not same
offsetStore.validate(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p3, 5L, startTime, "e3-5")).futureValue shouldBe Duplicate
// but not same, even if it's 1
offsetStore.validate(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p2, 1L, startTime, "e2-1")).futureValue shouldBe Duplicate
// and not less
offsetStore.validate(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p3, 4L, startTime, "e3-4")).futureValue shouldBe Duplicate
offsetStore.addInflight(env3)
// and then it's not accepted again
offsetStore.validate(env3).futureValue shouldBe Duplicate
Expand All @@ -510,33 +512,33 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.validate(backtrackingEnvelope(env2)).futureValue shouldBe Duplicate

// +1 to known, and then also subsequent are accepted (needed for grouped)
val env4 = createEnvelope("p3", 6L, startTime.plusMillis(5), "e3-6")
val env4 = createEnvelope(p3, 6L, startTime.plusMillis(5), "e3-6")
offsetStore.validate(env4).futureValue shouldBe Accepted
offsetStore.addInflight(env4)
val env5 = createEnvelope("p3", 7L, startTime.plusMillis(6), "e3-7")
val env5 = createEnvelope(p3, 7L, startTime.plusMillis(6), "e3-7")
offsetStore.validate(env5).futureValue shouldBe Accepted
offsetStore.addInflight(env5)
val env6 = createEnvelope("p3", 8L, startTime.plusMillis(7), "e3-8")
val env6 = createEnvelope(p3, 8L, startTime.plusMillis(7), "e3-8")
offsetStore.validate(env6).futureValue shouldBe Accepted
offsetStore.addInflight(env6)

// reject unknown
val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
val env7 = createEnvelope(p5, 7L, startTime.plusMillis(8), "e5-7")
offsetStore.validate(env7).futureValue shouldBe RejectedSeqNr
offsetStore.validate(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr
// but ok when previous is old
eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
// but ok when previous is old (offset has been deleted but slice is known)
eventTimestampQueryClock.setInstant(startTime.minus(settings.deleteAfter.plusSeconds(1)))
val env8 = createEnvelope(p5, 7L, startTime.plusMillis(5), "e5-7")
offsetStore.validate(env8).futureValue shouldBe Accepted
eventTimestampQueryClock.setInstant(startTime)
offsetStore.addInflight(env8)
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
val env9 = createEnvelope(p5, 8L, startTime.plusMillis(9), "e5-8")
offsetStore.validate(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)

// reject unknown filtered
val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7"))
val env10 = filteredEnvelope(createEnvelope(p6, 7L, startTime.plusMillis(10), "e6-7"))
offsetStore.validate(env10).futureValue shouldBe RejectedSeqNr
// hard reject when unknown from backtracking
offsetStore.validate(backtrackingEnvelope(env10)).futureValue shouldBe RejectedBacktrackingSeqNr
Expand All @@ -546,15 +548,15 @@ class R2dbcTimestampOffsetStoreSpec
.futureValue shouldBe RejectedBacktrackingSeqNr

// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L)
offsetStore.getInflight() shouldBe Map(p1 -> 4L, p3 -> 8L, p4 -> 2L, p5 -> 8L)
// and they are removed from inflight once they have been stored
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map("p4" -> 2L)), "p4", 2L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map(p4 -> 2L)), p4, 2L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map(p5 -> 8L)), p5, 8L))
.futureValue
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L)
offsetStore.getInflight() shouldBe Map(p1 -> 4L, p3 -> 8L)
}

"accept via loading of previous seqNr" in {
Expand Down Expand Up @@ -1569,35 +1571,57 @@ class R2dbcTimestampOffsetStoreSpec

val p1 = "p-0960" // slice 576
val p2 = "p-6009" // slice 640
val p3 = "p-3039" // slice 832
val p3 = "p-7219" // slice 640
val p4 = "p-3039" // slice 832

val t0 = clock.instant().minusSeconds(100)
def time(step: Int) = t0.plusSeconds(step)

// starting with 2 projections, testing 512-1023
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p4 -> 1L)), p4, 1L)).futureValue

// scaled up to 4 projections, testing 512-767
val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore2.readOffset().futureValue.get)
startOffset2.timestamp shouldBe time(2)
offsetStore2.getState().startTimestamp shouldBe time(2)
val latestTime = time(10)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime

// clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
// rejected if timestamp of previous seqNr is after start timestamp minus backtracking window
clock.setInstant(startOffset2.timestamp.minus(settings.backtrackingWindow.minusSeconds(1)))
// note: clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr

// when slice is not tracked: then always reject

// rejected if timestamp of previous seqNr is after delete-until timestamp for latest
clock.setInstant(latestTime.minus(settings.deleteAfter.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
// accepted if timestamp of previous seqNr is before start timestamp minus backtracking window
clock.setInstant(startOffset2.timestamp.minus(settings.deleteAfter.plusSeconds(1)))

// still rejected if timestamp of previous seqNr is before delete-until timestamp for latest (different slice)
clock.setInstant(latestTime.minus(settings.deleteAfter.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
.futureValue shouldBe RejectedBacktrackingSeqNr

// add an offset for the slice under test
val latestTime2 = time(20)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime2, Map(p3 -> 1L)), p3, 1L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime2

// when slice is tracked: use deletion window for this slice

// rejected if timestamp of previous seqNr is after delete-until timestamp for this slice
clock.setInstant(latestTime2.minus(settings.deleteAfter.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr

// accepted if timestamp of previous seqNr is before delete-until timestamp for this slice
clock.setInstant(latestTime2.minus(settings.deleteAfter.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,23 @@

package akka.projection.r2dbc.internal

import java.lang.Boolean.FALSE
import java.lang.Boolean.TRUE
import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.TreeSet
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.Cancellable
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.Persistence
Expand All @@ -24,26 +40,10 @@ import akka.projection.internal.ManagementState
import akka.projection.internal.OffsetSerialization
import akka.projection.internal.OffsetSerialization.MultipleOffsets
import akka.projection.r2dbc.R2dbcProjectionSettings
import io.r2dbc.spi.Connection
import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import java.lang.Boolean.FALSE
import java.lang.Boolean.TRUE

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.TreeSet
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.actor.Cancellable
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import io.r2dbc.spi.Connection
import org.slf4j.LoggerFactory

/**
* INTERNAL API
Expand Down Expand Up @@ -79,15 +79,15 @@ private[projection] object R2dbcOffsetStore {
final case class RecordWithProjectionKey(record: Record, projectionKey: String)

object State {
val empty: State = State(Map.empty, Map.empty, Instant.EPOCH)
val empty: State = State(Map.empty, Map.empty)

def apply(records: immutable.IndexedSeq[Record]): State = {
if (records.isEmpty) empty
else empty.add(records)
}
}

final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]], startTimestamp: Instant) {
final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]]) {

def size: Int = byPid.size

Expand Down Expand Up @@ -440,13 +440,7 @@ private[projection] class R2dbcOffsetStore(
newState.latestTimestamp,
startOffset)

val startTimestamp = startOffset match {
case None => clock.instant()
case Some(offset) => offset.timestamp
}
val newStateWithStartOffset = newState.copy(startTimestamp = startTimestamp)

if (!state.compareAndSet(oldState, newStateWithStartOffset))
if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")

startOffset
Expand Down Expand Up @@ -848,24 +842,26 @@ private[projection] class R2dbcOffsetStore(
import Validation._
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr
val slice = recordWithOffset.record.slice

// Haven't see seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the startTimestamp minus backtracking window
// Haven't seen this pid in the time window (or lazy loaded from the database).
// Only accept it if the event with previous seqNr is outside the deletion window (for tracked slices).
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val acceptBefore = currentState.startTimestamp.minus(settings.backtrackingWindow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The startTimestamp can now be removed. It's only used by some logging, but that is now obsolete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, removed.

val acceptBefore = currentState.bySliceSorted.get(slice).map { bySliceSorted =>
bySliceSorted.last.timestamp.minus(settings.deleteAfter)
}

if (previousTimestamp.isBefore(acceptBefore)) {
if (acceptBefore.exists(timestamp => previousTimestamp.isBefore(timestamp))) {
logger.debug(
"{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before start timestamp [{}] minus backtracking window [{}].",
"is before deletion window timestamp [{}] for slice [{}].",
logPrefix,
pid,
seqNr,
previousTimestamp,
currentState.startTimestamp,
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString),
slice)
Accepted
} else if (recordWithOffset.fromPubSub) {
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
Expand All @@ -882,14 +878,14 @@ private[projection] class R2dbcOffsetStore(
// and SourceProvider supports it.
logger.warn(
"{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
"is after start timestamp [{}] minus backtracking window [{}].",
"is after deletion window timestamp [{}] for slice [{}].",
logPrefix,
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp,
currentState.startTimestamp,
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString),
slice)
RejectedBacktrackingSeqNr
} else {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
Expand Down