Skip to content

Commit

Permalink
keep batch locked until emit
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasf committed Dec 30, 2024
1 parent 1289f86 commit bb0c8aa
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions src/main/scala/util/Batcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,25 @@ final class Batcher[Key, Elem, Batch](
private val buffers = ConcurrentHashMap[Key, Buffer](initialCapacity)

def add(key: Key, elem: Elem): Unit =
val newBuffer = buffers.compute(
buffers.compute(
key,
(_, buffer) =>
val prev = Option(buffer)
if prev.isEmpty then scheduler.scheduleOnce(timeout, () => emitAndRemove(key))
Buffer(
val newBuffer = Buffer(
append(prev.map(_.batch), elem),
prev.fold(1)(_.counter + 1)
)
if newBuffer.counter >= maxBatchSize then
emit(key, newBuffer.batch)
null
else newBuffer
)
if newBuffer.counter >= maxBatchSize then emitAndRemove(key)

private def emitAndRemove(key: Key): Unit =
Option(buffers.remove(key)).foreach: buffer =>
emit(key, buffer.batch)
buffers.computeIfPresent(
key,
(_, buffer) =>
emit(key, buffer.batch)
null
)

0 comments on commit bb0c8aa

Please sign in to comment.