Should async-byte-channel work with a Tokio LocalSet? #500
-
I can successfully use async-byte-channel with I wanted to try with a multi-threaded Tokio executor, but Should I be able to use async-byte-channel with a Tokio
Replies: 5 comments 1 reply
-
Note that I tried using Tokio's
-
I expect it to work with any executor. Can you post your code?
-
EDIT: I was missing a line, see the next answer 🤦♂️.

Sure. I have a simple test interface that implements

use capnp::Error;
use capnp::capability::Promise;
use crate::test_capnp::test_interface;
use capnp_rpc::pry;

pub struct TestImpl;

impl test_interface::Server for TestImpl {
    fn echo(
        &mut self,
        params: test_interface::EchoParams,
        mut results: test_interface::EchoResults,
    ) -> Promise<(), Error> {
        let input = pry!(pry!(params.get()).get_input()).to_string().unwrap();
        results.get().set_output(input);
        Promise::ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use capnp_rpc::{RpcSystem, twoparty, rpc_twoparty_capnp};
    use crate::test::TestImpl;
    use crate::test_capnp;
    use tokio::io::duplex;
    use tokio::runtime::UnhandledPanic;
    use tokio::time::{sleep, Duration};
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

    #[tokio::test]
    async fn it_echoes_successfully() {
        let mut local_task_set = tokio::task::LocalSet::new();
        local_task_set.unhandled_panic(UnhandledPanic::ShutdownRuntime);

        let (client_writer, server_reader) = async_byte_channel::channel();
        let (server_writer, client_reader) = async_byte_channel::channel();

        // Create server_rpc_system
        let server_network = Box::new(twoparty::VatNetwork::new(
            server_reader,
            server_writer,
            rpc_twoparty_capnp::Side::Server,
            Default::default(),
        ));
        let test_impl: test_capnp::test_interface::Client = capnp_rpc::new_client(TestImpl {});
        let server_rpc_system = RpcSystem::new(server_network, Some(test_impl.client));

        // Create client_rpc_system
        let client_network = Box::new(twoparty::VatNetwork::new(
            client_reader,
            client_writer,
            rpc_twoparty_capnp::Side::Client,
            Default::default(),
        ));
        let mut client_rpc_system = RpcSystem::new(client_network, None);

        // Bootstrap client
        let client: test_capnp::test_interface::Client = client_rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);

        // Server
        local_task_set.spawn_local(async {
            tokio::task::spawn_local(server_rpc_system);
        });

        // Timeout task
        local_task_set.spawn_local(async {
            sleep(Duration::from_millis(3000)).await;
            panic!("Test timed out!!!");
        });

        // Run the client task
        local_task_set.run_until(async {
            let mut echo_request = client.echo_request();
            echo_request.get().set_input("Hello, World");
            let response = echo_request.send().promise.await.unwrap();
            let output = response.get().unwrap().get_output().unwrap().to_string().unwrap();
            assert_eq!("Hello, World", output);
        }).await;
    }
}

I would expect the test to pass, but instead it times out with:
As mentioned above, instead of the

// Prepare DuplexStream and convert it for VatNetwork
let (client_tokio, server_tokio) = duplex(4096);
let (client_reader_tokio, client_writer_tokio) = tokio::io::split(client_tokio);
let client_reader = TokioAsyncReadCompatExt::compat(client_reader_tokio);
let client_writer = TokioAsyncWriteCompatExt::compat_write(client_writer_tokio);
let (server_reader_tokio, server_writer_tokio) = tokio::io::split(server_tokio);
let server_reader = TokioAsyncReadCompatExt::compat(server_reader_tokio);
let server_writer = TokioAsyncWriteCompatExt::compat_write(server_writer_tokio);

But it fails the exact same way: the client doesn't go further than the line
-
Argh, I found it: I was not calling
tokio::task::spawn_local(client_rpc_system);
🤦♂️. Adding it to the client task above makes it work with both async-byte-channel and DuplexStream.

Just for my understanding: it is intuitive to me that I need to call
tokio::task::spawn_local(server_rpc_system);
(because the server has to be running, obviously), but why is the same needed on the client side?
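
A note for later readers on that last question: as far as I understand it, an RpcSystem is itself a future, and Rust futures do nothing until something polls them. The client-side RpcSystem is what moves request and response messages over the connection, so if it is never spawned, the awaited request can never complete. Below is a minimal std-only sketch of that idea. It is an analogy, not capnp-rpc's implementation: `Driver`, `NoopWake`, `poll_once`, and `demo` are hypothetical names made up for illustration.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an RPC driver: it moves queued messages
/// from `outbox` onto `wire`, but only while it is being polled.
struct Driver {
    outbox: Rc<RefCell<VecDeque<String>>>,
    wire: Rc<RefCell<Vec<String>>>,
}

impl Future for Driver {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // Flush everything that has been queued so far.
        while let Some(msg) = self.outbox.borrow_mut().pop_front() {
            self.wire.borrow_mut().push(msg);
        }
        // This sketch never finishes; a real driver would also register
        // the waker and complete when the connection closes.
        Poll::Pending
    }
}

/// Waker that does nothing, enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future exactly once, the way an executor would after `spawn_local`.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Returns how many messages are on the wire before and after polling the driver.
fn demo() -> (usize, usize) {
    let outbox = Rc::new(RefCell::new(VecDeque::new()));
    let wire = Rc::new(RefCell::new(Vec::new()));
    let mut driver = Driver {
        outbox: Rc::clone(&outbox),
        wire: Rc::clone(&wire),
    };

    // "Sending" a request only queues it; nothing reaches the wire yet.
    outbox.borrow_mut().push_back("echo(Hello, World)".to_string());
    let before = wire.borrow().len();

    // Only once the driver is polled does the message move.
    let _ = poll_once(&mut driver);
    let after = wire.borrow().len();

    (before, after)
}

fn main() {
    let (before, after) = demo();
    println!("on the wire before polling: {before}, after polling: {after}");
}
```

In the test above, spawning client_rpc_system on the LocalSet plays the role of `poll_once` here: it hands the driver to the executor so that it keeps getting polled while the client task awaits its response.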