diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index f04f83949b0..f72c29a28bb 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -466,8 +466,10 @@ LOOP2: } checkpointTs := c.state.Status.CheckpointTs - if c.resolvedTs == 0 { + // Invariant: ResolvedTs must >= checkpointTs! + if c.resolvedTs == 0 || c.resolvedTs < checkpointTs { c.resolvedTs = checkpointTs + log.Info("Initialize changefeed resolvedTs!", zap.Uint64("resolvedTs", c.resolvedTs), zap.Uint64("checkpointTs", checkpointTs)) } minTableBarrierTs := c.state.Status.MinTableBarrierTs @@ -719,6 +721,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { c.barriers = nil c.initialized = false c.isReleased = true + c.resolvedTs = 0 log.Info("changefeed closed", zap.String("namespace", c.id.Namespace), diff --git a/tests/integration_tests/overwrite_resume_with_syncpoint/conf/changefeed.toml b/tests/integration_tests/overwrite_resume_with_syncpoint/conf/changefeed.toml new file mode 100644 index 00000000000..63153e1dc53 --- /dev/null +++ b/tests/integration_tests/overwrite_resume_with_syncpoint/conf/changefeed.toml @@ -0,0 +1,2 @@ +enable-sync-point = true +sync-point-interval = "30s" \ No newline at end of file diff --git a/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh b/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh new file mode 100644 index 00000000000..9fa464d0a52 --- /dev/null +++ b/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh @@ -0,0 +1,67 @@ +#!/bin/bash +# the script test when we enable syncpoint, and pause the changefeed, +# then resume with a forward checkpoint, to ensure the changefeed can be sync correctly. + +set -eux + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function check_ts_forward() { + changefeedid=$1 + rts1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts') + checkpoint1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso') + sleep 1 + rts2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts') + checkpoint2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso') + if [[ "$rts1" != "null" ]] && [[ "$rts1" != "0" ]]; then + if [[ "$rts1" -ne "$rts2" ]] || [[ "$checkpoint1" -ne "$checkpoint2" ]]; then + echo "changefeed is working normally rts: ${rts1}->${rts2} checkpoint: ${checkpoint1}->${checkpoint2}" + return + fi + fi + exit 1 +} + +function run() { + # No need to test kafka and storage sink. + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="mysql://root@127.0.0.1:3306/" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml --changefeed-id="test4" + + check_ts_forward "test4" + + run_cdc_cli changefeed pause --changefeed-id="test4" + + sleep 15 + + checkpoint1=$(cdc cli changefeed query --changefeed-id="test4" 2>&1 | jq '.checkpoint_tso') + # add a large number to avoid the problem of losing precision when jq processing large integers + checkpoint1=$((checkpoint1 + 1000000)) + + # resume a forward checkpointTs + run_cdc_cli changefeed resume --changefeed-id="test4" --no-confirm --overwrite-checkpoint-ts=$checkpoint1 + + check_ts_forward "test4" + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index da042c89e65..d0b88669bba 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -10,7 +10,7 @@ group=$2 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table -mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart safe_mode" +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart" mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" @@ -41,7 +41,7 @@ groups=( ["G07"]='kv_client_stream_reconnect cdc split_region' ["G08"]='processor_err_chan changefeed_reconstruct multi_capture' ["G09"]='gc_safepoint changefeed_pause_resume cli savepoint synced_status' - ["G10"]='default_value simple cdc_server_tips event_filter' + ["G10"]='default_value simple cdc_server_tips event_filter overwrite_resume_with_syncpoint' ["G11"]='resolve_lock move_table autorandom generate_column' ["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes' ["G13"]='tiflash region_merge common_1'