Skip to content

Commit

Permalink
ttl: cancel the hearbeat timeout job after disable the TTL (pingcap#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 19, 2024
1 parent fb15e5e commit 719f68e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
22 changes: 12 additions & 10 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,18 @@ j:
}

func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
// Try to lock HB timeout jobs, to avoid the case that when the `tidb_ttl_job_enable = 'OFF'`, the HB timeout job will
// never be cancelled.
jobTables := m.readyForLockHBTimeoutJobTables(now)
// TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job
// when the heart beat is not sent
for _, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil {
logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err))
}
}

if !variable.EnableTTLJob.Load() || !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) {
if len(m.runningJobs) > 0 {
for _, job := range m.runningJobs {
Expand Down Expand Up @@ -602,16 +614,6 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
}
m.removeJob(job)
}

jobTables := m.readyForLockHBTimeoutJobTables(now)
// TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job
// when the heart beat is not sent
for _, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil {
logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err))
}
}
}

func (m *JobManager) localJobs() []*ttlJob {
Expand Down
40 changes: 40 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,3 +1393,43 @@ func TestFinishError(t *testing.T) {
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}

func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

ctx := context.Background()
m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
require.NoError(t, m1.InfoSchemaCache().Update(se))
require.NoError(t, m1.TableStatusCache().Update(ctx, se))

now := se.Now()
_, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))

// lose heartbeat. Simulate the situation that m1 doesn't update the hearbeat for 8 hours.
now = now.Add(time.Hour * 8)

// stop the tidb_ttl_job_enable
tk.MustExec("set global tidb_ttl_job_enable = 'OFF'")
defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'")

// reschedule and try to get the job
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil)
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
m2.RescheduleJobs(se, now)

// the job should have been cancelled
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
}

0 comments on commit 719f68e

Please sign in to comment.