diff --git a/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go b/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go index e419104ec92..596522711fe 100644 --- a/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go +++ b/cmd/influxd/inspect/verify_seriesfile/verify_seriesfile.go @@ -269,7 +269,7 @@ func (v verify) verifySegment(segmentPath string, ids map[uint64]IDData) (valid v.Logger = v.Logger.With(zap.String("segment", segmentName)) v.Logger.Info("Verifying segment") - // Open up the segment and grab it's data. + // Open up the segment and grab its data. segmentID, err := tsdb.ParseSeriesSegmentFilename(segmentName) if err != nil { return false, err @@ -280,7 +280,8 @@ func (v verify) verifySegment(segmentPath string, ids map[uint64]IDData) (valid return false, nil } defer segment.Close() - buf := newBuffer(segment.Data()) + // Only walk the file as it exists, not the whole mapping which may be bigger than the file. + buf := newBuffer(segment.Data()[:segment.Size()]) defer func() { if rec := recover(); rec != nil { diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 4100dcfe4ab..bd754472aaa 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -366,9 +366,9 @@ func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte { } // ReadSeriesKey returns the series key from the beginning of the buffer. 
-func ReadSeriesKey(data []byte) (key, remainder []byte) { +func ReadSeriesKey(data []byte) (key []byte) { sz, n := binary.Uvarint(data) - return data[:int(sz)+n], data[int(sz)+n:] + return data[:int(sz)+n] } func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) { diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index 7dc433ffc5d..eef16bba903 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -507,7 +507,7 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte { continue } - key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) + key := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) return key } diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index b1820e5228e..6b3037eaf45 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -92,6 +92,12 @@ func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) { // Open memory maps the data file at the file's path. func (s *SeriesSegment) Open() error { if err := func() (err error) { + st, err := os.Stat(s.path) + if err != nil { + return fmt.Errorf("cannot stat %s: %w", s.path, err) + } + s.size = uint32(st.Size()) + // Memory map file data. if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil { return err @@ -120,14 +126,16 @@ func (s *SeriesSegment) Path() string { return s.path } // InitForWrite initializes a write handle for the segment. // This is only used for the last segment in the series file. func (s *SeriesSegment) InitForWrite() (err error) { - // Only calculate segment data size if writing. - for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); { - flag, _, _, sz := ReadSeriesEntry(s.data[s.size:]) + // Only recalculate segment data size if writing. 
+ var size uint32 + for size = uint32(SeriesSegmentHeaderSize); size < s.size; { + flag, _, _, sz := ReadSeriesEntry(s.data[size:s.size]) if !IsValidSeriesEntryFlag(flag) { break } - s.size += uint32(sz) + size += uint32(sz) } + s.size = size // Open file handler for writing & seek to end of data. if s.file, err = os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE, 0666); err != nil { @@ -243,8 +251,8 @@ func (s *SeriesSegment) MaxSeriesID() uint64 { // ForEachEntry executes fn for every entry in the segment. func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error { - for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); { - flag, id, key, sz := ReadSeriesEntry(s.data[pos:]) + for pos := uint32(SeriesSegmentHeaderSize); pos < s.size; { + flag, id, key, sz := ReadSeriesEntry(s.data[pos:s.size]) if !IsValidSeriesEntryFlag(flag) { break } @@ -337,7 +345,7 @@ func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte { return nil } buf := segment.Slice(pos) - key, _ := ReadSeriesKey(buf) + key := ReadSeriesKey(buf) return key } @@ -416,16 +424,22 @@ func (hdr *SeriesSegmentHeader) WriteTo(w io.Writer) (n int64, err error) { } func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) { + if len(data) <= 0 { + return 0, 0, nil, 1 + } // If flag byte is zero then no more entries exist. 
flag, data = uint8(data[0]), data[1:] if !IsValidSeriesEntryFlag(flag) { return 0, 0, nil, 1 } + if len(data) < 8 { + return 0, 0, nil, 1 + } id, data = binary.BigEndian.Uint64(data), data[8:] switch flag { case SeriesEntryInsertFlag: - key, _ = ReadSeriesKey(data) + key = ReadSeriesKey(data) } return flag, id, key, int64(SeriesEntryHeaderSize + len(key)) } diff --git a/tsdb/series_segment_test.go b/tsdb/series_segment_test.go index 451091fe074..87cc7412e85 100644 --- a/tsdb/series_segment_test.go +++ b/tsdb/series_segment_test.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "path/filepath" + "strconv" "testing" "github.com/google/go-cmp/cmp" @@ -139,44 +140,59 @@ func TestSeriesSegmentHeader(t *testing.T) { } func TestSeriesSegment_PartialWrite(t *testing.T) { - dir := t.TempDir() + for extraSegs := uint64(2000); extraSegs < 4000; extraSegs++ { + func() { + dir, cleanup := MustTempDir() + defer cleanup() - // Create a new initial segment (4mb) and initialize for writing. - segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000")) - if err != nil { - t.Fatal(err) - } else if err := segment.InitForWrite(); err != nil { - t.Fatal(err) - } - defer segment.Close() + // Create a new initial segment (4mb) and initialize for writing. + segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000")) + if err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } + defer segment.Close() - // Write two entries. 
- if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil { - t.Fatal(err) - } else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil { - t.Fatal(err) - } - sz := segment.Size() - entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))) + // Write two entries. + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil { + t.Fatal(err) + } - // Close segment. - if err := segment.Close(); err != nil { - t.Fatal(err) - } + // Writing intermediary entries between "A" and "B" tries to induce a SIGBUS + // when the file truncation backs over a page. + for i := uint64(0); i < extraSegs; i++ { + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1+i, tsdb.AppendSeriesKey(nil, []byte(strconv.Itoa(int(i))), nil))); err != nil { + t.Fatal(err) + } + } - // Truncate at each point and reopen. - for i := entrySize; i > 0; i-- { - if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil { - t.Fatal(err) - } - segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000")) - if err := segment.Open(); err != nil { - t.Fatal(err) - } else if err := segment.InitForWrite(); err != nil { - t.Fatal(err) - } else if err := segment.Close(); err != nil { - t.Fatal(err) - } + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2+extraSegs, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil { + t.Fatal(err) + } + sz := segment.Size() + entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2+extraSegs, tsdb.AppendSeriesKey(nil, []byte("B"), nil))) + + // Close segment. 
+ if err := segment.Close(); err != nil { + t.Fatal(err) + } + + // Truncate at each point and reopen. + for i := entrySize; i > 0; i-- { + if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil { + t.Fatal(err) + } + segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000")) + if err := segment.Open(); err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } else if err := segment.Close(); err != nil { + t.Fatal(err) + } + } + }() } } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index f799cafee5f..9c3bbc90f0e 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -2456,6 +2456,14 @@ func (sh *Shard) MustWritePointsString(s string) { } } +func MustTempDir() (string, func()) { + dir, err := os.MkdirTemp("", "shard-test") + if err != nil { + panic(fmt.Sprintf("failed to create temp dir: %v", err)) + } + return dir, func() { os.RemoveAll(dir) } +} + type seriesIterator struct { keys [][]byte }