Skip to content

Commit

Permalink
introduce skip destination drop
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 18, 2025
1 parent 9e85860 commit d755c96
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
36 changes: 19 additions & 17 deletions flow/workflows/drop_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,32 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput)
_ = workflow.Sleep(ctx, time.Second)
}
}
dropDestination = func(f workflow.Future) {
destinationError = f.Get(ctx, nil)
destinationOk = destinationError == nil
if !destinationOk {
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.DestinationName,
})
selector.AddFuture(dropDestinationFuture, dropDestination)
_ = workflow.Sleep(ctx, time.Second)
}
}

dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.SourceName,
})
selector.AddFuture(dropSourceFuture, dropSource)
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.DestinationName,
})

selector.AddFuture(dropDestinationFuture, dropDestination)
if !input.SkipDestinationDrop {
dropDestination = func(f workflow.Future) {
destinationError = f.Get(ctx, nil)
destinationOk = destinationError == nil
if !destinationOk {
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.DestinationName,
})
selector.AddFuture(dropDestinationFuture, dropDestination)
_ = workflow.Sleep(ctx, time.Second)
}
}
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{
FlowJobName: input.FlowJobName,
PeerName: input.FlowConnectionConfigs.DestinationName,
})
selector.AddFuture(dropDestinationFuture, dropDestination)
}

for {
selector.Select(ctx)
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ message DropFlowInput {
string flow_job_name = 1;
bool drop_flow_stats = 4;
FlowConnectionConfigs flow_connection_configs = 5;
bool skip_destination_drop = 6;
}

message TableSchemaDelta {
Expand Down

0 comments on commit d755c96

Please sign in to comment.