From 5bd88573a49fddc9fa5a44f9bdaae67e4fe1ccce Mon Sep 17 00:00:00 2001 From: justinp-tt <174377431+justinp-tt@users.noreply.github.com> Date: Fri, 31 Jan 2025 14:09:50 -0600 Subject: [PATCH] Control state-based deletion using the enableTransitionHistory dynamic config (#7163) ## What changed? Control state-based deletion using the enableTransitionHistory dynamic config ## Why? ## How did you test it? ## Potential risks ## Documentation ## Is hotfix candidate? --------- Co-authored-by: Roey Berman Co-authored-by: Roey Berman --- components/nexusoperations/events.go | 19 +++++++++++--- components/nexusoperations/helpers_test.go | 4 +++ .../nexusoperations/workflow/commands.go | 25 +++++++++++++++++-- .../nexusoperations/workflow/commands_test.go | 4 ++- .../workflow/mutable_state_rebuilder_test.go | 1 + 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/components/nexusoperations/events.go b/components/nexusoperations/events.go index 7f8ad79f3ff..5dcf22cfe48 100644 --- a/components/nexusoperations/events.go +++ b/components/nexusoperations/events.go @@ -125,7 +125,7 @@ func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.History return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d CompletedEventDefinition) Type() enumspb.EventType { @@ -161,7 +161,7 @@ func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEve return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -192,7 +192,7 @@ func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -222,7 +222,7 @@ func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -304,3 +304,14 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node } return node, nil } + +func maybeDeleteNode(node *hsm.Node) error { + ms, err := hsm.MachineData[interface{ IsTransitionHistoryEnabled() bool }](node.Parent) + if err != nil { + return err + } + if !ms.IsTransitionHistoryEnabled() { + return node.Parent.DeleteChild(node.Key) + } + return nil +} diff --git a/components/nexusoperations/helpers_test.go b/components/nexusoperations/helpers_test.go index 6e66b511f77..89832bdd235 100644 --- a/components/nexusoperations/helpers_test.go +++ b/components/nexusoperations/helpers_test.go @@ -90,6 +90,10 @@ func (root) IsWorkflowExecutionRunning() bool { return true } +func (root) IsTransitionHistoryEnabled() bool { + return false +} + func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historypb.HistoryEvent { conv := converter.GetDefaultDataConverter() payload, err := conv.ToPayload("input") diff --git a/components/nexusoperations/workflow/commands.go b/components/nexusoperations/workflow/commands.go index 20561f82aa2..947d5a44f99 100644 --- a/components/nexusoperations/workflow/commands.go +++ b/components/nexusoperations/workflow/commands.go @@ -212,10 +212,11 @@ func (ch *commandHandler) HandleCancelCommand( coll := nexusoperations.MachineCollection(ms.HSM()) nodeID := strconv.FormatInt(attrs.ScheduledEventId, 10) - _, err := coll.Node(nodeID) + node, err := coll.Node(nodeID) + hasBufferedEvent := ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) if err != nil { if errors.Is(err, hsm.ErrStateMachineNotFound) { - if !ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) { + if !hasBufferedEvent { return workflow.FailWorkflowTaskError{ Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES, Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId), @@ -227,6 +228,26 @@ func (ch *commandHandler) HandleCancelCommand( } } + if node != nil { + // TODO(bergundy): Remove this when operation auto-deletes itself on terminal state. + // Operation may already be in a terminal state because it doesn't yet delete itself. We don't want to accept + // cancelation in this case. + op, err := hsm.MachineData[nexusoperations.Operation](node) + if err != nil { + return err + } + // The operation is already in a terminal state and the terminal NexusOperation event has not just been buffered. + // We allow the workflow to request canceling an operation that has just completed while a workflow task is in + // flight since it cannot know about the state of the operation. + if !nexusoperations.TransitionCanceled.Possible(op) && !hasBufferedEvent { + return workflow.FailWorkflowTaskError{ + Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES, + Message: fmt.Sprintf("requested cancelation for an already complete operation with scheduled event ID of %d", attrs.ScheduledEventId), + } + } + // END TODO + } + // Always create the event even if there's a buffered completion to avoid breaking replay in the SDK. // The event will be applied before the completion since buffered events are reordered and put at the end of the // batch, after command events from the workflow task. diff --git a/components/nexusoperations/workflow/commands_test.go b/components/nexusoperations/workflow/commands_test.go index 50b64f81bba..5ed654c848d 100644 --- a/components/nexusoperations/workflow/commands_test.go +++ b/components/nexusoperations/workflow/commands_test.go @@ -96,6 +96,7 @@ func newTestContext(t *testing.T, cfg *nexusoperations.Config) testContext { ms := workflow.NewMockMutableState(gomock.NewController(t)) node, err := hsm.NewRoot(smReg, workflow.StateMachineType, ms, make(map[string]*persistencespb.StateMachineMap), ms) require.NoError(t, err) + ms.EXPECT().IsTransitionHistoryEnabled().Return(false).AnyTimes() ms.EXPECT().HSM().Return(node).AnyTimes() lastEventID := int64(4) history := &historypb.History{} @@ -537,7 +538,7 @@ func TestHandleCancelCommand(t *testing.T) { t.Run("operation already completed - completion buffered", func(t *testing.T) { tcx := newTestContext(t, defaultConfig) - tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(true) + tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(true).AnyTimes() err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ @@ -578,6 +579,7 @@ func TestHandleCancelCommand(t *testing.T) { t.Run("sets event attributes with UserMetadata and spawns cancelation child machine", func(t *testing.T) { tcx := newTestContext(t, defaultConfig) + tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(false).AnyTimes() err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index cd0cc52ec97..97096c4e3d5 100644 --- a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -128,6 +128,7 @@ func (s *stateBuilderSuite) SetupTest() { root, err := hsm.NewRoot(reg, StateMachineType, s.mockMutableState, make(map[string]*persistencespb.StateMachineMap), s.mockMutableState) s.NoError(err) s.mockMutableState.EXPECT().HSM().Return(root).AnyTimes() + s.mockMutableState.EXPECT().IsTransitionHistoryEnabled().Return(false).AnyTimes() s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata