Skip to content

Commit

Permalink
Union/upstream pod helper node preemption handling (#6259)
Browse files Browse the repository at this point in the history
* set terminated primary container w/ sigkill exit code to be a system failure (#657)

* set terminated primary container to be a system failure

Signed-off-by: Paul Dittamo <[email protected]>

* update logging level + comment (#662)

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

* count unknown reason for failed pods as system errors (#667)

* count unknown errors for failed pods as system errors

Signed-off-by: Paul Dittamo <[email protected]>

* unit test

Signed-off-by: Paul Dittamo <[email protected]>

* update comment

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Feb 27, 2025
1 parent 9b2ced5 commit 588ceec
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 35 deletions.
67 changes: 56 additions & 11 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const Interrupted = "Interrupted"
const PrimaryContainerNotFound = "PrimaryContainerNotFound"
const SIGKILL = 137

// unsignedSIGKILL = 256 - 9
const unsignedSIGKILL = 247

const defaultContainerTemplateName = "default"
const defaultInitContainerTemplateName = "default-init"
const primaryContainerTemplateName = "primary"
Expand Down Expand Up @@ -1109,24 +1112,44 @@ func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo
// DeterminePrimaryContainerPhase as the name suggests, given all the containers, will return a pluginsCore.PhaseInfo object
// corresponding to the phase of the primaryContainer which is identified using the provided name.
// This is useful in case of sidecars or pod jobs, where Flyte will monitor successful exit of a single container.
func DeterminePrimaryContainerPhase(primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo {
func DeterminePrimaryContainerPhase(ctx context.Context, primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo {
for _, s := range statuses {
if s.Name == primaryContainerName {
if s.State.Waiting != nil || s.State.Running != nil {
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info)
}

if s.State.Terminated != nil {
if s.State.Terminated.ExitCode != 0 || strings.Contains(s.State.Terminated.Reason, OOMKilled) {
message := fmt.Sprintf("\r\n[%v] terminated with exit code (%v). Reason [%v]. Message: \n%v.",
s.Name,
s.State.Terminated.ExitCode,
s.State.Terminated.Reason,
s.State.Terminated.Message)
return pluginsCore.PhaseInfoRetryableFailure(
message := fmt.Sprintf("\r\n[%v] terminated with exit code (%v). Reason [%v]. Message: \n%v.",
s.Name,
s.State.Terminated.ExitCode,
s.State.Terminated.Reason,
s.State.Terminated.Message)

var phaseInfo pluginsCore.PhaseInfo
switch {
case strings.Contains(s.State.Terminated.Reason, OOMKilled):
// OOMKilled typically results in a SIGKILL signal, but we classify it as a user error
phaseInfo = pluginsCore.PhaseInfoRetryableFailure(
s.State.Terminated.Reason, message, info)
case isTerminatedWithSigKill(s.State):
// If the primary container exited with SIGKILL, we treat it as a system-level error
// (such as node termination or preemption). This best-effort approach accepts some false positives.
// In the case that node preemption terminates the kubelet *before* the kubelet is able to persist
// the pod's state to the Kubernetes API server, we rely on Kubernetes to eventually resolve
// the state. This will enable Propeller to eventually query the API server and determine that
// the pod no longer exists, which will then be counted as a system error.
phaseInfo = pluginsCore.PhaseInfoSystemRetryableFailure(
s.State.Terminated.Reason, message, info)
case s.State.Terminated.ExitCode != 0:
phaseInfo = pluginsCore.PhaseInfoRetryableFailure(
s.State.Terminated.Reason, message, info)
default:
return pluginsCore.PhaseInfoSuccess(info)
}
return pluginsCore.PhaseInfoSuccess(info)

logger.Warnf(ctx, "Primary container terminated with issue. Message: '%s'", message)
return phaseInfo
}
}
}
Expand All @@ -1138,7 +1161,7 @@ func DeterminePrimaryContainerPhase(primaryContainerName string, statuses []v1.C

// DemystifyFailure resolves the various Kubernetes pod failure modes to determine
// the most appropriate course of action
func DemystifyFailure(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
func DemystifyFailure(ctx context.Context, status v1.PodStatus, info pluginsCore.TaskInfo, primaryContainerName string) (pluginsCore.PhaseInfo, error) {
code := "UnknownError"
message := "Pod failed. No message received from kubernetes."
if len(status.Reason) > 0 {
Expand Down Expand Up @@ -1198,10 +1221,19 @@ func DemystifyFailure(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo
if containerState.Terminated != nil {
if strings.Contains(containerState.Terminated.Reason, OOMKilled) {
code = OOMKilled
} else if containerState.Terminated.ExitCode == SIGKILL {
} else if isTerminatedWithSigKill(containerState) {
// in some setups, node termination sends SIGKILL to all the containers running on that node. Capturing and
// tagging that correctly.
code = Interrupted
// If the primary container exited with SIGKILL, we treat it as a system-level error
// (such as node termination or preemption). This best-effort approach accepts some false positives.
// In the case that node preemption terminates the kubelet *before* the kubelet is able to persist
// the pod's state to the Kubernetes API server, we rely on Kubernetes to eventually resolve
// the state. This will enable Propeller to eventually query the API server and determine that
// the pod no longer exists, which will then be counted as a system error.
if c.Name == primaryContainerName {
isSystemError = true
}
}

if containerState.Terminated.ExitCode == 0 {
Expand All @@ -1216,10 +1248,19 @@ func DemystifyFailure(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo
}
}

// If the code remains 'UnknownError', it indicates that the kubelet did not have a chance
// to record a more specific failure before the node was terminated or preempted.
// In such cases, we classify the error as system-level and accept false positives
if code == "UnknownError" {
isSystemError = true
}

if isSystemError {
logger.Warnf(ctx, "Pod failed with a system error. Code: %s, Message: %s", code, message)
return pluginsCore.PhaseInfoSystemRetryableFailure(Interrupted, message, &info), nil
}

logger.Warnf(ctx, "Pod failed with a user error. Code: %s, Message: %s", code, message)
return pluginsCore.PhaseInfoRetryableFailure(code, message, &info), nil
}

Expand Down Expand Up @@ -1257,3 +1298,7 @@ func GetReportedAt(pod *v1.Pod) metav1.Time {

return reportedAt
}

func isTerminatedWithSigKill(state v1.ContainerState) bool {
return state.Terminated != nil && (state.Terminated.ExitCode == SIGKILL || state.Terminated.ExitCode == unsignedSIGKILL)
}
118 changes: 99 additions & 19 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,24 +1622,26 @@ func TestDemystifySuccess(t *testing.T) {
}

func TestDemystifyFailure(t *testing.T) {
ctx := context.TODO()

t.Run("unknown-error", func(t *testing.T) {
phaseInfo, err := DemystifyFailure(v1.PodStatus{}, pluginsCore.TaskInfo{})
phaseInfo, err := DemystifyFailure(ctx, v1.PodStatus{}, pluginsCore.TaskInfo{}, "")
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "UnknownError", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_USER, phaseInfo.Err().GetKind())
assert.Equal(t, "Interrupted", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_SYSTEM, phaseInfo.Err().GetKind())
})

t.Run("known-error", func(t *testing.T) {
phaseInfo, err := DemystifyFailure(v1.PodStatus{Reason: "hello"}, pluginsCore.TaskInfo{})
phaseInfo, err := DemystifyFailure(ctx, v1.PodStatus{Reason: "hello"}, pluginsCore.TaskInfo{}, "")
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "hello", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_USER, phaseInfo.Err().GetKind())
})

t.Run("OOMKilled", func(t *testing.T) {
phaseInfo, err := DemystifyFailure(v1.PodStatus{
phaseInfo, err := DemystifyFailure(ctx, v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Expand All @@ -1650,15 +1652,15 @@ func TestDemystifyFailure(t *testing.T) {
},
},
},
}, pluginsCore.TaskInfo{})
}, pluginsCore.TaskInfo{}, "")
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "OOMKilled", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_USER, phaseInfo.Err().GetKind())
})

t.Run("SIGKILL", func(t *testing.T) {
phaseInfo, err := DemystifyFailure(v1.PodStatus{
t.Run("SIGKILL non-primary container", func(t *testing.T) {
phaseInfo, err := DemystifyFailure(ctx, v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
LastTerminationState: v1.ContainerState{
Expand All @@ -1667,18 +1669,39 @@ func TestDemystifyFailure(t *testing.T) {
ExitCode: SIGKILL,
},
},
Name: "non-primary-container",
},
},
}, pluginsCore.TaskInfo{})
}, pluginsCore.TaskInfo{}, "primary-container")
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "Interrupted", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_USER, phaseInfo.Err().GetKind())
})

t.Run("SIGKILL primary container", func(t *testing.T) {
phaseInfo, err := DemystifyFailure(ctx, v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
LastTerminationState: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Reason: "some reason",
ExitCode: SIGKILL,
},
},
Name: "primary-container",
},
},
}, pluginsCore.TaskInfo{}, "primary-container")
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "Interrupted", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_SYSTEM, phaseInfo.Err().GetKind())
})

t.Run("GKE kubelet graceful node shutdown", func(t *testing.T) {
containerReason := "some reason"
phaseInfo, err := DemystifyFailure(v1.PodStatus{
phaseInfo, err := DemystifyFailure(ctx, v1.PodStatus{
Message: "Pod Node is in progress of shutting down, not admitting any new pods",
Reason: "Shutdown",
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -1691,7 +1714,7 @@ func TestDemystifyFailure(t *testing.T) {
},
},
},
}, pluginsCore.TaskInfo{})
}, pluginsCore.TaskInfo{}, "")
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "Interrupted", phaseInfo.Err().GetCode())
Expand All @@ -1701,7 +1724,7 @@ func TestDemystifyFailure(t *testing.T) {

t.Run("GKE kubelet graceful node shutdown", func(t *testing.T) {
containerReason := "some reason"
phaseInfo, err := DemystifyFailure(v1.PodStatus{
phaseInfo, err := DemystifyFailure(ctx, v1.PodStatus{
Message: "Foobar",
Reason: "Terminated",
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -1714,7 +1737,7 @@ func TestDemystifyFailure(t *testing.T) {
},
},
},
}, pluginsCore.TaskInfo{})
}, pluginsCore.TaskInfo{}, "")
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "Interrupted", phaseInfo.Err().GetCode())
Expand Down Expand Up @@ -1760,6 +1783,7 @@ func TestDemystifyPending_testcases(t *testing.T) {
}

func TestDeterminePrimaryContainerPhase(t *testing.T) {
ctx := context.TODO()
primaryContainerName := "primary"
secondaryContainer := v1.ContainerStatus{
Name: "secondary",
Expand All @@ -1771,7 +1795,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) {
}
var info = &pluginsCore.TaskInfo{}
t.Run("primary container waiting", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Expand All @@ -1784,7 +1808,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase())
})
t.Run("primary container running", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Expand All @@ -1797,7 +1821,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase())
})
t.Run("primary container failed", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Expand All @@ -1811,10 +1835,47 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) {
}, info)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "foo", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_USER, phaseInfo.Err().GetKind())
assert.Equal(t, "\r\n[primary] terminated with exit code (1). Reason [foo]. Message: \nfoo failed.", phaseInfo.Err().GetMessage())
})
t.Run("primary container failed - SIGKILL", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 137,
Reason: "foo",
Message: "foo failed",
},
},
},
}, info)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "foo", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_SYSTEM, phaseInfo.Err().GetKind())
assert.Equal(t, "\r\n[primary] terminated with exit code (137). Reason [foo]. Message: \nfoo failed.", phaseInfo.Err().GetMessage())
})
t.Run("primary container failed - SIGKILL unsigned", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 247,
Reason: "foo",
Message: "foo failed",
},
},
},
}, info)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "foo", phaseInfo.Err().GetCode())
assert.Equal(t, core.ExecutionError_SYSTEM, phaseInfo.Err().GetKind())
assert.Equal(t, "\r\n[primary] terminated with exit code (247). Reason [foo]. Message: \nfoo failed.", phaseInfo.Err().GetMessage())
})
t.Run("primary container succeeded", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Expand All @@ -1827,15 +1888,15 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
})
t.Run("missing primary container", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer,
}, info)
assert.Equal(t, pluginsCore.PhasePermanentFailure, phaseInfo.Phase())
assert.Equal(t, PrimaryContainerNotFound, phaseInfo.Err().GetCode())
assert.Equal(t, "Primary container [primary] not found in pod's container statuses", phaseInfo.Err().GetMessage())
})
t.Run("primary container failed with OOMKilled", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Expand All @@ -1848,9 +1909,28 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) {
},
}, info)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, core.ExecutionError_USER, phaseInfo.Err().GetKind())
assert.Equal(t, OOMKilled, phaseInfo.Err().GetCode())
assert.Equal(t, "\r\n[primary] terminated with exit code (0). Reason [OOMKilled]. Message: \nfoo failed.", phaseInfo.Err().GetMessage())
})
t.Run("primary container failed with OOMKilled - SIGKILL", func(t *testing.T) {
phaseInfo := DeterminePrimaryContainerPhase(ctx, primaryContainerName, []v1.ContainerStatus{
secondaryContainer, {
Name: primaryContainerName,
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 137,
Reason: OOMKilled,
Message: "foo failed",
},
},
},
}, info)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, core.ExecutionError_USER, phaseInfo.Err().GetKind())
assert.Equal(t, OOMKilled, phaseInfo.Err().GetCode())
assert.Equal(t, "\r\n[primary] terminated with exit code (137). Reason [OOMKilled]. Message: \nfoo failed.", phaseInfo.Err().GetMessage())
})
}

func TestGetPodTemplate(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/k8s/pod/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
ec := phaseInfo.Err().GetCode()
assert.Equal(t, "UnknownError", ec)
assert.Equal(t, "Interrupted", ec)
})

t.Run("failConditionUnschedulable", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 588ceec

Please sign in to comment.