Skip to content

Commit

Permalink
test: update some test (pingcap#861)
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 authored Jan 11, 2025
1 parent d9fbc31 commit 52e89d0
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 34 deletions.
3 changes: 1 addition & 2 deletions api/middleware/authenticate_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type tidbInstance struct {
func AuthenticateMiddleware(server server.Server) gin.HandlerFunc {
return func(ctx *gin.Context) {
security := config.GetGlobalServerConfig().Security
if security.ClientUserRequired {
if security != nil && security.ClientUserRequired {
if err := verify(ctx, server.GetEtcdClient().GetEtcdClient()); err != nil {
ctx.IndentedJSON(http.StatusUnauthorized, model.NewHTTPError(err))
ctx.Abort()
Expand Down Expand Up @@ -145,7 +145,6 @@ func fetchTiDBTopology(ctx context.Context, etcdClient etcd.Client) ([]tidbInsta

switch keyParts[1] {
case "info":
log.Error("info", zap.Any("", keyParts))
address := keyParts[0]
hostname, port, err := util.ParseHostAndPortFromAddress(address)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions api/v2/unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ func (h *OpenAPIV2) ResolveLock(c *gin.Context) {
_ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err))
return
}
var (
err error
)
kvStorage := h.server.GetKVStorage()

if kvStorage == nil {
Expand All @@ -58,8 +55,7 @@ func (h *OpenAPIV2) ResolveLock(c *gin.Context) {
}

txnResolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
err = txnResolver.Resolve(c, resolveLockReq.RegionID, resolveLockReq.Ts)
if err != nil {
if err := txnResolver.Resolve(c, resolveLockReq.RegionID, resolveLockReq.Ts); err != nil {
_ = c.Error(err)
return
}
Expand Down
4 changes: 2 additions & 2 deletions logservice/logpuller/grpc_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func createGRPCConn(ctx context.Context, credential *security.Credential, target
dialOptions = append(dialOptions, grpc.WithStreamInterceptor(grpcMetrics.StreamClientInterceptor()))
}

return grpc.DialContext(ctx, target, dialOptions...)
// return grpc.DialContext(ctx, target, dialOptions...)

// return grpc.NewClient(target, dialOptions...)
return grpc.NewClient(target, dialOptions...)
}

func getContextFromFeatures(ctx context.Context, features []string) context.Context {
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/tcpserver"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -127,13 +126,14 @@ func (c *server) initialize(ctx context.Context) error {
nodeManager.RegisterNodeChangeHandler(
appcontext.MessageCenter,
appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).OnNodeChanges)

conf := config.GetGlobalServerConfig()
subscriptionClient := logpuller.NewSubscriptionClient(
&logpuller.SubscriptionClientConfig{
RegionRequestWorkerPerStore: 16,
}, c.pdClient, c.RegionCache, c.PDClock,
txnutil.NewLockerResolver(c.KVStorage.(tikv.Storage)), &security.Credential{},
txnutil.NewLockerResolver(c.KVStorage.(tikv.Storage)), conf.Security,
)
conf := config.GetGlobalServerConfig()
schemaStore := schemastore.New(ctx, conf.DataDir, subscriptionClient, c.pdClient, c.PDClock, c.KVStorage)
eventStore := eventstore.New(ctx, conf.DataDir, subscriptionClient, c.PDClock)
eventService := eventservice.New(eventStore, schemaStore)
Expand Down
1 change: 1 addition & 0 deletions server/watcher/module_node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (c *NodeManager) Tick(
newCoordinatorID, err := c.etcdClient.GetOwnerID(context.Background())
if err != nil {
log.Warn("get coordinator id failed, will retry in next tick", zap.Error(err))
return state, nil
}

if newCoordinatorID != oldCoordinatorID {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/_utils/check_changefeed_status
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ error_pattern=${5}
info=$(curl $endpoint/api/v2/changefeeds/$changefeed_id/status)
echo "$info"

state=$(echo $info | grep -v "Command to ticdc" | jq -r '.state')
state=$(echo $info | jq -r '.state')
if [[ ! "$state" == "$expected_state" ]]; then
echo "changefeed state $state does not equal to $expected_state"
exit 1
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/_utils/move_table_with_retry
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ count=0

while [[ $count -lt $retryCount ]]; do
ans=$(run_cdc_cli capture list)
node2ID=$(echo $ans | sed 's/ PASS.*//' | sed 's/^=== Command to ticdc(new arch). //' | jq -r ".[] | select(.address == \"$ipAddr\") | .id")
node2ID=$(echo $ans | sed 's/ PASS.*//' | grep -v "Command to ticdc" | jq -r ".[] | select(.address == \"$ipAddr\") | .id")
if [ -z "$node2ID" ]; then
echo "Failed to extract node2 ID"
continue
Expand All @@ -25,7 +25,7 @@ while [[ $count -lt $retryCount ]]; do
# move table 1 to node2
result=$(run_cdc_cli changefeed move-table -c "$changefeedID" -t $tableID -d "$node2ID")
echo $result
success=$(echo $result | sed 's/ PASS.*//' | sed 's/^=== Command to ticdc(new arch). //' | jq -r '.success')
success=$(echo $result | sed 's/ PASS.*//' | grep -v "Command to ticdc" | jq -r '.success')

if [ "$success" == "true" ]; then
exit 0
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/cli_with_auth/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ EOF
echo "y" | run_cdc_cli unsafe delete-service-gc-safepoint
run_cdc_cli unsafe reset --no-confirm --pd=$pd_addr
REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id')
TS=$(cdc cli tso query --pd=$pd_addr)
TS=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
# wait for owner online
sleep 3
run_cdc_cli unsafe resolve-lock --region=$REGION_ID
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ function complete_ddls() {
# ddls+=("drop table ddl_reentrant.t2" false 'DROP TABLE `ddl_reentrant`.`t2`')
# ddls+=("recover table ddl_reentrant.t2" false 'RECOVER TABLE `ddl_reentrant`.`t2`')
# ddls+=("drop database ddl_reentrant" false 'DROP DATABASE `ddl_reentrant`')
echo $ddls
}

changefeedid=""
Expand Down
18 changes: 9 additions & 9 deletions tests/integration_tests/synced_status/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function run_normal_case_and_unavailable_pd() {
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"

# case 1: test in available cluster
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)

status=$(echo $synced_status | jq '.synced')
sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts')
Expand Down Expand Up @@ -98,7 +98,7 @@ function run_normal_case_and_unavailable_pd() {
check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

sleep 5 # wait data insert
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand All @@ -111,7 +111,7 @@ function run_normal_case_and_unavailable_pd() {
fi

sleep 130 # wait enough time for pass synced-check-interval
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != true ]; then
echo "synced status isn't correct"
Expand All @@ -124,7 +124,7 @@ function run_normal_case_and_unavailable_pd() {

sleep 20

synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
error_code=$(echo $synced_status | jq -r '.error_code')
cleanup_process $CDC_BINARY
stop_tidb_cluster
Expand Down Expand Up @@ -153,7 +153,7 @@ function run_case_with_unavailable_tikv() {
kill_tikv

# test the case when pdNow - lastSyncedTs < threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand All @@ -169,7 +169,7 @@ function run_case_with_unavailable_tikv() {

sleep 130 # wait enough time for pass synced-check-interval
# test the case when pdNow - lastSyncedTs > threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand Down Expand Up @@ -215,7 +215,7 @@ function run_case_with_unavailable_tidb() {
kill_tidb

# test the case when pdNow - lastSyncedTs < threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand All @@ -231,7 +231,7 @@ function run_case_with_unavailable_tidb() {

sleep 130 # wait enough time for pass synced-check-interval
# test the case when pdNow - lastSyncedTs > threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != true ]; then
echo "synced status isn't correct"
Expand Down Expand Up @@ -268,7 +268,7 @@ function run_case_with_failpoint() {
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"

sleep 20 # wait enough time for pass checkpoint-check-interval
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand Down
18 changes: 9 additions & 9 deletions tests/integration_tests/synced_status_with_redo/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ function run_normal_case_and_unavailable_pd() {
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"

# case 1: test in available cluster
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)

status=$(echo $synced_status | jq '.synced')
sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts')
Expand Down Expand Up @@ -102,7 +102,7 @@ function run_normal_case_and_unavailable_pd() {
check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

sleep 5 # wait data insert
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand All @@ -115,7 +115,7 @@ function run_normal_case_and_unavailable_pd() {
fi

sleep 130 # wait enough time for pass synced-check-interval
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != true ]; then
echo "synced status isn't correct"
Expand All @@ -128,7 +128,7 @@ function run_normal_case_and_unavailable_pd() {

sleep 20

synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
error_code=$(echo $synced_status | jq -r '.error_code')
cleanup_process $CDC_BINARY
stop_tidb_cluster
Expand Down Expand Up @@ -157,7 +157,7 @@ function run_case_with_unavailable_tikv() {
kill_tikv

# test the case when pdNow - lastSyncedTs < threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand All @@ -173,7 +173,7 @@ function run_case_with_unavailable_tikv() {

sleep 130 # wait enough time for pass synced-check-interval
# test the case when pdNow - lastSyncedTs > threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand Down Expand Up @@ -219,7 +219,7 @@ function run_case_with_unavailable_tidb() {
kill_tidb

# test the case when pdNow - lastSyncedTs < threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand All @@ -235,7 +235,7 @@ function run_case_with_unavailable_tidb() {

sleep 130 # wait enough time for pass synced-check-interval
# test the case when pdNow - lastSyncedTs > threshold
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != true ]; then
echo "synced status isn't correct"
Expand Down Expand Up @@ -272,7 +272,7 @@ function run_case_with_failpoint() {
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"

sleep 20 # wait enough time for pass checkpoint-check-interval
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc")
synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
status=$(echo $synced_status | jq '.synced')
if [ $status != false ]; then
echo "synced status isn't correct"
Expand Down

0 comments on commit 52e89d0

Please sign in to comment.