Skip to content

Commit

Permalink
Ignore kube proxy version + typed queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuckal777 committed Sep 6, 2024
1 parent dbd6b15 commit 421c7f8
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 37 deletions.
6 changes: 3 additions & 3 deletions pkg/controller/base/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ type Reconciler interface {
type controller struct {
config.Factories

queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface // nolint: staticcheck
reconciler Reconciler

logger log.Logger
threadiness int
}

func NewController(threadiness int, factories config.Factories, reconciler Reconciler, logger log.Logger, queue workqueue.RateLimitingInterface, name string) Controller {
func NewController(threadiness int, factories config.Factories, reconciler Reconciler, logger log.Logger, queue workqueue.RateLimitingInterface, name string) Controller { // nolint: staticcheck
c := &controller{
Factories: factories,
queue: queue,
Expand All @@ -50,7 +50,7 @@ func NewController(threadiness int, factories config.Factories, reconciler Recon
threadiness: threadiness,
}
if c.queue == nil {
c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(BASE_DELAY, MAX_DELAY), name)
c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(BASE_DELAY, MAX_DELAY), name) // nolint: staticcheck
}

c.Factories.Kubernikus.Kubernikus().V1().Klusters().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/ground.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type GroundControl struct {
config.Config
Recorder record.EventRecorder

queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface // nolint: staticcheck
klusterInformer informers_kubernikus.KlusterInformer
podInformer informers_v1.PodInformer

Expand All @@ -80,7 +80,7 @@ func NewGroundController(threadiness int, factories config.Factories, clients co
Factories: factories,
Config: config,
Recorder: recorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second), "ground"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second), "ground"), // nolint: staticcheck
klusterInformer: factories.Kubernikus.Kubernikus().V1().Klusters(),
podInformer: factories.Kubernetes.Core().V1().Pods(),
Logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/launch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewController(threadiness int, factories config.Factories, clients config.C
Failed: metrics.LaunchFailedOperationsTotal,
}

queue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(base.BASE_DELAY, base.MAX_DELAY), "launch")
queue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(base.BASE_DELAY, base.MAX_DELAY), "launch") // nolint: staticcheck
factories.NodesObservatory.NodeInformer().AddEventHandlerFuncs(nodeobservatory.NodeEventHandlerFuncs{
AddFunc: func(kluster *v1.Kluster, node *core_v1.Node) {
if key, err := cache.MetaNamespaceKeyFunc(kluster); err == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodeobservatory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type (
clientFactory kube.SharedClientFactory
klusterInformer kubernikus_informers_v1.KlusterInformer
namespace string
queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface // nolint: staticcheck
logger log.Logger
nodeInformerMap sync.Map
handlersMux sync.RWMutex
Expand All @@ -62,7 +62,7 @@ func NewController(informer kubernikus_informers_v1.KlusterInformer, factory kub
controller := &NodeObservatory{
clientFactory: factory,
klusterInformer: informer,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(BaseDelay, MaxDelay), "nodeobservatory"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(BaseDelay, MaxDelay), "nodeobservatory"), // nolint: staticcheck
logger: logger,
nodeInformerMap: sync.Map{},
threadiness: threadiness,
Expand Down
6 changes: 0 additions & 6 deletions pkg/controller/servicing/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,10 @@ func (c *servicingNodesCollector) Collect(ch chan<- prometheus.Metric) {
waitingUptodate := float64(len(nodes.All())) - waitingReboot - waitingReplace

kubeletVersions := map[string]int{}
proxyVersions := map[string]int{}
osVersions := map[string]int{}

for _, node := range nodes.All() {
kubeletVersions[node.Status.NodeInfo.KubeletVersion]++
proxyVersions[node.Status.NodeInfo.KubeProxyVersion]++

osVersion, err := flatcar.ExractVersion(node)
if err != nil {
Expand All @@ -135,10 +133,6 @@ func (c *servicingNodesCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(c.kubelet, prometheus.GaugeValue, float64(count), kluster.GetName(), version)
}

for version, count := range proxyVersions {
ch <- prometheus.MustNewConstMetric(c.proxy, prometheus.GaugeValue, float64(count), kluster.GetName(), version)
}

for version, count := range osVersions {
ch <- prometheus.MustNewConstMetric(c.osimage, prometheus.GaugeValue, float64(count), kluster.GetName(), version)
}
Expand Down
19 changes: 0 additions & 19 deletions pkg/controller/servicing/lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,21 +275,6 @@ func (d *NodeLister) Replace() []*core_v1.Node {
continue
}

kubeProxyVersion, err := getKubeProxyVersion(node)
if err != nil {
d.Logger.Log(
"msg", "Couldn't get KubeProxy version from Node. Skipping node upgrade.",
"node", node.GetName(),
"err", err,
)
continue
}

if kubeProxyVersion.LessThan(klusterVersion) {
found = append(found, node)
continue
}

if util.IsFlatcarNodeWithRkt(node) {
uptodate := true

Expand Down Expand Up @@ -608,10 +593,6 @@ func getKubeletVersion(node *core_v1.Node) (*version.Version, error) {
return version.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
}

func getKubeProxyVersion(node *core_v1.Node) (*version.Version, error) {
return version.ParseSemantic(node.Status.NodeInfo.KubeProxyVersion)
}

// GetNodeCondition extracts the provided condition from the given status and returns that.
// Returns nil and -1 if the condition is not present, and the index of the located condition.
func getNodeCondition(status *core_v1.NodeStatus,
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/servicing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,8 @@ func NewFakeKluster(opts *FakeKlusterOptions, afterFlatCarRktRemoval bool) (*v1.

if p.NodeKubeletOutdated {
node.Status.NodeInfo.KubeletVersion = "v1.10.11"
node.Status.NodeInfo.KubeProxyVersion = "v1.10.11"
} else {
node.Status.NodeInfo.KubeletVersion = "v1.10.15"
node.Status.NodeInfo.KubeProxyVersion = "v1.10.15"
}

nodes = append(nodes, node)
Expand Down
4 changes: 2 additions & 2 deletions pkg/wormhole/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
type Controller struct {
nodes informers.NodeInformer
tunnel *guttle.Server
queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface // nolint: staticcheck
store map[string][]route
storeMu sync.RWMutex
iptables iptables.Interface
Expand All @@ -48,7 +48,7 @@ func NewController(informer informers.NodeInformer, serviceCIDR string, tunnel *
c := &Controller{
nodes: informer,
tunnel: tunnel,
queue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)),
queue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)), // nolint: staticcheck
store: make(map[string][]route),
iptables: iptables.New(utilexec.New(), iptables.ProtocolIpv4, logger),
hijackPort: 9191,
Expand Down

0 comments on commit 421c7f8

Please sign in to comment.