Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
dmctl: simplify default query-status output (#334) (#340)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored and csuzhangxc committed Oct 29, 2019
1 parent 2ce7693 commit 0285535
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/dm-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"syscall"

"github.com/chzyer/readline"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/errors"

"github.com/pingcap/dm/dm/ctl"
"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

Expand Down
11 changes: 11 additions & 0 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package common

import (
"encoding/json"
"fmt"
"io/ioutil"
"strings"
Expand Down Expand Up @@ -77,6 +78,16 @@ func PrettyPrintResponse(resp proto.Message) {
}
}

// PrettyPrintInterface prints an interface through encoding/json prettily
func PrettyPrintInterface(resp interface{}) {
	out, err := json.MarshalIndent(resp, "", "    ")
	if err != nil {
		// surface the marshal failure through the common line printer
		PrintLines(errors.ErrorStack(err))
		return
	}
	fmt.Println(string(out))
}

func marshResponseToString(resp proto.Message) (string, error) {
// encoding/json does not support proto Enum well
mar := jsonpb.Marshaler{EmitDefaults: true, Indent: " "}
Expand Down
87 changes: 86 additions & 1 deletion dm/ctl/master/query_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package master
import (
"context"
"fmt"
"strings"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"
Expand All @@ -24,6 +25,20 @@ import (
"github.com/spf13/cobra"
)

// stageError is the pseudo-stage used in taskStatus when a task cannot be
// summarized by a plain pb.Stage; it prefixes a human-readable reason.
const stageError = "Error"

// taskResult is the simplified JSON payload printed by the default
// query-status output (no task name and no worker filter given).
type taskResult struct {
	Result bool        `json:"result"` // overall RPC result copied from the response
	Msg    string      `json:"msg"`    // message copied from the response
	Tasks  []*taskInfo `json:"tasks"`  // one entry per distinct subtask name
}

// taskInfo summarizes one task: its name, its aggregated status string,
// and the addresses of the workers running its subtasks.
type taskInfo struct {
	TaskName   string   `json:"taskName,omitempty"`
	TaskStatus string   `json:"taskStatus,omitempty"`
	Workers    []string `json:"workers,omitempty"`
}

// NewQueryStatusCmd creates a QueryStatus command
func NewQueryStatusCmd() *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -60,5 +75,75 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) {
return
}

common.PrettyPrintResponse(resp)
if resp.Result && taskName == "" && len(workers) == 0 {
result := wrapTaskResult(resp)
common.PrettyPrintInterface(result)
} else {
common.PrettyPrintResponse(resp)
}
}

// errorOccurred checks ProcessResult and return true if some error occurred
func errorOccurred(result *pb.ProcessResult) bool {
	if result == nil {
		return false
	}
	return len(result.Errors) > 0
}

// getRelayStage returns current relay stage (including stageError)
// NOTE(review): relayStatus is dereferenced without a nil check — callers
// must guarantee it is non-nil.
func getRelayStage(relayStatus *pb.RelayStatus) string {
	if !errorOccurred(relayStatus.Result) {
		return relayStatus.Stage.String()
	}
	return stageError
}

// wrapTaskResult picks task info and generate tasks' status and relative workers
func wrapTaskResult(resp *pb.QueryStatusListResponse) *taskResult {
	// subtask name -> aggregated status string for the whole task
	taskStatusMap := make(map[string]string)
	// subtask name -> addresses of the workers running that subtask
	taskCorrespondingWorkers := make(map[string][]string)
	for _, worker := range resp.Workers {
		relayStatus := worker.RelayStatus
		for _, subTask := range worker.SubTaskStatus {
			subTaskName := subTask.Name
			subTaskStage := subTask.Stage

			taskCorrespondingWorkers[subTaskName] = append(taskCorrespondingWorkers[subTaskName], worker.Worker)
			taskStage := taskStatusMap[subTaskName]
			// the status of a task is decided by its subtasks, the rule is listed as follows:
			// | Subtasks' status                                           | Task's status                                |
			// | :--------------------------------------------------------: | :------------------------------------------: |
			// | Any Paused and len(result.errors) > 0                      | Error - Some error occurred in subtask       |
			// | Any Running and unit is "Sync" and relay is Paused/Stopped | Error - Relay status is Error/Paused/Stopped |
			// | Any Paused but without error                               | Paused                                       |
			// | All New                                                    | New                                          |
			// | All Finished                                               | Finished                                     |
			// | All Stopped                                                | Stopped                                      |
			// | Others                                                     | Running                                      |
			// NOTE: case order matters — once an Error status is recorded it is sticky.
			switch {
			case strings.HasPrefix(taskStage, stageError):
				// an already-recorded error wins; nothing downgrades it
			case subTaskStage == pb.Stage_Paused && errorOccurred(subTask.Result):
				taskStatusMap[subTaskName] = stageError + " - Some error occurred in subtask"
			case relayStatus != nil && subTask.Unit == pb.UnitType_Sync && subTask.Stage == pb.Stage_Running &&
				(relayStatus.Stage == pb.Stage_Paused || relayStatus.Stage == pb.Stage_Stopped):
				// guard relayStatus against nil: a worker may report a running
				// Sync subtask while carrying no relay status at all, which
				// previously panicked on the relayStatus.Stage dereference
				taskStatusMap[subTaskName] = stageError + " - Relay status is " + getRelayStage(relayStatus)
			case taskStage == pb.Stage_Paused.String():
				// Paused (without error) sticks unless an error case above fires
			case taskStage == "", subTaskStage == pb.Stage_Paused:
				taskStatusMap[subTaskName] = subTaskStage.String()
			case taskStage != subTaskStage.String():
				taskStatusMap[subTaskName] = pb.Stage_Running.String()
			}
		}
	}
	// NOTE: map iteration order is random, so task order in the output is
	// not deterministic across invocations.
	taskList := make([]*taskInfo, 0, len(taskStatusMap))
	for curTaskName, taskStatus := range taskStatusMap {
		taskList = append(taskList,
			&taskInfo{
				TaskName:   curTaskName,
				TaskStatus: taskStatus,
				Workers:    taskCorrespondingWorkers[curTaskName],
			})
	}
	return &taskResult{
		Result: resp.Result,
		Msg:    resp.Msg,
		Tasks:  taskList,
	}
}
155 changes: 155 additions & 0 deletions dm/ctl/master/query_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"sort"
"testing"

"github.com/pingcap/dm/dm/pb"

"github.com/pingcap/check"
)

// TestCtlMaster hooks the gocheck suites in this package into the
// standard "go test" runner.
func TestCtlMaster(t *testing.T) {
	check.TestingT(t)
}

// testCtlMaster is the gocheck suite for dm/ctl/master helpers.
type testCtlMaster struct {
}

// register the suite so gocheck discovers and runs its Test* methods
var _ = check.Suite(&testCtlMaster{})

// generateAndCheckTaskResult wraps resp into a taskResult and verifies it
// contains exactly one task matching expectedResult (worker order is
// normalized by sorting before the deep comparison).
func generateAndCheckTaskResult(c *check.C, resp *pb.QueryStatusListResponse, expectedResult []*taskInfo) {
	wrapped := wrapTaskResult(resp)
	c.Assert(wrapped.Result, check.IsTrue)
	c.Assert(wrapped.Tasks, check.HasLen, 1)
	sort.Strings(wrapped.Tasks[0].Workers)
	c.Assert(wrapped.Tasks, check.DeepEquals, expectedResult)
}

// subTestSameSubTaskStatus forces the first subtask of every worker into
// stage and checks that the wrapped result reports that same stage for
// the task.
func subTestSameSubTaskStatus(c *check.C, resp *pb.QueryStatusListResponse, expectedResult []*taskInfo, stage pb.Stage) {
	expectedResult[0].TaskStatus = stage.String()
	for idx := range resp.Workers {
		resp.Workers[idx].SubTaskStatus[0].Stage = stage
	}
	generateAndCheckTaskResult(c, resp, expectedResult)
}

// TestWrapTaskResult walks wrapTaskResult through each aggregation rule in
// sequence: subtask errors, relay failures, uniform stages, mixed stages,
// and finally two distinct tasks. The response is mutated step by step, so
// statement order below is significant.
func (t *testCtlMaster) TestWrapTaskResult(c *check.C) {
	resp := new(pb.QueryStatusListResponse)
	resp.Result = true

	// Should return error when some error occurs in subtask
	resp.Workers = []*pb.QueryStatusResponse{
		{
			Result: true,
			Worker: "172.17.0.2:8262",
			SubTaskStatus: []*pb.SubTaskStatus{{
				Name:  "test",
				Stage: pb.Stage_Running,
			}},
		},
		{
			Result: true,
			Worker: "172.17.0.3:8262",
			SubTaskStatus: []*pb.SubTaskStatus{{
				Name:  "test",
				Stage: pb.Stage_Running,
			}},
		},
		{
			Result: true,
			Worker: "172.17.0.6:8262",
			SubTaskStatus: []*pb.SubTaskStatus{{
				Name:  "test",
				Stage: pb.Stage_Paused,
				Result: &pb.ProcessResult{
					Errors: []*pb.ProcessError{{Type: pb.ErrorType_ExecSQL}},
				},
			}},
		},
	}
	expectedResult := []*taskInfo{{
		TaskName:   "test",
		TaskStatus: stageError + " - Some error occurred in subtask",
		Workers:    []string{"172.17.0.2:8262", "172.17.0.3:8262", "172.17.0.6:8262"},
	}}
	generateAndCheckTaskResult(c, resp, expectedResult)
	// Should return error when subtask unit is "Sync" while relay status is not running
	resp.Workers[2].SubTaskStatus[0].Result = nil
	resp.Workers[0].SubTaskStatus[0].Unit = pb.UnitType_Sync
	// relay status is Error
	resp.Workers[0].RelayStatus = &pb.RelayStatus{
		Stage: pb.Stage_Paused,
		Result: &pb.ProcessResult{
			Errors: []*pb.ProcessError{{Type: pb.ErrorType_CheckFailed}},
		}}
	expectedResult[0].TaskStatus = stageError + " - Relay status is " + stageError
	generateAndCheckTaskResult(c, resp, expectedResult)
	// relay status is Paused
	resp.Workers[0].RelayStatus = &pb.RelayStatus{Stage: pb.Stage_Paused}
	expectedResult[0].TaskStatus = stageError + " - Relay status is " + pb.Stage_Paused.String()
	generateAndCheckTaskResult(c, resp, expectedResult)
	// relay status is Stopped
	resp.Workers[0].RelayStatus = &pb.RelayStatus{Stage: pb.Stage_Stopped}
	expectedResult[0].TaskStatus = stageError + " - Relay status is " + pb.Stage_Stopped.String()
	generateAndCheckTaskResult(c, resp, expectedResult)

	// one subtask is paused and no error occurs, should return paused
	resp.Workers[2].SubTaskStatus[0].Result = nil
	// reset the unit so the Sync+relay rule no longer applies
	resp.Workers[0].SubTaskStatus[0].Unit = 0
	resp.Workers[0].RelayStatus = nil
	expectedResult[0].TaskStatus = pb.Stage_Paused.String()
	generateAndCheckTaskResult(c, resp, expectedResult)
	// All subtasks are Finished/Stopped/.../New
	stageArray := []pb.Stage{pb.Stage_Finished, pb.Stage_Stopped, pb.Stage_Paused, pb.Stage_Running, pb.Stage_New}
	for _, stage := range stageArray {
		subTestSameSubTaskStatus(c, resp, expectedResult, stage)
	}
	// All subtasks are New except the last one(which is Finished)
	resp.Workers[2].SubTaskStatus[0].Stage = pb.Stage_Finished
	expectedResult[0].TaskStatus = pb.Stage_Running.String()
	generateAndCheckTaskResult(c, resp, expectedResult)

	// test situation with two tasks
	resp.Workers = append(resp.Workers, &pb.QueryStatusResponse{
		Result: true,
		Worker: "172.17.0.4:8262",
		SubTaskStatus: []*pb.SubTaskStatus{{
			Name:  "test2",
			Stage: pb.Stage_Paused,
			Result: &pb.ProcessResult{
				Errors: []*pb.ProcessError{{Type: pb.ErrorType_ExecSQL}},
			},
		}},
	})
	result := wrapTaskResult(resp)
	c.Assert(result.Tasks, check.HasLen, 2)
	// map iteration order is random, so normalize task order before comparing
	if result.Tasks[0].TaskName == "test2" {
		result.Tasks[0], result.Tasks[1] = result.Tasks[1], result.Tasks[0]
	}
	sort.Strings(result.Tasks[0].Workers)
	expectedResult = []*taskInfo{{
		TaskName:   "test",
		TaskStatus: pb.Stage_Running.String(),
		Workers:    []string{"172.17.0.2:8262", "172.17.0.3:8262", "172.17.0.6:8262"},
	}, {
		TaskName:   "test2",
		TaskStatus: stageError + " - Some error occurred in subtask",
		Workers:    []string{"172.17.0.4:8262"},
	},
	}
	c.Assert(result.Tasks, check.DeepEquals, expectedResult)
}
14 changes: 11 additions & 3 deletions tests/dmctl_basic/check_list/query_status.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,30 @@ function query_status_wrong_params() {

function query_status_with_no_tasks() {
    # Before any task starts, query-status filtered to both workers must
    # succeed but report that no sub task has started.
    # (Dropped the stale pre-diff "query-status" argument line that was left
    # alongside its replacement and corrupted run_dm_ctl's pattern/count
    # argument pairing.)
    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "query-status -w 127.0.0.1:$WORKER1_PORT,127.0.0.1:$WORKER2_PORT" \
        "\"result\": true" 3 \
        "\"msg\": \"no sub task started\"" 2
}

function query_status_with_tasks() {
    # Detailed status, filtered to the two known workers: expect running
    # Sync units. (Dropped the stale pre-diff "query-status" argument line
    # that was left alongside its "-w" replacement and corrupted
    # run_dm_ctl's pattern/count argument pairing.)
    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "query-status -w 127.0.0.1:$WORKER1_PORT,127.0.0.1:$WORKER2_PORT" \
        "\"result\": true" 3 \
        "\"unit\": \"Sync\"" 2 \
        "\"stage\": \"Running\"" 4
    # Default (unfiltered) query-status: expect the simplified per-task
    # summary output with taskName/taskStatus/workers fields.
    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "query-status" \
        "\"result\": true" 1 \
        "\"taskName\": \"test\"" 1 \
        "\"taskStatus\": \"Running\"" 1 \
        "\"workers\":" 1 \
        "\"127.0.0.1:$WORKER1_PORT\"" 1 \
        "\"127.0.0.1:$WORKER2_PORT\"" 1
}

function query_status_stopped_relay() {
    # With relay stopped, filtered query-status should succeed and show
    # two Paused stages. (Dropped the stale pre-diff "query-status"
    # argument line that was left alongside its "-w" replacement and
    # corrupted run_dm_ctl's pattern/count argument pairing.)
    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "query-status -w 127.0.0.1:$WORKER1_PORT,127.0.0.1:$WORKER2_PORT" \
        "\"result\": true" 3 \
        "\"stage\": \"Paused\"" 2
}
8 changes: 6 additions & 2 deletions tests/relay_interrupt/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function run() {

echo "query status, relay log failed"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"query-status -w 127.0.0.1:$WORKER1_PORT" \
"no sub task started" 1 \
"ERROR" 1

Expand All @@ -49,9 +49,13 @@ function run() {
"\"result\": true" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"query-status -w 127.0.0.1:$WORKER1_PORT" \
"no valid relay sub directory exists" 1 \
"ERROR" 1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"\"taskName\": \"test\"" 1 \
"\"taskStatus\": \"Error - Some error occurred in subtask\"" 1

echo "reset go failpoints, and need restart dm-worker"
echo "then resume task, task will recover success"
Expand Down
2 changes: 1 addition & 1 deletion tests/start_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function run() {

echo "check un-accessible DM-worker exists"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"query-status -w 127.0.0.1:8888" \
"transport: Error while dialing dial tcp 127.0.0.1:8888: connect: connection refused" 1

echo "start task and will failed"
Expand Down

0 comments on commit 0285535

Please sign in to comment.