Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: separate the TSO client implementation #8848

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix the failpoints
Signed-off-by: JmPotato <[email protected]>
JmPotato committed Nov 25, 2024

Verified

This commit was signed with the committer’s verified signature.
JmPotato JmPotato
commit 88e113efa7ca32e8a9e322f3cdba960e63309503
24 changes: 12 additions & 12 deletions client/clients/tso/tso_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -272,11 +272,11 @@ func (s *testTSODispatcherSuite) testStaticConcurrencyImpl(concurrency int) {
}

func (s *testTSODispatcherSuite) TestConcurrentRPC() {
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherAlwaysCheckConcurrency", "return"))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherAlwaysCheckConcurrency"))
}()

s.testStaticConcurrencyImpl(1)
@@ -289,11 +289,11 @@ func (s *testTSODispatcherSuite) TestBatchDelaying() {
ctx := context.Background()
s.option.SetTSOClientRPCConcurrency(2)

s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoStreamSimulateEstimatedRPCLatency"))
}()

// Make sure concurrency option takes effect.
@@ -302,23 +302,23 @@ func (s *testTSODispatcherSuite) TestBatchDelaying() {
s.reqMustReady(req)

// Trigger the check.
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration"))
}()
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)

// Try other concurrency.
s.option.SetTSOClientRPCConcurrency(3)
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`))
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)

s.option.SetTSOClientRPCConcurrency(4)
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`))
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)
16 changes: 8 additions & 8 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
@@ -404,10 +404,10 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
go func() {
defer wg.Done()
leader := cluster.GetLeaderServer()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/unreachableNetwork", "return(true)"))
leader.Stop()
re.NotEmpty(cluster.WaitLeader())
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/unreachableNetwork"))
leaderReadyTime = time.Now()
}()
wg.Wait()
@@ -519,7 +519,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1(
cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
defer cli.Close()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/unreachableNetwork", "return(true)"))
var lastTS uint64
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
@@ -532,7 +532,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1(
})

lastTS = checkTS(re, cli, lastTS)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/unreachableNetwork"))
time.Sleep(2 * time.Second)
checkTS(re, cli, lastTS)

@@ -554,7 +554,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2(
cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
defer cli.Close()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/unreachableNetwork", "return(true)"))
var lastTS uint64
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
@@ -571,7 +571,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2(
re.NotEmpty(suite.cluster.WaitLeader())
lastTS = checkTS(re, cli, lastTS)

re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/unreachableNetwork"))
time.Sleep(5 * time.Second)
checkTS(re, cli, lastTS)
}
@@ -783,7 +783,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTSFuture() {
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/shortDispatcherChannel", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/shortDispatcherChannel", "return(true)"))

cli := setupCli(ctx, re, suite.endpoints)

@@ -820,7 +820,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTSFuture() {
wg2.Wait()
wg3.Wait()
re.Less(time.Since(start), time.Second*2)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/shortDispatcherChannel"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/shortDispatcherChannel"))
}

func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 {
4 changes: 2 additions & 2 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
@@ -448,7 +448,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {

func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/delayDispatchTSORequest", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/clients/tso/delayDispatchTSORequest", "return(true)"))
var (
stopSignal atomic.Bool
wg sync.WaitGroup
@@ -481,7 +481,7 @@ func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() {
}
stopSignal.Store(true)
wg.Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/delayDispatchTSORequest"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/clients/tso/delayDispatchTSORequest"))
}

// When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time.

Unchanged files with check annotations Beta

// FetchRequestsWithTimer tries to fetch requests until the given timer ticks. The caller must set the timer properly
// before calling this function.
func (bc *Controller[T]) FetchRequestsWithTimer(ctx context.Context, requestCh <-chan T, timer *time.Timer) error {

Check warning on line 174 in client/batch/batch_controller.go

Codecov / codecov/patch

client/batch/batch_controller.go#L174

Added line #L174 was not covered by tests
batchingLoop:
for bc.collectedRequestCount < bc.maxBatchSize {
select {
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture {

Check warning on line 573 in client/client.go

Codecov / codecov/patch

client/client.go#L573

Added line #L573 was not covered by tests
return c.GetTSAsync(ctx)
}
func (td *tsoDispatcher) revokePendingRequests(err error) {
for range len(td.tsoRequestCh) {
req := <-td.tsoRequestCh
req.TryDone(err)

Check warning on line 179 in client/clients/tso/tso_dispatcher.go

Codecov / codecov/patch

client/clients/tso/tso_dispatcher.go#L179

Added line #L179 was not covered by tests
}
}
}
batchingTimer.Reset(remainingBatchTime)
err = tsoBatchController.FetchRequestsWithTimer(ctx, td.tsoRequestCh, batchingTimer)

Check warning on line 386 in client/clients/tso/tso_dispatcher.go

Codecov / codecov/patch

client/clients/tso/tso_dispatcher.go#L386

Added line #L386 was not covered by tests
if err != nil {
// There should not be other kinds of errors.
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
}
// NewRequestFastFail creates a new fast fail TSO request.
func NewRequestFastFail(err error) *tsoRequestFastFail {

Check warning on line 115 in client/clients/tso/tso_request.go

Codecov / codecov/patch

client/clients/tso/tso_request.go#L115

Added line #L115 was not covered by tests
return &tsoRequestFastFail{err}
}
}
if err != nil {
if req == nil {
return tso.NewRequestFastFail(err)

Check warning on line 242 in client/inner_client.go

Codecov / codecov/patch

client/inner_client.go#L242

Added line #L242 was not covered by tests
}
req.TryDone(err)
}