Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: A few smaller follow ups for the edge replication #1094

Merged
merged 21 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@

package akka.projection.internal

import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future
import akka.NotUsed
import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.dispatch.ExecutionContexts
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.javadsl.EventTimestampQuery
import akka.persistence.query.typed.javadsl.LoadEventQuery
import akka.persistence.query.typed.scaladsl.{ EventTimestampQuery => ScalaEventTimestampQuery }
import akka.persistence.query.typed.scaladsl.{ LoadEventQuery => ScalaLoadEventQuery }
import akka.projection.BySlicesSourceProvider
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.stream.scaladsl.Source
import akka.stream.javadsl.{ Source => JSource }
import akka.stream.scaladsl.Source

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
Expand Down Expand Up @@ -48,7 +55,9 @@ import akka.stream.javadsl.{ Source => JSource }
@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two of these adapters, one here in core and then another in projection-r2dbc. Looks very similar, if not identical?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if the reason we had two was that we didn't want to make the assumption that EventTimestampQuery and LoadEventQuery are always implemented. For r2dbc we know they are.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember, but gut feeling is that this could be simplified. Like one implementation here with an apply that chooses what mixins the adapter should have.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give it a shot pulling those into core, adding factories there and giving them more clear names.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved/refactored/renamed in 90430a6

delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider)
extends javadsl.SourceProvider[Offset, Envelope]
with BySlicesSourceProvider {
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
override def source(
offset: Supplier[CompletionStage[Optional[Offset]]]): CompletionStage[JSource[Envelope, NotUsed]] =
delegate
Expand All @@ -63,4 +72,35 @@ import akka.stream.javadsl.{ Source => JSource }
def minSlice: Int = delegate.minSlice

def maxSlice: Int = delegate.maxSlice

override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] =
delegate match {
case etq: ScalaEventTimestampQuery =>
etq.timestampOf(persistenceId, sequenceNr).map(_.asJava)(ExecutionContexts.parasitic).toJava
case _ =>
throw new IllegalStateException(
s"timestampOf was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.EventTimestampQuery")
}

override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] =
delegate match {
case etq: ScalaLoadEventQuery =>
etq.loadEnvelope[Event](persistenceId, sequenceNr).toJava
case _ =>
throw new IllegalStateException(
s"loadEnvelope was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.LoadEventQuery")
}

}

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
*/
@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope](
delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider with CanTriggerReplay)
extends ScalaBySlicesSourceProviderAdapter[Offset, Envelope](delegate)
with CanTriggerReplay {

override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
package akka.projection.scaladsl

import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -23,6 +21,9 @@ import akka.projection.ProjectionId
import akka.util.JavaDurationConverters._
import akka.util.Timeout

import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object ProjectionManagement extends ExtensionId[ProjectionManagement] {
def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system)

Expand Down Expand Up @@ -50,7 +51,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
private def topic(projectionName: String): ActorRef[Topic.Command[ProjectionManagementCommand]] = {
topics.computeIfAbsent(projectionName, _ => {
val name = topicName(projectionName)
system.systemActorOf(Topic[ProjectionManagementCommand](name), name)
system.systemActorOf(Topic[ProjectionManagementCommand](name), sanitizeActorName(name))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the sample, we want to use the location id (sweden/stockholm/kungsholmen) as replica id, but that ends up being the projection name and to allow that we must make sure we can use whatever projection names as actor names everywhere where we do.

})
}

Expand Down Expand Up @@ -151,4 +152,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
}
retry(() => askSetPaused())
}

private def sanitizeActorName(text: String): String =
URLEncoder.encode(text, StandardCharsets.UTF_8.name())
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
otherReplicas,
10.seconds,
8,
R2dbcReplication())
R2dbcReplication()(replicaSystem))
.withEdgeReplication(true)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
otherReplicas,
10.seconds,
8,
R2dbcReplication())
R2dbcReplication()(replicaSystem))
}

selfReplicaId match {
Expand Down
Loading
Loading