Skip to content

Commit

Permalink
doc: Scaling projection instances (#1101)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Dec 18, 2023
1 parent 091a943 commit 0390b64
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.japi.Pair;
import akka.persistence.query.Offset;
Expand Down Expand Up @@ -154,19 +155,20 @@ public CompletionStage<Done> process(

// #initProjections
void initProjections() {
// Split the slices into 4 ranges
int numberOfSliceRanges = 4;
List<Pair<Integer, Integer>> sliceRanges =
EventSourcedProvider.sliceRanges(
system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);

ShardedDaemonProcess.get(system)
.init(
.initWithContext(
ProjectionBehavior.Command.class,
"ShoppingCartProjection",
sliceRanges.size(),
i -> ProjectionBehavior.create(createProjection(sliceRanges.get(i))),
ProjectionBehavior.stopMessage());
4,
daemonContext -> {
List<Pair<Integer, Integer>> sliceRanges =
EventSourcedProvider.sliceRanges(
system, R2dbcReadJournal.Identifier(), daemonContext.totalProcesses());
Pair<Integer, Integer> sliceRange = sliceRanges.get(daemonContext.processNumber());
return ProjectionBehavior.create(createProjection(sliceRange));
},
ShardedDaemonProcessSettings.create(system),
Optional.of(ProjectionBehavior.stopMessage()));
}

Projection<EventEnvelope<ShoppingCart.Event>> createProjection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import akka.persistence.query.Offset
import akka.projection.r2dbc.R2dbcProjectionSettings
import akka.serialization.jackson.CborSerializable
import org.slf4j.LoggerFactory

import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.cluster.sharding.typed.ShardedDaemonProcessSettings

//#handler
//#grouped-handler
import akka.persistence.query.typed.EventEnvelope
Expand Down Expand Up @@ -136,14 +138,16 @@ object R2dbcProjectionDocExample {
handler = () => new ShoppingCartHandler)
}

// Split the slices into 4 ranges
val numberOfSliceRanges: Int = 4
val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges)

ShardedDaemonProcess(system).init(
ShardedDaemonProcess(system).initWithContext(
name = "ShoppingCartProjection",
numberOfInstances = sliceRanges.size,
behaviorFactory = i => ProjectionBehavior(projection(sliceRanges(i))),
initialNumberOfInstances = 4,
behaviorFactory = { daemonContext =>
val sliceRanges =
EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, daemonContext.totalProcesses)
val sliceRange = sliceRanges(daemonContext.processNumber)
ProjectionBehavior(projection(sliceRange))
},
ShardedDaemonProcessSettings(system),
stopMessage = ProjectionBehavior.Stop)
}
// #initProjections
Expand Down
3 changes: 1 addition & 2 deletions docs/src/main/paradox/r2dbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ using 1024 slices and running 4 Projection instances the slice ranges would be 0
Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, ..., 768-895, 896-1023.

However, when changing the number of slices the projections with the old slice distribution must be
stopped before starting new projections. That can be done with a full shutdown before deploying the
new slice distribution or pause (stop) the projections with @ref:[the management API](management.md).
stopped before starting new projections. That can be done at runtime when @ref[Running with Sharded Daemon Process](#running-with-sharded-daemon-process).

When using `R2dbcProjection` together with the `EventSourcedProvider.eventsBySlices` the events will be delivered in
sequence number order without duplicates.
Expand Down

0 comments on commit 0390b64

Please sign in to comment.