diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index 8857d0999..87c681880 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -29,7 +29,7 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; -use omniqueue::backends::{RedisBackend, RedisConfig}; +use omniqueue::backends::{redis::DeadLetterQueueConfig, RedisBackend, RedisConfig}; use redis::{AsyncCommands as _, RedisResult}; use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer}; @@ -50,6 +50,9 @@ const DELAYED: &str = "{queue}_svix_delayed"; /// The key for the lock guarding the delayed queue background task. const DELAYED_LOCK: &str = "{queue}_svix_delayed_lock"; +/// The key for the DLQ +const DLQ: &str = "{queue}_svix_dlq"; + // v2 KEY CONSTANTS const LEGACY_V2_MAIN: &str = "{queue}_svix_main"; const LEGACY_V2_PROCESSING: &str = "{queue}_svix_processing"; @@ -86,6 +89,7 @@ pub async fn new_pair( MAIN, DELAYED, DELAYED_LOCK, + DLQ, ) .await } @@ -124,6 +128,7 @@ async fn new_pair_inner( main_queue_name: &'static str, delayed_queue_name: &'static str, delayed_lock_name: &'static str, + dlq_name: &'static str, ) -> (TaskQueueProducer, TaskQueueConsumer) { let main_queue_name = format!("{queue_prefix}{main_queue_name}"); let delayed_queue_name = format!("{queue_prefix}{delayed_queue_name}"); @@ -206,7 +211,10 @@ async fn new_pair_inner( consumer_name: WORKER_CONSUMER.to_owned(), payload_key: QUEUE_KV_KEY.to_owned(), ack_deadline_ms: pending_duration, - dlq_config: None, + dlq_config: Some(DeadLetterQueueConfig { + queue_key: dlq_name.to_string(), + max_receives: 3, + }), sentinel_config: cfg.redis_sentinel_cfg.clone().map(|c| c.into()), }; @@ -472,6 +480,7 @@ pub mod tests { "{test}_idle_period", "{test}_idle_period_delayed", "{test}_idle_period_delayed_lock", + "{test}_dlq", ) .await; @@ -542,6 +551,7 @@ pub mod tests { "{test}_ack", "{test}_ack_delayed", "{test}_ack_delayed_lock", + "{test}_dlq", ) .await; @@ -588,6 +598,7 @@ pub mod tests { "{test}_nack", "{test}_nack_delayed", "{test}_nack_delayed_lock", + "{test}_dlq", ) .await; @@ -631,6 +642,7 @@ pub mod tests { "{test}_delay", "{test}_delay_delayed", "{test}_delay_delayed_lock", + "{test}_dlq", ) .await; @@ -804,6 +816,7 @@ pub mod tests { v3_main, v2_delayed, v2_delayed_lock, + "dlq-bruh", ) .await; diff --git a/server/svix-server/src/v1/endpoints/admin.rs b/server/svix-server/src/v1/endpoints/admin.rs new file mode 100644 index 000000000..30fd99e95 --- /dev/null +++ b/server/svix-server/src/v1/endpoints/admin.rs @@ -0,0 +1,28 @@ +use aide::{ + axum::{routing::put_with, ApiRouter}, + transform::TransformPathItem, +}; +use axum::extract::State; +use svix_server_derive::aide_annotate; + +use crate::{core::permissions, error::Result, v1::utils::NoContent, AppState}; + +/// Redrive DLQ +#[aide_annotate(op_id = "v1.admin.redrive-dlq")] +pub async fn redrive_dlq( + State(AppState { queue_tx, .. }): State, + _: permissions::Organization, +) -> Result { + if let Err(e) = queue_tx.redrive_dlq().await { + tracing::warn!(error = ?e, "DLQ redrive failed"); + } + Ok(NoContent) +} + +pub fn router() -> ApiRouter { + ApiRouter::new().api_route_with( + "/admin/redrive-dlq", + put_with(redrive_dlq, redrive_dlq_operation), + move |op: TransformPathItem<'_>| op.tag("Admin".as_ref()).hidden(true), + ) +} diff --git a/server/svix-server/src/v1/endpoints/mod.rs b/server/svix-server/src/v1/endpoints/mod.rs index 558abc23d..001aa3115 100644 --- a/server/svix-server/src/v1/endpoints/mod.rs +++ b/server/svix-server/src/v1/endpoints/mod.rs @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: © 2022 Svix Authors // SPDX-License-Identifier: MIT +pub mod admin; pub mod application; pub mod attempt; pub mod auth; diff --git a/server/svix-server/src/v1/mod.rs b/server/svix-server/src/v1/mod.rs index 2cbd9ff46..3cc1be9d8 100644 --- a/server/svix-server/src/v1/mod.rs +++ b/server/svix-server/src/v1/mod.rs @@ -21,6 +21,7 @@ pub fn router() -> ApiRouter { .merge(endpoints::event_type::router()) .merge(endpoints::message::router()) .merge(endpoints::attempt::router()) + .merge(endpoints::admin::router()) .layer( TraceLayer::new_for_http() .make_span_with(AxumOtelSpanCreator)