Skip to content

Commit

Permalink
api(cdc): fix create changefeed after scale-in pd (#12003) (#12028)
Browse files Browse the repository at this point in the history
close #12004
  • Loading branch information
ti-chi-bot authored Jan 21, 2025
1 parent c1a91ac commit 6ac1c2e
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 86 deletions.
169 changes: 91 additions & 78 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/kv"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
Expand Down Expand Up @@ -66,30 +67,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
var pdClient pd.Client
var kvStorage kv.Storage
// if PDAddrs is empty, use the default pdClient
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
return
}
defer pdClient.Close()

// verify tables todo: del kvstore
kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
return
pdClient = up.PDClient
kvStorage = up.KVStorage
} else {
credential := cfg.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
var err error
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
return
}
defer pdClient.Close()
// verify tables todo: del kvstore
kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
return
}
}
ctrl, err := h.capture.GetController()
if err != nil {
Expand Down Expand Up @@ -136,19 +141,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CertAllowedCN: cfg.CertAllowedCN,
}

// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
if err != nil {
_ = c.Error(err)
return
}

cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
if err != nil {
_ = c.Error(err)
return
var etcdCli *clientv3.Client
if len(cfg.PDAddrs) == 0 {
etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
} else {
credential := cfg.PDConfig.toCredential()
// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
if err != nil {
_ = c.Error(err)
return
}
etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
if err != nil {
_ = c.Error(err)
return
}
}
err = hasRunningImport(ctx, cli)
err = hasRunningImport(ctx, etcdCli)
if err != nil {
log.Error("failed to create changefeed", zap.Error(err))
_ = c.Error(
Expand Down Expand Up @@ -323,21 +333,26 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
ctx := c.Request.Context()
var kvStore tidbkv.Storage
// if PDAddrs is empty, use the default upstream
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()
ctx := c.Request.Context()
kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(err)
return
kvStore = up.KVStorage
} else {
credential := cfg.PDConfig.toCredential()
var err error
kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(errors.Trace(err))
return
}
}

uri, err := url.Parse(cfg.SinkURI)
if err != nil {
_ = c.Error(err)
Expand Down Expand Up @@ -933,48 +948,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}
if c.Request.Body != nil && c.Request.ContentLength > 0 {
if err := c.BindJSON(cfg); err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
}

// try to get pd client to get pd time, and determine synced status based on the pd time
var pdClient pd.Client
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
pdClient = up.PDClient
} else {
credential := cfg.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide data whether is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide data whether is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Info: message,
})
return
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Info: message,
})
return
defer pdClient.Close()
}
defer pdClient.Close()
// get time from pd
physicalNow, _, _ := pdClient.GetTS(ctx)

Expand Down Expand Up @@ -1094,12 +1116,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) {
}
return up, nil
}

// getUpstreamPDConfig builds a PDConfig by copying the PD endpoints and
// the security file paths (CA, cert, key) out of the given upstream.
func getUpstreamPDConfig(up *upstream.Upstream) PDConfig {
	sec := up.SecurityConfig
	return PDConfig{
		PDAddrs:  up.PdEndpoints,
		CAPath:   sec.CAPath,
		CertPath: sec.CertPath,
		KeyPath:  sec.KeyPath,
	}
}
40 changes: 33 additions & 7 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestCreateChangefeed(t *testing.T) {
ID: changeFeedID.ID,
Namespace: changeFeedID.Namespace,
SinkURI: blackholeSink,
PDAddrs: []string{},
PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client
}
body, err := json.Marshal(&cfConfig)
require.Nil(t, err)
Expand Down Expand Up @@ -646,6 +646,8 @@ func TestVerifyTable(t *testing.T) {

// case 2: kv create failed
updateCfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&updateCfg)
require.Nil(t, err)
helpers.EXPECT().
Expand Down Expand Up @@ -1034,6 +1036,10 @@ func TestChangefeedSynced(t *testing.T) {
statusProvider.err = nil
statusProvider.changefeedInfo = cfInfo
{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case3: pd is offline,resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
Expand All @@ -1046,7 +1052,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1059,6 +1065,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case4: pd is offline,resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
Expand All @@ -1071,7 +1081,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1090,6 +1100,10 @@ func TestChangefeedSynced(t *testing.T) {
pdClient.logicTime = 1000
pdClient.timestamp = 1701153217279
{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217209 << 18,
Expand All @@ -1101,7 +1115,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1113,6 +1127,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
Expand All @@ -1124,7 +1142,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1141,6 +1159,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
Expand All @@ -1152,7 +1174,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1164,6 +1186,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case8: pdTs - lastSyncedTs < 5min
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
Expand All @@ -1175,7 +1201,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/synced_status_with_redo/run.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

## test the same logic as `sync_status``, but with redo mode
## test the same logic as `sync_status`, but with redo mode

#!/bin/bash

Expand Down

0 comments on commit 6ac1c2e

Please sign in to comment.