Skip to content

Commit

Permalink
Add withRouter to SSESink (#749)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcai-stripe authored Feb 12, 2025
1 parent 902db20 commit d2f7ff7
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.reactivex.mantis.network.push.PushServers;
import io.reactivex.mantis.network.push.Routers;
import io.reactivex.mantis.network.push.ServerConfig;
import io.reactivex.mantis.network.push.Router;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class ServerSentEventsSink<T> implements SelfDocumentingSink<T> {
private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor;
private int port = -1;
private final MantisPropertiesLoader propService;
private final Router<T> router;

private PushServerSse<T, Context> pushServerSse;
private HttpServer<ByteBuf, ServerSentEvent> httpServer;
Expand All @@ -76,6 +78,7 @@ public ServerSentEventsSink(Func1<T, String> encoder) {
this.predicate = predicate;
this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
this.subscribeProcessor = null;
this.router = null;
}

ServerSentEventsSink(Builder<T> builder) {
Expand All @@ -86,6 +89,7 @@ public ServerSentEventsSink(Func1<T, String> encoder) {
this.requestPostprocessor = builder.requestPostprocessor;
this.subscribeProcessor = builder.subscribeProcessor;
this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
this.router = builder.router;
}

@Override
Expand Down Expand Up @@ -152,7 +156,7 @@ public void call(Context context, PortRequest portRequest, final Observable<T> o
String serverName = "SseSink";
ServerConfig.Builder<T> config = new ServerConfig.Builder<T>()
.name(serverName)
.groupRouter(Routers.roundRobinSse(serverName, encoder))
.groupRouter(router != null ? router : Routers.roundRobinSse(serverName, encoder))
.port(port)
.metricsRegistry(context.getMetricsRegistry())
.maxChunkTimeMSec(maxReadTime())
Expand Down Expand Up @@ -249,6 +253,7 @@ public static class Builder<T> {
private Func1<Throwable, String> errorEncoder = Throwable::getMessage;
private Predicate<T> predicate;
private Func2<Map<String, List<String>>, Context, Void> subscribeProcessor;
private Router<T> router;

public Builder<T> withEncoder(Func1<T, String> encoder) {
this.encoder = encoder;
Expand Down Expand Up @@ -281,6 +286,11 @@ public Builder<T> withRequestPostprocessor(Func2<Map<String, List<String>>, Cont
return this;
}

public Builder<T> withRouter(Router<T> router) {
this.router = router;
return this;
}

public ServerSentEventsSink<T> build() {
return new ServerSentEventsSink<>(this);
}
Expand Down

0 comments on commit d2f7ff7

Please sign in to comment.