Skip to content

Commit

Permalink
[BUG] multi-thread flusher causes data inconsistent with chunk offsets (#275)
Browse files Browse the repository at this point in the history

(cherry picked from commit cb42b2f)
  • Loading branch information
FMX committed Jul 25, 2022
1 parent f00fae8 commit 4a88079
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class FileWriter extends DeviceObserver {
private long bytesFlushed;

private final DiskFlusher flusher;
private final int flushWorkerIndex;
private CompositeByteBuf flushBuffer;

private final long chunkSize;
Expand Down Expand Up @@ -119,6 +120,7 @@ public FileWriter(
PartitionSplitMode splitMode) throws IOException {
this.file = file;
this.flusher = flusher;
this.flushWorkerIndex = flusher.getWorkerIndex();
this.dataRootDir = workingDir;
this.chunkSize = chunkSize;
this.nextBoundary = chunkSize;
Expand Down Expand Up @@ -315,7 +317,7 @@ private void takeBuffer() {
}

// real action
flushBuffer = flusher.takeBuffer(timeoutMs);
flushBuffer = flusher.takeBuffer(timeoutMs, flushWorkerIndex);

// metrics end
if (source.samplePerfCritical()) {
Expand All @@ -330,7 +332,7 @@ private void takeBuffer() {
}

private void addTask(FlushTask task) throws IOException {
if (!flusher.addTask(task, timeoutMs)) {
if (!flusher.addTask(task, timeoutMs, flushWorkerIndex)) {
IOException e = new IOException("Add flush task timeout.");
notifier.setException(e);
throw e;
Expand All @@ -339,7 +341,7 @@ private void addTask(FlushTask task) throws IOException {

/**
 * Hands the flush buffer currently held by this writer back to the flusher and clears the
 * local reference. Does nothing when no buffer is held. The method is synchronized on this
 * writer instance.
 */
private synchronized void returnBuffer() {
if (flushBuffer != null) {
// NOTE(review): this is a rendered diff without +/- markers. The one-argument call appears
// to be the pre-change line and the call that also passes flushWorkerIndex its replacement.
flusher.returnBuffer(flushBuffer);
flusher.returnBuffer(flushBuffer, flushWorkerIndex);
// Forget the buffer so a later call does not hand the same instance back again.
flushBuffer = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,10 @@ private[worker] final class DiskFlusher(
val deviceMonitor: DeviceMonitor,
val threadCount: Int) extends DeviceObserver with Logging {
private lazy val diskFlusherId = System.identityHashCode(this)
private val workingQueue = new LinkedBlockingQueue[FlushTask](queueCapacity)
private val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf](queueCapacity)
private val writeActionPool = ThreadUtils.newDaemonFixedThreadPool(threadCount,
workingDir.getName + "-flusher")
for (_ <- 0 until queueCapacity) {
bufferQueue.put(Unpooled.compositeBuffer(256))
}
private val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
private val bufferQueues = new Array[LinkedBlockingQueue[CompositeByteBuf]](threadCount)
private val workers = new Array[Thread](threadCount)
private val nextWorkerIndex = new AtomicInteger()

@volatile
private var lastBeginFlushTime: Long = -1
Expand All @@ -68,12 +65,21 @@ private[worker] final class DiskFlusher(
val stopFlag = new AtomicBoolean(false)
val rand = new Random()

private val worker = new Thread(s"$this") {
override def run(): Unit = {
while (!stopFlag.get()) {
val task = workingQueue.take()
writeActionPool.submit(new Runnable {
override def run(): Unit = {
init()

private def init(): Unit = {
val actualQueueSize = queueCapacity / threadCount + 1
for (index <- 0 until (threadCount)) {
workingQueues(index) = new LinkedBlockingQueue[FlushTask](actualQueueSize)
bufferQueues(index) = new LinkedBlockingQueue[CompositeByteBuf](actualQueueSize)
for (_ <- 0 until actualQueueSize) {
bufferQueues(index).put(Unpooled.compositeBuffer(256))
}

workers(index) = new Thread(s"$this-$index") {
override def run(): Unit = {
while (!stopFlag.get()) {
val task = workingQueues(index).take()
val key = s"DiskFlusher-$workingDir-${rand.nextInt()}"
workerSource.sample(WorkerSource.FlushDataTime, key) {
if (!task.notifier.hasException) {
Expand All @@ -90,55 +96,64 @@ private[worker] final class DiskFlusher(
}
lastBeginFlushTime = -1
}
returnBuffer(task.buffer)
returnBuffer(task.buffer, index)
task.notifier.numPendingFlushes.decrementAndGet()
}
}
})
}
}
workers(index).setDaemon(true)
workers(index).setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
logError(s"$this thread terminated.", e)
}
})
workers(index).start()
}

deviceMonitor.registerDiskFlusher(this)
}
worker.setDaemon(true)
worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
logError(s"$this thread terminated.", e)
}
})
worker.start()

deviceMonitor.registerDiskFlusher(this)
/**
 * Hands out worker slots in round-robin order.
 *
 * The shared cursor is advanced with one atomic `getAndIncrement`, so concurrent callers
 * never observe or overwrite each other's updates. Earlier logic reset the cursor with a
 * separate `set(0)` once it exceeded `threadCount`; that reset was not atomic with the
 * increment and fired one step late, which skewed the distribution away from the
 * highest-numbered workers.
 *
 * @return a worker index in the range `[0, threadCount)`
 */
def getWorkerIndex: Int = {
  // floorMod (rather than %) keeps the result non-negative even after the cursor
  // overflows Int.MaxValue and wraps to a negative value.
  Math.floorMod(nextWorkerIndex.getAndIncrement(), threadCount)
}

/**
 * Takes a buffer from a buffer queue, waiting at most `timeoutMs` milliseconds.
 * The underlying call is `LinkedBlockingQueue.poll`, which yields `null` when no buffer
 * becomes available within the timeout.
 *
 * NOTE(review): this is a rendered diff without +/- markers. The first signature/body pair
 * (single shared `bufferQueue`) appears to be the pre-change version; the second pair picks
 * the queue at position `workerIndex` in `bufferQueues`.
 */
def takeBuffer(timeoutMs: Long): CompositeByteBuf = {
bufferQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
def takeBuffer(timeoutMs: Long, workerIndex: Int): CompositeByteBuf = {
bufferQueues(workerIndex).poll(timeoutMs, TimeUnit.MILLISECONDS)
}

/**
 * Resets a buffer and puts it back into a buffer queue.
 *
 * NOTE(review): this is a rendered diff without +/- markers. The one-parameter signature and
 * the `bufferQueue.put` line appear to be the pre-change version; the two-parameter signature
 * and the `bufferQueues(workerIndex).put` line appear to be their replacements.
 */
def returnBuffer(buffer: CompositeByteBuf): Unit = {
def returnBuffer(buffer: CompositeByteBuf, workerIndex: Int): Unit = {
// Report the buffer's readable byte count to the memory tracker before the buffer is emptied.
MemoryTracker.instance().releaseDiskBuffer(buffer.readableBytes())
// Detach every component, then reset the buffer's indices.
buffer.removeComponents(0, buffer.numComponents())
buffer.clear()

// `put` blocks while the target queue is full.
bufferQueue.put(buffer)
bufferQueues(workerIndex).put(buffer)
}

/**
 * Offers a flush task to a working queue, waiting at most `timeoutMs` milliseconds for space.
 * Returns the result of `LinkedBlockingQueue.offer`: `true` when the task was enqueued,
 * `false` when the timeout elapsed first.
 *
 * NOTE(review): this is a rendered diff without +/- markers. The first signature/body pair
 * (single shared `workingQueue`) appears to be the pre-change version; the second pair picks
 * the queue at position `workerIndex` in `workingQueues`.
 */
def addTask(task: FlushTask, timeoutMs: Long): Boolean = {
workingQueue.offer(task, timeoutMs, TimeUnit.MILLISECONDS)
def addTask(task: FlushTask, timeoutMs: Long, workerIndex: Int): Boolean = {
workingQueues(workerIndex).offer(task, timeoutMs, TimeUnit.MILLISECONDS)
}

/**
 * Device-error callback: logs the error, sets the stop flag, interrupts the flush threads,
 * clears the buffers of tasks still sitting in the working queue(s), and unregisters this
 * flusher from the device monitor.
 *
 * NOTE(review): this is a rendered diff without +/- markers. Lines that mention `worker`,
 * `writeActionPool` and `workingQueue` (singular) appear to be the pre-change version; lines
 * that mention `workers` and `workingQueues` appear to be their replacements.
 *
 * @param deviceName      name of the device the error was reported for (used in the log line)
 * @param dirs            not read in this method
 * @param deviceErrorType kind of error reported (used in the log line)
 */
override def notifyError(deviceName: String, dirs: ListBuffer[File] = null,
deviceErrorType: DeviceErrorType): Unit = {
logError(s"$this is notified Device $deviceName Error $deviceErrorType! Stop Flusher.")
// Tell the flush loop(s) to exit on their next `stopFlag` check.
stopFlag.set(true)
try {
worker.interrupt()
writeActionPool.shutdown()
workers.foreach(_.interrupt())
} catch {
case e: Exception =>
logError(s"Exception when interrupt worker: $worker, $e")
// NOTE(review): `workers` is an Array, so `$workers` prints the JVM's default array string
// rather than the thread names; consider `workers.mkString(", ")`.
logError(s"Exception when interrupt worker: $workers, $e")
}
// Empty the buffers of tasks that are still queued. The tasks are only iterated over here,
// not removed from the queue.
workingQueues.foreach { queue =>
queue.asScala.foreach { task =>
task.buffer.removeComponents(0, task.buffer.numComponents())
task.buffer.clear()
}
}
workingQueue.asScala.foreach(task => {
task.buffer.removeComponents(0, task.buffer.numComponents())
task.buffer.clear()
})
deviceMonitor.unregisterDiskFlusher(this)
}

Expand All @@ -147,7 +162,7 @@ private[worker] final class DiskFlusher(
deviceMonitor.reportDeviceError(workingDir, e, deviceErrorType)
}

// Builds a one-line summary of buffer-queue occupancy for this flusher.
// NOTE(review): this is a rendered diff without +/- markers. The first definition (single
// `bufferQueue`) appears to be the pre-change line and the second its replacement.
// NOTE(review): the second definition labels the value "used buffers", but it reports the
// size of each buffer queue, i.e. the buffers currently sitting in the queues. Confirm which
// wording is intended.
def bufferQueueInfo(): String = s"$this available buffers: ${bufferQueue.size()}"
def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueues.map(_.size()).toList}"

override def hashCode(): Int = {
workingDir.hashCode()
Expand Down

0 comments on commit 4a88079

Please sign in to comment.