From 0b1edab966e065c7106ba10f8bb8a014208b67e6 Mon Sep 17 00:00:00 2001
From: Brett Lawson
Date: Wed, 29 Nov 2023 14:51:05 -0800
Subject: [PATCH] Added ability to collect logs from server.

---
 cmd/collectlogs.go                  |  46 +++++++++++
 deployment/clouddeploy/deployer.go  |   4 +
 deployment/deployer.go              |   1 +
 deployment/dockerdeploy/deployer.go | 114 ++++++++++++++++++++++++++++
 deployment/localdeploy/deployer.go  |   4 +
 utils/clustercontrol/controller.go  |  88 +++++++++++++++++++--
 utils/clustercontrol/nodemanager.go |  66 +++++++++++++++-
 7 files changed, 314 insertions(+), 9 deletions(-)
 create mode 100644 cmd/collectlogs.go

diff --git a/cmd/collectlogs.go b/cmd/collectlogs.go
new file mode 100644
index 0000000..89adeec
--- /dev/null
+++ b/cmd/collectlogs.go
@@ -0,0 +1,46 @@
+package cmd
+
+import (
+	"fmt"
+
+	"github.com/spf13/cobra"
+	"go.uber.org/zap"
+)
+
+type CollectLogsOutput []string
+
+var collectLogsCmd = &cobra.Command{
+	Use:   "collect-logs [flags] cluster dest-path",
+	Short: "Collects logs from all cluster nodes to a local path",
+	Args:  cobra.MinimumNArgs(2),
+	Run: func(cmd *cobra.Command, args []string) {
+		helper := CmdHelper{}
+		logger := helper.GetLogger()
+		ctx := helper.GetContext()
+
+		outputJson, _ := cmd.Flags().GetBool("json")
+
+		_, deployer, cluster := helper.IdentifyCluster(ctx, args[0])
+		destPath := args[1]
+
+		logPaths, err := deployer.CollectLogs(ctx, cluster.GetID(), destPath)
+		if err != nil {
+			logger.Fatal("failed to collect logs", zap.Error(err))
+		}
+
+		if !outputJson {
+			fmt.Printf("Collected Files:\n")
+			for _, path := range logPaths {
+				fmt.Printf("  %s\n",
+					path)
+			}
+		} else {
+			var out CollectLogsOutput = logPaths
+			helper.OutputJson(out)
+		}
+	},
+}
+
+func init() {
+	rootCmd.AddCommand(collectLogsCmd)
+}
diff --git a/deployment/clouddeploy/deployer.go b/deployment/clouddeploy/deployer.go
index 733df1c..6d05968 100644
--- a/deployment/clouddeploy/deployer.go
+++ b/deployment/clouddeploy/deployer.go
@@ -1119,3 +1119,7 @@ func (d *Deployer) BlockNodeTraffic(ctx context.Context, clusterID string, nodeI
 func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
 	return errors.New("clouddeploy does not support traffic control")
 }
+
+func (d *Deployer) CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error) {
+	return nil, errors.New("clouddeploy does not support log collection")
+}
diff --git a/deployment/deployer.go b/deployment/deployer.go
index aced606..9c0b7b2 100644
--- a/deployment/deployer.go
+++ b/deployment/deployer.go
@@ -82,4 +82,5 @@ type Deployer interface {
 	DeleteCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error
 	BlockNodeTraffic(ctx context.Context, clusterID string, nodeID string) error
 	AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error
+	CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error)
 }
diff --git a/deployment/dockerdeploy/deployer.go b/deployment/dockerdeploy/deployer.go
index 104a383..1baa11c 100644
--- a/deployment/dockerdeploy/deployer.go
+++ b/deployment/dockerdeploy/deployer.go
@@ -1,9 +1,13 @@
 package dockerdeploy
 
 import (
+	"archive/tar"
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
+	"os"
+	"path"
 	"strings"
 	"time"
 
@@ -18,6 +22,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 	"golang.org/x/exp/slices"
 )
 
@@ -1080,3 +1085,112 @@ func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeI
 
 	return nil
 }
+
+func (d *Deployer) CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error) {
+	clusterInfo, err := d.getClusterInfo(ctx, clusterID)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to get cluster info")
+	}
+
+	if len(clusterInfo.Nodes) == 0 {
+		return nil, errors.New("cannot collect logs from a cluster with no nodes")
+	}
+
+	nodeCtrl := clustercontrol.NodeManager{
+		Endpoint: fmt.Sprintf("http://%s:8091", clusterInfo.Nodes[0].IPAddress),
+	}
+
+	nodeOtps, err := nodeCtrl.Controller().ListNodeOTPs(ctx)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to list nodes")
+	}
+
+	d.logger.Info("beginning log collection", zap.Strings("nodes", nodeOtps))
+
+	err = nodeCtrl.Controller().BeginLogsCollection(ctx, &clustercontrol.BeginLogsCollectionOptions{
+		Nodes:             nodeOtps,
+		LogRedactionLevel: "none",
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to begin log collection")
+	}
+
+	d.logger.Info("waiting for log collection to start")
+
+	err = nodeCtrl.WaitForTaskRunning(ctx, "clusterLogsCollection")
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to wait for log collection to start")
+	}
+
+	d.logger.Info("waiting for log collection to complete (this can take a _long_ time)")
+
+	logPaths, err := nodeCtrl.WaitForLogCollection(ctx)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to wait for log collection to complete")
+	}
+
+	nodeInfoFromIp := func(ipAddress string) *deployedNodeInfo {
+		for _, nodeInfo := range clusterInfo.Nodes {
+			if nodeInfo.IPAddress == ipAddress {
+				return nodeInfo
+			}
+		}
+		return nil
+	}
+
+	var destPaths []string
+	for nodeId, filePath := range logPaths {
+		otpParts := strings.Split(nodeId, "@")
+		if len(otpParts) != 2 {
+			return nil, errors.New("unexpected node otp format")
+		}
+		ipAddress := otpParts[1]
+
+		nodeInfo := nodeInfoFromIp(ipAddress)
+		if nodeInfo == nil {
+			return nil, fmt.Errorf("failed to find node for ip %s", ipAddress)
+		}
+		containerId := nodeInfo.ContainerID
+
+		fileName := path.Base(filePath)
+		destFilePath := path.Join(destPath, fileName)
+
+		if !d.logger.Level().Enabled(zapcore.DebugLevel) {
+			d.logger.Info("downloading log from node",
+				zap.String("node", nodeId))
+		} else {
+			d.logger.Info("downloading log from node",
+				zap.String("node", nodeId),
+				zap.String("ipAddress", ipAddress),
+				zap.String("container", containerId),
+				zap.String("srcPath", filePath),
+				zap.String("destPath", destFilePath))
+		}
+
+		resp, _, err := d.dockerCli.CopyFromContainer(ctx, containerId, filePath)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to copy from container")
+		}
+		defer resp.Close()
+
+		tarRdr := tar.NewReader(resp)
+		_, err = tarRdr.Next()
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to parse transmitted file")
+		}
+
+		fileWrt, err := os.Create(destFilePath)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to open destination file for writing")
+		}
+
+		_, err = io.Copy(fileWrt, tarRdr)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to copy container file to local disk")
+		}
+
+		destPaths = append(destPaths, destFilePath)
+	}
+
+	return destPaths, nil
+}
diff --git a/deployment/localdeploy/deployer.go b/deployment/localdeploy/deployer.go
index fb131e1..821bc85 100644
--- a/deployment/localdeploy/deployer.go
+++ b/deployment/localdeploy/deployer.go
@@ -161,3 +161,7 @@ func (d *Deployer) BlockNodeTraffic(ctx context.Context, clusterID string, nodeI
 func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
 	return errors.New("localdeploy does not support traffic control")
 }
+
+func (d *Deployer) CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error) {
+	return nil, errors.New("localdeploy does not support log collection")
+}
diff --git a/utils/clustercontrol/controller.go b/utils/clustercontrol/controller.go
index 3f16142..460ccc7 100644
--- a/utils/clustercontrol/controller.go
+++ b/utils/clustercontrol/controller.go
@@ -337,24 +337,96 @@ func (c *Controller) BeginRebalance(ctx context.Context, opts *BeginRebalanceOpt
 	return c.doFormPost(ctx, "/controller/rebalance", form, true, nil)
 }
 
-type Task struct {
+type BeginLogsCollectionOptions struct {
+	Nodes             []string `url:"nodes,comma"`
+	LogRedactionLevel string   `url:"logRedactionLevel"`
+}
+
+func (c *Controller) BeginLogsCollection(ctx context.Context, opts *BeginLogsCollectionOptions) error {
+	form, _ := query.Values(opts)
+	return c.doFormPost(ctx, "/controller/startLogsCollection", form, true, nil)
+}
+
+type Task interface {
+	GetStatus() string
+	GetType() string
+}
+
+type GenericTask struct {
+	Status string
+	Type   string
+}
+
+func (t GenericTask) GetStatus() string { return t.Status }
+func (t GenericTask) GetType() string   { return t.Type }
+
+type CollectLogsTask struct {
+	GenericTask
+	PerNode map[string]CollectLogsTask_PerNode
+}
+
+type CollectLogsTask_PerNode struct {
 	Status string
+	Path   string
 }
 
-func (c *Controller) ListTasks(ctx context.Context) ([]*Task, error) {
-	var resp []struct {
+func (c *Controller) ListTasks(ctx context.Context) ([]Task, error) {
+	type genericTaskJson struct {
 		Status string `json:"status"`
-	}
+		Type   string `json:"type"`
+	}
+	type clusterLogsCollectionTaskJson struct {
+		genericTaskJson
+		Node    string `json:"node"`
+		PerNode map[string]struct {
+			Status string `json:"status"`
+			Path   string `json:"path"`
+		} `json:"perNode"`
+		Progress                 int    `json:"progress"`
+		Ts                       string `json:"ts"`
+		RecommendedRefreshPeriod int    `json:"recommendedRefreshPeriod"`
+		CancelURI                string `json:"cancelURI"`
+	}
+
+	var resp []json.RawMessage
 	err := c.doGet(ctx, "/pools/default/tasks", &resp)
 	if err != nil {
 		return nil, err
 	}
 
-	tasks := make([]*Task, len(resp))
-	for statusIdx, status := range resp {
-		tasks[statusIdx] = &Task{
-			Status: status.Status,
+	tasks := make([]Task, len(resp))
+	for taskIdx, taskJson := range resp {
+		var baseTask genericTaskJson
+		err := json.Unmarshal(taskJson, &baseTask)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to unmarshal task")
 		}
+
+		var outTask Task
+		if baseTask.Type == "clusterLogsCollection" {
+			var task clusterLogsCollectionTaskJson
+			err := json.Unmarshal(taskJson, &task)
+			if err != nil {
+				return nil, errors.Wrap(err, "failed to unmarshal clusterLogsCollection task")
+			}
+
+			perNode := make(map[string]CollectLogsTask_PerNode)
+			for nodeId, nodeInfo := range task.PerNode {
+				perNode[nodeId] = CollectLogsTask_PerNode{
+					Status: nodeInfo.Status,
+					Path:   nodeInfo.Path,
+				}
+			}
+
+			outTask = CollectLogsTask{
+				GenericTask: GenericTask(task.genericTaskJson),
+				PerNode:     perNode,
+			}
+		} else {
+			outTask = GenericTask(baseTask)
+		}
+
+		tasks[taskIdx] = outTask
 	}
 
 	return tasks, nil
diff --git a/utils/clustercontrol/nodemanager.go b/utils/clustercontrol/nodemanager.go
index 1231e6c..66ae405 100644
--- a/utils/clustercontrol/nodemanager.go
+++ b/utils/clustercontrol/nodemanager.go
@@ -150,7 +150,10 @@ func (m *NodeManager) WaitForNoRunningTasks(ctx context.Context) error {
 
 		hasRunningTask := false
 		for _, task := range tasks {
-			if task.Status != "notRunning" {
+			taskStatus := task.GetStatus()
+			if taskStatus != "notRunning" &&
+				taskStatus != "completed" &&
+				taskStatus != "cancelled" {
 				hasRunningTask = true
 			}
 		}
@@ -165,3 +168,64 @@ func (m *NodeManager) WaitForNoRunningTasks(ctx context.Context) error {
 
 	return nil
 }
+
+func (m *NodeManager) WaitForTaskRunning(ctx context.Context, taskType string) error {
+	c := m.Controller()
+
+	for {
+		tasks, err := c.ListTasks(ctx)
+		if err != nil {
+			return errors.Wrap(err, "failed to fetch list of tasks")
+		}
+
+		hasRunningTask := false
+		for _, task := range tasks {
+			if task.GetType() == taskType && task.GetStatus() == "running" {
+				hasRunningTask = true
+			}
+		}
+
+		if !hasRunningTask {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		break
+	}
+
+	return nil
+}
+
+// waits for log collection and returns a map of otp -> log path
+func (m *NodeManager) WaitForLogCollection(ctx context.Context) (map[string]string, error) {
+	c := m.Controller()
+
+	for {
+		tasks, err := c.ListTasks(ctx)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to fetch list of tasks")
+		}
+
+		var foundLogTask *CollectLogsTask
+		for _, task := range tasks {
+			if logTask, ok := task.(CollectLogsTask); ok {
+				foundLogTask = &logTask
+			}
+		}
+		if foundLogTask == nil {
+			return nil, errors.New("failed to find log collection task")
+		}
+
+		if foundLogTask.Status != "completed" {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		paths := make(map[string]string)
+		for nodeId, nodeInfo := range foundLogTask.PerNode {
+			paths[nodeId] = nodeInfo.Path
+		}
+
+		return paths, nil
+	}
+}