From 70c6efd4e9679e65336f2f0926d1a22e98ecd25d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 17 Jan 2025 11:53:16 +0100 Subject: [PATCH] fix(r2dbc): Leak of in-flight offsets * concurrency issue in R2dbcOffsetStore when saving offsets in case of concurrent modifications * map vs flatMap * DynamoDB was correct * added some more logging --- .../DynamoDBTimestampOffsetStoreSpec.scala | 48 +++++++++++++++++ .../internal/DynamoDBOffsetStore.scala | 45 ++++++++++++---- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 48 +++++++++++++++++ .../r2dbc/internal/R2dbcOffsetStore.scala | 51 ++++++++++++++----- 4 files changed, 170 insertions(+), 22 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala index 69285b567..a2fb6bc60 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala @@ -816,6 +816,54 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L) } + "cleanup inFlight when saving offset" in { + val projectionId = genRandomProjectionId() + val offsetStore = createOffsetStore(projectionId) + + val startTime = TestClock.nowMicros().instant() + val offset1 = TimestampOffset(startTime, Map("p1" -> 3L)) + val envelope1 = createEnvelope("p1", 3L, offset1.timestamp, "e1-3") + val offset2 = TimestampOffset(startTime.plusMillis(1), Map("p1" -> 4L)) + val envelope2 = createEnvelope("p1", 4L, offset2.timestamp, "e1-4") + val offset3 = TimestampOffset(startTime.plusMillis(2), Map("p1" -> 5L)) + + // save same seqNr as inFlight should remove from inFlight + offsetStore.addInflight(envelope1) + 
offsetStore.getInflight().get("p1") shouldBe Some(3L) + offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue + offsetStore.getInflight().get("p1") shouldBe None + + // clear + offsetStore.readOffset().futureValue + + // save lower seqNr than inFlight should not remove from inFlight + offsetStore.addInflight(envelope1) + offsetStore.getInflight().get("p1") shouldBe Some(3L) + offsetStore.addInflight(envelope2) + offsetStore.getInflight().get("p1") shouldBe Some(4L) + offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue + offsetStore.getInflight().get("p1") shouldBe Some(4L) + + // clear + offsetStore.readOffset().futureValue + + // save higher seqNr than inFlight should remove from inFlight + offsetStore.addInflight(envelope1) + offsetStore.getInflight().get("p1") shouldBe Some(3L) + offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue + offsetStore.getInflight().get("p1") shouldBe None + + // clear + offsetStore.readOffset().futureValue + + // save higher seqNr than inFlight should remove from inFlight, also after an earlier saveOffset of a lower seqNr + offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue + offsetStore.addInflight(envelope2) + offsetStore.getInflight().get("p1") shouldBe Some(4L) + offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 5L)).futureValue + offsetStore.getInflight().get("p1") shouldBe None + } + "evict old records from same slice" in { val projectionId = genRandomProjectionId() val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 04efa3f61..f92bbd3bb 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -171,6 +171,20 @@ 
private[projection] object DynamoDBOffsetStore { } } + override def toString: String = { + val sb = new StringBuilder + sb.append("State(") + bySliceSorted.toVector.sortBy(_._1).foreach { + case (slice, records) => + sb.append("slice ").append(slice).append(": ") + records.foreach { r => + sb.append("[").append(r.pid).append("->").append(r.seqNr).append(" ").append(r.timestamp).append("] ") + } + } + sb.append(")") + sb.toString + } + } final class RejectedEnvelope(message: String) extends IllegalStateException(message) @@ -243,10 +257,10 @@ private[projection] class DynamoDBOffsetStore( timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala) case Some(_) => throw new IllegalArgumentException( - s"Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.") + s"$logPrefix Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.") case None => throw new IllegalArgumentException( - s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.") + s"$logPrefix Expected BySlicesSourceProvider to be defined when TimestampOffset is used.") } } @@ -262,6 +276,10 @@ private[projection] class DynamoDBOffsetStore( Future.successful(Some(getState().latestOffset).map(_.asInstanceOf[Offset])) } + private def dumpState(s: State, flight: Map[Pid, SeqNr]): String = { + s"$s inFlight [${flight.map { case (pid, seqNr) => s"$pid->$seqNr" }.mkString(",")}]" + } + def readOffset[Offset](): Future[Option[Offset]] = { // look for TimestampOffset first since that is used by akka-persistence-dynamodb, // and then fall back to the other more primitive offset types @@ -273,7 +291,8 @@ private[projection] class DynamoDBOffsetStore( }(ExecutionContext.parasitic) case None => // FIXME primitive offsets not supported, maybe we can change the sourceProvider parameter - throw new IllegalStateException("BySlicesSourceProvider is required. 
Primitive offsets not supported.") + throw new IllegalStateException( + s"$logPrefix BySlicesSourceProvider is required. Primitive offsets not supported.") } } @@ -299,7 +318,9 @@ private[projection] class DynamoDBOffsetStore( val newState = State(offsetBySlice) if (!state.compareAndSet(oldState, newState)) - throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") + throw new IllegalStateException( + s"$logPrefix Unexpected concurrent modification of state from readOffset. " + + s"${dumpState(oldState, getInflight())}") clearInflight() if (offsetBySlice.isEmpty) { logger.debug("{} readTimestampOffset no stored offset", logPrefix) @@ -381,14 +402,15 @@ private[projection] class DynamoDBOffsetStore( val slice = persistenceExt.sliceForPersistenceId(pid) Record(slice, pid, seqNr, t.timestamp) case OffsetPidSeqNr(_: TimestampOffset, None) => - throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.") + throw new IllegalArgumentException( + s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.") case _ => throw new IllegalArgumentException( - "Mix of TimestampOffset and other offset type in same transaction is not supported") + s"$logPrefix Mix of TimestampOffset and other offset type in same transaction is not supported") } storeTimestampOffsets(records, storeSequenceNumbers, canBeConcurrent) } else { - throw new IllegalStateException("TimestampOffset is required. Primitive offsets not supported.") + throw new IllegalStateException(s"$logPrefix TimestampOffset is required. 
Primitive offsets not supported.") } } @@ -456,7 +478,9 @@ private[projection] class DynamoDBOffsetStore( FutureDone } else { // concurrent update if (canBeConcurrent) storeTimestampOffsets(records, storeSequenceNumbers, canBeConcurrent) // CAS retry - else throw new IllegalStateException("Unexpected concurrent modification of state in save offsets.") + else + throw new IllegalStateException( + s"$logPrefix Unexpected concurrent modification of state in save offsets.") } } } @@ -477,7 +501,8 @@ private[projection] class DynamoDBOffsetStore( if (newInflight.size >= 10000) { throw new IllegalStateException( s"Too many envelopes in-flight [${newInflight.size}]. " + - "Please report this issue at https://github.com/akka/akka-projection") + "Please report this issue at https://github.com/akka/akka-projection " + + s"${dumpState(newState, newInflight)}") } if (!inflight.compareAndSet(currentInflight, newInflight)) cleanupInflight(newState) // CAS retry, concurrent update of inflight @@ -773,7 +798,7 @@ private[projection] class DynamoDBOffsetStore( case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] => // in case additional types are added throw new IllegalArgumentException( - s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues") + s"$logPrefix DurableStateChange [${change.getClass.getName}] not implemented yet. 
Please report bug at https://github.com/akka/akka-projection/issues") case _ => None } } diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index 64acdcc40..dbd7520a0 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -736,6 +736,54 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L) } + "cleanup inFlight when saving offset" in { + val projectionId = genRandomProjectionId() + val offsetStore = createOffsetStore(projectionId) + + val startTime = TestClock.nowMicros().instant() + val offset1 = TimestampOffset(startTime, Map("p1" -> 3L)) + val envelope1 = createEnvelope("p1", 3L, offset1.timestamp, "e1-3") + val offset2 = TimestampOffset(startTime.plusMillis(1), Map("p1" -> 4L)) + val envelope2 = createEnvelope("p1", 4L, offset2.timestamp, "e1-4") + val offset3 = TimestampOffset(startTime.plusMillis(2), Map("p1" -> 5L)) + + // save same seqNr as inFlight should remove from inFlight + offsetStore.addInflight(envelope1) + offsetStore.getInflight().get("p1") shouldBe Some(3L) + offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue + offsetStore.getInflight().get("p1") shouldBe None + + // clear + offsetStore.readOffset().futureValue + + // save lower seqNr than inFlight should not remove from inFlight + offsetStore.addInflight(envelope1) + offsetStore.getInflight().get("p1") shouldBe Some(3L) + offsetStore.addInflight(envelope2) + offsetStore.getInflight().get("p1") shouldBe Some(4L) + offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue + offsetStore.getInflight().get("p1") shouldBe Some(4L) + + // clear + 
offsetStore.readOffset().futureValue + + // save higher seqNr than inFlight should remove from inFlight + offsetStore.addInflight(envelope1) + offsetStore.getInflight().get("p1") shouldBe Some(3L) + offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue + offsetStore.getInflight().get("p1") shouldBe None + + // clear + offsetStore.readOffset().futureValue + + // save higher seqNr than inFlight should remove from inFlight, also after an earlier saveOffset of a lower seqNr + offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue + offsetStore.addInflight(envelope2) + offsetStore.getInflight().get("p1") shouldBe Some(4L) + offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 5L)).futureValue + offsetStore.getInflight().get("p1") shouldBe None + } + "evict old records from same slice" in { val projectionId = genRandomProjectionId() val evictSettings = settings.withTimeWindow(100.seconds) diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index 294cb66be..70413a1bd 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -187,6 +187,20 @@ private[projection] object R2dbcOffsetStore { } } + override def toString: String = { + val sb = new StringBuilder + sb.append("State(") + bySliceSorted.toVector.sortBy(_._1).foreach { + case (slice, records) => + sb.append("slice ").append(slice).append(": ") + records.foreach { r => + sb.append("[").append(r.pid).append("->").append(r.seqNr).append(" ").append(r.timestamp).append("] ") + } + } + sb.append(")") + sb.toString + } + } final class RejectedEnvelope(message: String) extends IllegalStateException(message) @@ -333,10 +347,10 @@ private[projection] class R2dbcOffsetStore( timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala) case 
Some(_) => throw new IllegalArgumentException( - s"Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.") + s"$logPrefix Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.") case None => throw new IllegalArgumentException( - s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.") + s"$logPrefix Expected BySlicesSourceProvider to be defined when TimestampOffset is used.") } } @@ -353,6 +367,10 @@ private[projection] class R2dbcOffsetStore( } } + private def dumpState(s: State, flight: Map[Pid, SeqNr]): String = { + s"$s inFlight [${flight.map { case (pid, seqNr) => s"$pid->$seqNr" }.mkString(",")}]" + } + def readOffset[Offset](): Future[Option[Offset]] = { scheduleTasks() @@ -441,7 +459,9 @@ private[projection] class R2dbcOffsetStore( startOffset) if (!state.compareAndSet(oldState, newState)) - throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") + throw new IllegalStateException( + s"$logPrefix Unexpected concurrent modification of state from readOffset. 
" + + s"${dumpState(oldState, getInflight())}") startOffset } @@ -540,7 +560,8 @@ private[projection] class R2dbcOffsetStore( val record = Record(slice, pid, seqNr, t.timestamp) saveTimestampOffsetInTx(conn, Vector(record), canBeConcurrent = true) case OffsetPidSeqNr(_: TimestampOffset, None) => - throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.") + throw new IllegalArgumentException( + s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.") case _ => savePrimitiveOffsetInTx(conn, offset.offset) } @@ -569,10 +590,11 @@ private[projection] class R2dbcOffsetStore( val slice = persistenceExt.sliceForPersistenceId(pid) Record(slice, pid, seqNr, t.timestamp) case OffsetPidSeqNr(_: TimestampOffset, None) => - throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.") + throw new IllegalArgumentException( + s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.") case _ => throw new IllegalArgumentException( - "Mix of TimestampOffset and other offset type in same transaction is not supported") + s"$logPrefix Mix of TimestampOffset and other offset type in same transaction is not supported") } saveTimestampOffsetInTx(conn, records, canBeConcurrent) } else { @@ -630,15 +652,19 @@ private[projection] class R2dbcOffsetStore( val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords) - offsetInserts.map { _ => + offsetInserts.flatMap { _ => if (state.compareAndSet(oldState, evictedNewState)) { slices.foreach(s => triggerDeletionPerSlice.put(s, TRUE)) cleanupInflight(evictedNewState) + FutureDone } else { // concurrent update if (canBeConcurrent) saveTimestampOffsetInTx(conn, records, canBeConcurrent) // CAS retry - else throw new IllegalStateException("Unexpected concurrent modification of state from saveOffset.") + else + throw new IllegalStateException( + s"$logPrefix Unexpected concurrent modification of state from 
saveOffset. " + + s"${dumpState(newState, currentInflight)}") } - Done + } } } @@ -656,8 +682,9 @@ private[projection] class R2dbcOffsetStore( } if (newInflight.size >= 10000) { throw new IllegalStateException( - s"Too many envelopes in-flight [${newInflight.size}]. " + - "Please report this issue at https://github.com/akka/akka-projection") + s"$logPrefix Too many envelopes in-flight [${newInflight.size}]. " + + "Please report this issue at https://github.com/akka/akka-projection " + + s"${dumpState(newState, newInflight)}") } if (!inflight.compareAndSet(currentInflight, newInflight)) cleanupInflight(newState) // CAS retry, concurrent update of inflight @@ -1197,7 +1224,7 @@ private[projection] class R2dbcOffsetStore( case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] => // in case additional types are added throw new IllegalArgumentException( - s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues") + s"$logPrefix DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues") case _ => None } }