Commit 025a77e

refactor: Use EventLoopRef instead of addClient/removeClient
Use EventLoopRef to avoid reference counting bugs and be more exception safe
1 parent 394f966 commit 025a77e
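For orientation, EventLoopRef (declared in include/mp/proxy.h, see the second file below) is an RAII handle on an EventLoop: constructing one registers a client of the loop, destroying it or calling reset() unregisters it, and operator*/operator-> reach the underlying loop. The following is only a minimal sketch of such a wrapper, written to illustrate the pattern this commit relies on; it assumes EventLoopRef wraps the addClient()/removeClient() counters shown in src/mp/proxy.cpp, and the real class may differ in detail.

#include <cassert>
#include <mutex>

// Sketch only: EventLoop stands for the project's existing class with public
// m_mutex, addClient(), and removeClient() members, as used in the diff below.
class EventLoopRef
{
public:
    //! Register as a client of the loop. If the caller already holds
    //! loop.m_mutex it passes its lock; the same lock is reused when the
    //! reference is released, so the destructor never re-locks a held mutex.
    explicit EventLoopRef(EventLoop& loop, std::unique_lock<std::mutex>* lock = nullptr)
        : m_loop(&loop), m_lock(lock)
    {
        if (m_lock) {
            m_loop->addClient(*m_lock);
        } else {
            std::unique_lock<std::mutex> local_lock(m_loop->m_mutex);
            m_loop->addClient(local_lock);
        }
    }
    EventLoopRef(const EventLoopRef&) = delete;
    EventLoopRef& operator=(const EventLoopRef&) = delete;
    EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop), m_lock(other.m_lock) { other.m_loop = nullptr; }
    ~EventLoopRef() { reset(); }

    //! Release the reference, returning true if this was the last client
    //! (the value EventLoop::removeClient() reports). Safe to call more than
    //! once; later calls are no-ops.
    bool reset(std::unique_lock<std::mutex>* lock = nullptr)
    {
        bool done = false;
        if (EventLoop* loop = m_loop) {
            m_loop = nullptr;
            if (!lock) lock = m_lock;
            if (lock) {
                done = loop->removeClient(*lock);
            } else {
                std::unique_lock<std::mutex> local_lock(loop->m_mutex);
                done = loop->removeClient(local_lock);
            }
        }
        return done;
    }

    EventLoop& operator*() const { assert(m_loop); return *m_loop; }
    EventLoop* operator->() const { assert(m_loop); return m_loop; }

private:
    EventLoop* m_loop;
    std::unique_lock<std::mutex>* m_lock;
};

With such a handle, the paired addClient()/removeClient() calls in EventLoop::post() and in the async thread collapse into a single scoped object (EventLoopRef ref{*this, &lock};), so the count cannot leak on an early return or an exception, which is what the commit message means by "more exception safe".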

3 files changed (+29 -49 lines)
include/mp/proxy-io.h (+5 -31)

@@ -310,21 +310,13 @@ class Connection
     Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcClient(m_network))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcClient(m_network)) {}
     Connection(EventLoop& loop,
         kj::Own<kj::AsyncIoStream>&& stream_,
         const std::function<::capnp::Capability::Client(Connection&)>& make_client)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
 
     //! Run cleanup functions. Must be called from the event loop thread. First
     //! calls synchronous cleanup functions while blocked (to free capnp
@@ -353,12 +345,12 @@ class Connection
         // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
         // error in cases where f deletes this Connection object.
         m_on_disconnect.add(m_network.onDisconnect().then(
-            [f = std::move(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
+            [f = std::move(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
     }
 
-    EventLoop& m_loop;
+    EventLoopRef m_loop;
     kj::Own<kj::AsyncIoStream> m_stream;
-    LoggingErrorHandler m_error_handler{m_loop};
+    LoggingErrorHandler m_error_handler{*m_loop};
     kj::TaskSet m_on_disconnect{m_error_handler};
     ::capnp::TwoPartyVatNetwork m_network;
     std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -401,21 +393,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     : m_client(std::move(client)), m_context(connection)
 
 {
-    {
-        std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-        m_context.connection->m_loop.addClient(lock);
-    }
-
     // Handler for the connection getting destroyed before this client object.
     auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
         // Release client capability by move-assigning to temporary.
         {
             typename Interface::Client(std::move(m_client));
         }
-        {
-            std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-            m_context.connection->m_loop.removeClient(lock);
-        }
         m_context.connection = nullptr;
     });
 
@@ -448,11 +431,6 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     {
         typename Interface::Client(std::move(m_client));
     }
-    {
-        std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-        m_context.connection->m_loop.removeClient(lock);
-    }
-
     if (destroy_connection) {
         delete m_context.connection;
         m_context.connection = nullptr;
@@ -474,8 +452,6 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
     : m_impl(std::move(impl)), m_context(&connection)
 {
     assert(m_impl);
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.addClient(lock);
 }
 
 //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
@@ -509,8 +485,6 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
         });
     }
     assert(m_context.cleanup_fns.empty());
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.removeClient(lock);
 }
 
 //! If the capnp interface defined a special "destroy" method, as described the

include/mp/proxy.h (+1 -1)

@@ -72,7 +72,7 @@ class EventLoopRef
 struct ProxyContext
 {
     Connection* connection;
-    EventLoop* loop;
+    EventLoopRef loop;
     CleanupList cleanup_fns;
 
     ProxyContext(Connection* connection);

src/mp/proxy.cpp (+23 -17)

@@ -65,7 +65,7 @@ bool EventLoopRef::reset(std::unique_lock<std::mutex>* lock)
     return done;
 }
 
-ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{&connection->m_loop} {}
+ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
 
 Connection::~Connection()
 {
@@ -122,18 +122,18 @@ Connection::~Connection()
         m_sync_cleanup_fns.pop_front();
     }
     while (!m_async_cleanup_fns.empty()) {
-        const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
+        const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+        m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
-    std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-    m_loop.startAsyncThread(lock);
-    m_loop.removeClient(lock);
+    std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    m_loop->startAsyncThread(lock);
+    m_loop.reset(&lock);
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
     // Add cleanup callbacks to the front of list, so sync cleanup functions run
     // in LIFO order. This is a good approach because sync cleanup functions are
     // added as client objects are created, and it is natural to clean up
@@ -147,13 +147,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
 
 void Connection::addAsyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
@@ -244,7 +244,7 @@ void EventLoop::post(const std::function<void()>& fn)
         return;
     }
     std::unique_lock<std::mutex> lock(m_mutex);
-    addClient(lock);
+    EventLoopRef ref(*this, &lock);
     m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
     m_post_fn = &fn;
     int post_fd{m_post_fd};
@@ -253,13 +253,13 @@ void EventLoop::post(const std::function<void()>& fn)
         KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
     m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
-    removeClient(lock);
 }
 
 void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }
 
 bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
 {
+    assert(m_num_clients > 0);
     m_num_clients -= 1;
     if (done(lock)) {
         m_cv.notify_all();
@@ -279,16 +279,22 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
     } else if (!m_async_fns.empty()) {
         m_async_thread = std::thread([this] {
             std::unique_lock<std::mutex> lock(m_mutex);
-            while (true) {
+            while (!done(lock)) {
                 if (!m_async_fns.empty()) {
-                    addClient(lock);
+                    EventLoopRef ref{*this, &lock};
                     const std::function<void()> fn = std::move(m_async_fns.front());
                     m_async_fns.pop_front();
                     Unlock(lock, fn);
-                    if (removeClient(lock)) break;
+                    // Important to explicitly call ref.reset() here and
+                    // explicitly break if the EventLoop is done, not relying on
+                    // while condition above. Reason is that end of `ref`
+                    // lifetime can cause EventLoop::loop() to exit, and if
+                    // there is external code that immediately deletes the
+                    // EventLoop object as soon as EventLoop::loop() method
+                    // returns, checking the while condition may crash.
+                    if (ref.reset()) break;
+                    // Continue without waiting in case there are more async_fns
                     continue;
-                } else if (m_num_clients == 0) {
-                    break;
                 }
                 m_cv.wait(lock);
             }
@@ -394,7 +400,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
     const std::string from = context.getParams().getName();
     std::promise<ThreadContext*> thread_context;
     std::thread thread([&thread_context, from, this]() {
-        g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
+        g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
         g_thread_context.waiter = std::make_unique<Waiter>();
         thread_context.set_value(&g_thread_context);
         std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
0 comments