diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index eefa893b96a..a04beee8ac0 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -25,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/kv" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" @@ -72,30 +73,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + var pdClient pd.Client + var kvStorage kv.Storage + // if PDAddrs is empty, use the default pdClient if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) - return - } - defer pdClient.Close() - - // verify tables todo: del kvstore - kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) - return + pdClient = up.PDClient + kvStorage = up.KVStorage + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + var err error + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) + return + } + defer pdClient.Close() + // verify tables todo: del kvstore + kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) + return + } } ctrl, err := h.capture.GetController() if err != nil { @@ -142,19 +147,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CertAllowedCN: cfg.CertAllowedCN, } - // cannot create changefeed if there are running lightning/restore tasks - tlsCfg, err := credential.ToTLSConfig() - if err != nil { - _ = c.Error(err) - return - } - - cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) - if err != nil { - _ = c.Error(err) - return + var etcdCli *clientv3.Client + if len(cfg.PDAddrs) == 0 { + etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap() + } else { + credential := cfg.PDConfig.toCredential() + // cannot create changefeed if there are running lightning/restore tasks + tlsCfg, err := credential.ToTLSConfig() + if err != nil { + _ = c.Error(err) + return + } + etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) + if err != nil { + _ = c.Error(err) + return + } } - err = hasRunningImport(ctx, cli) + err = hasRunningImport(ctx, etcdCli) if err != nil { log.Error("failed to create changefeed", zap.Error(err)) _ = c.Error( @@ -329,21 +339,26 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + ctx := c.Request.Context() + var kvStore tidbkv.Storage + // if PDAddrs is empty, use the default upstream if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - ctx := c.Request.Context() - kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(err) - return + kvStore = up.KVStorage + } else { + credential := cfg.PDConfig.toCredential() + var err error + kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(errors.Trace(err)) + return + } } + uri, err := url.Parse(cfg.SinkURI) if err != nil { _ = c.Error(err) @@ -926,48 +941,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) { cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval } + if c.Request.Body != nil && c.Request.ContentLength > 0 { + if err := c.BindJSON(cfg); err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + } // try to get pd client to get pd time, and determine synced status based on the pd time + var pdClient pd.Client if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() + pdClient = up.PDClient + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - // case 1. we can't get pd client, pd may be unavailable. - // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced - // otherwise, if pd is unavailable, we decide data whether is synced based on - // the time difference between current time and lastSyncedTs. - var message string - if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > - cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { - message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) - } else { - message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ - "If pd is offline, please check whether we satisfy the condition that "+ - "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ - "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + // case 1. we can't get pd client, pd may be unavailable. + // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced + // otherwise, if pd is unavailable, we decide data whether is synced based on + // the time difference between current time and lastSyncedTs. + var message string + if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > + cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { + message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) + } else { + message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ + "If pd is offline, please check whether we satisfy the condition that "+ + "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ + "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(0, 0)), + Info: message, + }) + return } - c.JSON(http.StatusOK, SyncedStatus{ - Synced: false, - SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), - PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), - LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), - NowTs: model.JSONTime(time.Unix(0, 0)), - Info: message, - }) - return + defer pdClient.Close() } - defer pdClient.Close() // get time from pd physicalNow, _, _ := pdClient.GetTS(ctx) @@ -1087,12 +1109,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) { } return up, nil } - -func getUpstreamPDConfig(up *upstream.Upstream) PDConfig { - return PDConfig{ - PDAddrs: up.PdEndpoints, - KeyPath: up.SecurityConfig.KeyPath, - CAPath: up.SecurityConfig.CAPath, - CertPath: up.SecurityConfig.CertPath, - } -} diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 587bcf3158e..ccd0aa6967b 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -109,7 +109,7 @@ func TestCreateChangefeed(t *testing.T) { ID: changeFeedID.ID, Namespace: changeFeedID.Namespace, SinkURI: blackholeSink, - PDAddrs: []string{}, + PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client } body, err := json.Marshal(&cfConfig) require.Nil(t, err) @@ -647,6 +647,8 @@ func TestVerifyTable(t *testing.T) { // case 2: kv create failed updateCfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"} body, err := json.Marshal(&updateCfg) require.Nil(t, err) helpers.EXPECT(). @@ -1033,6 +1035,10 @@ func TestChangefeedSynced(t *testing.T) { statusProvider.err = nil statusProvider.changefeedInfo = cfInfo { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case3: pd is offline,resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1045,7 +1051,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1058,6 +1064,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case4: pd is offline,resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1070,7 +1080,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1089,6 +1099,10 @@ func TestChangefeedSynced(t *testing.T) { pdClient.logicTime = 1000 pdClient.timestamp = 1701153217279 { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217209 << 18, @@ -1100,7 +1114,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1112,6 +1126,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1123,7 +1141,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1140,6 +1158,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1151,7 +1173,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1163,6 +1185,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case8: pdTs - lastSyncedTs < 5min statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217279 << 18, @@ -1174,7 +1200,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code)