Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api(cdc): fix create changefeed after scale-in pd (#12003) #12029

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 91 additions & 78 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/kv"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
Expand Down Expand Up @@ -72,30 +73,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
var pdClient pd.Client
var kvStorage kv.Storage
// if PDAddrs is empty, use the default pdClient
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
return
}
defer pdClient.Close()

// verify tables todo: del kvstore
kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
return
pdClient = up.PDClient
kvStorage = up.KVStorage
} else {
credential := cfg.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
var err error
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
return
}
defer pdClient.Close()
// verify tables todo: del kvstore
kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
return
}
}
ctrl, err := h.capture.GetController()
if err != nil {
Expand Down Expand Up @@ -142,19 +147,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CertAllowedCN: cfg.CertAllowedCN,
}

// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
if err != nil {
_ = c.Error(err)
return
}

cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
if err != nil {
_ = c.Error(err)
return
var etcdCli *clientv3.Client
if len(cfg.PDAddrs) == 0 {
etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
} else {
credential := cfg.PDConfig.toCredential()
// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
if err != nil {
_ = c.Error(err)
return
}
etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
if err != nil {
_ = c.Error(err)
return
}
}
err = hasRunningImport(ctx, cli)
err = hasRunningImport(ctx, etcdCli)
if err != nil {
log.Error("failed to create changefeed", zap.Error(err))
_ = c.Error(
Expand Down Expand Up @@ -329,21 +339,26 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
ctx := c.Request.Context()
var kvStore tidbkv.Storage
// if PDAddrs is empty, use the default upstream
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()
ctx := c.Request.Context()
kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(err)
return
kvStore = up.KVStorage
} else {
credential := cfg.PDConfig.toCredential()
var err error
kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(errors.Trace(err))
return
}
}

uri, err := url.Parse(cfg.SinkURI)
if err != nil {
_ = c.Error(err)
Expand Down Expand Up @@ -926,48 +941,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}
if c.Request.Body != nil && c.Request.ContentLength > 0 {
if err := c.BindJSON(cfg); err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
}

// try to get pd client to get pd time, and determine synced status based on the pd time
var pdClient pd.Client
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
pdClient = up.PDClient
} else {
credential := cfg.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide whether the data is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide whether the data is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Info: message,
})
return
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Info: message,
})
return
defer pdClient.Close()
}
defer pdClient.Close()
// get time from pd
physicalNow, _, _ := pdClient.GetTS(ctx)

Expand Down Expand Up @@ -1087,12 +1109,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) {
}
return up, nil
}

// getUpstreamPDConfig builds a PDConfig from the given upstream: the PD
// addresses come from up.PdEndpoints, and the key, CA, and certificate paths
// are copied from up.SecurityConfig.
func getUpstreamPDConfig(up *upstream.Upstream) PDConfig {
	var pdCfg PDConfig
	pdCfg.PDAddrs = up.PdEndpoints
	pdCfg.KeyPath = up.SecurityConfig.KeyPath
	pdCfg.CAPath = up.SecurityConfig.CAPath
	pdCfg.CertPath = up.SecurityConfig.CertPath
	return pdCfg
}
40 changes: 33 additions & 7 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestCreateChangefeed(t *testing.T) {
ID: changeFeedID.ID,
Namespace: changeFeedID.Namespace,
SinkURI: blackholeSink,
PDAddrs: []string{},
PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client
}
body, err := json.Marshal(&cfConfig)
require.Nil(t, err)
Expand Down Expand Up @@ -647,6 +647,8 @@ func TestVerifyTable(t *testing.T) {

// case 2: kv create failed
updateCfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, err := json.Marshal(&updateCfg)
require.Nil(t, err)
helpers.EXPECT().
Expand Down Expand Up @@ -1033,6 +1035,10 @@ func TestChangefeedSynced(t *testing.T) {
statusProvider.err = nil
statusProvider.changefeedInfo = cfInfo
{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case3: pd is offline,resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
Expand All @@ -1045,7 +1051,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1058,6 +1064,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case4: pd is offline,resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
Expand All @@ -1070,7 +1080,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1089,6 +1099,10 @@ func TestChangefeedSynced(t *testing.T) {
pdClient.logicTime = 1000
pdClient.timestamp = 1701153217279
{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217209 << 18,
Expand All @@ -1100,7 +1114,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1112,6 +1126,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
Expand All @@ -1123,7 +1141,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1140,6 +1158,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
Expand All @@ -1151,7 +1173,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand All @@ -1163,6 +1185,10 @@ func TestChangefeedSynced(t *testing.T) {
}

{
cfg := getDefaultVerifyTableConfig()
// arbitrary pd address to trigger create new pd client
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
body, _ := json.Marshal(&cfg)
// case8: pdTs - lastSyncedTs < 5min
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
Expand All @@ -1174,7 +1200,7 @@ func TestChangefeedSynced(t *testing.T) {
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
bytes.NewReader(body),
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
Expand Down
Loading