From d7206394ad0a5a6b9d28827fab32f8d4d488bac1 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Thu, 16 Nov 2023 15:06:44 +0100 Subject: [PATCH] snapshot: delegate reading to arrow readers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We were previously copying all snapshot part bytes into a slice that was then passed to arrow readers. This was an unnecessary indirection and resulted in extra allocations on startup. Note that parquet bytes are still fully copied since we cannot read those lazily (underlying file is closed after reading from snapshot). Benchmark results on recovery from snapshot only: ``` goos: darwin goarch: arm64 pkg: github.com/polarsignals/frostdb │ benchmain │ benchnew │ │ sec/op │ sec/op vs base │ Replay-12 1.681 ± 5% 2.545 ± 4% +51.41% (p=0.002 n=6) │ benchmain │ benchnew │ │ B/op │ B/op vs base │ Replay-12 2.327Gi ± 0% 2.278Gi ± 0% -2.11% (p=0.002 n=6) │ benchmain │ benchnew │ │ allocs/op │ allocs/op vs base │ Replay-12 20.81M ± 0% 20.79M ± 0% -0.10% (p=0.002 n=6) ``` --- snapshot.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/snapshot.go b/snapshot.go index da9211111..9b16c570f 100644 --- a/snapshot.go +++ b/snapshot.go @@ -13,15 +13,14 @@ import ( "strconv" "time" + "github.com/apache/arrow/go/v14/arrow/ipc" + "github.com/apache/arrow/go/v14/arrow/util" + "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/proto" - "github.com/apache/arrow/go/v14/arrow/ipc" - "github.com/apache/arrow/go/v14/arrow/util" - "github.com/go-kit/log/level" - "github.com/polarsignals/frostdb/dynparquet" snapshotpb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/snapshot/v1alpha1" tablepb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/table/v1alpha1" @@ -621,20 +620,23 @@ func loadSnapshot(ctx context.Context, db *DB, r io.ReaderAt, size int64) ([]byt } startOffset := partMeta.StartOffset endOffset := partMeta.EndOffset - partBytes := make([]byte, endOffset-startOffset) - if _, err := r.ReadAt(partBytes, startOffset); err != nil { - return err - } partOptions := parts.WithCompactionLevel(int(partMeta.CompactionLevel)) + partReader := io.NewSectionReader(r, startOffset, endOffset-startOffset) switch partMeta.Encoding { case snapshotpb.Part_ENCODING_PARQUET: - serBuf, err := dynparquet.ReaderFromBytes(partBytes) + // Copy the full part here since parquet reads lazily + // and the file is closed after the snapshot is read. + var b bytes.Buffer + if _, err := io.Copy(&b, partReader); err != nil { + return err + } + serBuf, err := dynparquet.ReaderFromBytes(b.Bytes()) if err != nil { return err } resultParts = append(resultParts, parts.NewPart(partMeta.Tx, serBuf, partOptions)) case snapshotpb.Part_ENCODING_ARROW: - arrowReader, err := ipc.NewReader(bytes.NewReader(partBytes)) + arrowReader, err := ipc.NewReader(partReader) if err != nil { return err }