diff --git a/bulk_indexer.go b/bulk_indexer.go index 4ebb502..2324250 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -234,7 +234,8 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error b.copyBuf = slices.Grow(b.copyBuf, b.buf.Len()-cap(b.copyBuf)) b.copyBuf = b.copyBuf[:cap(b.copyBuf)] } - copy(b.copyBuf, b.buf.Bytes()) + n := copy(b.copyBuf, b.buf.Bytes()) + b.copyBuf = b.copyBuf[:n] } req := esapi.BulkRequest{ @@ -338,7 +339,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error if b.gzipw != nil { // First loop, read from the gzip reader if len(buf) == 0 { - n, _ := gr.Read(buf[:cap(buf)]) + n, err := gr.Read(buf[:cap(buf)]) + if err != nil && err != io.EOF { + return resp, fmt.Errorf("failed to read from compressed buffer: %w", err) + } buf = buf[:n] } @@ -348,7 +352,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error // loop until we've seen the start newline for seen+newlines < startln { seen += newlines - n, _ := gr.Read(buf[:cap(buf)]) + n, err := gr.Read(buf[:cap(buf)]) + if err != nil && err != io.EOF { + return resp, fmt.Errorf("failed to read from compressed buffer: %w", err) + } buf = buf[:n] newlines = bytes.Count(buf, []byte{'\n'}) } @@ -364,7 +371,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error // loop until we've seen the end newline for seen+newlines < endln { seen += newlines - n, _ := gr.Read(buf[:cap(buf)]) + n, err := gr.Read(buf[:cap(buf)]) + if err != nil && err != io.EOF { + return resp, fmt.Errorf("failed to read from compressed buffer: %w", err) + } buf = buf[:n] newlines = bytes.Count(buf, []byte{'\n'}) if seen+newlines < endln {