Skip to content

Commit

Permalink
Re-Enable Prometheus Probers in Kubelet (CentaurusInfra#583)
Browse files Browse the repository at this point in the history
* enable kubelet probe registration

* cherry pick kubelet probers from the community version
  • Loading branch information
chenqianfzh authored Aug 14, 2020
1 parent 50e561d commit 22d7f2d
Show file tree
Hide file tree
Showing 21 changed files with 460 additions and 510 deletions.
4 changes: 0 additions & 4 deletions pkg/kubelet/apis/resourcemetrics/v1alpha1/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ go_library(
srcs = ["config.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
],
)

filegroup(
Expand Down
63 changes: 3 additions & 60 deletions pkg/kubelet/apis/resourcemetrics/v1alpha1/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright 2019 The Kubernetes Authors.
Copyright 2020 Authors of Arktos - file modified.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -16,66 +17,8 @@ limitations under the License.

package v1alpha1

import (
"time"

summary "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
)
// TODO(RainbowMango): We don't need to maintain this package anymore.
// This package will be removed in release 1.20.0+. For more details, please refer to https://github.com/kubernetes/kubernetes/pull/86282.

// Version is the string representation of the version of this configuration
const Version = "v1alpha1"

// Config returns the v1alpha1 resource metrics definition.
//
// It describes two node-level and two container-level metrics: cumulative CPU
// time (reported in core-seconds) and the current memory working set
// (reported in bytes).
//
// Each ValueFn returns the metric value together with the timestamp of the
// sample it was read from. It returns (nil, zero time) when the value is not
// available: either the CPU/Memory section of the stats is missing, or the
// specific counter inside it is nil. The counters are pointer fields, so they
// must be checked before being dereferenced; otherwise a partially populated
// sample would panic the caller.
func Config() stats.ResourceMetricsConfig {
	return stats.ResourceMetricsConfig{
		NodeMetrics: []stats.NodeResourceMetric{
			{
				Name:        "node_cpu_usage_seconds_total",
				Description: "Cumulative cpu time consumed by the node in core-seconds",
				ValueFn: func(s summary.NodeStats) (*float64, time.Time) {
					if s.CPU == nil || s.CPU.UsageCoreNanoSeconds == nil {
						return nil, time.Time{}
					}
					// Convert core-nanoseconds to core-seconds.
					v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second)
					return &v, s.CPU.Time.Time
				},
			},
			{
				Name:        "node_memory_working_set_bytes",
				Description: "Current working set of the node in bytes",
				ValueFn: func(s summary.NodeStats) (*float64, time.Time) {
					if s.Memory == nil || s.Memory.WorkingSetBytes == nil {
						return nil, time.Time{}
					}
					v := float64(*s.Memory.WorkingSetBytes)
					return &v, s.Memory.Time.Time
				},
			},
		},
		ContainerMetrics: []stats.ContainerResourceMetric{
			{
				Name:        "container_cpu_usage_seconds_total",
				Description: "Cumulative cpu time consumed by the container in core-seconds",
				ValueFn: func(s summary.ContainerStats) (*float64, time.Time) {
					if s.CPU == nil || s.CPU.UsageCoreNanoSeconds == nil {
						return nil, time.Time{}
					}
					// Convert core-nanoseconds to core-seconds.
					v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second)
					return &v, s.CPU.Time.Time
				},
			},
			{
				Name:        "container_memory_working_set_bytes",
				Description: "Current working set of the container in bytes",
				ValueFn: func(s summary.ContainerStats) (*float64, time.Time) {
					if s.Memory == nil || s.Memory.WorkingSetBytes == nil {
						return nil, time.Time{}
					}
					v := float64(*s.Memory.WorkingSetBytes)
					return &v, s.Memory.Time.Time
				},
			},
		},
	}
}
2 changes: 1 addition & 1 deletion pkg/kubelet/dockershim/network/testing/fake_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewFakeHost(kubeClient clientset.Interface) *fakeNetworkHost {
return host
}

func (fnh *fakeNetworkHost) GetPodByName(name, namespace, tenant string) (*v1.Pod, bool) {
func (fnh *fakeNetworkHost) GetPodByName(tenant, namespace, name string) (*v1.Pod, bool) {
return nil, false
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,13 +1015,15 @@ func (kl *Kubelet) HandlePodCleanups() error {
activePods := kl.filterOutTerminatedPods(allPods)

desiredPods := make(map[types.UID]empty)
desiredActivePods := make(map[types.UID]sets.Empty)
for _, pod := range activePods {
desiredPods[pod.UID] = empty{}
desiredActivePods[pod.UID] = sets.Empty{}
}
// Stop the workers for no-longer existing pods.
// TODO: is this the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
kl.probeManager.CleanupPods(activePods)
kl.probeManager.CleanupPods(desiredActivePods)

runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/prober/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
Expand Down Expand Up @@ -61,6 +61,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/prober/common_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright 2015 The Kubernetes Authors.
Copyright 2020 Authors of Arktos - file modified.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
109 changes: 60 additions & 49 deletions pkg/kubelet/prober/prober.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2020 Authors of Arktos - file modified.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -26,7 +27,7 @@ import (
"strings"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
Expand All @@ -35,24 +36,24 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/probe"
execprobe "k8s.io/kubernetes/pkg/probe/exec"
httprobe "k8s.io/kubernetes/pkg/probe/http"
tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
"k8s.io/utils/exec"

"k8s.io/klog"
)

const maxProbeRetries = 3

// Prober helps to check the liveness/readiness of a container.
// Prober helps to check the liveness/readiness/startup of a container.
type prober struct {
exec execprobe.Prober
// Each probe type needs its own HTTP prober instance so that they don't
// share a connection pool, which can cause collisions to the
// same host:port and transient failures. See #49740.
readinessHTTP httprobe.Prober
livenessHTTP httprobe.Prober
tcp tcprobe.Prober
readinessHTTP httpprobe.Prober
livenessHTTP httpprobe.Prober
tcp tcpprobe.Prober
runner kubecontainer.ContainerCommandRunner

refManager *kubecontainer.RefManager
Expand All @@ -69,15 +70,29 @@ func newProber(
const followNonLocalRedirects = false
return &prober{
exec: execprobe.New(),
readinessHTTP: httprobe.New(followNonLocalRedirects),
livenessHTTP: httprobe.New(followNonLocalRedirects),
tcp: tcprobe.New(),
readinessHTTP: httpprobe.New(followNonLocalRedirects),
livenessHTTP: httpprobe.New(followNonLocalRedirects),
tcp: tcpprobe.New(),
runner: runner,
refManager: refManager,
recorder: recorder,
}
}

// recordContainerEvent emits an event about the given container and is the
// single entry point the prober should use for container related events.
//
// The object reference is taken from the ref manager when one is registered
// for containerID; otherwise it is generated from the pod and container. If
// generation fails, the failure is logged and no event is recorded.
func (pb *prober) recordContainerEvent(pod *v1.Pod, container *v1.Container, containerID kubecontainer.ContainerID, eventType, reason, message string, args ...interface{}) {
	ref, found := pb.refManager.GetRef(containerID)
	if !found {
		// No cached reference for this container: build one on the fly.
		generated, genErr := kubecontainer.GenerateContainerRef(pod, container)
		if genErr != nil {
			klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, genErr)
			return
		}
		ref = generated
	}

	pb.recorder.Eventf(ref, eventType, reason, message, args...)
}

// probe probes the container.
func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
var probeSpec *v1.Probe
Expand All @@ -87,7 +102,7 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
case liveness:
probeSpec = container.LivenessProbe
default:
return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
}

ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
Expand All @@ -99,27 +114,17 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
if err != nil || (result != probe.Success && result != probe.Warning) {
// Probe failed in one way or another.
ref, hasRef := pb.refManager.GetRef(containerID)
if !hasRef {
klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
}
if err != nil {
klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
if hasRef {
pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
}
pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
} else { // result != probe.Success
klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
if hasRef {
pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
}
pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %v", probeType, output)
}
return results.Failure, err
}
if result == probe.Warning {
if ref, hasRef := pb.refManager.GetRef(containerID); hasRef {
pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
}
pb.recordContainerEvent(pod, &container, containerID, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %v", probeType, output)
klog.V(3).Infof("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
} else {
klog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
Expand Down Expand Up @@ -174,11 +179,12 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
url := formatURL(scheme, host, port, path)
headers := buildHeader(p.HTTPGet.HTTPHeaders)
klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
if probeType == liveness {
switch probeType {
case liveness:
return pb.livenessHTTP.Probe(url, headers, timeout)
default:
return pb.readinessHTTP.Probe(url, headers, timeout)
}
// readiness
return pb.readinessHTTP.Probe(url, headers, timeout)
}
if p.TCPSocket != nil {
port, err := extractPort(p.TCPSocket.Port, container)
Expand All @@ -193,7 +199,7 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
return pb.tcp.Probe(host, port, timeout)
}
klog.Warningf("Failed to find probe builder for container: %v", container)
return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

func extractPort(param intstr.IntOrString, container v1.Container) (int, error) {
Expand All @@ -210,7 +216,7 @@ func extractPort(param intstr.IntOrString, container v1.Container) (int, error)
}
}
default:
return port, fmt.Errorf("IntOrString had no kind: %+v", param)
return port, fmt.Errorf("intOrString had no kind: %+v", param)
}
if port > 0 && port < 65536 {
return port, nil
Expand Down Expand Up @@ -245,63 +251,68 @@ func formatURL(scheme string, host string, port int, path string) *url.URL {
type execInContainer struct {
// run executes a command in a container. Combined stdout and stderr output is always returned. An
// error is returned if one occurred.
run func() ([]byte, error)
run func() ([]byte, error)
writer io.Writer
}

func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
return execInContainer{func() ([]byte, error) {
return &execInContainer{run: func() ([]byte, error) {
return pb.runner.RunInContainer(containerID, cmd, timeout)
}}
}

func (eic execInContainer) Run() error {
return fmt.Errorf("unimplemented")
func (eic *execInContainer) Run() error {
return nil
}

func (eic execInContainer) CombinedOutput() ([]byte, error) {
func (eic *execInContainer) CombinedOutput() ([]byte, error) {
return eic.run()
}

func (eic execInContainer) Output() ([]byte, error) {
func (eic *execInContainer) Output() ([]byte, error) {
return nil, fmt.Errorf("unimplemented")
}

func (eic execInContainer) SetDir(dir string) {
func (eic *execInContainer) SetDir(dir string) {
//unimplemented
}

func (eic execInContainer) SetStdin(in io.Reader) {
func (eic *execInContainer) SetStdin(in io.Reader) {
//unimplemented
}

func (eic execInContainer) SetStdout(out io.Writer) {
//unimplemented
func (eic *execInContainer) SetStdout(out io.Writer) {
eic.writer = out
}

func (eic execInContainer) SetStderr(out io.Writer) {
//unimplemented
func (eic *execInContainer) SetStderr(out io.Writer) {
eic.writer = out
}

func (eic execInContainer) SetEnv(env []string) {
func (eic *execInContainer) SetEnv(env []string) {
//unimplemented
}

func (eic execInContainer) Stop() {
func (eic *execInContainer) Stop() {
//unimplemented
}

func (eic execInContainer) Start() error {
return fmt.Errorf("unimplemented")
func (eic *execInContainer) Start() error {
data, err := eic.run()
if eic.writer != nil {
eic.writer.Write(data)
}
return err
}

func (eic execInContainer) Wait() error {
return fmt.Errorf("unimplemented")
func (eic *execInContainer) Wait() error {
return nil
}

func (eic execInContainer) StdoutPipe() (io.ReadCloser, error) {
func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) {
return nil, fmt.Errorf("unimplemented")
}

func (eic execInContainer) StderrPipe() (io.ReadCloser, error) {
func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) {
return nil, fmt.Errorf("unimplemented")
}
Loading

0 comments on commit 22d7f2d

Please sign in to comment.