
study handler Batcher.maxBatchSize = 100
ornicar committed Dec 28, 2024
1 parent 061a49e commit 8704620
Showing 2 changed files with 18 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/main/scala/LilaHandler.scala
@@ -113,8 +113,9 @@ final class LilaHandler(
   private val studyHandler: Emit[LilaOut] =

     val batcher = Batcher[RoomId, ClientIn.Versioned, NonEmptyList[ClientIn.Versioned]](
+      maxBatchSize = 100,
       initialCapacity = 64,
-      timeout = 150.millis,
+      timeout = 100.millis,
       append = (prev, elem) => prev.fold(NonEmptyList.one(elem))(_.prepend(elem)),
       emit = (roomId, batch) =>
         Monitor.handler.batch.record(batch.size)
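For context, a standalone sketch of the append function wired into the batcher above; it is not part of the commit and only assumes cats' NonEmptyList. It shows that building the batch by prepending leaves elements in reverse arrival order, with the most recently added element at the head.

import cats.data.NonEmptyList

val append: (Option[NonEmptyList[Int]], Int) => NonEmptyList[Int] =
  (prev, elem) => prev.fold(NonEmptyList.one(elem))(_.prepend(elem))

// Folding three elements through append builds the batch newest-first.
val batch =
  List(1, 2, 3).foldLeft(Option.empty[NonEmptyList[Int]])((acc, elem) => Some(append(acc, elem)))
// batch == Some(NonEmptyList(3, 2, 1))
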
28 changes: 16 additions & 12 deletions src/main/scala/util/Batcher.scala
@@ -8,27 +8,31 @@ import java.util.concurrent.ConcurrentHashMap

 /* Batches elements, sends the batch when `timeout` has elapsed since the last element was added. */
 final class Batcher[Key, Elem, Batch](
+    maxBatchSize: Int, // max number of elements in a batch
     initialCapacity: Int, // number of keys to expect
     timeout: FiniteDuration, // how long to wait for more elements before emitting
     append: (Option[Batch], Elem) => Batch, // how batches are built
-    emit: (Key, Batch) => Unit // callback to emit a batch on timeout or maxElems reached
+    emit: (Key, Batch) => Unit // callback to emit a batch on timeout or maxBatchSize reached
 )(using scheduler: Scheduler, ec: Executor):

-  final private class Buffer(val batch: Batch, cancelable: Cancellable):
+  final private class Buffer(val batch: Batch, val counter: Int, cancelable: Cancellable):
     export cancelable.cancel

   private val buffers = ConcurrentHashMap[Key, Buffer](initialCapacity)

-  def add(key: Key, elem: Elem): Unit = buffers.compute(
-    key,
-    (_, buffer) =>
-      val prev = Option(buffer)
-      prev.foreach(_.cancel())
-      Buffer(
-        append(prev.map(_.batch), elem),
-        scheduler.scheduleOnce(timeout, () => emitAndRemove(key))
-      )
-  )
+  def add(key: Key, elem: Elem): Unit =
+    val newBuffer = buffers.compute(
+      key,
+      (_, buffer) =>
+        val prev = Option(buffer)
+        prev.foreach(_.cancel())
+        Buffer(
+          append(prev.map(_.batch), elem),
+          prev.fold(1)(_.counter + 1),
+          scheduler.scheduleOnce(timeout, () => emitAndRemove(key))
+        )
+    )
+    if newBuffer.counter >= maxBatchSize then emitAndRemove(key)

   private def emitAndRemove(key: Key): Unit =
     buffers.computeIfPresent(
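To illustrate the new maxBatchSize behaviour end to end, here is a minimal usage sketch. It is not code from the repository: the key, element and batch types are made up, and it assumes the Scheduler and Executor required by Batcher's using clause are already in scope (their imports are omitted here), as the surrounding code must provide them.

import scala.concurrent.duration.*
import cats.data.NonEmptyList

def demo(using Scheduler, Executor): Unit =
  val batcher = Batcher[String, Int, NonEmptyList[Int]](
    maxBatchSize = 10,
    initialCapacity = 16,
    timeout = 100.millis,
    append = (prev, elem) => prev.fold(NonEmptyList.one(elem))(_.prepend(elem)),
    emit = (key, batch) => println(s"$key: ${batch.toList.reverse}")
  )
  // 25 quick additions to one key: the 10th and 20th elements each trip the
  // maxBatchSize check in `add` and flush immediately; the remaining 5 are
  // flushed once a pending 100ms timer fires.
  (1 to 25).foreach(i => batcher.add("demo", i))

The design point of the counter: since each add cancels and reschedules the timer, a steady stream of messages previously kept postponing the emit and let a buffer grow without bound; with this change a batch is emitted at least every maxBatchSize elements.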
