|
23 | 23 | namespace mp {
|
24 | 24 | namespace test {
|
25 | 25 |
|
| 26 | +/** |
| 27 | + * Test setup class creating a two way connection between a |
| 28 | + * ProxyServer<FooInterface> object and a ProxyClient<FooInterface>. |
| 29 | + * |
| 30 | + * Provides client_disconnect and server_disconnect lambdas that can be used to |
| 31 | + * trigger disconnects and test handling of broken and closed connections. |
| 32 | + * |
| 33 | + * Accepts a client_owns_connection option to test different ProxyClient |
| 34 | + * destroy_connection values and control whether destroying the ProxyClient |
| 35 | + * object destroys the client Connection object. Normally it makes sense for |
| 36 | + * this to be true to simplify shutdown and avoid needing to call |
| 37 | + * client_disconnect manually, but false allows testing more ProxyClient |
| 38 | + * behavior and the "clientInvoke call made after disconnect" code path. |
| 39 | + */ |
| 40 | +class TestSetup |
| 41 | +{ |
| 42 | +public: |
| 43 | + std::thread thread; |
| 44 | + std::function<void()> server_disconnect; |
| 45 | + std::function<void()> client_disconnect; |
| 46 | + std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise; |
| 47 | + std::unique_ptr<ProxyClient<messages::FooInterface>> client; |
| 48 | + |
| 49 | + TestSetup(bool client_owns_connection = true) |
| 50 | + : thread{[&] { |
| 51 | + EventLoop loop("mptest", [](bool raise, const std::string& log) { |
| 52 | + std::cout << "LOG" << raise << ": " << log << "\n"; |
| 53 | + if (raise) throw std::runtime_error(log); |
| 54 | + }); |
| 55 | + auto pipe = loop.m_io_context.provider->newTwoWayPipe(); |
| 56 | + |
| 57 | + auto server_connection = |
| 58 | + std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) { |
| 59 | + auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>( |
| 60 | + std::make_shared<FooImplementation>(), connection); |
| 61 | + return capnp::Capability::Client(kj::mv(server_proxy)); |
| 62 | + }); |
| 63 | + server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); }; |
| 64 | + // Set handler to destroy the server when the client disconnects. This |
| 65 | + // is ignored if server_disconnect() is called instead. |
| 66 | + server_connection->onDisconnect([&] { server_connection.reset(); }); |
| 67 | + |
| 68 | + auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1])); |
| 69 | + auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>( |
| 70 | + client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(), |
| 71 | + client_connection.get(), /* destroy_connection= */ client_owns_connection); |
| 72 | + if (client_owns_connection) { |
| 73 | + client_connection.release(); |
| 74 | + } else { |
| 75 | + client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); }; |
| 76 | + } |
| 77 | + |
| 78 | + client_promise.set_value(std::move(client_proxy)); |
| 79 | + loop.loop(); |
| 80 | + }} |
| 81 | + { |
| 82 | + client = client_promise.get_future().get(); |
| 83 | + } |
| 84 | + |
| 85 | + ~TestSetup() |
| 86 | + { |
| 87 | + // Test that client cleanup_fns are executed. |
| 88 | + bool destroyed = false; |
| 89 | + client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed = true; }); |
| 90 | + client.reset(); |
| 91 | + KJ_EXPECT(destroyed); |
| 92 | + |
| 93 | + thread.join(); |
| 94 | + } |
| 95 | +}; |
| 96 | + |
26 | 97 | KJ_TEST("Call FooInterface methods")
|
27 | 98 | {
|
28 |
| - std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> foo_promise; |
29 |
| - std::function<void()> disconnect_client; |
30 |
| - std::thread thread([&]() { |
31 |
| - EventLoop loop("mptest", [](bool raise, const std::string& log) { |
32 |
| - std::cout << "LOG" << raise << ": " << log << "\n"; |
33 |
| - }); |
34 |
| - auto pipe = loop.m_io_context.provider->newTwoWayPipe(); |
35 |
| - |
36 |
| - auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0])); |
37 |
| - auto foo_client = std::make_unique<ProxyClient<messages::FooInterface>>( |
38 |
| - connection_client->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(), |
39 |
| - connection_client.get(), /* destroy_connection= */ false); |
40 |
| - foo_promise.set_value(std::move(foo_client)); |
41 |
| - disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); }; |
42 |
| - |
43 |
| - auto connection_server = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]), [&](Connection& connection) { |
44 |
| - auto foo_server = kj::heap<ProxyServer<messages::FooInterface>>(std::make_shared<FooImplementation>(), connection); |
45 |
| - return capnp::Capability::Client(kj::mv(foo_server)); |
46 |
| - }); |
47 |
| - connection_server->onDisconnect([&] { connection_server.reset(); }); |
48 |
| - loop.loop(); |
49 |
| - }); |
50 |
| - |
51 |
| - auto foo = foo_promise.get_future().get(); |
| 99 | + TestSetup setup; |
| 100 | + ProxyClient<messages::FooInterface>* foo = setup.client.get(); |
| 101 | + |
52 | 102 | KJ_EXPECT(foo->add(1, 2) == 3);
|
53 | 103 |
|
54 | 104 | FooStruct in;
|
@@ -127,14 +177,42 @@ KJ_TEST("Call FooInterface methods")
|
127 | 177 | mut.message = "init";
|
128 | 178 | foo->passMutable(mut);
|
129 | 179 | KJ_EXPECT(mut.message == "init build pass call return read");
|
| 180 | +} |
130 | 181 |
|
131 |
| - disconnect_client(); |
132 |
| - thread.join(); |
| 182 | +KJ_TEST("Call IPC method after client connection is closed") |
| 183 | +{ |
| 184 | + TestSetup setup{/*client_owns_connection=*/false}; |
| 185 | + ProxyClient<messages::FooInterface>* foo = setup.client.get(); |
| 186 | + KJ_EXPECT(foo->add(1, 2) == 3); |
| 187 | + setup.client_disconnect(); |
| 188 | + |
| 189 | + bool disconnected{false}; |
| 190 | + try { |
| 191 | + foo->add(1, 2); |
| 192 | + } catch (const std::logic_error& e) { |
| 193 | + KJ_EXPECT(std::string_view{e.what()} == "clientInvoke call made after disconnect"); |
| 194 | + disconnected = true; |
| 195 | + } |
| 196 | + KJ_EXPECT(disconnected); |
| 197 | +} |
133 | 198 |
|
134 |
| - bool destroyed = false; |
135 |
| - foo->m_context.cleanup_fns.emplace_front([&destroyed]{ destroyed = true; }); |
136 |
| - foo.reset(); |
137 |
| - KJ_EXPECT(destroyed); |
| 199 | +KJ_TEST("Calling IPC method after server connection is closed") |
| 200 | +{ |
| 201 | + TestSetup setup; |
| 202 | + ProxyClient<messages::FooInterface>* foo = setup.client.get(); |
| 203 | + KJ_EXPECT(foo->add(1, 2) == 3); |
| 204 | + setup.server_disconnect(); |
| 205 | + |
| 206 | + bool disconnected{false}; |
| 207 | + try { |
| 208 | + foo->add(1, 2); |
| 209 | + } catch (const std::runtime_error& e) { |
| 210 | + std::string_view error{e.what()}; |
| 211 | + KJ_EXPECT(error.starts_with("kj::Exception: ")); |
| 212 | + KJ_EXPECT(error.find("disconnected: Peer disconnected.") != std::string_view::npos); |
| 213 | + disconnected = true; |
| 214 | + } |
| 215 | + KJ_EXPECT(disconnected); |
138 | 216 | }
|
139 | 217 |
|
140 | 218 | } // namespace test
|
|
0 commit comments