diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go
index d8793d77d9e..49d9fed318d 100644
--- a/cdc/api/v2/changefeed.go
+++ b/cdc/api/v2/changefeed.go
@@ -25,6 +25,7 @@ import (
 	"github.com/gin-gonic/gin"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/pkg/kv"
 	tidbkv "github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tiflow/cdc/api"
 	"github.com/pingcap/tiflow/cdc/capture"
@@ -66,30 +67,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
 		_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
 		return
 	}
+	var pdClient pd.Client
+	var kvStorage kv.Storage
+	// if PDAddrs is empty, use the pd client and kv storage of the default upstream
 	if len(cfg.PDAddrs) == 0 {
 		up, err := getCaptureDefaultUpstream(h.capture)
 		if err != nil {
 			_ = c.Error(err)
 			return
 		}
-		cfg.PDConfig = getUpstreamPDConfig(up)
-	}
-	credential := cfg.PDConfig.toCredential()
-
-	timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
-	defer cancel()
-	pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
-	if err != nil {
-		_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
-		return
-	}
-	defer pdClient.Close()
-
-	// verify tables todo: del kvstore
-	kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
-	if err != nil {
-		_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
-		return
+		pdClient = up.PDClient
+		kvStorage = up.KVStorage
+	} else {
+		credential := cfg.PDConfig.toCredential()
+		timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+		defer cancel()
+		var err error
+		pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
+		if err != nil {
+			_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
+			return
+		}
+		defer pdClient.Close()
+		// verify tables (TODO: remove the kvStorage dependency)
+		kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
+		if err != nil {
+			_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
+			return
+		}
 	}
 	provider := h.capture.StatusProvider()
 	owner, err := h.capture.GetOwner()
@@ -136,19 +141,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
 		CertAllowedCN: cfg.CertAllowedCN,
 	}
 
-	// cannot create changefeed if there are running lightning/restore tasks
-	tlsCfg, err := credential.ToTLSConfig()
-	if err != nil {
-		_ = c.Error(err)
-		return
-	}
-
-	cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
-	if err != nil {
-		_ = c.Error(err)
-		return
+	var etcdCli *clientv3.Client
+	if len(cfg.PDAddrs) == 0 {
+		etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
+	} else {
+		credential := cfg.PDConfig.toCredential()
+		// cannot create changefeed if there are running lightning/restore tasks
+		tlsCfg, err := credential.ToTLSConfig()
+		if err != nil {
+			_ = c.Error(err)
+			return
+		}
+		etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
+		if err != nil {
+			_ = c.Error(err)
+			return
+		}
 	}
-	err = hasRunningImport(ctx, cli)
+	err = hasRunningImport(ctx, etcdCli)
 	if err != nil {
 		log.Error("failed to create changefeed", zap.Error(err))
 		_ = c.Error(
@@ -319,21 +329,26 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
 		_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
 		return
 	}
+	ctx := c.Request.Context()
+	var kvStore tidbkv.Storage
+	// if PDAddrs is empty, use the default upstream
 	if len(cfg.PDAddrs) == 0 {
 		up, err := getCaptureDefaultUpstream(h.capture)
 		if err != nil {
 			_ = c.Error(err)
 			return
 		}
-		cfg.PDConfig = getUpstreamPDConfig(up)
-	}
-	credential := cfg.PDConfig.toCredential()
-	ctx := c.Request.Context()
-	kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
-	if err != nil {
-		_ = c.Error(err)
-		return
+		kvStore = up.KVStorage
+	} else {
+		credential := cfg.PDConfig.toCredential()
+		var err error
+		kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
+		if err != nil {
+			_ = c.Error(errors.Trace(err))
+			return
+		}
 	}
+
 	uri, err := url.Parse(cfg.SinkURI)
 	if err != nil {
 		_ = c.Error(err)
 		return
@@ -924,48 +939,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
 		cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
 		cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
 	}
+	if c.Request.Body != nil && c.Request.ContentLength > 0 {
+		if err := c.BindJSON(cfg); err != nil {
+			_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
+			return
+		}
+	}
 	// try to get pd client to get pd time, and determine synced status based on the pd time
+	var pdClient pd.Client
 	if len(cfg.PDAddrs) == 0 {
 		up, err := getCaptureDefaultUpstream(h.capture)
 		if err != nil {
 			_ = c.Error(err)
 			return
 		}
-		cfg.PDConfig = getUpstreamPDConfig(up)
-	}
-	credential := cfg.PDConfig.toCredential()
-
-	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
+		pdClient = up.PDClient
+	} else {
+		credential := cfg.PDConfig.toCredential()
+		timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
 
-	pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
-	if err != nil {
-		// case 1. we can't get pd client, pd may be unavailable.
-		// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
-		// otherwise, if pd is unavailable, we decide data whether is synced based on
-		// the time difference between current time and lastSyncedTs.
-		var message string
-		if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
-			cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
-			message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
-		} else {
-			message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
-				"If pd is offline, please check whether we satisfy the condition that "+
-				"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
-				"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
+		pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
+		if err != nil {
+			// case 1. we can't get pd client, pd may be unavailable.
+			// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
+			// otherwise, if pd is unavailable, we decide whether the data is synced based on
+			// the time difference between the current time and lastSyncedTs.
+			var message string
+			if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
+				cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
+				message = fmt.Sprintf("%s. Besides, the data is not finished syncing", err.Error())
+			} else {
+				message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, the data is not finished syncing. "+
+					"If pd is offline, please check whether the following condition holds: "+
+					"the time difference from lastSyncedTs to the current time in pd's time zone is greater than %v secs. "+
"+ + "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(0, 0)), + Info: message, + }) + return } - c.JSON(http.StatusOK, SyncedStatus{ - Synced: false, - SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), - PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), - LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), - NowTs: model.JSONTime(time.Unix(0, 0)), - Info: message, - }) - return + defer pdClient.Close() } - defer pdClient.Close() // get time from pd physicalNow, _, _ := pdClient.GetTS(ctx) @@ -1094,12 +1116,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) { } return up, nil } - -func getUpstreamPDConfig(up *upstream.Upstream) PDConfig { - return PDConfig{ - PDAddrs: up.PdEndpoints, - KeyPath: up.SecurityConfig.KeyPath, - CAPath: up.SecurityConfig.CAPath, - CertPath: up.SecurityConfig.CertPath, - } -} diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index b8295cf4011..9be237d3899 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -110,7 +110,7 @@ func TestCreateChangefeed(t *testing.T) { ID: changeFeedID.ID, Namespace: changeFeedID.Namespace, SinkURI: blackholeSink, - PDAddrs: []string{}, + PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client } body, err := json.Marshal(&cfConfig) require.Nil(t, err) @@ -647,6 +647,8 @@ func TestVerifyTable(t *testing.T) { // case 2: kv create failed updateCfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"} body, err := json.Marshal(&updateCfg) require.Nil(t, err) helpers.EXPECT(). 
@@ -1035,6 +1037,10 @@ func TestChangefeedSynced(t *testing.T) {
 	statusProvider.err = nil
 	statusProvider.changefeedInfo = cfInfo
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to trigger creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
 		// case3: pd is offline,resolvedTs - checkpointTs > 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1047,7 +1053,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1060,6 +1066,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}
 
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to trigger creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
 		// case4: pd is offline,resolvedTs - checkpointTs < 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1072,7 +1082,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1091,6 +1101,10 @@ func TestChangefeedSynced(t *testing.T) {
 	pdClient.logicTime = 1000
 	pdClient.timestamp = 1701153217279
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to trigger creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:  1701153217209 << 18,
@@ -1102,7 +1116,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1114,6 +1128,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}
 
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to trigger creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:  1701153201279 << 18,
@@ -1125,7 +1143,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1142,6 +1160,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}
 
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to trigger creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:  1701153201279 << 18,
@@ -1153,7 +1175,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1165,6 +1187,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}
 
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to trigger creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case8: pdTs - lastSyncedTs < 5min
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:  1701153217279 << 18,
@@ -1176,7 +1202,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
diff --git a/tests/integration_tests/synced_status_with_redo/run.sh b/tests/integration_tests/synced_status_with_redo/run.sh
index 9e5fb22a98b..1d4a8142469 100644
--- a/tests/integration_tests/synced_status_with_redo/run.sh
+++ b/tests/integration_tests/synced_status_with_redo/run.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-## test the same logic as `sync_status``, but with redo mode
+## test the same logic as `sync_status`, but with redo mode
 
 #!/bin/bash
 
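The three handler changes above (createChangefeed, verifyTable, synced) all apply the same pattern: when the request carries no PDAddrs, reuse the capture's default upstream (its PDClient, KVStorage, and etcd client) instead of dialing new connections; only when the caller supplies explicit PD addresses are fresh clients created, and only those are closed. The following is a minimal standalone sketch of that select-or-dial shape, not part of the patch: PDClient, Upstream, dialPD, and pickPDClient are hypothetical simplified stand-ins for the tiflow types and helpers, and only the pattern itself is taken from the diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// PDClient and Upstream are simplified stand-ins for the tiflow types;
// only the shape of the pattern is taken from the patch.
type PDClient struct{ addrs []string }

func (c *PDClient) Close() { fmt.Println("closed client for", c.addrs) }

type Upstream struct{ PDClient *PDClient }

// dialPD is a hypothetical constructor standing in for helpers.getPDClient.
func dialPD(ctx context.Context, addrs []string) (*PDClient, error) {
	_ = ctx // a real implementation would respect the dial timeout
	return &PDClient{addrs: addrs}, nil
}

// pickPDClient mirrors the select-or-dial pattern: reuse the default
// upstream's shared client when no PD addresses are given, otherwise dial
// a fresh one. The returned release func is a no-op for the shared client,
// so the caller never closes a connection it does not own.
func pickPDClient(ctx context.Context, addrs []string, def *Upstream) (*PDClient, func(), error) {
	if len(addrs) == 0 {
		return def.PDClient, func() {}, nil // shared: do not close
	}
	dialCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	cli, err := dialPD(dialCtx, addrs)
	if err != nil {
		return nil, nil, err
	}
	return cli, cli.Close, nil // owned: caller must close
}

func main() {
	def := &Upstream{PDClient: &PDClient{addrs: []string{"default-upstream"}}}

	cli, release, err := pickPDClient(context.Background(), nil, def)
	if err != nil {
		panic(err)
	}
	defer release()
	fmt.Println("using", cli.addrs) // shared client; release is a no-op
}

Returning an explicit release function makes the ownership split visible at the call site; in the patch the same split is expressed by placing defer pdClient.Close() only on the branch that actually dials a new client.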