Skip to content

Commit

Permalink
fix inverted multiply, divide by zero, and not passing msg to channel…
Browse files Browse the repository at this point in the history
…Emit
  • Loading branch information
schlawg committed Sep 25, 2024
1 parent 7ad53ab commit 42bbe0b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ netty {
threads = 0 # auto

flush {
step = 100 # minimum number of channels to flush per interval
interval-millis = 1 # interval between flush cycles
interval-millis = 1 # interval between flush cycles, set to 0 to disable flush queue
max-delay-millis = 500 # max flush step targets this threshold if passed
step = 100 # minimum number of channels to flush per interval
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ final private class ActorChannelConnector(
private val monitor = Monitor.connector.flush

scheduler.scheduleOnce(1 second, () => flush())

scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () =>
monitor.config.step.update(step.get())
monitor.config.interval.update(interval.get())
Expand All @@ -36,7 +35,7 @@ final private class ActorChannelConnector(
channel.attr(key.client).set(clientPromise.future)
val channelEmit: ClientEmit = (msg: ipc.ClientIn) =>
endpoint.emitCounter.increment()
emitToChannel(channel, withFlush = endpoint.alwaysFlush)
emitToChannel(channel, withFlush = endpoint.alwaysFlush)(msg)
clients ! Clients.Control.Start(endpoint.behavior(channelEmit), clientPromise)
channel.closeFuture.addListener:
new GenericFutureListener[NettyFuture[Void]]:
Expand All @@ -63,7 +62,7 @@ final private class ActorChannelConnector(

private def flush(): Unit =
val qSize = flushQ.size
val maxDelayFactor = maxDelay.get().toDouble / interval.get()
val maxDelayFactor = interval.get().toDouble / maxDelay.get().atLeast(1)
var channelsToFlush = step.get().atLeast((qSize * maxDelayFactor).toInt)

monitor.qSize.record(qSize)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/netty/NettyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ final class NettyServer(
config: Config,
settings: util.SettingStore
)(using Executor, Scheduler):
private val logger = Logger(getClass)
private val logger = Logger(getClass)
private val connector = ActorChannelConnector(clients, config, settings)

def start(): Unit =

logger.info("Start")

val port = config.getInt("http.port")
val threads = config.getInt("netty.threads")
val (parent, workers, channelClass) =
Expand All @@ -40,7 +40,7 @@ final class NettyServer(
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpObjectAggregator(4096))
pipeline.addLast(RequestHandler(router))
pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, config, settings)))
pipeline.addLast(ProtocolHandler(connector))
pipeline.addLast(FrameHandler())
)

Expand Down

0 comments on commit 42bbe0b

Please sign in to comment.