Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/pkg: introduce the deadline watcher #8955

Merged
merged 3 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 12 additions & 61 deletions client/clients/tso/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,12 @@ import (
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/batch"
cctx "github.com/tikv/pd/client/pkg/connectionctx"
"github.com/tikv/pd/client/pkg/deadline"
"github.com/tikv/pd/client/pkg/retry"
"github.com/tikv/pd/client/pkg/utils/timerutil"
"github.com/tikv/pd/client/pkg/utils/tsoutil"
sd "github.com/tikv/pd/client/servicediscovery"
)

// deadline is used to control the TS request timeout manually,
// it will be sent to the `tsDeadlineCh` to be handled by the `watchTSDeadline` goroutine.
type deadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

func newTSDeadline(
timeout time.Duration,
done chan struct{},
cancel context.CancelFunc,
) *deadline {
timer := timerutil.GlobalTimerPool.Get(timeout)
return &deadline{
timer: timer,
done: done,
cancel: cancel,
}
}

type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
Expand All @@ -86,10 +65,10 @@ type tsoDispatcher struct {
ctx context.Context
cancel context.CancelFunc

provider tsoServiceProvider
tsoRequestCh chan *Request
tsDeadlineCh chan *deadline
latestTSOInfo atomic.Pointer[tsoInfo]
provider tsoServiceProvider
tsoRequestCh chan *Request
deadlineWatcher *deadline.Watcher
latestTSOInfo atomic.Pointer[tsoInfo]
// For reusing `*batchController` objects
batchBufferPool *sync.Pool

Expand Down Expand Up @@ -119,11 +98,11 @@ func newTSODispatcher(
tokenCh := make(chan struct{}, tokenChCapacity)

td := &tsoDispatcher{
ctx: dispatcherCtx,
cancel: dispatcherCancel,
provider: provider,
tsoRequestCh: tsoRequestCh,
tsDeadlineCh: make(chan *deadline, tokenChCapacity),
ctx: dispatcherCtx,
cancel: dispatcherCancel,
provider: provider,
tsoRequestCh: tsoRequestCh,
deadlineWatcher: deadline.NewWatcher(dispatcherCtx, tokenChCapacity, "tso"),
batchBufferPool: &sync.Pool{
New: func() any {
return batch.NewController[*Request](
Expand All @@ -135,34 +114,9 @@ func newTSODispatcher(
},
tokenCh: tokenCh,
}
go td.watchTSDeadline()
return td
}

func (td *tsoDispatcher) watchTSDeadline() {
log.Info("[tso] start tso deadline watcher")
defer log.Info("[tso] exit tso deadline watcher")
for {
select {
case d := <-td.tsDeadlineCh:
select {
case <-d.timer.C:
log.Error("[tso] tso request is canceled due to timeout",
errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
timerutil.GlobalTimerPool.Put(d.timer)
case <-d.done:
timerutil.GlobalTimerPool.Put(d.timer)
case <-td.ctx.Done():
timerutil.GlobalTimerPool.Put(d.timer)
return
}
case <-td.ctx.Done():
return
}
}
}

func (td *tsoDispatcher) revokePendingRequests(err error) {
for range len(td.tsoRequestCh) {
req := <-td.tsoRequestCh
Expand Down Expand Up @@ -378,14 +332,11 @@ tsoBatchLoop:
}
}

done := make(chan struct{})
dl := newTSDeadline(option.Timeout, done, cancel)
select {
case <-ctx.Done():
done := td.deadlineWatcher.Start(ctx, option.Timeout, cancel)
if done == nil {
// Finish the collected requests if the context is canceled.
td.cancelCollectedRequests(tsoBatchController, invalidStreamID, errors.WithStack(ctx.Err()))
return
case td.tsDeadlineCh <- dl:
}
// processRequests guarantees that the collected requests could be finished properly.
err = td.processRequests(stream, tsoBatchController, done)
Expand Down
1 change: 0 additions & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ var (
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
Expand Down
111 changes: 111 additions & 0 deletions client/pkg/deadline/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deadline

import (
"context"
"time"

"go.uber.org/zap"

"github.com/pingcap/log"

"github.com/tikv/pd/client/pkg/utils/timerutil"
)

// The `cancel` function will be invoked once the specified `timeout` elapses without receiving a `done` signal.
type deadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

// Watcher is used to watch and manage the deadlines.
type Watcher struct {
ctx context.Context
source string
Ch chan *deadline
}

// NewWatcher is used to create a new deadline watcher.
func NewWatcher(ctx context.Context, capacity int, source string) *Watcher {
watcher := &Watcher{
ctx: ctx,
source: source,
Ch: make(chan *deadline, capacity),
}
go watcher.Watch()
return watcher
}

// Watch is used to watch the deadlines and invoke the `cancel` function when the deadline is reached.
// The `err` will be returned if the deadline is reached.
func (w *Watcher) Watch() {
log.Info("[pd] start the deadline watcher", zap.String("source", w.source))
defer log.Info("[pd] exit the deadline watcher", zap.String("source", w.source))
for {
select {
case d := <-w.Ch:
select {
case <-d.timer.C:
log.Error("[pd] the deadline is reached", zap.String("source", w.source))
d.cancel()
timerutil.GlobalTimerPool.Put(d.timer)
case <-d.done:
timerutil.GlobalTimerPool.Put(d.timer)
case <-w.ctx.Done():
timerutil.GlobalTimerPool.Put(d.timer)
return
}
case <-w.ctx.Done():
return
}
}
}

// Start is used to start a deadline. It returns a channel which will be closed when the deadline is reached.
// Returns nil if the deadline is not started.
func (w *Watcher) Start(
ctx context.Context,
timeout time.Duration,
cancel context.CancelFunc,
) chan struct{} {
// Check if the watcher is already canceled.
select {
case <-w.ctx.Done():
return nil

Check warning on line 88 in client/pkg/deadline/watcher.go

View check run for this annotation

Codecov / codecov/patch

client/pkg/deadline/watcher.go#L87-L88

Added lines #L87 - L88 were not covered by tests
case <-ctx.Done():
return nil
default:
}
// Initialize the deadline.
timer := timerutil.GlobalTimerPool.Get(timeout)
d := &deadline{
timer: timer,
done: make(chan struct{}),
cancel: cancel,
}
// Send the deadline to the watcher.
select {
case <-w.ctx.Done():
timerutil.GlobalTimerPool.Put(timer)
return nil
case <-ctx.Done():
timerutil.GlobalTimerPool.Put(timer)
return nil

Check warning on line 107 in client/pkg/deadline/watcher.go

View check run for this annotation

Codecov / codecov/patch

client/pkg/deadline/watcher.go#L102-L107

Added lines #L102 - L107 were not covered by tests
case w.Ch <- d:
return d.done
}
}
58 changes: 58 additions & 0 deletions client/pkg/deadline/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deadline

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestWatcher(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watcher := NewWatcher(ctx, 10, "test")
var deadlineReached atomic.Bool
done := watcher.Start(ctx, time.Millisecond, func() {
deadlineReached.Store(true)
})
re.NotNil(done)
time.Sleep(5 * time.Millisecond)
re.True(deadlineReached.Load())

deadlineReached.Store(false)
done = watcher.Start(ctx, 500*time.Millisecond, func() {
deadlineReached.Store(true)
})
re.NotNil(done)
done <- struct{}{}
time.Sleep(time.Second)
re.False(deadlineReached.Load())

deadCtx, deadCancel := context.WithCancel(ctx)
deadCancel()
deadlineReached.Store(false)
done = watcher.Start(deadCtx, time.Millisecond, func() {
deadlineReached.Store(true)
})
re.Nil(done)
time.Sleep(5 * time.Millisecond)
re.False(deadlineReached.Load())
}
5 changes: 0 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ error = '''
get TSO failed
'''

["PD:client:ErrClientGetTSOTimeout"]
error = '''
get TSO timeout
'''

["PD:cluster:ErrInvalidStoreID"]
error = '''
invalid store id %d, not found
Expand Down
1 change: 0 additions & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ var (
// client errors
var (
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
Expand Down
Loading