From fcb7e635d039351ff98f071409018f1f40e952a5 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Tue, 21 Jan 2025 14:19:53 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #12003 Signed-off-by: ti-chi-bot --- cdc/api/v2/changefeed.go | 169 +++++----- cdc/api/v2/changefeed_test.go | 40 ++- .../synced_status_with_redo/run.sh | 308 ++++++++++++++++++ 3 files changed, 432 insertions(+), 85 deletions(-) create mode 100644 tests/integration_tests/synced_status_with_redo/run.sh diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index eefa893b96a..a04beee8ac0 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -25,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/kv" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" @@ -72,30 +73,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + var pdClient pd.Client + var kvStorage kv.Storage + // if PDAddrs is empty, use the default pdClient if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) - return - } - defer pdClient.Close() - - // verify tables todo: del kvstore - kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) - return + pdClient = up.PDClient + kvStorage = up.KVStorage + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + var err error + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) + return + } + defer pdClient.Close() + // verify tables todo: del kvstore + kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) + return + } } ctrl, err := h.capture.GetController() if err != nil { @@ -142,19 +147,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CertAllowedCN: cfg.CertAllowedCN, } - // cannot create changefeed if there are running lightning/restore tasks - tlsCfg, err := credential.ToTLSConfig() - if err != nil { - _ = c.Error(err) - return - } - - cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) - if err != nil { - _ = c.Error(err) - return + var etcdCli *clientv3.Client + if len(cfg.PDAddrs) == 0 { + etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap() + } else { + credential := cfg.PDConfig.toCredential() + // cannot create changefeed if there are running lightning/restore tasks + tlsCfg, err := credential.ToTLSConfig() + if err != nil { + _ = c.Error(err) + return + } + etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg) + if err != nil { + _ = c.Error(err) + return + } } - err = hasRunningImport(ctx, cli) + err = hasRunningImport(ctx, etcdCli) if err != nil { log.Error("failed to create changefeed", zap.Error(err)) 
_ = c.Error( @@ -329,21 +339,26 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + ctx := c.Request.Context() + var kvStore tidbkv.Storage + // if PDAddrs is empty, use the default upstream if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - ctx := c.Request.Context() - kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(err) - return + kvStore = up.KVStorage + } else { + credential := cfg.PDConfig.toCredential() + var err error + kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(errors.Trace(err)) + return + } } + uri, err := url.Parse(cfg.SinkURI) if err != nil { _ = c.Error(err) @@ -926,48 +941,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) { cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval } + if c.Request.Body != nil && c.Request.ContentLength > 0 { + if err := c.BindJSON(cfg); err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + } // try to get pd client to get pd time, and determine synced status based on the pd time + var pdClient pd.Client if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() + pdClient = up.PDClient + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - // case 1. we can't get pd client, pd may be unavailable. - // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced - // otherwise, if pd is unavailable, we decide data whether is synced based on - // the time difference between current time and lastSyncedTs. - var message string - if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > - cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { - message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) - } else { - message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ - "If pd is offline, please check whether we satisfy the condition that "+ - "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ - "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + // case 1. we can't get pd client, pd may be unavailable. + // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced + // otherwise, if pd is unavailable, we decide data whether is synced based on + // the time difference between current time and lastSyncedTs. 
+			var message string
+			if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
+				cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
+				message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
+			} else {
+				message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
+					"If pd is offline, please check whether we satisfy the condition that "+
+					"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
+					"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
+			}
+			c.JSON(http.StatusOK, SyncedStatus{
+				Synced:           false,
+				SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
+				PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
+				LastSyncedTs:     model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
+				NowTs:            model.JSONTime(time.Unix(0, 0)),
+				Info:             message,
+			})
+			return
 		}
-		c.JSON(http.StatusOK, SyncedStatus{
-			Synced:           false,
-			SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
-			PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
-			LastSyncedTs:     model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
-			NowTs:            model.JSONTime(time.Unix(0, 0)),
-			Info:             message,
-		})
-		return
+		defer pdClient.Close()
 	}
-	defer pdClient.Close()

 	// get time from pd
 	physicalNow, _, _ := pdClient.GetTS(ctx)
@@ -1087,12 +1109,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) {
 	}
 	return up, nil
 }
-
-func getUpstreamPDConfig(up *upstream.Upstream) PDConfig {
-	return PDConfig{
-		PDAddrs:  up.PdEndpoints,
-		KeyPath:  up.SecurityConfig.KeyPath,
-		CAPath:   up.SecurityConfig.CAPath,
-		CertPath: up.SecurityConfig.CertPath,
-	}
-}
diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go
index 587bcf3158e..ccd0aa6967b 100644
--- a/cdc/api/v2/changefeed_test.go
+++ b/cdc/api/v2/changefeed_test.go
@@ -109,7 +109,7 @@ func TestCreateChangefeed(t *testing.T) {
 		ID:        changeFeedID.ID,
 		Namespace: changeFeedID.Namespace,
 		SinkURI:   blackholeSink,
-		PDAddrs:   []string{},
+		PDAddrs:   []string{"http://127.0.0.1:2379"}, // arbitrary pd address to force creating a new pd client
 	}
 	body, err := json.Marshal(&cfConfig)
 	require.Nil(t, err)
@@ -647,6 +647,8 @@ func TestVerifyTable(t *testing.T) {

 	// case 2: kv create failed
 	updateCfg := getDefaultVerifyTableConfig()
+	// arbitrary pd address to force creating a new pd client
+	updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"}
 	body, err := json.Marshal(&updateCfg)
 	require.Nil(t, err)
 	helpers.EXPECT().
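Note: all three handlers touched by this patch (createChangefeed, verifyTable, and synced) now share one pattern: an empty PDAddrs in the request means "reuse the capture's default upstream and its long-lived PD client and KV storage", while explicit addresses mean "dial request-scoped clients from the request credentials". A minimal sketch of that shape, assuming a hypothetical resolveStorage helper (the real handlers inline this logic rather than calling such a helper):

    // Sketch only: the helper name and signature are illustrative, not tiflow API.
    func resolveStorage(
    	ctx context.Context, h *OpenAPIV2, cfg *ChangefeedConfig,
    ) (pd.Client, tidbkv.Storage, func(), error) {
    	noop := func() {}
    	if len(cfg.PDAddrs) == 0 {
    		// Shared clients owned by the default upstream: never close them here.
    		up, err := getCaptureDefaultUpstream(h.capture)
    		if err != nil {
    			return nil, nil, noop, err
    		}
    		return up.PDClient, up.KVStorage, noop, nil
    	}
    	// Request-scoped clients built from the caller-supplied PD endpoints.
    	credential := cfg.PDConfig.toCredential()
    	pdClient, err := h.helpers.getPDClient(ctx, cfg.PDAddrs, credential)
    	if err != nil {
    		return nil, nil, noop, err
    	}
    	kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
    	if err != nil {
    		pdClient.Close()
    		return nil, nil, noop, err
    	}
    	return pdClient, kvStorage, func() { pdClient.Close() }, nil
    }

The cleanup closure makes the ownership rule explicit; it is the reason the diff defers pdClient.Close() only inside the else branches and leaves the shared upstream clients open.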
@@ -1033,6 +1035,10 @@ func TestChangefeedSynced(t *testing.T) {
 	statusProvider.err = nil
 	statusProvider.changefeedInfo = cfInfo
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to force creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
 		// case3: pd is offline, resolvedTs - checkpointTs > 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1045,7 +1051,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1058,6 +1064,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}

 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to force creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
 		// case4: pd is offline, resolvedTs - checkpointTs < 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1070,7 +1080,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1089,6 +1099,10 @@ func TestChangefeedSynced(t *testing.T) {
 	pdClient.logicTime = 1000
 	pdClient.timestamp = 1701153217279
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to force creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:     1701153217209 << 18,
@@ -1100,7 +1114,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1112,6 +1126,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}

 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to force creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:     1701153201279 << 18,
@@ -1123,7 +1141,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1140,6 +1158,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}

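Note: cases 3-8 walk the threshold combinations behind the synced verdict. Stripped of the message text, the check the handler can still make while PD is unreachable reduces to the comparison below (a sketch reusing the oracle helpers already imported by the diff; the function name is hypothetical, thresholds are in seconds, and TSO physical time is in milliseconds):

    // notSyncedForSure reports whether replication is definitely unfinished even
    // without PD: a pullerResolvedTs/checkpointTs gap larger than
    // CheckpointInterval means data is still in flight, regardless of lastSyncedTs.
    func notSyncedForSure(status *model.ChangeFeedSyncedStatusForAPI, checkpointIntervalSec int64) bool {
    	gapMs := oracle.ExtractPhysical(status.PullerResolvedTs) -
    		oracle.ExtractPhysical(status.CheckpointTs)
    	return gapMs > checkpointIntervalSec*1000
    }

When this returns false, the handler cannot decide on its own and falls back to the advisory message about checking PD and lastSyncedTs manually.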
 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to force creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:     1701153201279 << 18,
@@ -1151,7 +1173,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
@@ -1163,6 +1185,10 @@ func TestChangefeedSynced(t *testing.T) {
 	}

 	{
+		cfg := getDefaultVerifyTableConfig()
+		// arbitrary pd address to force creating a new pd client
+		cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
+		body, _ := json.Marshal(&cfg)
 		// case8: pdTs - lastSyncedTs < 5min
 		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
 			CheckpointTs:     1701153217279 << 18,
@@ -1174,7 +1200,7 @@ func TestChangefeedSynced(t *testing.T) {
 			context.Background(),
 			syncedInfo.method,
 			fmt.Sprintf(syncedInfo.url, validID),
-			nil,
+			bytes.NewReader(body),
 		)
 		router.ServeHTTP(w, req)
 		require.Equal(t, http.StatusOK, w.Code)
diff --git a/tests/integration_tests/synced_status_with_redo/run.sh b/tests/integration_tests/synced_status_with_redo/run.sh
new file mode 100644
index 00000000000..1d4a8142469
--- /dev/null
+++ b/tests/integration_tests/synced_status_with_redo/run.sh
@@ -0,0 +1,308 @@
+#!/bin/bash
+
+# This test covers the same logic as the `synced_status` test,
+# but runs it with redo mode enabled in the changefeed config.
+
+# [DESCRIPTION]:
+# This test checks the synced status request of the cdc server
+# in the following scenarios:
+#
+# 1. The sync status request of cdc server when the upstream cluster is available
+# 1.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold
+# 1.2 pdNow - lastSyncedTs < threshold
+# 1.3 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold, resolvedTs - checkpointTs > threshold
+# 2. The sync status request of cdc server when the upstream pd is unavailable
+# 2.1 resolvedTs - checkpointTs < threshold
+# 3. The sync status request of cdc server when the upstream tikv is unavailable
+# 3.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs > threshold, resolvedTs - checkpointTs < threshold
+# 3.2 pdNow - lastSyncedTs < threshold
+# 4. The sync status request of cdc server when the downstream tidb is unavailable
+# 4.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold
+# 4.2 pdNow - lastSyncedTs < threshold
+# [STEP]:
+# 1. create a changefeed with the synced-time config set in the changefeed config file
+# 2. insert data into the upstream cluster, and perform the related actions for each scenario
+# 3. query the synced status of the cdc server
+# 4. check the info and status of the query result
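Note: the script drives GET /api/v2/changefeeds/{changefeed-id}/synced with curl and extracts fields with jq. For reference, a response shaped like the SyncedStatus literal in patch 1 can be consumed from Go as sketched below (the json tags mirror the jq paths used in this script; the "now_ts" tag is an assumption rather than something visible in this diff):

    package main

    import (
    	"encoding/json"
    	"fmt"
    	"net/http"
    )

    // syncedStatusResp mirrors the fields this script reads with jq.
    type syncedStatusResp struct {
    	Synced           bool   `json:"synced"`
    	SinkCheckpointTs string `json:"sink_checkpoint_ts"`
    	PullerResolvedTs string `json:"puller_resolved_ts"`
    	LastSyncedTs     string `json:"last_synced_ts"`
    	NowTs            string `json:"now_ts"` // assumed tag
    	Info             string `json:"info"`
    }

    func main() {
    	resp, err := http.Get("http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced")
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	var s syncedStatusResp
    	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
    		panic(err)
    	}
    	fmt.Printf("synced=%v info=%q\n", s.Synced, s.Info)
    }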
+
+set -xeu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+CDC_COUNT=3
+DB_COUNT=4
+
+function kill_pd() {
+	info=$(ps aux | grep pd-server | grep $WORK_DIR) || true
+	$(ps aux | grep pd-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
+}
+
+function kill_tikv() {
+	info=$(ps aux | grep tikv-server | grep $WORK_DIR) || true
+	$(ps aux | grep tikv-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
+}
+
+function kill_tidb() {
+	info=$(ps aux | grep tidb-server | grep $WORK_DIR) || true
+	$(ps aux | grep tidb-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
+}
+
+function run_normal_case_and_unavailable_pd() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	# case 1: test with an available cluster
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+
+	status=$(echo $synced_status | jq '.synced')
+	sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts')
+	puller_resolved_ts=$(echo $synced_status | jq -r '.puller_resolved_ts')
+	last_synced_ts=$(echo $synced_status | jq -r '.last_synced_ts')
+	if [ $status != true ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	# the timestamp for puller_resolved_ts is 0 before any data is inserted
+	if [ "$puller_resolved_ts" != "1970-01-01 08:00:00.000" ]; then
+		echo "puller_resolved_ts is not 1970-01-01 08:00:00.000"
+		exit 1
+	fi
+	# the timestamp for last_synced_ts is 0 before any data is inserted
+	if [ "$last_synced_ts" != "1970-01-01 08:00:00.000" ]; then
+		echo "last_synced_ts is not 1970-01-01 08:00:00.000"
+		exit 1
+	fi
+
+	# compare sink_checkpoint_ts with the current time
+	current=$(date +"%Y-%m-%d %H:%M:%S")
+	echo "sink_checkpoint_ts is $sink_checkpoint_ts"
+	checkpoint_timestamp=$(date -d "$sink_checkpoint_ts" +%s)
+	current_timestamp=$(date -d "$current" +%s)
+	if [ $(($current_timestamp - $checkpoint_timestamp)) -gt 300 ]; then # give a soft check
+		echo "sink_checkpoint_ts is not correct"
+		exit 1
+	fi
+
+	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
+	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	sleep 5 # wait for the inserted data to be processed
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	if [ "$info" != "The data syncing is not finished, please wait" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	sleep 130 # wait long enough to pass synced-check-interval
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != true ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+
+	#==========
+	# case 2: test with unavailable pd; the query will not get a valid response
+	kill_pd
+
+	sleep 20
+
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	error_code=$(echo $synced_status | jq -r '.error_code')
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
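Note: the "1970-01-01 08:00:00.000" sentinel above is a zero TSO rendered in the UTC+8 time zone of the test environment. A TSO stores Unix milliseconds in its high bits above an 18-bit logical counter — the unit-test fixtures such as 1701153217279 << 18 in patch 1 are built exactly this way — so the wall-clock conversion is, as a sketch:

    package main

    import (
    	"fmt"
    	"time"
    )

    // tsoPhysicalTime recovers the wall-clock part of a TiDB TSO:
    // the low 18 bits are a logical counter, the rest is Unix milliseconds.
    func tsoPhysicalTime(tso uint64) time.Time {
    	return time.UnixMilli(int64(tso >> 18))
    }

    func main() {
    	fmt.Println(tsoPhysicalTime(uint64(1701153217279) << 18).UTC()) // 2023-11-28
    	// A zero TSO is the Unix epoch, which formats as
    	// "1970-01-01 08:00:00.000" in UTC+8 — the sentinel checked above.
    	fmt.Println(tsoPhysicalTime(0).UTC())
    }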
+
+function run_case_with_unavailable_tikv() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	# case 3: test with an unavailable tikv cluster
+	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
+	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	sleep 5 # wait for the data to be replicated downstream
+	kill_tikv
+
+	# test the case when pdNow - lastSyncedTs < threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="The data syncing is not finished, please wait"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	sleep 130 # wait long enough to pass synced-check-interval
+	# test the case when pdNow - lastSyncedTs > threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="Please check whether PD is online and TiKV Regions are all available. \
+If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \
+To check whether TiKV regions are all available, you can view \
+'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. \
+If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \
+Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+function run_case_with_unavailable_tidb() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	# case 4: test with an unavailable tidb
+	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
+	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	sleep 5 # wait for the data to be replicated downstream
+	kill_tidb
+
+	# test the case when pdNow - lastSyncedTs < threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="The data syncing is not finished, please wait"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	sleep 130 # wait long enough to pass synced-check-interval
+	# test the case when pdNow - lastSyncedTs > threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != true ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="Data syncing is finished"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+function run_case_with_failpoint() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	# use a failpoint to block checkpoint-ts from advancing
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedOwnerNotUpdateCheckpoint=return(true)'
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	sleep 20 # wait long enough to pass checkpoint-check-interval
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="Please check whether PD is online and TiKV Regions are all available. \
+If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \
+To check whether TiKV regions are all available, you can view \
+'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. \
+If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \
+Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait"
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	export GO_FAILPOINTS=''
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
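Note: run_case_with_failpoint freezes the checkpoint through the GO_FAILPOINTS environment variable. On the Go side, a pingcap/failpoint hook of roughly the following shape is what that setting activates (an illustrative sketch: the surrounding method and the updateCheckpoint name are hypothetical, only the failpoint name is taken from the script):

    // Sketch of how a failpoint such as ChangefeedOwnerNotUpdateCheckpoint can
    // gate checkpoint advancement in the owner.
    func (c *changefeed) maybeUpdateCheckpoint(newCheckpointTs uint64) {
    	skip := false
    	failpoint.Inject("ChangefeedOwnerNotUpdateCheckpoint", func() {
    		// Enabled by GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedOwnerNotUpdateCheckpoint=return(true)'
    		skip = true
    	})
    	if skip {
    		return
    	}
    	c.updateCheckpoint(newCheckpointTs) // hypothetical updater
    }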
+
+trap stop_tidb_cluster EXIT
+
+# enable redo
+run_normal_case_and_unavailable_pd "conf/changefeed-redo.toml"
+run_case_with_unavailable_tikv "conf/changefeed-redo.toml"
+run_case_with_unavailable_tidb "conf/changefeed-redo.toml"
+run_case_with_failpoint "conf/changefeed-redo.toml"
+
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

From 3a9bea8b6e36bb10972e6dd58ab6336120693a82 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 21 Jan 2025 14:44:38 +0800
Subject: [PATCH 2/2] remove unneeded code

---
 .../synced_status_with_redo/run.sh            | 308 ------------------
 1 file changed, 308 deletions(-)
 delete mode 100644 tests/integration_tests/synced_status_with_redo/run.sh

diff --git a/tests/integration_tests/synced_status_with_redo/run.sh b/tests/integration_tests/synced_status_with_redo/run.sh
deleted file mode 100644
index 1d4a8142469..00000000000
--- a/tests/integration_tests/synced_status_with_redo/run.sh
+++ /dev/null
@@ -1,308 +0,0 @@
-#!/bin/bash
-
-# This test covers the same logic as the `synced_status` test,
-# but runs it with redo mode enabled in the changefeed config.
-
-# [DESCRIPTION]:
-# This test checks the synced status request of the cdc server
-# in the following scenarios:
-#
-# 1. The sync status request of cdc server when the upstream cluster is available
-# 1.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold
-# 1.2 pdNow - lastSyncedTs < threshold
-# 1.3 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold, resolvedTs - checkpointTs > threshold
-# 2. The sync status request of cdc server when the upstream pd is unavailable
-# 2.1 resolvedTs - checkpointTs < threshold
-# 3. The sync status request of cdc server when the upstream tikv is unavailable
-# 3.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs > threshold, resolvedTs - checkpointTs < threshold
-# 3.2 pdNow - lastSyncedTs < threshold
-# 4. The sync status request of cdc server when the downstream tidb is unavailable
-# 4.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold
-# 4.2 pdNow - lastSyncedTs < threshold
-# [STEP]:
-# 1. create a changefeed with the synced-time config set in the changefeed config file
-# 2. insert data into the upstream cluster, and perform the related actions for each scenario
-# 3. query the synced status of the cdc server
-# 4. check the info and status of the query result
-
-set -xeu
-
-CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
-source $CUR/../_utils/test_prepare
-WORK_DIR=$OUT_DIR/$TEST_NAME
-CDC_BINARY=cdc.test
-SINK_TYPE=$1
-
-CDC_COUNT=3
-DB_COUNT=4
-
-function kill_pd() {
-	info=$(ps aux | grep pd-server | grep $WORK_DIR) || true
-	$(ps aux | grep pd-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
-}
-
-function kill_tikv() {
-	info=$(ps aux | grep tikv-server | grep $WORK_DIR) || true
-	$(ps aux | grep tikv-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
-}
-
-function kill_tidb() {
-	info=$(ps aux | grep tidb-server | grep $WORK_DIR) || true
-	$(ps aux | grep tidb-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
-}
-
-function run_normal_case_and_unavailable_pd() {
-	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
-
-	start_tidb_cluster --workdir $WORK_DIR
-
-	cd $WORK_DIR
-
-	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
-	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
-
-	config_path=$1
-
-	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
-	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
-
-	# case 1: test with an available cluster
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-
-	status=$(echo $synced_status | jq '.synced')
-	sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts')
-	puller_resolved_ts=$(echo $synced_status | jq -r '.puller_resolved_ts')
-	last_synced_ts=$(echo $synced_status | jq -r '.last_synced_ts')
-	if [ $status != true ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-	# the timestamp for puller_resolved_ts is 0 before any data is inserted
-	if [ "$puller_resolved_ts" != "1970-01-01 08:00:00.000" ]; then
-		echo "puller_resolved_ts is not 1970-01-01 08:00:00.000"
-		exit 1
-	fi
-	# the timestamp for last_synced_ts is 0 before any data is inserted
-	if [ "$last_synced_ts" != "1970-01-01 08:00:00.000" ]; then
-		echo "last_synced_ts is not 1970-01-01 08:00:00.000"
-		exit 1
-	fi
-
-	# compare sink_checkpoint_ts with the current time
-	current=$(date +"%Y-%m-%d %H:%M:%S")
-	echo "sink_checkpoint_ts is $sink_checkpoint_ts"
-	checkpoint_timestamp=$(date -d "$sink_checkpoint_ts" +%s)
-	current_timestamp=$(date -d "$current" +%s)
-	if [ $(($current_timestamp - $checkpoint_timestamp)) -gt 300 ]; then # give a soft check
-		echo "sink_checkpoint_ts is not correct"
-		exit 1
-	fi
-
-	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
-	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-
-	sleep 5 # wait for the inserted data to be processed
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	status=$(echo $synced_status | jq '.synced')
-	if [ $status != false ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-	info=$(echo $synced_status | jq -r '.info')
-	if [ "$info" != "The data syncing is not finished, please wait" ]; then
-		echo "synced status info is not correct"
-		exit 1
-	fi
-
-	sleep 130 # wait long enough to pass synced-check-interval
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	status=$(echo $synced_status | jq '.synced')
-	if [ $status != true ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-
-	#==========
-	# case 2: test with unavailable pd; the query will not get a valid response
-	kill_pd
-
-	sleep 20
-
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	error_code=$(echo $synced_status | jq -r '.error_code')
-	cleanup_process $CDC_BINARY
-	stop_tidb_cluster
-}
-
-function run_case_with_unavailable_tikv() {
-	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
-
-	start_tidb_cluster --workdir $WORK_DIR
-
-	cd $WORK_DIR
-
-	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
-	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
-
-	config_path=$1
-
-	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
-	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
-
-	# case 3: test with an unavailable tikv cluster
-	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
-	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-
-	sleep 5 # wait for the data to be replicated downstream
-	kill_tikv
-
-	# test the case when pdNow - lastSyncedTs < threshold
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	status=$(echo $synced_status | jq '.synced')
-	if [ $status != false ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-	info=$(echo $synced_status | jq -r '.info')
-	target_message="The data syncing is not finished, please wait"
-
-	if [ "$info" != "$target_message" ]; then
-		echo "synced status info is not correct"
-		exit 1
-	fi
-
-	sleep 130 # wait long enough to pass synced-check-interval
-	# test the case when pdNow - lastSyncedTs > threshold
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	status=$(echo $synced_status | jq '.synced')
-	if [ $status != false ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-	info=$(echo $synced_status | jq -r '.info')
-	target_message="Please check whether PD is online and TiKV Regions are all available. \
-If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \
-To check whether TiKV regions are all available, you can view \
-'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. \
-If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \
-Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait"
-
-	if [ "$info" != "$target_message" ]; then
-		echo "synced status info is not correct"
-		exit 1
-	fi
-
-	cleanup_process $CDC_BINARY
-	stop_tidb_cluster
-}
-
-function run_case_with_unavailable_tidb() {
-	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
-
-	start_tidb_cluster --workdir $WORK_DIR
-
-	cd $WORK_DIR
-
-	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
-	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
-
-	config_path=$1
-
-	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
-	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
-
-	# case 4: test with an unavailable tidb
-	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
-	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-
-	sleep 5 # wait for the data to be replicated downstream
-	kill_tidb
-
-	# test the case when pdNow - lastSyncedTs < threshold
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	status=$(echo $synced_status | jq '.synced')
-	if [ $status != false ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-	info=$(echo $synced_status | jq -r '.info')
-	target_message="The data syncing is not finished, please wait"
-
-	if [ "$info" != "$target_message" ]; then
-		echo "synced status info is not correct"
-		exit 1
-	fi
-
-	sleep 130 # wait long enough to pass synced-check-interval
-	# test the case when pdNow - lastSyncedTs > threshold
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	status=$(echo $synced_status | jq '.synced')
-	if [ $status != true ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-	info=$(echo $synced_status | jq -r '.info')
-	target_message="Data syncing is finished"
-
-	if [ "$info" != "$target_message" ]; then
-		echo "synced status info is not correct"
-		exit 1
-	fi
-
-	cleanup_process $CDC_BINARY
-	stop_tidb_cluster
-}
-
-function run_case_with_failpoint() {
-	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
-
-	start_tidb_cluster --workdir $WORK_DIR
-
-	cd $WORK_DIR
-
-	# use a failpoint to block checkpoint-ts from advancing
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedOwnerNotUpdateCheckpoint=return(true)'
-
-	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
-	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
-
-	config_path=$1
-
-	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
-	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
-
-	sleep 20 # wait long enough to pass checkpoint-check-interval
-	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
-	status=$(echo $synced_status | jq '.synced')
-	if [ $status != false ]; then
-		echo "synced status isn't correct"
-		exit 1
-	fi
-	info=$(echo $synced_status | jq -r '.info')
-	target_message="Please check whether PD is online and TiKV Regions are all available. \
-If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \
-To check whether TiKV regions are all available, you can view \
-'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. \
-If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \
-Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait"
-	if [ "$info" != "$target_message" ]; then
-		echo "synced status info is not correct"
-		exit 1
-	fi
-
-	export GO_FAILPOINTS=''
-
-	cleanup_process $CDC_BINARY
-	stop_tidb_cluster
-}
-
-trap stop_tidb_cluster EXIT
-
-# enable redo
-run_normal_case_and_unavailable_pd "conf/changefeed-redo.toml"
-run_case_with_unavailable_tikv "conf/changefeed-redo.toml"
-run_case_with_unavailable_tidb "conf/changefeed-redo.toml"
-run_case_with_failpoint "conf/changefeed-redo.toml"
-
-check_logs $WORK_DIR
-echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
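Taken together, the two patches leave only the API change from #12003: /synced (like createChangefeed and verifyTable) accepts an optional JSON body, and only a non-empty PD address list makes the server dial new PD/etcd clients instead of reusing the capture's default upstream. A request-side sketch (the "pd_addrs" field name follows the v2 PDConfig JSON convention and is an assumption here, as is sending the body on a GET, which is what the updated unit tests do):

    package main

    import (
    	"bytes"
    	"encoding/json"
    	"fmt"
    	"io"
    	"net/http"
    )

    func main() {
    	// Explicit PD endpoints force the "create new client" path exercised by
    	// the updated unit tests; omit the body to use the default upstream.
    	payload, _ := json.Marshal(map[string]any{
    		"pd_addrs": []string{"http://127.0.0.1:2379"}, // assumed field name
    	})
    	req, err := http.NewRequest(
    		http.MethodGet,
    		"http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced",
    		bytes.NewReader(payload),
    	)
    	if err != nil {
    		panic(err)
    	}
    	req.Header.Set("Content-Type", "application/json")
    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	body, _ := io.ReadAll(resp.Body)
    	fmt.Println(resp.Status, string(body))
    }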