Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api(cdc): fix create changefeed after scale-in pd #12003

Merged
merged 7 commits into from
Jan 21, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix test
lidezhu committed Jan 15, 2025
commit c464c44db600989e3eb0fdd327e581686c0cd392
6 changes: 5 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
@@ -935,6 +935,10 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}
if err := c.BindJSON(cfg); err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

// try to get pd client to get pd time, and determine synced status based on the pd time
var pdClient pd.Client
@@ -950,7 +954,7 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
49 changes: 37 additions & 12 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -1037,6 +1037,11 @@ func TestChangefeedSynced(t *testing.T) {
statusProvider.err = nil
statusProvider.changefeedInfo = cfInfo
{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&cfg)
require.Nil(t, err)
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case3: pd is offline,resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1049,19 +1054,23 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, "+
"please recheck. Besides the data is not finish syncing", resp.Info)
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&cfg)
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case4: pd is offline,resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1074,12 +1083,12 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, please recheck. "+
@@ -1093,6 +1102,10 @@ func TestChangefeedSynced(t *testing.T) {
pdClient.logicTime = 1000
pdClient.timestamp = 1701153217279
{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&cfg)
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217209 << 18,
@@ -1104,18 +1117,22 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, true, resp.Synced)
require.Equal(t, "Data syncing is finished", resp.Info)
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&cfg)
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
@@ -1127,12 +1144,12 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "Please check whether PD is online and TiKV Regions are all available. "+
@@ -1144,6 +1161,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&cfg)
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
@@ -1155,18 +1176,22 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "The data syncing is not finished, please wait", resp.Info)
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&cfg)
// case8: pdTs - lastSyncedTs < 5min
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
@@ -1178,12 +1203,12 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "The data syncing is not finished, please wait", resp.Info)