Skip to content

Commit

Permalink
bug: abort immediately if we can't read one of the pod logs (#4943)
Browse files Browse the repository at this point in the history
if we are unable to attach to one of the listener pods we abort with an
error. if we don't do this then we were showing up as if the traffic on
the given port was blocked.

this pr solves the following scenario:

1. we deploy the listeners daemonset
2. we wait for the daemonset to rollout.
3. we attempt to attach to listener pod logs (all pods).
4. due to a cluster misconfiguration we can't reach kubelet in one of
   the nodes.

before this pr the step 4 was ignored and we moved on as if we were
attached to the pod, generating a misleading error message.
  • Loading branch information
ricardomaraschini authored Nov 16, 2023
1 parent 55341ac commit d36aaee
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 6 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ require (
github.com/go-jose/go-jose/v3 v3.0.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.2.4 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down Expand Up @@ -262,6 +263,7 @@ require (
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a h1:eqjiAL3qoof
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a/go.mod h1:2stgcRjl6QmW+gU2h5E7BQXg4HU0gzxKWDuT5HviN9s=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -875,6 +876,7 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-logr/zapr v0.1.0/go.mod h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk=
github.com/go-logr/zapr v1.2.4 h1:QHVo+6stLbfJmYGkQ7uGHUCu5hnAFAj6mDe6Ea0SeOo=
github.com/go-logr/zapr v1.2.4/go.mod h1:FyHWQIzQORZ0QVE1BtVHv3cKtNLuXsbNLtpuhNapBOA=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
Expand Down Expand Up @@ -2419,8 +2421,10 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand All @@ -2434,7 +2438,9 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
58 changes: 52 additions & 6 deletions pkg/cli/nodes_connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os/signal"
"strings"
"syscall"
Expand All @@ -20,7 +21,10 @@ import (
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/kustomize/api/types"
"sigs.k8s.io/yaml"

"github.com/google/uuid"
"github.com/replicatedhq/plumber/v2"
Expand Down Expand Up @@ -56,6 +60,14 @@ type nodeConnectivityOptions struct {
port int32
cli client.Client
wait time.Duration
verbose bool
}

func (n nodeConnectivityOptions) debugf(format string, args ...interface{}) {
if !n.verbose || n.printf == nil {
return
}
n.printf(format, args...)
}

func newNetutilNodesConnectivity(_ CLI) *cobra.Command {
Expand Down Expand Up @@ -101,13 +113,17 @@ func newNetutilNodesConnectivity(_ CLI) *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGTERM, syscall.SIGINT)
defer cancel()
k8slogger := zap.New(func(o *zap.Options) { o.DestWriter = io.Discard })
log.SetLogger(k8slogger)

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
opts.debugf("Deleting listener DaemonSet")
if err := deleteListeners(ctx, opts); err != nil {
opts.printf("Failed to delete DaemonSet listeners overlay: %s\n", err)
}
opts.debugf("Deleting pinger job")
if err := deletePinger(ctx, opts); err != nil {
opts.printf("Failed to delete pinger job: %s\n", err)
}
Expand All @@ -132,6 +148,7 @@ func newNetutilNodesConnectivity(_ CLI) *cobra.Command {
cmd.Flags().StringVar(&opts.image, "image", "replicated/kurl-util:latest", "The image to use for the test (image must contain bash, nc and echo).")
cmd.Flags().Int32Var(&opts.port, "port", 0, "The port to use for the test.")
cmd.Flags().IntVar(&opts.attempts, "udp-attempts", 5, "The number of connection attempts when using udp.")
cmd.Flags().BoolVar(&opts.verbose, "verbose", false, "Enable verbose output.")
return cmd
}

Expand Down Expand Up @@ -189,54 +206,69 @@ func deployListenersDaemonset(ctx context.Context, opts nodeConnectivityOptions)
{Name: "PORT", Value: fmt.Sprintf("%d", opts.port)},
}
}
data, _ := yaml.Marshal(obj)
opts.debugf("Creating object:\n%s", string(data))
return nil
}),
plumber.WithPostApplyAction(func(ctx context.Context, obj client.Object) error {
if ds, ok := obj.(*appsv1.DaemonSet); ok {
opts.debugf("Waiting for DaemonSet %s to rollout\n", ds.Name)
err := k8sutil.WaitForDaemonsetRollout(ctx, opts.cliset, ds, time.Minute)
if err != nil {
printDaemonsetStatus(ctx, opts, ds)
return fmt.Errorf("failed to wait for DaemonSet rollout: %w", err)
}
opts.debugf("DaemonSet %s ready\n", ds.Name)
}
return nil
}),
}
overlay := fmt.Sprintf("%s-listeners", strings.ToLower(opts.proto))
renderer := plumber.NewRenderer(opts.cli, nodes_connectivity.Static, options...)
if err := renderer.Apply(ctx, overlay); err != nil {
return fmt.Errorf("failed to create listeners daemonset: %w", err)
return fmt.Errorf("failed to create listeners DaemonSet: %w", err)
}
opts.printf("Listeners DaemonSet deployed successfully.\n")
return nil
}

// attachToListenersPods attaches to all provided pod logs. returns a channel from where printed messages (or errors) can
// be read.
func attachToListenersPods(ctx context.Context, opts nodeConnectivityOptions, pods []corev1.Pod) <-chan logLine {
func attachToListenersPods(ctx context.Context, opts nodeConnectivityOptions, pods []corev1.Pod) (<-chan logLine, error) {
var out = make(chan logLine)
for _, pod := range pods {
opts.printf("Attaching to pod %s\n", pod.Name)
errs := make(chan error)
go func(pod corev1.Pod) {
logopts := &corev1.PodLogOptions{Follow: true}
req := opts.cliset.CoreV1().Pods(opts.namespace).GetLogs(pod.Name, logopts)
stream, err := req.Stream(ctx)
if err != nil {
out <- logLine{err: fmt.Errorf("failed to get logs for pod %s: %w", pod.Name, err)}
errs <- fmt.Errorf("failed to attach to pod %s: %w", pod.Name, err)
return
}
defer stream.Close()

errs <- nil
scanner := bufio.NewScanner(stream)
scanner.Split(bufio.ScanLines)
opts.debugf("Waiting for logs on pod %s\n", pod.Name)
for scanner.Scan() {
txt := scanner.Text()
opts.debugf("Pod %s log: %s\n", pod.Name, txt)
out <- logLine{message: scanner.Text()}
}
if err := scanner.Err(); err != nil {
opts.debugf("Error closing scanner on pod %s: %v\n", pod.Name, err)
out <- logLine{err: fmt.Errorf("failed to read logs for pod %s: %w", pod.Name, err)}
}
}(pod)
if err := <-errs; err != nil {
opts.debugf("Fail to attach to pod %s: %v\n", pod.Name, err)
return nil, err
}
}
return out
return out, nil
}

// kustomizeMutator returns a kustomize mutator that sets the namespace and image.
Expand Down Expand Up @@ -302,13 +334,17 @@ func runPinger(ctx context.Context, opts nodeConnectivityOptions, model corev1.P
//job.Spec.Template.Spec.Tolerations = model.Spec.Tolerations
job.Spec.Template.Spec.Containers[0].Env = env
}
data, _ := yaml.Marshal(obj)
opts.debugf("Creating object:\n%s", string(data))
return nil
}),
plumber.WithPostApplyAction(func(ctx context.Context, obj client.Object) error {
if job, ok := obj.(*batchv1.Job); ok {
opts.debugf("Waiting for job %s to finish\n", string(job.Name))
if _, err := k8sutil.WaitForJob(ctx, opts.cliset, job, time.Minute); err != nil {
return fmt.Errorf("failed to create job: %s", err)
}
opts.debugf("Job %s finished\n", string(job.Name))
}
return nil
}),
Expand All @@ -329,7 +365,13 @@ func testNodesConnectivity(ctx context.Context, opts nodeConnectivityOptions) er
if err != nil {
return fmt.Errorf("failed to get listener pods: %w", err)
}
receiver := attachToListenersPods(ctx, opts, pods.Items)
for _, pod := range pods.Items {
opts.debugf("Found %s as part of the listeners DaemonSet\n", pod.Name)
}
receiver, err := attachToListenersPods(ctx, opts, pods.Items)
if err != nil {
return fmt.Errorf("failed to attach to listeners: %w", err)
}
var nodes corev1.NodeList
if err := opts.cli.List(ctx, &nodes); err != nil {
return fmt.Errorf("failed to list nodes: %w", err)
Expand All @@ -345,10 +387,12 @@ func testNodesConnectivity(ctx context.Context, opts nodeConnectivityOptions) er
func readLogLine(ctx context.Context, opts nodeConnectivityOptions, receiver <-chan logLine) (string, error) {
select {
case line := <-receiver:
opts.debugf("Event received from listener pods: message: %s err: %v\n", line.message, line.err)
return line.message, line.err
case <-ctx.Done():
return "", fmt.Errorf("failed while waiting for logs: %w", ctx.Err())
case <-time.After(opts.wait):
opts.debugf("Timeout while waiting for logs\n")
return "", nil
}
}
Expand Down Expand Up @@ -376,15 +420,17 @@ func connectToNodeFromPods(ctx context.Context, opts nodeConnectivityOptions, no
if err != nil {
return fmt.Errorf("failed to connect node %s from node %s: %v", src, dst, err)
}
opts.debugf("Reading logs from listeners\n")
line, err := readLogLine(ctx, opts, receiver)
if err != nil {
return fmt.Errorf("failed to read log line: %w", err)
}
opts.debugf("Received %s, expected %s\n", line, id)
if success = line == id; success {
opts.printf("Success, packet received.\n")
break
}
opts.printf("Failed to connect from %s to %s, %s\n", src, dst)
opts.printf("Failed to connect from %s to %s\n", src, dst)
}
if success {
continue
Expand Down

0 comments on commit d36aaee

Please sign in to comment.