@@ -53,7 +53,6 @@ use crate::{ffi, FromPyPointer, IntoPy, Py, PyObject, PyTypeCheck, PyTypeInfo};
53
53
use std:: ffi:: { CStr , CString } ;
54
54
use std:: marker:: PhantomData ;
55
55
use std:: os:: raw:: c_int;
56
- use std:: thread;
57
56
58
57
/// A marker token that represents holding the GIL.
59
58
///
@@ -316,16 +315,102 @@ impl<'py> Python<'py> {
316
315
F : Send + FnOnce ( ) -> T ,
317
316
T : Send ,
318
317
{
318
+ use std:: mem:: transmute;
319
+ use std:: panic:: { catch_unwind, resume_unwind, AssertUnwindSafe } ;
320
+ use std:: sync:: mpsc:: { sync_channel, SendError , SyncSender } ;
321
+ use std:: thread:: { spawn, Result } ;
322
+ use std:: time:: Duration ;
323
+
324
+ use parking_lot:: { const_mutex, Mutex } ;
325
+
326
+ use crate :: impl_:: panic:: PanicTrap ;
327
+
319
328
// Use a guard pattern to handle reacquiring the GIL,
320
329
// so that the GIL will be reacquired even if `f` panics.
321
330
// The `Send` bound on the closure prevents the user from
322
331
// transferring the `Python` token into the closure.
323
332
let _guard = unsafe { SuspendGIL :: new ( ) } ;
324
333
325
334
// To close soundness loopholes w.r.t. `send_wrapper` or `scoped-tls`,
326
- // we run the closure on a newly created thread so that it cannot
335
+ // we run the closure on a separate thread so that it cannot
327
336
// access thread-local storage from the current thread.
328
- thread:: scope ( |s| s. spawn ( f) . join ( ) . unwrap ( ) )
337
+
338
+ // 1. Construct a task
339
+ struct Task ( * mut dyn FnMut ( ) ) ;
340
+ unsafe impl Send for Task { }
341
+
342
+ let ( result_sender, result_receiver) = sync_channel :: < Result < T > > ( 0 ) ;
343
+
344
+ let mut f = Some ( f) ;
345
+
346
+ let mut task = || {
347
+ let f = f. take ( ) . unwrap ( ) ;
348
+
349
+ let result = catch_unwind ( AssertUnwindSafe ( f) ) ;
350
+
351
+ result_sender. send ( result) . unwrap ( ) ;
352
+ } ;
353
+
354
+ // SAFETY: the current thread will block until the closure has returned
355
+ let task = Task ( unsafe { transmute ( & mut task as & mut dyn FnMut ( ) ) } ) ;
356
+
357
+ // 2. Enqueue task and spawn thread if necessary
358
+ let trap = PanicTrap :: new ( "allow_threads unwound while stack data was potentially accessed by another thread which is a bug" ) ;
359
+
360
+ static THREADS : Mutex < Vec < SyncSender < Task > > > = const_mutex ( Vec :: new ( ) ) ;
361
+
362
+ enum State {
363
+ Pending ( Task ) ,
364
+ Dispatched ( SyncSender < Task > ) ,
365
+ }
366
+
367
+ let mut state = State :: Pending ( task) ;
368
+
369
+ while let Some ( task_sender) = THREADS . lock ( ) . pop ( ) {
370
+ match state {
371
+ State :: Pending ( task) => match task_sender. send ( task) {
372
+ Ok ( ( ) ) => {
373
+ state = State :: Dispatched ( task_sender) ;
374
+ break ;
375
+ }
376
+ Err ( SendError ( task) ) => {
377
+ state = State :: Pending ( task) ;
378
+ continue ;
379
+ }
380
+ } ,
381
+ State :: Dispatched ( _sender) => unreachable ! ( ) ,
382
+ }
383
+ }
384
+
385
+ let task_sender = match state {
386
+ State :: Pending ( task) => {
387
+ let ( task_sender, task_receiver) = sync_channel :: < Task > ( 0 ) ;
388
+
389
+ spawn ( move || {
390
+ while let Ok ( task) = task_receiver. recv_timeout ( Duration :: from_secs ( 60 ) ) {
391
+ // SAFETY: all data accessed by `task` will stay alive until it completes
392
+ unsafe { ( * task. 0 ) ( ) } ;
393
+ }
394
+ } ) ;
395
+
396
+ task_sender. send ( task) . unwrap ( ) ;
397
+
398
+ task_sender
399
+ }
400
+ State :: Dispatched ( task_sender) => task_sender,
401
+ } ;
402
+
403
+ // 3. Wait for completion and check result
404
+ let result = result_receiver. recv ( ) . unwrap ( ) ;
405
+
406
+ trap. disarm ( ) ;
407
+
408
+ THREADS . lock ( ) . push ( task_sender) ;
409
+
410
+ match result {
411
+ Ok ( result) => result,
412
+ Err ( payload) => resume_unwind ( payload) ,
413
+ }
329
414
}
330
415
331
416
/// Evaluates a Python expression in the given context and returns the result.
0 commit comments