Skip to content

WIP: Workflow defintions #540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ You can buld and test the project using cargo:
Run integ tests with `cargo integ-test`. You will need to already be running the server:
`docker-compose -f .buildkite/docker/docker-compose.yaml up`

Run load tests with `cargo test --features=save_wf_inputs --test heavy_tests`.

## Formatting
To format all code run:
`cargo fmt --all`
Expand Down
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ bimap = "0.6.1"
clap = { version = "4.0", features = ["derive"] }
criterion = "0.4"
rstest = "0.17"
serde = "1.0"
serde_json = "1.0"
temporal-sdk-core-test-utils = { path = "../test-utils" }
temporal-sdk = { path = "../sdk" }

Expand Down
12 changes: 8 additions & 4 deletions core/benches/workflow_replay.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use futures::StreamExt;
use std::time::Duration;
use temporal_sdk::{WfContext, WorkflowFunction};
use temporal_sdk::{WfContext, WfExitValue, WorkflowFunction};
use temporal_sdk_core::replay::HistoryForReplay;
use temporal_sdk_core_protos::DEFAULT_WORKFLOW_TYPE;
use temporal_sdk_core_protos::{coresdk::AsJsonPayloadExt, DEFAULT_WORKFLOW_TYPE};
use temporal_sdk_core_test_utils::{canned_histories, replay_sdk_worker};

pub fn criterion_benchmark(c: &mut Criterion) {
Expand Down Expand Up @@ -58,7 +58,9 @@ fn timers_wf(num_timers: u32) -> WorkflowFunction {
for _ in 1..=num_timers {
ctx.timer(Duration::from_secs(1)).await;
}
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
})
}

Expand All @@ -71,6 +73,8 @@ fn big_signals_wf(num_tasks: usize) -> WorkflowFunction {
}
}

Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
})
}
8 changes: 5 additions & 3 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
time::Duration,
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{ActivityOptions, WfContext};
use temporal_sdk::{ActivityOptions, WfContext, WfExitValue};
use temporal_sdk_core_api::{
errors::{CompleteActivityError, PollActivityError},
Worker as WorkerTrait,
Expand All @@ -41,7 +41,7 @@ use temporal_sdk_core_protos::{
ScheduleActivity,
},
workflow_completion::WorkflowActivationCompletion,
ActivityTaskCompletion,
ActivityTaskCompletion, AsJsonPayloadExt,
},
temporal::api::{
command::v1::{command::Attributes, ScheduleActivityTaskCommandAttributes},
Expand Down Expand Up @@ -945,7 +945,9 @@ async fn activity_tasks_from_completion_reserve_slots() {
})
.await;
complete_token.cancel();
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
}
});

Expand Down
30 changes: 20 additions & 10 deletions core/src/core_tests/child_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ use crate::{
worker::{client::mocks::mock_workflow_client, ManagedWFFunc},
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowFunction, WorkflowResult};
use temporal_sdk::{
ChildWorkflowOptions, Signal, WfContext, WfExitValue, WorkflowFunction, WorkflowResult,
};
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::coresdk::{
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
use temporal_sdk_core_protos::{
coresdk::{
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
},
workflow_completion::WorkflowActivationCompletion,
AsJsonPayloadExt,
},
workflow_completion::WorkflowActivationCompletion,
temporal::api::common::v1::Payload,
};
use tokio::join;

Expand Down Expand Up @@ -59,7 +65,9 @@ async fn signal_child_workflow(#[case] serial: bool) {
};
sigres.expect("signal result is ok");
res.status.expect("child wf result is ok");
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
};

worker.register_wf(wf_type.to_owned(), wf);
Expand All @@ -75,7 +83,7 @@ async fn signal_child_workflow(#[case] serial: bool) {
worker.run_until_done().await.unwrap();
}

async fn parent_cancels_child_wf(ctx: WfContext) -> WorkflowResult<()> {
async fn parent_cancels_child_wf(ctx: WfContext) -> WorkflowResult<Payload> {
let child = ctx.child_workflow(ChildWorkflowOptions {
workflow_id: "child-id-1".to_string(),
workflow_type: "child".to_string(),
Expand All @@ -95,7 +103,9 @@ async fn parent_cancels_child_wf(ctx: WfContext) -> WorkflowResult<()> {
.status
.expect("child wf result is ok");
assert_matches!(stat, child_workflow_result::Status::Cancelled(_));
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
}

#[tokio::test]
Expand Down
27 changes: 20 additions & 7 deletions core/src/core_tests/determinism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,31 @@ use std::{
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{
ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, WfContext, WorkflowResult,
ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, WfContext, WfExitValue,
WorkflowResult,
};
use temporal_sdk_core_protos::{
coresdk::AsJsonPayloadExt,
temporal::api::{
common::v1::Payload,
enums::v1::{EventType, WorkflowTaskFailedCause},
failure::v1::Failure,
},
TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE,
};

static DID_FAIL: AtomicBool = AtomicBool::new(false);
pub async fn timer_wf_fails_once(ctx: WfContext) -> WorkflowResult<()> {
pub async fn timer_wf_fails_once(ctx: WfContext) -> WorkflowResult<Payload> {
ctx.timer(Duration::from_secs(1)).await;
if DID_FAIL
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
panic!("Ahh");
}
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
}

/// Verifies that workflow panics (which in this case the Rust SDK turns into workflow activation
Expand Down Expand Up @@ -97,7 +102,9 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache:
ctx.timer(Duration::from_secs(1)).await;
}
ctx.timer(Duration::from_secs(1)).await;
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
});

worker
Expand Down Expand Up @@ -188,7 +195,9 @@ async fn activity_id_or_type_change_is_nondeterministic(
})
.await;
}
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
});

worker
Expand Down Expand Up @@ -257,7 +266,9 @@ async fn child_wf_id_or_type_change_is_nondeterministic(
})
.start(&ctx)
.await;
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
});

worker
Expand Down Expand Up @@ -300,7 +311,9 @@ async fn repro_channel_missing_because_nondeterminism() {
worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move {
ctx.patched("wrongid");
ctx.timer(Duration::from_secs(1)).await;
Ok(().into())
Ok(WfExitValue::Normal(
"success".as_json_payload().expect("serializes fine"),
))
});

worker
Expand Down
Loading