From 9b94fe51c9c6d53d454c3bf0b9b8c0ffbd13fb11 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jan 2025 14:58:14 +0800 Subject: [PATCH 1/7] fix create changefeed after scale-in pd --- cdc/api/v2/changefeed.go | 151 +++++++++++++++++++++------------------ 1 file changed, 81 insertions(+), 70 deletions(-) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index d8793d77d9e..d5eed307149 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -25,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/kv" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" @@ -66,30 +67,47 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + var pdClient pd.Client + var kvStorage kv.Storage + var etcdCli *clientv3.Client + // if PDAddrs is empty, use the default pdClient if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) - return - } - defer pdClient.Close() - - // verify tables todo: del kvstore - kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) - return + pdClient = up.PDClient + kvStorage = up.KVStorage + etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap() + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + var err error + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + defer pdClient.Close() + // verify tables todo: del kvstore + kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) + return + } + // cannot create changefeed if there are running lightning/restore tasks + tlsCfg, err := credential.ToTLSConfig() + if err != nil { + _ = c.Error(err) + return + } + etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) + if err != nil { + _ = c.Error(err) + return + } } provider := h.capture.StatusProvider() owner, err := h.capture.GetOwner() @@ -136,19 +154,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CertAllowedCN: cfg.CertAllowedCN, } - // cannot create changefeed if there are running lightning/restore tasks - tlsCfg, err := credential.ToTLSConfig() - if err != nil { - _ = c.Error(err) - return - } - - cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) - if err != nil { - _ = c.Error(err) - return - } - err = hasRunningImport(ctx, cli) + err = hasRunningImport(ctx, etcdCli) if err != nil { log.Error("failed to create changefeed", zap.Error(err)) _ = c.Error( @@ -319,21 +325,25 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + ctx := c.Request.Context() + var kvStore tidbkv.Storage + // if PDAddrs is empty, use the default upstream if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - ctx := c.Request.Context() - kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(err) - return + kvStore = up.KVStorage + } else { + credential := cfg.PDConfig.toCredential() + var err error + kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(errors.Trace(err)) + } } + uri, err := url.Parse(cfg.SinkURI) if err != nil { _ = c.Error(err) @@ -926,46 +936,47 @@ func (h *OpenAPIV2) synced(c *gin.Context) { } // try to get pd client to get pd time, and determine synced status based on the pd time + var pdClient pd.Client if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() + pdClient = up.PDClient + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - // case 1. we can't get pd client, pd may be unavailable. - // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced - // otherwise, if pd is unavailable, we decide data whether is synced based on - // the time difference between current time and lastSyncedTs. - var message string - if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > - cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { - message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) - } else { - message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ - "If pd is offline, please check whether we satisfy the condition that "+ - "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ - "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + // case 1. we can't get pd client, pd may be unavailable. + // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced + // otherwise, if pd is unavailable, we decide data whether is synced based on + // the time difference between current time and lastSyncedTs. + var message string + if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > + cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { + message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) + } else { + message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ + "If pd is offline, please check whether we satisfy the condition that "+ + "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ + "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(0, 0)), + Info: message, + }) + return } - c.JSON(http.StatusOK, SyncedStatus{ - Synced: false, - SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), - PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), - LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), - NowTs: model.JSONTime(time.Unix(0, 0)), - Info: message, - }) - return + defer pdClient.Close() } - defer pdClient.Close() // get time from pd physicalNow, _, _ := pdClient.GetTS(ctx) From 2b6ad53cdac85f45abe441105d9406df7549ba82 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jan 2025 15:53:49 +0800 Subject: [PATCH 2/7] fix test --- cdc/api/v2/changefeed.go | 1 + cdc/api/v2/changefeed_test.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index d5eed307149..e2cdec31011 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -341,6 +341,7 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) if err != nil { _ = c.Error(errors.Trace(err)) + return } } diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index b8295cf4011..050cbbc4af8 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -647,6 +647,8 @@ func TestVerifyTable(t *testing.T) { // case 2: kv create failed updateCfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"} body, err := json.Marshal(&updateCfg) require.Nil(t, err) helpers.EXPECT(). From c464c44db600989e3eb0fdd327e581686c0cd392 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jan 2025 16:10:38 +0800 Subject: [PATCH 3/7] fix test --- cdc/api/v2/changefeed.go | 6 ++++- cdc/api/v2/changefeed_test.go | 49 ++++++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index e2cdec31011..a45be7aaf31 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -935,6 +935,10 @@ func (h *OpenAPIV2) synced(c *gin.Context) { cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval } + if err := c.BindJSON(cfg); err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } // try to get pd client to get pd time, and determine synced status based on the pd time var pdClient pd.Client @@ -950,7 +954,7 @@ func (h *OpenAPIV2) synced(c *gin.Context) { timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) if err != nil { // case 1. we can't get pd client, pd may be unavailable. // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 050cbbc4af8..e6f9b61e71a 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -1037,6 +1037,11 @@ func TestChangefeedSynced(t *testing.T) { statusProvider.err = nil statusProvider.changefeedInfo = cfInfo { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, err := json.Marshal(&cfg) + require.Nil(t, err) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case3: pd is offline,resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1049,12 +1054,12 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err := json.NewDecoder(w.Body).Decode(&resp) + err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, "+ @@ -1062,6 +1067,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, err := json.Marshal(&cfg) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case4: pd is offline,resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1074,12 +1083,12 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err := json.NewDecoder(w.Body).Decode(&resp) + err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, please recheck. "+ @@ -1093,6 +1102,10 @@ func TestChangefeedSynced(t *testing.T) { pdClient.logicTime = 1000 pdClient.timestamp = 1701153217279 { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, err := json.Marshal(&cfg) // case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217209 << 18, @@ -1104,18 +1117,22 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err := json.NewDecoder(w.Body).Decode(&resp) + err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, true, resp.Synced) require.Equal(t, "Data syncing is finished", resp.Info) } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, err := json.Marshal(&cfg) // case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1127,12 +1144,12 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err := json.NewDecoder(w.Body).Decode(&resp) + err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "Please check whether PD is online and TiKV Regions are all available. "+ @@ -1144,6 +1161,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, err := json.Marshal(&cfg) // case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1155,18 +1176,22 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err := json.NewDecoder(w.Body).Decode(&resp) + err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "The data syncing is not finished, please wait", resp.Info) } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, err := json.Marshal(&cfg) // case8: pdTs - lastSyncedTs < 5min statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217279 << 18, @@ -1178,12 +1203,12 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err := json.NewDecoder(w.Body).Decode(&resp) + err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "The data syncing is not finished, please wait", resp.Info) From 5df078ca0b88a0ac387a81e7dd0992fb29af58fb Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jan 2025 16:17:19 +0800 Subject: [PATCH 4/7] fix more tests --- cdc/api/v2/changefeed_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index e6f9b61e71a..5e86bd6d1f0 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -77,7 +77,7 @@ func TestCreateChangefeed(t *testing.T) { cp.EXPECT().GetOwner().Return(mo, nil).AnyTimes() cp.EXPECT().StatusProvider().Return(provider).AnyTimes() - // case 1: json format mismatches with the spec. + case 1: json format mismatches with the spec. errConfig := struct { ID string `json:"changefeed_id"` Namespace string `json:"namespace"` @@ -110,7 +110,7 @@ func TestCreateChangefeed(t *testing.T) { ID: changeFeedID.ID, Namespace: changeFeedID.Namespace, SinkURI: blackholeSink, - PDAddrs: []string{}, + PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client } body, err := json.Marshal(&cfConfig) require.Nil(t, err) From c09b2fe3cdf6576ed6ffd7447e9cd4ee6ad19d45 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jan 2025 16:25:03 +0800 Subject: [PATCH 5/7] fix --- cdc/api/v2/changefeed.go | 32 ++++++++++++++++++-------------- cdc/api/v2/changefeed_test.go | 2 +- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index a45be7aaf31..4fdd9ac86a4 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -69,7 +69,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { } var pdClient pd.Client var kvStorage kv.Storage - var etcdCli *clientv3.Client // if PDAddrs is empty, use the default pdClient if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) @@ -79,7 +78,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { } pdClient = up.PDClient kvStorage = up.KVStorage - etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap() } else { credential := cfg.PDConfig.toCredential() timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) @@ -87,7 +85,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { var err error pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) return } defer pdClient.Close() @@ -97,17 +95,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) return } - // cannot create changefeed if there are running lightning/restore tasks - tlsCfg, err := credential.ToTLSConfig() - if err != nil { - _ = c.Error(err) - return - } - etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) - if err != nil { - _ = c.Error(err) - return - } } provider := h.capture.StatusProvider() owner, err := h.capture.GetOwner() @@ -154,6 +141,23 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CertAllowedCN: cfg.CertAllowedCN, } + var etcdCli *clientv3.Client + if len(cfg.PDAddrs) == 0 { + etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap() + } else { + credential := cfg.PDConfig.toCredential() + // cannot create changefeed if there are running lightning/restore tasks + tlsCfg, err := credential.ToTLSConfig() + if err != nil { + _ = c.Error(err) + return + } + etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) + if err != nil { + _ = c.Error(err) + return + } + } err = hasRunningImport(ctx, etcdCli) if err != nil { log.Error("failed to create changefeed", zap.Error(err)) diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 5e86bd6d1f0..2e1700d8a3f 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -77,7 +77,7 @@ func TestCreateChangefeed(t *testing.T) { cp.EXPECT().GetOwner().Return(mo, nil).AnyTimes() cp.EXPECT().StatusProvider().Return(provider).AnyTimes() - case 1: json format mismatches with the spec. + // case 1: json format mismatches with the spec. errConfig := struct { ID string `json:"changefeed_id"` Namespace string `json:"namespace"` From 91f18f24a199567f04bd368d67c7589ad5194fae Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jan 2025 17:02:30 +0800 Subject: [PATCH 6/7] fix check --- cdc/api/v2/changefeed.go | 9 --------- cdc/api/v2/changefeed_test.go | 25 ++++++++++++------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 4fdd9ac86a4..0a035e21fba 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -1114,12 +1114,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) { } return up, nil } - -func getUpstreamPDConfig(up *upstream.Upstream) PDConfig { - return PDConfig{ - PDAddrs: up.PdEndpoints, - KeyPath: up.SecurityConfig.KeyPath, - CAPath: up.SecurityConfig.CAPath, - CertPath: up.SecurityConfig.CertPath, - } -} diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 2e1700d8a3f..9be237d3899 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -1040,8 +1040,7 @@ func TestChangefeedSynced(t *testing.T) { cfg := getDefaultVerifyTableConfig() // arbitrary pd address to trigger create new pd client cfg.PDAddrs = []string{"http://127.0.0.1:2379"} - body, err := json.Marshal(&cfg) - require.Nil(t, err) + body, _ := json.Marshal(&cfg) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case3: pd is offline,resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1059,7 +1058,7 @@ func TestChangefeedSynced(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err = json.NewDecoder(w.Body).Decode(&resp) + err := json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, "+ @@ -1070,7 +1069,7 @@ func TestChangefeedSynced(t *testing.T) { cfg := getDefaultVerifyTableConfig() // arbitrary pd address to trigger create new pd client cfg.PDAddrs = []string{"http://127.0.0.1:2379"} - body, err := json.Marshal(&cfg) + body, _ := json.Marshal(&cfg) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case4: pd is offline,resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1088,7 +1087,7 @@ func TestChangefeedSynced(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err = json.NewDecoder(w.Body).Decode(&resp) + err := json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, please recheck. "+ @@ -1105,7 +1104,7 @@ func TestChangefeedSynced(t *testing.T) { cfg := getDefaultVerifyTableConfig() // arbitrary pd address to trigger create new pd client cfg.PDAddrs = []string{"http://127.0.0.1:2379"} - body, err := json.Marshal(&cfg) + body, _ := json.Marshal(&cfg) // case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217209 << 18, @@ -1122,7 +1121,7 @@ func TestChangefeedSynced(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err = json.NewDecoder(w.Body).Decode(&resp) + err := json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, true, resp.Synced) require.Equal(t, "Data syncing is finished", resp.Info) @@ -1132,7 +1131,7 @@ func TestChangefeedSynced(t *testing.T) { cfg := getDefaultVerifyTableConfig() // arbitrary pd address to trigger create new pd client cfg.PDAddrs = []string{"http://127.0.0.1:2379"} - body, err := json.Marshal(&cfg) + body, _ := json.Marshal(&cfg) // case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1149,7 +1148,7 @@ func TestChangefeedSynced(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err = json.NewDecoder(w.Body).Decode(&resp) + err := json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "Please check whether PD is online and TiKV Regions are all available. "+ @@ -1164,7 +1163,7 @@ func TestChangefeedSynced(t *testing.T) { cfg := getDefaultVerifyTableConfig() // arbitrary pd address to trigger create new pd client cfg.PDAddrs = []string{"http://127.0.0.1:2379"} - body, err := json.Marshal(&cfg) + body, _ := json.Marshal(&cfg) // case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1181,7 +1180,7 @@ func TestChangefeedSynced(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err = json.NewDecoder(w.Body).Decode(&resp) + err := json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "The data syncing is not finished, please wait", resp.Info) @@ -1191,7 +1190,7 @@ func TestChangefeedSynced(t *testing.T) { cfg := getDefaultVerifyTableConfig() // arbitrary pd address to trigger create new pd client cfg.PDAddrs = []string{"http://127.0.0.1:2379"} - body, err := json.Marshal(&cfg) + body, _ := json.Marshal(&cfg) // case8: pdTs - lastSyncedTs < 5min statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217279 << 18, @@ -1208,7 +1207,7 @@ func TestChangefeedSynced(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) resp := SyncedStatus{} - err = json.NewDecoder(w.Body).Decode(&resp) + err := json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, false, resp.Synced) require.Equal(t, "The data syncing is not finished, please wait", resp.Info) From bc53da549a991502d1b11604b86eaa432e3178d6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jan 2025 18:26:56 +0800 Subject: [PATCH 7/7] try fix --- cdc/api/v2/changefeed.go | 8 +++++--- tests/integration_tests/synced_status_with_redo/run.sh | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 0a035e21fba..49d9fed318d 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -939,9 +939,11 @@ func (h *OpenAPIV2) synced(c *gin.Context) { cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval } - if err := c.BindJSON(cfg); err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) - return + if c.Request.Body != nil && c.Request.ContentLength > 0 { + if err := c.BindJSON(cfg); err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } } // try to get pd client to get pd time, and determine synced status based on the pd time diff --git a/tests/integration_tests/synced_status_with_redo/run.sh b/tests/integration_tests/synced_status_with_redo/run.sh index 9e5fb22a98b..1d4a8142469 100644 --- a/tests/integration_tests/synced_status_with_redo/run.sh +++ b/tests/integration_tests/synced_status_with_redo/run.sh @@ -1,6 +1,6 @@ #!/bin/bash -## test the same logic as `sync_status``, but with redo mode +## test the same logic as `sync_status`, but with redo mode #!/bin/bash