-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: A few smaller follow ups for the edge replication #1094
Changes from 20 commits
b239808
1668132
1321e60
68ba092
66f1251
311e0fd
0da5120
e358cac
6e552a5
e191ed9
60026c4
7381a7c
2daba9c
a73f9e6
648e68b
6b1d99f
6b3df47
90430a6
b462361
14b8220
9c6dd36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# internal/renamed | ||
ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.ScalaBySlicesSourceProviderAdapter") | ||
# (was internal stable but only because it was historically used by akka-persistence-r2dbc) | ||
ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.SourceProviderAdapter") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
/* | ||
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com> | ||
*/ | ||
|
||
package akka.projection.internal | ||
|
||
import akka.NotUsed | ||
import akka.annotation.InternalApi | ||
import akka.dispatch.ExecutionContexts | ||
import akka.persistence.query.typed.EventEnvelope | ||
import akka.persistence.query.typed.javadsl.EventTimestampQuery | ||
import akka.persistence.query.typed.javadsl.LoadEventQuery | ||
import akka.persistence.query.typed.scaladsl.{ EventTimestampQuery => ScalaEventTimestampQuery } | ||
import akka.persistence.query.typed.scaladsl.{ LoadEventQuery => ScalaLoadEventQuery } | ||
import akka.projection.BySlicesSourceProvider | ||
import akka.projection.javadsl | ||
import akka.projection.scaladsl | ||
import akka.stream.javadsl.{ Source => JSource } | ||
import akka.stream.scaladsl.Source | ||
|
||
import java.time.Instant | ||
import java.util.Optional | ||
import java.util.concurrent.CompletionStage | ||
import java.util.function.Supplier | ||
import scala.compat.java8.FutureConverters._ | ||
import scala.compat.java8.OptionConverters._ | ||
import scala.concurrent.Future | ||
|
||
/** | ||
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider | ||
*/ | ||
private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope]( | ||
delegate: javadsl.SourceProvider[Offset, Envelope]) | ||
extends scaladsl.SourceProvider[Offset, Envelope] { | ||
|
||
def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = { | ||
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source, | ||
// it _should_ not be used for the blocking operation of getting offsets themselves | ||
val ec = akka.dispatch.ExecutionContexts.parasitic | ||
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] { | ||
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava | ||
} | ||
delegate.source(offsetAdapter).toScala.map(_.asScala)(ec) | ||
} | ||
|
||
def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) | ||
|
||
def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) | ||
} | ||
|
||
/** | ||
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider | ||
*/ | ||
@InternalApi private[projection] object ScalaToJavaBySlicesSourceProviderAdapter { | ||
def apply[Offset, Envelope]( | ||
delegate: scaladsl.SourceProvider[Offset, Envelope] | ||
with BySlicesSourceProvider): javadsl.SourceProvider[Offset, Envelope] = | ||
delegate match { | ||
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => | ||
// just unwrap rather than wrapping further | ||
adapted.delegate | ||
case delegate: CanTriggerReplay => new ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay(delegate) | ||
case _ => new ScalaToJavaBySlicesSourceProviderAdapter(delegate) | ||
} | ||
} | ||
|
||
/** | ||
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider | ||
*/ | ||
@InternalApi | ||
private[projection] sealed class ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope] private[internal] ( | ||
val delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider) | ||
extends javadsl.SourceProvider[Offset, Envelope] | ||
with BySlicesSourceProvider | ||
with EventTimestampQuery | ||
with LoadEventQuery { | ||
override def source( | ||
offset: Supplier[CompletionStage[Optional[Offset]]]): CompletionStage[JSource[Envelope, NotUsed]] = | ||
delegate | ||
.source(() => offset.get().toScala.map(_.asScala)(ExecutionContexts.parasitic)) | ||
.map(_.asJava)(ExecutionContexts.parasitic) | ||
.toJava | ||
|
||
override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) | ||
|
||
override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) | ||
|
||
def minSlice: Int = delegate.minSlice | ||
|
||
def maxSlice: Int = delegate.maxSlice | ||
|
||
override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = | ||
delegate match { | ||
case etq: ScalaEventTimestampQuery => | ||
etq.timestampOf(persistenceId, sequenceNr).map(_.asJava)(ExecutionContexts.parasitic).toJava | ||
case _ => | ||
throw new IllegalStateException( | ||
s"timestampOf was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.EventTimestampQuery") | ||
} | ||
|
||
override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = | ||
delegate match { | ||
case etq: ScalaLoadEventQuery => | ||
etq.loadEnvelope[Event](persistenceId, sequenceNr).toJava | ||
case _ => | ||
throw new IllegalStateException( | ||
s"loadEnvelope was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.LoadEventQuery") | ||
} | ||
|
||
} | ||
|
||
/** | ||
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider | ||
*/ | ||
@InternalApi | ||
private[projection] final class ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope] private[internal] ( | ||
delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider with CanTriggerReplay) | ||
extends ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope](delegate) | ||
with CanTriggerReplay { | ||
|
||
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = | ||
delegate.triggerReplay(persistenceId, fromSeqNr) | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,12 +5,10 @@ | |
package akka.projection.scaladsl | ||
|
||
import java.util.concurrent.ConcurrentHashMap | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.Future | ||
import scala.concurrent.TimeoutException | ||
import scala.concurrent.duration.FiniteDuration | ||
|
||
import akka.Done | ||
import akka.actor.typed.ActorRef | ||
import akka.actor.typed.ActorSystem | ||
|
@@ -23,6 +21,9 @@ import akka.projection.ProjectionId | |
import akka.util.JavaDurationConverters._ | ||
import akka.util.Timeout | ||
|
||
import java.net.URLEncoder | ||
import java.nio.charset.StandardCharsets | ||
|
||
object ProjectionManagement extends ExtensionId[ProjectionManagement] { | ||
def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system) | ||
|
||
|
@@ -50,7 +51,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension { | |
private def topic(projectionName: String): ActorRef[Topic.Command[ProjectionManagementCommand]] = { | ||
topics.computeIfAbsent(projectionName, _ => { | ||
val name = topicName(projectionName) | ||
system.systemActorOf(Topic[ProjectionManagementCommand](name), name) | ||
system.systemActorOf(Topic[ProjectionManagementCommand](name), sanitizeActorName(name)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the sample, we want to use the location id ( |
||
}) | ||
} | ||
|
||
|
@@ -151,4 +152,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension { | |
} | ||
retry(() => askSetPaused()) | ||
} | ||
|
||
private def sanitizeActorName(text: String): String = | ||
URLEncoder.encode(text, StandardCharsets.UTF_8.name()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be placed in the other file: JavaToScalaSourceProviderAdapter.scala ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ups