Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: A few smaller follow ups for the edge replication #1094

Merged
merged 21 commits into from
Dec 13, 2023

Conversation

johanandren
Copy link
Member

No description provided.

@@ -50,7 +51,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
private def topic(projectionName: String): ActorRef[Topic.Command[ProjectionManagementCommand]] = {
topics.computeIfAbsent(projectionName, _ => {
val name = topicName(projectionName)
system.systemActorOf(Topic[ProjectionManagementCommand](name), name)
system.systemActorOf(Topic[ProjectionManagementCommand](name), sanitizeActorName(name))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the sample, we want to use the location id (sweden/stockholm/kungsholmen) as replica id, but that ends up being the projection name and to allow that we must make sure we can use whatever projection names as actor names everywhere where we do.

case Some(metadata) =>
EventProducerPush[AnyRef](settings.entityTypeKey.name, eps, metadata, remoteReplica.grpcClientSettings)
EventProducerPush[AnyRef](settings.selfReplicaId.id, eps, metadata, remoteReplica.grpcClientSettings)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Origin id of push stream should be the replica id, not the entity name (that is already in stream id)

@johanandren johanandren requested a review from patriknw December 7, 2023 15:15
johanandren added a commit that referenced this pull request Dec 7, 2023
Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@johanandren
Copy link
Member Author

@patriknw one more thing: I realised the entity ref factory was Akka JAPI function so throws checked exception which is quite annoying for Java API users.

@@ -285,6 +285,10 @@ private[akka] object ReplicationImpl {
val sharding = ClusterSharding(system)
sharding.init(replicatedEntity.entity)

if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@johanandren
Copy link
Member Author

I'll add an integration test for Java DSL as well, since that would have caught pretty much all things fixed in this PR

@@ -48,7 +55,9 @@ import akka.stream.javadsl.{ Source => JSource }
@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two of these adapters, one here in core and then another in projection-r2dbc. Looks very similar, if not identical?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if the reason we had two was that we didn't want to make the assumption that EventTimestampQuery and LoadEventQuery are always implemented. For r2dbc we know they are.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember, but gut feeling is that this could be simplified. Like one implementation here with an apply that chooses what mixins the adapter should have.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give it a shot pulling those into core, adding factories there and giving them more clear names.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved/refactored/renamed in 90430a6


val sourceProviderAdapter = providerWithSlices match {
case delegate: CanTriggerReplay => new ScalaBySlicesSourceProviderAdapterWithCanTriggerReplay(delegate)
case _ => new ScalaBySlicesSourceProviderAdapter(providerWithSlices)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused. In the previously failing test, this was not the adapter that was used in the end, it was the r2dbc adapter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I check out 2daba9c I get this:

java.lang.IllegalArgumentException: Expected SourceProvider [akka.projection.internal.ScalaBySlicesSourceProviderAdapter] to implement EventTimestampQuery when TimestampOffset is used.

So not the r2dbc adapter either?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debugging a bit I can see both of the adapters being created when the test is run. Looking into from where now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

akka.projection.internal.ScalaBySlicesSourceProviderAdapter adapts a Scala provider into Java DSL while akka.projection.r2dbc.internal.BySliceSourceProviderAdapter adapts a Java provider to Scala DSL.

Still feels like both should be in projection core though? (And possibly detect the other one and unwrap in case we do adapt an already adapted source)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 90430a6

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM again, after one small suggestion

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be placed in the other file: JavaToScalaSourceProviderAdapter.scala ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups

@johanandren johanandren merged commit 9985eb9 into main Dec 13, 2023
21 checks passed
@johanandren johanandren deleted the wip-edge-replication-follow-ups branch December 13, 2023 09:23
johanandren added a commit that referenced this pull request Dec 13, 2023
johanandren added a commit that referenced this pull request Dec 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants