Add support for orphaned driver pods
- Include orphaned driver pods in the built cluster state
- Ignore pods not yet scheduled when building the state
- Support the upgrade-requested annotation to force moving to the
  upgrade-required state

With this functionality, upgrading from one DaemonSet (DS) to a new one
is possible, under the following assumptions:

- The new DS should have node anti-affinity to prevent scheduling
  new driver pods where the old ones still run.
- The old DS should be deleted by the Operator with the
  DeletePropagationOrphan option to keep the old driver pods running
  until the upgrade flow replaces them (see the sketch below).

In addition, it is also possible to simply detach pods from their
DaemonSet in order to migrate them to a new DaemonSet.

Signed-off-by: Fred Rolland <[email protected]>
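
To illustrate the DeletePropagationOrphan step referenced in the message above, here is a minimal client-go sketch; the helper name, namespace, and DaemonSet name are assumed placeholders, not part of this commit:

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteDaemonSetKeepPods deletes the old driver DaemonSet while leaving its
// pods running as orphans, so the upgrade flow can replace them one by one.
func deleteDaemonSetKeepPods(ctx context.Context, c kubernetes.Interface, namespace, name string) error {
	orphan := metav1.DeletePropagationOrphan
	return c.AppsV1().DaemonSets(namespace).Delete(ctx, name, metav1.DeleteOptions{
		PropagationPolicy: &orphan,
	})
}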
rollandf committed Jan 10, 2024
1 parent 396e0b5 commit e1b02ad
Showing 5 changed files with 348 additions and 37 deletions.
3 changes: 3 additions & 0 deletions pkg/upgrade/consts.go
@@ -31,6 +31,9 @@ const (
UpgradeWaitForPodCompletionStartTimeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-wait-for-pod-completion-start-time"
// UpgradeValidationStartTimeAnnotationKeyFmt is the format of the node annotation indicating start time for validation-required state
UpgradeValidationStartTimeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-validation-start-time"
// UpgradeRequestedAnnotationKeyFmt is the format of the node annotation key indicating a driver upgrade was requested (used for orphaned pods)
// Setting this annotation will trigger setting the upgrade state to upgrade-required
UpgradeRequestedAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requested"
// UpgradeStateUnknown Node has this state when the upgrade flow is disabled or the node hasn't been processed yet
UpgradeStateUnknown = ""
// UpgradeStateUpgradeRequired is set when the driver pod on the node is not up-to-date and requires an upgrade
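For context, the %s in these annotation-key formats is filled in with the driver name. A minimal sketch of how the new key expands ("gpu" is an assumed example driver name, not taken from this commit):

package main

import "fmt"

// Mirrors UpgradeRequestedAnnotationKeyFmt from pkg/upgrade/consts.go.
const upgradeRequestedAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requested"

func main() {
	key := fmt.Sprintf(upgradeRequestedAnnotationKeyFmt, "gpu")
	fmt.Println(key) // nvidia.com/gpu-driver-upgrade-requested
}

Setting the resulting annotation to "true" on a node is what isUpgradeRequested in upgrade_state.go below checks for.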
128 changes: 91 additions & 37 deletions pkg/upgrade/upgrade_state.go
@@ -43,6 +43,11 @@ type NodeUpgradeState struct {
DriverDaemonSet *appsv1.DaemonSet
}

// IsOrphanedPod returns true if the Pod is not associated with a DaemonSet
func (nus *NodeUpgradeState) IsOrphanedPod() bool {
return nus.DriverDaemonSet == nil
}

// ClusterUpgradeState contains a snapshot of the driver upgrade state in the cluster
// It contains driver upgrade policy and mappings between nodes and their upgrade state
// Nodes are grouped together with the driver pod running on them and the DaemonSet controlling this pod
@@ -233,11 +238,24 @@ func (m *ClusterUpgradeStateManagerImpl) BuildState(ctx context.Context, namespa
filteredPodList = append(filteredPodList, dsPods...)
}

// Also collect orphaned driver pods
filteredPodList = append(filteredPodList, m.getOrphanedPods(podList.Items)...)

upgradeStateLabel := GetUpgradeStateLabelKey()

for i := range filteredPodList {
pod := &filteredPodList[i]
ownerDaemonSet := daemonSets[pod.OwnerReferences[0].UID]
var ownerDaemonSet *appsv1.DaemonSet
if isOrphanedPod(pod) {
ownerDaemonSet = nil
} else {
ownerDaemonSet = daemonSets[pod.OwnerReferences[0].UID]
}
// Check if pod is already scheduled to a Node
if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending {
m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no NodeName, skipping", "pod", pod.Name)
continue
}
nodeState, err := m.buildNodeUpgradeState(ctx, pod, ownerDaemonSet)
if err != nil {
m.Log.V(consts.LogLevelError).Error(err, "Failed to build node upgrade state for pod", "pod", pod)
@@ -293,7 +311,7 @@ func (m *ClusterUpgradeStateManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet,
dsPodList := []corev1.Pod{}
for i := range pods {
pod := &pods[i]
if pod.OwnerReferences == nil || len(pod.OwnerReferences) < 1 {
if isOrphanedPod(pod) {
m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no owner DaemonSet", "pod", pod.Name)
continue
}
@@ -309,6 +327,23 @@
return dsPodList
}

// getOrphanedPods returns a list of the pods not owned by any DaemonSet
func (m *ClusterUpgradeStateManagerImpl) getOrphanedPods(pods []corev1.Pod) []corev1.Pod {
podList := []corev1.Pod{}
for i := range pods {
pod := &pods[i]
if isOrphanedPod(pod) {
podList = append(podList, *pod)
}
}
m.Log.V(consts.LogLevelInfo).Info("Total orphaned Pods found:", "count", len(podList))
return podList
}

func isOrphanedPod(pod *corev1.Pod) bool {
return pod.OwnerReferences == nil || len(pod.OwnerReferences) < 1
}
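
A minimal test sketch for the new helper (hypothetical, not included in this commit), covering both the missing and present owner-reference cases:

package upgrade

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestIsOrphanedPod(t *testing.T) {
	// A pod with no owner references is considered orphaned.
	if !isOrphanedPod(&corev1.Pod{}) {
		t.Error("expected pod without owner references to be orphaned")
	}
	// A pod owned by a DaemonSet is not.
	owned := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		OwnerReferences: []metav1.OwnerReference{{Kind: "DaemonSet", Name: "driver-ds"}},
	}}
	if isOrphanedPod(owned) {
		t.Error("expected pod with a DaemonSet owner not to be orphaned")
	}
}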

// ApplyState receives a complete cluster upgrade state and, based on upgrade policy, processes each node's state.
// Based on the current state of the node, it is calculated if the node can be moved to the next state right now
// or whether any actions need to be scheduled for the node to move to the next state.
@@ -441,20 +476,12 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessDoneOrUnknownNodes(
m.Log.V(consts.LogLevelInfo).Info("ProcessDoneOrUnknownNodes")

for _, nodeState := range currentClusterState.NodeStates[nodeStateName] {
podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
isPodSynced, isOrphaned, err := m.podInSyncWithDS(ctx, nodeState)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset template/pod revision hash")
return err
}
m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
return err
}
m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
isUpgradeRequested := m.isUpgradeRequested(nodeState.Node)
isWaitingForSafeDriverLoad, err := m.SafeDriverLoadManager.IsWaitingForSafeDriverLoad(ctx, nodeState.Node)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
@@ -465,7 +492,7 @@
m.Log.V(consts.LogLevelInfo).Info("Node is waiting for safe driver load, initialize upgrade",
"node", nodeState.Node.Name)
}
if podRevisionHash != daemonsetRevisionHash || isWaitingForSafeDriverLoad {
if (!isPodSynced && !isOrphaned) || isWaitingForSafeDriverLoad || isUpgradeRequested {
// If node requires upgrade and is Unschedulable, track this in an
// annotation and leave node in Unschedulable state when upgrade completes.
if isNodeUnschedulable(nodeState.Node) {
@@ -506,12 +533,54 @@
return nil
}

// podInSyncWithDS checks if the pod is in sync with its DaemonSet, also handling orphaned Pods
// Returns:
//
//	bool: true if the Pod is in sync with the DaemonSet (for orphaned Pods, always false)
//	bool: true if the Pod is orphaned
//	error: in case of an error retrieving the revision hashes
func (m *ClusterUpgradeStateManagerImpl) podInSyncWithDS(ctx context.Context, nodeState *NodeUpgradeState) (bool, bool, error) {
if nodeState.IsOrphanedPod() {
return false, true, nil
}
podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
return false, false, err
}
m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
return false, false, err
}
m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
return podRevisionHash == daemonsetRevisionHash, false, nil
}

// isUpgradeRequested returns true if the node is annotated to request an upgrade
func (m *ClusterUpgradeStateManagerImpl) isUpgradeRequested(node *corev1.Node) bool {
return node.Annotations[GetUpgradeRequestedAnnotationKey()] == "true"
}
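
A reader's note on how callers combine the two booleans (a sketch, not part of the diff): in the done/unknown state an orphaned pod alone does not move the node to upgrade-required; the explicit upgrade-requested annotation is needed. In the pod-restart state, an orphaned pod always counts as out of sync:

package upgrade

// needsUpgrade mirrors the condition in ProcessDoneOrUnknownNodes.
func needsUpgrade(isPodSynced, isOrphaned, waitingForSafeLoad, upgradeRequested bool) bool {
	return (!isPodSynced && !isOrphaned) || waitingForSafeLoad || upgradeRequested
}

// needsRestart mirrors the condition in ProcessPodRestartNodes.
func needsRestart(isPodSynced, isOrphaned bool) bool {
	return !isPodSynced || isOrphaned
}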

// ProcessUpgradeRequiredNodes processes UpgradeStateUpgradeRequired nodes and moves them to UpgradeStateCordonRequired until
// the limit on max parallel upgrades is reached.
func (m *ClusterUpgradeStateManagerImpl) ProcessUpgradeRequiredNodes(
ctx context.Context, currentClusterState *ClusterUpgradeState, upgradesAvailable int) error {
m.Log.V(consts.LogLevelInfo).Info("ProcessUpgradeRequiredNodes")
for _, nodeState := range currentClusterState.NodeStates[UpgradeStateUpgradeRequired] {
if m.isUpgradeRequested(nodeState.Node) {
// Make sure to remove the upgrade-requested annotation
err := m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, nodeState.Node,
GetUpgradeRequestedAnnotationKey(), "null")
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to delete node upgrade-requested annotation")
return err
}
}
if m.skipNodeUpgrade(nodeState.Node) {
m.Log.V(consts.LogLevelInfo).Info("Node is marked for skipping upgrades", "node", nodeState.Node.Name)
continue
@@ -676,21 +745,12 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessPodRestartNodes(

pods := make([]*corev1.Pod, 0, len(currentClusterState.NodeStates[UpgradeStatePodRestartRequired]))
for _, nodeState := range currentClusterState.NodeStates[UpgradeStatePodRestartRequired] {
podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
return err
}
m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
isPodSynced, isOrphaned, err := m.podInSyncWithDS(ctx, nodeState)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset template/pod revision hash")
return err
}
m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
if podRevisionHash != daemonsetRevisionHash {
if !isPodSynced || isOrphaned {
// Pods should only be scheduled for restart if they are not terminating or restarting already
// To determine the terminating state we need to check the deletion timestamp, which will be filled
// once the pod termination process has started
@@ -850,22 +910,16 @@ func (m *ClusterUpgradeStateManagerImpl) ProcessUncordonRequiredNodes(
}

func (m *ClusterUpgradeStateManagerImpl) isDriverPodInSync(ctx context.Context, nodeState *NodeUpgradeState) (bool, error) {
podRevisionHash, err := m.PodManager.GetPodControllerRevisionHash(ctx, nodeState.DriverPod)
isPodSynced, isOrphaned, err := m.podInSyncWithDS(ctx, nodeState)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get pod template revision hash", "pod", nodeState.DriverPod)
m.Log.V(consts.LogLevelError).Error(err, "Failed to get daemonset template/pod revision hash")
return false, err
}
m.Log.V(consts.LogLevelDebug).Info("pod template revision hash", "hash", podRevisionHash)
daemonsetRevisionHash, err := m.PodManager.GetDaemonsetControllerRevisionHash(ctx, nodeState.DriverDaemonSet)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to get daemonset template revision hash", "daemonset", nodeState.DriverDaemonSet)
return false, err
if isOrphaned {
return false, nil
}
m.Log.V(consts.LogLevelDebug).Info("daemonset template revision hash", "hash", daemonsetRevisionHash)
// If the pod is in sync with the DaemonSet
if podRevisionHash == daemonsetRevisionHash &&
if isPodSynced &&
// And the pod is running
nodeState.DriverPod.Status.Phase == "Running" &&
// And it has at least 1 container