Skip to content

Commit

Permalink
Introduce the deadline watcher
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 27, 2024
1 parent 8cd7233 commit d271fd9
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 61 deletions.
73 changes: 12 additions & 61 deletions client/clients/tso/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,12 @@ import (
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/batch"
cctx "github.com/tikv/pd/client/pkg/connectionctx"
"github.com/tikv/pd/client/pkg/deadline"
"github.com/tikv/pd/client/pkg/retry"
"github.com/tikv/pd/client/pkg/utils/timerutil"
"github.com/tikv/pd/client/pkg/utils/tsoutil"
sd "github.com/tikv/pd/client/servicediscovery"
)

// deadline is used to control the TS request timeout manually,
// it will be sent to the `tsDeadlineCh` to be handled by the `watchTSDeadline` goroutine.
type deadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

func newTSDeadline(
timeout time.Duration,
done chan struct{},
cancel context.CancelFunc,
) *deadline {
timer := timerutil.GlobalTimerPool.Get(timeout)
return &deadline{
timer: timer,
done: done,
cancel: cancel,
}
}

type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
Expand All @@ -86,10 +65,10 @@ type tsoDispatcher struct {
ctx context.Context
cancel context.CancelFunc

provider tsoServiceProvider
tsoRequestCh chan *Request
tsDeadlineCh chan *deadline
latestTSOInfo atomic.Pointer[tsoInfo]
provider tsoServiceProvider
tsoRequestCh chan *Request
deadlineWatcher *deadline.Watcher
latestTSOInfo atomic.Pointer[tsoInfo]
// For reusing `*batchController` objects
batchBufferPool *sync.Pool

Expand Down Expand Up @@ -119,11 +98,11 @@ func newTSODispatcher(
tokenCh := make(chan struct{}, tokenChCapacity)

td := &tsoDispatcher{
ctx: dispatcherCtx,
cancel: dispatcherCancel,
provider: provider,
tsoRequestCh: tsoRequestCh,
tsDeadlineCh: make(chan *deadline, tokenChCapacity),
ctx: dispatcherCtx,
cancel: dispatcherCancel,
provider: provider,
tsoRequestCh: tsoRequestCh,
deadlineWatcher: deadline.NewWatcher(dispatcherCtx, tokenChCapacity, "tso"),
batchBufferPool: &sync.Pool{
New: func() any {
return batch.NewController[*Request](
Expand All @@ -135,34 +114,9 @@ func newTSODispatcher(
},
tokenCh: tokenCh,
}
go td.watchTSDeadline()
return td
}

func (td *tsoDispatcher) watchTSDeadline() {
log.Info("[tso] start tso deadline watcher")
defer log.Info("[tso] exit tso deadline watcher")
for {
select {
case d := <-td.tsDeadlineCh:
select {
case <-d.timer.C:
log.Error("[tso] tso request is canceled due to timeout",
errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
timerutil.GlobalTimerPool.Put(d.timer)
case <-d.done:
timerutil.GlobalTimerPool.Put(d.timer)
case <-td.ctx.Done():
timerutil.GlobalTimerPool.Put(d.timer)
return
}
case <-td.ctx.Done():
return
}
}
}

func (td *tsoDispatcher) revokePendingRequests(err error) {
for range len(td.tsoRequestCh) {
req := <-td.tsoRequestCh
Expand Down Expand Up @@ -378,14 +332,11 @@ tsoBatchLoop:
}
}

done := make(chan struct{})
dl := newTSDeadline(option.Timeout, done, cancel)
select {
case <-ctx.Done():
done := td.deadlineWatcher.Start(ctx, option.Timeout, cancel)
if done == nil {
// Finish the collected requests if the context is canceled.
td.cancelCollectedRequests(tsoBatchController, invalidStreamID, errors.WithStack(ctx.Err()))
return
case td.tsDeadlineCh <- dl:
}
// processRequests guarantees that the collected requests could be finished properly.
err = td.processRequests(stream, tsoBatchController, done)
Expand Down
99 changes: 99 additions & 0 deletions client/pkg/deadline/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deadline

import (
"context"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/client/pkg/utils/timerutil"
"go.uber.org/zap"
)

// The `cancel` function will be invoked once the specified `timeout` elapses without receiving a `done` signal.
type deadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

// Watcher is used to watch and manage the deadlines.
type Watcher struct {
ctx context.Context
source string
Ch chan *deadline
}

// NewWatcher is used to create a new deadline watcher.
func NewWatcher(ctx context.Context, capacity int, source string) *Watcher {
watcher := &Watcher{
ctx: ctx,
source: source,
Ch: make(chan *deadline, capacity),
}
go watcher.Watch()
return watcher
}

// Watch is used to watch the deadlines and invoke the `cancel` function when the deadline is reached.
// The `err` will be returned if the deadline is reached.
func (w *Watcher) Watch() {
log.Info("[pd] start the deadline watcher", zap.String("source", w.source))
defer log.Info("[pd] exit the deadline watcher", zap.String("source", w.source))
for {
select {
case d := <-w.Ch:
select {
case <-d.timer.C:
log.Error("[pd] the deadline is reached", zap.String("source", w.source))
d.cancel()
timerutil.GlobalTimerPool.Put(d.timer)
case <-d.done:
timerutil.GlobalTimerPool.Put(d.timer)
case <-w.ctx.Done():
timerutil.GlobalTimerPool.Put(d.timer)
return
}
case <-w.ctx.Done():
return
}
}
}

// Start is used to start a deadline. It returns a channel which will be closed when the deadline is reached.
// Returns nil if the deadline is not started.
func (w *Watcher) Start(
ctx context.Context,
timeout time.Duration,
cancel context.CancelFunc,
) chan struct{} {
timer := timerutil.GlobalTimerPool.Get(timeout)
d := &deadline{
timer: timer,
done: make(chan struct{}),
cancel: cancel,
}
select {
case <-w.ctx.Done():
timerutil.GlobalTimerPool.Put(timer)
return nil
case <-ctx.Done():
timerutil.GlobalTimerPool.Put(timer)
return nil
case w.Ch <- d:
return d.done
}
}
57 changes: 57 additions & 0 deletions client/pkg/deadline/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deadline

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestWatcher(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watcher := NewWatcher(ctx, 10, "test")
deadlineReached := false
done := watcher.Start(ctx, time.Millisecond, func() {
deadlineReached = true
})
re.NotNil(done)
time.Sleep(5 * time.Millisecond)
re.True(deadlineReached)

deadlineReached = false
done = watcher.Start(ctx, 500*time.Millisecond, func() {
deadlineReached = true
})
re.NotNil(done)
done <- struct{}{}
time.Sleep(time.Second)
re.False(deadlineReached)

deadCtx, deadCancel := context.WithCancel(ctx)
deadCancel()
deadlineReached = false
done = watcher.Start(deadCtx, time.Millisecond, func() {
deadlineReached = true
})
re.Nil(done)
time.Sleep(5 * time.Millisecond)
re.False(deadlineReached)
}

0 comments on commit d271fd9

Please sign in to comment.