From 22c0c26fd0832c11605603b1529e9fa44e12abbf Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Nov 2024 10:16:15 +0100 Subject: [PATCH] fix: Ensure that inflight persistence IDs are not evicted --- .../r2dbc/R2dbcOffsetStoreStateSpec.scala | 36 ++++++++-- .../r2dbc/internal/R2dbcOffsetStore.scala | 65 ++++++++++++------- 2 files changed, 72 insertions(+), 29 deletions(-) diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala index 6661ca66d..5ff14a0c1 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala @@ -94,6 +94,8 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state.latestOffset.get.seen shouldBe Map("p1" -> 3L, "p2" -> 2L, "p3" -> 5L, "p4" -> 9L) } + val allowAll: Any => Boolean = _ => true + "evict old" in { val p1 = "p500" // slice 645 val p2 = "p621" // slice 645 @@ -116,17 +118,17 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) // keep all - state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1 + state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000), allowAll) shouldBe state1 // evict older than time window - val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll) state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10)))) - val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll) state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L) - val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2)) + val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2), allowAll) state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( p1 -> 1L, p2 -> 2L, @@ -166,7 +168,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers val timeWindow = JDuration.ofMillis(1) val state2 = slices.foldLeft(state1) { - case (acc, slice) => acc.evict(slice, timeWindow) + case (acc, slice) => acc.evict(slice, timeWindow, allowAll) } // note that p92 is evicted because it has same slice as p108 // p1 is kept because keeping one for each slice @@ -175,13 +177,35 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state2.latestOffset.get.seen shouldBe Map("p5" -> 5L) val state3 = slices.foldLeft(state2) { - case (acc, slice) => acc.evict(slice, timeWindow) + case (acc, slice) => acc.evict(slice, timeWindow, allowAll) } // still keeping one for each slice state3.byPid .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) } + "evict old but only those allowed to be evicted" in { + val t0 = TestClock.nowMillis().instant() + val state1 = State.empty.add( + Vector( + createRecord("p92", 2, t0.plusMillis(1)), + createRecord("p108", 3, t0.plusMillis(20)), + createRecord("p229", 4, t0.plusMillis(30)))) + + val slices = state1.bySliceSorted.keySet + slices.size shouldBe 1 + + state1 + .evict(slices.head, JDuration.ofMillis(1), allowAll) + .byPid + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p229" -> 4) // p92 and p108 evicted + + state1 + .evict(slices.head, JDuration.ofMillis(1), _.pid == "p108") + .byPid // allow only p108 to be evicted + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p92" -> 2, "p229" -> 4) + } + "find duplicate" in { val t0 = TestClock.nowMillis().instant() val state = diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index 7e7c8abed..bb20101da 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -53,23 +53,19 @@ private[projection] object R2dbcOffsetStore { type Pid = String final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) extends Ordered[Record] { - - override def compare(that: Record): Int = { - val result = this.timestamp.compareTo(that.timestamp) - if (result == 0) { - if (this.slice == that.slice) - if (this.pid == that.pid) - if (this.seqNr == that.seqNr) - 0 - else - java.lang.Long.compare(this.seqNr, that.seqNr) - else - this.pid.compareTo(that.pid) - else Integer.compare(this.slice, that.slice) - } else { - result + override def compare(that: Record): Int = + timestamp.compareTo(that.timestamp) match { + case 0 => + Integer.compare(slice, that.slice) match { + case 0 => + pid.compareTo(that.pid) match { + case 0 => java.lang.Long.compare(seqNr, that.seqNr) + case result => result + } + case result => result + } + case result => result } - } } final case class RecordWithOffset( @@ -159,15 +155,28 @@ private[projection] object R2dbcOffsetStore { } } - def evict(slice: Int, timeWindow: JDuration): State = { + def evict(slice: Int, timeWindow: JDuration, ableToEvictRecord: Record => Boolean): State = { val recordsSortedByTimestamp = bySliceSorted.getOrElse(slice, TreeSet.empty[Record]) if (recordsSortedByTimestamp.isEmpty) { this } else { - // this will always keep at least one, latest per slice val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow) - val filtered = recordsSortedByTimestamp.dropWhile(_.timestamp.isBefore(until)) - if (filtered.size == recordsSortedByTimestamp.size) { + val filtered = { + // Records comparing >= this record by recordOrdering will definitely be kept, + // Records comparing < this record by recordOrdering are subject to eviction + // Slice will be equal, and pid will compare lexicographically less than any valid pid + val untilRecord = Record(slice, "", 0, until) + // this will always keep at least one, latest per slice + val newerRecords = recordsSortedByTimestamp.rangeImpl(Some(untilRecord), None) // inclusive of until + val olderRecords = recordsSortedByTimestamp.rangeImpl(None, Some(untilRecord)) // exclusive of until + val filteredOlder = olderRecords.filterNot(ableToEvictRecord) + + if (filteredOlder.size == olderRecords.size) recordsSortedByTimestamp + else newerRecords.union(filteredOlder) + } + + // adding back filtered is linear in the size of filtered, but so is checking if we're able to evict + if (filtered eq recordsSortedByTimestamp) { this } else { val byPidOtherSlices = byPid.filterNot { case (_, r) => r.slice == slice } @@ -357,7 +366,7 @@ private[projection] class R2dbcOffsetStore( val s = State(recordsWithKey.map(_.record)) // FIXME shall we evict here, or how does that impact the logic for moreThanOneProjectionKey and foreignOffsets? (minSlice to maxSlice).foldLeft(s) { - case (acc, slice) => acc.evict(slice, settings.timeWindow) + case (acc, slice) => acc.evict(slice, settings.timeWindow, _ => true) } } @@ -586,8 +595,18 @@ private[projection] class R2dbcOffsetStore( if (filteredRecords.size == 1) Set(filteredRecords.head.slice) else filteredRecords.iterator.map(_.slice).toSet + val currentInflight = getInflight() val evictedNewState = slices.foldLeft(newState) { - case (s, slice) => s.evict(slice, settings.timeWindow) + case (s, slice) => + s.evict( + slice, + settings.timeWindow, + // Only persistence IDs that aren't inflight are evictable, + // if only so that those persistence IDs can be removed from + // inflight... in the absence of further records from that + // persistence ID, the next store will evict (further records + // would make that persistence ID recent enough to not be evicted) + record => !currentInflight.contains(record.pid)) } val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords) @@ -619,7 +638,7 @@ private[projection] class R2dbcOffsetStore( if (newInflight.size >= 10000) { throw new IllegalStateException( s"Too many envelopes in-flight [${newInflight.size}]. " + - "Please report this issue at https://github.com/akka/akka-persistence-r2dbc") + "Please report this issue at https://github.com/akka/akka-projection") } if (!inflight.compareAndSet(currentInflight, newInflight)) cleanupInflight(newState) // CAS retry, concurrent update of inflight