From 23dc01c2e562312aa9144951cc797bf1f3ae3a3e Mon Sep 17 00:00:00 2001
From: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
Date: Thu, 1 Jun 2023 10:49:23 -0700
Subject: [PATCH] fix: series file index compaction (#23916)

Series file indices monotonically grew even when series
were deleted. Also stop ignoring an error in series index
recovery.

Partially closes https://github.com/influxdata/EAR/issues/3643
---
 tsdb/series_partition.go | 132 ++++++++++++++++++++++-----------------
 1 file changed, 74 insertions(+), 58 deletions(-)

diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go
index 1a043e74994..26b0872ec49 100644
--- a/tsdb/series_partition.go
+++ b/tsdb/series_partition.go
@@ -4,12 +4,14 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
 	"sync"
 
 	"github.com/influxdata/influxdb/logger"
 	"github.com/influxdata/influxdb/models"
+	errors2 "github.com/influxdata/influxdb/pkg/errors"
 	"github.com/influxdata/influxdb/pkg/limiter"
 	"github.com/influxdata/influxdb/pkg/rhh"
 	"go.uber.org/zap"
@@ -86,7 +88,7 @@ func (p *SeriesPartition) Open() error {
 		p.index = NewSeriesIndex(p.IndexPath())
 		if err := p.index.Open(); err != nil {
 			return err
-		} else if p.index.Recover(p.segments); err != nil {
+		} else if err = p.index.Recover(p.segments); err != nil {
 			return err
 		}
 
@@ -564,94 +566,108 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
 	return nil
 }
 
-func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
-	hdr := NewSeriesIndexHeader()
-	hdr.Count = seriesN
-	hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)
-
-	// Allocate space for maps.
-	keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
-	idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
+var errDone error = errors.New("done")
 
-	// Reindex all partitions.
-	var entryN int
-	for _, segment := range segments {
-		errDone := errors.New("done")
+func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) (err error) {
 
-		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
+	hdr := NewSeriesIndexHeader()
+	var keyIDMap []byte
+	var idOffsetMap []byte
+
+	hdr.Count = math.MaxUint64
+	// seriesN is the current size of the index. Because it may contain tombstones
+	// for deleted series, we recalculate that number (as seriesCount) without the
+	// deleted series as we rebuild the index. If the count of existing series does
+	// not equal the seriesN passed in (meaning there were tombstones), we rebuild
+	// the index a second time with the correct size.
+	seriesCount := seriesN
+	for {
+		seriesN = seriesCount
+		seriesCount = uint64(0)
+		// This only loops if there are deleted entries, which shrinks the size
+		hdr.Capacity = pow2((int64(seriesN) * 100) / SeriesIndexLoadFactor)
+		// Allocate space for maps, guaranteeing slices are initialized to zero
+		keyIDMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)
+		idOffsetMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)
+
+		// Reindex all partitions.
+		var entryN int
+		for _, segment := range segments {
+
+			if err = segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
+
+				// Make sure we don't go past the offset where the compaction began.
+				if offset > index.maxOffset {
+					return errDone
+				}
 
-			// Make sure we don't go past the offset where the compaction began.
-			if offset > index.maxOffset {
-				return errDone
-			}
+				// Check for cancellation periodically.
+				if entryN++; entryN%1000 == 0 {
+					select {
+					case <-c.cancel:
+						return ErrSeriesPartitionCompactionCancelled
+					default:
+					}
+				}
 
-			// Check for cancellation periodically.
-			if entryN++; entryN%1000 == 0 {
-				select {
-				case <-c.cancel:
-					return ErrSeriesPartitionCompactionCancelled
+				// Only process insert entries.
+				switch flag {
+				case SeriesEntryInsertFlag:
+					// does not fallthrough
+				case SeriesEntryTombstoneFlag:
+					return nil
 				default:
+					return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
 				}
-			}
-
-			// Only process insert entries.
-			switch flag {
-			case SeriesEntryInsertFlag: // fallthrough
-			case SeriesEntryTombstoneFlag:
-				return nil
-			default:
-				return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
-			}
 
-			// Save max series identifier processed.
-			hdr.MaxSeriesID, hdr.MaxOffset = id, offset
+				// Save max series identifier processed.
+				hdr.MaxSeriesID, hdr.MaxOffset = id, offset
 
-			// Ignore entry if tombstoned.
-			if index.IsDeleted(id) {
-				return nil
+				// Ignore entry if tombstoned.
+				if index.IsDeleted(id) {
+					return nil
+				}
+				seriesCount++
+				// Insert into maps.
+				c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
+				return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
+			}); err == errDone {
+				break
+			} else if err != nil {
+				return err
 			}
-
-			// Insert into maps.
-			c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
-			return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
-		}); err == errDone {
+		}
+		hdr.Count = seriesCount
+		if seriesN != seriesCount {
+			continue
+		} else {
 			break
-		} else if err != nil {
-			return err
 		}
 	}
 
-	// Open file handler.
 	f, err := os.Create(path)
 	if err != nil {
 		return err
 	}
-	defer f.Close()
-
+	defer errors2.Capture(&err, f.Close)()
 	// Calculate map positions.
 	hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
 	hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))
 
 	// Write header.
-	if _, err := hdr.WriteTo(f); err != nil {
+	if _, err = hdr.WriteTo(f); err != nil {
 		return err
 	}
 
 	// Write maps.
-	if _, err := f.Write(keyIDMap); err != nil {
+	if _, err = f.Write(keyIDMap); err != nil {
 		return err
 	} else if _, err := f.Write(idOffsetMap); err != nil {
 		return err
 	}
 
-	// Sync & close.
-	if err := f.Sync(); err != nil {
-		return err
-	} else if err := f.Close(); err != nil {
-		return err
-	}
-
-	return nil
+	// Sync, then deferred close
+	return f.Sync()
 }
 
 func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error {
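The core of the fix is the recount-and-resize loop in compactIndexTo: the maps are first sized for seriesN entries, the rebuild counts only live (non-tombstoned) entries into seriesCount, and if the two disagree the whole rebuild runs once more at the smaller size, so the on-disk index stops growing monotonically across deletes. The standalone Go sketch below illustrates only that loop shape; entry, rebuildSize, and the load factor of 90 are illustrative stand-ins, not code taken from this patch.

package main

import "fmt"

// entry is a hypothetical stand-in for a series segment log entry.
type entry struct {
	id        uint64
	tombstone bool
}

// pow2 returns the smallest power of two >= v, in the spirit of tsdb's helper.
func pow2(v int64) int64 {
	for i := int64(2); i < 1<<62; i *= 2 {
		if i >= v {
			return i
		}
	}
	panic("unreachable")
}

// rebuildSize sizes the hash maps for seriesN entries, then recounts the
// live entries it would actually insert. If tombstones shrank the count,
// the loop runs once more so the maps are sized without the dead series.
func rebuildSize(entries []entry, seriesN uint64) (capacity int64) {
	seriesCount := seriesN
	for {
		seriesN = seriesCount
		seriesCount = 0
		capacity = pow2(int64(seriesN) * 100 / 90) // assumed 90% load factor
		for _, e := range entries {
			if e.tombstone {
				continue // deleted series no longer claim index slots
			}
			seriesCount++
		}
		if seriesN == seriesCount {
			return capacity
		}
	}
}

func main() {
	entries := []entry{{1, false}, {2, true}, {3, false}}
	// Sized for the 2 live series rather than all 3 logged entries.
	fmt.Println(rebuildSize(entries, uint64(len(entries))))
}

Because the second pass recounts the same entries, the counts must agree on that pass, so the loop terminates after at most two iterations.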
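The file-writing tail changes shape as well: instead of an unchecked defer f.Close() plus a second explicit Close, the function now declares a named return (err error) and defers errors2.Capture(&err, f.Close)(), so a failed Close surfaces to the caller. The sketch below shows how such a capture helper interacts with a named return; the helper body here is written from the pattern's shape, not copied from github.com/influxdata/influxdb/pkg/errors.

package main

import (
	"fmt"
	"os"
)

// capture returns a func suitable for defer: it runs fn (e.g. f.Close) and,
// if no earlier error is already pending, stores fn's error in *errp.
func capture(errp *error, fn func() error) func() {
	return func() {
		if cerr := fn(); *errp == nil {
			*errp = cerr
		}
	}
}

// writeIndex mimics compactIndexTo's tail: create, write, sync, and let the
// deferred capture close the file and report any Close error through err.
func writeIndex(path string, data []byte) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer capture(&err, f.Close)() // Close always runs last

	if _, err = f.Write(data); err != nil {
		return err
	}
	return f.Sync() // sync first; the deferred capture then closes the file
}

func main() {
	fmt.Println(writeIndex(os.TempDir()+"/index-sketch", []byte("hdr+maps")))
}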
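Finally, errDone moves from a per-segment local to a package-level sentinel. The comparison err == errDone now happens in the restructured outer loop, and an identity comparison only works when both sides refer to the same errors.New value. A minimal sketch of that sentinel early-exit pattern follows; forEach is a hypothetical stand-in for SeriesSegment.ForEachEntry.

package main

import (
	"errors"
	"fmt"
)

// errDone signals "stop iterating" rather than a real failure.
var errDone = errors.New("done")

// forEach calls fn for each value and stops on the first non-nil error,
// which is how the sentinel propagates out of the iteration.
func forEach(vals []int, fn func(int) error) error {
	for _, v := range vals {
		if err := fn(v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var seen []int
	err := forEach([]int{1, 2, 3, 4}, func(v int) error {
		if v > 2 {
			return errDone // bail out early without signaling failure
		}
		seen = append(seen, v)
		return nil
	})
	if err == errDone { // the same identity test the patch relies on
		err = nil
	}
	fmt.Println(seen, err) // [1 2] <nil>
}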