@@ -17,7 +17,9 @@ use temporal_sdk_core_protos::{
17
17
workflow_activation:: {
18
18
workflow_activation_job, FireTimer , ResolveActivity , WorkflowActivationJob ,
19
19
} ,
20
- workflow_commands:: { ActivityCancellationType , RequestCancelActivity , StartTimer } ,
20
+ workflow_commands:: {
21
+ ActivityCancellationType , RequestCancelActivity , ScheduleActivity , StartTimer ,
22
+ } ,
21
23
workflow_completion:: WorkflowActivationCompletion ,
22
24
ActivityHeartbeat , ActivityTaskCompletion , AsJsonPayloadExt , FromJsonPayloadExt ,
23
25
IntoCompletion ,
@@ -728,17 +730,24 @@ async fn activity_cancelled_after_heartbeat_times_out() {
728
730
let activity_id = "act-1" ;
729
731
let task = core. poll_workflow_activation ( ) . await . unwrap ( ) ;
730
732
// Complete workflow task and schedule activity
731
- core. complete_workflow_activation (
732
- schedule_activity_cmd (
733
- 0 ,
734
- & task_q,
735
- activity_id,
736
- ActivityCancellationType :: WaitCancellationCompleted ,
737
- Duration :: from_secs ( 60 ) ,
738
- Duration :: from_secs ( 1 ) ,
739
- )
740
- . into_completion ( task. run_id ) ,
741
- )
733
+ core. complete_workflow_activation ( WorkflowActivationCompletion :: from_cmd (
734
+ task. run_id ,
735
+ ScheduleActivity {
736
+ seq : 0 ,
737
+ activity_id : activity_id. to_string ( ) ,
738
+ activity_type : "dontcare" . to_string ( ) ,
739
+ task_queue : task_q. clone ( ) ,
740
+ schedule_to_close_timeout : Some ( prost_dur ! ( from_secs( 10 ) ) ) ,
741
+ heartbeat_timeout : Some ( prost_dur ! ( from_secs( 1 ) ) ) ,
742
+ retry_policy : Some ( RetryPolicy {
743
+ maximum_attempts : 2 ,
744
+ initial_interval : Some ( prost_dur ! ( from_secs( 5 ) ) ) ,
745
+ ..Default :: default ( )
746
+ } ) ,
747
+ ..Default :: default ( )
748
+ }
749
+ . into ( ) ,
750
+ ) )
742
751
. await
743
752
. unwrap ( ) ;
744
753
// Poll activity and verify that it's been scheduled
@@ -753,8 +762,8 @@ async fn activity_cancelled_after_heartbeat_times_out() {
753
762
754
763
// Verify activity got cancelled
755
764
let cancel_task = core. poll_activity_task ( ) . await . unwrap ( ) ;
756
- assert_eq ! ( cancel_task. task_token, task. task_token. clone( ) ) ;
757
765
assert_matches ! ( cancel_task. variant, Some ( act_task:: Variant :: Cancel ( _) ) ) ;
766
+ assert_eq ! ( cancel_task. task_token, task. task_token. clone( ) ) ;
758
767
759
768
// Complete activity with cancelled result
760
769
core. complete_activity_task ( ActivityTaskCompletion {
@@ -770,7 +779,7 @@ async fn activity_cancelled_after_heartbeat_times_out() {
770
779
starter
771
780
. get_client ( )
772
781
. await
773
- . terminate_workflow_execution ( task_q. clone ( ) , None )
782
+ . terminate_workflow_execution ( task_q, None )
774
783
. await
775
784
. unwrap ( ) ;
776
785
}
0 commit comments