Skip to content

Commit

Permalink
Added ability to collect logs from server.
Browse files Browse the repository at this point in the history
  • Loading branch information
brett19 committed Nov 29, 2023
1 parent f668a94 commit 0b1edab
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 9 deletions.
46 changes: 46 additions & 0 deletions cmd/collectlogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"
"go.uber.org/zap"
)

// CollectLogsOutput is the value handed to OutputJson by the collect-logs
// command when the "json" flag is set: the list of collected log file paths.
type CollectLogsOutput []string

// collectLogsCmd implements the "collect-logs" sub-command. It resolves the
// named cluster to its deployer, asks that deployer to collect logs into
// dest-path, and then reports the resulting file paths either as plain text
// or, when the "json" flag is set, through helper.OutputJson.
var collectLogsCmd = &cobra.Command{
	Use:   "collect-logs [flags] cluster dest-path",
	Short: "Collects logs from the cluster and saves them to dest-path",
	Args:  cobra.MinimumNArgs(2),
	Run: func(cmd *cobra.Command, args []string) {
		helper := CmdHelper{}
		logger := helper.GetLogger()
		ctx := helper.GetContext()

		// The flag lookup error is intentionally ignored, as in the original;
		// outputJson is simply whatever value GetBool hands back.
		outputJson, _ := cmd.Flags().GetBool("json")

		_, deployer, cluster := helper.IdentifyCluster(ctx, args[0])
		destPath := args[1]

		logPaths, err := deployer.CollectLogs(ctx, cluster.GetID(), destPath)
		if err != nil {
			logger.Fatal("failed to collect logs", zap.Error(err))
		}

		if outputJson {
			var out CollectLogsOutput = logPaths
			helper.OutputJson(out)
			return
		}

		fmt.Printf("Collected Files:\n")
		for _, logPath := range logPaths {
			fmt.Printf(" %s\n", logPath)
		}
	},
}

// init registers the collect-logs command on the root command.
func init() {
rootCmd.AddCommand(collectLogsCmd)
}
4 changes: 4 additions & 0 deletions deployment/clouddeploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,3 +1119,7 @@ func (d *Deployer) BlockNodeTraffic(ctx context.Context, clusterID string, nodeI
// AllowNodeTraffic is not available for clouddeploy; it always returns an
// error and ignores its arguments.
func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
	unsupported := errors.New("clouddeploy does not support traffic control")
	return unsupported
}

// CollectLogs is not available for clouddeploy; it always returns a nil path
// list together with an error and ignores its arguments.
func (d *Deployer) CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error) {
	unsupported := errors.New("clouddeploy does not support log collection")
	return nil, unsupported
}
1 change: 1 addition & 0 deletions deployment/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,5 @@ type Deployer interface {
DeleteCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error
BlockNodeTraffic(ctx context.Context, clusterID string, nodeID string) error
AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error
CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error)
}
114 changes: 114 additions & 0 deletions deployment/dockerdeploy/deployer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package dockerdeploy

import (
"archive/tar"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path"
"strings"
"time"

Expand All @@ -18,6 +22,7 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -1080,3 +1085,112 @@ func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeI

return nil
}

// CollectLogs triggers a cluster-wide log collection through the first node of
// the cluster, waits for it to complete, and then copies each node's resulting
// log file out of its container into destPath.
//
// It returns the local paths of the files it wrote. destPath is joined with the
// base name of each remote file as-is; the directory is not created here. Any
// failure aborts the whole operation and returns a nil slice.
func (d *Deployer) CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error) {
	clusterInfo, err := d.getClusterInfo(ctx, clusterID)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get cluster info")
	}

	if len(clusterInfo.Nodes) == 0 {
		return nil, errors.New("cannot collect logs from a cluster with no nodes")
	}

	// All management requests are sent to the first node of the cluster.
	nodeCtrl := clustercontrol.NodeManager{
		Endpoint: fmt.Sprintf("http://%s:8091", clusterInfo.Nodes[0].IPAddress),
	}

	nodeOtps, err := nodeCtrl.Controller().ListNodeOTPs(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to list nodes")
	}

	d.logger.Info("beginning log collection", zap.Strings("nodes", nodeOtps))

	err = nodeCtrl.Controller().BeginLogsCollection(ctx, &clustercontrol.BeginLogsCollectionOptions{
		Nodes:             nodeOtps,
		LogRedactionLevel: "none",
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to begin log collection")
	}

	d.logger.Info("waiting for log collection to start")

	err = nodeCtrl.WaitForTaskRunning(ctx, "clusterLogsCollection")
	if err != nil {
		return nil, errors.Wrap(err, "failed to wait for log collection to start")
	}

	d.logger.Info("waiting for log collection to complete (this can take a _long_ time)")

	logPaths, err := nodeCtrl.WaitForLogCollection(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to wait for log collection to complete")
	}

	// nodeInfoFromIp maps an IP address back to the deployed node that owns it,
	// or nil when no node of this cluster has that address.
	nodeInfoFromIp := func(ipAddress string) *deployedNodeInfo {
		for _, nodeInfo := range clusterInfo.Nodes {
			if nodeInfo.IPAddress == ipAddress {
				return nodeInfo
			}
		}
		return nil
	}

	var destPaths []string
	for nodeId, filePath := range logPaths {
		// The node id is expected to look like "<name>@<ip>"; the ip part is
		// what ties it back to a container.
		otpParts := strings.Split(nodeId, "@")
		if len(otpParts) != 2 {
			return nil, errors.New("unexpected node otp format")
		}
		ipAddress := otpParts[1]

		nodeInfo := nodeInfoFromIp(ipAddress)
		if nodeInfo == nil {
			return nil, fmt.Errorf("failed to find node for ip %s", ipAddress)
		}
		containerId := nodeInfo.ContainerID

		fileName := path.Base(filePath)
		destFilePath := path.Join(destPath, fileName)

		// Only include the full transfer details when debug logging is enabled.
		if !d.logger.Level().Enabled(zapcore.DebugLevel) {
			d.logger.Info("downloading log from node",
				zap.String("node", nodeId))
		} else {
			d.logger.Info("downloading log from node",
				zap.String("node", nodeId),
				zap.String("ipAddress", ipAddress),
				zap.String("container", containerId),
				zap.String("srcPath", filePath),
				zap.String("destPath", destFilePath))
		}

		// The download lives in its own function so that the container stream
		// and the destination file are released after every node rather than
		// piling up until CollectLogs returns.
		err := d.downloadContainerFile(ctx, containerId, filePath, destFilePath)
		if err != nil {
			return nil, err
		}

		destPaths = append(destPaths, destFilePath)
	}

	return destPaths, nil
}

// downloadContainerFile copies the single file at srcPath inside the given
// container to destFilePath on the local disk. The container hands the file
// over as a tar stream; only its first entry is extracted.
func (d *Deployer) downloadContainerFile(ctx context.Context, containerID, srcPath, destFilePath string) error {
	resp, _, err := d.dockerCli.CopyFromContainer(ctx, containerID, srcPath)
	if err != nil {
		return errors.Wrap(err, "failed to copy from container")
	}
	defer resp.Close()

	tarRdr := tar.NewReader(resp)
	if _, err := tarRdr.Next(); err != nil {
		return errors.Wrap(err, "failed to parse transmitted file")
	}

	fileWrt, err := os.Create(destFilePath)
	if err != nil {
		return errors.Wrap(err, "failed to open destination file for writing")
	}

	if _, err := io.Copy(fileWrt, tarRdr); err != nil {
		// The copy error is the one worth reporting; the close is best-effort.
		_ = fileWrt.Close()
		return errors.Wrap(err, "failed to copy container file to local disk")
	}

	// A failed Close can mean buffered data never reached the disk, so it is
	// reported rather than dropped.
	if err := fileWrt.Close(); err != nil {
		return errors.Wrap(err, "failed to close destination file")
	}

	return nil
}
4 changes: 4 additions & 0 deletions deployment/localdeploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,7 @@ func (d *Deployer) BlockNodeTraffic(ctx context.Context, clusterID string, nodeI
// AllowNodeTraffic is not available for localdeploy; it always returns an
// error and ignores its arguments.
func (d *Deployer) AllowNodeTraffic(ctx context.Context, clusterID string, nodeID string) error {
	unsupported := errors.New("localdeploy does not support traffic control")
	return unsupported
}

// CollectLogs is not available for localdeploy; it always returns a nil path
// list together with an error and ignores its arguments.
func (d *Deployer) CollectLogs(ctx context.Context, clusterID string, destPath string) ([]string, error) {
	unsupported := errors.New("localdeploy does not support log collection")
	return nil, unsupported
}
88 changes: 80 additions & 8 deletions utils/clustercontrol/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,24 +337,96 @@ func (c *Controller) BeginRebalance(ctx context.Context, opts *BeginRebalanceOpt
return c.doFormPost(ctx, "/controller/rebalance", form, true, nil)
}

type Task struct {
// BeginLogsCollectionOptions holds the form fields that BeginLogsCollection
// posts to /controller/startLogsCollection.
type BeginLogsCollectionOptions struct {
// Nodes is sent as the "nodes" field, encoded with the "comma" option.
Nodes []string `url:"nodes,comma"`
// LogRedactionLevel is sent as the "logRedactionLevel" field.
LogRedactionLevel string `url:"logRedactionLevel"`
}

// BeginLogsCollection starts a log collection by form-posting opts to
// /controller/startLogsCollection.
//
// It returns an error if opts cannot be encoded into form values or if the
// request itself fails.
func (c *Controller) BeginLogsCollection(ctx context.Context, opts *BeginLogsCollectionOptions) error {
	form, err := query.Values(opts)
	if err != nil {
		// Surface encoding problems instead of posting a form built from a
		// failed encode.
		return err
	}

	return c.doFormPost(ctx, "/controller/startLogsCollection", form, true, nil)
}

// Task is the common view of an entry returned by ListTasks. It is
// implemented by GenericTask and, through embedding, by CollectLogsTask.
type Task interface {
// GetStatus returns the task's status string.
GetStatus() string
// GetType returns the task's type string.
GetType() string
}

// GenericTask is the basic Task implementation. It carries nothing beyond the
// status and type strings of a task.
type GenericTask struct {
	// Status is the value reported by GetStatus.
	Status string
	// Type is the value reported by GetType.
	Type string
}

// GetStatus returns the Status field.
func (t GenericTask) GetStatus() string {
	return t.Status
}

// GetType returns the Type field.
func (t GenericTask) GetType() string {
	return t.Type
}

// CollectLogsTask is the Task that ListTasks produces for entries whose type
// is "clusterLogsCollection". The embedded GenericTask supplies the Task
// methods.
type CollectLogsTask struct {
GenericTask
// PerNode holds the per-node details, keyed by the node identifier used in
// the task's "perNode" object.
PerNode map[string]CollectLogsTask_PerNode
}

// CollectLogsTask_PerNode is one node's entry within a CollectLogsTask.
type CollectLogsTask_PerNode struct {
// Status is the node's "status" value as reported in the task.
Status string
// Path is the node's "path" value as reported in the task;
// WaitForLogCollection returns it as that node's log path.
Path string
}

func (c *Controller) ListTasks(ctx context.Context) ([]*Task, error) {
var resp []struct {
func (c *Controller) ListTasks(ctx context.Context) ([]Task, error) {
type genericTaskJson struct {
Status string `json:"status"`
}
Type string `json:"type"`
}
type clusterLogsCollectionTaskJson struct {
genericTaskJson
Node string `json:"node"`
PerNode map[string]struct {
Status string `json:"status"`
Path string `json:"path"`
} `json:"perNode"`
Progress int `json:"progress"`
Ts string `json:"ts"`
RecommendedRefreshPeriod int `json:"recommendedRefreshPeriod"`
CancelURI string `json:"cancelURI"`
}

var resp []json.RawMessage
err := c.doGet(ctx, "/pools/default/tasks", &resp)
if err != nil {
return nil, err
}

tasks := make([]*Task, len(resp))
for statusIdx, status := range resp {
tasks[statusIdx] = &Task{
Status: status.Status,
tasks := make([]Task, len(resp))
for taskIdx, taskJson := range resp {
var baseTask genericTaskJson
err := json.Unmarshal(taskJson, &baseTask)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal task")
}

var outTask Task
if baseTask.Type == "clusterLogsCollection" {
var task clusterLogsCollectionTaskJson
err := json.Unmarshal(taskJson, &task)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal clusterLogsCollection task")
}

perNode := make(map[string]CollectLogsTask_PerNode)
for nodeId, nodeInfo := range task.PerNode {
perNode[nodeId] = CollectLogsTask_PerNode{
Status: nodeInfo.Status,
Path: nodeInfo.Path,
}
}

outTask = CollectLogsTask{
GenericTask: GenericTask(task.genericTaskJson),
PerNode: perNode,
}
} else {
outTask = GenericTask(baseTask)
}

tasks[taskIdx] = outTask
}

return tasks, nil
Expand Down
66 changes: 65 additions & 1 deletion utils/clustercontrol/nodemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ func (m *NodeManager) WaitForNoRunningTasks(ctx context.Context) error {

hasRunningTask := false
for _, task := range tasks {
if task.Status != "notRunning" {
taskStatus := task.GetStatus()
if taskStatus != "notRunning" &&
taskStatus != "completed" &&
taskStatus != "cancelled" {
hasRunningTask = true
}
}
Expand All @@ -165,3 +168,64 @@ func (m *NodeManager) WaitForNoRunningTasks(ctx context.Context) error {

return nil
}

// WaitForTaskRunning polls the task list once per second until a task of the
// given type reports the status "running".
//
// It returns nil once such a task is seen, a wrapped error if the task list
// cannot be fetched, or ctx.Err() if ctx is done while waiting between polls.
func (m *NodeManager) WaitForTaskRunning(ctx context.Context, taskType string) error {
	c := m.Controller()

	for {
		tasks, err := c.ListTasks(ctx)
		if err != nil {
			return errors.Wrap(err, "failed to fetch list of tasks")
		}

		for _, task := range tasks {
			if task.GetType() == taskType && task.GetStatus() == "running" {
				return nil
			}
		}

		// Pause before the next poll, but give up immediately when the caller
		// cancels instead of sleeping through the cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(1 * time.Second):
		}
	}
}

// WaitForLogCollection polls the task list once per second until the log
// collection task reports the status "completed", and then returns a map of
// otp -> log path built from that task's per-node entries.
//
// It returns an error if the task list cannot be fetched, if no log collection
// task is listed, or if the task reports the status "cancelled". It returns
// ctx.Err() if ctx is done while waiting between polls.
func (m *NodeManager) WaitForLogCollection(ctx context.Context) (map[string]string, error) {
	c := m.Controller()

	for {
		tasks, err := c.ListTasks(ctx)
		if err != nil {
			return nil, errors.Wrap(err, "failed to fetch list of tasks")
		}

		// When several log collection tasks are listed, the last one wins.
		var foundLogTask *CollectLogsTask
		for _, task := range tasks {
			if logTask, ok := task.(CollectLogsTask); ok {
				foundLogTask = &logTask
			}
		}
		if foundLogTask == nil {
			return nil, errors.New("failed to find log collection task")
		}

		switch foundLogTask.Status {
		case "completed":
			paths := make(map[string]string, len(foundLogTask.PerNode))
			for nodeId, nodeInfo := range foundLogTask.PerNode {
				paths[nodeId] = nodeInfo.Path
			}
			return paths, nil
		case "cancelled":
			// WaitForNoRunningTasks treats "cancelled" as a finished state, so
			// polling further here would never terminate.
			return nil, errors.New("log collection was cancelled")
		}

		// Pause before the next poll, but give up immediately when the caller
		// cancels instead of sleeping through the cancellation.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(1 * time.Second):
		}
	}
}

0 comments on commit 0b1edab

Please sign in to comment.