-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: A few smaller follow ups for the edge replication #1094
Conversation
@@ -50,7 +51,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension { | |||
private def topic(projectionName: String): ActorRef[Topic.Command[ProjectionManagementCommand]] = { | |||
topics.computeIfAbsent(projectionName, _ => { | |||
val name = topicName(projectionName) | |||
system.systemActorOf(Topic[ProjectionManagementCommand](name), name) | |||
system.systemActorOf(Topic[ProjectionManagementCommand](name), sanitizeActorName(name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the sample, we want to use the location id (sweden/stockholm/kungsholmen
) as replica id, but that ends up being the projection name and to allow that we must make sure we can use whatever projection names as actor names everywhere where we do.
case Some(metadata) => | ||
EventProducerPush[AnyRef](settings.entityTypeKey.name, eps, metadata, remoteReplica.grpcClientSettings) | ||
EventProducerPush[AnyRef](settings.selfReplicaId.id, eps, metadata, remoteReplica.grpcClientSettings) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Origin id of push stream should be the replica id, not the entity name (that is already in stream id)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala
Outdated
Show resolved
Hide resolved
@patriknw one more thing: I realised the entity ref factory was Akka JAPI function so throws checked exception which is quite annoying for Java API users. |
@@ -285,6 +285,10 @@ private[akka] object ReplicationImpl { | |||
val sharding = ClusterSharding(system) | |||
sharding.init(replicatedEntity.entity) | |||
|
|||
if (settings.initialConsumerFilter.nonEmpty) { | |||
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter) | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
…to Java API EventProducerPushDestination
I'll add an integration test for Java DSL as well, since that would have caught pretty much all things fixed in this PR |
@@ -48,7 +55,9 @@ import akka.stream.javadsl.{ Source => JSource } | |||
@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have two of these adapters, one here in core and then another in projection-r2dbc. Looks very similar, if not identical?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if the reason we had two was that we didn't want to make the assumption that EventTimestampQuery and LoadEventQuery are always implemented. For r2dbc we know they are.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember, but gut feeling is that this could be simplified. Like one implementation here with an apply that chooses what mixins the adapter should have.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll give it a shot pulling those into core, adding factories there and giving them more clear names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved/refactored/renamed in 90430a6
|
||
val sourceProviderAdapter = providerWithSlices match { | ||
case delegate: CanTriggerReplay => new ScalaBySlicesSourceProviderAdapterWithCanTriggerReplay(delegate) | ||
case _ => new ScalaBySlicesSourceProviderAdapter(providerWithSlices) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused. In the previously failing test, this was not the adapter that was used in the end, it was the r2dbc adapter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I check out 2daba9c I get this:
java.lang.IllegalArgumentException: Expected SourceProvider [akka.projection.internal.ScalaBySlicesSourceProviderAdapter] to implement EventTimestampQuery when TimestampOffset is used.
So not the r2dbc adapter either?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debugging a bit I can see both of the adapters being created when the test is run. Looking into from where now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
akka.projection.internal.ScalaBySlicesSourceProviderAdapter
adapts a Scala provider into Java DSL while akka.projection.r2dbc.internal.BySliceSourceProviderAdapter
adapts a Java provider to Scala DSL.
Still feels like both should be in projection core though? (And possibly detect the other one and unwrap in case we do adapt an already adapted source)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 90430a6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM again, after one small suggestion
/** | ||
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider | ||
*/ | ||
private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be placed in the other file: JavaToScalaSourceProviderAdapter.scala ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ups
No description provided.