Skip to content

Commit

Permalink
fix csv batch flush (#32)
Browse files Browse the repository at this point in the history
* fix csv batch flush

Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 authored Mar 19, 2020
1 parent 9925b49 commit 2aaeb27
Showing 1 changed file with 2 additions and 17 deletions.
19 changes: 2 additions & 17 deletions pkg/load/batch_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,42 +68,27 @@ func (b *SQLBatchLoader) Flush(ctx context.Context) error {

// CSVBatchLoader helps us insert in batch
type CSVBatchLoader struct {
buf [][]string
f *os.File
writer *csv.Writer
}

// NewCSVBatchLoader creates a batch loader for csv format
func NewCSVBatchLoader(f *os.File) *CSVBatchLoader {
return &CSVBatchLoader{
buf: make([][]string, 0, maxBatchCount),
f: f,
writer: csv.NewWriter(f),
}
}

// InsertValue inserts a value, the loader may flush all pending values.
func (b *CSVBatchLoader) InsertValue(ctx context.Context, columns []string) error {
b.buf = append(b.buf, columns)

if len(b.buf) >= maxBatchCount {
return b.Flush(ctx)
}

return nil
return b.writer.Write(columns)
}

// Flush inserts all pending values
func (b *CSVBatchLoader) Flush(ctx context.Context) error {
if len(b.buf) == 0 {
return nil
}

err := b.writer.WriteAll(b.buf)
b.buf = b.buf[:0]
b.writer.Flush()

return err
return nil
}

// Close closes the file.
Expand Down

0 comments on commit 2aaeb27

Please sign in to comment.