Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix changefeed stuck problem when resuming with an overwritten, forward checkpointTs #12056

Merged
merged 8 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,10 @@ LOOP2:
}

checkpointTs := c.state.Status.CheckpointTs
if c.resolvedTs == 0 {
// Invariant: ResolvedTs must >= checkpointTs!
if c.resolvedTs == 0 || c.resolvedTs < checkpointTs {
c.resolvedTs = checkpointTs
log.Info("Initialize changefeed resolvedTs!", zap.Uint64("resolvedTs", c.resolvedTs), zap.Uint64("checkpointTs", checkpointTs))
}

minTableBarrierTs := c.state.Status.MinTableBarrierTs
Expand Down Expand Up @@ -719,6 +721,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.barriers = nil
c.initialized = false
c.isReleased = true
c.resolvedTs = 0

log.Info("changefeed closed",
zap.String("namespace", c.id.Namespace),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enable-sync-point = true
sync-point-interval = "30s"
67 changes: 67 additions & 0 deletions tests/integration_tests/overwrite_resume_with_syncpoint/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/bin/bash
# the script test when we enable syncpoint, and pause the changefeed,
# then resume with a forward checkpoint, to ensure the changefeed can be sync correctly.

set -eux

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function check_ts_forward() {
changefeedid=$1
rts1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts')
checkpoint1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso')
sleep 1
rts2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts')
checkpoint2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso')
if [[ "$rts1" != "null" ]] && [[ "$rts1" != "0" ]]; then
if [[ "$rts1" -ne "$rts2" ]] || [[ "$checkpoint1" -ne "$checkpoint2" ]]; then
echo "changefeed is working normally rts: ${rts1}->${rts2} checkpoint: ${checkpoint1}->${checkpoint2}"
return
fi
fi
exit 1
}

function run() {
# No need to test kafka and storage sink.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

SINK_URI="mysql://[email protected]:3306/"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml --changefeed-id="test4"

check_ts_forward "test4"

run_cdc_cli changefeed pause --changefeed-id="test4"

sleep 15

checkpoint1=$(cdc cli changefeed query --changefeed-id="test4" 2>&1 | jq '.checkpoint_tso')
# add a large number to avoid the problem of losing precision when jq processing large integers
checkpoint1=$((checkpoint1 + 1000000))

# resume a forward checkpointTs
run_cdc_cli changefeed resume --changefeed-id="test4" --no-confirm --overwrite-checkpoint-ts=$checkpoint1

check_ts_forward "test4"

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
4 changes: 2 additions & 2 deletions tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ group=$2
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart safe_mode"
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2"
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

Expand Down Expand Up @@ -41,7 +41,7 @@ groups=(
["G07"]='kv_client_stream_reconnect cdc split_region'
["G08"]='processor_err_chan changefeed_reconstruct multi_capture'
["G09"]='gc_safepoint changefeed_pause_resume cli savepoint synced_status'
["G10"]='default_value simple cdc_server_tips event_filter'
["G10"]='default_value simple cdc_server_tips event_filter overwrite_resume_with_syncpoint'
["G11"]='resolve_lock move_table autorandom generate_column'
["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes'
["G13"]='tiflash region_merge common_1'
Expand Down
Loading