Skip to content

Commit

Permalink
use netty scheduler for flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
schlawg committed Sep 26, 2024
1 parent 116fc92 commit 9ff1e22
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
17 changes: 10 additions & 7 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import io.netty.channel.*
import io.netty.handler.codec.http.websocketx.*
import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }
import java.util.concurrent.TimeUnit

import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key

final private class ActorChannelConnector(
clients: ActorRef[Clients.Control],
staticConfig: com.typesafe.config.Config,
settings: util.SettingStore
settings: util.SettingStore,
workers: EventLoopGroup
)(using scheduler: Scheduler, ec: Executor):

private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
Expand All @@ -30,7 +32,7 @@ final private class ActorChannelConnector(
monitor.config.interval.update(interval.get())
monitor.config.maxDelay.update(maxDelay.get())

scheduler.scheduleOnce(1 second, () => flush())
workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS)

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
Expand Down Expand Up @@ -77,8 +79,9 @@ final private class ActorChannelConnector(
case _ =>
channelsToFlush = 0

val nextInterval =
if !config.alwaysFlush() then config.interval.get().millis
else if flushQ.isEmpty then 1.second // hibernate
else 1.millis // interval is 0 but we still need to empty the queue
scheduler.scheduleOnce(nextInterval, () => flush())
val nextIntervalMillis =
if !config.alwaysFlush() then config.interval.get()
else if flushQ.isEmpty then 1 // hibernate
else 1 // interval is 0 but we still need to empty the queue

workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS)
16 changes: 8 additions & 8 deletions src/main/scala/netty/NettyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ final class NettyServer(
config: Config,
settings: util.SettingStore
)(using Executor, Scheduler):
private val logger = Logger(getClass)
private val connector = ActorChannelConnector(clients, config, settings)
private val logger = Logger(getClass)
private val threads = config.getInt("netty.threads")
private val (parent, workers, channelClass) =
if System.getProperty("os.name").toLowerCase.startsWith("mac") then
(new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel])
else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel])
private val connector = ActorChannelConnector(clients, config, settings, workers)

def start(): Unit =

logger.info("Start")
val port = config.getInt("http.port")
val threads = config.getInt("netty.threads")
val (parent, workers, channelClass) =
if System.getProperty("os.name").toLowerCase.startsWith("mac") then
(new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel])
else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel])
val port = config.getInt("http.port")
try
val boot = new ServerBootstrap
boot
Expand Down

0 comments on commit 9ff1e22

Please sign in to comment.