Implement metadata propagation on the workload cluster nodes
Signed-off-by: Danil Grigorev <[email protected]>
Danil-Grigorev committed Oct 13, 2023
1 parent 8ae2ee5 commit 35b0da3
Showing 5 changed files with 149 additions and 102 deletions.
8 changes: 8 additions & 0 deletions bootstrap/api/v1alpha1/rke2config_types.go
@@ -68,6 +68,14 @@ type RKE2AgentConfig struct {
//+optional
NodeLabels []string `json:"nodeLabels,omitempty"`

// NodeAnnotations are annotations to apply to the created nodes after the bootstrap phase.
//
// Unfortunately it is not possible to apply annotations via the kubelet
// using the current bootstrap configuration.
// Issue: https://github.com/kubernetes/kubernetes/issues/108046
//+optional
NodeAnnotations map[string]string `json:"nodeAnnotations,omitempty"`

// NodeTaints registers the kubelet with the given set of taints.
//+optional
NodeTaints []string `json:"nodeTaints,omitempty"`
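For illustration, here is a minimal sketch of how the new field sits next to the existing label support. Only the RKE2AgentConfig fields come from the API above; the surrounding program and the label/annotation values are hypothetical:

package main

import (
	"fmt"

	bootstrapv1 "github.com/rancher-sandbox/cluster-api-provider-rke2/bootstrap/api/v1alpha1"
)

func main() {
	agent := bootstrapv1.RKE2AgentConfig{
		// Labels can be applied by the kubelet at registration time.
		NodeLabels: []string{"node-role.example.com/storage=true"},
		// Annotations cannot (see the linked kubernetes issue), so the
		// control plane propagates them onto the Node objects post bootstrap.
		NodeAnnotations: map[string]string{"example.com/provisioned-by": "rke2"},
	}
	fmt.Println(agent.NodeAnnotations)
}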
7 changes: 7 additions & 0 deletions controlplane/api/v1alpha1/condition_consts.go
@@ -54,9 +54,16 @@ const (
// is up to date. When this condition is false, the RKE2ControlPlane is executing a rolling upgrade.
MachinesSpecUpToDateCondition clusterv1.ConditionType = "MachinesSpecUpToDate"

// NodeMetadataUpToDate documents that the metadata of the nodes controlled by the RKE2 machines
// is up to date. When this condition is false, the node metadata is not propagated.
NodeMetadataUpToDate clusterv1.ConditionType = "NodeMetadataUpToDate"

// MachineAgentHealthyCondition reports a machine's rke2 agent's operational status.
MachineAgentHealthyCondition clusterv1.ConditionType = "AgentHealthy"

// NodePatchFailedReason (Severity=Error) documents the reason why the Node object could not be patched.
NodePatchFailedReason = "NodePatchFailed"

// PodInspectionFailedReason documents a failure in inspecting the pod status.
PodInspectionFailedReason = "PodInspectionFailed"

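As a hedged sketch of how these constants are typically consumed with the upstream cluster-api condition helpers (the package and helper function are illustrative, not part of this change):

package metadata

import (
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/conditions"

	controlplanev1 "github.com/rancher-sandbox/cluster-api-provider-rke2/controlplane/api/v1alpha1"
)

// nodeMetadataStatus summarizes the NodeMetadataUpToDate condition on a machine.
func nodeMetadataStatus(machine *clusterv1.Machine) string {
	if conditions.IsTrue(machine, controlplanev1.NodeMetadataUpToDate) {
		return "up to date"
	}

	// GetReason returns e.g. NodePatchFailedReason when the Node patch failed.
	return conditions.GetReason(machine, controlplanev1.NodeMetadataUpToDate)
}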
10 changes: 8 additions & 2 deletions controlplane/internal/controllers/rke2controlplane_controller.go
@@ -688,13 +688,19 @@ func (r *RKE2ControlPlaneReconciler) reconcileControlPlaneConditions(ctx context
workloadCluster.UpdateAgentConditions(ctx, controlPlane)
workloadCluster.UpdateEtcdConditions(ctx, controlPlane)

errs := []error{}
// Patch the node metadata.
if err := workloadCluster.UpdateNodeMetadata(ctx, controlPlane); err != nil {
errs = append(errs, err)
}

// Patch machines with the updated conditions.
if err := controlPlane.PatchMachines(ctx); err != nil {
return ctrl.Result{}, err
errs = append(errs, err)
}

// RCP will be patched at the end of Reconcile to reflect updated conditions, so we can return now.
return ctrl.Result{}, nil
return ctrl.Result{}, kerrors.NewAggregate(errs)
}

func (r *RKE2ControlPlaneReconciler) upgradeControlPlane(
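The hunk above stops returning on the first error and instead collects every failure for the reconcile pass; a self-contained sketch of that kerrors.NewAggregate pattern (the step functions are placeholders):

package main

import (
	"errors"
	"fmt"

	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	steps := []func() error{
		func() error { return errors.New("patching node metadata failed") },
		func() error { return nil }, // later steps still run after a failure
	}

	errs := []error{}

	for _, step := range steps {
		if err := step(); err != nil {
			errs = append(errs, err)
		}
	}

	// NewAggregate returns nil for an empty slice, so the happy path is unchanged.
	fmt.Println(kerrors.NewAggregate(errs))
}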
1 change: 1 addition & 0 deletions pkg/rke2/control_plane.go
@@ -352,6 +352,7 @@ func (c *ControlPlane) PatchMachines(ctx context.Context) error {
if err := helper.Patch(ctx, machine, patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{
controlplanev1.MachineAgentHealthyCondition,
controlplanev1.MachineEtcdMemberHealthyCondition,
controlplanev1.NodeMetadataUpToDate,
}}); err != nil {
errList = append(errList, errors.Wrapf(err, "failed to patch machine %s", machine.Name))
}
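For context, a cluster-api patch helper snapshots the object when it is created and computes the patch when Patch is called; WithOwnedConditions declares which condition types this controller owns so they are written authoritatively. A minimal sketch under those assumptions (the package and function are illustrative):

package metadata

import (
	"context"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/conditions"
	"sigs.k8s.io/cluster-api/util/patch"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	controlplanev1 "github.com/rancher-sandbox/cluster-api-provider-rke2/controlplane/api/v1alpha1"
)

// markNodeMetadataUpToDate flips the condition and patches the machine,
// claiming ownership of only that condition type.
func markNodeMetadataUpToDate(ctx context.Context, c ctrlclient.Client, machine *clusterv1.Machine) error {
	helper, err := patch.NewHelper(machine, c) // snapshot of the object is taken here
	if err != nil {
		return err
	}

	conditions.MarkTrue(machine, controlplanev1.NodeMetadataUpToDate)

	return helper.Patch(ctx, machine, patch.WithOwnedConditions{
		Conditions: []clusterv1.ConditionType{controlplanev1.NodeMetadataUpToDate},
	})
}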
225 changes: 125 additions & 100 deletions pkg/rke2/workload_cluster.go
@@ -22,16 +22,19 @@ import (
"strings"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"

controlplanev1 "github.com/rancher-sandbox/cluster-api-provider-rke2/controlplane/api/v1alpha1"
)
@@ -49,6 +52,7 @@ type WorkloadCluster interface {
ClusterStatus(ctx context.Context) (ClusterStatus, error)
UpdateAgentConditions(ctx context.Context, controlPlane *ControlPlane)
UpdateEtcdConditions(ctx context.Context, controlPlane *ControlPlane)
UpdateNodeMetadata(ctx context.Context, controlPlane *ControlPlane) error
// Upgrade related tasks.

// RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error
@@ -63,6 +67,48 @@ type WorkloadCluster interface {
// Workload defines operations on workload clusters.
type Workload struct {
Client ctrlclient.Client

Nodes map[string]*corev1.Node
nodePatchHelpers map[string]*patch.Helper
}

// NewWorkload builds a Workload from the control plane nodes, creating a patch
// helper for each node so metadata changes can be patched back later.
func NewWorkload(ctx context.Context, cl ctrlclient.Client, cp *ControlPlane) (*Workload, error) {
w := &Workload{
Client: cl,
Nodes: map[string]*corev1.Node{},
nodePatchHelpers: map[string]*patch.Helper{},
}

nodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
conditions.MarkUnknown(
cp.RCP,
controlplanev1.ControlPlaneComponentsHealthyCondition,
controlplanev1.ControlPlaneComponentsInspectionFailedReason, "Failed to list nodes which are hosting control plane components")

return nil, err
}

for _, node := range nodes.Items {
nodeCopy := node
w.Nodes[node.Name] = &nodeCopy
}

for _, node := range w.Nodes {
patchHelper, err := patch.NewHelper(node, cl)
if err != nil {
conditions.MarkUnknown(
cp.RCP,
controlplanev1.ControlPlaneComponentsHealthyCondition,
controlplanev1.ControlPlaneComponentsInspectionFailedReason, "Failed to create patch helpers for control plane nodes")

return nil, errors.Wrapf(err, "failed to create patch helper for node %s", node.Name)
}

w.nodePatchHelpers[node.Name] = patchHelper
}

return w, nil
}

// ClusterStatus holds stats information about the cluster.
@@ -86,21 +132,45 @@ func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList,
return nodes, nil
}

// PatchNodes patches the nodes in the workload cluster.
func (w *Workload) PatchNodes(ctx context.Context, cp *ControlPlane) error {
errList := []error{}

for i := range w.Nodes {
node := w.Nodes[i]
if helper, ok := w.nodePatchHelpers[node.Name]; ok {
if err := helper.Patch(ctx, node); err != nil {
conditions.MarkUnknown(
cp.Machines[node.Name],
controlplanev1.NodeMetadataUpToDate,
controlplanev1.NodePatchFailedReason, errors.Wrapf(err, "failed to patch node %s", node.Name).Error())

errList = append(errList, errors.Wrapf(err, "failed to patch node %s", node.Name))

continue
}

conditions.MarkTrue(
cp.Machines[node.Name],
controlplanev1.NodeMetadataUpToDate)

continue
}

errList = append(errList, errors.Errorf("failed to get patch helper for node %s", node.Name))
}

return kerrors.NewAggregate(errList)
}

// ClusterStatus returns the status of the cluster.
func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
status := ClusterStatus{}

// count the control plane nodes
nodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return status, err
}

for _, node := range nodes.Items {
for _, node := range w.Nodes {
nodeCopy := node
status.Nodes++

if util.IsNodeReady(&nodeCopy) {
if util.IsNodeReady(nodeCopy) {
status.ReadyNodes++
}
}
@@ -137,34 +207,16 @@ func (w *Workload) UpdateAgentConditions(ctx context.Context, controlPlane *Cont
controlplanev1.MachineAgentHealthyCondition,
}

// NOTE: this function uses control plane nodes from the workload cluster as a source of truth for the current state.
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
conditions.MarkUnknown(
controlPlane.RCP,
controlplanev1.ControlPlaneComponentsHealthyCondition,
controlplanev1.ControlPlaneComponentsInspectionFailedReason, "Failed to list nodes which are hosting control plane components")

return
}

// Update conditions for control plane components hosted as static pods on the nodes.
var rcpErrors []string

for _, node := range controlPlaneNodes.Items {
// Search for the machine corresponding to the node.
var machine *clusterv1.Machine

for _, m := range controlPlane.Machines {
if m.Status.NodeRef != nil && m.Status.NodeRef.Name == node.Name {
machine = m

break
}
}
for k := range w.Nodes {
node := w.Nodes[k]

// Search for the machine corresponding to the node.
machine, found := controlPlane.Machines[node.Name]
// If there is no machine corresponding to a node, determine if this is an error or not.
if machine == nil {
if !found {
// If there are machines still provisioning there is the chance that a node might be linked to a machine soon,
// otherwise report the error at RCP level given that there is no machine to report on.
if hasProvisioningMachine(controlPlane.Machines) {
@@ -186,7 +238,7 @@ func (w *Workload) UpdateAgentConditions(ctx context.Context, controlPlane *Cont
}

// If the node is Unreachable, information about static pods could be stale so set all conditions to unknown.
if nodeHasUnreachableTaint(node) {
if nodeHasUnreachableTaint(*node) {
// NOTE: We are assuming unreachable as a temporary condition, leaving to MHC
// the responsibility to determine if the node is unhealthy or not.
for _, condition := range allMachinePodConditions {
@@ -195,37 +247,6 @@ func (w *Workload) UpdateAgentConditions(ctx context.Context, controlPlane *Cont

continue
}

targetnode := corev1.Node{}
nodeKey := ctrlclient.ObjectKey{
Namespace: metav1.NamespaceSystem,
Name: node.Name,
}

if err := w.Client.Get(ctx, nodeKey, &targetnode); err != nil {
// If there is an error getting the Pod, do not set any conditions.
if apierrors.IsNotFound(err) {
conditions.MarkFalse(machine,
controlplanev1.MachineAgentHealthyCondition,
controlplanev1.PodMissingReason,
clusterv1.ConditionSeverityError,
"Node %s is missing", nodeKey.Name)

return
}

conditions.MarkUnknown(machine,
controlplanev1.MachineAgentHealthyCondition,
controlplanev1.PodInspectionFailedReason, "Failed to get node status")

return
}

for _, condition := range targetnode.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
conditions.MarkTrue(machine, controlplanev1.MachineAgentHealthyCondition)
}
}
}

// If there are provisioned machines without corresponding nodes, report this as a failing condition with SeverityError.
@@ -235,20 +256,19 @@ func (w *Workload) UpdateAgentConditions(ctx context.Context, controlPlane *Cont
continue
}

found := false

for _, node := range controlPlaneNodes.Items {
if machine.Status.NodeRef.Name == node.Name {
found = true

break
}
}

node, found := w.Nodes[machine.Status.NodeRef.Name]
if !found {
for _, condition := range allMachinePodConditions {
conditions.MarkFalse(machine, condition, controlplanev1.PodFailedReason, clusterv1.ConditionSeverityError, "Missing node")
}

continue
}

for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
conditions.MarkTrue(machine, controlplanev1.MachineAgentHealthyCondition)
}
}
}

@@ -385,32 +405,10 @@ func (w *Workload) UpdateEtcdConditions(ctx context.Context, controlPlane *Contr
func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane *ControlPlane) {
// NOTE: This method uses control plane nodes only to get in contact with etcd, but then it relies on etcd
// as ultimate source of truth for the list of members and for their health.
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
conditions.MarkUnknown(
controlPlane.RCP,
controlplanev1.EtcdClusterHealthyCondition,
controlplanev1.EtcdClusterInspectionFailedReason, "Failed to list nodes which are hosting the etcd members")

for _, m := range controlPlane.Machines {
conditions.MarkUnknown(m,
controlplanev1.MachineEtcdMemberHealthyCondition,
controlplanev1.EtcdMemberInspectionFailedReason, "Failed to get the node which is hosting the etcd member")
}

return
}

for _, node := range controlPlaneNodes.Items {
var machine *clusterv1.Machine

for _, m := range controlPlane.Machines {
if m.Status.NodeRef != nil && m.Status.NodeRef.Name == node.Name {
machine = m
}
}

if machine == nil {
for k := range w.Nodes {
node := w.Nodes[k]
machine, found := controlPlane.Machines[node.Name]
if !found {
// If there are machines still provisioning there is the chance that a node might be linked to a machine soon,
// otherwise report the error at RCP level given that there is no machine to report on.
if hasProvisioningMachine(controlPlane.Machines) {
@@ -430,3 +428,30 @@ func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane
conditions.MarkTrue(machine, controlplanev1.MachineEtcdMemberHealthyCondition)
}
}

// UpdateNodeMetadata is responsible for populating node metadata after
// the node is referenced from the Machine object.
func (w *Workload) UpdateNodeMetadata(ctx context.Context, controlPlane *ControlPlane) error {
for commonName, rkeConfig := range controlPlane.rke2Configs {
node, nodeFound := w.Nodes[commonName]
if !nodeFound {
conditions.MarkUnknown(
controlPlane.Machines[commonName],
controlplanev1.NodeMetadataUpToDate,
controlplanev1.NodePatchFailedReason, "associated node not found")

continue
} else if name, ok := node.Annotations[clusterv1.MachineAnnotation]; !ok || name != commonName {
conditions.MarkUnknown(
controlPlane.Machines[commonName],
controlplanev1.NodeMetadataUpToDate,
controlplanev1.NodePatchFailedReason, fmt.Sprintf("node object is missing %s annotation", clusterv1.MachineAnnotation))

continue
}

annotations.AddAnnotations(node, rkeConfig.Spec.AgentConfig.NodeAnnotations)
}

return w.PatchNodes(ctx, controlPlane)
}
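Putting the new pieces together: NewWorkload caches the control plane nodes and a patch helper per node, UpdateNodeMetadata stamps each machine's nodeAnnotations onto its cached Node, and PatchNodes writes the changes back while recording NodeMetadataUpToDate per machine. A hedged sketch of a caller, assumed to live inside pkg/rke2 next to the code above (the wrapper function itself is illustrative):

// reconcileNodeMetadata shows the intended call sequence from a reconciler.
func reconcileNodeMetadata(ctx context.Context, c ctrlclient.Client, cp *ControlPlane) error {
	// NewWorkload lists the control plane nodes and prepares a patch helper
	// per node, so later mutations are diffed against this snapshot.
	workload, err := NewWorkload(ctx, c, cp)
	if err != nil {
		return err
	}

	// Applies RKE2Config.Spec.AgentConfig.NodeAnnotations to each matching
	// node and patches the nodes via the cached helpers.
	return workload.UpdateNodeMetadata(ctx, cp)
}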
