Skip to content

Commit

Permalink
fix: Ensure that inflight persistence IDs are not evicted
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 25, 2024
1 parent 7e5af73 commit 22c0c26
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
state.latestOffset.get.seen shouldBe Map("p1" -> 3L, "p2" -> 2L, "p3" -> 5L, "p4" -> 9L)
}

val allowAll: Any => Boolean = _ => true

"evict old" in {
val p1 = "p500" // slice 645
val p2 = "p621" // slice 645
Expand All @@ -116,17 +118,17 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

// keep all
state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1
state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000), allowAll) shouldBe state1

// evict older than time window
val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll)
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10))))
val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll)
state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L)

val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2))
val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2), allowAll)
state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(
p1 -> 1L,
p2 -> 2L,
Expand Down Expand Up @@ -166,7 +168,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
val timeWindow = JDuration.ofMillis(1)

val state2 = slices.foldLeft(state1) {
case (acc, slice) => acc.evict(slice, timeWindow)
case (acc, slice) => acc.evict(slice, timeWindow, allowAll)
}
// note that p92 is evicted because it has same slice as p108
// p1 is kept because keeping one for each slice
Expand All @@ -175,13 +177,35 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
state2.latestOffset.get.seen shouldBe Map("p5" -> 5L)

val state3 = slices.foldLeft(state2) {
case (acc, slice) => acc.evict(slice, timeWindow)
case (acc, slice) => acc.evict(slice, timeWindow, allowAll)
}
// still keeping one for each slice
state3.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)
}

"evict old but only those allowed to be evicted" in {
val t0 = TestClock.nowMillis().instant()
val state1 = State.empty.add(
Vector(
createRecord("p92", 2, t0.plusMillis(1)),
createRecord("p108", 3, t0.plusMillis(20)),
createRecord("p229", 4, t0.plusMillis(30))))

val slices = state1.bySliceSorted.keySet
slices.size shouldBe 1

state1
.evict(slices.head, JDuration.ofMillis(1), allowAll)
.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p229" -> 4) // p92 and p108 evicted

state1
.evict(slices.head, JDuration.ofMillis(1), _.pid == "p108")
.byPid // allow only p108 to be evicted
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p92" -> 2, "p229" -> 4)
}

"find duplicate" in {
val t0 = TestClock.nowMillis().instant()
val state =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,19 @@ private[projection] object R2dbcOffsetStore {
type Pid = String

final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) extends Ordered[Record] {

override def compare(that: Record): Int = {
val result = this.timestamp.compareTo(that.timestamp)
if (result == 0) {
if (this.slice == that.slice)
if (this.pid == that.pid)
if (this.seqNr == that.seqNr)
0
else
java.lang.Long.compare(this.seqNr, that.seqNr)
else
this.pid.compareTo(that.pid)
else Integer.compare(this.slice, that.slice)
} else {
result
override def compare(that: Record): Int =
timestamp.compareTo(that.timestamp) match {
case 0 =>
Integer.compare(slice, that.slice) match {
case 0 =>
pid.compareTo(that.pid) match {
case 0 => java.lang.Long.compare(seqNr, that.seqNr)
case result => result
}
case result => result
}
case result => result
}
}
}

final case class RecordWithOffset(
Expand Down Expand Up @@ -159,15 +155,28 @@ private[projection] object R2dbcOffsetStore {
}
}

def evict(slice: Int, timeWindow: JDuration): State = {
def evict(slice: Int, timeWindow: JDuration, ableToEvictRecord: Record => Boolean): State = {
val recordsSortedByTimestamp = bySliceSorted.getOrElse(slice, TreeSet.empty[Record])
if (recordsSortedByTimestamp.isEmpty) {
this
} else {
// this will always keep at least one, latest per slice
val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow)
val filtered = recordsSortedByTimestamp.dropWhile(_.timestamp.isBefore(until))
if (filtered.size == recordsSortedByTimestamp.size) {
val filtered = {
// Records comparing >= this record by recordOrdering will definitely be kept,
// Records comparing < this record by recordOrdering are subject to eviction
// Slice will be equal, and pid will compare lexicographically less than any valid pid
val untilRecord = Record(slice, "", 0, until)
// this will always keep at least one, latest per slice
val newerRecords = recordsSortedByTimestamp.rangeImpl(Some(untilRecord), None) // inclusive of until
val olderRecords = recordsSortedByTimestamp.rangeImpl(None, Some(untilRecord)) // exclusive of until
val filteredOlder = olderRecords.filterNot(ableToEvictRecord)

if (filteredOlder.size == olderRecords.size) recordsSortedByTimestamp
else newerRecords.union(filteredOlder)
}

// adding back filtered is linear in the size of filtered, but so is checking if we're able to evict
if (filtered eq recordsSortedByTimestamp) {
this
} else {
val byPidOtherSlices = byPid.filterNot { case (_, r) => r.slice == slice }
Expand Down Expand Up @@ -357,7 +366,7 @@ private[projection] class R2dbcOffsetStore(
val s = State(recordsWithKey.map(_.record))
// FIXME shall we evict here, or how does that impact the logic for moreThanOneProjectionKey and foreignOffsets?
(minSlice to maxSlice).foldLeft(s) {
case (acc, slice) => acc.evict(slice, settings.timeWindow)
case (acc, slice) => acc.evict(slice, settings.timeWindow, _ => true)
}
}

Expand Down Expand Up @@ -586,8 +595,18 @@ private[projection] class R2dbcOffsetStore(
if (filteredRecords.size == 1) Set(filteredRecords.head.slice)
else filteredRecords.iterator.map(_.slice).toSet

val currentInflight = getInflight()
val evictedNewState = slices.foldLeft(newState) {
case (s, slice) => s.evict(slice, settings.timeWindow)
case (s, slice) =>
s.evict(
slice,
settings.timeWindow,
// Only persistence IDs that aren't inflight are evictable,
// if only so that those persistence IDs can be removed from
// inflight... in the absence of further records from that
// persistence ID, the next store will evict (further records
// would make that persistence ID recent enough to not be evicted)
record => !currentInflight.contains(record.pid))
}

val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords)
Expand Down Expand Up @@ -619,7 +638,7 @@ private[projection] class R2dbcOffsetStore(
if (newInflight.size >= 10000) {
throw new IllegalStateException(
s"Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-persistence-r2dbc")
"Please report this issue at https://github.com/akka/akka-projection")
}
if (!inflight.compareAndSet(currentInflight, newInflight))
cleanupInflight(newState) // CAS retry, concurrent update of inflight
Expand Down

0 comments on commit 22c0c26

Please sign in to comment.