From 8704620c9fb1fa4f764382a154cbe64ce1244fbf Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Sat, 28 Dec 2024 18:43:36 +0100 Subject: [PATCH] study handler Batcher.maxBatchSize = 100 --- src/main/scala/LilaHandler.scala | 3 ++- src/main/scala/util/Batcher.scala | 28 ++++++++++++++++------------ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/main/scala/LilaHandler.scala b/src/main/scala/LilaHandler.scala index e2e97267..c875df97 100644 --- a/src/main/scala/LilaHandler.scala +++ b/src/main/scala/LilaHandler.scala @@ -113,8 +113,9 @@ final class LilaHandler( private val studyHandler: Emit[LilaOut] = val batcher = Batcher[RoomId, ClientIn.Versioned, NonEmptyList[ClientIn.Versioned]]( + maxBatchSize = 100, initialCapacity = 64, - timeout = 150.millis, + timeout = 100.millis, append = (prev, elem) => prev.fold(NonEmptyList.one(elem))(_.prepend(elem)), emit = (roomId, batch) => Monitor.handler.batch.record(batch.size) diff --git a/src/main/scala/util/Batcher.scala b/src/main/scala/util/Batcher.scala index f016a18f..f71c87b2 100644 --- a/src/main/scala/util/Batcher.scala +++ b/src/main/scala/util/Batcher.scala @@ -8,27 +8,31 @@ import java.util.concurrent.ConcurrentHashMap /* Batches elements, sends the batch when `timeout` has elapsed since the last element was added. */ final class Batcher[Key, Elem, Batch]( + maxBatchSize: Int, // max number of elements in a batch initialCapacity: Int, // number of keys to expect timeout: FiniteDuration, // how long to wait for more elements before emitting append: (Option[Batch], Elem) => Batch, // how batches are built - emit: (Key, Batch) => Unit // callback to emit a batch on timeout or maxElems reached + emit: (Key, Batch) => Unit // callback to emit a batch on timeout or maxBatchSize reached )(using scheduler: Scheduler, ec: Executor): - final private class Buffer(val batch: Batch, cancelable: Cancellable): + final private class Buffer(val batch: Batch, val counter: Int, cancelable: Cancellable): export cancelable.cancel private val buffers = ConcurrentHashMap[Key, Buffer](initialCapacity) - def add(key: Key, elem: Elem): Unit = buffers.compute( - key, - (_, buffer) => - val prev = Option(buffer) - prev.foreach(_.cancel()) - Buffer( - append(prev.map(_.batch), elem), - scheduler.scheduleOnce(timeout, () => emitAndRemove(key)) - ) - ) + def add(key: Key, elem: Elem): Unit = + val newBuffer = buffers.compute( + key, + (_, buffer) => + val prev = Option(buffer) + prev.foreach(_.cancel()) + Buffer( + append(prev.map(_.batch), elem), + prev.fold(1)(_.counter + 1), + scheduler.scheduleOnce(timeout, () => emitAndRemove(key)) + ) + ) + if newBuffer.counter >= maxBatchSize then emitAndRemove(key) private def emitAndRemove(key: Key): Unit = buffers.computeIfPresent(