Bug #446: buffered reader is default for jsonl #482

Merged · 27 commits · Jan 20, 2025
25 changes: 25 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

Breaking changes are annotated with ☢️, and alpha/beta features with 🐥.

## Upcoming

### Fixed

- [#446]: A [`bufio.ErrTooLong`](https://pkg.go.dev/bufio#ErrTooLong) error was being returned
by [`bufio.Scanner`](https://pkg.go.dev/bufio#Scanner) when splitting lines longer than
[`bufio.MaxScanTokenSize`](https://pkg.go.dev/bufio#MaxScanTokenSize) (i.e. `64KB`). This meant that
`sq` couldn't parse large JSON files, among other problems. The maximum buffer size is
now configurable via the new [`tuning.scan-buffer-limit`](https://sq.io/docs/config/#tuningscan-buffer-limit)
option. Note that the buffer starts small and grows as needed, up to the limit.

```plaintext
$ sq config set tuning.scan-buffer-limit 64MB # or 1024B, 64KB, 1GB, etc.
```

A more useful error message is also now returned when the buffer limit is exceeded
(the error suggests adjusting `tuning.scan-buffer-limit`). A sketch of the underlying
buffer behavior is shown below.
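
For context, the "start small, grow up to a limit" behavior described above maps onto the standard library's `(*bufio.Scanner).Buffer` method, which takes an initial buffer and a maximum size. A minimal stdlib sketch (the 4KB start size and 64MB cap are illustrative values, not necessarily sq's actual defaults):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	const limit = 64 * 1024 * 1024 // analogous to tuning.scan-buffer-limit = 64MB

	// A 100KB line exceeds bufio's default 64KB cap (bufio.MaxScanTokenSize).
	input := strings.Repeat("x", 100*1024) + "\n"

	sc := bufio.NewScanner(strings.NewReader(input))
	// Start with a small 4KB buffer; the scanner grows it as needed, up to limit.
	sc.Buffer(make([]byte, 0, 4*1024), limit)

	for sc.Scan() {
		fmt.Println("scanned a line of", len(sc.Bytes()), "bytes")
	}
	if err := sc.Err(); err != nil {
		// Without the raised cap, this would be bufio.ErrTooLong.
		fmt.Println("scan error:", err)
	}
}
```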

### Changed

- Renamed config option `tuning.buffer-mem-limit` to [`tuning.buffer-spill-limit`](https://sq.io/docs/config/#tuningbuffer-spill-limit).
The new name better reflects the purpose of the option.


## [v0.48.4] - 2024-11-24

### Changed
@@ -1219,6 +1243,7 @@ make working with lots of sources much easier.
[#340]: https://github.com/neilotoole/sq/pull/340
[#353]: https://github.com/neilotoole/sq/issues/353
[#415]: https://github.com/neilotoole/sq/issues/415
[#446]: https://github.com/neilotoole/sq/issues/446


[v0.15.2]: https://github.com/neilotoole/sq/releases/tag/v0.15.2
5 changes: 3 additions & 2 deletions cli/cmd_config_edit.go
@@ -1,11 +1,12 @@
package cli

import (
"bufio"
"bytes"
"os"
"strings"

"github.com/neilotoole/sq/libsq/core/ioz/scannerz"

"github.com/spf13/cobra"

"github.com/neilotoole/shelleditor"
@@ -165,7 +166,7 @@ func execConfigEditSource(cmd *cobra.Command, args []string) error {
}

// Add indentation
sc := bufio.NewScanner(strings.NewReader(optionsText))
sc := scannerz.NewScanner(ctx, strings.NewReader(optionsText))
var line string
for sc.Scan() {
line = sc.Text()
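
The `bufio.NewScanner(r)` → `scannerz.NewScanner(ctx, r)` swap above recurs throughout this PR. The `scannerz` package itself isn't part of the hunks shown, so the following is only a hypothetical reconstruction of what such a context-aware constructor might look like; the `scanBufLimitFrom` helper is invented for illustration (the real code presumably reads `OptScanBufLimit` from the options attached to `ctx`):

```go
// Hypothetical sketch only; not the actual libsq/core/ioz/scannerz source.
package scannerz

import (
	"bufio"
	"context"
	"io"
)

// scanBufLimitFrom stands in for reading tuning.scan-buffer-limit from the
// options carried by ctx; it is invented here purely for illustration.
func scanBufLimitFrom(ctx context.Context) int {
	_ = ctx
	return 64 * 1024 * 1024 // assume a 64MB limit for this sketch
}

// NewScanner returns a bufio.Scanner whose buffer starts small and can grow
// as needed, up to the configured limit, instead of bufio's fixed 64KB default.
func NewScanner(ctx context.Context, r io.Reader) *bufio.Scanner {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 4096), scanBufLimitFrom(ctx))
	return sc
}
```

Threading `ctx` through every call site, as the diffs below do, is what lets the scanner pick up the configured limit.
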
2 changes: 1 addition & 1 deletion cli/diff/dbprops.go
@@ -47,5 +47,5 @@ func diffDBProps(ctx context.Context, cfg *Config, src1, src2 *source.Source, do
return
}

_, err = io.Copy(doc, diffdoc.NewColorizer(cfg.Colors, strings.NewReader(unified)))
_, err = io.Copy(doc, diffdoc.NewColorizer(ctx, cfg.Colors, strings.NewReader(unified)))
}
2 changes: 1 addition & 1 deletion cli/diff/overview.go
@@ -46,5 +46,5 @@ func diffOverview(ctx context.Context, cfg *Config, src1, src2 *source.Source, d
return
}

_, err = io.Copy(doc, diffdoc.NewColorizer(cfg.Colors, strings.NewReader(unified)))
_, err = io.Copy(doc, diffdoc.NewColorizer(ctx, cfg.Colors, strings.NewReader(unified)))
}
4 changes: 3 additions & 1 deletion cli/diff/recordhunkwriter.go
@@ -6,6 +6,8 @@ import (
"fmt"
"strings"

"github.com/neilotoole/sq/libsq/core/ioz/scannerz"

"golang.org/x/sync/errgroup"

"github.com/neilotoole/sq/cli/output"
@@ -107,7 +109,7 @@ func (wa *recordHunkWriterAdapter) WriteHunk(ctx context.Context, hunk *diffdoc.

// Trim the diff "file header"... ultimately, we should change ComputeUnified
// to not return this (e.g. add an arg "noHeader=true")
trimmed := stringz.TrimHead(unified, 2)
trimmed := scannerz.TrimHead(ctx, unified, 2)

var ok bool
if hunkHeader, hunkBody, ok = strings.Cut(trimmed, "\n"); !ok {
2 changes: 1 addition & 1 deletion cli/diff/schema.go
@@ -83,5 +83,5 @@ func diffTableSchema(ctx context.Context, cfg *Config, showRowCounts bool,
return
}

_, err = io.Copy(doc, diffdoc.NewColorizer(cfg.Colors, strings.NewReader(unified)))
_, err = io.Copy(doc, diffdoc.NewColorizer(ctx, cfg.Colors, strings.NewReader(unified)))
}
9 changes: 9 additions & 0 deletions cli/error.go
@@ -1,13 +1,16 @@
package cli

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"strings"

"github.com/neilotoole/sq/libsq/core/ioz/scannerz"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

@@ -183,6 +186,12 @@ func humanizeError(err error) error {
switch {
// Friendlier messages for context errors.
default:
case errors.Is(err, bufio.ErrTooLong):
err = errz.Errorf(
"%s: maybe adjust config '%s'",
err.Error(),
scannerz.OptScanBufLimit.Key(),
)
case errors.Is(err, context.Canceled):
err = errz.New("canceled")
case errors.Is(err, context.DeadlineExceeded):
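
The `humanizeError` hunk above depends on the original `bufio.ErrTooLong` surviving any wrapping, so that `errors.Is` still matches. A small self-contained illustration using only the standard library (`fmt.Errorf` with `%w` stands in for sq's `errz` wrapping, which is an assumption):

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

func main() {
	// A scanner capped at 16 bytes fails on a longer line.
	sc := bufio.NewScanner(strings.NewReader(strings.Repeat("x", 64) + "\n"))
	sc.Buffer(make([]byte, 0, 8), 16)
	for sc.Scan() {
	}

	// Wrap the scan error as an ingest pipeline might; %w preserves the chain.
	err := fmt.Errorf("ingest jsonl: %w", sc.Err())

	// This is the check the hunk above performs before suggesting the
	// tuning.scan-buffer-limit option.
	fmt.Println(errors.Is(err, bufio.ErrTooLong)) // true
}
```
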
75 changes: 75 additions & 0 deletions cli/issues_test.go
@@ -0,0 +1,75 @@
package cli_test

import (
"bufio"
"bytes"
"context"
"errors"
"os"
"path/filepath"
"strconv"
"strings"
"testing"

"github.com/neilotoole/sq/cli/testrun"
"github.com/neilotoole/sq/libsq/core/ioz/scannerz"
"github.com/stretchr/testify/require"
)

// See: https://github.com/neilotoole/sq/issues/446.
func TestGitHubIssue446_ScannerErrTooLong(t *testing.T) {
// 1. Create a JSONL file with large tokens.
// 2. Set the config for scanner buffer size to a known value (which will be too small).
// 3. Attempt to ingest the JSONL file, which should fail with bufio.ErrTooLong.
// 4. Set the config for scanner buffer size to a larger value.
// 5. Attempt to ingest the JSONL file, which should succeed.

dir := t.TempDir()
f, err := os.Create(filepath.Join(dir, "test.jsonl"))
require.NoError(t, err)
t.Cleanup(func() { _ = f.Close() })

blob := generateJSONLinesBlobWithLargeTokens(bufio.MaxScanTokenSize+100, 5)
_, err = f.Write(blob)
require.NoError(t, err)
require.NoError(t, f.Close())

tr := testrun.New(context.Background(), t, nil).Hush()
require.NoError(t, tr.Exec(
"config",
"set",
scannerz.OptScanBufLimit.Key(),
strconv.Itoa(bufio.MaxScanTokenSize),
))

const handle = "@test/large_jsonl"
err = tr.Reset().Exec("add", f.Name(), "--handle", handle)
require.Error(t, err, "should fail with bufio.ErrTooLong")
require.True(t, errors.Is(err, bufio.ErrTooLong))

require.NoError(t, tr.Reset().Exec(
"config",
"set",
scannerz.OptScanBufLimit.Key(),
"10MB",
))

err = tr.Reset().Exec("add", f.Name(), "--handle", handle)
require.NoError(t, err, "should succeed with increased buffer size")

// Extra check to verify that ingest worked as expected.
require.NoError(t, tr.Reset().Exec("inspect", "--json", handle))
require.Equal(t, handle, tr.JQ(".handle"))
require.Equal(t, "id", tr.JQ(".tables[0].columns[0].name"))
}

func generateJSONLinesBlobWithLargeTokens(tokenSize, lines int) []byte {
buf := &bytes.Buffer{}
for i := 0; i < lines; i++ {
buf.WriteString(`{"id": "` + strconv.Itoa(i) + `", "name": "`)
buf.WriteString(strings.Repeat("x", tokenSize))
buf.WriteString(`"}`)
buf.WriteString("\n")
}
return buf.Bytes()
}
5 changes: 4 additions & 1 deletion cli/options.go
@@ -4,6 +4,8 @@ import (
"fmt"
"strings"

"github.com/neilotoole/sq/libsq/core/ioz/scannerz"

"github.com/samber/lo"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@@ -202,7 +204,8 @@ func RegisterDefaultOpts(reg *options.Registry) {
tuning.OptErrgroupLimit,
tuning.OptRecBufSize,
tuning.OptFlushThreshold,
tuning.OptBufMemLimit,
tuning.OptBufSpillLimit,
scannerz.OptScanBufLimit,
driver.OptIngestHeader,
driver.OptIngestCache,
files.OptCacheLockTimeout,
2 changes: 1 addition & 1 deletion cli/options_test.go
@@ -16,7 +16,7 @@ func TestRegisterDefaultOpts(t *testing.T) {
lgt.New(t).Debug("options.Registry (after)", "reg", reg)

keys := reg.Keys()
require.Len(t, keys, 60)
require.Len(t, keys, 61)

for _, opt := range reg.Opts() {
opt := opt
11 changes: 7 additions & 4 deletions cli/output.go
@@ -8,6 +8,8 @@ import (
"strings"
"time"

"github.com/neilotoole/sq/libsq/core/ioz/scannerz"

"github.com/fatih/color"
colorable "github.com/mattn/go-colorable"
wordwrap "github.com/mitchellh/go-wordwrap"
@@ -31,7 +33,6 @@ import (
"github.com/neilotoole/sq/libsq/core/lg"
"github.com/neilotoole/sq/libsq/core/options"
"github.com/neilotoole/sq/libsq/core/progress"
"github.com/neilotoole/sq/libsq/core/stringz"
"github.com/neilotoole/sq/libsq/core/termz"
"github.com/neilotoole/sq/libsq/core/timez"
"github.com/neilotoole/sq/libsq/core/tuning"
@@ -138,9 +139,11 @@ sampled, and reported on exit. If zero, memory usage sampling is disabled.`,
options.TagOutput,
)

timeLayoutsList = "Predefined values:\n" + stringz.IndentLines(
timeLayoutsList = "Predefined values:\n" + scannerz.IndentLines(
context.Background(),
wordwrap.WrapString(strings.Join(timez.NamedLayouts(), ", "), 64),
" ")
" ",
)

OptDatetimeFormat = options.NewString(
"format.datetime",
@@ -464,7 +467,7 @@ func getOutputConfig(cmd *cobra.Command, fs *files.Files, clnup *cleanup.Cleanup
pr.ExcelTimeFormat = xlsxw.OptTimeFormat.Get(o)

pr.Verbose = OptVerbose.Get(o)
pr.FlushThreshold = tuning.OptFlushThreshold.Get(o)
pr.FlushThreshold = int(tuning.OptFlushThreshold.Get(o).Bytes()) //nolint:gosec // ignore overflow concern
pr.Compact = OptCompact.Get(o)
pr.Redact = OptRedact.Get(o)

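
For reference, `IndentLines` moved from `stringz` to `scannerz` and, per the call site above, now takes a `context.Context` as its first argument, presumably because it splits its input with the ctx-aware scanner and must honor `tuning.scan-buffer-limit`. A hypothetical sketch follows; only the signature is taken from the call site, the body is invented:

```go
package scannerz // continuing the hypothetical sketch above

import (
	"context"
	"strings"
)

// IndentLines prefixes every line of s with indent. The real implementation
// may differ; this sketch reuses the ctx-aware NewScanner from earlier so the
// configured buffer limit applies while splitting lines.
func IndentLines(ctx context.Context, s, indent string) string {
	var sb strings.Builder
	sc := NewScanner(ctx, strings.NewReader(s))
	for sc.Scan() {
		sb.WriteString(indent)
		sb.WriteString(sc.Text())
		sb.WriteString("\n")
	}
	return strings.TrimSuffix(sb.String(), "\n")
}
```
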
13 changes: 7 additions & 6 deletions cli/output/tablew/tablewdiff.go
@@ -1,12 +1,13 @@
package tablew

import (
"bufio"
"bytes"
"context"
"fmt"
"slices"

"github.com/neilotoole/sq/libsq/core/ioz/scannerz"

"github.com/neilotoole/sq/cli/diff"
"github.com/neilotoole/sq/cli/output"
"github.com/neilotoole/sq/libsq/core/colorz"
@@ -136,8 +137,8 @@ func (dw *diffWriter) writeDifferent(ctx context.Context, dest *diffdoc.Hunk,
return
}

sc1 := bufio.NewScanner(buf1)
sc2 := bufio.NewScanner(buf2)
sc1 := scannerz.NewScanner(ctx, buf1)
sc2 := scannerz.NewScanner(ctx, buf2)
var line []byte
var i, j, k int
for i = 0; i < len(pairs) && ctx.Err() == nil; i++ {
@@ -261,7 +262,7 @@ func (dw *diffWriter) writeEqualish(ctx context.Context, dest *diffdoc.Hunk,
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}

sc := bufio.NewScanner(bufAllRecs)
sc := scannerz.NewScanner(ctx, bufAllRecs)
var i, j, k int
for i = 0; ctx.Err() == nil && i < len(recs1) && sc.Scan(); i++ {
_, _ = buf1.Write(sc.Bytes())
@@ -281,8 +282,8 @@ func (dw *diffWriter) writeEqualish(ctx context.Context, dest *diffdoc.Hunk,
return
}

sc1 := bufio.NewScanner(buf1)
sc2 := bufio.NewScanner(buf2)
sc1 := scannerz.NewScanner(ctx, buf1)
sc2 := scannerz.NewScanner(ctx, buf2)
var line []byte

for i = 0; i < len(pairs) && ctx.Err() == nil; i++ {