-
Notifications
You must be signed in to change notification settings - Fork 46
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
mcu-util: explicitly join on task handles (#125)
* mcu-interface: fix blocking, add kill for can_rx task, fix ack * mcu-interface: use unbounded channel * mcu-interface: introduce explicit task join handle * fix typo * switch info to debug * mcu-util: kill tasks before joining * improve task joining
- Loading branch information
Showing
11 changed files
with
388 additions
and
130 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,80 @@ | ||
use std::time::Duration; | ||
use std::{any::Any, task::Poll, time::Duration}; | ||
|
||
use pin_project::pin_project; | ||
use tokio::sync::oneshot; | ||
|
||
pub mod canfd; | ||
pub mod isotp; | ||
|
||
const ACK_RX_TIMEOUT: Duration = Duration::from_millis(1500); | ||
|
||
pub type CanTaskResult = Result<(), CanTaskJoinError>; | ||
|
||
/// Handle that can be used to detect errors in the can receive task. | ||
/// | ||
/// Note that dropping this handle doesn't kill the task. | ||
#[pin_project] | ||
#[derive(Debug)] | ||
pub struct CanTaskHandle(#[pin] oneshot::Receiver<CanTaskResult>); | ||
|
||
impl std::future::Future for CanTaskHandle { | ||
type Output = CanTaskResult; | ||
|
||
fn poll( | ||
self: std::pin::Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> Poll<Self::Output> { | ||
let rx = self.project().0; | ||
rx.poll(cx).map(|recv| match recv { | ||
Ok(Ok(())) | Err(oneshot::error::RecvError { .. }) => Ok(()), | ||
Ok(Err(err)) => Err(err), | ||
}) | ||
} | ||
} | ||
|
||
impl CanTaskHandle { | ||
/// Blocks until the task is complete. | ||
/// | ||
/// It is recommended to simply .await instead, since `CanTaskHandle` implements | ||
/// `Future`. | ||
pub fn join(self) -> CanTaskResult { | ||
match self.0.blocking_recv() { | ||
Ok(Ok(())) | Err(oneshot::error::RecvError { .. }) => Ok(()), | ||
Ok(Err(err)) => Err(err), | ||
} | ||
} | ||
} | ||
|
||
#[derive(thiserror::Error, Debug)] | ||
pub enum CanTaskJoinError { | ||
#[error(transparent)] | ||
Panic(#[from] CanTaskPanic), | ||
#[error(transparent)] | ||
Err(#[from] color_eyre::Report), | ||
} | ||
|
||
#[derive(thiserror::Error)] | ||
#[error("panic in thread used to receive from canbus")] | ||
// Mutex is there to make it implement Sync without using `unsafe` | ||
pub struct CanTaskPanic(std::sync::Mutex<Box<dyn Any + Send + 'static>>); | ||
|
||
impl CanTaskPanic { | ||
fn new(err: Box<dyn Any + Send + 'static>) -> Self { | ||
Self(std::sync::Mutex::new(err)) | ||
} | ||
|
||
/// Returns the object with which the task panicked. | ||
/// | ||
/// You can pass this into [`std::panic::resume_unwind()`] to propagate the | ||
/// panic. | ||
pub fn into_panic(self) -> Box<dyn Any + Send + 'static> { | ||
self.0.into_inner().expect("infallible") | ||
} | ||
} | ||
|
||
impl std::fmt::Debug for CanTaskPanic { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_tuple(std::any::type_name::<CanTaskPanic>()) | ||
.finish() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.