Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: A few smaller follow ups for the edge replication #1094

Merged
merged 21 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.projection.internal.GroupedHandlerStrategy
import akka.projection.internal.HandlerAdapter
import akka.projection.internal.NoopStatusObserver
import akka.projection.internal.SingleHandlerStrategy
import akka.projection.internal.SourceProviderAdapter
import akka.projection.internal.JavaToScalaSourceProviderAdapter
import akka.projection.javadsl.AtLeastOnceFlowProjection
import akka.projection.javadsl.AtLeastOnceProjection
import akka.projection.javadsl.AtMostOnceProjection
Expand Down Expand Up @@ -52,7 +52,7 @@ object CassandraProjection {
handler: Supplier[Handler[Envelope]]): AtLeastOnceProjection[Offset, Envelope] =
new CassandraProjectionImpl(
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
Expand All @@ -76,7 +76,7 @@ object CassandraProjection {
handler: Supplier[Handler[java.util.List[Envelope]]]): GroupedProjection[Offset, Envelope] =
new CassandraProjectionImpl[Offset, Envelope](
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy =
Expand Down Expand Up @@ -112,7 +112,7 @@ object CassandraProjection {
: AtLeastOnceFlowProjection[Offset, Envelope] =
new CassandraProjectionImpl(
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
Expand All @@ -130,7 +130,7 @@ object CassandraProjection {
handler: Supplier[Handler[Envelope]]): AtMostOnceProjection[Offset, Envelope] =
new CassandraProjectionImpl(
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtMostOnce(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# internal/renamed
ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.ScalaBySlicesSourceProviderAdapter")
# (was internal stable but only because it was historically used by akka-persistence-r2dbc)
ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.SourceProviderAdapter")
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,45 @@
* Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.r2dbc.internal
package akka.projection.internal

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.concurrent.Future
import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.stream.scaladsl.Source

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._

import scala.compat.java8.OptionConverters._
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import scala.concurrent.Future
@InternalApi private[projection] object JavaToScalaBySliceSourceProviderAdapter {
def apply[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope]): scaladsl.SourceProvider[Offset, Envelope] =
delegate match {
case adapted: ScalaToJavaBySlicesSourceProviderAdapter[_, _] =>
// just unwrap rather than wrapping further
adapted.delegate
case delegate: BySlicesSourceProvider with CanTriggerReplay =>
new JavaToScalaBySliceSourceProviderAdapterWithCanTriggerReplay(delegate)
case _: BySlicesSourceProvider => new JavaToScalaBySliceSourceProviderAdapter(delegate)
case _ => new JavaToScalaSourceProviderAdapter(delegate)
}
}

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
@InternalApi private[projection] class BySliceSourceProviderAdapter[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope])
@InternalApi private[projection] sealed class JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope] private[internal] (
val delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope]
with BySlicesSourceProvider
with EventTimestampQuery
Expand Down Expand Up @@ -75,4 +87,18 @@ import akka.projection.BySlicesSourceProvider
s"Expected SourceProvider [${delegate.getClass.getName}] to implement " +
s"EventTimestampQuery when LoadEventQuery is used."))
}

}

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider that also implements
* CanTriggerReplay
*/
@InternalApi
private[projection] final class JavaToScalaBySliceSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope] private[internal] (
delegate: javadsl.SourceProvider[Offset, Envelope] with CanTriggerReplay)
extends JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope](delegate)
with CanTriggerReplay {
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.internal

import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.javadsl.EventTimestampQuery
import akka.persistence.query.typed.javadsl.LoadEventQuery
import akka.persistence.query.typed.scaladsl.{ EventTimestampQuery => ScalaEventTimestampQuery }
import akka.persistence.query.typed.scaladsl.{ LoadEventQuery => ScalaLoadEventQuery }
import akka.projection.BySlicesSourceProvider
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.stream.javadsl.{ Source => JSource }
import akka.stream.scaladsl.Source

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be placed in the other file: JavaToScalaSourceProviderAdapter.scala ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups

delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope] {

def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava
}
delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
}

def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)

def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
}

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
*/
@InternalApi private[projection] object ScalaToJavaBySlicesSourceProviderAdapter {
def apply[Offset, Envelope](
delegate: scaladsl.SourceProvider[Offset, Envelope]
with BySlicesSourceProvider): javadsl.SourceProvider[Offset, Envelope] =
delegate match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] =>
// just unwrap rather than wrapping further
adapted.delegate
case delegate: CanTriggerReplay => new ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay(delegate)
case _ => new ScalaToJavaBySlicesSourceProviderAdapter(delegate)
}
}

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
*/
@InternalApi
private[projection] sealed class ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope] private[internal] (
val delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider)
extends javadsl.SourceProvider[Offset, Envelope]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
override def source(
offset: Supplier[CompletionStage[Optional[Offset]]]): CompletionStage[JSource[Envelope, NotUsed]] =
delegate
.source(() => offset.get().toScala.map(_.asScala)(ExecutionContexts.parasitic))
.map(_.asJava)(ExecutionContexts.parasitic)
.toJava

override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)

override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)

def minSlice: Int = delegate.minSlice

def maxSlice: Int = delegate.maxSlice

override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] =
delegate match {
case etq: ScalaEventTimestampQuery =>
etq.timestampOf(persistenceId, sequenceNr).map(_.asJava)(ExecutionContexts.parasitic).toJava
case _ =>
throw new IllegalStateException(
s"timestampOf was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.EventTimestampQuery")
}

override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] =
delegate match {
case etq: ScalaLoadEventQuery =>
etq.loadEnvelope[Event](persistenceId, sequenceNr).toJava
case _ =>
throw new IllegalStateException(
s"loadEnvelope was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.LoadEventQuery")
}

}

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
*/
@InternalApi
private[projection] final class ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope] private[internal] (
delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider with CanTriggerReplay)
extends ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope](delegate)
with CanTriggerReplay {

override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
package akka.projection.scaladsl

import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -23,6 +21,9 @@ import akka.projection.ProjectionId
import akka.util.JavaDurationConverters._
import akka.util.Timeout

import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object ProjectionManagement extends ExtensionId[ProjectionManagement] {
def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system)

Expand Down Expand Up @@ -50,7 +51,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
private def topic(projectionName: String): ActorRef[Topic.Command[ProjectionManagementCommand]] = {
topics.computeIfAbsent(projectionName, _ => {
val name = topicName(projectionName)
system.systemActorOf(Topic[ProjectionManagementCommand](name), name)
system.systemActorOf(Topic[ProjectionManagementCommand](name), sanitizeActorName(name))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the sample, we want to use the location id (sweden/stockholm/kungsholmen) as replica id, but that ends up being the projection name and to allow that we must make sure we can use whatever projection names as actor names everywhere where we do.

})
}

Expand Down Expand Up @@ -151,4 +152,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
}
retry(() => askSetPaused())
}

private def sanitizeActorName(text: String): String =
URLEncoder.encode(text, StandardCharsets.UTF_8.name())
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
otherReplicas,
10.seconds,
8,
R2dbcReplication())
R2dbcReplication()(replicaSystem))
.withEdgeReplication(true)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
otherReplicas,
10.seconds,
8,
R2dbcReplication())
R2dbcReplication()(replicaSystem))
}

selfReplicaId match {
Expand Down
Loading