Skip to content

Commit

Permalink
[BUG] Fix reserve slots failed due to take buffer stuck (#283)
Browse files Browse the repository at this point in the history
* [BUG] Fix reserve slots failed due to take buffer stuck
  • Loading branch information
FMX committed Jul 26, 2022
1 parent 4a88079 commit 6badd20
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private void takeBuffer() {
}

// real action
flushBuffer = flusher.takeBuffer(timeoutMs, flushWorkerIndex);
flushBuffer = flusher.takeBuffer(timeoutMs);

// metrics end
if (source.samplePerfCritical()) {
Expand All @@ -341,7 +341,7 @@ private void addTask(FlushTask task) throws IOException {

private synchronized void returnBuffer() {
if (flushBuffer != null) {
flusher.returnBuffer(flushBuffer, flushWorkerIndex);
flusher.returnBuffer(flushBuffer);
flushBuffer = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private[worker] final class DiskFlusher(
val threadCount: Int) extends DeviceObserver with Logging {
private lazy val diskFlusherId = System.identityHashCode(this)
private val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
private val bufferQueues = new Array[LinkedBlockingQueue[CompositeByteBuf]](threadCount)
private val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf](queueCapacity)
private val workers = new Array[Thread](threadCount)
private val nextWorkerIndex = new AtomicInteger()

Expand All @@ -68,14 +68,11 @@ private[worker] final class DiskFlusher(
init()

private def init(): Unit = {
val actualQueueSize = queueCapacity / threadCount + 1
for (_ <- 0 until queueCapacity) {
bufferQueue.put(Unpooled.compositeBuffer(256))
}
for (index <- 0 until (threadCount)) {
workingQueues(index) = new LinkedBlockingQueue[FlushTask](actualQueueSize)
bufferQueues(index) = new LinkedBlockingQueue[CompositeByteBuf](actualQueueSize)
for (_ <- 0 until actualQueueSize) {
bufferQueues(index).put(Unpooled.compositeBuffer(256))
}

workingQueues(index) = new LinkedBlockingQueue[FlushTask](queueCapacity)
workers(index) = new Thread(s"$this-$index") {
override def run(): Unit = {
while (!stopFlag.get()) {
Expand All @@ -96,7 +93,7 @@ private[worker] final class DiskFlusher(
}
lastBeginFlushTime = -1
}
returnBuffer(task.buffer, index)
returnBuffer(task.buffer)
task.notifier.numPendingFlushes.decrementAndGet()
}
}
Expand All @@ -122,16 +119,16 @@ private[worker] final class DiskFlusher(
nextIndex % threadCount
}

def takeBuffer(timeoutMs: Long, workerIndex: Int): CompositeByteBuf = {
bufferQueues(workerIndex).poll(timeoutMs, TimeUnit.MILLISECONDS)
def takeBuffer(timeoutMs: Long): CompositeByteBuf = {
bufferQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
}

def returnBuffer(buffer: CompositeByteBuf, workerIndex: Int): Unit = {
def returnBuffer(buffer: CompositeByteBuf): Unit = {
MemoryTracker.instance().releaseDiskBuffer(buffer.readableBytes())
buffer.removeComponents(0, buffer.numComponents())
buffer.clear()

bufferQueues(workerIndex).put(buffer)
bufferQueue.put(buffer)
}

def addTask(task: FlushTask, timeoutMs: Long, workerIndex: Int): Boolean = {
Expand Down Expand Up @@ -162,7 +159,7 @@ private[worker] final class DiskFlusher(
deviceMonitor.reportDeviceError(workingDir, e, deviceErrorType)
}

def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueues.map(_.size()).toList}"
def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueue.size()}"

override def hashCode(): Int = {
workingDir.hashCode()
Expand Down

0 comments on commit 6badd20

Please sign in to comment.