Skip to content

Commit

Permalink
feat: add exponential backoff with jitter in the worker loop retrials
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsavio committed May 7, 2024
1 parent f041454 commit ff53453
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 12 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ htmlescape = "0.3.1"
actix-web-flash-messages = { version = "0.4", features = ["cookies"] }
actix-session = { version = "0.9", features = ["redis-rs-tls-session"] }
serde_json = "1.0"
exponential-backoff = "1.2.0"

[dependencies.reqwest]
version = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Add these validation checks to our POST /admin/password endpoint.

### Section 11
- [ ] There is no retry if the email delivery attempt fails. We could enhance this by adding a `n_retries` and `execute_after` columns to keep track of how many attemps have already taken place and when the next attempt should be executed.
- [ ] Add an exponential backoff with jitter in the `issue_delivery_worker::worker_loop` function.
- [X] Add an exponential backoff with jitter in the `issue_delivery_worker::worker_loop` function.
- [ ] Add an expiry mechanism for the idempotency keys using background workers as reference.

## Troubleshooting
Expand Down
5 changes: 1 addition & 4 deletions src/idempotency/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ pub async fn save_response(
http_response: HttpResponse,
) -> Result<HttpResponse, anyhow::Error> {
let (response_head, body) = http_response.into_parts();
let body = to_bytes(body)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?
;
let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
let status_code = response_head.status().as_u16() as i16;
let headers = {
let mut h = Vec::with_capacity(response_head.headers().len());
Expand Down
30 changes: 23 additions & 7 deletions src/issue_delivery_worker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::domain::SubscriberEmail;
use crate::email_client::EmailClient;
use crate::{configuration::Settings, startup::get_connection_pool};
use exponential_backoff::Backoff;
use sqlx::{Executor, PgPool, Postgres, Transaction};
use std::ops::Add;
use std::time::Duration;
use tracing::{field::display, Span};
use uuid::Uuid;
Expand Down Expand Up @@ -34,16 +36,30 @@ async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result<NewsletterIssue, any
Ok(issue)
}

fn get_backoff() -> Backoff {
let retries = 8;
let min = Duration::from_secs(1);
let max = Duration::from_secs(60);
let mut backoff = Backoff::new(retries, min, max);
backoff.set_jitter(0.25);
backoff
}

async fn worker_loop(pool: PgPool, email_client: EmailClient) -> Result<(), anyhow::Error> {
let backoff = get_backoff();
loop {
match try_execute_task(&pool, &email_client).await {
Ok(ExecutionOutcome::EmptyQueue) => {
tokio::time::sleep(Duration::from_secs(10)).await;
}
Err(_) => {
tokio::time::sleep(Duration::from_secs(1)).await;
for duration in &backoff {
match try_execute_task(&pool, &email_client).await {
Ok(ExecutionOutcome::EmptyQueue) => {
tokio::time::sleep(duration.add(Duration::from_secs(10))).await;
}
Err(_) => {
tokio::time::sleep(duration).await;
}
Ok(ExecutionOutcome::TaskCompleted) => {
break;
}
}
Ok(ExecutionOutcome::TaskCompleted) => {}
}
}
}
Expand Down

0 comments on commit ff53453

Please sign in to comment.