From 6e830c9dae8d048c47f00559484f26552db93f36 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 24 Jan 2025 17:24:26 +0800 Subject: [PATCH] executor: track the memory usage for building range in IndexLookUpExecutor (#56497) (#59035) close pingcap/tidb#56440 --- distsql/request_builder.go | 11 ++++++++++- executor/distsql.go | 10 ++++++++-- executor/executor_pkg_test.go | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 01c2426a96ce4..e0e430bce1e11 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -19,6 +19,7 @@ import ( "math" "sort" "sync/atomic" + "unsafe" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" @@ -770,7 +771,10 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx krs[i] = make([]kv.KeyRange, 0, len(ranges)) } - const checkSignalStep = 8 + if memTracker != nil { + memTracker.Consume(int64(unsafe.Sizeof(kv.KeyRange{})) * int64(len(ranges))) + } + const checkSignalStep = 64 var estimatedMemUsage int64 // encodeIndexKey and EncodeIndexSeekKey is time-consuming, thus we need to // check the interrupt signal periodically. @@ -798,6 +802,11 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx if interruptSignal != nil && interruptSignal.Load().(bool) { return kv.NewPartitionedKeyRanges(nil), nil } + if memTracker != nil { + // We use the Tracker.Consume function to check the memory usage of the current SQL. + // If the memory exceeds the quota, kill the SQL. + memTracker.Consume(1) + } } } return kv.NewPartitionedKeyRanges(krs), nil diff --git a/executor/distsql.go b/executor/distsql.go index 1dfcde44ae9a4..ccb5d67610445 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -455,6 +455,12 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { return err } } + if e.memTracker != nil { + e.memTracker.Reset() + } else { + e.memTracker = memory.NewTracker(e.id, -1) + } + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) err = e.buildTableKeyRanges() if err != nil { e.feedback.Invalidate() @@ -491,7 +497,7 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { if e.index.ID == -1 { kvRange, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges) } else { - kvRange, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, ranges, e.feedback) + kvRange, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, physicalID, e.index.ID, ranges, e.feedback, e.memTracker, nil) } if err != nil { return err @@ -504,7 +510,7 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { if e.index.ID == -1 { kvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, e.ranges) } else { - kvRanges, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, e.ranges, e.feedback) + kvRanges, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, physicalID, e.index.ID, e.ranges, e.feedback, e.memTracker, nil) } e.kvRanges = kvRanges.FirstPartitionRange() } diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 3d4062cbb1413..0fbf456fc5d85 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -133,7 +133,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwcAndWithMemoryTracker(t *testing.T) { } require.Equal(t, 2*bytesConsumed1, bytesConsumed2) - require.Equal(t, int64(20760), bytesConsumed1) + require.Equal(t, int64(25570), bytesConsumed1) } func generateIndexRange(vals ...int64) *ranger.Range {