From d73e58449eba168ee20a2d540a0054f59bdec94f Mon Sep 17 00:00:00 2001 From: YangKeao Date: Wed, 8 Jan 2025 17:06:44 +0800 Subject: [PATCH] ttl, test: scale TTL workers during the fault tests (#58750) close pingcap/tidb#58745 --- .../ttlworker/job_manager_integration_test.go | 39 +++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index efe12c646ec2c..0cb045bc1098a 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -1727,8 +1727,6 @@ func TestJobManagerWithFault(t *testing.T) { } stopTestCh := make(chan struct{}) - wg := &sync.WaitGroup{} - wg.Add(1) fault := newFaultWithFilter(func(sql string) bool { // skip some local only sql, ref `getSession()` in `session.go` @@ -1739,6 +1737,10 @@ func TestJobManagerWithFault(t *testing.T) { return true }, newFaultWithProbability(faultPercent)) + + wg := &sync.WaitGroup{} + // start the goroutine to inject fault to managers randomly + wg.Add(1) go func() { defer wg.Done() @@ -1775,6 +1777,38 @@ func TestJobManagerWithFault(t *testing.T) { } }() + // start the goroutine to randomly scale the worker count + wg.Add(1) + go func() { + defer wg.Done() + + maxScanWorkerCount := variable.DefTiDBTTLScanWorkerCount * 2 + minScanWorkerCount := variable.DefTiDBTTLScanWorkerCount / 2 + + maxDelWorkerCount := variable.DefTiDBTTLDeleteWorkerCount * 2 + minDelWorkerCount := variable.DefTiDBTTLDeleteWorkerCount / 2 + faultTicker := time.NewTicker(time.Second) + + tk := testkit.NewTestKit(t, store) + for { + select { + case <-stopTestCh: + // Recover to the default count + tk.MustExec("set @@global.tidb_ttl_scan_worker_count = ?", variable.DefTiDBTTLScanWorkerCount) + tk.MustExec("set @@global.tidb_ttl_delete_worker_count = ?", variable.DefTiDBTTLDeleteWorkerCount) + + return + case <-faultTicker.C: + scanWorkerCount := rand.Int()%(maxScanWorkerCount-minScanWorkerCount) + minScanWorkerCount + delWorkerCount := rand.Int()%(maxDelWorkerCount-minDelWorkerCount) + minDelWorkerCount + + logutil.BgLogger().Info("scale worker count", zap.Int("scanWorkerCount", scanWorkerCount), zap.Int("delWorkerCount", delWorkerCount)) + tk.MustExec("set @@global.tidb_ttl_scan_worker_count = ?", scanWorkerCount) + tk.MustExec("set @@global.tidb_ttl_delete_worker_count = ?", delWorkerCount) + } + } + }() + // run the workload goroutine testStart := time.Now() for time.Since(testStart) < testDuration { @@ -1825,7 +1859,6 @@ func TestJobManagerWithFault(t *testing.T) { } logutil.BgLogger().Info("test finished") - stopTestCh <- struct{}{} close(stopTestCh) wg.Wait()