From de7865ce93b40ad58ba6e2ffc7aca048aaa9b244 Mon Sep 17 00:00:00 2001
From: Fred Rolland
Date: Mon, 18 Dec 2023 09:45:55 +0200
Subject: [PATCH] Add support for orphaned driver pods

- Include orphaned driver pods in the built state
- Ignore pods that are not yet scheduled when building the state
- Support the upgrade-requested annotation to force a move to the
  upgrade-required state

With this functionality, upgrading from one DaemonSet to a new one is
possible, under the following assumptions:

- The new DaemonSet should have node anti-affinity to prevent scheduling
  new driver pods on nodes where old ones are still running.
- The operator should delete the old DaemonSet with the
  DeletePropagationOrphan option, so that the old driver pods keep
  running until the upgrade flow replaces them.

In addition, it is also possible to detach a single pod from its
DaemonSet in order to migrate it to a new DaemonSet.

Signed-off-by: Fred Rolland
---
 pkg/upgrade/consts.go             |   2 +
 pkg/upgrade/upgrade_state.go      | 121 +++++++++++++++++++++---------
 pkg/upgrade/upgrade_state_test.go |  44 +++++++++++
 pkg/upgrade/util.go               |   5 ++
 4 files changed, 135 insertions(+), 37 deletions(-)
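As context for the second assumption above, deleting the old DaemonSet with orphan propagation might look like the following client-go sketch. This is a minimal illustration, not part of the patch: the helper name and clientset wiring are assumptions, while metav1.DeletePropagationOrphan is the standard apimachinery option.

    package main

    import (
        "context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
    )

    // deleteDaemonSetOrphan deletes only the DaemonSet object itself; its pods
    // are orphaned and keep running until the upgrade flow replaces them.
    func deleteDaemonSetOrphan(ctx context.Context, c kubernetes.Interface, namespace, name string) error {
        orphan := metav1.DeletePropagationOrphan
        return c.AppsV1().DaemonSets(namespace).Delete(ctx, name, metav1.DeleteOptions{
            PropagationPolicy: &orphan,
        })
    }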
diff --git a/pkg/upgrade/consts.go b/pkg/upgrade/consts.go
index bbf3dda2..d3fdabd9 100644
--- a/pkg/upgrade/consts.go
+++ b/pkg/upgrade/consts.go
@@ -31,6 +31,8 @@ const (
 	UpgradeWaitForPodCompletionStartTimeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-wait-for-pod-completion-start-time"
 	// UpgradeValidationStartTimeAnnotationKeyFmt is the format of the node annotation indicating start time for validation-required state
 	UpgradeValidationStartTimeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-validation-start-time"
+	// UpgradeRequestedAnnotationKeyFmt is the format of the node annotation key indicating that a driver upgrade was requested (used for orphaned pods)
+	UpgradeRequestedAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requested"
 	// UpgradeStateUnknown Node has this state when the upgrade flow is disabled or the node hasn't been processed yet
 	UpgradeStateUnknown = ""
 	// UpgradeStateUpgradeRequired is set when the driver pod on the node is not up-to-date and requires upgrade
diff --git a/pkg/upgrade/upgrade_state.go b/pkg/upgrade/upgrade_state.go
index 44dbd1af..99006ea3 100644
--- a/pkg/upgrade/upgrade_state.go
+++ b/pkg/upgrade/upgrade_state.go
@@ -43,6 +43,11 @@ type NodeUpgradeState struct {
 	DriverDaemonSet *appsv1.DaemonSet
 }
 
+// IsOrphanedPod returns true if the Pod is not connected to a DaemonSet
+func (nus *NodeUpgradeState) IsOrphanedPod() bool {
+	return nus.DriverDaemonSet == nil && nus.DriverPod != nil
+}
+
 // ClusterUpgradeState contains a snapshot of the driver upgrade state in the cluster
 // It contains driver upgrade policy and mappings between nodes and their upgrade state
 // Nodes are grouped together with the driver POD running on them and the daemon set, controlling this pod
@@ -230,11 +235,24 @@ func (m *ClusterUpgradeStateManagerImpl) BuildState(ctx context.Context, namespa
 		filteredPodList = append(filteredPodList, dsPods...)
 	}
 
+	// Also collect orphaned driver pods
+	filteredPodList = append(filteredPodList, m.getOrphanedPods(podList.Items)...)
+
 	upgradeStateLabel := GetUpgradeStateLabelKey()
 	for i := range filteredPodList {
 		pod := &filteredPodList[i]
-		ownerDaemonSet := daemonSets[pod.OwnerReferences[0].UID]
+		var ownerDaemonSet *appsv1.DaemonSet
+		if m.isOrphanedPod(pod) {
+			ownerDaemonSet = nil
+		} else {
+			ownerDaemonSet = daemonSets[pod.OwnerReferences[0].UID]
+		}
+		// Skip pods that have not yet been scheduled to a node
+		if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending {
+			m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no NodeName, skipping", "pod", pod.Name)
+			continue
+		}
 		nodeState, err := m.buildNodeUpgradeState(ctx, pod, ownerDaemonSet)
 		if err != nil {
 			m.Log.V(consts.LogLevelError).Error(err, "Failed to build node upgrade state for pod", "pod", pod)
@@ -290,7 +308,7 @@ func (m *ClusterUpgradeStateManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet,
 	dsPodList := []corev1.Pod{}
 	for i := range pods {
 		pod := &pods[i]
-		if pod.OwnerReferences == nil || len(pod.OwnerReferences) < 1 {
+		if m.isOrphanedPod(pod) {
 			m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no owner DaemonSet", "pod", pod.Name)
 			continue
 		}
@@ -306,6 +324,24 @@ func (m *ClusterUpgradeStateManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet,
 	return dsPodList
 }
 
+// getOrphanedPods returns a list of the pods not owned by any DaemonSet
+func (m *ClusterUpgradeStateManagerImpl) getOrphanedPods(pods []corev1.Pod) []corev1.Pod {
+	podList := []corev1.Pod{}
+	for i := range pods {
+		pod := &pods[i]
+		if m.isOrphanedPod(pod) {
+			podList = append(podList, *pod)
+		}
+	}
+	m.Log.V(consts.LogLevelInfo).Info("Total orphaned Pods found", "count", len(podList))
+	return podList
+}
+
+// isOrphanedPod returns true if the pod has no owner references
+func (m *ClusterUpgradeStateManagerImpl) isOrphanedPod(pod *corev1.Pod) bool {
+	return len(pod.OwnerReferences) == 0
+}
+
 // ApplyState receives a complete cluster upgrade state and, based on upgrade policy, processes each node's state.
 // Based on the current state of the node, it is calculated if the node can be moved to the next state right now
 // or whether any actions need to be scheduled for the node to move to the next state.
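The commit message also mentions detaching a single pod from its DaemonSet, after which isOrphanedPod (no owner references) reports it as orphaned. A hedged sketch of one way to do that with a JSON patch follows; the helper is hypothetical, and note that a live DaemonSet whose selector still matches the pod would simply re-adopt it, so this only sticks once the old DaemonSet is deleted or no longer matches.

    package main

    import (
        "context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/kubernetes"
    )

    // detachPodFromDaemonSet removes the pod's ownerReferences, after which the
    // state manager builds its NodeUpgradeState with DriverDaemonSet == nil.
    func detachPodFromDaemonSet(ctx context.Context, c kubernetes.Interface, namespace, name string) error {
        patch := []byte(`[{"op": "remove", "path": "/metadata/ownerReferences"}]`)
        _, err := c.CoreV1().Pods(namespace).Patch(ctx, name, types.JSONPatchType, patch, metav1.PatchOptions{})
        return err
    }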
@@ -438,20 +473,12 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessDoneOrUnknownNodes(
 	m.Log.V(consts.LogLevelInfo).Info("ProcessDoneOrUnknownNodes")
 
 	for _, nodeState := range currentClusterState.NodeStates[nodeStateName] {
-		podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
+		isPodSynced, err := m.podInSyncWithDS(ctx, nodeState)
 		if err != nil {
-			m.Log.V(consts.LogLevelError).Error(
-				err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
-			return err
-		}
-		m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
-		daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
-		if err != nil {
-			m.Log.V(consts.LogLevelError).Error(
-				err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
+			m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset/pod template revision hash")
 			return err
 		}
-		m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
+		isUpgradeRequested := m.isUpgradeRequested(nodeState.Node)
 		isWaitingForSafeDriverLoad, err := m.SafeDriverLoadManager.IsWaitingForSafeDriverLoad(ctx, nodeState.Node)
 		if err != nil {
 			m.Log.V(consts.LogLevelError).Error(
@@ -462,7 +489,7 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessDoneOrUnknownNodes(
 			m.Log.V(consts.LogLevelInfo).Info("Node is waiting for safe driver load, initialize upgrade",
 				"node", nodeState.Node.Name)
 		}
-		if podRevisionHash != daemonsetRevisionHash || isWaitingForSafeDriverLoad {
+		if !isPodSynced || isWaitingForSafeDriverLoad || isUpgradeRequested {
 			// If node requires upgrade and is Unschedulable, track this in an
 			// annotation and leave node in Unschedulable state when upgrade completes.
 			if isNodeUnschedulable(nodeState.Node) {
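The new isUpgradeRequested input above is driven by a plain node annotation, so an upgrade can be forced from outside the state manager. Below is a minimal sketch of requesting an upgrade for one node; the helper and clientset wiring are assumptions, while the key format matches UpgradeRequestedAnnotationKeyFmt from consts.go.

    package main

    import (
        "context"
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/kubernetes"
    )

    // requestNodeUpgrade sets the upgrade-requested annotation to "true".
    // ProcessDoneOrUnknownNodes then moves the node to upgrade-required and
    // clears the annotation again.
    func requestNodeUpgrade(ctx context.Context, c kubernetes.Interface, nodeName, driverName string) error {
        key := fmt.Sprintf("nvidia.com/%s-driver-upgrade-requested", driverName)
        patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:"true"}}}`, key))
        _, err := c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
        return err
    }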
@@ -483,6 +510,14 @@
 			}
 			m.Log.V(consts.LogLevelInfo).Info("Node requires upgrade, changed its state to UpgradeRequired",
 				"node", nodeState.Node.Name)
+			if isUpgradeRequested {
+				err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, nodeState.Node, GetUpgradeRequestedAnnotationKey(), "null")
+				if err != nil {
+					m.Log.V(consts.LogLevelError).Error(
+						err, "Failed to delete node upgrade-requested annotation")
+					return err
+				}
+			}
 			continue
 		}
 
@@ -503,6 +538,33 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessDoneOrUnknownNodes(
 	}
 	return nil
 }
 
+// podInSyncWithDS returns true if the Pod and its DaemonSet have the same Controller Revision Hash, or if the Pod is orphaned
+func (m *ClusterUpgradeStateManagerImpl) podInSyncWithDS(ctx context.Context, nodeState *NodeUpgradeState) (bool, error) {
+	if nodeState.IsOrphanedPod() {
+		return true, nil
+	}
+	podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
+	if err != nil {
+		m.Log.V(consts.LogLevelError).Error(
+			err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
+		return false, err
+	}
+	m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
+	daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
+	if err != nil {
+		m.Log.V(consts.LogLevelError).Error(
+			err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
+		return false, err
+	}
+	m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
+	return podRevisionHash == daemonsetRevisionHash, nil
+}
+
+// isUpgradeRequested returns true if the node is annotated to request a driver upgrade
+func (m *ClusterUpgradeStateManagerImpl) isUpgradeRequested(node *corev1.Node) bool {
+	return node.Annotations[GetUpgradeRequestedAnnotationKey()] == "true"
+}
+
 // ProcessUpgradeRequiredNodes processes UpgradeStateUpgradeRequired nodes and moves them to UpgradeStateCordonRequired until
 // the limit on max parallel upgrades is reached.
 func (m *ClusterUpgradeStateManagerImpl) ProcessUpgradeRequiredNodes(
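podInSyncWithDS wraps the existing PodManager revision-hash lookups. For orientation only: the underlying comparison is between the DaemonSet's current controller revision and the controller-revision-hash label that DaemonSet pods carry. A bare-bones sketch of that idea, assuming the standard label key from k8s.io/api/apps/v1 (the patch itself goes through PodManager rather than comparing labels directly):

    package main

    import (
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
    )

    // podMatchesRevision reports whether a DaemonSet pod was created from the
    // given controller revision, using the controller-revision-hash pod label.
    func podMatchesRevision(pod *corev1.Pod, dsRevisionHash string) bool {
        return pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] == dsRevisionHash
    }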
@@ -673,21 +735,12 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessPodRestartNodes(
 
 	pods := make([]*corev1.Pod, 0, len(currentClusterState.NodeStates[UpgradeStatePodRestartRequired]))
 	for _, nodeState := range currentClusterState.NodeStates[UpgradeStatePodRestartRequired] {
-		podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
+		isPodSynced, err := m.podInSyncWithDS(ctx, nodeState)
 		if err != nil {
-			m.Log.V(consts.LogLevelError).Error(
-				err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
-			return err
-		}
-		m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
-		daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
-		if err != nil {
-			m.Log.V(consts.LogLevelError).Error(
-				err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
+			m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset/pod template revision hash")
 			return err
 		}
-		m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
-		if podRevisionHash != daemonsetRevisionHash {
+		if !isPodSynced || nodeState.IsOrphanedPod() {
 			// Pods should only be scheduled for restart if they are not already terminating or restarting
 			// To determine the terminating state we need to check the deletion timestamp, which will be set
 			// once the pod termination process has started
@@ -847,22 +900,16 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessUncordonRequiredNodes(
 }
 
 func (m *ClusterUpgradeStateManagerImpl) isDriverPodInSync(ctx context.Context, nodeState *NodeUpgradeState) (bool, error) {
-	podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
-	if err != nil {
-		m.Log.V(consts.LogLevelError).Error(
-			err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
-		return false, err
+	if nodeState.IsOrphanedPod() {
+		return false, nil
 	}
-	m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
-	daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
+	isPodSynced, err := m.podInSyncWithDS(ctx, nodeState)
 	if err != nil {
-		m.Log.V(consts.LogLevelError).Error(
-			err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
+		m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset/pod template revision hash")
 		return false, err
 	}
-	m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
 	// If the pod generation matches the daemonset generation
-	if podRevisionHash == daemonsetRevisionHash &&
+	if isPodSynced &&
 		// And the pod is running
 		nodeState.DriverPod.Status.Phase == "Running" &&
 		// And it has at least 1 container
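On the first commit-message assumption, one possible way to express the new DaemonSet's node anti-affinity is a required node-affinity term that avoids nodes still marked as running the old driver. This is only a sketch; the marker label key is hypothetical and not defined by this patch.

    package main

    import corev1 "k8s.io/api/core/v1"

    // newDriverAffinity keeps new driver pods off nodes that still carry a
    // (hypothetical) label identifying the old driver, so new pods schedule
    // only once the old pod's node has been cleaned up.
    func newDriverAffinity() *corev1.Affinity {
        return &corev1.Affinity{
            NodeAffinity: &corev1.NodeAffinity{
                RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
                    NodeSelectorTerms: []corev1.NodeSelectorTerm{{
                        MatchExpressions: []corev1.NodeSelectorRequirement{{
                            Key:      "example.com/old-driver", // hypothetical marker label
                            Operator: corev1.NodeSelectorOpDoesNotExist,
                        }},
                    }},
                },
            },
        }
    }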
nodeWithUpgradeState("") + DoneToUpgradeRequiredNode := nodeWithUpgradeState(upgrade.UpgradeStateDone) + + clusterState := upgrade.NewClusterUpgradeState() + unknownNodes := []*upgrade.NodeUpgradeState{ + {Node: UnknownToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + doneNodes := []*upgrade.NodeUpgradeState{ + {Node: DoneToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + clusterState.NodeStates[""] = unknownNodes + clusterState.NodeStates[upgrade.UpgradeStateDone] = doneNodes + + Expect(stateManager.ApplyState(ctx, &clusterState, &v1alpha1.DriverUpgradePolicySpec{AutoUpgrade: true})).To(Succeed()) + Expect(getNodeUpgradeState(UnknownToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateDone)) + Expect(getNodeUpgradeState(DoneToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateDone)) + }) + It("UpgradeStateManager should move outdated node to UpgradeRequired states with orphaned pod if upgrade-requested", func() { + orphanedPod := &corev1.Pod{} + + UnknownToUpgradeRequiredNode := nodeWithUpgradeState("") + UnknownToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()] = "true" + DoneToUpgradeRequiredNode := nodeWithUpgradeState(upgrade.UpgradeStateDone) + DoneToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()] = "true" + + clusterState := upgrade.NewClusterUpgradeState() + unknownNodes := []*upgrade.NodeUpgradeState{ + {Node: UnknownToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + doneNodes := []*upgrade.NodeUpgradeState{ + {Node: DoneToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + clusterState.NodeStates[""] = unknownNodes + clusterState.NodeStates[upgrade.UpgradeStateDone] = doneNodes + + Expect(stateManager.ApplyState(ctx, &clusterState, &v1alpha1.DriverUpgradePolicySpec{AutoUpgrade: true})).To(Succeed()) + Expect(getNodeUpgradeState(UnknownToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateUpgradeRequired)) + Expect(getNodeUpgradeState(DoneToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateUpgradeRequired)) + Expect(UnknownToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()]).To(Equal("")) + Expect(DoneToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()]).To(Equal("")) + }) }) func nodeWithUpgradeState(state string) *corev1.Node { diff --git a/pkg/upgrade/util.go b/pkg/upgrade/util.go index b99f1db1..bab32d4b 100644 --- a/pkg/upgrade/util.go +++ b/pkg/upgrade/util.go @@ -104,6 +104,11 @@ func GetUpgradeDriverWaitForSafeLoadAnnotationKey() string { return fmt.Sprintf(UpgradeWaitForSafeDriverLoadAnnotationKeyFmt, DriverName) } +// GetUpgradeRequestedAnnotationKey returns the key for annotation used to mark node as driver upgrade is requested externally (orphaned pod) +func GetUpgradeRequestedAnnotationKey() string { + return fmt.Sprintf(UpgradeRequestedLabelKeyFmt, DriverName) +} + // GetUpgradeInitialStateAnnotationKey returns the key for annotation used to track initial state of the node func GetUpgradeInitialStateAnnotationKey() string { return fmt.Sprintf(UpgradeInitialStateAnnotationKeyFmt, DriverName)