diff --git a/pkg/upgrade/consts.go b/pkg/upgrade/consts.go index bbf3dda2..d3fdabd9 100644 --- a/pkg/upgrade/consts.go +++ b/pkg/upgrade/consts.go @@ -31,6 +31,8 @@ const ( UpgradeWaitForPodCompletionStartTimeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-wait-for-pod-completion-start-time" // UpgradeValidationStartTimeAnnotationKeyFmt is the format of the node annotation indicating start time for validation-required state UpgradeValidationStartTimeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-validation-start-time" + // UpgradeRequestedLabelKeyFmt is the format of the node annotation key indicating driver upgrade was requested (used for orphaned pods) + UpgradeRequestedLabelKeyFmt = "nvidia.com/%s-driver-upgrade-requested" // UpgradeStateUnknown Node has this state when the upgrade flow is disabled or the node hasn't been processed yet UpgradeStateUnknown = "" // UpgradeStateUpgradeRequired is set when the driver pod on the node is not up-to-date and required upgrade diff --git a/pkg/upgrade/upgrade_state.go b/pkg/upgrade/upgrade_state.go index 44dbd1af..99006ea3 100644 --- a/pkg/upgrade/upgrade_state.go +++ b/pkg/upgrade/upgrade_state.go @@ -43,6 +43,11 @@ type NodeUpgradeState struct { DriverDaemonSet *appsv1.DaemonSet } +// IsOrphanedPod returns true if Pod is not connected to a DaemonSet +func (nus *NodeUpgradeState) IsOrphanedPod() bool { + return nus.DriverDaemonSet == nil && nus.DriverPod != nil +} + // ClusterUpgradeState contains a snapshot of the driver upgrade state in the cluster // It contains driver upgrade policy and mappings between nodes and their upgrade state // Nodes are grouped together with the driver POD running on them and the daemon set, controlling this pod @@ -230,11 +235,24 @@ func (m *ClusterUpgradeStateManagerImpl) BuildState(ctx context.Context, namespa filteredPodList = append(filteredPodList, dsPods...) } + // Also collect orphaned driver pods + filteredPodList = append(filteredPodList, m.getOrphanedPods(podList.Items)...) 
+ upgradeStateLabel := GetUpgradeStateLabelKey() for i := range filteredPodList { pod := &filteredPodList[i] - ownerDaemonSet := daemonSets[pod.OwnerReferences[0].UID] + var ownerDaemonSet *appsv1.DaemonSet + if m.isOrphanedPod(pod) { + ownerDaemonSet = nil + } else { + ownerDaemonSet = daemonSets[pod.OwnerReferences[0].UID] + } + // Check if pod is already scheduled to a Node + if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending { + m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no NodeName, skipping", "pod", pod.Name) + continue + } nodeState, err := m.buildNodeUpgradeState(ctx, pod, ownerDaemonSet) if err != nil { m.Log.V(consts.LogLevelError).Error(err, "Failed to build node upgrade state for pod", "pod", pod) @@ -290,7 +308,7 @@ func (m *ClusterUpgradeStateManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet, dsPodList := []corev1.Pod{} for i := range pods { pod := &pods[i] - if pod.OwnerReferences == nil || len(pod.OwnerReferences) < 1 { + if m.isOrphanedPod(pod) { m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no owner DaemonSet", "pod", pod.Name) continue } @@ -306,6 +324,23 @@ func (m *ClusterUpgradeStateManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet, return dsPodList } +// getOrphanedPods returns a list of the pods not owned by any DaemonSet +func (m *ClusterUpgradeStateManagerImpl) getOrphanedPods(pods []corev1.Pod) []corev1.Pod { + podList := []corev1.Pod{} + for i := range pods { + pod := &pods[i] + if m.isOrphanedPod(pod) { + podList = append(podList, *pod) + } + } + m.Log.V(consts.LogLevelInfo).Info("Total orphaned Pods found:", "count", len(podList)) + return podList +} + +func (m *ClusterUpgradeStateManagerImpl) isOrphanedPod(pod *corev1.Pod) bool { + return pod.OwnerReferences == nil || len(pod.OwnerReferences) < 1 +} + // ApplyState receives a complete cluster upgrade state and, based on upgrade policy, processes each node's state. 
// Based on the current state of the node, it is calculated if the node can be moved to the next state right now // or whether any actions need to be scheduled for the node to move to the next state. @@ -438,20 +473,12 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessDoneOrUnknownNodes( m.Log.V(consts.LogLevelInfo).Info("ProcessDoneOrUnknownNodes") for _, nodeState := range currentClusterState.NodeStates[nodeStateName] { - podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod) + isPodSynced, err := m.podInSyncWithDS(ctx, nodeState) if err != nil { - m.Log.V(consts.LogLevelError).Error( - err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod) - return err - } - m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash) - daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet) - if err != nil { - m.Log.V(consts.LogLevelError).Error( - err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet) + m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset template/pod revision hash") return err } - m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash) + isUpgradeRequested := m.isUpgradeRequested(nodeState.Node) isWaitingForSafeDriverLoad, err := m.SafeDriverLoadManager.IsWaitingForSafeDriverLoad(ctx, nodeState.Node) if err != nil { m.Log.V(consts.LogLevelError).Error( @@ -462,7 +489,7 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessDoneOrUnknownNodes( m.Log.V(consts.LogLevelInfo).Info("Node is waiting for safe driver load, initialize upgrade", "node", nodeState.Node.Name) } - if podRevisionHash != daemonsetRevisionHash || isWaitingForSafeDriverLoad { + if !isPodSynced || isWaitingForSafeDriverLoad || isUpgradeRequested { // If node requires upgrade and is Unschedulable, track this in an // annotation and leave node in Unschedulable 
state when upgrade completes. if isNodeUnschedulable(nodeState.Node) { @@ -483,6 +510,14 @@ } m.Log.V(consts.LogLevelInfo).Info("Node requires upgrade, changed its state to UpgradeRequired", "node", nodeState.Node.Name) + if isUpgradeRequested { + err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, nodeState.Node, GetUpgradeRequestedAnnotationKey(), "null") + if err != nil { + m.Log.V(consts.LogLevelError).Error( + err, "Failed to delete node upgrade-requested annotation") + return err + } + } continue } @@ -503,6 +538,33 @@ return nil } +// podInSyncWithDS returns true if the Pod and DS have the same Controller Revision Hash, or if Pod is orphaned +func (m *ClusterUpgradeStateManagerImpl) podInSyncWithDS(ctx context.Context, nodeState *NodeUpgradeState) (bool, error) { + if nodeState.IsOrphanedPod() { + return true, nil + } + podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod) + if err != nil { + m.Log.V(consts.LogLevelError).Error( + err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod) + return false, err + } + m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash) + daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet) + if err != nil { + m.Log.V(consts.LogLevelError).Error( + err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet) + return false, err + } + m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash) + return podRevisionHash == daemonsetRevisionHash, nil +} + +// isUpgradeRequested returns true if node is annotated to request an upgrade +func (m *ClusterUpgradeStateManagerImpl) isUpgradeRequested(node *corev1.Node) bool { + return 
node.Annotations[GetUpgradeRequestedAnnotationKey()] == "true" +} + // ProcessUpgradeRequiredNodes processes UpgradeStateUpgradeRequired nodes and moves them to UpgradeStateCordonRequired until // the limit on max parallel upgrades is reached. func (m *ClusterUpgradeStateManagerImpl) ProcessUpgradeRequiredNodes( @@ -673,21 +735,12 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessPodRestartNodes( pods := make([]*corev1.Pod, 0, len(currentClusterState.NodeStates[UpgradeStatePodRestartRequired])) for _, nodeState := range currentClusterState.NodeStates[UpgradeStatePodRestartRequired] { - podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod) + isPodSynced, err := m.podInSyncWithDS(ctx, nodeState) if err != nil { - m.Log.V(consts.LogLevelError).Error( - err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod) - return err - } - m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash) - daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet) - if err != nil { - m.Log.V(consts.LogLevelError).Error( - err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet) + m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset template/pod revision hash") return err } - m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash) - if podRevisionHash != daemonsetRevisionHash { + if !isPodSynced || nodeState.IsOrphanedPod() { // Pods should only be scheduled for restart if they are not terminating or restarting already // To determinate terminating state we need to check for deletion timestamp with will be filled // one pod termination process started @@ -847,22 +900,16 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessUncordonRequiredNodes( } func (m *ClusterUpgradeStateManagerImpl) isDriverPodInSync(ctx context.Context, nodeState *NodeUpgradeState) 
(bool, error) { - podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod) - if err != nil { - m.Log.V(consts.LogLevelError).Error( - err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod) - return false, err + if nodeState.IsOrphanedPod() { + return false, nil } - m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash) - daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet) + isPodSynced, err := m.podInSyncWithDS(ctx, nodeState) if err != nil { - m.Log.V(consts.LogLevelError).Error( - err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet) + m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset template/pod revision hash") return false, err } - m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash) // If the pod generation matches the daemonset generation - if podRevisionHash == daemonsetRevisionHash && + if isPodSynced && // And the pod is running nodeState.DriverPod.Status.Phase == "Running" && // And it has at least 1 container diff --git a/pkg/upgrade/upgrade_state_test.go b/pkg/upgrade/upgrade_state_test.go index 9bf2e3b4..c9490248 100644 --- a/pkg/upgrade/upgrade_state_test.go +++ b/pkg/upgrade/upgrade_state_test.go @@ -1046,6 +1046,50 @@ var _ = Describe("UpgradeStateManager tests", func() { Expect(stateManager.ApplyState(ctx, &clusterState, policy)).ToNot(Succeed()) Expect(getNodeUpgradeState(node)).ToNot(Equal(upgrade.UpgradeStateDone)) }) + It("UpgradeStateManager should not move outdated node to UpgradeRequired states with orphaned pod", func() { + orphanedPod := &corev1.Pod{} + + UnknownToUpgradeRequiredNode := nodeWithUpgradeState("") + DoneToUpgradeRequiredNode := nodeWithUpgradeState(upgrade.UpgradeStateDone) + + clusterState := upgrade.NewClusterUpgradeState() + unknownNodes := []*upgrade.NodeUpgradeState{ + 
{Node: UnknownToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + doneNodes := []*upgrade.NodeUpgradeState{ + {Node: DoneToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + clusterState.NodeStates[""] = unknownNodes + clusterState.NodeStates[upgrade.UpgradeStateDone] = doneNodes + + Expect(stateManager.ApplyState(ctx, &clusterState, &v1alpha1.DriverUpgradePolicySpec{AutoUpgrade: true})).To(Succeed()) + Expect(getNodeUpgradeState(UnknownToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateDone)) + Expect(getNodeUpgradeState(DoneToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateDone)) + }) + It("UpgradeStateManager should move outdated node to UpgradeRequired states with orphaned pod if upgrade-requested", func() { + orphanedPod := &corev1.Pod{} + + UnknownToUpgradeRequiredNode := nodeWithUpgradeState("") + UnknownToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()] = "true" + DoneToUpgradeRequiredNode := nodeWithUpgradeState(upgrade.UpgradeStateDone) + DoneToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()] = "true" + + clusterState := upgrade.NewClusterUpgradeState() + unknownNodes := []*upgrade.NodeUpgradeState{ + {Node: UnknownToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + doneNodes := []*upgrade.NodeUpgradeState{ + {Node: DoneToUpgradeRequiredNode, DriverPod: orphanedPod, DriverDaemonSet: nil}, + } + clusterState.NodeStates[""] = unknownNodes + clusterState.NodeStates[upgrade.UpgradeStateDone] = doneNodes + + Expect(stateManager.ApplyState(ctx, &clusterState, &v1alpha1.DriverUpgradePolicySpec{AutoUpgrade: true})).To(Succeed()) + Expect(getNodeUpgradeState(UnknownToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateUpgradeRequired)) + Expect(getNodeUpgradeState(DoneToUpgradeRequiredNode)).To(Equal(upgrade.UpgradeStateUpgradeRequired)) + 
Expect(UnknownToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()]).To(Equal("")) + Expect(DoneToUpgradeRequiredNode.Annotations[upgrade.GetUpgradeRequestedAnnotationKey()]).To(Equal("")) + }) }) func nodeWithUpgradeState(state string) *corev1.Node { diff --git a/pkg/upgrade/util.go b/pkg/upgrade/util.go index b99f1db1..bab32d4b 100644 --- a/pkg/upgrade/util.go +++ b/pkg/upgrade/util.go @@ -104,6 +104,11 @@ func GetUpgradeDriverWaitForSafeLoadAnnotationKey() string { return fmt.Sprintf(UpgradeWaitForSafeDriverLoadAnnotationKeyFmt, DriverName) } +// GetUpgradeRequestedAnnotationKey returns the key for the node annotation used to mark that a driver upgrade was requested externally (used for orphaned pods) +func GetUpgradeRequestedAnnotationKey() string { + return fmt.Sprintf(UpgradeRequestedLabelKeyFmt, DriverName) +} + // GetUpgradeInitialStateAnnotationKey returns the key for annotation used to track initial state of the node func GetUpgradeInitialStateAnnotationKey() string { return fmt.Sprintf(UpgradeInitialStateAnnotationKeyFmt, DriverName)