Skip to content

Commit

Permalink
Merge branch 'master' into retry-init-tso-client
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jan 6, 2025
2 parents 81261bd + 41919ad commit eba00d7
Show file tree
Hide file tree
Showing 31 changed files with 498 additions and 127 deletions.
3 changes: 3 additions & 0 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ func (c *Cli) connectionCtxsUpdater() {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(sd.MemberUpdateInterval)
failpoint.Inject("speedUpTsoDispatcherUpdateInterval", func() {
setNewUpdateTicker(10 * time.Millisecond)
})
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
Expand Down
73 changes: 12 additions & 61 deletions client/clients/tso/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,12 @@ import (
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/batch"
cctx "github.com/tikv/pd/client/pkg/connectionctx"
"github.com/tikv/pd/client/pkg/deadline"
"github.com/tikv/pd/client/pkg/retry"
"github.com/tikv/pd/client/pkg/utils/timerutil"
"github.com/tikv/pd/client/pkg/utils/tsoutil"
sd "github.com/tikv/pd/client/servicediscovery"
)

// deadline is used to control the TS request timeout manually,
// it will be sent to the `tsDeadlineCh` to be handled by the `watchTSDeadline` goroutine.
type deadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

func newTSDeadline(
timeout time.Duration,
done chan struct{},
cancel context.CancelFunc,
) *deadline {
timer := timerutil.GlobalTimerPool.Get(timeout)
return &deadline{
timer: timer,
done: done,
cancel: cancel,
}
}

type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
Expand All @@ -86,10 +65,10 @@ type tsoDispatcher struct {
ctx context.Context
cancel context.CancelFunc

provider tsoServiceProvider
tsoRequestCh chan *Request
tsDeadlineCh chan *deadline
latestTSOInfo atomic.Pointer[tsoInfo]
provider tsoServiceProvider
tsoRequestCh chan *Request
deadlineWatcher *deadline.Watcher
latestTSOInfo atomic.Pointer[tsoInfo]
// For reusing `*batchController` objects
batchBufferPool *sync.Pool

Expand Down Expand Up @@ -119,11 +98,11 @@ func newTSODispatcher(
tokenCh := make(chan struct{}, tokenChCapacity)

td := &tsoDispatcher{
ctx: dispatcherCtx,
cancel: dispatcherCancel,
provider: provider,
tsoRequestCh: tsoRequestCh,
tsDeadlineCh: make(chan *deadline, tokenChCapacity),
ctx: dispatcherCtx,
cancel: dispatcherCancel,
provider: provider,
tsoRequestCh: tsoRequestCh,
deadlineWatcher: deadline.NewWatcher(dispatcherCtx, tokenChCapacity, "tso"),
batchBufferPool: &sync.Pool{
New: func() any {
return batch.NewController[*Request](
Expand All @@ -135,34 +114,9 @@ func newTSODispatcher(
},
tokenCh: tokenCh,
}
go td.watchTSDeadline()
return td
}

func (td *tsoDispatcher) watchTSDeadline() {
log.Info("[tso] start tso deadline watcher")
defer log.Info("[tso] exit tso deadline watcher")
for {
select {
case d := <-td.tsDeadlineCh:
select {
case <-d.timer.C:
log.Error("[tso] tso request is canceled due to timeout",
errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
timerutil.GlobalTimerPool.Put(d.timer)
case <-d.done:
timerutil.GlobalTimerPool.Put(d.timer)
case <-td.ctx.Done():
timerutil.GlobalTimerPool.Put(d.timer)
return
}
case <-td.ctx.Done():
return
}
}
}

func (td *tsoDispatcher) revokePendingRequests(err error) {
for range len(td.tsoRequestCh) {
req := <-td.tsoRequestCh
Expand Down Expand Up @@ -378,14 +332,11 @@ tsoBatchLoop:
}
}

done := make(chan struct{})
dl := newTSDeadline(option.Timeout, done, cancel)
select {
case <-ctx.Done():
done := td.deadlineWatcher.Start(ctx, option.Timeout, cancel)
if done == nil {
// Finish the collected requests if the context is canceled.
td.cancelCollectedRequests(tsoBatchController, invalidStreamID, errors.WithStack(ctx.Err()))
return
case td.tsDeadlineCh <- dl:
}
// processRequests guarantees that the collected requests could be finished properly.
err = td.processRequests(stream, tsoBatchController, done)
Expand Down
1 change: 0 additions & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ var (
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.20.5
Expand Down
8 changes: 2 additions & 6 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand All @@ -69,7 +68,6 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
Expand Down Expand Up @@ -158,7 +156,6 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
Expand All @@ -167,7 +164,6 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYs
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
Expand Down
111 changes: 111 additions & 0 deletions client/pkg/deadline/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deadline

import (
"context"
"time"

"go.uber.org/zap"

"github.com/pingcap/log"

"github.com/tikv/pd/client/pkg/utils/timerutil"
)

// The `cancel` function will be invoked once the specified `timeout` elapses without receiving a `done` signal.
type deadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

// Watcher is used to watch and manage the deadlines.
type Watcher struct {
ctx context.Context
source string
Ch chan *deadline
}

// NewWatcher is used to create a new deadline watcher.
func NewWatcher(ctx context.Context, capacity int, source string) *Watcher {
watcher := &Watcher{
ctx: ctx,
source: source,
Ch: make(chan *deadline, capacity),
}
go watcher.Watch()
return watcher
}

// Watch is used to watch the deadlines and invoke the `cancel` function when the deadline is reached.
// The `err` will be returned if the deadline is reached.
func (w *Watcher) Watch() {
log.Info("[pd] start the deadline watcher", zap.String("source", w.source))
defer log.Info("[pd] exit the deadline watcher", zap.String("source", w.source))
for {
select {
case d := <-w.Ch:
select {
case <-d.timer.C:
log.Error("[pd] the deadline is reached", zap.String("source", w.source))
d.cancel()
timerutil.GlobalTimerPool.Put(d.timer)
case <-d.done:
timerutil.GlobalTimerPool.Put(d.timer)
case <-w.ctx.Done():
timerutil.GlobalTimerPool.Put(d.timer)
return
}
case <-w.ctx.Done():
return
}
}
}

// Start is used to start a deadline. It returns a channel which will be closed when the deadline is reached.
// Returns nil if the deadline is not started.
func (w *Watcher) Start(
ctx context.Context,
timeout time.Duration,
cancel context.CancelFunc,
) chan struct{} {
// Check if the watcher is already canceled.
select {
case <-w.ctx.Done():
return nil
case <-ctx.Done():
return nil
default:
}
// Initialize the deadline.
timer := timerutil.GlobalTimerPool.Get(timeout)
d := &deadline{
timer: timer,
done: make(chan struct{}),
cancel: cancel,
}
// Send the deadline to the watcher.
select {
case <-w.ctx.Done():
timerutil.GlobalTimerPool.Put(timer)
return nil
case <-ctx.Done():
timerutil.GlobalTimerPool.Put(timer)
return nil
case w.Ch <- d:
return d.done
}
}
58 changes: 58 additions & 0 deletions client/pkg/deadline/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deadline

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestWatcher(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watcher := NewWatcher(ctx, 10, "test")
var deadlineReached atomic.Bool
done := watcher.Start(ctx, time.Millisecond, func() {
deadlineReached.Store(true)
})
re.NotNil(done)
time.Sleep(5 * time.Millisecond)
re.True(deadlineReached.Load())

deadlineReached.Store(false)
done = watcher.Start(ctx, 500*time.Millisecond, func() {
deadlineReached.Store(true)
})
re.NotNil(done)
done <- struct{}{}
time.Sleep(time.Second)
re.False(deadlineReached.Load())

deadCtx, deadCancel := context.WithCancel(ctx)
deadCancel()
deadlineReached.Store(false)
done = watcher.Start(deadCtx, time.Millisecond, func() {
deadlineReached.Store(true)
})
re.Nil(done)
time.Sleep(5 * time.Millisecond)
re.False(deadlineReached.Load())
}
5 changes: 0 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ error = '''
get TSO failed
'''

["PD:client:ErrClientGetTSOTimeout"]
error = '''
get TSO timeout
'''

["PD:cluster:ErrInvalidStoreID"]
error = '''
invalid store id %d, not found
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,11 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH
github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg=
github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
Expand Down
Loading

0 comments on commit eba00d7

Please sign in to comment.