feat: Modify optimized compaction to cover edge cases #25594

Open

wants to merge 22 commits into base: master-1.x

Changes from 19 commits

Commits (22)
d631314 feat: Modify optimized compaction to cover edge cases (devanbenz, Dec 16, 2024)
67849ae feat: Modify the PR to include optimized compaction (devanbenz, Dec 17, 2024)
827e859 feat: Use named variables for PlanOptimize (devanbenz, Dec 17, 2024)
5387ca3 feat: adjust test comments (devanbenz, Dec 17, 2024)
3153596 feat: code removal from debugging (devanbenz, Dec 17, 2024)
83d28ec feat: setting BlockCount idx value to 1 (devanbenz, Dec 17, 2024)
f896a01 feat: Adjust testing and add sprintf for magic vars (devanbenz, Dec 18, 2024)
f15d9be feat: need to use int64 instead of int (devanbenz, Dec 18, 2024)
54c8e1c feat: touch (devanbenz, Dec 18, 2024)
403d888 feat: Adjust tests to include lower level planning function calls (devanbenz, Dec 18, 2024)
23d12e1 feat: Fix up some tests that I forgot to adjust (devanbenz, Dec 18, 2024)
d3afb03 feat: fix typo (devanbenz, Dec 18, 2024)
cf657a8 feat: touch (devanbenz, Dec 18, 2024)
fc6ca13 feat: Call SingleGenerationReason() once by initializing a (devanbenz, Dec 19, 2024)
4fc4d55 feat: clarify file counts for reason we are not fully compacted (devanbenz, Dec 19, 2024)
c93bdfb feat: grammar typo (devanbenz, Dec 19, 2024)
2dd5ef4 feat: missed a test when updating the variable! whoops! (devanbenz, Dec 19, 2024)
479de96 feat: Add test for another edge case found; (devanbenz, Dec 20, 2024)
c392906 feat: Remove some overlapping tests (devanbenz, Dec 20, 2024)
f444518 feat: Adds check for block counts and adjusts tests to use require.Ze… (devanbenz, Dec 26, 2024)
5e4e2da feat: Adds test for planning lower level TSMs with block sizes at agg… (devanbenz, Dec 26, 2024)
c315b1f chore: rerun ci (devanbenz, Dec 26, 2024)
16 changes: 16 additions & 0 deletions tsdb/config.go
@@ -52,6 +52,10 @@ const (
// block in a TSM file
DefaultMaxPointsPerBlock = 1000

// AggressiveMaxPointsPerBlock is used when we want to further compact blocks.
// It is 100 times the default number of points per block.
AggressiveMaxPointsPerBlock = DefaultMaxPointsPerBlock * 100

// DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
// This limit only applies to the "inmem" index.
DefaultMaxSeriesPerDatabase = 1000000
@@ -77,8 +81,20 @@ const (
// partition snapshot compactions that can run at one time.
// A value of 0 results in runtime.GOMAXPROCS(0).
DefaultSeriesFileMaxConcurrentSnapshotCompactions = 0

// MaxTSMFileSize is the maximum size of TSM files.
Contributor: Nice!
MaxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
)

var SingleGenerationReasonText string = SingleGenerationReason()

// SingleGenerationReason outputs a log message for our single generation compaction
// when checked for full compaction.
// 1048576000 is a magic number for bytes per gigabyte.
func SingleGenerationReason() string {
return fmt.Sprintf("not fully compacted and not idle because single generation with more than 2 files under %d GB and more than 1 file(s) under aggressive compaction points per block count (%d points)", int(MaxTSMFileSize/1048576000), AggressiveMaxPointsPerBlock)
}

// Config holds the configuration for the tsbd package.
type Config struct {
Dir string `toml:"dir"`
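For reference, a minimal standalone sketch (not part of this diff) of what the reason string above evaluates to with these constants; since MaxTSMFileSize is 2,147,483,648 bytes, the integer division by 1,048,576,000 truncates to 2, so the message reports "2 GB":

package main

import "fmt"

// Constants mirrored from tsdb/config.go for illustration only.
const (
	DefaultMaxPointsPerBlock    = 1000
	AggressiveMaxPointsPerBlock = DefaultMaxPointsPerBlock * 100 // 100000
)

const MaxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB

func main() {
	// Prints: "... more than 2 files under 2 GB ... (100000 points)"
	fmt.Printf("not fully compacted and not idle because single generation with more than 2 files under %d GB and more than 1 file(s) under aggressive compaction points per block count (%d points)\n",
		int(MaxTSMFileSize/1048576000), AggressiveMaxPointsPerBlock)
}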
82 changes: 53 additions & 29 deletions tsdb/engine/tsm1/compact.go
@@ -32,7 +32,6 @@ import (
"go.uber.org/zap"
)

const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
const logEvery = 2 * DefaultSegmentSize

const (
@@ -96,7 +95,13 @@ type CompactionGroup []string
type CompactionPlanner interface {
Plan(lastWrite time.Time) ([]CompactionGroup, int64)
PlanLevel(level int) ([]CompactionGroup, int64)
PlanOptimize() ([]CompactionGroup, int64)
// PlanOptimize will return the groups for compaction, the compaction group length,
// and the number of generations within the compaction group.
// generationCount needs to be set to decide how many points per block to use during compaction.
// This value is mostly ignored in normal compaction code paths, but for the
// edge case where there is a single generation with many files under 2 GB
// it is an important indicator.
Member, on lines +98 to +103: Love this comment!
PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64)
Release(group []CompactionGroup)
FullyCompacted() (bool, string)

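As a hedged usage sketch (illustrative only, not code from this PR), a caller inside the tsm1 package might read the three return values like this; note that the DefaultPlanner implementation below returns a nil group list together with a non-zero compactionGroupLen when it fails to acquire the files, so that case is distinguishable from "nothing to do":

package tsm1

// runOptimize is a hypothetical caller, shown only to illustrate the
// three-value contract of PlanOptimize.
func runOptimize(planner CompactionPlanner) {
	groups, groupLen, generationCount := planner.PlanOptimize()
	switch {
	case groupLen == 0:
		// Fully compacted, or a forced full plan owns the files: nothing to schedule.
	case groups == nil:
		// Plans were built but the files could not be acquired; retry later.
	default:
		defer planner.Release(groups)
		// generationCount == 1 flags the single-generation edge case, where
		// compaction should use the aggressive points-per-block count.
		_ = generationCount
	}
}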
@@ -225,6 +230,27 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
} else if gens.hasTombstones() {
return false, "not fully compacted and not idle because of tombstones"
} else {
// For planning we want to ensure that if a shard has a single generation
// but many files that are under 2 GB and not yet at the aggressive
// compaction points per block count (100,000), we further compact the
// shard. It is okay to stop compaction if there are many files that are
// under 2 GB but already at the aggressive points per block count.
if len(gens) == 1 && len(gens[0].files) > 1 {
aggressivePointsPerBlockCount := 0
filesUnderMaxTsmSizeCount := 0
for _, tsmFile := range gens[0].files {
if c.FileStore.BlockCount(tsmFile.Path, 1) >= tsdb.AggressiveMaxPointsPerBlock {
aggressivePointsPerBlockCount++
}
if tsmFile.Size < tsdb.MaxTSMFileSize {
filesUnderMaxTsmSizeCount++
}
}

if filesUnderMaxTsmSizeCount > 1 && aggressivePointsPerBlockCount < len(gens[0].files) {
return false, tsdb.SingleGenerationReasonText
}
}
return true, ""
}
}
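To make the check above concrete, here is a minimal self-contained sketch that mirrors (rather than reuses) the single-generation predicate; it assumes a single generation with more than one file, and the file sizes and per-file block counts in main are hypothetical:

package main

import "fmt"

const (
	maxTSMFileSize              = 2048 * 1024 * 1024 // 2GB, mirrors tsdb.MaxTSMFileSize
	aggressiveMaxPointsPerBlock = 100000             // mirrors tsdb.AggressiveMaxPointsPerBlock
)

// needsMoreCompaction mirrors the single-generation check: keep compacting while
// more than one file is under 2 GB and at least one file has not yet been
// written at the aggressive points-per-block count.
func needsMoreCompaction(fileSizes []uint64, maxPointsPerBlock []int) bool {
	aggressive, underMax := 0, 0
	for i, size := range fileSizes {
		if maxPointsPerBlock[i] >= aggressiveMaxPointsPerBlock {
			aggressive++
		}
		if size < maxTSMFileSize {
			underMax++
		}
	}
	return underMax > 1 && aggressive < len(fileSizes)
}

func main() {
	// Three ~1.5 GB files still at the default 1000 points per block: keep compacting.
	fmt.Println(needsMoreCompaction(
		[]uint64{1500 * 1024 * 1024, 1500 * 1024 * 1024, 1500 * 1024 * 1024},
		[]int{1000, 1000, 1000})) // true
	// The same files rewritten at 100000 points per block: treated as fully compacted.
	fmt.Println(needsMoreCompaction(
		[]uint64{1500 * 1024 * 1024, 1500 * 1024 * 1024, 1500 * 1024 * 1024},
		[]int{100000, 100000, 100000})) // false
}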
@@ -335,25 +361,24 @@ func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
func (c *DefaultPlanner) PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil, 0
return nil, 0, 0
}
c.mu.RUnlock()

// Determine the generations from all files on disk. We need to treat
// a generation conceptually as a single file even though it may be
// split across several files in sequence.
generations := c.findGenerations(true)
fullyCompacted, _ := c.FullyCompacted()

// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil, 0
if fullyCompacted {
return nil, 0, 0
}

// Group each generation by level such that two adjacent generations in the same
@@ -363,11 +388,6 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
for i := 0; i < len(generations); i++ {
cur := generations[i]

// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
continue
}

// See if this generation is orphaned, which would prevent it from being further
// compacted until a final full compaction runs.
if i < len(generations)-1 {
@@ -377,7 +397,7 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
}
}

if len(currentGen) == 0 || currentGen.level() == cur.level() {
if len(currentGen) == 0 || currentGen.level() >= cur.level() {
Contributor: Is this the halting issue for the recent customer situation?

Author: Correct. Sometimes higher generations in a shard end up being less than level 4. Need to not skip those.
currentGen = append(currentGen, cur)
continue
}
@@ -392,21 +412,21 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
}

// Only optimize level 4 files since using lower-levels will collide
// with the level planners
// with the level planners. If this is a single generation optimization
// do not skip any levels.
var levelGroups []tsmGenerations
for _, cur := range groups {
if cur.level() == 4 {
levelGroups = append(levelGroups, cur)
if len(generations) == 1 {
levelGroups = append(levelGroups, groups...)
} else {
for _, cur := range groups {
if cur.level() == 4 {
levelGroups = append(levelGroups, cur)
}
}
}

var cGroups []CompactionGroup
for _, group := range levelGroups {
// Skip the group if it's not worthwhile to optimize it
if len(group) < 4 && !group.hasTombstones() {
continue
}

var cGroup CompactionGroup
for _, gen := range group {
for _, file := range gen.files {
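A tiny sketch (over plain values, not the planner's internal types) of the selection rule described in the comment above: with several generations only level-4 groups are optimized, while a single-generation shard keeps every group so lower-level files can still be merged.

package plannersketch

// levelsToOptimize is a hypothetical, simplified stand-in for the grouping
// logic: groupLevels holds the level of each candidate group.
func levelsToOptimize(groupLevels []int, generationCount int) []int {
	if generationCount == 1 {
		// Single generation: do not skip lower levels (e.g. 2 or 3).
		return groupLevels
	}
	var level4 []int
	for _, lvl := range groupLevels {
		if lvl == 4 {
			level4 = append(level4, lvl)
		}
	}
	return level4
}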
@@ -418,10 +438,10 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
}

if !c.acquire(cGroups) {
return nil, int64(len(cGroups))
return nil, int64(len(cGroups)), int64(len(generations))
}

return cGroups, int64(len(cGroups))
return cGroups, int64(len(cGroups)), int64(len(generations))
}

// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
@@ -449,7 +469,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
var skip bool

// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() {
if len(generations) > 2 && group.size() > uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() {
Contributor: I think the c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock has to become a >= because you may produce files with the aggressive block count in an earlier compaction that you don't want to compact again.

Contributor: And a test for this?

Author: Test added in updated code 👍

skip = true
}

@@ -525,7 +545,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
// Skip the file if it's over the max size and contains a full block or the generation is split
// over multiple files. In the latter case, that would mean the data in the file spilled over
// the 2GB limit.
if g.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
if g.size() > uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
Contributor: Same as above: c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock might need to be >=

Contributor: Also, may need a test for this.
start = i + 1
}

@@ -569,7 +589,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
}

// Skip the file if it's over the max size and it contains a full block
if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
if gen.size() >= uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
Contributor: c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock might need to be >=

Contributor: Also, may need a test for this.
startIndex++
continue
}
@@ -905,6 +925,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e

// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
// Sets the points per block size. The larger this value is, the more
// points there will be in a single index. Under normal conditions this
// should always be 1000, but there is an edge case where it is increased.
size := c.Size
if size <= 0 {
size = tsdb.DefaultMaxPointsPerBlock
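A small sketch of the fallback shown here: a positive Compactor.Size overrides the default, which is how an aggressive points-per-block count can take effect; how Size gets set for the single-generation case is not shown in this excerpt, so the helper below is only an illustration under that assumption.

package tsm1

import "github.com/influxdata/influxdb/tsdb"

// effectivePointsPerBlock mirrors the fallback at the top of (*Compactor).compact:
// a configured positive Size wins, otherwise the default of 1000 points per block
// is used. For the single-generation edge case the configured value would be
// tsdb.AggressiveMaxPointsPerBlock (100000).
func effectivePointsPerBlock(configured int) int {
	if configured <= 0 {
		return tsdb.DefaultMaxPointsPerBlock
	}
	return configured
}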
@@ -1190,7 +1214,7 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *

// If we have a max file size configured and we're over it, close out the file
// and return the error.
if w.Size() > maxTSMFileSize {
if w.Size() > tsdb.MaxTSMFileSize {
if err := w.WriteIndex(); err != nil {
return err
}