Skip to content

Commit

Permalink
Control state-based deletion using the enableTransitionHistory dynami…
Browse files Browse the repository at this point in the history
…c config (#7163)

## What changed?
<!-- Describe what has changed in this PR -->
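Control state-based deletion using the enableTransitionHistory dynamic
config: a Nexus operation node is now deleted on a terminal event only
when transition history is disabled; otherwise the node is kept.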

## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->

---------

Co-authored-by: Roey Berman <[email protected]>
Co-authored-by: Roey Berman <[email protected]>
  • Loading branch information
3 people authored Jan 31, 2025
1 parent 3dd9794 commit 5bd8857
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 7 deletions.
19 changes: 15 additions & 4 deletions components/nexusoperations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.History
return err
}

return root.DeleteChild(node.Key)
return maybeDeleteNode(node)
}

func (d CompletedEventDefinition) Type() enumspb.EventType {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEve
return err
}

return root.DeleteChild(node.Key)
return maybeDeleteNode(node)
}

func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -192,7 +192,7 @@ func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE
return err
}

return root.DeleteChild(node.Key)
return maybeDeleteNode(node)
}

func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -222,7 +222,7 @@ func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE
return err
}

return root.DeleteChild(node.Key)
return maybeDeleteNode(node)
}

func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -304,3 +304,14 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node
}
return node, nil
}

// maybeDeleteNode deletes node from its parent unless the parent's machine
// data reports that transition history is enabled, in which case the node is
// left in place and nil is returned.
//
// Any error returned by hsm.MachineData while reading the parent's data, or by
// DeleteChild while removing the node, is propagated unchanged.
func maybeDeleteNode(node *hsm.Node) error {
	parent := node.Parent
	historyAware, err := hsm.MachineData[interface{ IsTransitionHistoryEnabled() bool }](parent)
	if err != nil {
		return err
	}
	if historyAware.IsTransitionHistoryEnabled() {
		// Transition history is on: keep the node instead of deleting it.
		return nil
	}
	return parent.DeleteChild(node.Key)
}
4 changes: 4 additions & 0 deletions components/nexusoperations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (root) IsWorkflowExecutionRunning() bool {
return true
}

// IsTransitionHistoryEnabled always reports false for this test root.
func (root) IsTransitionHistoryEnabled() bool {
return false
}

func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historypb.HistoryEvent {
conv := converter.GetDefaultDataConverter()
payload, err := conv.ToPayload("input")
Expand Down
25 changes: 23 additions & 2 deletions components/nexusoperations/workflow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ func (ch *commandHandler) HandleCancelCommand(

coll := nexusoperations.MachineCollection(ms.HSM())
nodeID := strconv.FormatInt(attrs.ScheduledEventId, 10)
_, err := coll.Node(nodeID)
node, err := coll.Node(nodeID)
hasBufferedEvent := ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId))
if err != nil {
if errors.Is(err, hsm.ErrStateMachineNotFound) {
if !ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) {
if !hasBufferedEvent {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId),
Expand All @@ -227,6 +228,26 @@ func (ch *commandHandler) HandleCancelCommand(
}
}

if node != nil {
// TODO(bergundy): Remove this when operation auto-deletes itself on terminal state.
// Operation may already be in a terminal state because it doesn't yet delete itself. We don't want to accept
// cancelation in this case.
op, err := hsm.MachineData[nexusoperations.Operation](node)
if err != nil {
return err
}
// The operation is already in a terminal state and the terminal NexusOperation event has not just been buffered.
// We allow the workflow to request canceling an operation that has just completed while a workflow task is in
// flight since it cannot know about the state of the operation.
if !nexusoperations.TransitionCanceled.Possible(op) && !hasBufferedEvent {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for an already complete operation with scheduled event ID of %d", attrs.ScheduledEventId),
}
}
// END TODO
}

// Always create the event even if there's a buffered completion to avoid breaking replay in the SDK.
// The event will be applied before the completion since buffered events are reordered and put at the end of the
// batch, after command events from the workflow task.
Expand Down
4 changes: 3 additions & 1 deletion components/nexusoperations/workflow/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func newTestContext(t *testing.T, cfg *nexusoperations.Config) testContext {
ms := workflow.NewMockMutableState(gomock.NewController(t))
node, err := hsm.NewRoot(smReg, workflow.StateMachineType, ms, make(map[string]*persistencespb.StateMachineMap), ms)
require.NoError(t, err)
ms.EXPECT().IsTransitionHistoryEnabled().Return(false).AnyTimes()
ms.EXPECT().HSM().Return(node).AnyTimes()
lastEventID := int64(4)
history := &historypb.History{}
Expand Down Expand Up @@ -537,7 +538,7 @@ func TestHandleCancelCommand(t *testing.T) {

t.Run("operation already completed - completion buffered", func(t *testing.T) {
tcx := newTestContext(t, defaultConfig)
tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(true)
tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(true).AnyTimes()

err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{
Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{
Expand Down Expand Up @@ -578,6 +579,7 @@ func TestHandleCancelCommand(t *testing.T) {

t.Run("sets event attributes with UserMetadata and spawns cancelation child machine", func(t *testing.T) {
tcx := newTestContext(t, defaultConfig)
tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(false).AnyTimes()
err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{
Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{
ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/mutable_state_rebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *stateBuilderSuite) SetupTest() {
root, err := hsm.NewRoot(reg, StateMachineType, s.mockMutableState, make(map[string]*persistencespb.StateMachineMap), s.mockMutableState)
s.NoError(err)
s.mockMutableState.EXPECT().HSM().Return(root).AnyTimes()
s.mockMutableState.EXPECT().IsTransitionHistoryEnabled().Return(false).AnyTimes()

s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache
s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata
Expand Down

0 comments on commit 5bd8857

Please sign in to comment.