Skip to content

Commit

Permalink
Replace std::mpsc with tokio::mpsc to reduce MSRV down to 1.63
Browse files Browse the repository at this point in the history
  • Loading branch information
Finomnis committed Oct 13, 2023
1 parent 768cb45 commit e35490f
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
uses: actions/checkout@v3

- name: Install MSRV toolchain
uses: dtolnay/rust-toolchain@1.72.0
uses: dtolnay/rust-toolchain@1.63.0

#- uses: Swatinem/rust-cache@v1

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "tokio-graceful-shutdown"
authors = ["Finomnis <[email protected]>"]
version = "0.14.0"
edition = "2021"
rust-version = "1.72"
rust-version = "1.63"
license = "MIT OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/Finomnis/tokio-graceful-shutdown"
Expand Down
2 changes: 1 addition & 1 deletion src/into_subsystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type SubsystemFunction<Err, ErrWrapper> =
/// #[async_trait::async_trait]
/// impl IntoSubsystem<miette::Report> for MySubsystem {
/// async fn run(self, subsys: SubsystemHandle) -> Result<()> {
/// subsys.request_shutdown();
/// subsys.initiate_shutdown();
/// Ok(())
/// }
/// }
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! log::info!("Countdown cancelled.");
//! },
//! _ = countdown() => {
//! subsys.request_shutdown();
//! subsys.initiate_shutdown();
//! }
//! };
//!
Expand Down
8 changes: 5 additions & 3 deletions src/subsystem/error_collector.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::sync::{mpsc, Arc};
use std::sync::Arc;

use tokio::sync::mpsc;

use crate::{errors::SubsystemError, ErrTypeTraits};

pub(crate) enum ErrorCollector<ErrType: ErrTypeTraits> {
Collecting(mpsc::Receiver<SubsystemError<ErrType>>),
Collecting(mpsc::UnboundedReceiver<SubsystemError<ErrType>>),
Finished(Arc<[SubsystemError<ErrType>]>),
}

impl<ErrType: ErrTypeTraits> ErrorCollector<ErrType> {
pub(crate) fn new(receiver: mpsc::Receiver<SubsystemError<ErrType>>) -> Self {
pub(crate) fn new(receiver: mpsc::UnboundedReceiver<SubsystemError<ErrType>>) -> Self {
Self::Collecting(receiver)
}

Expand Down
8 changes: 4 additions & 4 deletions src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{
future::Future,
mem::ManuallyDrop,
sync::{atomic::Ordering, mpsc, Arc, Mutex},
sync::{atomic::Ordering, Arc, Mutex},
};

use atomic::Atomic;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

use crate::{
Expand Down Expand Up @@ -72,7 +72,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
{
let alive_guard = AliveGuard::new();

let (error_sender, errors) = mpsc::channel();
let (error_sender, errors) = mpsc::unbounded_channel();

let cancellation_token = self.inner.cancellation_token.child_token();

Expand All @@ -92,7 +92,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
match error_action {
ErrorAction::Forward => Some(e),
ErrorAction::CatchAndLocalShutdown => {
if let Err(mpsc::SendError(e)) = error_sender.send(e) {
if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
cancellation_token.cancel();
Expand Down
17 changes: 7 additions & 10 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::{
future::Future,
sync::{mpsc, Arc},
time::Duration,
};
use std::{future::Future, sync::Arc, time::Duration};

use atomic::Atomic;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
Expand All @@ -27,7 +24,7 @@ use crate::{
/// use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
///
/// async fn my_subsystem(subsys: SubsystemHandle) -> Result<()> {
/// subsys.request_shutdown();
/// subsys.initiate_shutdown();
/// Ok(())
/// }
///
Expand All @@ -47,7 +44,7 @@ use crate::{
pub struct Toplevel<ErrType: ErrTypeTraits = BoxedError> {
root_handle: SubsystemHandle<ErrType>,
toplevel_subsys: NestedSubsystem<ErrType>,
errors: mpsc::Receiver<SubsystemError<ErrType>>,
errors: mpsc::UnboundedReceiver<SubsystemError<ErrType>>,
}

impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
Expand All @@ -65,7 +62,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Fut: 'static + Future<Output = ()> + Send,
{
let (error_sender, errors) = mpsc::channel();
let (error_sender, errors) = mpsc::unbounded_channel();

let root_handle = subsystem::root_handle(move |e| {
match &e {
Expand All @@ -77,7 +74,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
}
};

if let Err(mpsc::SendError(e)) = error_sender.send(e) {
if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
});
Expand Down Expand Up @@ -150,7 +147,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
/// An error of type [`GracefulShutdownError`] if an error occurred.
///
pub async fn handle_shutdown_requests(
self,
mut self,
shutdown_timeout: Duration,
) -> Result<(), GracefulShutdownError<ErrType>> {
let collect_errors = move || {
Expand Down

0 comments on commit e35490f

Please sign in to comment.