From 339580ddb59b47895e1fab8ce523631a4d32df4b Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 18:14:04 -0700 Subject: [PATCH 1/3] Remove spawn API --- src/ticked_async_executor.rs | 43 ++++++++++-------------------------- tests/tokio_tests.rs | 8 +++---- 2 files changed, 16 insertions(+), 35 deletions(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index 6ce42e9..dbb6388 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -22,6 +22,7 @@ type Payload = (TaskIdentifier, async_task::Runnable); pub struct TickedAsyncExecutor { channel: (mpsc::Sender, mpsc::Receiver), num_woken_tasks: Arc, + num_spawned_tasks: Arc, // TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel @@ -52,22 +53,6 @@ where } } - pub fn spawn( - &self, - identifier: impl Into, - future: impl Future + Send + 'static, - ) -> Task - where - T: Send + 'static, - { - let identifier = identifier.into(); - let future = self.droppable_future(identifier.clone(), future); - let schedule = self.runnable_schedule_cb(identifier); - let (runnable, task) = async_task::spawn(future, schedule); - runnable.schedule(); - task - } - pub fn spawn_local( &self, identifier: impl Into, @@ -172,7 +157,7 @@ mod tests { fn test_multiple_tasks() { let executor = TickedAsyncExecutor::default(); executor - .spawn("A", async move { + .spawn_local("A", async move { tokio::task::yield_now().await; }) .detach(); @@ -226,16 +211,7 @@ mod tests { fn test_ticked_timer() { let executor = TickedAsyncExecutor::default(); - for _ in 0..10 { - let timer: TickedTimer = executor.create_timer(); - executor - .spawn("ThreadedTimer", async move { - timer.sleep_for(256.0).await; - }) - .detach(); - } - - for _ in 0..10 { + for _ in 0..10000 { let timer = executor.create_timer(); executor .spawn_local("LocalTimer", async move { @@ -255,25 +231,30 @@ mod tests { let elapsed = now.elapsed(); println!("Elapsed: {:?}", elapsed); println!("Total: {:?}", instances); + println!( + "Min: {:?}, Max: {:?}", + instances.iter().min(), + instances.iter().max() + ); // Test Timer cancellation let timer = executor.create_timer(); executor - .spawn("ThreadedFuture", async move { + .spawn_local("LocalFuture1", async move { timer.sleep_for(1000.0).await; }) .detach(); let timer = executor.create_timer(); executor - .spawn_local("LocalFuture", async move { + .spawn_local("LocalFuture2", async move { timer.sleep_for(1000.0).await; }) .detach(); let mut tick_event = executor.tick_channel(); executor - .spawn("ThreadedTickFuture", async move { + .spawn_local("LocalTickFuture1", async move { loop { let _r = tick_event.changed().await; if _r.is_err() { @@ -285,7 +266,7 @@ mod tests { let mut tick_event = executor.tick_channel(); executor - .spawn_local("LocalTickFuture", async move { + .spawn_local("LocalTickFuture2", async move { loop { let _r = tick_event.changed().await; if _r.is_err() { diff --git a/tests/tokio_tests.rs b/tests/tokio_tests.rs index 6e5ee0d..6dcdb47 100644 --- a/tests/tokio_tests.rs +++ b/tests/tokio_tests.rs @@ -9,7 +9,7 @@ fn test_tokio_join() { let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (tx2, mut rx2) = tokio::sync::mpsc::channel::(1); executor - .spawn("ThreadedFuture", async move { + .spawn_local("LocalFuture1", async move { let (a, b) = tokio::join!(rx1.recv(), rx2.recv()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -19,7 +19,7 @@ fn test_tokio_join() { let (tx3, mut rx3) = tokio::sync::mpsc::channel::(1); let (tx4, mut rx4) = tokio::sync::mpsc::channel::(1); executor - .spawn("LocalFuture", async move { + .spawn_local("LocalFuture2", async move { let (a, b) = tokio::join!(rx3.recv(), rx4.recv()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -46,7 +46,7 @@ fn test_tokio_select() { let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (_tx2, mut rx2) = tokio::sync::mpsc::channel::(1); executor - .spawn("ThreadedFuture", async move { + .spawn_local("LocalFuture1", async move { tokio::select! { data = rx1.recv() => { assert_eq!(data.unwrap(), 10); @@ -59,7 +59,7 @@ fn test_tokio_select() { let (tx3, mut rx3) = tokio::sync::mpsc::channel::(1); let (_tx4, mut rx4) = tokio::sync::mpsc::channel::(1); executor - .spawn("LocalFuture", async move { + .spawn_local("LocalFuture2", async move { tokio::select! { data = rx3.recv() => { assert_eq!(data.unwrap(), 10); From 2e5a29f00db52239a726d846da03f9e66d6818fa Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 18:47:02 -0700 Subject: [PATCH 2/3] Update ticked_async_executor.rs --- src/ticked_async_executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index dbb6388..7704242 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -211,7 +211,7 @@ mod tests { fn test_ticked_timer() { let executor = TickedAsyncExecutor::default(); - for _ in 0..10000 { + for _ in 0..10 { let timer = executor.create_timer(); executor .spawn_local("LocalTimer", async move { From cddc75168ef57c7c2ede1c31d59e782263ad6b86 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Sat, 24 Aug 2024 14:01:34 -0700 Subject: [PATCH 3/3] Update rust.yml --- .github/workflows/rust.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2ad197f..cb1407c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -46,9 +46,9 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} files: target/cobertura.xml - - name: Miri - run: | - rustup toolchain install nightly --component miri - rustup override set nightly - cargo miri setup - cargo miri test + # - name: Miri + # run: | + # rustup toolchain install nightly --component miri + # rustup override set nightly + # cargo miri setup + # cargo miri test