From 9226e584461480fccb81b38324275a1bc7420289 Mon Sep 17 00:00:00 2001 From: sylzd Date: Wed, 21 Oct 2020 14:37:06 +0800 Subject: [PATCH] Improve for_select_default_busy (pingcap/dumpling#169) * fix for_select_default_busy --- dumpling/v4/export/writer_util.go | 43 ++++++++++++++----------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/dumpling/v4/export/writer_util.go b/dumpling/v4/export/writer_util.go index acfa3e10..6edf9fcc 100644 --- a/dumpling/v4/export/writer_util.go +++ b/dumpling/v4/export/writer_util.go @@ -193,21 +193,19 @@ func WriteInsert(pCtx context.Context, tblIR TableDataIR, w storage.Writer, file bf.WriteString(";\n") } if bf.Len() >= lengthLimit { - wp.input <- bf - bf = pool.Get().(*bytes.Buffer) - if bfCap := bf.Cap(); bfCap < lengthLimit { - bf.Grow(lengthLimit - bfCap) + select { + case <-pCtx.Done(): + return pCtx.Err() + case err := <-wp.errCh: + return err + case wp.input <- bf: + bf = pool.Get().(*bytes.Buffer) + if bfCap := bf.Cap(); bfCap < lengthLimit { + bf.Grow(lengthLimit - bfCap) + } } } - select { - case <-pCtx.Done(): - return pCtx.Err() - case err := <-wp.errCh: - return err - default: - } - if shouldSwitch { break } @@ -288,21 +286,20 @@ func WriteInsertInCsv(pCtx context.Context, tblIR TableDataIR, w storage.Writer, bf.WriteByte('\n') if bf.Len() >= lengthLimit { - wp.input <- bf - bf = pool.Get().(*bytes.Buffer) - if bfCap := bf.Cap(); bfCap < lengthLimit { - bf.Grow(lengthLimit - bfCap) + select { + case <-pCtx.Done(): + return pCtx.Err() + case err := <-wp.errCh: + return err + case wp.input <- bf: + bf = pool.Get().(*bytes.Buffer) + if bfCap := bf.Cap(); bfCap < lengthLimit { + bf.Grow(lengthLimit - bfCap) + } } } fileRowIter.Next() - select { - case <-pCtx.Done(): - return pCtx.Err() - case err := <-wp.errCh: - return err - default: - } if wp.ShouldSwitchFile() { break }