Skip to content

Commit d128633

Browse files
authored
Enable setting activity task queue throttle (#283)
1 parent 3f7f70d commit d128633

File tree

11 files changed

+81
-28
lines changed

11 files changed

+81
-28
lines changed

.buildkite/docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM rust:1.58
1+
FROM rust:1.59
22

33
RUN rustup component add rustfmt && \
44
rustup component add clippy

client/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use uuid::Uuid;
6262

6363
#[cfg(any(feature = "mocks", test))]
6464
use futures::Future;
65+
use temporal_sdk_core_protos::temporal::api::taskqueue::v1::TaskQueueMetadata;
6566

6667
static LONG_POLL_METHOD_NAMES: [&str; 2] = ["PollWorkflowTaskQueue", "PollActivityTaskQueue"];
6768
/// The server times out polls after 60 seconds. Set our timeout to be slightly beyond that.
@@ -532,8 +533,11 @@ pub trait ServerGatewayApis {
532533

533534
/// Fetch new activity tasks from the provided queue. Should block indefinitely if there is no
534535
/// work.
535-
async fn poll_activity_task(&self, task_queue: String)
536-
-> Result<PollActivityTaskQueueResponse>;
536+
async fn poll_activity_task(
537+
&self,
538+
task_queue: String,
539+
max_tasks_per_sec: Option<f64>,
540+
) -> Result<PollActivityTaskQueueResponse>;
537541

538542
/// Notifies the server that workflow tasks for a given workflow should be sent to the normal
539543
/// non-sticky task queue. This normally happens when workflow has been evicted from the cache.
@@ -731,6 +735,7 @@ impl ServerGatewayApis for ServerGateway {
731735
async fn poll_activity_task(
732736
&self,
733737
task_queue: String,
738+
max_tasks_per_sec: Option<f64>,
734739
) -> Result<PollActivityTaskQueueResponse> {
735740
let request = PollActivityTaskQueueRequest {
736741
namespace: self.namespace.clone(),
@@ -739,7 +744,9 @@ impl ServerGatewayApis for ServerGateway {
739744
kind: TaskQueueKind::Normal as i32,
740745
}),
741746
identity: self.opts.identity.clone(),
742-
task_queue_metadata: None,
747+
task_queue_metadata: max_tasks_per_sec.map(|tps| TaskQueueMetadata {
748+
max_tasks_per_second: Some(tps),
749+
}),
743750
};
744751

745752
Ok(self

client/src/mocks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ mockall::mock! {
5454
-> impl Future<Output = Result<PollWorkflowTaskQueueResponse>> + Send + 'b
5555
where 'a: 'b, Self: 'b;
5656

57-
fn poll_activity_task<'a, 'b>(&self, task_queue: String)
57+
fn poll_activity_task<'a, 'b>(&self, task_queue: String, max_tasks_per_sec: Option<f64>)
5858
-> impl Future<Output = Result<PollActivityTaskQueueResponse>> + Send + 'b
5959
where 'a: 'b, Self: 'b;
6060

client/src/retry.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,14 @@ where
229229
async fn poll_activity_task(
230230
&self,
231231
task_queue: String,
232+
max_tasks_per_sec: Option<f64>,
232233
) -> Result<PollActivityTaskQueueResponse> {
233-
retry_call!(self, poll_activity_task, task_queue.clone())
234+
retry_call!(
235+
self,
236+
poll_activity_task,
237+
task_queue.clone(),
238+
max_tasks_per_sec
239+
)
234240
}
235241

236242
async fn reset_sticky_task_queue(
@@ -483,14 +489,16 @@ mod tests {
483489
.times(1);
484490
mock_gateway
485491
.expect_poll_activity_task()
486-
.returning(move |_| Err(Status::new(code, "non-retryable failure")))
492+
.returning(move |_, _| Err(Status::new(code, "non-retryable failure")))
487493
.times(1);
488494
let retry_gateway = RetryGateway::new(mock_gateway, Default::default());
489495
let result = retry_gateway
490496
.poll_workflow_task("tq".to_string(), false)
491497
.await;
492498
assert!(result.is_err());
493-
let result = retry_gateway.poll_activity_task("tq".to_string()).await;
499+
let result = retry_gateway
500+
.poll_activity_task("tq".to_string(), None)
501+
.await;
494502
assert!(result.is_err());
495503
}
496504
}
@@ -530,11 +538,11 @@ mod tests {
530538
.times(1);
531539
mock_gateway
532540
.expect_poll_activity_task()
533-
.returning(move |_| Err(Status::new(Code::Unknown, "retryable failure")))
541+
.returning(move |_, _| Err(Status::new(Code::Unknown, "retryable failure")))
534542
.times(50);
535543
mock_gateway
536544
.expect_poll_activity_task()
537-
.returning(|_| Ok(Default::default()))
545+
.returning(|_, _| Ok(Default::default()))
538546
.times(1);
539547

540548
let retry_gateway = RetryGateway::new(mock_gateway, Default::default());
@@ -543,7 +551,9 @@ mod tests {
543551
.poll_workflow_task("tq".to_string(), false)
544552
.await;
545553
assert!(result.is_ok());
546-
let result = retry_gateway.poll_activity_task("tq".to_string()).await;
554+
let result = retry_gateway
555+
.poll_activity_task("tq".to_string(), None)
556+
.await;
547557
assert!(result.is_ok());
548558
}
549559

@@ -562,11 +572,11 @@ mod tests {
562572
.times(1);
563573
mock_gateway
564574
.expect_poll_activity_task()
565-
.returning(move |_| Err(Status::new(code, "retryable failure")))
575+
.returning(move |_, _| Err(Status::new(code, "retryable failure")))
566576
.times(5);
567577
mock_gateway
568578
.expect_poll_activity_task()
569-
.returning(|_| Ok(Default::default()))
579+
.returning(|_, _| Ok(Default::default()))
570580
.times(1);
571581

572582
let retry_gateway = RetryGateway::new(mock_gateway, Default::default());
@@ -575,7 +585,9 @@ mod tests {
575585
.poll_workflow_task("tq".to_string(), false)
576586
.await;
577587
assert!(result.is_ok());
578-
let result = retry_gateway.poll_activity_task("tq".to_string()).await;
588+
let result = retry_gateway
589+
.poll_activity_task("tq".to_string(), None)
590+
.await;
579591
assert!(result.is_ok());
580592
}
581593
}

core-api/src/worker.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::time::Duration;
44
#[derive(Debug, Clone, derive_builder::Builder)]
55
#[builder(setter(into), build_fn(validate = "Self::validate"))]
66
#[non_exhaustive]
7-
// TODO: per-second queue limits
87
pub struct WorkerConfig {
98
/// The Temporal service namespace this worker is bound to
109
pub namespace: String,
@@ -65,6 +64,13 @@ pub struct WorkerConfig {
6564
/// `heartbeat_timeout * 0.8`.
6665
#[builder(default = "Duration::from_secs(30)")]
6766
pub default_heartbeat_throttle_interval: Duration,
67+
68+
/// Sets the maximum number of activities per second the task queue will dispatch, controlled
69+
/// server-side. Note that this only takes effect upon an activity poll request. If multiple
70+
/// workers on the same queue have different values set, they will thrash with the last poller
71+
/// winning.
72+
#[builder(setter(strip_option), default)]
73+
pub max_task_queue_activities_per_second: Option<f64>,
6874
}
6975

7076
impl WorkerConfig {

core/src/core_tests/activity_tasks.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use crate::{
55
mock_worker, poll_and_reply, test_worker_cfg, MockWorker, MocksHolder,
66
},
77
workflow::WorkflowCachingPolicy::NonSticky,
8-
ActivityHeartbeat,
8+
ActivityHeartbeat, MetricsContext, Worker, WorkerConfigBuilder,
99
};
1010
use futures::FutureExt;
11+
use std::sync::Arc;
1112
use std::{
1213
cell::RefCell,
1314
collections::{hash_map::Entry, HashMap, VecDeque},
@@ -16,7 +17,7 @@ use std::{
1617
time::Duration,
1718
};
1819
use temporal_client::mocks::{mock_gateway, mock_manual_gateway};
19-
use temporal_sdk_core_api::Worker;
20+
use temporal_sdk_core_api::Worker as WorkerTrait;
2021
use temporal_sdk_core_protos::{
2122
coresdk::{
2223
activity_result::{activity_resolution, ActivityExecutionResult, ActivityResolution},
@@ -61,7 +62,7 @@ async fn max_activities_respected() {
6162
mock_gateway
6263
.expect_poll_activity_task()
6364
.times(3)
64-
.returning(move |_| Ok(tasks.pop_front().unwrap()));
65+
.returning(move |_, _| Ok(tasks.pop_front().unwrap()));
6566
mock_gateway
6667
.expect_complete_activity_task()
6768
.returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default()));
@@ -347,7 +348,7 @@ async fn many_concurrent_heartbeat_cancels() {
347348
let mut calls_map = HashMap::<_, i32>::new();
348349
mock_gateway
349350
.expect_poll_activity_task()
350-
.returning(move |_| poll_resps.pop_front().unwrap());
351+
.returning(move |_, _| poll_resps.pop_front().unwrap());
351352
mock_gateway
352353
.expect_cancel_activity_task()
353354
.returning(move |_, _| async move { Ok(Default::default()) }.boxed());
@@ -585,3 +586,28 @@ async fn complete_act_with_fail_flushes_heartbeat() {
585586
let last_seen_payload = &last_seen_payload.take().unwrap().payloads[0];
586587
assert_eq!(last_seen_payload.data, &[last_hb]);
587588
}
589+
590+
#[tokio::test]
591+
async fn max_tq_acts_set_passed_to_poll_properly() {
592+
let rate = 9.28;
593+
let mut mock_gateway = mock_gateway();
594+
mock_gateway
595+
.expect_poll_activity_task()
596+
.returning(move |_, tps| {
597+
assert_eq!(tps, Some(rate));
598+
Ok(PollActivityTaskQueueResponse {
599+
task_token: vec![1],
600+
..Default::default()
601+
})
602+
});
603+
604+
let cfg = WorkerConfigBuilder::default()
605+
.namespace("enchi")
606+
.task_queue("cat")
607+
.max_concurrent_at_polls(1_usize)
608+
.max_task_queue_activities_per_second(rate)
609+
.build()
610+
.unwrap();
611+
let worker = Worker::new(cfg, None, Arc::new(mock_gateway), MetricsContext::default());
612+
worker.poll_activity_task().await.unwrap();
613+
}

core/src/core_tests/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ async fn shutdown_interrupts_both_polls() {
5353
mock_gateway
5454
.expect_poll_activity_task()
5555
.times(1)
56-
.returning(move |_| {
56+
.returning(move |_, _| {
5757
async move {
5858
BARR.wait().await;
5959
sleep(Duration::from_secs(1)).await;

core/src/pollers/poll_buffer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,13 @@ pub fn new_activity_task_buffer(
221221
task_queue: String,
222222
concurrent_pollers: usize,
223223
buffer_size: usize,
224+
max_tps: Option<f64>,
224225
) -> PollActivityTaskBuffer {
225226
LongPollBuffer::new(
226227
move || {
227228
let sg = sg.clone();
228229
let task_queue = task_queue.clone();
229-
async move { sg.poll_activity_task(task_queue).await }
230+
async move { sg.poll_activity_task(task_queue, max_tps).await }
230231
},
231232
concurrent_pollers,
232233
buffer_size,

core/src/protosext/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ impl ValidScheduleLA {
361361
};
362362
let retry_policy = v
363363
.retry_policy
364-
.ok_or(anyhow!("Retry policy must be defined!"))?;
364+
.ok_or_else(|| anyhow!("Retry policy must be defined!"))?;
365365
let local_retry_threshold = v
366366
.local_retry_threshold
367367
.clone()

core/src/worker/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ impl Worker {
217217
config.task_queue.clone(),
218218
config.max_concurrent_at_polls,
219219
config.max_concurrent_at_polls * 2,
220+
config.max_task_queue_activities_per_second,
220221
);
221222
let act_metrics = metrics.with_new_attrs([activity_poller()]);
222223
ap.set_num_pollers_handler(move |np| act_metrics.record_num_pollers(np));
@@ -343,7 +344,7 @@ impl Worker {
343344
///
344345
/// Returns `Ok(None)` in the event of a poll timeout or if the polling loop should otherwise
345346
/// be restarted
346-
pub(crate) async fn activity_poll(&self) -> Result<Option<ActivityTask>, PollActivityError> {
347+
async fn activity_poll(&self) -> Result<Option<ActivityTask>, PollActivityError> {
347348
let act_mgr_poll = async {
348349
if let Some(ref act_mgr) = self.at_task_mgr {
349350
act_mgr.poll().await
@@ -948,7 +949,7 @@ mod tests {
948949
let mut mock_gateway = mock_gateway();
949950
mock_gateway
950951
.expect_poll_activity_task()
951-
.returning(|_| Ok(PollActivityTaskQueueResponse::default()));
952+
.returning(|_, _| Ok(PollActivityTaskQueueResponse::default()));
952953

953954
let cfg = test_worker_cfg()
954955
.max_outstanding_activities(5_usize)
@@ -980,7 +981,7 @@ mod tests {
980981
let mut mock_gateway = mock_gateway();
981982
mock_gateway
982983
.expect_poll_activity_task()
983-
.returning(|_| Err(tonic::Status::internal("ahhh")));
984+
.returning(|_, _| Err(tonic::Status::internal("ahhh")));
984985

985986
let cfg = test_worker_cfg()
986987
.max_outstanding_activities(5_usize)

sdk-core-protos/src/history_info.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ impl HistoryInfo {
4242
attrs
4343
.workflow_type
4444
.as_ref()
45-
.ok_or(anyhow!(
46-
"No workflow type defined in execution started attributes"
47-
))?
45+
.ok_or_else(|| {
46+
anyhow!("No workflow type defined in execution started attributes")
47+
})?
4848
.name
4949
.clone()
5050
}

0 commit comments

Comments
 (0)