Skip to content

Commit

Permalink
ttl, test: scale TTL workers during the fault tests (#58750)
Browse files Browse the repository at this point in the history
close #58745
  • Loading branch information
YangKeao authored Jan 8, 2025
1 parent 7bee5a7 commit d73e584
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1727,8 +1727,6 @@ func TestJobManagerWithFault(t *testing.T) {
}

stopTestCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)

fault := newFaultWithFilter(func(sql string) bool {
// skip some local only sql, ref `getSession()` in `session.go`
Expand All @@ -1739,6 +1737,10 @@ func TestJobManagerWithFault(t *testing.T) {

return true
}, newFaultWithProbability(faultPercent))

wg := &sync.WaitGroup{}
// start the goroutine to inject fault to managers randomly
wg.Add(1)
go func() {
defer wg.Done()

Expand Down Expand Up @@ -1775,6 +1777,38 @@ func TestJobManagerWithFault(t *testing.T) {
}
}()

// start the goroutine to randomly scale the worker count
wg.Add(1)
go func() {
defer wg.Done()

maxScanWorkerCount := variable.DefTiDBTTLScanWorkerCount * 2
minScanWorkerCount := variable.DefTiDBTTLScanWorkerCount / 2

maxDelWorkerCount := variable.DefTiDBTTLDeleteWorkerCount * 2
minDelWorkerCount := variable.DefTiDBTTLDeleteWorkerCount / 2
faultTicker := time.NewTicker(time.Second)

tk := testkit.NewTestKit(t, store)
for {
select {
case <-stopTestCh:
// Recover to the default count
tk.MustExec("set @@global.tidb_ttl_scan_worker_count = ?", variable.DefTiDBTTLScanWorkerCount)
tk.MustExec("set @@global.tidb_ttl_delete_worker_count = ?", variable.DefTiDBTTLDeleteWorkerCount)

return
case <-faultTicker.C:
scanWorkerCount := rand.Int()%(maxScanWorkerCount-minScanWorkerCount) + minScanWorkerCount
delWorkerCount := rand.Int()%(maxDelWorkerCount-minDelWorkerCount) + minDelWorkerCount

logutil.BgLogger().Info("scale worker count", zap.Int("scanWorkerCount", scanWorkerCount), zap.Int("delWorkerCount", delWorkerCount))
tk.MustExec("set @@global.tidb_ttl_scan_worker_count = ?", scanWorkerCount)
tk.MustExec("set @@global.tidb_ttl_delete_worker_count = ?", delWorkerCount)
}
}
}()

// run the workload goroutine
testStart := time.Now()
for time.Since(testStart) < testDuration {
Expand Down Expand Up @@ -1825,7 +1859,6 @@ func TestJobManagerWithFault(t *testing.T) {
}

logutil.BgLogger().Info("test finished")
stopTestCh <- struct{}{}
close(stopTestCh)

wg.Wait()
Expand Down

0 comments on commit d73e584

Please sign in to comment.