diff --git a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala index abda348f3..c8995639a 100644 --- a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala +++ b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala @@ -21,7 +21,7 @@ import akka.projection.internal.GroupedHandlerStrategy import akka.projection.internal.HandlerAdapter import akka.projection.internal.NoopStatusObserver import akka.projection.internal.SingleHandlerStrategy -import akka.projection.internal.SourceProviderAdapter +import akka.projection.internal.JavaToScalaSourceProviderAdapter import akka.projection.javadsl.AtLeastOnceFlowProjection import akka.projection.javadsl.AtLeastOnceProjection import akka.projection.javadsl.AtMostOnceProjection @@ -52,7 +52,7 @@ object CassandraProjection { handler: Supplier[Handler[Envelope]]): AtLeastOnceProjection[Offset, Envelope] = new CassandraProjectionImpl( projectionId, - new SourceProviderAdapter(sourceProvider), + new JavaToScalaSourceProviderAdapter(sourceProvider), settingsOpt = None, restartBackoffOpt = None, offsetStrategy = AtLeastOnce(), @@ -76,7 +76,7 @@ object CassandraProjection { handler: Supplier[Handler[java.util.List[Envelope]]]): GroupedProjection[Offset, Envelope] = new CassandraProjectionImpl[Offset, Envelope]( projectionId, - new SourceProviderAdapter(sourceProvider), + new JavaToScalaSourceProviderAdapter(sourceProvider), settingsOpt = None, restartBackoffOpt = None, offsetStrategy = @@ -112,7 +112,7 @@ object CassandraProjection { : AtLeastOnceFlowProjection[Offset, Envelope] = new CassandraProjectionImpl( projectionId, - new SourceProviderAdapter(sourceProvider), + new JavaToScalaSourceProviderAdapter(sourceProvider), settingsOpt = None, restartBackoffOpt = None, offsetStrategy = AtLeastOnce(), @@ -130,7 +130,7 @@ object CassandraProjection { handler: Supplier[Handler[Envelope]]): AtMostOnceProjection[Offset, Envelope] = new CassandraProjectionImpl( projectionId, - new SourceProviderAdapter(sourceProvider), + new JavaToScalaSourceProviderAdapter(sourceProvider), settingsOpt = None, restartBackoffOpt = None, offsetStrategy = AtMostOnce(), diff --git a/akka-projection-core/src/main/mima-filters/1.5.1-M1.backwards.excludes/edge-replication.excludes b/akka-projection-core/src/main/mima-filters/1.5.1-M1.backwards.excludes/edge-replication.excludes new file mode 100644 index 000000000..4a1b73200 --- /dev/null +++ b/akka-projection-core/src/main/mima-filters/1.5.1-M1.backwards.excludes/edge-replication.excludes @@ -0,0 +1,4 @@ +# internal/renamed +ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.ScalaBySlicesSourceProviderAdapter") +# (was internal stable but only because it was historically used by akka-persistence-r2dbc) +ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.SourceProviderAdapter") \ No newline at end of file diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala b/akka-projection-core/src/main/scala/akka/projection/internal/JavaToScalaSourceProviderAdapter.scala similarity index 54% rename from akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala rename to akka-projection-core/src/main/scala/akka/projection/internal/JavaToScalaSourceProviderAdapter.scala index cfdec0df1..4220130a2 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala +++ b/akka-projection-core/src/main/scala/akka/projection/internal/JavaToScalaSourceProviderAdapter.scala @@ -2,33 +2,68 @@ * Copyright (C) 2022 - 2023 Lightbend Inc. */ -package akka.projection.r2dbc.internal +package akka.projection.internal -import java.time.Instant -import java.util.Optional -import java.util.concurrent.CompletionStage -import java.util.function.Supplier -import scala.concurrent.Future import akka.NotUsed import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.query.typed.scaladsl.EventTimestampQuery +import akka.persistence.query.typed.scaladsl.LoadEventQuery +import akka.projection.BySlicesSourceProvider import akka.projection.javadsl import akka.projection.scaladsl import akka.stream.scaladsl.Source +import java.time.Instant +import java.util.Optional +import java.util.concurrent.CompletionStage +import java.util.function.Supplier import scala.compat.java8.FutureConverters._ - import scala.compat.java8.OptionConverters._ -import akka.persistence.query.typed.EventEnvelope -import akka.persistence.query.typed.scaladsl.EventTimestampQuery -import akka.persistence.query.typed.scaladsl.LoadEventQuery -import akka.projection.BySlicesSourceProvider +import scala.concurrent.Future + +@InternalApi private[projection] object JavaToScalaBySliceSourceProviderAdapter { + def apply[Offset, Envelope]( + delegate: javadsl.SourceProvider[Offset, Envelope]): scaladsl.SourceProvider[Offset, Envelope] = + delegate match { + case adapted: ScalaToJavaBySlicesSourceProviderAdapter[_, _] => + // just unwrap rather than wrapping further + adapted.delegate + case delegate: BySlicesSourceProvider with CanTriggerReplay => + new JavaToScalaBySliceSourceProviderAdapterWithCanTriggerReplay(delegate) + case _: BySlicesSourceProvider => new JavaToScalaBySliceSourceProviderAdapter(delegate) + case _ => new JavaToScalaSourceProviderAdapter(delegate) + } +} /** * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider */ -@InternalApi private[projection] class BySliceSourceProviderAdapter[Offset, Envelope]( +private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope]( delegate: javadsl.SourceProvider[Offset, Envelope]) + extends scaladsl.SourceProvider[Offset, Envelope] { + + def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = { + // the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source, + // it _should_ not be used for the blocking operation of getting offsets themselves + val ec = akka.dispatch.ExecutionContexts.parasitic + val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] { + override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava + } + delegate.source(offsetAdapter).toScala.map(_.asScala)(ec) + } + + def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) + + def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) +} + +/** + * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider + */ +@InternalApi private[projection] sealed class JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope] private[internal] ( + val delegate: javadsl.SourceProvider[Offset, Envelope]) extends scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider with EventTimestampQuery @@ -75,4 +110,18 @@ import akka.projection.BySlicesSourceProvider s"Expected SourceProvider [${delegate.getClass.getName}] to implement " + s"EventTimestampQuery when LoadEventQuery is used.")) } + +} + +/** + * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider that also implements + * CanTriggerReplay + */ +@InternalApi +private[projection] final class JavaToScalaBySliceSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope] private[internal] ( + delegate: javadsl.SourceProvider[Offset, Envelope] with CanTriggerReplay) + extends JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope](delegate) + with CanTriggerReplay { + override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = + delegate.triggerReplay(persistenceId, fromSeqNr) } diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/ScalaToJavaSourceProviderAdapter.scala b/akka-projection-core/src/main/scala/akka/projection/internal/ScalaToJavaSourceProviderAdapter.scala new file mode 100644 index 000000000..c20c4df6d --- /dev/null +++ b/akka-projection-core/src/main/scala/akka/projection/internal/ScalaToJavaSourceProviderAdapter.scala @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2023 Lightbend Inc. + */ + +package akka.projection.internal + +import akka.NotUsed +import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.query.typed.javadsl.EventTimestampQuery +import akka.persistence.query.typed.javadsl.LoadEventQuery +import akka.persistence.query.typed.scaladsl.{ EventTimestampQuery => ScalaEventTimestampQuery } +import akka.persistence.query.typed.scaladsl.{ LoadEventQuery => ScalaLoadEventQuery } +import akka.projection.BySlicesSourceProvider +import akka.projection.javadsl +import akka.projection.scaladsl +import akka.stream.javadsl.{ Source => JSource } + +import java.time.Instant +import java.util.Optional +import java.util.concurrent.CompletionStage +import java.util.function.Supplier +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ + +/** + * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider + */ +@InternalApi private[projection] object ScalaToJavaBySlicesSourceProviderAdapter { + def apply[Offset, Envelope]( + delegate: scaladsl.SourceProvider[Offset, Envelope] + with BySlicesSourceProvider): javadsl.SourceProvider[Offset, Envelope] = + delegate match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => + // just unwrap rather than wrapping further + adapted.delegate + case delegate: CanTriggerReplay => new ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay(delegate) + case _ => new ScalaToJavaBySlicesSourceProviderAdapter(delegate) + } +} + +/** + * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider + */ +@InternalApi +private[projection] sealed class ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope] private[internal] ( + val delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider) + extends javadsl.SourceProvider[Offset, Envelope] + with BySlicesSourceProvider + with EventTimestampQuery + with LoadEventQuery { + override def source( + offset: Supplier[CompletionStage[Optional[Offset]]]): CompletionStage[JSource[Envelope, NotUsed]] = + delegate + .source(() => offset.get().toScala.map(_.asScala)(ExecutionContexts.parasitic)) + .map(_.asJava)(ExecutionContexts.parasitic) + .toJava + + override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) + + override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) + + def minSlice: Int = delegate.minSlice + + def maxSlice: Int = delegate.maxSlice + + override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = + delegate match { + case etq: ScalaEventTimestampQuery => + etq.timestampOf(persistenceId, sequenceNr).map(_.asJava)(ExecutionContexts.parasitic).toJava + case _ => + throw new IllegalStateException( + s"timestampOf was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.EventTimestampQuery") + } + + override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = + delegate match { + case etq: ScalaLoadEventQuery => + etq.loadEnvelope[Event](persistenceId, sequenceNr).toJava + case _ => + throw new IllegalStateException( + s"loadEnvelope was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.LoadEventQuery") + } + +} + +/** + * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider + */ +@InternalApi +private[projection] final class ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope] private[internal] ( + delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider with CanTriggerReplay) + extends ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope](delegate) + with CanTriggerReplay { + + override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = + delegate.triggerReplay(persistenceId, fromSeqNr) +} diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/SourceProviderAdapter.scala b/akka-projection-core/src/main/scala/akka/projection/internal/SourceProviderAdapter.scala deleted file mode 100644 index 6a18f8409..000000000 --- a/akka-projection-core/src/main/scala/akka/projection/internal/SourceProviderAdapter.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) 2020-2023 Lightbend Inc. - */ - -package akka.projection.internal - -import java.util.Optional -import java.util.concurrent.CompletionStage -import java.util.function.Supplier -import scala.compat.java8.FutureConverters._ -import scala.compat.java8.OptionConverters._ -import scala.concurrent.Future -import akka.NotUsed -import akka.annotation.InternalApi -import akka.annotation.InternalStableApi -import akka.dispatch.ExecutionContexts -import akka.projection.BySlicesSourceProvider -import akka.projection.javadsl -import akka.projection.scaladsl -import akka.stream.scaladsl.Source -import akka.stream.javadsl.{ Source => JSource } - -/** - * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider - */ -@InternalStableApi private[projection] class SourceProviderAdapter[Offset, Envelope]( - delegate: javadsl.SourceProvider[Offset, Envelope]) - extends scaladsl.SourceProvider[Offset, Envelope] { - - def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = { - // the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source, - // it _should_ not be used for the blocking operation of getting offsets themselves - val ec = akka.dispatch.ExecutionContexts.parasitic - val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] { - override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava - } - delegate.source(offsetAdapter).toScala.map(_.asScala)(ec) - } - - def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) - - def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) -} - -/** - * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider - */ -@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope]( - delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider) - extends javadsl.SourceProvider[Offset, Envelope] - with BySlicesSourceProvider { - override def source( - offset: Supplier[CompletionStage[Optional[Offset]]]): CompletionStage[JSource[Envelope, NotUsed]] = - delegate - .source(() => offset.get().toScala.map(_.asScala)(ExecutionContexts.parasitic)) - .map(_.asJava)(ExecutionContexts.parasitic) - .toJava - - override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) - - override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) - - def minSlice: Int = delegate.minSlice - - def maxSlice: Int = delegate.maxSlice -} diff --git a/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala b/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala index 55248416e..17128ad14 100644 --- a/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala +++ b/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala @@ -5,12 +5,10 @@ package akka.projection.scaladsl import java.util.concurrent.ConcurrentHashMap - import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.TimeoutException import scala.concurrent.duration.FiniteDuration - import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem @@ -23,6 +21,9 @@ import akka.projection.ProjectionId import akka.util.JavaDurationConverters._ import akka.util.Timeout +import java.net.URLEncoder +import java.nio.charset.StandardCharsets + object ProjectionManagement extends ExtensionId[ProjectionManagement] { def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system) @@ -50,7 +51,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension { private def topic(projectionName: String): ActorRef[Topic.Command[ProjectionManagementCommand]] = { topics.computeIfAbsent(projectionName, _ => { val name = topicName(projectionName) - system.systemActorOf(Topic[ProjectionManagementCommand](name), name) + system.systemActorOf(Topic[ProjectionManagementCommand](name), sanitizeActorName(name)) }) } @@ -151,4 +152,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension { } retry(() => askSetPaused()) } + + private def sanitizeActorName(text: String): String = + URLEncoder.encode(text, StandardCharsets.UTF_8.name()) } diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala index ae9cc98b7..61d29fc94 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala @@ -169,7 +169,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf) otherReplicas, 10.seconds, 8, - R2dbcReplication()) + R2dbcReplication()(replicaSystem)) .withEdgeReplication(true) } @@ -198,7 +198,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf) otherReplicas, 10.seconds, 8, - R2dbcReplication()) + R2dbcReplication()(replicaSystem)) } selfReplicaId match { diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationJavaDSLIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationJavaDSLIntegrationSpec.scala new file mode 100644 index 000000000..568acec65 --- /dev/null +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationJavaDSLIntegrationSpec.scala @@ -0,0 +1,512 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.javadsl.ClusterSharding +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.grpc.GrpcClientSettings +import akka.http.javadsl.Http +import akka.persistence.typed.ReplicaId +import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestData +import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.consumer.ConsumerFilter.IncludeTags +import akka.projection.grpc.consumer.ConsumerFilter.UpdateFilter +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.replication.javadsl.EdgeReplication +import akka.projection.grpc.replication.javadsl.Replica +import akka.projection.grpc.replication.javadsl.Replication +import akka.projection.grpc.replication.javadsl.ReplicationSettings +import akka.projection.r2dbc.R2dbcProjectionSettings +import akka.projection.r2dbc.javadsl.R2dbcReplication +import akka.testkit.SocketUtil +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +import java.time.Duration +import scala.compat.java8.FutureConverters.CompletionStageOps +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt +import scala.jdk.CollectionConverters._ + +object EdgeReplicationJavaDSLIntegrationSpec { + + private val CloudReplicaA = ReplicaId("DCA") + private val CloudReplicaB = ReplicaId("DCB") + private val EdgeReplicaC = ReplicaId("DCC") + private val EdgeReplicaD = ReplicaId("DCD") + +} + +class EdgeReplicationJavaDSLIntegrationSpec(testContainerConf: TestContainerConf) + extends ScalaTestWithActorTestKit( + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationJavaSpecA", + ReplicationJavaDSLIntegrationSpec + .config(EdgeReplicationJavaDSLIntegrationSpec.CloudReplicaA) + .withFallback(testContainerConf.config)) + .toTyped) + with AnyWordSpecLike + with TestDbLifecycle + with BeforeAndAfterAll + with LogCapturing + with TestData { + import EdgeReplicationJavaDSLIntegrationSpec._ + import ReplicationJavaDSLIntegrationSpec.LWWHelloWorld + implicit val ec: ExecutionContext = system.executionContext + + def this() = this(new TestContainerConf) + + private val logger = LoggerFactory.getLogger(classOf[EdgeReplicationJavaDSLIntegrationSpec]) + override def typedSystem: ActorSystem[_] = testKit.system + + private val askTimeout = Duration.ofSeconds(3) + + private val systems = Seq[ActorSystem[_]]( + typedSystem, + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationJavaSpecB", + ReplicationJavaDSLIntegrationSpec.config(CloudReplicaB).withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationJavaSpecC", + ReplicationJavaDSLIntegrationSpec.config(EdgeReplicaC).withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationJavaSpecD", + ReplicationJavaDSLIntegrationSpec.config(EdgeReplicaD).withFallback(testContainerConf.config)) + .toTyped) + + private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort) + private val allDcsAndPorts = Seq(CloudReplicaA, CloudReplicaB, EdgeReplicaC, EdgeReplicaD).zip(grpcPorts) + private val allReplicas = allDcsAndPorts.map { + case (id, port) => + Replica.create(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false)) + }.toSet + + private val testKitsPerDc = Map( + CloudReplicaA -> testKit, + CloudReplicaB -> ActorTestKit(systems(1)), + EdgeReplicaC -> ActorTestKit(systems(2)), + EdgeReplicaD -> ActorTestKit(systems(3))) + private val systemPerDc = + Map(CloudReplicaA -> system, CloudReplicaB -> systems(1), EdgeReplicaC -> systems(2), EdgeReplicaD -> systems(3)) + private val entityIds = Set( + nextPid(LWWHelloWorld.EntityType.name).entityId, + nextPid(LWWHelloWorld.EntityType.name).entityId, + nextPid(LWWHelloWorld.EntityType.name).entityId) + + override protected def beforeAll(): Unit = { + super.beforeAll() + // We can share the journal to save a bit of work, because the persistence id contains + // the dc so is unique (this is ofc completely synthetic, the whole point of replication + // over grpc is to replicate between different dcs/regions with completely separate databases). + // The offset tables need to be separate though to not get conflicts on projection names + systemPerDc.values.foreach { system => + val r2dbcProjectionSettings = R2dbcProjectionSettings(system) + Await.result( + r2dbcExecutor.updateOne("beforeAll delete")( + _.createStatement(s"delete from ${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")), + 10.seconds) + + } + } + + def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { + def replicationSettings(otherReplicas: Set[Replica]) = { + ReplicationSettings + .create( + classOf[LWWHelloWorld.Command], + LWWHelloWorld.EntityType.name, + selfReplicaId, + EventProducerSettings.create(replicaSystem), + otherReplicas.asJava, + Duration.ofSeconds(10), + 8, + R2dbcReplication.create(system)) + .withEdgeReplication(true) + } + + selfReplicaId match { + case CloudReplicaA => + val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaB) + Replication.grpcReplication(replicationSettings(otherReplicas), LWWHelloWorld.create _, replicaSystem) + + case CloudReplicaB => + val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA) + Replication.grpcReplication(replicationSettings(otherReplicas), LWWHelloWorld.create _, replicaSystem) + + case other => + throw new IllegalArgumentException(other.id) + } + } + + def startEdgeReplica( + replicaSystem: ActorSystem[_], + selfReplicaId: ReplicaId): EdgeReplication[LWWHelloWorld.Command] = { + def replicationSettings(otherReplicas: Set[Replica]) = { + ReplicationSettings.create( + classOf[LWWHelloWorld.Command], + LWWHelloWorld.EntityType.name, + selfReplicaId, + EventProducerSettings.create(replicaSystem), + otherReplicas.asJava, + Duration.ofSeconds(10), + 8, + R2dbcReplication.create(replicaSystem)) + } + + selfReplicaId match { + case EdgeReplicaC => + val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA) + Replication.grpcEdgeReplication(replicationSettings(otherReplicas), LWWHelloWorld.create _, replicaSystem) + + case EdgeReplicaD => + val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA) + Replication.grpcEdgeReplication(replicationSettings(otherReplicas), LWWHelloWorld.create _, replicaSystem) + + case other => + throw new IllegalArgumentException(other.id) + } + } + + def assertGreeting(entityId: String, expected: String): Unit = { + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding + .get(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(LWWHelloWorld.Get(_), askTimeout) + .toCompletableFuture + .toScala + .futureValue should ===(expected) + }, 10.seconds) + } + } + } + } + + "Replication over gRPC" should { + "form one node clusters" in { + testKitsPerDc.values.foreach { testKit => + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + } + + "start replicas" in { + val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map { + case (replica, index) => + val system = systems(index) + logger + .infoN( + "Starting replica [{}], system [{}] on port [{}]", + replica.replicaId, + system.name, + replica.grpcClientSettings.defaultPort) + if (replica.replicaId == CloudReplicaA || replica.replicaId == CloudReplicaB) { + + val started = startReplica(system, replica.replicaId) + val grpcPort = grpcPorts(index) + + // start producer server + Http + .get(system) + .newServerAt("127.0.0.1", grpcPort) + .bind(started.createSingleServiceHandler()) + .toScala + .map(_.addToCoordinatedShutdown(Duration.ofSeconds(3), system))(system.executionContext) + .map(_ => Done) + } else { + startEdgeReplica(system, replica.replicaId) + Future.successful(Done) + } + }) + + replicasStarted.futureValue + logger.info("All replication/producer services bound") + } + + "replicate directly" in { + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from A1", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from A1") + + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from A2", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from A2") + } + + "replicate indirectly" in { + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + + // Edge replicas are only connected to CloudReplicaA + ClusterSharding + .get(systemPerDc(CloudReplicaB)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from B1", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from B1") + + ClusterSharding + .get(systemPerDc(CloudReplicaB)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from B2", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from B2") + } + + "replicate both directions" in { + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from A1", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from A1") + + ClusterSharding + .get(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from C1", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from C1") + + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from A2", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from A2") + + ClusterSharding + .get(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from C2", _), askTimeout) + .toScala + .futureValue + assertGreeting(entityId, "Hello from C2") + } + + "replicate writes from one dc to the other DCs" in { + systemPerDc.keys.foreach { dc => + withClue(s"from ${dc.id}") { + Future + .sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding + .get(systemPerDc(dc)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _), askTimeout) + .toScala + }) + .futureValue + + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding + .get(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(LWWHelloWorld.Get.apply, askTimeout) + .toScala + .futureValue should ===(s"hello 1 from ${dc.id}") + }, 10.seconds) + } + } + } + } + } + } + } + + "replicate concurrent writes to the other DCs" in (2 to 4).foreach { greetingNo => + withClue(s"Greeting $greetingNo") { + Future + .sequence(systemPerDc.keys.map { dc => + withClue(s"from ${dc.id}") { + Future.sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding + .get(systemPerDc(dc)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting(s"hello $greetingNo from ${dc.id}", _), askTimeout) + .toScala + }) + } + }) + .futureValue // all updated in roughly parallel + + // All 3 should eventually arrive at the same value + testKit + .createTestProbe() + .awaitAssert( + { + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + testKitsPerDc.values.map { testKit => + val entityRef = ClusterSharding + .get(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + entityRef + .ask(LWWHelloWorld.Get.apply, askTimeout) + .toScala + .futureValue + }.toSet should have size (1) + } + } + }, + 20.seconds) + } + } + } + "use consumer filter on tag" in { + system.log.info("Consumer filter test starting") + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + + ConsumerFilter(systemPerDc(EdgeReplicaC)).ref ! UpdateFilter( + LWWHelloWorld.EntityType.name, + List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-C")))) + ConsumerFilter(systemPerDc(EdgeReplicaD)).ref ! UpdateFilter( + LWWHelloWorld.EntityType.name, + List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-D")))) + + // let the filter propagate to producer + Thread.sleep(1000) + system.log.info("Continuing after setting IncludeTags") + + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetTag("tag-C", _), askTimeout) + .toScala + .futureValue + + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello C", _), askTimeout) + .toScala + .futureValue + + eventually { + ClusterSharding + .get(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_), askTimeout) + .toScala + .futureValue shouldBe "Hello C" + } + + // but not updated in D + ClusterSharding + .get(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_), askTimeout) + .toScala + .futureValue shouldBe "Hello world" + + system.log.info("Verified filter worked, changing tag on entity") + + // change tag + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetTag("tag-D", _), askTimeout) + .toScala + .futureValue + + // previous greeting should be replicated + eventually { + ClusterSharding + .get(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_), askTimeout) + .toScala + .futureValue shouldBe "Hello C" + } + + ClusterSharding + .get(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello D", _), askTimeout) + .toScala + .futureValue + eventually { + ClusterSharding + .get(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask[String](LWWHelloWorld.Get(_), askTimeout) + .toScala + .futureValue shouldBe "Hello D" + } + + // but not updated in C + ClusterSharding + .get(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_), askTimeout) + .toScala + .futureValue shouldBe "Hello C" + } + + protected override def afterAll(): Unit = { + logger.info("Shutting down all DCs") + systems.foreach(_.terminate()) // speed up termination by terminating all at the once + // and then make sure they are completely shutdown + systems.foreach { system => + ActorTestKit.shutdown(system) + } + super.afterAll() + } +} diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala index 90e2116c0..808446323 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala @@ -10,10 +10,13 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem +import akka.actor.typed.javadsl.ActorContext +import akka.actor.typed.javadsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps import akka.cluster.MemberStatus import akka.cluster.sharding.typed.javadsl.ClusterSharding +import akka.cluster.sharding.typed.javadsl.EntityTypeKey import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.grpc.GrpcClientSettings @@ -28,13 +31,13 @@ import akka.persistence.typed.javadsl.ReplicationContext import akka.projection.grpc.TestContainerConf import akka.projection.grpc.TestDbLifecycle import akka.projection.grpc.producer.EventProducerSettings -import akka.projection.grpc.replication import akka.projection.grpc.replication.javadsl.Replica import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors import akka.projection.grpc.replication.javadsl.Replication import akka.projection.grpc.replication.javadsl.ReplicationSettings import akka.projection.r2dbc.R2dbcProjectionSettings import akka.projection.r2dbc.javadsl.R2dbcReplication +import akka.serialization.jackson.CborSerializable import akka.testkit.SocketUtil import com.typesafe.config.Config import com.typesafe.config.ConfigFactory @@ -50,16 +53,13 @@ import scala.concurrent.Future import scala.concurrent.duration.DurationInt import akka.util.ccompat.JavaConverters._ +import java.util + object ReplicationJavaDSLIntegrationSpec { - private def config(dc: ReplicaId): Config = + def config(dc: ReplicaId): Config = ConfigFactory.parseString(s""" akka.actor.provider = cluster - akka.actor { - serialization-bindings { - "${classOf[replication.ReplicationJavaDSLIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json - } - } akka.http.server.preview.enable-http2 = on akka.persistence.r2dbc { query { @@ -90,26 +90,34 @@ object ReplicationJavaDSLIntegrationSpec { object LWWHelloWorld { + val EntityType: EntityTypeKey[Command] = EntityTypeKey.create(classOf[Command], "hello-world") + sealed trait Command case class Get(replyTo: ActorRef[String]) extends Command case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command - sealed trait Event + case class SetTag(tag: String, replyTo: ActorRef[Done]) extends Command + + sealed trait Event extends CborSerializable case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event + final case class TagChanged(tag: String, timestamp: LwwTime) extends Event + object State { - val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId(""))) + val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId("")), "") } - case class State(greeting: String, timestamp: LwwTime) + case class State(greeting: String, timestamp: LwwTime, tag: String) - def create(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) = - replicatedBehaviors.setup { replicationContext => new LWWHelloWorldBehavior(replicationContext) } + def create(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) = { + Behaviors.setup[Command](context => + replicatedBehaviors.setup { replicationContext => new LWWHelloWorldBehavior(context, replicationContext) }) + } - class LWWHelloWorldBehavior(replicationContext: ReplicationContext) + class LWWHelloWorldBehavior(context: ActorContext[Command], replicationContext: ReplicationContext) extends EventSourcedBehavior[Command, Event, State](replicationContext.persistenceId) { protected def emptyState: State = State.initial @@ -122,6 +130,7 @@ object ReplicationJavaDSLIntegrationSpec { }) .onCommand( classOf[SetGreeting], { (state: State, command: SetGreeting) => + context.getLog.info("Request to change greeting to {}", command.newGreeting) Effect .persist( GreetingChanged( @@ -130,17 +139,55 @@ object ReplicationJavaDSLIntegrationSpec { state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) .thenReply(command.replyTo, _ => Done) }) + .onCommand( + classOf[SetTag], { (state: State, command: SetTag) => + context.getLog.info("Request to change tag to {}", command.tag) + Effect + .persist( + TagChanged( + tag = command.tag, + timestamp = + state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) + .thenReply(command.replyTo, _ => Done) + }) .build() protected def eventHandler(): EventHandler[State, Event] = newEventHandlerBuilder() .forAnyState() - .onEvent(classOf[GreetingChanged], { (currentState: State, event: GreetingChanged) => - if (event.timestamp.isAfter(currentState.timestamp)) State(event.greeting, event.timestamp) - else currentState - }) + .onEvent( + classOf[GreetingChanged], { (currentState: State, event: GreetingChanged) => + if (event.timestamp.isAfter(currentState.timestamp)) { + context.getLog.info("Changing greeting to {}", event.greeting) + State(event.greeting, event.timestamp, currentState.tag) + } else { + context.getLog + .info( + "Ignoring greeting change to {} since state was changed after {}", + event.greeting, + currentState.timestamp) + currentState + } + }) + .onEvent( + classOf[TagChanged], { (currentState: State, event: TagChanged) => + if (event.timestamp.isAfter(currentState.timestamp)) { + context.getLog.info("Changing tag to {}", event.tag) + State(currentState.greeting, event.timestamp, event.tag) + } else { + context.getLog + .info("Ignoring tag change to {} since state was changed after {}", event.tag, currentState.timestamp) + currentState + } + + }) .build() + + override def tagsFor(state: State, event: Event): util.Set[String] = + if (state.tag.isEmpty) util.Set.of() + else util.Set.of(state.tag) } + } } @@ -208,15 +255,17 @@ class ReplicationJavaDSLIntegrationSpec(testContainerConf: TestContainerConf) } def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { - val settings = ReplicationSettings.create( - classOf[LWWHelloWorld.Command], - "hello-world-java", - selfReplicaId, - EventProducerSettings.create(replicaSystem), - allReplicas.toSet.asJava: java.util.Set[Replica], - Duration.ofSeconds(10), - 8, - R2dbcReplication.create(system)) + val settings = ReplicationSettings + .create( + classOf[LWWHelloWorld.Command], + "hello-world-java", + selfReplicaId, + EventProducerSettings.create(replicaSystem), + allReplicas.toSet.asJava: java.util.Set[Replica], + Duration.ofSeconds(10), + 8, + R2dbcReplication.create(system)) + .withEdgeReplication(true) Replication.grpcReplication(settings, LWWHelloWorld.create _, replicaSystem) } diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.1-M1.backwards.excludes/additional-edge-replication.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.1-M1.backwards.excludes/additional-edge-replication.excludes new file mode 100644 index 000000000..fe4de5b96 --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.1-M1.backwards.excludes/additional-edge-replication.excludes @@ -0,0 +1,12 @@ +# never released +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.projection.grpc.replication.javadsl.Replication.grpcEdgeReplication") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.projection.grpc.replication.javadsl.Replication.grpcEdgeReplication") + +# released but maychange and worth it +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.replication.javadsl.Replication.entityRefFactory") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.projection.grpc.replication.javadsl.Replication.entityRefFactory") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.replication.javadsl.EdgeReplication.entityRefFactory") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.projection.grpc.replication.javadsl.EdgeReplication.entityRefFactory") + +# internal +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.javadsl.EventProducerPushDestination.this") diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala index d7f8f44c3..31b84d72c 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala @@ -21,6 +21,7 @@ import akka.projection.grpc.consumer.scaladsl import akka.util.ccompat.JavaConverters._ import akka.japi.function.{ Function => JapiFunction } import akka.projection.grpc.internal.ProtoAnySerialization.Prefer +import akka.projection.grpc.replication.scaladsl.ReplicationSettings import com.google.protobuf.Descriptors import java.util.Collections @@ -64,7 +65,8 @@ object EventProducerPushDestination { Optional.empty(), Collections.emptyList(), protobufDescriptors, - EventProducerPushDestinationSettings.create(system)) + EventProducerPushDestinationSettings.create(system), + None) def grpcServiceHandler( eventConsumer: EventProducerPushDestination, @@ -104,7 +106,8 @@ object EventProducerPushDestination { .toJava, scalaDestination.filters.asJava, scalaDestination.protobufDescriptors.asJava, - scalaDestination.settings) + scalaDestination.settings, + scalaDestination.replicationSettings) } @@ -116,7 +119,10 @@ final class EventProducerPushDestination private ( val interceptor: Optional[EventDestinationInterceptor], val filters: java.util.List[FilterCriteria], val protobufDescriptors: JList[Descriptors.FileDescriptor], - val settings: EventProducerPushDestinationSettings) { + val settings: EventProducerPushDestinationSettings, + /** INTERNAL API */ + @InternalApi + private[akka] val replicationSettings: Option[ReplicationSettings[_]]) { def withInterceptor(interceptor: EventDestinationInterceptor): EventProducerPushDestination = copy(interceptor = Optional.of(interceptor)) @@ -167,7 +173,8 @@ final class EventProducerPushDestination private ( interceptor, filters, protobufDescriptors, - settings) + settings, + replicationSettings) /** * INTERNAL API @@ -183,5 +190,5 @@ final class EventProducerPushDestination private ( filters.asScala.toVector, protobufDescriptors.asScala.toVector, settings, - None) + replicationSettings) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index 9323d8ab7..667fe79f3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -133,7 +133,7 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation val offset = protocolOffsetToOffset(init.offset) log.debugN( - "Starting eventsBySlices stream [{}], [{}], slices [{} - {}], offset [{}]", + "Starting eventsBySlices stream [{}], [{}], slices [{} - {}], offset [{}]{}", producerSource.streamId, producerSource.entityType, init.sliceMin, @@ -141,7 +141,8 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation offset match { case t: TimestampOffset => t.timestamp case _ => offset - }) + }, + init.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) val events: Source[EventEnvelope[Any], NotUsed] = eventsBySlicesPerStreamId.get(init.streamId) match { @@ -211,20 +212,24 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization) .map { case Some(event) => - log.traceN( - "Emitting event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", - env.persistenceId, - env.sequenceNr, - env.offset, - event.source) + if (log.isTraceEnabled) + log.traceN( + "Emitting event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]{}", + env.persistenceId, + env.sequenceNr, + env.offset, + event.source, + init.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) StreamOut(StreamOut.Message.Event(event)) case None => - log.traceN( - "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", - env.persistenceId, - env.sequenceNr, - env.offset, - env.source) + if (log.isTraceEnabled) + log.traceN( + "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]{}", + env.persistenceId, + env.sequenceNr, + env.offset, + env.source, + init.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) StreamOut( StreamOut.Message.FilteredEvent( FilteredEvent( @@ -234,12 +239,14 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation ProtobufProtocolConversions.offsetToProtoOffset(env.offset)))) } } else { - log.traceN( - "Filtered event, due to origin, from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", - env.persistenceId, - env.sequenceNr, - env.offset, - env.source) + if (log.isTraceEnabled) + log.traceN( + "Filtered event, due to origin, from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]{}", + env.persistenceId, + env.sequenceNr, + env.offset, + env.source, + init.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) Future.successful( StreamOut( StreamOut.Message.FilteredEvent( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala index b6dcc94c6..078903c2b 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala @@ -92,6 +92,10 @@ private[akka] object EventPusher { (filter, eventOriginFilterPredicate) case None => + if (eps.replicatedEventOriginFilter.isDefined) + throw new IllegalArgumentException( + s"Entity ${eps.entityType} is not a replicated entity but `replicatedEventOriginFilter` is set") + ( updateFilterFromProto( Filter.empty(eps.settings.topicTagPrefix), @@ -110,17 +114,19 @@ private[akka] object EventPusher { consumerFilter.matches(envelope)) { if (logger.isTraceEnabled()) logger.trace( - "Pushing event persistence id [{}], sequence number [{}]", + "Pushing event persistence id [{}], sequence number [{}]{}", envelope.persistenceId, - envelope.sequenceNr) + envelope.sequenceNr, + startMessage.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) transformAndEncodeEvent(eps.transformation, envelope, protoAnySerialization) } else { if (logger.isTraceEnabled()) logger.trace( - "Filtering event persistence id [{}], sequence number [{}]", + "Filtering event persistence id [{}], sequence number [{}]{}", envelope.persistenceId, - envelope.sequenceNr) + envelope.sequenceNr, + startMessage.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) Future.successful(None) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 1e33e9403..8e2aa2655 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -77,6 +77,7 @@ private[akka] object EventPusherConsumerServiceImpl { { (envelope: EventEnvelope[_], fillSequenceNumberGaps: Boolean) => if (envelope.filtered) { + log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId) Future.successful(Done) } else { envelope.eventMetadata match { @@ -90,7 +91,11 @@ private[akka] object EventPusherConsumerServiceImpl { val entityRef = sharding .entityRefFor(replicationSettings.entityTypeKey, destinationReplicaId.entityId) .asInstanceOf[EntityRef[PublishedEvent]] - val ask = () => + val ask = { () => + log.trace( + "Passing event [{}] for pid [{}] to replicated entity", + envelope.sequenceNr, + envelope.persistenceId) entityRef.ask[Done]( replyTo => PublishedEventImpl( @@ -103,6 +108,7 @@ private[akka] object EventPusherConsumerServiceImpl { replicatedEventMetadata.originReplica, replicatedEventMetadata.version)), Some(replyTo))) + } // try a few times before tearing stream down, forcing the client to restart/reconnect val askResult = akka.pattern.retry( @@ -128,6 +134,7 @@ private[akka] object EventPusherConsumerServiceImpl { } case None => { (envelope: EventEnvelope[_], fillSequenceNumberGaps: Boolean) => + log.trace("Passing event [{}] for pid [{}] to event writer", envelope.sequenceNr, envelope.persistenceId) writerForJournal.askWithStatus[EventWriter.WriteAck]( replyTo => EventWriter.Write( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index 44edfa394..f3cac05ac 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -49,6 +49,8 @@ import akka.stream.scaladsl.FlowWithContext import akka.util.Timeout import org.slf4j.LoggerFactory +import java.net.URLEncoder +import java.nio.charset.StandardCharsets import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -108,6 +110,7 @@ private[akka] object ReplicationImpl { settings.otherReplicas.foreach(startConsumer(_, settings, entityRefFactory)) if (settings.acceptEdgeReplication) { + // Note: duplicated in the Java Replication val pushDestination = EventProducerPushDestination(settings.streamId, protobufDescriptors = Nil).withEdgeReplication(settings) new ReplicationImpl[Command]( @@ -155,7 +158,12 @@ private[akka] object ReplicationImpl { val grpcQuerySettings = { val s = GrpcQuerySettings(settings.streamId) - remoteReplica.additionalQueryRequestMetadata.fold(s)(s.withAdditionalRequestMetadata) + val s2 = + if (settings.initialConsumerFilter.isEmpty) s else s.withInitialConsumerFilter(settings.initialConsumerFilter) + remoteReplica.additionalQueryRequestMetadata match { + case Some(metadata) => s2.withAdditionalRequestMetadata(metadata) + case None => s2 + } } val eventsBySlicesQuery = GrpcReadJournal( grpcQuerySettings, @@ -187,7 +195,7 @@ private[akka] object ReplicationImpl { case Some(role) => defaultWithShardingSettings.withRole(role) } } - ShardedDaemonProcess(system).init(projectionName, remoteReplica.numberOfConsumers, { + ShardedDaemonProcess(system).init(sanitizeActorName(projectionName), remoteReplica.numberOfConsumers, { idx => val sliceRange = sliceRanges(idx) val projectionKey = s"${sliceRange.min}-${sliceRange.max}" @@ -283,6 +291,10 @@ private[akka] object ReplicationImpl { val sharding = ClusterSharding(system) sharding.init(replicatedEntity.entity) + if (settings.initialConsumerFilter.nonEmpty) { + ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter) + } + // sharded daemon process for consuming event stream from the other replicas val shardingEntityRefFactory: String => EntityRef[Command] = sharding.entityRefFor(replicatedEntity.entity.typeKey, _) @@ -341,13 +353,13 @@ private[akka] object ReplicationImpl { val epp = remoteReplica.additionalQueryRequestMetadata match { case None => - EventProducerPush[AnyRef](settings.entityTypeKey.name, eps, remoteReplica.grpcClientSettings) + EventProducerPush[AnyRef](settings.selfReplicaId.id, eps, remoteReplica.grpcClientSettings) case Some(metadata) => - EventProducerPush[AnyRef](settings.entityTypeKey.name, eps, metadata, remoteReplica.grpcClientSettings) + EventProducerPush[AnyRef](settings.selfReplicaId.id, eps, metadata, remoteReplica.grpcClientSettings) } ShardedDaemonProcess(system).initWithContext[ProjectionBehavior.Command]( - s"${settings.selfReplicaId.id}EventProducer", + sanitizeActorName(s"${settings.selfReplicaId.id}EventProducer"), // FIXME separate setting for number of producers? remoteReplica.numberOfConsumers, { (context: ShardedDaemonProcessContext) => val sliceRange = sliceRanges(context.processNumber) @@ -370,4 +382,7 @@ private[akka] object ReplicationImpl { } + private def sanitizeActorName(text: String): String = + URLEncoder.encode(text, StandardCharsets.UTF_8.name()) + } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationProjectionProviderAdapter.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationProjectionProviderAdapter.scala index 9e56b746e..6d66ebdfd 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationProjectionProviderAdapter.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationProjectionProviderAdapter.scala @@ -15,7 +15,7 @@ import akka.projection.ProjectionContext import akka.projection.ProjectionId import akka.projection.grpc.replication.javadsl.{ ReplicationProjectionProvider => JReplicationProjectionProvider } import akka.projection.grpc.replication.scaladsl.{ ReplicationProjectionProvider => SReplicationProjectionProvider } -import akka.projection.internal.ScalaBySlicesSourceProviderAdapter +import akka.projection.internal.ScalaToJavaBySlicesSourceProviderAdapter import akka.projection.scaladsl.{ AtLeastOnceFlowProjection => SAtLeastOnceFlowProjection } import akka.projection.scaladsl.{ SourceProvider => SSourceProvider } import akka.stream.scaladsl.{ FlowWithContext => SFlowWithContext } @@ -37,12 +37,10 @@ private[akka] object ReplicationProjectionProviderAdapter { throw new IllegalArgumentException( s"The source provider is required to implement akka.projection.BySlicesSourceProvider but ${noSlices.getClass} does not") } + + val sourceProviderAdapter = ScalaToJavaBySlicesSourceProviderAdapter(providerWithSlices) val javaProjection = - provider.create( - projectionId, - new ScalaBySlicesSourceProviderAdapter(providerWithSlices), - replicationFlow.asJava, - system) + provider.create(projectionId, sourceProviderAdapter, replicationFlow.asJava, system) javaProjection match { case alsoSProjection: SAtLeastOnceFlowProjection[Offset @unchecked, EventEnvelope[AnyRef] @unchecked] => alsoSProjection diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala index dd7ced8de..59d21d760 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala @@ -10,14 +10,15 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit +import akka.japi.function.{ Function => JApiFunction } import akka.cluster.sharding.typed.ReplicatedEntity import akka.cluster.sharding.typed.javadsl.Entity import akka.cluster.sharding.typed.javadsl.EntityContext import akka.cluster.sharding.typed.javadsl.EntityRef import akka.cluster.sharding.typed.javadsl.EntityTypeKey +import akka.grpc.javadsl.ServiceHandler import akka.http.javadsl.model.HttpRequest import akka.http.javadsl.model.HttpResponse -import akka.japi.function.{ Function => JFunction } import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicationId import akka.persistence.typed.internal.ReplicationContextImpl @@ -28,6 +29,10 @@ import akka.projection.grpc.producer.javadsl.EventProducer import akka.projection.grpc.producer.javadsl.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl +import java.util.function.{ Function => JFunction } +import java.util.Optional +import scala.compat.java8.OptionConverters._ + /** * Created using [[Replication.grpcReplication]], which starts sharding with the entity and * replication stream consumers but not the replication endpoint needed to publish events to other replication places. @@ -55,14 +60,14 @@ trait Replication[Command] { * and possibly also regular projections into one producer push destination handler in a set passed to * EventProducerPushDestination.grpcServiceHandler to create a single gRPC endpoint. */ - def eventProducerPushDestination: Option[EventProducerPushDestination] + def eventProducerPushDestination: Optional[EventProducerPushDestination] /** * If only replicating one Replicated Event Sourced Entity and not using * Akka Projection gRPC this endpoint factory can be used to get a partial function * that can be served/bound with an Akka HTTP/2 server */ - def createSingleServiceHandler(): JFunction[HttpRequest, CompletionStage[HttpResponse]] + def createSingleServiceHandler(): JApiFunction[HttpRequest, CompletionStage[HttpResponse]] /** * Entity type key for looking up the entities @@ -73,7 +78,7 @@ trait Replication[Command] { * Shortcut for creating EntityRefs for the sharded Replicated Event Sourced entities for * sending commands. */ - def entityRefFactory: String => EntityRef[Command] + def entityRefFactory: JFunction[String, EntityRef[Command]] } @ApiMayChange @@ -86,7 +91,7 @@ object Replication { */ def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], - replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], + replicatedBehaviorFactory: JApiFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { val scalaReplicationSettings = settings.toScala @@ -108,7 +113,6 @@ object Replication { factory .apply(replicationContext.asInstanceOf[ReplicationContext]) .createEventSourcedBehavior() - // MEH .withReplication(replicationContext.asInstanceOf[ReplicationContextImpl]))) })) .toScala) @@ -121,21 +125,30 @@ object Replication { scalaReplication.eventProducerService.transformation.toJava, scalaReplication.eventProducerService.settings) + val jEventProducerPushDestination = + scalaReplication.eventProducerPushDestination.map(EventProducerPushDestination.fromScala).asJava new Replication[Command] { override def eventProducerService: EventProducerSource = jEventProducerSource override def eventProducerSource: EventProducerSource = jEventProducerSource - override def eventProducerPushDestination: Option[EventProducerPushDestination] = - scalaReplication.eventProducerPushDestination.map(EventProducerPushDestination.fromScala) + override def eventProducerPushDestination: Optional[EventProducerPushDestination] = + jEventProducerPushDestination - override def createSingleServiceHandler(): JFunction[HttpRequest, CompletionStage[HttpResponse]] = - EventProducer.grpcServiceHandler(system, jEventProducerSource) + override def createSingleServiceHandler(): JApiFunction[HttpRequest, CompletionStage[HttpResponse]] = { + val handler = EventProducer.grpcServiceHandler(system, jEventProducerSource) + if (jEventProducerPushDestination.isPresent) { + // Fold in edge push gRPC consumer service if enabled + val eventProducerPushHandler = + EventProducerPushDestination.grpcServiceHandler(jEventProducerPushDestination.get(), system) + ServiceHandler.concatOrNotFound(handler, eventProducerPushHandler) + } else handler + } override def entityTypeKey: EntityTypeKey[Command] = scalaReplication.entityTypeKey.asJava - override def entityRefFactory: String => EntityRef[Command] = + override def entityRefFactory: JFunction[String, EntityRef[Command]] = (entityId: String) => scalaReplication.entityRefFactory.apply(entityId).asJava override def toString: String = scalaReplication.toString @@ -154,7 +167,7 @@ object Replication { def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], producerFilter: Predicate[EventEnvelope[Event]], - replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], + replicatedBehaviorFactory: JApiFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { grpcReplication(settings.withProducerFilter(producerFilter), replicatedBehaviorFactory, system) @@ -173,7 +186,7 @@ object Replication { def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], topicExpression: String, - replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], + replicatedBehaviorFactory: JApiFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { grpcReplication(settings.withProducerFilterTopicExpression(topicExpression), replicatedBehaviorFactory, system) } @@ -189,9 +202,10 @@ object Replication { * * An edge replica can connect to more than one cloud replica for redundancy (but only one is required). */ - def grpcEdgeReplication[Command, Event, State](settings: ReplicationSettings[Command])( - replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( - implicit system: ActorSystem[_]): EdgeReplication[Command] = { + def grpcEdgeReplication[Command, Event, State]( + settings: ReplicationSettings[Command], + replicatedBehaviorFactory: JApiFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], + system: ActorSystem[_]): EdgeReplication[Command] = { val scalaReplicationSettings = settings.toScala val replicatedEntity = @@ -216,17 +230,17 @@ object Replication { })) .toScala) - val scalaReplication = - ReplicationImpl.grpcReplication[Command, Event, State](scalaReplicationSettings, replicatedEntity)(system) + val scalaEdgeReplication = + ReplicationImpl.grpcEdgeReplication[Command](scalaReplicationSettings, replicatedEntity)(system) new EdgeReplication[Command] { override def entityTypeKey: EntityTypeKey[Command] = - scalaReplication.entityTypeKey.asJava + scalaEdgeReplication.entityTypeKey.asJava - override def entityRefFactory: String => EntityRef[Command] = - (entityId: String) => scalaReplication.entityRefFactory.apply(entityId).asJava + override def entityRefFactory: JFunction[String, EntityRef[Command]] = + (entityId: String) => scalaEdgeReplication.entityRefFactory.apply(entityId).asJava - override def toString: String = scalaReplication.toString + override def toString: String = scalaEdgeReplication.toString } } @@ -243,5 +257,5 @@ trait EdgeReplication[Command] { * Shortcut for creating EntityRefs for the sharded Replicated Event Sourced entities for * sending commands. */ - def entityRefFactory: String => EntityRef[Command] + def entityRefFactory: JFunction[String, EntityRef[Command]] } diff --git a/akka-projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala b/akka-projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala index 759b901cb..f3e2e6551 100644 --- a/akka-projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala +++ b/akka-projection-jdbc/src/main/scala/akka/projection/jdbc/javadsl/JdbcProjection.scala @@ -21,7 +21,7 @@ import akka.projection.internal.GroupedHandlerStrategy import akka.projection.internal.HandlerAdapter import akka.projection.internal.NoopStatusObserver import akka.projection.internal.SingleHandlerStrategy -import akka.projection.internal.SourceProviderAdapter +import akka.projection.internal.JavaToScalaSourceProviderAdapter import akka.projection.javadsl.AtLeastOnceFlowProjection import akka.projection.javadsl.AtLeastOnceProjection import akka.projection.javadsl.ExactlyOnceProjection @@ -51,7 +51,7 @@ object JdbcProjection { system: ActorSystem[_]): ExactlyOnceProjection[Offset, Envelope] = { val sessionFactory = () => sessionCreator.get() - val javaSourceProvider = new SourceProviderAdapter(sourceProvider) + val javaSourceProvider = new JavaToScalaSourceProviderAdapter(sourceProvider) val offsetStore = JdbcProjectionImpl.createOffsetStore(sessionFactory)(system) val adaptedHandler = @@ -104,7 +104,7 @@ object JdbcProjection { new JdbcProjectionImpl( projectionId, - new SourceProviderAdapter(sourceProvider), + new JavaToScalaSourceProviderAdapter(sourceProvider), sessionFactory = sessionFactory, settingsOpt = None, restartBackoffOpt = None, @@ -141,7 +141,7 @@ object JdbcProjection { new JdbcProjectionImpl( projectionId, - new SourceProviderAdapter(sourceProvider), + new JavaToScalaSourceProviderAdapter(sourceProvider), sessionFactory = sessionFactory, settingsOpt = None, restartBackoffOpt = None, @@ -169,7 +169,7 @@ object JdbcProjection { system: ActorSystem[_]): GroupedProjection[Offset, Envelope] = { val sessionFactory = () => sessionCreator.get() - val javaSourceProvider = new SourceProviderAdapter(sourceProvider) + val javaSourceProvider = new JavaToScalaSourceProviderAdapter(sourceProvider) val offsetStore = JdbcProjectionImpl.createOffsetStore(sessionFactory)(system) val adaptedHandler = @@ -215,7 +215,7 @@ object JdbcProjection { system: ActorSystem[_]): GroupedProjection[Offset, Envelope] = { val sessionFactory = () => sessionCreator.get() - val javaSourceProvider = new SourceProviderAdapter(sourceProvider) + val javaSourceProvider = new JavaToScalaSourceProviderAdapter(sourceProvider) val offsetStore = JdbcProjectionImpl.createOffsetStore(sessionFactory)(system) new JdbcProjectionImpl( @@ -259,7 +259,7 @@ object JdbcProjection { system: ActorSystem[_]): AtLeastOnceFlowProjection[Offset, Envelope] = { val sessionFactory = () => sessionCreator.get() - val javaSourceProvider = new SourceProviderAdapter(sourceProvider) + val javaSourceProvider = new JavaToScalaSourceProviderAdapter(sourceProvider) val offsetStore = JdbcProjectionImpl.createOffsetStore(sessionFactory)(system) new JdbcProjectionImpl( diff --git a/akka-projection-r2dbc/src/main/mima-filters/1.5.1-M1.backwards.excludes/edge-replication.excludes b/akka-projection-r2dbc/src/main/mima-filters/1.5.1-M1.backwards.excludes/edge-replication.excludes new file mode 100644 index 000000000..2f86e4d0d --- /dev/null +++ b/akka-projection-r2dbc/src/main/mima-filters/1.5.1-M1.backwards.excludes/edge-replication.excludes @@ -0,0 +1,2 @@ +# internal, moved to projection-core/renamed +ProblemFilters.exclude[MissingClassProblem]("akka.projection.r2dbc.internal.BySliceSourceProviderAdapter") \ No newline at end of file diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/javadsl/R2dbcProjection.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/javadsl/R2dbcProjection.scala index b004b7004..8a0cf0d6e 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/javadsl/R2dbcProjection.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/javadsl/R2dbcProjection.scala @@ -6,18 +6,15 @@ package akka.projection.r2dbc.javadsl import java.util.Optional import java.util.function.Supplier - import scala.compat.java8.OptionConverters._ - import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange -import akka.projection.BySlicesSourceProvider import akka.projection.ProjectionContext import akka.projection.ProjectionId import akka.projection.internal.GroupedHandlerAdapter import akka.projection.internal.HandlerAdapter -import akka.projection.internal.SourceProviderAdapter +import akka.projection.internal.JavaToScalaBySliceSourceProviderAdapter import akka.projection.javadsl.AtLeastOnceFlowProjection import akka.projection.javadsl.AtLeastOnceProjection import akka.projection.javadsl.ExactlyOnceProjection @@ -25,7 +22,6 @@ import akka.projection.javadsl.GroupedProjection import akka.projection.javadsl.Handler import akka.projection.javadsl.SourceProvider import akka.projection.r2dbc.R2dbcProjectionSettings -import akka.projection.r2dbc.internal.BySliceSourceProviderAdapter import akka.projection.r2dbc.internal.R2dbcGroupedHandlerAdapter import akka.projection.r2dbc.internal.R2dbcHandlerAdapter import akka.projection.r2dbc.scaladsl @@ -50,7 +46,7 @@ object R2dbcProjection { .exactlyOnce[Offset, Envelope]( projectionId, settings.asScala, - adaptSourceProvider(sourceProvider), + JavaToScalaBySliceSourceProviderAdapter(sourceProvider), () => new R2dbcHandlerAdapter(handler.get()))(system) .asInstanceOf[ExactlyOnceProjection[Offset, Envelope]] } @@ -79,7 +75,7 @@ object R2dbcProjection { .atLeastOnce[Offset, Envelope]( projectionId, settings.asScala, - adaptSourceProvider(sourceProvider), + JavaToScalaBySliceSourceProviderAdapter(sourceProvider), () => new R2dbcHandlerAdapter(handler.get()))(system) .asInstanceOf[AtLeastOnceProjection[Offset, Envelope]] } @@ -109,7 +105,7 @@ object R2dbcProjection { .atLeastOnceAsync[Offset, Envelope]( projectionId, settings.asScala, - adaptSourceProvider(sourceProvider), + JavaToScalaBySliceSourceProviderAdapter(sourceProvider), () => HandlerAdapter(handler.get()))(system) .asInstanceOf[AtLeastOnceProjection[Offset, Envelope]] } @@ -133,7 +129,7 @@ object R2dbcProjection { .groupedWithin[Offset, Envelope]( projectionId, settings.asScala, - adaptSourceProvider(sourceProvider), + JavaToScalaBySliceSourceProviderAdapter(sourceProvider), () => new R2dbcGroupedHandlerAdapter(handler.get()))(system) .asInstanceOf[GroupedProjection[Offset, Envelope]] } @@ -161,7 +157,7 @@ object R2dbcProjection { .groupedWithinAsync[Offset, Envelope]( projectionId, settings.asScala, - adaptSourceProvider(sourceProvider), + JavaToScalaBySliceSourceProviderAdapter(sourceProvider), () => new GroupedHandlerAdapter(handler.get()))(system) .asInstanceOf[GroupedProjection[Offset, Envelope]] } @@ -197,15 +193,9 @@ object R2dbcProjection { .atLeastOnceFlow[Offset, Envelope]( projectionId, settings.asScala, - adaptSourceProvider(sourceProvider), + JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope](sourceProvider), handler.asScala)(system) .asInstanceOf[AtLeastOnceFlowProjection[Offset, Envelope]] } - private def adaptSourceProvider[Offset, Envelope](sourceProvider: SourceProvider[Offset, Envelope]) = - sourceProvider match { - case _: BySlicesSourceProvider => new BySliceSourceProviderAdapter(sourceProvider) - case _ => new SourceProviderAdapter(sourceProvider) - } - } diff --git a/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/TestProjection.scala b/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/TestProjection.scala index 8180c39bb..22f367b26 100644 --- a/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/TestProjection.scala +++ b/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/TestProjection.scala @@ -16,7 +16,7 @@ import akka.projection.internal.HandlerAdapter import akka.projection.internal.NoopStatusObserver import akka.projection.internal.OffsetStrategy import akka.projection.internal.SingleHandlerStrategy -import akka.projection.internal.SourceProviderAdapter +import akka.projection.internal.JavaToScalaSourceProviderAdapter import akka.projection.testkit.internal.TestInMemoryOffsetStoreImpl import akka.projection.testkit.internal.TestProjectionImpl @@ -45,7 +45,7 @@ object TestProjection { handler: Supplier[akka.projection.javadsl.Handler[Envelope]]): TestProjection[Offset, Envelope] = new TestProjectionImpl( projectionId = projectionId, - sourceProvider = new SourceProviderAdapter(sourceProvider), + sourceProvider = new JavaToScalaSourceProviderAdapter(sourceProvider), handlerStrategy = SingleHandlerStrategy(() => new HandlerAdapter[Envelope](handler.get())), // Disable batching so that `ProjectionTestKit.runWithTestSink` emits 1 `Done` per envelope. offsetStrategy = AtLeastOnce(afterEnvelopes = Some(1)),