diff --git a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java index 1f35d482af..998e89c9b2 100644 --- a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java +++ b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java @@ -317,7 +317,7 @@ private void takeBuffer() { } // real action - flushBuffer = flusher.takeBuffer(timeoutMs, flushWorkerIndex); + flushBuffer = flusher.takeBuffer(timeoutMs); // metrics end if (source.samplePerfCritical()) { @@ -341,7 +341,7 @@ private void addTask(FlushTask task) throws IOException { private synchronized void returnBuffer() { if (flushBuffer != null) { - flusher.returnBuffer(flushBuffer, flushWorkerIndex); + flusher.returnBuffer(flushBuffer); flushBuffer = null; } } diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala index 40ec5dcb33..b7cb20966f 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala @@ -54,7 +54,7 @@ private[worker] final class DiskFlusher( val threadCount: Int) extends DeviceObserver with Logging { private lazy val diskFlusherId = System.identityHashCode(this) private val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount) - private val bufferQueues = new Array[LinkedBlockingQueue[CompositeByteBuf]](threadCount) + private val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf](queueCapacity) private val workers = new Array[Thread](threadCount) private val nextWorkerIndex = new AtomicInteger() @@ -68,14 +68,11 @@ private[worker] final class DiskFlusher( init() private def init(): Unit = { - val actualQueueSize = queueCapacity / threadCount + 1 + for (_ <- 0 until queueCapacity) { + bufferQueue.put(Unpooled.compositeBuffer(256)) + } for (index <- 0 until (threadCount)) { - workingQueues(index) = new LinkedBlockingQueue[FlushTask](actualQueueSize) - bufferQueues(index) = new LinkedBlockingQueue[CompositeByteBuf](actualQueueSize) - for (_ <- 0 until actualQueueSize) { - bufferQueues(index).put(Unpooled.compositeBuffer(256)) - } - + workingQueues(index) = new LinkedBlockingQueue[FlushTask](queueCapacity) workers(index) = new Thread(s"$this-$index") { override def run(): Unit = { while (!stopFlag.get()) { @@ -96,7 +93,7 @@ private[worker] final class DiskFlusher( } lastBeginFlushTime = -1 } - returnBuffer(task.buffer, index) + returnBuffer(task.buffer) task.notifier.numPendingFlushes.decrementAndGet() } } @@ -122,16 +119,16 @@ private[worker] final class DiskFlusher( nextIndex % threadCount } - def takeBuffer(timeoutMs: Long, workerIndex: Int): CompositeByteBuf = { - bufferQueues(workerIndex).poll(timeoutMs, TimeUnit.MILLISECONDS) + def takeBuffer(timeoutMs: Long): CompositeByteBuf = { + bufferQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) } - def returnBuffer(buffer: CompositeByteBuf, workerIndex: Int): Unit = { + def returnBuffer(buffer: CompositeByteBuf): Unit = { MemoryTracker.instance().releaseDiskBuffer(buffer.readableBytes()) buffer.removeComponents(0, buffer.numComponents()) buffer.clear() - bufferQueues(workerIndex).put(buffer) + bufferQueue.put(buffer) } def addTask(task: FlushTask, timeoutMs: Long, workerIndex: Int): Boolean = { @@ -162,7 +159,7 @@ private[worker] final class DiskFlusher( deviceMonitor.reportDeviceError(workingDir, e, deviceErrorType) } - def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueues.map(_.size()).toList}" + def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueue.size()}" override def hashCode(): Int = { workingDir.hashCode()