Skip to content

Commit

Permalink
handle state machine deletion for state-based replication (#7177)
Browse files Browse the repository at this point in the history
## What changed?
Handle state machine deletion for state-based replication.

## Why?
Deletion is needed because we want to support more Nexus
operations.

## How did you test it?
Unit tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
hai719 authored Feb 7, 2025
1 parent 5a9bdb6 commit 9a0114b
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 60 deletions.
19 changes: 4 additions & 15 deletions components/nexusoperations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.History
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d CompletedEventDefinition) Type() enumspb.EventType {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEve
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -192,7 +192,7 @@ func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -222,7 +222,7 @@ func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE
return err
}

return maybeDeleteNode(node)
return node.Parent.DeleteChild(node.Key)
}

func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -304,14 +304,3 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node
}
return node, nil
}

// maybeDeleteNode removes node from its parent, but only when the parent's machine data
// reports that transition history is disabled. When transition history is enabled the
// node is left in place and nil is returned.
//
// It returns any error from reading the parent's machine data or from DeleteChild.
func maybeDeleteNode(node *hsm.Node) error {
	parent := node.Parent
	parentData, err := hsm.MachineData[interface{ IsTransitionHistoryEnabled() bool }](parent)
	if err != nil {
		return err
	}
	if parentData.IsTransitionHistoryEnabled() {
		// Deletion is skipped while transition history is enabled.
		return nil
	}
	return parent.DeleteChild(node.Key)
}
16 changes: 9 additions & 7 deletions service/history/ndc/workflow_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,17 +360,18 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
)
}
localTransitionHistory := transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory)
localVersionedTransition := transitionhistory.LastVersionedTransition(localTransitionHistory)
sourceTransitionHistory := mutation.StateMutation.ExecutionInfo.TransitionHistory

// make sure mutation range is extension of local range
if workflow.TransitionHistoryStalenessCheck(localTransitionHistory, mutation.ExclusiveStartVersionedTransition) != nil ||
workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory)) != nil {
workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition) != nil {
return serviceerrors.NewSyncState(
fmt.Sprintf("Failed to apply mutation due to version check failed. local transition history: %v, source transition history: %v", localTransitionHistory, sourceTransitionHistory),
namespaceID.String(),
workflowID,
runID,
localTransitionHistory[len(localTransitionHistory)-1],
localVersionedTransition,
localMutableState.GetExecutionInfo().VersionHistories,
)
}
Expand Down Expand Up @@ -402,7 +403,7 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
}
}

err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1])
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition)
if err != nil {
return err
}
Expand Down Expand Up @@ -453,10 +454,12 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
}
var isBranchSwitched bool
var localTransitionHistory []*persistencespb.VersionedTransition
var localVersionedTransition *persistencespb.VersionedTransition
if len(localMutableState.GetExecutionInfo().TransitionHistory) != 0 {
localTransitionHistory = transitionhistory.CopyVersionedTransitions(localMutableState.GetExecutionInfo().TransitionHistory)
localVersionedTransition = transitionhistory.LastVersionedTransition(localTransitionHistory)
sourceTransitionHistory := snapshot.ExecutionInfo.TransitionHistory
err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, transitionhistory.LastVersionedTransition(localTransitionHistory))
err := workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition)
switch {
case err == nil:
// no branch switch
Expand All @@ -466,7 +469,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
namespaceID.String(),
workflowID,
runID,
transitionhistory.LastVersionedTransition(localTransitionHistory),
localVersionedTransition,
localMutableState.GetExecutionInfo().VersionHistories,
)
case errors.Is(err, consts.ErrStaleReference):
Expand Down Expand Up @@ -508,7 +511,6 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
if err != nil {
return err
}
localMutableState.PopTasks() // tasks are refreshed manually below

var newRunWorkflow Workflow
if versionedTransition.NewRunInfo != nil {
Expand All @@ -530,7 +532,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
return err
}
} else {
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localTransitionHistory[len(localTransitionHistory)-1])
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition)
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State)
mockMutableState.EXPECT().PopTasks().Times(1)
mockTransactionManager.EXPECT().UpdateWorkflow(gomock.Any(), false, gomock.Any(), nil).Return(nil).Times(1)
mockTaskRefresher.EXPECT().
PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{
Expand Down Expand Up @@ -774,7 +773,6 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_DifferentBra
},
}).AnyTimes()
mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State)
mockMutableState.EXPECT().PopTasks().Times(1)
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.runID,
}).AnyTimes()
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,5 +461,8 @@ type (
GetReapplyCandidateEvents() []*historypb.HistoryEvent

CurrentVersionedTransition() *persistencespb.VersionedTransition

DeleteSubStateMachine(path *persistencespb.StateMachinePath) error
IsSubStateMachineDeleted() bool
}
)
75 changes: 48 additions & 27 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,15 @@ type (
workflowTaskManager *workflowTaskStateMachine
QueryRegistry QueryRegistry

shard shard.Context
clusterMetadata cluster.Metadata
eventsCache events.Cache
config *configs.Config
timeSource clock.TimeSource
logger log.Logger
metricsHandler metrics.Handler
stateMachineNode *hsm.Node
shard shard.Context
clusterMetadata cluster.Metadata
eventsCache events.Cache
config *configs.Config
timeSource clock.TimeSource
logger log.Logger
metricsHandler metrics.Handler
stateMachineNode *hsm.Node
subStateMachineDeleted bool

// Tracks all events added via the AddHistoryEvent method that is used by the state machine framework.
currentTransactionAddedStateMachineEventTypes []enumspb.EventType
Expand Down Expand Up @@ -4357,6 +4358,29 @@ func (ms *MutableStateImpl) AddWorkflowExecutionUpdateAdmittedEvent(request *upd
return event, nil
}

// DeleteSubStateMachine removes the sub state machine node identified by path from the
// HSM tree and sets ms.subStateMachineDeleted to record that a deletion happened.
//
// A path that does not resolve to a node (hsm.ErrStateMachineNotFound) is treated as
// already deleted: nil is returned and the flag is left untouched. Any other lookup
// error, and any error from DeleteChild, is returned unchanged. A path that resolves to
// a node without a parent is rejected with an error instead of dereferencing a nil
// parent.
func (ms *MutableStateImpl) DeleteSubStateMachine(path *persistencespb.StateMachinePath) error {
	// Use the generated getter so that a nil path behaves like an empty one instead of
	// panicking on the field access.
	segments := path.GetPath()
	incomingPath := make([]hsm.Key, len(segments))
	for i, p := range segments {
		incomingPath[i] = hsm.Key{Type: p.Type, ID: p.Id}
	}

	node, err := ms.HSM().Child(incomingPath)
	if errors.Is(err, hsm.ErrStateMachineNotFound) {
		// Node is already deleted.
		return nil
	}
	if err != nil {
		return err
	}
	if node.Parent == nil {
		// A parentless node cannot be detached through DeleteChild; fail explicitly
		// rather than panic on the nil parent below.
		return errors.New("cannot delete a state machine node that has no parent")
	}
	if err := node.Parent.DeleteChild(node.Key); err != nil {
		return err
	}
	ms.subStateMachineDeleted = true
	return nil
}

// ApplyWorkflowExecutionUpdateAdmittedEvent applies a WorkflowExecutionUpdateAdmittedEvent to mutable state.
func (ms *MutableStateImpl) ApplyWorkflowExecutionUpdateAdmittedEvent(event *historypb.HistoryEvent, batchId int64) error {
attrs := event.GetWorkflowExecutionUpdateAdmittedEventAttributes()
Expand Down Expand Up @@ -6326,6 +6350,7 @@ func (ms *MutableStateImpl) cleanupTransaction() error {
ms.timerInfosUserDataUpdated = make(map[string]struct{})
ms.activityInfosUserDataUpdated = make(map[int64]struct{})
ms.reapplyEventsCandidate = nil
ms.subStateMachineDeleted = false

ms.stateInDB = ms.executionState.State
ms.nextEventIDInDB = ms.GetNextEventID()
Expand Down Expand Up @@ -7387,16 +7412,9 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
}

func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error {
// check whether any node has been deleted
currentHSM := ms.HSM()

// we don't care about the root here which is the entire mutable state
incomingHSM, err := hsm.NewRoot(
ms.shard.StateMachineRegistry(),
StateMachineType,
ms,
incoming,
ms,
)
incomingHSM, err := hsm.NewRoot(ms.shard.StateMachineRegistry(), StateMachineType, ms, incoming, ms)
if err != nil {
return err
}
Expand All @@ -7406,22 +7424,19 @@ func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*pers
// skip root which is the entire mutable state
return nil
}

incomingNodePath := incomingNode.Path()
currentNode, err := currentHSM.Child(incomingNodePath)
if err != nil {
// 1. Already done history resend if needed before,
// and node creation today always associated with an event
// 2. Node deletion is not supported right now.
// Based on 1 and 2, node should always be found here.
return err
_, err := currentHSM.Child(incomingNodePath)
if err != nil && errors.Is(err, hsm.ErrStateMachineNotFound) {
ms.subStateMachineDeleted = true
return nil
}

return currentNode.Sync(incomingNode)
return err
}); err != nil {
return err
}

ms.executionInfo.SubStateMachinesByType = incoming
ms.mustInitHSM()
return nil
}

Expand Down Expand Up @@ -7453,6 +7468,8 @@ func (ms *MutableStateImpl) applyTombstones(tombstoneBatches []*persistencespb.S
if _, ok := ms.pendingSignalInfoIDs[tombstone.GetSignalExternalInitiatedEventId()]; ok {
err = ms.DeletePendingSignal(tombstone.GetSignalExternalInitiatedEventId())
}
case *persistencespb.StateMachineTombstone_StateMachinePath:
err = ms.DeleteSubStateMachine(tombstone.GetStateMachinePath())
default:
// TODO: updateID
err = serviceerror.NewInternal("unknown tombstone type")
Expand Down Expand Up @@ -7609,3 +7626,7 @@ func (ms *MutableStateImpl) AddReapplyCandidateEvent(event *historypb.HistoryEve
// GetReapplyCandidateEvents returns the reapply candidate events currently held in
// ms.reapplyEventsCandidate. The slice is returned as-is, without copying.
func (ms *MutableStateImpl) GetReapplyCandidateEvents() []*historypb.HistoryEvent {
	candidates := ms.reapplyEventsCandidate
	return candidates
}

// IsSubStateMachineDeleted reports whether the subStateMachineDeleted flag is currently
// set on this mutable state.
func (ms *MutableStateImpl) IsSubStateMachineDeleted() bool {
	deleted := ms.subStateMachineDeleted
	return deleted
}
28 changes: 28 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion service/history/workflow/state_machine_timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,17 @@ import (
// AddNextStateMachineTimerTask generates a state machine timer task if the first deadline doesn't have a task scheduled
// yet.
func AddNextStateMachineTimerTask(ms MutableState) {
// filter out empty timer groups
timers := ms.GetExecutionInfo().StateMachineTimers
timers = slices.DeleteFunc(timers, func(timerGroup *persistencespb.StateMachineTimerGroup) bool {
return len(timerGroup.Infos) == 0
})
ms.GetExecutionInfo().StateMachineTimers = timers

if len(timers) == 0 {
return
}

timerGroup := timers[0]
// We already have a timer for this deadline.
if timerGroup.Scheduled {
Expand Down Expand Up @@ -122,7 +129,9 @@ func TrimStateMachineTimers(

trimmedTaskInfos = append(trimmedTaskInfos, taskInfo)
}
if len(trimmedTaskInfos) > 0 {
if len(trimmedTaskInfos) > 0 || timerGroup.Scheduled {
// We still want to keep the timer group if it has been scheduled even if it has no task info.
// This will prevent us from scheduling a new timer task for the same group.
trimmedStateMachineTimers = append(trimmedStateMachineTimers, &persistencespb.StateMachineTimerGroup{
Infos: trimmedTaskInfos,
Deadline: timerGroup.Deadline,
Expand Down
4 changes: 1 addition & 3 deletions service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,7 @@ func (r *TaskRefresherImpl) refreshTasksForSubStateMachines(
return err
}

if len(nodesToRefresh) != 0 {
// TODO: after hsm node tombstone is tracked in mutable state,
// also trigger trim when there are new tombstones after minVersionedTransition
if len(nodesToRefresh) != 0 || mutableState.IsSubStateMachineDeleted() {
if err := TrimStateMachineTimers(mutableState, minVersionedTransition); err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions tests/xdc/nexus_state_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,10 @@ func TestNexusStateReplicationTestSuite(t *testing.T) {
name: "DisableTransitionHistory",
enableTransitionHistory: false,
},
// TODO(hai719): Enable this test once state based replication works with HSM node deletion.
// {
// name: "EnableTransitionHistory",
// enableTransitionHistory: true,
// },
{
name: "EnableTransitionHistory",
enableTransitionHistory: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
s := &NexusStateReplicationSuite{}
Expand Down

0 comments on commit 9a0114b

Please sign in to comment.