Skip to content

Commit

Permalink
Reimplement in 100% safe code (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull authored Jun 11, 2023
1 parent f17bf33 commit 6995e2e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.35']
rust: ['1.63']
steps:
- uses: actions/checkout@v3
- name: Install Rust
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "easy-parallel"
version = "3.3.0"
authors = ["Stjepan Glavina <[email protected]>"]
edition = "2018"
rust-version = "1.35"
rust-version = "1.63"
description = "Run closures in parallel"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/easy-parallel"
Expand Down
90 changes: 35 additions & 55 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![forbid(unsafe_code)]

use std::fmt;
use std::iter::{self, FromIterator};
use std::mem;
use std::panic;
use std::process;
use std::sync::mpsc;
use std::thread;

Expand Down Expand Up @@ -256,58 +255,50 @@ impl<'a, T> Parallel<'a, T> {
T: Send + 'a,
C: FromIterator<T>,
{
// Set up a guard that aborts on panic.
let guard = NoPanic;
// Set up a new thread scope.
thread::scope(|scope| {
// Join handles for spawned threads.
let mut handles = Vec::new();

// Join handles for spawned threads.
let mut handles = Vec::new();
// Channels to collect results from spawned threads.
let mut receivers = Vec::new();

// Channels to collect results from spawned threads.
let mut receivers = Vec::new();
for f in self.closures.into_iter() {
// Wrap into a closure that sends the result back.
let (sender, receiver) = mpsc::channel();
let f = move || sender.send(f()).unwrap();

// Spawn a thread for each closure after the first one.
for f in self.closures.into_iter() {
// Wrap into a closure that sends the result back.
let (sender, receiver) = mpsc::channel();
let f = move || sender.send(f()).unwrap();

// Erase the `'a` lifetime.
let f: Box<dyn FnOnce() + Send + 'a> = Box::new(f);
let f: Box<dyn FnOnce() + Send + 'static> = unsafe { mem::transmute(f) };

// Spawn a thread for the closure.
handles.push(thread::spawn(f));
receivers.push(receiver);
}
// Spawn it on the scope.
handles.push(scope.spawn(f));
receivers.push(receiver);
}

let mut last_err = None;
let mut last_err = None;

// Run the main closure on the main thread.
let res = panic::catch_unwind(panic::AssertUnwindSafe(f));
// Run the main closure on the main thread.
let res = panic::catch_unwind(panic::AssertUnwindSafe(f));

// Join threads and save the last panic if there was one.
for h in handles {
if let Err(err) = h.join() {
last_err = Some(err);
// Join threads and save the last panic if there was one.
for h in handles {
if let Err(err) = h.join() {
last_err = Some(err);
}
}
}

// Drop the guard because we may resume a panic now.
drop(guard);

// If a thread has panicked, resume the last collected panic.
if let Some(err) = last_err {
panic::resume_unwind(err);
}
// If a thread has panicked, resume the last collected panic.
if let Some(err) = last_err {
panic::resume_unwind(err);
}

// Collect the results from threads.
let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect();
// Collect the results from threads.
let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect();

// If the main closure panicked, resume its panic.
match res {
Ok(r) => (results, r),
Err(err) => panic::resume_unwind(err),
}
// If the main closure panicked, resume its panic.
match res {
Ok(r) => (results, r),
Err(err) => panic::resume_unwind(err),
}
})
}
}

Expand All @@ -324,14 +315,3 @@ impl<T> Default for Parallel<'_, T> {
Self::new()
}
}

/// Aborts the process if dropped while panicking.
struct NoPanic;

impl Drop for NoPanic {
fn drop(&mut self) {
if thread::panicking() {
process::abort();
}
}
}

0 comments on commit 6995e2e

Please sign in to comment.