Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cascade Terminate and Purge Support #47

Merged
merged 22 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "submodules/durabletask-protobuf"]
path = submodules/durabletask-protobuf
url = https://github.com/microsoft/durabletask-protobuf
branch = distributed_tracing
branch = main
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Changed
### Added

- Cascading Terminate and Purge support ([#47](https://github.com/microsoft/durabletask-go/pull/47)) - by [@shivamkm07](https://github.com/shivamkm07)

### Changed

- Bump google.golang.org/grpc from 1.53.0 to 1.56.3 ([#39](https://github.com/microsoft/durabletask-go/pull/39))
- Updated durabletask-protobuf submodule to [`4207e1d`](https://github.com/microsoft/durabletask-protobuf/commit/4207e1dbd14cedc268f69c3befee60fcaad19367)

## [v0.4.0] - 2023-12-18

Expand Down
15 changes: 13 additions & 2 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type RaiseEventOptions func(*protos.RaiseEventRequest) error
// TerminateOptions is a set of options for terminating an orchestration.
type TerminateOptions func(*protos.TerminateRequest) error

// PurgeOptions is a set of options for purging an orchestration.
type PurgeOptions func(*protos.PurgeInstancesRequest) error

// WithInstanceID configures an explicit orchestration instance ID. If not specified,
// a random UUID value will be used for the orchestration instance ID.
func WithInstanceID(id InstanceID) NewOrchestrationOptions {
Expand Down Expand Up @@ -171,14 +174,22 @@ func WithRawOutput(data string) TerminateOptions {
}
}

// WithRecursive configures whether to terminate all sub-orchestrations created by the target orchestration.
func WithRecursive(recursive bool) TerminateOptions {
// WithRecursiveTerminate configures whether to terminate all sub-orchestrations created by the target orchestration.
func WithRecursiveTerminate(recursive bool) TerminateOptions {
return func(req *protos.TerminateRequest) error {
req.Recursive = recursive
return nil
}
}

// WithRecursivePurge configures whether to purge all sub-orchestrations created by the target orchestration.
func WithRecursivePurge(recursive bool) PurgeOptions {
shivamkm07 marked this conversation as resolved.
Show resolved Hide resolved
return func(req *protos.PurgeInstancesRequest) error {
req.Recursive = recursive
return nil
}
}

func NewOrchestrationMetadata(
iid InstanceID,
name string,
Expand Down
71 changes: 71 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/internal/helpers"
"github.com/microsoft/durabletask-go/internal/protos"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -122,3 +123,73 @@ func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error) {
}
return e, nil
}

// purgeOrchestrationState purges the orchestration state, including sub-orchestrations if [recursive] is true.
// Returns (deletedInstanceCount, error), where deletedInstanceCount is the number of instances deleted.
func purgeOrchestrationState(ctx context.Context, be Backend, iid api.InstanceID, recursive bool) (int, error) {
shivamkm07 marked this conversation as resolved.
Show resolved Hide resolved
deletedInstanceCount := 0
if recursive {
owi := &OrchestrationWorkItem{
InstanceID: iid,
}
state, err := be.GetOrchestrationRuntimeState(ctx, owi)
if err != nil {
return 0, fmt.Errorf("failed to fetch orchestration state: %w", err)
}
if len(state.NewEvents())+len(state.oldEvents) == 0 {
// If there are no events, the orchestration instance doesn't exist
return 0, api.ErrInstanceNotFound
}
if !state.IsCompleted() {
// Orchestration must be completed before purging its state
return 0, api.ErrNotCompleted
}
subOrchestrationInstances := getSubOrchestrationInstances(state.OldEvents(), state.NewEvents())
for _, subOrchestrationInstance := range subOrchestrationInstances {
// Recursively purge sub-orchestrations
count, err := purgeOrchestrationState(ctx, be, subOrchestrationInstance, recursive)
// `count` sub-orchestrations have been successfully purged (even in case of error)
deletedInstanceCount += count
if err != nil {
return deletedInstanceCount, fmt.Errorf("failed to purge sub-orchestration: %w", err)
}
}
}
// Purging root orchestration
if err := be.PurgeOrchestrationState(ctx, iid); err != nil {
return deletedInstanceCount, err
}
return deletedInstanceCount + 1, nil
}

// terminateSubOrchestrationInstances forwards a termination request to every sub-orchestration
// found in the history of [state], but only when [et.Recurse] is true; otherwise it does nothing.
//
// A fresh terminated event carrying et.Input and et.Recurse is built for each sub-orchestration
// and enqueued through be.AddNewOrchestrationEvent. The first enqueue failure stops the loop and
// is returned wrapped; sub-orchestrations after the failing one are not contacted.
// The iid parameter is not read by this function.
func terminateSubOrchestrationInstances(ctx context.Context, be Backend, iid api.InstanceID, state *OrchestrationRuntimeState, et *protos.ExecutionTerminatedEvent) error {
	if et.Recurse {
		for _, childID := range getSubOrchestrationInstances(state.OldEvents(), state.NewEvents()) {
			// Each child gets its own event instance rather than sharing one.
			terminated := helpers.NewExecutionTerminatedEvent(et.Input, et.Recurse)
			err := be.AddNewOrchestrationEvent(ctx, childID, terminated)
			if err != nil {
				return fmt.Errorf("failed to submit termination request to sub-orchestration: %w", err)
			}
		}
	}
	return nil
}

// getSubOrchestrationInstances collects the instance IDs of every sub-orchestration creation
// event found in the given histories.
//
// oldEvents is scanned first, then newEvents, and IDs are returned in the order encountered.
// The result is always a non-nil slice (possibly empty), pre-sized to the combined event count.
func getSubOrchestrationInstances(oldEvents []*HistoryEvent, newEvents []*HistoryEvent) []api.InstanceID {
	ids := make([]api.InstanceID, 0, len(oldEvents)+len(newEvents))
	for _, events := range [][]*HistoryEvent{oldEvents, newEvents} {
		for _, ev := range events {
			created := ev.GetSubOrchestrationInstanceCreated()
			if created == nil {
				// Not a sub-orchestration creation event.
				continue
			}
			ids = append(ids, api.InstanceID(created.InstanceId))
		}
	}
	return ids
}
15 changes: 10 additions & 5 deletions backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type TaskHubClient interface {
RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, opts ...api.RaiseEventOptions) error
SuspendOrchestration(ctx context.Context, id api.InstanceID, reason string) error
ResumeOrchestration(ctx context.Context, id api.InstanceID, reason string) error
PurgeOrchestrationState(ctx context.Context, id api.InstanceID) error
PurgeOrchestrationState(ctx context.Context, id api.InstanceID, opts ...api.PurgeOptions) error
}

type backendClient struct {
Expand Down Expand Up @@ -139,10 +139,9 @@ func (c *backendClient) TerminateOrchestration(ctx context.Context, id api.Insta
return fmt.Errorf("failed to configure termination request: %w", err)
}
}

e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to add terminate event: %w", err)
return fmt.Errorf("failed to submit termination request:: %w", err)
}
return nil
}
Expand Down Expand Up @@ -194,8 +193,14 @@ func (c *backendClient) ResumeOrchestration(ctx context.Context, id api.Instance
//
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
// [api.ErrNotCompleted] is returned if the specified orchestration instance is still running.
func (c *backendClient) PurgeOrchestrationState(ctx context.Context, id api.InstanceID) error {
if err := c.be.PurgeOrchestrationState(ctx, id); err != nil {
func (c *backendClient) PurgeOrchestrationState(ctx context.Context, id api.InstanceID, opts ...api.PurgeOptions) error {
req := &protos.PurgeInstancesRequest{Request: &protos.PurgeInstancesRequest_InstanceId{InstanceId: string(id)}, Recursive: true}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting default behavior to be recursive Purge, similar to terminate

for _, configure := range opts {
if err := configure(req); err != nil {
return fmt.Errorf("failed to configure purge request: %w", err)
}
}
if _, err := purgeOrchestrationState(ctx, c.be, id, req.Recursive); err != nil {
return fmt.Errorf("failed to purge orchestration state: %w", err)
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,12 @@ func (g *grpcExecutor) PurgeInstances(ctx context.Context, req *protos.PurgeInst
if req.GetPurgeInstanceFilter() != nil {
return nil, errors.New("multi-instance purge is not unimplemented")
}

if err := g.backend.PurgeOrchestrationState(ctx, api.InstanceID(req.GetInstanceId())); err != nil {
return nil, err
count, err := purgeOrchestrationState(ctx, g.backend, api.InstanceID(req.GetInstanceId()), req.Recursive)
resp := &protos.PurgeInstancesResponse{DeletedInstanceCount: int32(count)}
if err != nil {
return resp, fmt.Errorf("failed to purge orchestration state: %w", err)
}
return &protos.PurgeInstancesResponse{DeletedInstanceCount: 1}, nil
return resp, nil
}

// QueryInstances implements protos.TaskHubSidecarServiceServer
Expand Down Expand Up @@ -322,9 +323,8 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
func (g *grpcExecutor) TerminateInstance(ctx context.Context, req *protos.TerminateRequest) (*protos.TerminateResponse, error) {
e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
if err := g.backend.AddNewOrchestrationEvent(ctx, api.InstanceID(req.InstanceId), e); err != nil {
return nil, err
return nil, fmt.Errorf("failed to submit termination request: %w", err)
}

return &protos.TerminateResponse{}, nil
}

Expand Down
12 changes: 12 additions & 0 deletions backend/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ func (w *orchestratorProcessor) ProcessWorkItem(ctx context.Context, cwi WorkIte
}
w.logger.Debugf("%v: got orchestration runtime state: %s", wi.InstanceID, getOrchestrationStateDescription(wi))

var terminateEvent *protos.ExecutionTerminatedEvent = nil
for _, e := range wi.NewEvents {
if et := e.GetExecutionTerminated(); et != nil {
terminateEvent = et
break
}
}
if ctx, span, ok := w.applyWorkItem(ctx, wi); ok {
defer func() {
// Note that the span and ctx references may be updated inside the continue-as-new loop.
Expand Down Expand Up @@ -114,6 +121,11 @@ func (w *orchestratorProcessor) ProcessWorkItem(ctx context.Context, cwi WorkIte
break
}
}
if terminateEvent != nil && wi.State.IsCompleted() {
if err := terminateSubOrchestrationInstances(ctx, w.be, wi.InstanceID, wi.State, terminateEvent); err != nil {
return err
}
}
return nil
}

Expand Down
Loading
Loading