Skip to content

Commit

Permalink
insert subscription oneshot in execute
Browse files Browse the repository at this point in the history
  • Loading branch information
Brendonovich committed Oct 9, 2023
1 parent 0e5fd70 commit bde3b2c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
4 changes: 1 addition & 3 deletions crates/core/src/exec/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ where
};

match res {
ExecutorResult::Task(task, shutdown_tx) => {
let task_id = task.id;
ExecutorResult::Task(task) => {
self.streams.insert(task);
self.subscriptions.insert(task_id, shutdown_tx);
}
ExecutorResult::Future(fut) => {
self.streams.insert(fut.into());
Expand Down
21 changes: 10 additions & 11 deletions crates/core/src/exec/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum ExecutorResult {
/// A future that will resolve to a response.
Future(RequestFuture),
/// A task that should be queued onto an async runtime.
Task(Task, oneshot::Sender<()>),
Task(Task),
}

// TODO: Move this into `build_router.rs` and turn it into a module with all the other `exec::*` types
Expand Down Expand Up @@ -62,20 +62,19 @@ impl<TCtx: Send + 'static> Router<TCtx> {
Some(subs) if subs.contains_key(data.id) => {
Err(ExecError::ErrSubscriptionDuplicateId)
}
Some(_) => match get_subscription(self, ctx, data) {
Some(subs) => match get_subscription(self, ctx, data) {
None => Err(ExecError::OperationNotFound),
Some(stream) => {
let (tx, rx) = oneshot::channel();

Ok(ExecutorResult::Task(
Task {
id,
stream,
done: false,
shutdown_rx: Some(rx),
},
tx,
))
subs.insert(id, tx);

Ok(ExecutorResult::Task(Task {
id,
stream,
done: false,
shutdown_rx: Some(rx),
}))
}
},
}
Expand Down
4 changes: 2 additions & 2 deletions crates/httpz/src/httpz_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
ExecutorResult::Future(fut) => fut.await,
ExecutorResult::Response(response) => response,
#[allow(clippy::panic)]
ExecutorResult::Task(_, _) => {
ExecutorResult::Task(_) => {
#[cfg(debug_assertions)]
panic!("rspc: unexpected HTTP endpoint returned 'Task'");
}
Expand Down Expand Up @@ -220,7 +220,7 @@ where
responses.push(resp);
}
#[allow(clippy::panic)]
ExecutorResult::Task(_, _) => {
ExecutorResult::Task(_) => {
#[cfg(debug_assertions)]
panic!("rspc: unexpected HTTP endpoint returned 'Task'");
}
Expand Down

0 comments on commit bde3b2c

Please sign in to comment.