Skip to content

Commit

Permalink
Rafaelraposo/force abort impl (#4)
Browse files Browse the repository at this point in the history
* Refactor k8s executor abort

Signed-off-by: Rafael Raposo <[email protected]>
Signed-off-by: Rafael Raposo <[email protected]>
  • Loading branch information
RRap0so committed Oct 10, 2024
1 parent 197ae13 commit f33059a
Show file tree
Hide file tree
Showing 14 changed files with 348 additions and 111 deletions.
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,7 @@ func (m *ExecutionManager) TerminateExecution(

ExecutionID: request.Id,
Cluster: executionModel.Cluster,
Force: request.Force,
})
if err != nil {
m.systemMetrics.TerminateExecutionFailures.Inc()
Expand Down
64 changes: 64 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3450,6 +3450,7 @@ func TestTerminateExecution(t *testing.T) {
Domain: "domain",
Name: "name",
}, data.ExecutionID))
assert.False(t, data.Force)
return true
})).Return(nil)
mockExecutor.OnID().Return("customMockExecutor")
Expand All @@ -3473,6 +3474,69 @@ func TestTerminateExecution(t *testing.T) {
assert.NotNil(t, resp)
}

func TestForceTerminateExecution(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
startTime := time.Now()
executionGetFunc := makeExecutionGetFunc(t, []byte{}, &startTime)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)

abortCause := "abort cause"
updateExecutionFunc := func(
context context.Context, execution models.Execution) error {
assert.Equal(t, "project", execution.Project)
assert.Equal(t, "domain", execution.Domain)
assert.Equal(t, "name", execution.Name)
assert.Equal(t, uint(1), execution.LaunchPlanID)
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_ABORTING.String(), execution.Phase)
assert.Equal(t, execution.ExecutionCreatedAt, execution.ExecutionUpdatedAt,
"an abort call should not change ExecutionUpdatedAt until a corresponding execution event is received")
assert.Equal(t, abortCause, execution.AbortCause)
assert.Equal(t, testCluster, execution.Cluster)

var unmarshaledClosure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &unmarshaledClosure)
assert.NoError(t, err)
assert.True(t, proto.Equal(&admin.AbortMetadata{
Cause: abortCause,
Principal: principal,
}, unmarshaledClosure.GetAbortMetadata()))
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)

mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnAbortMatch(mock.Anything, mock.MatchedBy(func(data workflowengineInterfaces.AbortData) bool {
assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}, data.ExecutionID))
assert.True(t, data.Force)
return true
})).Return(nil)
mockExecutor.OnID().Return("customMockExecutor")
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

identity, err := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil, nil)
assert.NoError(t, err)
ctx := identity.WithContext(context.Background())
resp, err := execManager.TerminateExecution(ctx, &admin.ExecutionTerminateRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
Cause: abortCause,
Force: true,
})

assert.Nil(t, err)
assert.NotNil(t, resp)
}

func TestTerminateExecution_PropellerError(t *testing.T) {
var expectedError = errors.New("expected error")

Expand Down
33 changes: 30 additions & 3 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

var deletePropagationBackground = v1.DeletePropagationBackground

const AbortedWorkflowAnnotation = "workflow-aborted"
const defaultIdentifier = "DefaultK8sExecutor"

// K8sWorkflowExecutor directly creates and delete Flyte workflow execution CRD objects using the configured execution
Expand Down Expand Up @@ -94,9 +95,35 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
}
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
PropagationPolicy: &deletePropagationBackground,
})

w, err := target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Get(ctx, data.ExecutionID.GetName(), v1.GetOptions{})
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)

} else if k8_api_err.IsNotFound(err) {
logger.Infof(ctx, "Trying to abort an execution [%+v] that is not found in cluster: %s, skipping...", data.ExecutionID, target.ID)
return nil
}

if data.Force {
logger.Debugf(ctx, "Force deleting execution [%+v] in cluster: %s", data.ExecutionID, target.ID)

// Remove finalizers if any
if w.Finalizers != nil || w.ObjectMeta.Finalizers != nil {
w.Finalizers = []string{}
}

// Write the updated workflow
if _, err := target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Update(ctx, w, v1.UpdateOptions{}); err != nil {
if k8_api_err.IsNotFound(err) {
logger.Debugf(ctx, "Trying to force delete an execution [%+v] that is not found in cluster: %s, skipping...", data.ExecutionID, target.ID)
return nil

Check warning on line 120 in flyteadmin/pkg/workflowengine/impl/k8s_executor.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/workflowengine/impl/k8s_executor.go#L118-L120

Added lines #L118 - L120 were not covered by tests
}
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to remove finalizer for execution [%+v] in cluster: %s with err %v", data.ExecutionID, target.ID, err)

Check warning on line 122 in flyteadmin/pkg/workflowengine/impl/k8s_executor.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/workflowengine/impl/k8s_executor.go#L122

Added line #L122 was not covered by tests
}

}

// An IsNotFound error indicates the resource is already deleted.
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
Expand Down
102 changes: 96 additions & 6 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}

type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error)
type deleteCallback func(name string, options *v1.DeleteOptions) error
type updateCallback func(*v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error)
type getCallback func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error)
type FakeFlyteWorkflow struct {
v1alpha12.FlyteWorkflowInterface
createCallback createCallback
deleteCallback deleteCallback
getCallback getCallback
updateCallback updateCallback
}

func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
Expand All @@ -52,6 +56,20 @@ func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.
return nil
}

func (b *FakeFlyteWorkflow) Get(ctx context.Context, name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
if b.getCallback != nil {
return b.getCallback(name, options)
}
return nil, nil
}

func (b *FakeFlyteWorkflow) Update(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.UpdateOptions) (*v1alpha1.FlyteWorkflow, error) {
if b.updateCallback != nil {
return b.updateCallback(wf)
}
return nil, nil
}

type flyteWorkflowsCallback func(string) v1alpha12.FlyteWorkflowInterface

type FakeFlyteWorkflowV1alpha1 struct {
Expand Down Expand Up @@ -86,9 +104,10 @@ var execID = &core.WorkflowExecutionIdentifier{
}

var flyteWf = &v1alpha1.FlyteWorkflow{
ExecutionID: v1alpha1.ExecutionID{
WorkflowExecutionIdentifier: execID,
ObjectMeta: v1.ObjectMeta{
Finalizers: []string{"mock-finalizer"},
},
ExecutionID: v1alpha1.ExecutionID{WorkflowExecutionIdentifier: execID},
}

var testInputs = &core.LiteralMap{
Expand Down Expand Up @@ -278,13 +297,47 @@ func TestExecute_MiscError(t *testing.T) {
assert.EqualError(t, err, "failed to create workflow in propeller call failed")
}

func TestAbort(t *testing.T) {
func TestAbort_ForceTrue(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, execID.Name, name)
return flyteWf, nil
}

fakeFlyteWorkflow.updateCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, flyteWf, flyteWorkflow)
assert.Empty(t, flyteWorkflow.Finalizers)
return nil, nil
}

fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
assert.Equal(t, execID.Name, name)
assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground)
return nil
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
return &fakeFlyteWorkflow
}
executor := K8sWorkflowExecutor{
executionCluster: getFakeExecutionCluster(),
}
err := executor.Abort(context.TODO(), interfaces.AbortData{
Namespace: namespace,
ExecutionID: execID,
Cluster: clusterID,
Force: true,
})
assert.NoError(t, err)
}

func TestAbort_ForceFalse(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, execID.Name, name)
return flyteWf, nil
}

fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
return &fakeFlyteWorkflow
Expand All @@ -300,14 +353,50 @@ func TestAbort(t *testing.T) {
assert.NoError(t, err)
}

func TestAbort_Notfound(t *testing.T) {
func TestAbort_NotfoundWhenGet(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}

fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
return nil, k8_api_err.NewNotFound(schema.GroupResource{
Group: "foo",
Resource: "bar",
}, execID.Name)
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
return &fakeFlyteWorkflow
}
executor := K8sWorkflowExecutor{
executionCluster: getFakeExecutionCluster(),
}
err := executor.Abort(context.TODO(), interfaces.AbortData{
Namespace: namespace,
ExecutionID: execID,
Cluster: clusterID,
})
assert.NoError(t, err)
}

func TestAbort_NotfoundWhenForceDelete(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}

fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, execID.Name, name)
return flyteWf, nil
}

fakeFlyteWorkflow.updateCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, flyteWf, flyteWorkflow)
return nil, nil
}

fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
return k8_api_err.NewNotFound(schema.GroupResource{
Group: "foo",
Resource: "bar",
}, execID.Name)
}

fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
return &fakeFlyteWorkflow
Expand All @@ -319,14 +408,15 @@ func TestAbort_Notfound(t *testing.T) {
Namespace: namespace,
ExecutionID: execID,
Cluster: clusterID,
Force: true,
})
assert.NoError(t, err)
}

func TestAbort_MiscError(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
return errors.New("call failed")
fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
return nil, errors.New("call failed")
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type AbortData struct {
ExecutionID *core.WorkflowExecutionIdentifier
// Cluster identifier where the execution was created
Cluster string
// Is force abort
Force bool
}

// WorkflowExecutor is a client interface used to create and delete Flyte workflow CRD objects.
Expand Down
8 changes: 8 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f33059a

Please sign in to comment.