From 9ff1e22263bead2414f4d3eeebe670d9e21ce025 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Thu, 26 Sep 2024 08:54:44 -0500 Subject: [PATCH] use netty scheduler for flushing --- .../scala/netty/ActorChannelConnector.scala | 17 ++++++++++------- src/main/scala/netty/NettyServer.scala | 16 ++++++++-------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 0970d168..c4dfe091 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -6,6 +6,7 @@ import io.netty.channel.* import io.netty.handler.codec.http.websocketx.* import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener } import org.apache.pekko.actor.typed.{ ActorRef, Scheduler } +import java.util.concurrent.TimeUnit import lila.ws.Controller.Endpoint import lila.ws.netty.ProtocolHandler.key @@ -13,7 +14,8 @@ import lila.ws.netty.ProtocolHandler.key final private class ActorChannelConnector( clients: ActorRef[Clients.Control], staticConfig: com.typesafe.config.Config, - settings: util.SettingStore + settings: util.SettingStore, + workers: EventLoopGroup )(using scheduler: Scheduler, ec: Executor): private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]() @@ -30,7 +32,7 @@ final private class ActorChannelConnector( monitor.config.interval.update(interval.get()) monitor.config.maxDelay.update(maxDelay.get()) - scheduler.scheduleOnce(1 second, () => flush()) + workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS) def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() @@ -77,8 +79,9 @@ final private class ActorChannelConnector( case _ => channelsToFlush = 0 - val nextInterval = - if !config.alwaysFlush() then config.interval.get().millis - else if flushQ.isEmpty then 1.second // hibernate - else 1.millis // interval is 0 but we still need to empty the queue - scheduler.scheduleOnce(nextInterval, () => flush()) + val nextIntervalMillis = + if !config.alwaysFlush() then config.interval.get() + else if flushQ.isEmpty then 1 // hibernate + else 1 // interval is 0 but we still need to empty the queue + + workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS) diff --git a/src/main/scala/netty/NettyServer.scala b/src/main/scala/netty/NettyServer.scala index dc4a7c09..ef05d75d 100644 --- a/src/main/scala/netty/NettyServer.scala +++ b/src/main/scala/netty/NettyServer.scala @@ -16,18 +16,18 @@ final class NettyServer( config: Config, settings: util.SettingStore )(using Executor, Scheduler): - private val logger = Logger(getClass) - private val connector = ActorChannelConnector(clients, config, settings) + private val logger = Logger(getClass) + private val threads = config.getInt("netty.threads") + private val (parent, workers, channelClass) = + if System.getProperty("os.name").toLowerCase.startsWith("mac") then + (new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel]) + else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel]) + private val connector = ActorChannelConnector(clients, config, settings, workers) def start(): Unit = logger.info("Start") - val port = config.getInt("http.port") - val threads = config.getInt("netty.threads") - val (parent, workers, channelClass) = - if System.getProperty("os.name").toLowerCase.startsWith("mac") then - (new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel]) - else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel]) + val port = config.getInt("http.port") try val boot = new ServerBootstrap boot