Skip to content

Commit

Permalink
chore: Only keep latest per pid in DynamoDbOffsetStore bySliceSorted (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Nov 20, 2024
1 parent 40cd8b3 commit 81a35c4
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
createRecord("p1", 2, t0.plusMillis(1)),
createRecord("p1", 3, t0.plusMillis(2))))
state1.byPid("p1").seqNr shouldBe 3L
state1.bySliceSorted.size shouldBe 1
state1.bySliceSorted(slice("p1")).size shouldBe 1
state1.bySliceSorted(slice("p1")).head.seqNr shouldBe 3
state1.offsetBySlice(slice("p1")) shouldBe TimestampOffset(t0.plusMillis(2), Map("p1" -> 3L))
state1.latestTimestamp shouldBe t0.plusMillis(2)

Expand All @@ -51,13 +54,35 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
state3.byPid("p3").seqNr shouldBe 10L
slice("p3") should not be slice("p1")
slice("p3") should not be slice("p2")
state3.bySliceSorted(slice("p3")).last.pid shouldBe "p3"
state3.bySliceSorted(slice("p3")).last.seqNr shouldBe 10
state3.offsetBySlice(slice("p3")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L))
state3.latestTimestamp shouldBe t0.plusMillis(3)

slice("p863") shouldBe slice("p984") // both slice 645
slice("p863") should not be slice("p1")
slice("p863") should not be slice("p2")
slice("p863") should not be slice("p3")
val state4 = state3
.add(Vector(createRecord("p863", 1, t0.plusMillis(10))))
.add(Vector(createRecord("p863", 2, t0.plusMillis(11))))
.add(Vector(createRecord("p984", 1, t0.plusMillis(12)), createRecord("p984", 2, t0.plusMillis(13))))
state4.bySliceSorted(slice("p984")).size shouldBe 2
state4.bySliceSorted(slice("p984")).last.pid shouldBe "p984"
state4.bySliceSorted(slice("p984")).last.seqNr shouldBe 2

val state5 = state3
.add(Vector(createRecord("p863", 2, t0.plusMillis(13))))
.add(Vector(createRecord("p863", 1, t0.plusMillis(12))))
.add(Vector(createRecord("p984", 2, t0.plusMillis(11)), createRecord("p984", 1, t0.plusMillis(10))))
state5.bySliceSorted(slice("p863")).size shouldBe 2
state5.bySliceSorted(slice("p863")).last.pid shouldBe "p863"
state5.bySliceSorted(slice("p863")).last.seqNr shouldBe 2

// same slice and same timestamp, keep both in seen
slice("p10084") shouldBe slice("p3")
val state4 = state3.add(Vector(createRecord("p10084", 9, t0.plusMillis(3))))
state4.offsetBySlice(slice("p10084")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L, "p10084" -> 9))
val state6 = state3.add(Vector(createRecord("p10084", 9, t0.plusMillis(3))))
state6.offsetBySlice(slice("p10084")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L, "p10084" -> 9))
}

"evict old" in {
Expand Down Expand Up @@ -102,6 +127,50 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
p7 -> 7L)
}

"evict old but keep latest for each slice" in {
val t0 = TestClock.nowMillis().instant()
val state1 = State.empty
.add(
Vector(
createRecord("p1", 1, t0),
createRecord("p92", 2, t0.plusMillis(1)),
createRecord("p108", 3, t0.plusMillis(20)),
createRecord("p4", 4, t0.plusMillis(30)),
createRecord("p5", 5, t0.plusMillis(40))))

state1.byPid("p1").slice shouldBe 449
state1.byPid("p92").slice shouldBe 905
state1.byPid("p108").slice shouldBe 905 // same slice as p92
state1.byPid("p4").slice shouldBe 452
state1.byPid("p5").slice shouldBe 453

val slices = state1.bySliceSorted.keySet

state1.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(
"p1" -> 1L,
"p92" -> 2L,
"p108" -> 3L,
"p4" -> 4L,
"p5" -> 5L)

val timeWindow = JDuration.ofMillis(1)

val state2 = slices.foldLeft(state1) {
case (acc, slice) => acc.evict(slice, timeWindow)
}
// note that p92 is evicted because it has same slice as p108
// p1 is kept because keeping one for each slice
state2.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)

val state3 = slices.foldLeft(state2) {
case (acc, slice) => acc.evict(slice, timeWindow)
}
// still keeping one for each slice
state3.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)
}

"find duplicate" in {
val t0 = TestClock.nowMillis().instant()
val state =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
def slice(pid: String): Int =
persistenceExt.sliceForPersistenceId(pid)

s"The DynamoDBOffsetStore for TimestampOffset" must {
"The DynamoDBOffsetStore for TimestampOffset" must {

"save TimestampOffset with one entry" in {
val projectionId = genRandomProjectionId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,6 @@ private[projection] object DynamoDBOffsetStore {
def add(records: Iterable[Record]): State = {
records.foldLeft(this) {
case (acc, r) =>
val newByPid =
acc.byPid.get(r.pid) match {
case Some(existingRecord) =>
if (r.seqNr > existingRecord.seqNr)
acc.byPid.updated(r.pid, r)
else
acc.byPid // older or same seqNr
case None =>
acc.byPid.updated(r.pid, r)
}

val newBySliceSorted =
acc.bySliceSorted.updated(r.slice, acc.bySliceSorted.get(r.slice) match {
case Some(existing) => existing + r
case None => TreeSet.empty[Record] + r
})

val newOffsetBySlice =
acc.offsetBySlice.get(r.slice) match {
case Some(existing) =>
Expand All @@ -130,7 +113,23 @@ private[projection] object DynamoDBOffsetStore {
acc.offsetBySlice.updated(r.slice, TimestampOffset(r.timestamp, Map(r.pid -> r.seqNr)))
}

acc.copy(byPid = newByPid, bySliceSorted = newBySliceSorted, offsetBySlice = newOffsetBySlice)
val sorted = acc.bySliceSorted.getOrElse(r.slice, TreeSet.empty[Record])
acc.byPid.get(r.pid) match {
case Some(existingRecord) =>
if (r.seqNr > existingRecord.seqNr)
acc.copy(
byPid = acc.byPid.updated(r.pid, r),
bySliceSorted = acc.bySliceSorted.updated(r.slice, sorted - existingRecord + r),
offsetBySlice = newOffsetBySlice)
else
// older or same seqNr
acc.copy(offsetBySlice = newOffsetBySlice)
case None =>
acc.copy(
byPid = acc.byPid.updated(r.pid, r),
bySliceSorted = acc.bySliceSorted.updated(r.slice, sorted + r),
offsetBySlice = newOffsetBySlice)
}
}
}

Expand Down

0 comments on commit 81a35c4

Please sign in to comment.