fix: series file index compaction (influxdata#23916)
Series file indices monotonically grew even when series
were deleted. Also stop ignoring an error in series index
recovery.

Partially closes https://github.com/influxdata/EAR/issues/3643
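The sizing bug stems from how compactIndexTo computes the header's capacity: the hash maps are sized to the next power of two above count * 100 / SeriesIndexLoadFactor, so a count that still includes tombstoned series inflates the allocation on every compaction. A rough sketch of that math (assuming a load factor of 90 and a next-power-of-two pow2 helper, as in the tsdb package; the numbers are illustrative):

package main

import "fmt"

const loadFactor = 90 // assumed value of SeriesIndexLoadFactor

// pow2 returns the smallest power of two >= v (sketch of tsdb's pow2 helper).
func pow2(v int64) int64 {
    for i := int64(2); ; i *= 2 {
        if i >= v {
            return i
        }
    }
}

func main() {
    live, deleted := int64(600_000), int64(500_000)

    // Before the fix: tombstoned series still counted toward capacity.
    fmt.Println(pow2(((live + deleted) * 100) / loadFactor)) // 2097152 slots

    // After the fix: capacity is recomputed from live series only.
    fmt.Println(pow2((live * 100) / loadFactor)) // 1048576 slots
}

With 500,000 of 1,100,000 series deleted, recomputing the count from live series alone halves both hash maps.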
davidby-influx authored and chengshiwen committed Aug 11, 2024
1 parent bf3cd32 commit 23dc01c
Showing 1 changed file with 74 additions and 58 deletions.
132 changes: 74 additions & 58 deletions tsdb/series_partition.go
@@ -4,12 +4,14 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
 	"sync"
 
 	"github.com/influxdata/influxdb/logger"
 	"github.com/influxdata/influxdb/models"
+	errors2 "github.com/influxdata/influxdb/pkg/errors"
 	"github.com/influxdata/influxdb/pkg/limiter"
 	"github.com/influxdata/influxdb/pkg/rhh"
 	"go.uber.org/zap"
@@ -86,7 +88,7 @@ func (p *SeriesPartition) Open() error {
 	p.index = NewSeriesIndex(p.IndexPath())
 	if err := p.index.Open(); err != nil {
 		return err
-	} else if p.index.Recover(p.segments); err != nil {
+	} else if err = p.index.Recover(p.segments); err != nil {
 		return err
 	}

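The recovery fix above is a one-token change that is easy to miss in review: in the old line, p.index.Recover(p.segments) is only the init statement of the else if, so its return value is discarded and the condition re-tests the err left over from p.index.Open(), which is necessarily nil on that path. A standalone sketch of the pattern (hypothetical openIdx/recoverIdx helpers, for illustration only):

package main

import (
    "errors"
    "fmt"
)

func openIdx() error    { return nil }
func recoverIdx() error { return errors.New("recovery failed") }

func main() {
    var err error
    if err = openIdx(); err != nil {
        fmt.Println("open:", err)
    } else if recoverIdx(); err != nil {
        // BUG (the old shape): recoverIdx runs as an init statement, its
        // error is discarded, and err still holds openIdx's nil result,
        // so this branch can never fire.
        fmt.Println("recover:", err)
    } else if err = recoverIdx(); err != nil {
        // FIX (the new shape): assign the result before testing it.
        fmt.Println("recover:", err) // prints "recover: recovery failed"
    }
}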
@@ -564,94 +566,108 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
 	return nil
 }
 
-func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
-	hdr := NewSeriesIndexHeader()
-	hdr.Count = seriesN
-	hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)
-
-	// Allocate space for maps.
-	keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
-	idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
-	var errDone error = errors.New("done")
-
-	// Reindex all partitions.
-	var entryN int
-	for _, segment := range segments {
-
-		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
-
-			// Make sure we don't go past the offset where the compaction began.
-			if offset > index.maxOffset {
-				return errDone
-			}
-
-			// Check for cancellation periodically.
-			if entryN++; entryN%1000 == 0 {
-				select {
-				case <-c.cancel:
-					return ErrSeriesPartitionCompactionCancelled
-				default:
-				}
-			}
-
-			// Only process insert entries.
-			switch flag {
-			case SeriesEntryInsertFlag: // fallthrough
-			case SeriesEntryTombstoneFlag:
-				return nil
-			default:
-				return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
-			}
-
-			// Save max series identifier processed.
-			hdr.MaxSeriesID, hdr.MaxOffset = id, offset
-
-			// Ignore entry if tombstoned.
-			if index.IsDeleted(id) {
-				return nil
-			}
-
-			// Insert into maps.
-			c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
-			return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
-		}); err == errDone {
-			break
-		} else if err != nil {
-			return err
-		}
-	}
+var errDone = errors.New("done")
+
+func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) (err error) {
+
+	hdr := NewSeriesIndexHeader()
+	var keyIDMap []byte
+	var idOffsetMap []byte
+
+	hdr.Count = math.MaxUint64
+	// seriesN is the current size of the index. Because it may contain tombstones
+	// for deleted series, we recalculate that number (as seriesCount) without the
+	// deleted series as we rebuild the index. If the count of existing series does
+	// not equal the seriesN passed in (meaning there were tombstones), we rebuild
+	// the index a second time with the correct size.
+	seriesCount := seriesN
+	for {
+		seriesN = seriesCount
+		seriesCount = uint64(0)
+		// This only loops if there are deleted entries, which shrinks the size
+		hdr.Capacity = pow2((int64(seriesN) * 100) / SeriesIndexLoadFactor)
+		// Allocate space for maps, guaranteeing slices are initialized to zero
+		keyIDMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)
+		idOffsetMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)
+
+		// Reindex all partitions.
+		var entryN int
+		for _, segment := range segments {
+
+			if err = segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
+
+				// Make sure we don't go past the offset where the compaction began.
+				if offset > index.maxOffset {
+					return errDone
+				}
+
+				// Check for cancellation periodically.
+				if entryN++; entryN%1000 == 0 {
+					select {
+					case <-c.cancel:
+						return ErrSeriesPartitionCompactionCancelled
+					default:
+					}
+				}
+
+				// Only process insert entries.
+				switch flag {
+				case SeriesEntryInsertFlag:
+					// does not fallthrough
+				case SeriesEntryTombstoneFlag:
+					return nil
+				default:
+					return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
+				}
+
+				// Save max series identifier processed.
+				hdr.MaxSeriesID, hdr.MaxOffset = id, offset
+
+				// Ignore entry if tombstoned.
+				if index.IsDeleted(id) {
+					return nil
+				}
+				seriesCount++
+				// Insert into maps.
+				c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
+				return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
+			}); err == errDone {
+				break
+			} else if err != nil {
+				return err
+			}
+		}
+		hdr.Count = seriesCount
+		if seriesN != seriesCount {
+			continue
+		} else {
+			break
+		}
+	}
 
 	// Open file handler.
 	f, err := os.Create(path)
 	if err != nil {
 		return err
 	}
-	defer f.Close()
 
+	defer errors2.Capture(&err, f.Close)()
 	// Calculate map positions.
 	hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
 	hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))
 
 	// Write header.
-	if _, err := hdr.WriteTo(f); err != nil {
+	if _, err = hdr.WriteTo(f); err != nil {
 		return err
 	}
 
 	// Write maps.
-	if _, err := f.Write(keyIDMap); err != nil {
+	if _, err = f.Write(keyIDMap); err != nil {
 		return err
 	} else if _, err := f.Write(idOffsetMap); err != nil {
 		return err
 	}
 
-	// Sync & close.
-	if err := f.Sync(); err != nil {
-		return err
-	} else if err := f.Close(); err != nil {
-		return err
-	}
-
-	return nil
+	// Sync, then deferred close
+	return f.Sync()
 }
 
 func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error {
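
The heart of the compaction change is the new outer for loop in compactIndexTo: rebuild the index sized from the previous count, tally only live (non-tombstoned) series during the rebuild, and if the tally disagrees with the guess, rebuild once more at the smaller, correct size; the loop runs at most twice. A skeleton of that control flow (hypothetical rebuild helper and counts; a sketch, not the repo's code):

package main

import "fmt"

// rebuild simulates one reindex pass: a real pass would allocate hash
// maps of the given capacity, then return how many live entries it
// actually inserted.
func rebuild(capacity int64, live uint64) uint64 {
    _ = capacity
    return live
}

// pow2 returns the smallest power of two >= v.
func pow2(v int64) int64 {
    for i := int64(2); ; i *= 2 {
        if i >= v {
            return i
        }
    }
}

func main() {
    const loadFactor = 90      // assumed SeriesIndexLoadFactor
    live := uint64(600_000)    // series still alive in the segments
    count := uint64(1_100_000) // stale count, inflated by tombstones

    passes := 0
    for {
        passes++
        guess := count
        capacity := pow2((int64(guess) * 100) / loadFactor)
        count = rebuild(capacity, live)
        if count == guess {
            break
        }
        // Tombstones were found: loop again with the smaller count.
    }
    fmt.Println("passes:", passes) // 2: one oversized pass, one right-sized
}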
