From 7d33065019f78d9150a8c89ddb4593f81e6ff9b3 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 31 Dec 2024 11:15:42 +0800 Subject: [PATCH 1/6] grafana: Add `gRPC Received commands rate` panel (#8921) close tikv/pd#8920 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- metrics/grafana/pd.json | 220 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 0f4e91afd50..62b2e7234ef 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -8937,6 +8937,226 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "dashLength": 10, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The rate of received each kind of gRPC commands", + "editable": true, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 127 + }, + "id": 904, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 300, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "paceLength": 10, + "pointradius": 5, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "targets": [ + { + "expr": "sum(rate(grpc_server_msg_received_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance, grpc_method)", + "legendFormat": "{{instance}}-{{grpc_method}}", + "interval": "", + "exemplar": true, + "intervalFactor": 2, + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeRegions": [], + "title": "gRPC Received commands rate", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + }, + "options": { + "alertThreshold": true + }, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "pluginVersion": "7.5.17", + "bars": false, + "dashes": false, + "decimals": null, + "error": false, + "percentage": false, + "points": false, + "stack": false, + "steppedLine": false, + "timeFrom": null, + "timeShift": null, + "fillGradient": 0, + "hiddenSeries": false + }, + { + "aliasColors": {}, + "dashLength": 10, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The error rate of handled gRPC commands.Note: It can't catch the error hide in the header, like this https://github.com/tikv/pd/blob/2d970a619a8917c35d306f401326141481c133e0/server/grpc_service.go#L2071", + "editable": true, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 135 + }, + "id": 905, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 300, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "pluginVersion": "7.5.17", + "pointradius": 5, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "targets": [ + { + "expr": "sum(rate(grpc_server_handled_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", grpc_type=\"unary\", grpc_code!=\"OK\"}[1m])) by (grpc_method)", + "legendFormat": "{{grpc_method}}", + "interval": "", + "exemplar": true, + "intervalFactor": 2, + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeRegions": [], + "title": "gRPC Error rate", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true, + "$$hashKey": "object:132" + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true, + "$$hashKey": "object:133" + } + ], + "yaxis": { + "align": false, + "alignLevel": null + }, + "bars": false, + "dashes": false, + "decimals": null, + "error": false, + "fillGradient": 0, + "hiddenSeries": false, + "percentage": false, + "points": false, + "stack": false, + "steppedLine": false, + "timeFrom": null, + "timeShift": null } ], "repeat": null, From 5ad4301b7a095a348ba9d5a5ffd2df8d021dcf44 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 31 Dec 2024 13:54:33 +0800 Subject: [PATCH 2/6] client/pkg: introduce the deadline watcher (#8955) ref tikv/pd#8690 Introduce the deadline watcher. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/clients/tso/dispatcher.go | 73 +++--------------- client/errs/errno.go | 1 - client/pkg/deadline/watcher.go | 111 ++++++++++++++++++++++++++++ client/pkg/deadline/watcher_test.go | 58 +++++++++++++++ errors.toml | 5 -- pkg/errs/errno.go | 1 - 6 files changed, 181 insertions(+), 68 deletions(-) create mode 100644 client/pkg/deadline/watcher.go create mode 100644 client/pkg/deadline/watcher_test.go diff --git a/client/clients/tso/dispatcher.go b/client/clients/tso/dispatcher.go index c05ab27d755..1cc2b2aa940 100644 --- a/client/clients/tso/dispatcher.go +++ b/client/clients/tso/dispatcher.go @@ -36,33 +36,12 @@ import ( "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/batch" cctx "github.com/tikv/pd/client/pkg/connectionctx" + "github.com/tikv/pd/client/pkg/deadline" "github.com/tikv/pd/client/pkg/retry" - "github.com/tikv/pd/client/pkg/utils/timerutil" "github.com/tikv/pd/client/pkg/utils/tsoutil" sd "github.com/tikv/pd/client/servicediscovery" ) -// deadline is used to control the TS request timeout manually, -// it will be sent to the `tsDeadlineCh` to be handled by the `watchTSDeadline` goroutine. -type deadline struct { - timer *time.Timer - done chan struct{} - cancel context.CancelFunc -} - -func newTSDeadline( - timeout time.Duration, - done chan struct{}, - cancel context.CancelFunc, -) *deadline { - timer := timerutil.GlobalTimerPool.Get(timeout) - return &deadline{ - timer: timer, - done: done, - cancel: cancel, - } -} - type tsoInfo struct { tsoServer string reqKeyspaceGroupID uint32 @@ -86,10 +65,10 @@ type tsoDispatcher struct { ctx context.Context cancel context.CancelFunc - provider tsoServiceProvider - tsoRequestCh chan *Request - tsDeadlineCh chan *deadline - latestTSOInfo atomic.Pointer[tsoInfo] + provider tsoServiceProvider + tsoRequestCh chan *Request + deadlineWatcher *deadline.Watcher + latestTSOInfo atomic.Pointer[tsoInfo] // For reusing `*batchController` objects batchBufferPool *sync.Pool @@ -119,11 +98,11 @@ func newTSODispatcher( tokenCh := make(chan struct{}, tokenChCapacity) td := &tsoDispatcher{ - ctx: dispatcherCtx, - cancel: dispatcherCancel, - provider: provider, - tsoRequestCh: tsoRequestCh, - tsDeadlineCh: make(chan *deadline, tokenChCapacity), + ctx: dispatcherCtx, + cancel: dispatcherCancel, + provider: provider, + tsoRequestCh: tsoRequestCh, + deadlineWatcher: deadline.NewWatcher(dispatcherCtx, tokenChCapacity, "tso"), batchBufferPool: &sync.Pool{ New: func() any { return batch.NewController[*Request]( @@ -135,34 +114,9 @@ func newTSODispatcher( }, tokenCh: tokenCh, } - go td.watchTSDeadline() return td } -func (td *tsoDispatcher) watchTSDeadline() { - log.Info("[tso] start tso deadline watcher") - defer log.Info("[tso] exit tso deadline watcher") - for { - select { - case d := <-td.tsDeadlineCh: - select { - case <-d.timer.C: - log.Error("[tso] tso request is canceled due to timeout", - errs.ZapError(errs.ErrClientGetTSOTimeout)) - d.cancel() - timerutil.GlobalTimerPool.Put(d.timer) - case <-d.done: - timerutil.GlobalTimerPool.Put(d.timer) - case <-td.ctx.Done(): - timerutil.GlobalTimerPool.Put(d.timer) - return - } - case <-td.ctx.Done(): - return - } - } -} - func (td *tsoDispatcher) revokePendingRequests(err error) { for range len(td.tsoRequestCh) { req := <-td.tsoRequestCh @@ -378,14 +332,11 @@ tsoBatchLoop: } } - done := make(chan struct{}) - dl := newTSDeadline(option.Timeout, done, cancel) - select { - case <-ctx.Done(): + done := td.deadlineWatcher.Start(ctx, option.Timeout, cancel) + if done == nil { // Finish the collected requests if the context is canceled. td.cancelCollectedRequests(tsoBatchController, invalidStreamID, errors.WithStack(ctx.Err())) return - case td.tsDeadlineCh <- dl: } // processRequests guarantees that the collected requests could be finished properly. err = td.processRequests(stream, tsoBatchController, done) diff --git a/client/errs/errno.go b/client/errs/errno.go index 25665f01017..99a426d0776 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -56,7 +56,6 @@ var ( ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient")) ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) - ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO")) ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader")) diff --git a/client/pkg/deadline/watcher.go b/client/pkg/deadline/watcher.go new file mode 100644 index 00000000000..b40857edbfd --- /dev/null +++ b/client/pkg/deadline/watcher.go @@ -0,0 +1,111 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deadline + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/pingcap/log" + + "github.com/tikv/pd/client/pkg/utils/timerutil" +) + +// The `cancel` function will be invoked once the specified `timeout` elapses without receiving a `done` signal. +type deadline struct { + timer *time.Timer + done chan struct{} + cancel context.CancelFunc +} + +// Watcher is used to watch and manage the deadlines. +type Watcher struct { + ctx context.Context + source string + Ch chan *deadline +} + +// NewWatcher is used to create a new deadline watcher. +func NewWatcher(ctx context.Context, capacity int, source string) *Watcher { + watcher := &Watcher{ + ctx: ctx, + source: source, + Ch: make(chan *deadline, capacity), + } + go watcher.Watch() + return watcher +} + +// Watch is used to watch the deadlines and invoke the `cancel` function when the deadline is reached. +// The `err` will be returned if the deadline is reached. +func (w *Watcher) Watch() { + log.Info("[pd] start the deadline watcher", zap.String("source", w.source)) + defer log.Info("[pd] exit the deadline watcher", zap.String("source", w.source)) + for { + select { + case d := <-w.Ch: + select { + case <-d.timer.C: + log.Error("[pd] the deadline is reached", zap.String("source", w.source)) + d.cancel() + timerutil.GlobalTimerPool.Put(d.timer) + case <-d.done: + timerutil.GlobalTimerPool.Put(d.timer) + case <-w.ctx.Done(): + timerutil.GlobalTimerPool.Put(d.timer) + return + } + case <-w.ctx.Done(): + return + } + } +} + +// Start is used to start a deadline. It returns a channel which will be closed when the deadline is reached. +// Returns nil if the deadline is not started. +func (w *Watcher) Start( + ctx context.Context, + timeout time.Duration, + cancel context.CancelFunc, +) chan struct{} { + // Check if the watcher is already canceled. + select { + case <-w.ctx.Done(): + return nil + case <-ctx.Done(): + return nil + default: + } + // Initialize the deadline. + timer := timerutil.GlobalTimerPool.Get(timeout) + d := &deadline{ + timer: timer, + done: make(chan struct{}), + cancel: cancel, + } + // Send the deadline to the watcher. + select { + case <-w.ctx.Done(): + timerutil.GlobalTimerPool.Put(timer) + return nil + case <-ctx.Done(): + timerutil.GlobalTimerPool.Put(timer) + return nil + case w.Ch <- d: + return d.done + } +} diff --git a/client/pkg/deadline/watcher_test.go b/client/pkg/deadline/watcher_test.go new file mode 100644 index 00000000000..b93987b8874 --- /dev/null +++ b/client/pkg/deadline/watcher_test.go @@ -0,0 +1,58 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deadline + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestWatcher(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + watcher := NewWatcher(ctx, 10, "test") + var deadlineReached atomic.Bool + done := watcher.Start(ctx, time.Millisecond, func() { + deadlineReached.Store(true) + }) + re.NotNil(done) + time.Sleep(5 * time.Millisecond) + re.True(deadlineReached.Load()) + + deadlineReached.Store(false) + done = watcher.Start(ctx, 500*time.Millisecond, func() { + deadlineReached.Store(true) + }) + re.NotNil(done) + done <- struct{}{} + time.Sleep(time.Second) + re.False(deadlineReached.Load()) + + deadCtx, deadCancel := context.WithCancel(ctx) + deadCancel() + deadlineReached.Store(false) + done = watcher.Start(deadCtx, time.Millisecond, func() { + deadlineReached.Store(true) + }) + re.Nil(done) + time.Sleep(5 * time.Millisecond) + re.False(deadlineReached.Load()) +} diff --git a/errors.toml b/errors.toml index 2ab3b014f5a..9980a98ab14 100644 --- a/errors.toml +++ b/errors.toml @@ -131,11 +131,6 @@ error = ''' get TSO failed ''' -["PD:client:ErrClientGetTSOTimeout"] -error = ''' -get TSO timeout -''' - ["PD:cluster:ErrInvalidStoreID"] error = ''' invalid store id %d, not found diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index ee24b4d0673..834bf4f824e 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -144,7 +144,6 @@ var ( // client errors var ( ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) - ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) ErrClientGetTSO = errors.Normalize("get TSO failed", errors.RFCCodeText("PD:client:ErrClientGetTSO")) ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader")) ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) From c2d48542d8c4c1d1a42ebc0b2449993080d57d29 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 2 Jan 2025 11:20:24 +0800 Subject: [PATCH 3/6] *: format log fields using kebab-case style (#8956) ref tikv/pd#4322 Signed-off-by: Ryan Leung --- pkg/core/region.go | 4 ++-- pkg/encryption/key_manager.go | 2 +- pkg/keyspace/keyspace.go | 2 +- .../resourcemanager/server/token_buckets.go | 2 +- pkg/memory/meminfo.go | 4 ++-- pkg/schedule/config/store_config.go | 2 +- pkg/schedule/schedulers/grant_hot_region.go | 2 +- pkg/tso/global_allocator.go | 2 +- pkg/tso/keyspace_group_manager.go | 22 +++++++++---------- server/grpc_service.go | 2 +- server/server.go | 2 +- server/util.go | 2 +- 12 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 5f5a4a5f2e0..706e6bbd712 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1885,7 +1885,7 @@ func scanRegion(regionTree *regionTree, keyRange *KeyRange, limit int, outputMus keyRange.StartKey, keyRange.EndKey, lastRegion.GetStartKey(), lastRegion.GetEndKey(), region.GetStartKey(), region.GetEndKey()) - log.Warn("scan regions failed", zap.Bool("outputMustContainAllKeyRange", + log.Warn("scan regions failed", zap.Bool("contain-all-key-range", outputMustContainAllKeyRange), zap.Error(err)) if outputMustContainAllKeyRange { return false @@ -1907,7 +1907,7 @@ func scanRegion(regionTree *regionTree, keyRange *KeyRange, limit int, outputMus keyRange.StartKey, keyRange.EndKey, lastRegion.GetStartKey(), lastRegion.GetEndKey(), lastRegion.GetEndKey(), keyRange.EndKey) - log.Warn("scan regions failed", zap.Bool("outputMustContainAllKeyRange", + log.Warn("scan regions failed", zap.Bool("contain-all-key-range", outputMustContainAllKeyRange), zap.Error(err)) if outputMustContainAllKeyRange { return nil, err diff --git a/pkg/encryption/key_manager.go b/pkg/encryption/key_manager.go index 54e5fa01b35..5fc6788c549 100644 --- a/pkg/encryption/key_manager.go +++ b/pkg/encryption/key_manager.go @@ -413,7 +413,7 @@ func (m *Manager) rotateKeyIfNeeded(forceUpdate bool) error { keys.Keys[keyID] = key keys.CurrentKeyId = keyID rotated = true - log.Info("ready to create or rotate data encryption key", zap.Uint64("keyID", keyID)) + log.Info("ready to create or rotate data encryption key", zap.Uint64("key-id", keyID)) break } // Duplicated key id. retry. diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index c9e390df47a..93312ae4ff1 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -593,7 +593,7 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key return nil, err } log.Info("[keyspace] keyspace state updated", - zap.Uint32("ID", meta.GetId()), + zap.Uint32("id", meta.GetId()), zap.String("keyspace-id", meta.GetName()), zap.String("new-state", newState.String()), ) diff --git a/pkg/mcs/resourcemanager/server/token_buckets.go b/pkg/mcs/resourcemanager/server/token_buckets.go index 50dc78c9d68..be32884e1ea 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets.go +++ b/pkg/mcs/resourcemanager/server/token_buckets.go @@ -174,7 +174,7 @@ func (gts *GroupTokenBucketState) balanceSlotTokens( if time.Since(slot.lastReqTime) >= slotExpireTimeout { delete(gts.tokenSlots, clientUniqueID) log.Info("delete resource group slot because expire", zap.Time("last-req-time", slot.lastReqTime), - zap.Any("expire timeout", slotExpireTimeout), zap.Any("del client id", clientUniqueID), zap.Any("len", len(gts.tokenSlots))) + zap.Duration("expire-timeout", slotExpireTimeout), zap.Uint64("del-client-id", clientUniqueID), zap.Int("len", len(gts.tokenSlots))) } } } diff --git a/pkg/memory/meminfo.go b/pkg/memory/meminfo.go index 7ed1afb579b..9ce9ff9b886 100644 --- a/pkg/memory/meminfo.go +++ b/pkg/memory/meminfo.go @@ -210,9 +210,9 @@ func InitMemoryHook() { MemTotal = MemTotalCGroup MemUsed = MemUsedCGroup sysutil.RegisterGetMemoryCapacity(MemTotalCGroup) - log.Info("use cgroup memory hook", zap.Int64("cgroupMemorySize", int64(cgroupValue)), zap.Int64("physicalMemorySize", int64(physicalValue))) + log.Info("use cgroup memory hook", zap.Int64("cgroup-memory-size", int64(cgroupValue)), zap.Int64("physical-memory-size", int64(physicalValue))) } else { - log.Info("use physical memory hook", zap.Int64("cgroupMemorySize", int64(cgroupValue)), zap.Int64("physicalMemorySize", int64(physicalValue))) + log.Info("use physical memory hook", zap.Int64("cgroup-memory-size", int64(cgroupValue)), zap.Int64("physical-memory-size", int64(physicalValue))) } _, err = MemTotal() mustNil(err) diff --git a/pkg/schedule/config/store_config.go b/pkg/schedule/config/store_config.go index 5575f0d9d56..cbf085d93fb 100644 --- a/pkg/schedule/config/store_config.go +++ b/pkg/schedule/config/store_config.go @@ -190,7 +190,7 @@ func (c *StoreConfig) CheckRegionKeys(keys, mergeKeys uint64) error { } if smallKeys := keys % c.GetRegionSplitKeys(); smallKeys <= mergeKeys && smallKeys > 0 { - log.Debug("region keys is too small", zap.Uint64("keys", keys), zap.Uint64("merge-keys", mergeKeys), zap.Uint64("smallSize", smallKeys)) + log.Debug("region keys is too small", zap.Uint64("keys", keys), zap.Uint64("merge-keys", mergeKeys), zap.Uint64("small-keys", smallKeys)) return errs.ErrCheckerMergeAgain.FastGenByArgs("the smallest region of the split regions is less than max-merge-region-keys") } return nil diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 79be126f1d4..005e6b4182a 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -262,7 +262,7 @@ func (s *grantHotRegionScheduler) randomSchedule(cluster sche.SchedulerCluster, op, err := s.transfer(cluster, peer.RegionID, srcStoreID, isLeader) if err != nil { log.Debug("fail to create grant hot region operator", zap.Uint64("region-id", peer.RegionID), - zap.Uint64("src store id", srcStoreID), errs.ZapError(err)) + zap.Uint64("src-store-id", srcStoreID), errs.ZapError(err)) continue } return []*operator.Operator{op} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index d6bf27878a7..553e0b0effd 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -212,7 +212,7 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { zap.String("server-name", gta.member.Name()), zap.String("expected-primary-id", expectedPrimary), zap.Uint64("member-id", gta.member.ID()), - zap.String("cur-memberValue", gta.member.MemberValue())) + zap.String("cur-member-value", gta.member.MemberValue())) time.Sleep(200 * time.Millisecond) continue } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index d22d284e1be..149c68029be 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -1297,7 +1297,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget log.Info("start to merge the keyspace group", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList)) + zap.Uint32s("merge-list", mergeList)) defer logutil.LogPanic() defer kgm.wg.Done() @@ -1316,7 +1316,7 @@ mergeLoop: log.Info("merging checker is closed", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList)) + zap.Uint32s("merge-list", mergeList)) return case <-checkTicker.C: } @@ -1326,7 +1326,7 @@ mergeLoop: log.Warn("unable to get the merge target allocator manager", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("keyspace-group-id", mergeTargetID), - zap.Any("merge-list", mergeList), + zap.Uint32s("merge-list", mergeList), zap.Error(err)) continue } @@ -1336,7 +1336,7 @@ mergeLoop: log.Debug("current tso node is not the merge target primary", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList)) + zap.Uint32s("merge-list", mergeList)) continue } // Check if the keyspace group primaries in the merge map are all gone. @@ -1351,7 +1351,7 @@ mergeLoop: log.Error("failed to check if the keyspace group primary in the merge list has gone", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), + zap.Uint32s("merge-list", mergeList), zap.Uint32("merge-id", id), zap.Any("remaining", mergeMap), zap.Error(err)) @@ -1370,7 +1370,7 @@ mergeLoop: "start to calculate the newly merged TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList)) + zap.Uint32s("merge-list", mergeList)) // All the keyspace group primaries in the merge list are gone, // calculate the newly merged TSO to make sure it is greater than the original ones. var mergedTS time.Time @@ -1380,7 +1380,7 @@ mergeLoop: log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), + zap.Uint32s("merge-list", mergeList), zap.Uint32("merge-id", id), zap.Time("ts", ts), zap.Error(err)) @@ -1396,7 +1396,7 @@ mergeLoop: log.Info("start to set the newly merged TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), + zap.Uint32s("merge-list", mergeList), zap.Time("merged-ts", mergedTS)) err = am.GetAllocator().SetTSO( tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), @@ -1405,7 +1405,7 @@ mergeLoop: log.Error("failed to update the newly merged TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), + zap.Uint32s("merge-list", mergeList), zap.Time("merged-ts", mergedTS), zap.Error(err)) continue @@ -1417,7 +1417,7 @@ mergeLoop: log.Error("failed to finish the merge", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), + zap.Uint32s("merge-list", mergeList), zap.Error(err)) continue } @@ -1425,7 +1425,7 @@ mergeLoop: log.Info("finished merging keyspace group", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), + zap.Uint32s("merge-list", mergeList), zap.Time("merged-ts", mergedTS)) return } diff --git a/server/grpc_service.go b/server/grpc_service.go index 398325cd30a..8db79d3b8f5 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -2803,7 +2803,7 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo } log.Debug("updated min resolved-ts", zap.Uint64("store", storeID), - zap.Uint64("min resolved-ts", minResolvedTS)) + zap.Uint64("min-resolved-ts", minResolvedTS)) return &pdpb.ReportMinResolvedTsResponse{ Header: wrapHeader(), }, nil diff --git a/server/server.go b/server/server.go index 3f397da4d0b..4cae86b6587 100644 --- a/server/server.go +++ b/server/server.go @@ -2042,7 +2042,7 @@ func (s *Server) GetExternalTS() uint64 { func (s *Server) SetExternalTS(externalTS, globalTS uint64) error { if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 { desc := "the external timestamp should not be larger than global ts" - log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS)) + log.Error(desc, zap.Uint64("request-timestamp", externalTS), zap.Uint64("global-ts", globalTS)) return errors.New(desc) } c := s.GetRaftCluster() diff --git a/server/util.go b/server/util.go index 1764e4e9850..2f05c06b8f5 100644 --- a/server/util.go +++ b/server/util.go @@ -54,7 +54,7 @@ func CheckPDVersionWithClusterVersion(opt *config.PersistOptions) { if pdVersion.LessThan(clusterVersion) { log.Warn( "PD version less than cluster version, please upgrade PD", - zap.String("PD-version", pdVersion.String()), + zap.String("pd-version", pdVersion.String()), zap.String("cluster-version", clusterVersion.String())) } } From 0bfa31f9697dd7ce5a495f235eeb9eedaca67921 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 2 Jan 2025 15:23:21 +0800 Subject: [PATCH 4/6] cluster: fix panic when minResolvedTS is not initialized (#8965) close tikv/pd#8964 Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 699b43e7901..d2f3855d14e 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2283,7 +2283,8 @@ func (c *RaftCluster) CheckAndUpdateMinResolvedTS() (uint64, bool) { newMinResolvedTS = s.GetMinResolvedTS() } } - oldMinResolvedTS := c.minResolvedTS.Load().(uint64) + // Avoid panic when minResolvedTS is not initialized. + oldMinResolvedTS, _ := c.minResolvedTS.Load().(uint64) if newMinResolvedTS == math.MaxUint64 || newMinResolvedTS <= oldMinResolvedTS { return oldMinResolvedTS, false } From 7a30ebc972ffa2f0f76e6f99f9c55a68f975669e Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 2 Jan 2025 15:35:42 +0800 Subject: [PATCH 5/6] server: advance ServerStart check (#8951) close tikv/pd#8950 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/clients/tso/client.go | 3 +++ client/go.mod | 2 +- client/go.sum | 8 ++------ go.mod | 2 +- go.sum | 5 ++--- server/grpc_service.go | 15 ++++++++++---- server/server.go | 1 + tests/integrations/client/client_test.go | 26 ++++++++++++++++++++++++ tests/integrations/go.mod | 2 +- tests/integrations/go.sum | 7 ++----- tools/go.mod | 2 +- tools/go.sum | 7 ++----- 12 files changed, 53 insertions(+), 27 deletions(-) diff --git a/client/clients/tso/client.go b/client/clients/tso/client.go index c6caa8b985f..d24dba52394 100644 --- a/client/clients/tso/client.go +++ b/client/clients/tso/client.go @@ -281,6 +281,9 @@ func (c *Cli) connectionCtxsUpdater() { // Because the TSO Follower Proxy is enabled, // the periodic check needs to be performed. setNewUpdateTicker(sd.MemberUpdateInterval) + failpoint.Inject("speedUpTsoDispatcherUpdateInterval", func() { + setNewUpdateTicker(10 * time.Millisecond) + }) } else if !enableTSOFollowerProxy && updateTicker.C != nil { // Because the TSO Follower Proxy is disabled, // the periodic check needs to be turned off. diff --git a/client/go.mod b/client/go.mod index b26abcbea55..78aef084ff7 100644 --- a/client/go.mod +++ b/client/go.mod @@ -9,7 +9,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c - github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 + github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.20.5 diff --git a/client/go.sum b/client/go.sum index 36c58efb823..4cca5ba3ad5 100644 --- a/client/go.sum +++ b/client/go.sum @@ -45,11 +45,10 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= -github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= -github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= @@ -69,7 +68,6 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= -github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= @@ -158,7 +156,6 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= @@ -167,7 +164,6 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYs gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/go.mod b/go.mod index 9c8a7bc90a5..c1107b6ffc5 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c - github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 + github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 diff --git a/go.sum b/go.sum index a2f070397d1..e6156e9ecf7 100644 --- a/go.sum +++ b/go.sum @@ -386,12 +386,11 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg= github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= -github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= -github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= diff --git a/server/grpc_service.go b/server/grpc_service.go index 8db79d3b8f5..d3fc5c58d7f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -325,6 +325,9 @@ func (s *GrpcServer) GetMinTS( // GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across // all keyspace groups. func (s *GrpcServer) GetMinTSFromTSOService() (*pdpb.Timestamp, error) { + if s.IsClosed() { + return nil, errs.ErrNotStarted + } addrs := s.keyspaceGroupManager.GetTSOServiceAddrs() if len(addrs) == 0 { return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered") @@ -536,6 +539,11 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { return errors.WithStack(err) } + // TSO uses leader lease to determine validity. No need to check leader here. + if s.IsClosed() { + return errs.ErrNotStarted + } + forwardedHost := grpcutil.GetForwardedHost(stream.Context()) if !s.isLocalRequest(forwardedHost) { clientConn, err := s.getDelegateClient(s.ctx, forwardedHost) @@ -570,10 +578,6 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { } start := time.Now() - // TSO uses leader lease to determine validity. No need to check leader here. - if s.IsClosed() { - return errs.ErrNotStarted - } if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID { return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId()) } @@ -710,6 +714,9 @@ func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, _ *pdpb.IsSnapsho return nil, errs.ErrGRPCRateLimitExceeded(err) } } + if s.IsClosed() { + return nil, errs.ErrNotStarted + } // recovering mark is stored in etcd directly, there's no need to forward. marked, err := s.Server.IsSnapshotRecovering(ctx) if err != nil { diff --git a/server/server.go b/server/server.go index 4cae86b6587..94250128fe3 100644 --- a/server/server.go +++ b/server/server.go @@ -506,6 +506,7 @@ func (s *Server) startServer(ctx context.Context) error { s.grpcServiceRateLimiter.Update(service, ratelimit.InitLimiter()) } + failpoint.InjectCall("delayStartServer") // Server has started. atomic.StoreInt64(&s.isRunning, 1) bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0))) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index fadfb952e4c..2018860130e 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -270,6 +270,7 @@ func TestTSOFollowerProxy(t *testing.T) { defer cli1.Close() cli2 := setupCli(ctx, re, endpoints) defer cli2.Close() + re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/speedUpTsoDispatcherUpdateInterval", "return(true)")) err = cli2.UpdateOption(opt.EnableTSOFollowerProxy, true) re.NoError(err) @@ -296,6 +297,31 @@ func TestTSOFollowerProxy(t *testing.T) { } wg.Wait() + followerServer := cluster.GetServer(cluster.GetFollower()) + re.NoError(followerServer.Stop()) + ch := make(chan struct{}) + re.NoError(failpoint.EnableCall("github.com/tikv/pd/server/delayStartServer", func() { + // Server is not in `Running` state, so the follower proxy should return + // error while create stream. + ch <- struct{}{} + })) + wg.Add(1) + go func() { + defer wg.Done() + re.NoError(followerServer.Run()) + }() + re.Eventually(func() bool { + _, _, err := cli2.GetTS(context.Background()) + if err == nil { + return false + } + return strings.Contains(err.Error(), "server not started") + }, 3*time.Second, 10*time.Millisecond) + <-ch + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServer")) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/speedUpTsoDispatcherUpdateInterval")) + wg.Wait() + // Disable the follower proxy and check if the stream is updated. err = cli2.UpdateOption(opt.EnableTSOFollowerProxy, false) re.NoError(err) diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index c5fcb617014..ec1d74923d6 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -13,7 +13,7 @@ require ( github.com/docker/go-units v0.5.0 github.com/go-sql-driver/mysql v1.7.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c - github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c + github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.20.5 diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 5a48549ef65..5f78324c3c2 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -379,12 +379,11 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg= github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= @@ -429,7 +428,6 @@ github.com/samber/lo v1.37.0 h1:XjVcB8g6tgUp8rsPsJ2CvhClfImrpL04YpQHXeHPhRw= github.com/samber/lo v1.37.0/go.mod h1:9vaz2O4o8oOnK23pd2TrXufcbdbJIa3b6cstBWKpopA= github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= -github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA= github.com/shirou/gopsutil/v3 v3.23.3 h1:Syt5vVZXUDXPEXpIBt5ziWsJ4LdSAAxF4l/xZeQgSEE= github.com/shirou/gopsutil/v3 v3.23.3/go.mod h1:lSBNN6t3+D6W5e5nXTxc8KIMMVxAcS+6IJlffjRRlMU= @@ -748,7 +746,6 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/tools/go.mod b/tools/go.mod index ded2e2e82c8..31309986d92 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -21,7 +21,7 @@ require ( github.com/influxdata/tdigest v0.0.1 github.com/mattn/go-shellwords v1.0.12 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c - github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 + github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.20.5 diff --git a/tools/go.sum b/tools/go.sum index bd68a4f0ca1..b9c49d466ef 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -380,12 +380,11 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg= github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= -github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= -github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= @@ -432,7 +431,6 @@ github.com/samber/lo v1.37.0 h1:XjVcB8g6tgUp8rsPsJ2CvhClfImrpL04YpQHXeHPhRw= github.com/samber/lo v1.37.0/go.mod h1:9vaz2O4o8oOnK23pd2TrXufcbdbJIa3b6cstBWKpopA= github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= -github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA= github.com/shirou/gopsutil/v3 v3.23.3 h1:Syt5vVZXUDXPEXpIBt5ziWsJ4LdSAAxF4l/xZeQgSEE= github.com/shirou/gopsutil/v3 v3.23.3/go.mod h1:lSBNN6t3+D6W5e5nXTxc8KIMMVxAcS+6IJlffjRRlMU= @@ -759,7 +757,6 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From 41919ad57acca17c6e2c24a19a1c0185c355e469 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 2 Jan 2025 16:46:51 +0800 Subject: [PATCH 6/6] test: make TestPreparingProgress stable (#8966) close tikv/pd#8693 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tests/server/api/api_test.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index faa22ce08f4..44a0ae69a46 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -1092,13 +1092,24 @@ func TestPreparingProgress(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } -func sendRequest(re *require.Assertions, url string, method string, statusCode int) []byte { +func sendRequest(re *require.Assertions, url string, method string, statusCode int) (output []byte) { req, _ := http.NewRequest(method, url, http.NoBody) - resp, err := tests.TestDialClient.Do(req) - re.NoError(err) - re.Equal(statusCode, resp.StatusCode) - output, err := io.ReadAll(resp.Body) - re.NoError(err) - resp.Body.Close() + + testutil.Eventually(re, func() bool { + resp, err := tests.TestDialClient.Do(req) + re.NoError(err) + defer resp.Body.Close() + + // Due to service unavailability caused by environmental issues, + // we will retry it. + if resp.StatusCode == http.StatusServiceUnavailable { + return false + } + re.Equal(statusCode, resp.StatusCode) + output, err = io.ReadAll(resp.Body) + re.NoError(err) + return true + }) + return output }