From 0390b64790d7ca051b715569013b50495faa7628 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 18 Dec 2023 17:03:43 +0100 Subject: [PATCH] doc: Scaling projection instances (#1101) --- .../projection/R2dbcProjectionDocExample.java | 22 ++++++++++--------- .../R2dbcProjectionDocExample.scala | 20 ++++++++++------- docs/src/main/paradox/r2dbc.md | 3 +-- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java b/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java index bcc3a78bc..7ed34a7f8 100644 --- a/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java +++ b/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java @@ -7,6 +7,7 @@ import akka.Done; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.japi.Pair; import akka.persistence.query.Offset; @@ -154,19 +155,20 @@ public CompletionStage process( // #initProjections void initProjections() { - // Split the slices into 4 ranges - int numberOfSliceRanges = 4; - List> sliceRanges = - EventSourcedProvider.sliceRanges( - system, R2dbcReadJournal.Identifier(), numberOfSliceRanges); - ShardedDaemonProcess.get(system) - .init( + .initWithContext( ProjectionBehavior.Command.class, "ShoppingCartProjection", - sliceRanges.size(), - i -> ProjectionBehavior.create(createProjection(sliceRanges.get(i))), - ProjectionBehavior.stopMessage()); + 4, + daemonContext -> { + List> sliceRanges = + EventSourcedProvider.sliceRanges( + system, R2dbcReadJournal.Identifier(), daemonContext.totalProcesses()); + Pair sliceRange = sliceRanges.get(daemonContext.processNumber()); + return ProjectionBehavior.create(createProjection(sliceRange)); + }, + ShardedDaemonProcessSettings.create(system), + Optional.of(ProjectionBehavior.stopMessage())); } Projection> createProjection( diff --git a/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala b/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala index 8b6cc0e14..8e5e9c701 100644 --- a/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala +++ b/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala @@ -13,12 +13,14 @@ import akka.persistence.query.Offset import akka.projection.r2dbc.R2dbcProjectionSettings import akka.serialization.jackson.CborSerializable import org.slf4j.LoggerFactory - import java.time.Instant + import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings + //#handler //#grouped-handler import akka.persistence.query.typed.EventEnvelope @@ -136,14 +138,16 @@ object R2dbcProjectionDocExample { handler = () => new ShoppingCartHandler) } - // Split the slices into 4 ranges - val numberOfSliceRanges: Int = 4 - val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges) - - ShardedDaemonProcess(system).init( + ShardedDaemonProcess(system).initWithContext( name = "ShoppingCartProjection", - numberOfInstances = sliceRanges.size, - behaviorFactory = i => ProjectionBehavior(projection(sliceRanges(i))), + initialNumberOfInstances = 4, + behaviorFactory = { daemonContext => + val sliceRanges = + EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, daemonContext.totalProcesses) + val sliceRange = sliceRanges(daemonContext.processNumber) + ProjectionBehavior(projection(sliceRange)) + }, + ShardedDaemonProcessSettings(system), stopMessage = ProjectionBehavior.Stop) } // #initProjections diff --git a/docs/src/main/paradox/r2dbc.md b/docs/src/main/paradox/r2dbc.md index e6ea648d1..d42ae927c 100644 --- a/docs/src/main/paradox/r2dbc.md +++ b/docs/src/main/paradox/r2dbc.md @@ -124,8 +124,7 @@ using 1024 slices and running 4 Projection instances the slice ranges would be 0 Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, ..., 768-895, 896-1023. However, when changing the number of slices the projections with the old slice distribution must be -stopped before starting new projections. That can be done with a full shutdown before deploying the -new slice distribution or pause (stop) the projections with @ref:[the management API](management.md). +stopped before starting new projections. That can be done at runtime when @ref[Running with Sharded Daemon Process](#running-with-sharded-daemon-process). When using `R2dbcProjection` together with the `EventSourcedProvider.eventsBySlices` the events will be delivered in sequence number order without duplicates.