Skip to content

Commit

Permalink
br: add retry for raw kv client put (#58963)
Browse files Browse the repository at this point in the history
close #58845
  • Loading branch information
Tristan1900 authored Jan 21, 2025
1 parent c34a6b6 commit 3a378c8
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 1 deletion.
2 changes: 2 additions & 0 deletions br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ go_test(
"//br/pkg/mock",
"//br/pkg/restore",
"//br/pkg/restore/internal/import_client",
"//br/pkg/restore/internal/rawkv",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
Expand Down Expand Up @@ -132,6 +133,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//rawkv",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
Expand Down
12 changes: 11 additions & 1 deletion br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ func (rc *LogClient) restoreMetaKvEntries(
failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
})
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
return 0, 0, errors.Trace(err)
}
// for failpoint, we need to flush the cache in rawKVClient every time
Expand Down Expand Up @@ -2053,3 +2053,13 @@ func (rc *LogClient) FailpointDoChecksumForLogRestore(

return eg.Wait()
}

// PutRawKvWithRetry puts a single key/value pair through client, retrying the
// put via utils.WithRetry with the strategy returned by
// utils.NewRawClientBackoffStrategy.
//
// key, value and originTs are forwarded unchanged to client.Put on every
// attempt. It returns nil once a put succeeds. If utils.WithRetry reports an
// error, the returned error carries the message
// "failed to put raw kv after retry" annotated onto that underlying error, so
// the root cause is preserved for callers and logs.
func PutRawKvWithRetry(ctx context.Context, client *rawkv.RawKVBatchClient, key, value []byte, originTs uint64) error {
	err := utils.WithRetry(ctx, func() error {
		return client.Put(ctx, key, value, originTs)
	}, utils.NewRawClientBackoffStrategy())
	if err != nil {
		// Annotate instead of building a brand-new error: the original cause
		// (e.g. the last RPC failure) must stay visible to the caller.
		return errors.Annotate(err, "failed to put raw kv after retry")
	}
	return nil
}
68 changes: 68 additions & 0 deletions br/pkg/restore/log_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/br/pkg/gluetidb"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
rawclient "github.com/pingcap/tidb/br/pkg/restore/internal/rawkv"
logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/utils"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/pkg/util/sqlexec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -1986,3 +1988,69 @@ func fakeRowKey(tableID, rowID int64) kv.Key {
func fakeRowRawKey(tableID, rowID int64) kv.Key {
return tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID))
}

// mockRawKVClient is a test double that embeds rawkv.Client and overrides
// BatchPut to fail a configurable number of times before succeeding.
type mockRawKVClient struct {
	rawkv.Client

	// errThreshold is the number of leading BatchPut calls that return an
	// error; calls after that succeed.
	errThreshold int
	// putCount is incremented on every BatchPut call, failed or not.
	putCount int
}

// BatchPut records the call in putCount and ignores its arguments. It returns
// an "rpcClient is idle" error while the running call count has not yet
// exceeded errThreshold, and nil afterwards.
func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
	m.putCount++
	if m.putCount > m.errThreshold {
		return nil
	}
	return errors.New("rpcClient is idle")
}

// TestPutRawKvWithRetry drives logclient.PutRawKvWithRetry against a
// mockRawKVClient whose first errThreshold BatchPut calls fail, and checks both
// the returned error and how many BatchPut calls were made.
func TestPutRawKvWithRetry(t *testing.T) {
	type testCase struct {
		name         string
		errThreshold int
		// cancelAfter, when positive, bounds the call with a context timeout.
		// None of the cases below currently set it.
		cancelAfter time.Duration
		wantErr     string
		wantPuts    int
	}

	cases := []testCase{
		{
			name:         "success on first try",
			errThreshold: 0,
			wantPuts:     1,
		},
		{
			name:         "success on after failure",
			errThreshold: 2,
			wantPuts:     3,
		},
		{
			name:         "fails all retries",
			errThreshold: 5,
			wantErr:      "failed to put raw kv after retry",
			wantPuts:     5,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			mock := &mockRawKVClient{errThreshold: tc.errThreshold}
			// Second argument is 1 — presumably the batch size, so that every
			// Put reaches BatchPut immediately; confirm against NewRawKVBatchClient.
			batchClient := rawclient.NewRawKVBatchClient(mock, 1)

			ctx := context.Background()
			if tc.cancelAfter > 0 {
				timeoutCtx, cancel := context.WithTimeout(ctx, tc.cancelAfter)
				defer cancel()
				ctx = timeoutCtx
			}

			err := logclient.PutRawKvWithRetry(ctx, batchClient, []byte("key"), []byte("value"), 1)

			if tc.wantErr == "" {
				require.NoError(t, err)
			} else {
				require.ErrorContains(t, err, tc.wantErr)
			}
			require.Equal(t, tc.wantPuts, mock.putCount)
		})
	}
}
15 changes: 15 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
recoveryMaxAttempts = 16
recoveryDelayTime = 30 * time.Second
recoveryMaxDelayTime = 4 * time.Minute

rawClientMaxAttempts = 5
rawClientDelayTime = 500 * time.Millisecond
rawClientMaxDelayTime = 5 * time.Second
)

// BackoffStrategy implements a backoff strategy for retry operations.
Expand Down Expand Up @@ -379,6 +383,17 @@ func NewChecksumBackoffStrategy() BackoffStrategy {
)
}

// NewRawClientBackoffStrategy builds the BackoffStrategy used for raw KV
// client operations. It is configured with rawClientMaxAttempts remaining
// attempts, rawClientDelayTime as the delay time, rawClientMaxDelayTime as the
// max delay time, and a zero retry context labelled "raw client".
//
// NOTE(review): the retry predicate is alwaysTrueFunc and the non-retry
// predicate is alwaysFalseFunc, so presumably every error is treated as
// retryable — confirm against those helpers.
func NewRawClientBackoffStrategy() BackoffStrategy {
	// Options are constructed in the same order they are passed on.
	attempts := WithRemainingAttempts(rawClientMaxAttempts)
	delay := WithDelayTime(rawClientDelayTime)
	maxDelay := WithMaxDelayTime(rawClientMaxDelayTime)
	errCtx := WithErrorContext(NewZeroRetryContext("raw client"))
	retryOn := WithRetryErrorFunc(alwaysTrueFunc())
	noRetryOn := WithNonRetryErrorFunc(alwaysFalseFunc())

	return NewBackoffStrategy(attempts, delay, maxDelay, errCtx, retryOn, noRetryOn)
}

func (bo *backoffStrategyImpl) NextBackoff(err error) time.Duration {
errs := multierr.Errors(err)
lastErr := errs[len(errs)-1]
Expand Down

0 comments on commit 3a378c8

Please sign in to comment.