diff --git a/akka-projection-dynamodb-integration/src/test/resources/logback-test.xml b/akka-projection-dynamodb-integration/src/test/resources/logback-test.xml index e1aa88c4d..554158027 100644 --- a/akka-projection-dynamodb-integration/src/test/resources/logback-test.xml +++ b/akka-projection-dynamodb-integration/src/test/resources/logback-test.xml @@ -15,7 +15,7 @@ - + diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/EventSourcedEndToEndSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/EventSourcedEndToEndSpec.scala new file mode 100644 index 000000000..a58ae9c59 --- /dev/null +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/EventSourcedEndToEndSpec.scala @@ -0,0 +1,576 @@ +/* + * Copyright (C) 2022-2024 Lightbend Inc. + */ + +package akka.projection.dynamodb + +import java.lang +import java.util.UUID +import java.util.concurrent.CompletionException +import java.util.concurrent.ConcurrentHashMap +import java.util.{ HashMap => JHashMap } + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters.CompletionStageOps +import scala.util.Failure +import scala.util.Success + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal +import akka.persistence.query.Offset +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.RetentionCriteria +import akka.projection.Projection +import akka.projection.ProjectionBehavior +import akka.projection.ProjectionContext +import akka.projection.ProjectionId +import akka.projection.dynamodb.scaladsl.DynamoDBProjection +import akka.projection.dynamodb.scaladsl.DynamoDBTransactHandler +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.scaladsl.Handler +import akka.projection.scaladsl.SourceProvider +import akka.stream.scaladsl.FlowWithContext +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition +import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement +import software.amazon.awssdk.services.dynamodb.model.KeyType +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput +import software.amazon.awssdk.services.dynamodb.model.Put +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem + +object EventSourcedEndToEndSpec { + + private val log = LoggerFactory.getLogger(classOf[EventSourcedEndToEndSpec]) + + val config: Config = ConfigFactory + .parseString(""" + akka.persistence.dynamodb { + query { + refresh-interval = 500 millis + # stress more by using a small buffer + buffer-size = 10 + + backtracking.behind-current-time = 5 seconds + } + } + """) + .withFallback(TestConfig.config) + + object Persister { + sealed trait Command + final case class Persist(payload: Any) extends Command + final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done]) extends Command + final case class PersistAll(payloads: List[Any]) extends Command + final case class Ping(replyTo: ActorRef[Done]) extends Command + final case class Stop(replyTo: ActorRef[Done]) extends Command + + def apply(pid: PersistenceId): Behavior[Command] = { + Behaviors.setup { context => + EventSourcedBehavior[Command, Any, String](persistenceId = pid, "", { + (_, command) => + command match { + case command: Persist => + context.log.debug( + "Persist [{}], pid [{}], seqNr [{}]", + command.payload, + pid.id, + EventSourcedBehavior.lastSequenceNumber(context) + 1) + Effect.persist(command.payload) + case command: PersistWithAck => + context.log.debug( + "Persist [{}], pid [{}], seqNr [{}]", + command.payload, + pid.id, + EventSourcedBehavior.lastSequenceNumber(context) + 1) + Effect.persist(command.payload).thenRun(_ => command.replyTo ! Done) + case command: PersistAll => + if (context.log.isDebugEnabled) + context.log.debug( + "PersistAll [{}], pid [{}], seqNr [{}]", + command.payloads.mkString(","), + pid.id, + EventSourcedBehavior.lastSequenceNumber(context) + 1) + Effect.persist(command.payloads) + case Ping(replyTo) => + replyTo ! Done + Effect.none + case Stop(replyTo) => + replyTo ! Done + Effect.stop() + } + }, (_, _) => "") + .withRetention(RetentionCriteria.snapshotEvery(5)) + } + } + } + + final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String]) + + final case class StartParams( + entityType: String, + projectionName: String, + nrOfProjections: Int, + processedProbe: TestProbe[Processed]) + + object TestTable { + val name = "endtoend_spec" + + object Attributes { + val Id = "id" + val Payload = "payload" + } + + def create(client: DynamoDbAsyncClient, system: ActorSystem[_]): Future[Done] = { + import system.executionContext + + client.describeTable(DescribeTableRequest.builder().tableName(name).build()).asScala.transformWith { + case Success(_) => Future.successful(Done) // already exists + case Failure(_) => + val request = CreateTableRequest + .builder() + .tableName(name) + .keySchema(KeySchemaElement.builder().attributeName(Attributes.Id).keyType(KeyType.HASH).build()) + .attributeDefinitions( + AttributeDefinition.builder().attributeName(Attributes.Id).attributeType(ScalarAttributeType.S).build()) + .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()) + .build() + client + .createTable(request) + .asScala + .map(_ => Done) + .recoverWith { + case c: CompletionException => + Future.failed(c.getCause) + }(ExecutionContext.parasitic) + } + } + + def findById(id: String)(client: DynamoDbAsyncClient, system: ActorSystem[_]): Future[Option[String]] = { + import system.executionContext + + client + .getItem( + GetItemRequest + .builder() + .tableName(name) + .consistentRead(true) + .key(Map(Attributes.Id -> AttributeValue.fromS(id)).asJava) + .build()) + .asScala + .map { response => + if (response.hasItem && response.item.containsKey(Attributes.Payload)) + Some(response.item.get(Attributes.Payload).s()) + else None + } + .recoverWith { + case c: CompletionException => + Future.failed(c.getCause) + }(ExecutionContext.parasitic) + } + } + + class TestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean]) + extends Handler[EventEnvelope[String]] { + + override def process(envelope: EventEnvelope[String]): Future[Done] = { + log.debug( + "{} Processed {} [{}], pid [{}], seqNr [{}]", + projectionId.key, + if (processedEvents.containsKey(envelope.event)) "duplicate event" else "event", + envelope.event, + envelope.persistenceId, + envelope.sequenceNr) + // could be at-least once + if (processedEvents.putIfAbsent(envelope.event, true) == null) + probe ! Processed(projectionId, envelope) + Future.successful(Done) + } + } + + class GroupedTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean]) + extends Handler[Seq[EventEnvelope[String]]] { + val delegate = new TestHandler(projectionId, probe, processedEvents) + + override def process(envelopes: Seq[EventEnvelope[String]]): Future[Done] = { + envelopes.foreach(delegate.process) + Future.successful(Done) + } + } + + private def flowTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + processedEvents: ConcurrentHashMap[String, java.lang.Boolean]) = { + + FlowWithContext[EventEnvelope[String], ProjectionContext].map { envelope => + log.debug( + "{} Processed {} [{}], pid [{}], seqNr [{}]", + projectionId.key, + if (processedEvents.containsKey(envelope.event)) "duplicate event" else "event", + envelope.event, + envelope.persistenceId, + envelope.sequenceNr) + // could be at-least once + if (processedEvents.putIfAbsent(envelope.event, true) == null) + probe ! Processed(projectionId, envelope) + Done + } + } + + class TransactTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + client: DynamoDbAsyncClient, + system: ActorSystem[_]) + extends DynamoDBTransactHandler[EventEnvelope[String]] { + import system.executionContext + + override def process(envelope: EventEnvelope[String]): Future[Iterable[TransactWriteItem]] = { + log.debug( + "{} Processed event [{}], pid [{}], seqNr [{}]", + projectionId.key, + envelope.event, + envelope.persistenceId, + envelope.sequenceNr) + + val id = s"${envelope.persistenceId}-${envelope.sequenceNr}" + TestTable.findById(id)(client, system).map { + case None => + val attributes = new JHashMap[String, AttributeValue] + attributes.put(TestTable.Attributes.Id, AttributeValue.fromS(id)) + attributes.put(TestTable.Attributes.Payload, AttributeValue.fromS(envelope.event)) + probe ! Processed(projectionId, envelope) + Seq(TransactWriteItem.builder().put(Put.builder().tableName(TestTable.name).item(attributes).build()).build()) + case Some(_) => + log.error( + "{} Processed duplicate event [{}], pid [{}], seqNr [{}]", + projectionId.key, + envelope.event, + envelope.persistenceId, + envelope.sequenceNr) + // this will be detected as duplicate and fail the test + probe ! Processed(projectionId, envelope) + Nil + } + } + } + + class GroupedTransactTestHandler( + projectionId: ProjectionId, + probe: ActorRef[Processed], + client: DynamoDbAsyncClient, + system: ActorSystem[_]) + extends DynamoDBTransactHandler[Seq[EventEnvelope[String]]] { + import system.executionContext + val delegate = new TransactTestHandler(projectionId, probe, client, system) + + override def process(envelopes: Seq[EventEnvelope[String]]): Future[Iterable[TransactWriteItem]] = { + envelopes.foldLeft(Future.successful(Vector.empty[TransactWriteItem])) { + case (acc, env) => + acc.flatMap { accWriteItems => + delegate.process(env).map(accWriteItems ++ _) + } + } + } + } + +} + +class EventSourcedEndToEndSpec + extends ScalaTestWithActorTestKit(EventSourcedEndToEndSpec.config) + with AnyWordSpecLike + with TestDbLifecycle + with TestData + with LogCapturing { + import EventSourcedEndToEndSpec._ + + override def typedSystem: ActorSystem[_] = system + + override protected def beforeAll(): Unit = { + if (localDynamoDB) + Await.result(TestTable.create(client, system), 10.seconds) + super.beforeAll() + } + + private var processedEventsPerProjection: Map[ProjectionId, ConcurrentHashMap[String, java.lang.Boolean]] = Map.empty + + private def processedEvents(projectionId: ProjectionId): ConcurrentHashMap[String, lang.Boolean] = { + processedEventsPerProjection.get(projectionId) match { + case None => + val processedEvents = new ConcurrentHashMap[String, java.lang.Boolean] + processedEventsPerProjection = processedEventsPerProjection.updated(projectionId, processedEvents) + processedEvents + case Some(processedEvents) => + processedEvents + } + } + + private def startAtLeastOnceProjections(startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + DynamoDBProjection + .atLeastOnce( + projectionId, + Some(settings), + sourceProvider = sourceProvider, + handler = + () => new TestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId)))) + } + + private def startAtLeastOnceGroupedProjections( + startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + DynamoDBProjection + .atLeastOnceGroupedWithin( + projectionId, + Some(settings), + sourceProvider = sourceProvider, + handler = + () => new GroupedTestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId))) + .withGroup(3, groupAfterDuration = 200.millis)) + } + + private def startAtLeastOnceFlowProjections( + startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + DynamoDBProjection + .atLeastOnceFlow( + projectionId, + Some(settings), + sourceProvider = sourceProvider, + flowTestHandler(projectionId, startParams.processedProbe.ref, processedEvents(projectionId)))) + } + + private def startExactlyOnceProjections(startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + DynamoDBProjection + .exactlyOnce( + projectionId, + Some(settings), + sourceProvider = sourceProvider, + handler = () => new TransactTestHandler(projectionId, startParams.processedProbe.ref, client, system))) + } + + private def startExactlyOnceGroupedProjections( + startParams: StartParams): Vector[ActorRef[ProjectionBehavior.Command]] = { + startProjections( + startParams, + (projectionId, sourceProvider) => + DynamoDBProjection + .exactlyOnceGroupedWithin( + projectionId, + Some(settings), + sourceProvider = sourceProvider, + handler = + () => new GroupedTransactTestHandler(projectionId, startParams.processedProbe.ref, client, system)) + .withGroup(3, groupAfterDuration = 200.millis)) + } + + private def startProjections( + startParams: StartParams, + projectionFactory: ( + ProjectionId, + SourceProvider[Offset, EventEnvelope[String]]) => Projection[EventEnvelope[String]]) + : Vector[ActorRef[ProjectionBehavior.Command]] = { + import startParams._ + val sliceRanges = EventSourcedProvider.sliceRanges(system, DynamoDBReadJournal.Identifier, nrOfProjections) + + sliceRanges.map { range => + val projectionId = ProjectionId(projectionName, s"${range.min}-${range.max}") + + val sourceProvider = + EventSourcedProvider + .eventsBySlices[String](system, DynamoDBReadJournal.Identifier, entityType, range.min, range.max) + + val projection = projectionFactory(projectionId, sourceProvider) + spawn(ProjectionBehavior(projection)) + }.toVector + } + + private def mkEvent(n: Int): String = f"e$n%05d" + + private def assertEventsProcessed( + expectedEvents: Vector[String], + processedProbe: TestProbe[Processed], + verifyProjectionId: Boolean): Unit = { + val expectedNumberOfEvents = expectedEvents.size + var processed = Vector.empty[Processed] + + (1 to expectedNumberOfEvents).foreach { _ => + // not using receiveMessages(expectedEvents) for better logging in case of failure + try { + processed :+= processedProbe.receiveMessage(15.seconds) + } catch { + case e: AssertionError => + val missing = expectedEvents.diff(processed.map(_.envelope.event)) + log.error(s"Processed [${processed.size}] events, but expected [$expectedNumberOfEvents]. " + + s"Missing [${missing.mkString(",")}]. " + + s"Received [${processed.map(p => s"(${p.envelope.event}, ${p.envelope.persistenceId}, ${p.envelope.sequenceNr})").mkString(", ")}]. ") + throw e + } + } + + if (verifyProjectionId) { + val byPid = processed.groupBy(_.envelope.persistenceId) + byPid.foreach { + case (pid, processedByPid) => + withClue(s"PersistenceId [$pid]: ") { + // all events of a pid must be processed by the same projection instance + processedByPid.map(_.projectionId).toSet.size shouldBe 1 + // processed events in right order + processedByPid.map(_.envelope.sequenceNr) shouldBe (1 to processedByPid.size).toVector + } + } + } + } + + private def test( + startParams: StartParams, + startProjectionsFactory: () => Vector[ActorRef[ProjectionBehavior.Command]]): Unit = { + val numberOfEntities = 20 // increase this for longer testing + val numberOfEvents = numberOfEntities * 10 + import startParams.entityType + processedEventsPerProjection = Map.empty + + val entities = (0 until numberOfEntities).map { n => + val persistenceId = PersistenceId(entityType, s"p$n") + spawn(Persister(persistenceId), s"$entityType-p$n") + } + + // write some before starting the projections + var n = 1 + while (n <= numberOfEvents / 4) { + val p = n % numberOfEntities + // mix some persist 1 and persist 3 events + if (n % 7 == 0) { + entities(p) ! Persister.PersistAll((0 until 3).map(i => mkEvent(n + i)).toList) + n += 3 + } else { + entities(p) ! Persister.Persist(mkEvent(n)) + n += 1 + } + + if (n % 10 == 0) + Thread.sleep(50) + else if (n % 25 == 0) + Thread.sleep(1500) + } + + val projections = startProjectionsFactory() + + // give them some time to start before writing more events + Thread.sleep(200) + + while (n <= numberOfEvents) { + val p = n % numberOfEntities + entities(p) ! Persister.Persist(mkEvent(n)) + + // stop projections + if (n == numberOfEvents / 2) { + projections.foreach { ref => + ref ! ProjectionBehavior.Stop + } + } + + // wait until stopped + if (n == (numberOfEvents / 2) + 10) { + val probe = createTestProbe() + projections.foreach { ref => + probe.expectTerminated(ref) + } + } + + // resume projections again + if (n == (numberOfEvents / 2) + 20) + startProjectionsFactory() + + if (n % 10 == 0) + Thread.sleep(50) + else if (n % 25 == 0) + Thread.sleep(1500) + + n += 1 + } + + val expectedEvents = (1 to numberOfEvents).map(mkEvent).toVector + assertEventsProcessed(expectedEvents, startParams.processedProbe, verifyProjectionId = true) + + projections.foreach(_ ! ProjectionBehavior.Stop) + val probe = createTestProbe() + projections.foreach { ref => + probe.expectTerminated(ref) + } + } + + private def newStartParams(): StartParams = { + val entityType = nextEntityType() + val projectionName = UUID.randomUUID().toString + val processedProbe = createTestProbe[Processed]() + StartParams(entityType, projectionName, nrOfProjections = 4, processedProbe) + } + + s"A DynamoDB projection with eventsBySlices source" must { + + "handle all events atLeastOnce" in { + val startParams = newStartParams() + test(startParams, () => startAtLeastOnceProjections(startParams)) + } + + "handle all events atLeastOnceGrouped" in { + val startParams = newStartParams() + test(startParams, () => startAtLeastOnceGroupedProjections(startParams)) + } + + "handle all events atLeastOnceFlow" in { + val startParams = newStartParams() + test(startParams, () => startAtLeastOnceFlowProjections(startParams)) + } + + "handle all events exactlyOnce" in { + val startParams = newStartParams() + test(startParams, () => startExactlyOnceProjections(startParams)) + } + + "handle all events exactlyOnceGrouped" in { + val startParams = newStartParams() + test(startParams, () => startExactlyOnceGroupedProjections(startParams)) + } + + } +} diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestConfig.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestConfig.scala index c3efda054..bfd6907d0 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestConfig.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestConfig.scala @@ -15,6 +15,7 @@ object TestConfig { .parseString(""" akka.loglevel = DEBUG akka.persistence.journal.plugin = "akka.persistence.dynamodb.journal" + akka.persistence.snapshot-store.plugin = "akka.persistence.dynamodb.snapshot" akka.persistence.dynamodb { query { refresh-interval = 1s diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestDbLifecycle.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestDbLifecycle.scala index a8fac4e57..f654ab726 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestDbLifecycle.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestDbLifecycle.scala @@ -49,6 +49,10 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => akka.persistence.dynamodb.util.scaladsl.CreateTables .createJournalTable(typedSystem, dynamoDBSettings, client, deleteIfExists = true), 10.seconds) + Await.result( + akka.persistence.dynamodb.util.scaladsl.CreateTables + .createSnapshotsTable(typedSystem, dynamoDBSettings, client, deleteIfExists = true), + 10.seconds) Await.result( CreateTables.createTimestampOffsetStoreTable(typedSystem, settings, client, deleteIfExists = true), 10.seconds)