diff --git a/crates/core/src/exec/connection.rs b/crates/core/src/exec/connection.rs index 96015d0b..b69925a8 100644 --- a/crates/core/src/exec/connection.rs +++ b/crates/core/src/exec/connection.rs @@ -43,10 +43,8 @@ where }; match res { - ExecutorResult::Task(task, shutdown_tx) => { - let task_id = task.id; + ExecutorResult::Task(task) => { self.streams.insert(task); - self.subscriptions.insert(task_id, shutdown_tx); } ExecutorResult::Future(fut) => { self.streams.insert(fut.into()); diff --git a/crates/core/src/exec/execute.rs b/crates/core/src/exec/execute.rs index 681ffe6b..8b782f8d 100644 --- a/crates/core/src/exec/execute.rs +++ b/crates/core/src/exec/execute.rs @@ -25,7 +25,7 @@ pub enum ExecutorResult { /// A future that will resolve to a response. Future(RequestFuture), /// A task that should be queued onto an async runtime. - Task(Task, oneshot::Sender<()>), + Task(Task), } // TODO: Move this into `build_router.rs` and turn it into a module with all the other `exec::*` types @@ -62,20 +62,19 @@ impl Router { Some(subs) if subs.contains_key(data.id) => { Err(ExecError::ErrSubscriptionDuplicateId) } - Some(_) => match get_subscription(self, ctx, data) { + Some(subs) => match get_subscription(self, ctx, data) { None => Err(ExecError::OperationNotFound), Some(stream) => { let (tx, rx) = oneshot::channel(); - Ok(ExecutorResult::Task( - Task { - id, - stream, - done: false, - shutdown_rx: Some(rx), - }, - tx, - )) + subs.insert(id, tx); + + Ok(ExecutorResult::Task(Task { + id, + stream, + done: false, + shutdown_rx: Some(rx), + })) } }, } diff --git a/crates/httpz/src/httpz_endpoint.rs b/crates/httpz/src/httpz_endpoint.rs index f58070a1..68eb55f8 100644 --- a/crates/httpz/src/httpz_endpoint.rs +++ b/crates/httpz/src/httpz_endpoint.rs @@ -128,7 +128,7 @@ where ExecutorResult::Future(fut) => fut.await, ExecutorResult::Response(response) => response, #[allow(clippy::panic)] - ExecutorResult::Task(_, _) => { + ExecutorResult::Task(_) => { #[cfg(debug_assertions)] panic!("rspc: unexpected HTTP endpoint returned 'Task'"); } @@ -220,7 +220,7 @@ where responses.push(resp); } #[allow(clippy::panic)] - ExecutorResult::Task(_, _) => { + ExecutorResult::Task(_) => { #[cfg(debug_assertions)] panic!("rspc: unexpected HTTP endpoint returned 'Task'"); }