Skip to content

Commit

Permalink
fix(r2dbc): Leak of in-flight offsets
Browse files Browse the repository at this point in the history
* concurrency issue in R2dbcOffsetStore when saving offsets in case of concurrent modifications
* map vs flatMap
* DynamoDB was correct
* added some more logging
  • Loading branch information
patriknw committed Jan 17, 2025
1 parent e4aa9c3 commit 70c6efd
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,54 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L)
}

"cleanup inFlight when saving offset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L))
val envelope1 = createEnvelope("p1", 3L, offset1.timestamp, "e1-3")
val offset2 = TimestampOffset(startTime.plusMillis(1), Map("p1" -> 4L))
val envelope2 = createEnvelope("p1", 4L, offset2.timestamp, "e1-4")
val offset3 = TimestampOffset(startTime.plusMillis(2), Map("p1" -> 5L))

// save same seqNr as inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save lower seqNr than inFlight should not remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe Some(4L)

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 5L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None
}

"evict old records from same slice" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ private[projection] object DynamoDBOffsetStore {
}
}

override def toString: String = {
val sb = new StringBuilder
sb.append("State(")
bySliceSorted.toVector.sortBy(_._1).foreach {
case (slice, records) =>
sb.append("slice ").append(slice).append(": ")
records.foreach { r =>
sb.append("[").append(r.pid).append("->").append(r.seqNr).append(" ").append(r.timestamp).append("] ")
}
}
sb.append(")")
sb.toString
}

}

final class RejectedEnvelope(message: String) extends IllegalStateException(message)
Expand Down Expand Up @@ -243,10 +257,10 @@ private[projection] class DynamoDBOffsetStore(
timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala)
case Some(_) =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
case None =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
}
}

Expand All @@ -262,6 +276,10 @@ private[projection] class DynamoDBOffsetStore(
Future.successful(Some(getState().latestOffset).map(_.asInstanceOf[Offset]))
}

private def dumpState(s: State, flight: Map[Pid, SeqNr]): String = {
s"$s inFlight [${flight.map { case (pid, seqNr) => s"$pid->$seqNr" }.mkString(",")}]"
}

def readOffset[Offset](): Future[Option[Offset]] = {
// look for TimestampOffset first since that is used by akka-persistence-dynamodb,
// and then fall back to the other more primitive offset types
Expand All @@ -273,7 +291,8 @@ private[projection] class DynamoDBOffsetStore(
}(ExecutionContext.parasitic)
case None =>
// FIXME primitive offsets not supported, maybe we can change the sourceProvider parameter
throw new IllegalStateException("BySlicesSourceProvider is required. Primitive offsets not supported.")
throw new IllegalStateException(
s"$logPrefix BySlicesSourceProvider is required. Primitive offsets not supported.")
}
}

Expand All @@ -299,7 +318,9 @@ private[projection] class DynamoDBOffsetStore(
val newState = State(offsetBySlice)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state from readOffset. " +
s"${dumpState(oldState, getInflight())}")
clearInflight()
if (offsetBySlice.isEmpty) {
logger.debug("{} readTimestampOffset no stored offset", logPrefix)
Expand Down Expand Up @@ -381,14 +402,15 @@ private[projection] class DynamoDBOffsetStore(
val slice = persistenceExt.sliceForPersistenceId(pid)
Record(slice, pid, seqNr, t.timestamp)
case OffsetPidSeqNr(_: TimestampOffset, None) =>
throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
throw new IllegalArgumentException(
s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.")
case _ =>
throw new IllegalArgumentException(
"Mix of TimestampOffset and other offset type in same transaction is not supported")
s"$logPrefix Mix of TimestampOffset and other offset type in same transaction is not supported")
}
storeTimestampOffsets(records, storeSequenceNumbers, canBeConcurrent)
} else {
throw new IllegalStateException("TimestampOffset is required. Primitive offsets not supported.")
throw new IllegalStateException(s"$logPrefix TimestampOffset is required. Primitive offsets not supported.")
}
}

Expand Down Expand Up @@ -456,7 +478,9 @@ private[projection] class DynamoDBOffsetStore(
FutureDone
} else { // concurrent update
if (canBeConcurrent) storeTimestampOffsets(records, storeSequenceNumbers, canBeConcurrent) // CAS retry
else throw new IllegalStateException("Unexpected concurrent modification of state in save offsets.")
else
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state in save offsets.")
}
}
}
Expand All @@ -477,7 +501,8 @@ private[projection] class DynamoDBOffsetStore(
if (newInflight.size >= 10000) {
throw new IllegalStateException(
s"Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-projection")
"Please report this issue at https://github.com/akka/akka-projection " +
s"${dumpState(newState, newInflight)}")
}
if (!inflight.compareAndSet(currentInflight, newInflight))
cleanupInflight(newState) // CAS retry, concurrent update of inflight
Expand Down Expand Up @@ -773,7 +798,7 @@ private[projection] class DynamoDBOffsetStore(
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
s"$logPrefix DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,54 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L)
}

"cleanup inFlight when saving offset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L))
val envelope1 = createEnvelope("p1", 3L, offset1.timestamp, "e1-3")
val offset2 = TimestampOffset(startTime.plusMillis(1), Map("p1" -> 4L))
val envelope2 = createEnvelope("p1", 4L, offset2.timestamp, "e1-4")
val offset3 = TimestampOffset(startTime.plusMillis(2), Map("p1" -> 5L))

// save same seqNr as inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save lower seqNr than inFlight should not remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe Some(4L)

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 5L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None
}

"evict old records from same slice" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(100.seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,20 @@ private[projection] object R2dbcOffsetStore {
}
}

override def toString: String = {
val sb = new StringBuilder
sb.append("State(")
bySliceSorted.toVector.sortBy(_._1).foreach {
case (slice, records) =>
sb.append("slice ").append(slice).append(": ")
records.foreach { r =>
sb.append("[").append(r.pid).append("->").append(r.seqNr).append(" ").append(r.timestamp).append("] ")
}
}
sb.append(")")
sb.toString
}

}

final class RejectedEnvelope(message: String) extends IllegalStateException(message)
Expand Down Expand Up @@ -333,10 +347,10 @@ private[projection] class R2dbcOffsetStore(
timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala)
case Some(_) =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
case None =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
}
}

Expand All @@ -353,6 +367,10 @@ private[projection] class R2dbcOffsetStore(
}
}

private def dumpState(s: State, flight: Map[Pid, SeqNr]): String = {
s"$s inFlight [${flight.map { case (pid, seqNr) => s"$pid->$seqNr" }.mkString(",")}]"
}

def readOffset[Offset](): Future[Option[Offset]] = {
scheduleTasks()

Expand Down Expand Up @@ -441,7 +459,9 @@ private[projection] class R2dbcOffsetStore(
startOffset)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state from readOffset. " +
s"${dumpState(oldState, getInflight())}")

startOffset
}
Expand Down Expand Up @@ -540,7 +560,8 @@ private[projection] class R2dbcOffsetStore(
val record = Record(slice, pid, seqNr, t.timestamp)
saveTimestampOffsetInTx(conn, Vector(record), canBeConcurrent = true)
case OffsetPidSeqNr(_: TimestampOffset, None) =>
throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
throw new IllegalArgumentException(
s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.")
case _ =>
savePrimitiveOffsetInTx(conn, offset.offset)
}
Expand Down Expand Up @@ -569,10 +590,11 @@ private[projection] class R2dbcOffsetStore(
val slice = persistenceExt.sliceForPersistenceId(pid)
Record(slice, pid, seqNr, t.timestamp)
case OffsetPidSeqNr(_: TimestampOffset, None) =>
throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
throw new IllegalArgumentException(
s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.")
case _ =>
throw new IllegalArgumentException(
"Mix of TimestampOffset and other offset type in same transaction is not supported")
s"$logPrefix Mix of TimestampOffset and other offset type in same transaction is not supported")
}
saveTimestampOffsetInTx(conn, records, canBeConcurrent)
} else {
Expand Down Expand Up @@ -630,15 +652,19 @@ private[projection] class R2dbcOffsetStore(

val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords)

offsetInserts.map { _ =>
offsetInserts.flatMap { _ =>
if (state.compareAndSet(oldState, evictedNewState)) {
slices.foreach(s => triggerDeletionPerSlice.put(s, TRUE))
cleanupInflight(evictedNewState)
FutureDone
} else { // concurrent update
if (canBeConcurrent) saveTimestampOffsetInTx(conn, records, canBeConcurrent) // CAS retry
else throw new IllegalStateException("Unexpected concurrent modification of state from saveOffset.")
else
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state from saveOffset. " +
s"${dumpState(newState, currentInflight)}")
}
Done

}
}
}
Expand All @@ -656,8 +682,9 @@ private[projection] class R2dbcOffsetStore(
}
if (newInflight.size >= 10000) {
throw new IllegalStateException(
s"Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-projection")
s"$logPrefix Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-projection " +
s"${dumpState(newState, newInflight)}")
}
if (!inflight.compareAndSet(currentInflight, newInflight))
cleanupInflight(newState) // CAS retry, concurrent update of inflight
Expand Down Expand Up @@ -1197,7 +1224,7 @@ private[projection] class R2dbcOffsetStore(
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
s"$logPrefix DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
case _ => None
}
}
Expand Down

0 comments on commit 70c6efd

Please sign in to comment.