// Copyright (c) 2019 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <mp/proxy.h>
#include <mp/proxy-io.h>
#include <mp/proxy-types.h>
#include <mp/proxy.capnp.h>
#include <mp/type-threadmap.h>
#include <mp/util.h>
#include <assert.h>
#include <atomic>
#include <capnp/blob.h>
#include <capnp/capability.h>
#include <condition_variable>
#include <functional>
#include <future>
#include <kj/async-io.h>
#include <kj/async.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/exception.h>
#include <kj/memory.h>
#include <map>
#include <memory>
#include <mutex>
#include <stddef.h>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <tuple>
#include <unistd.h>
#include <utility>
namespace mp {
template <typename Interface>
struct ProxyServer;
thread_local ThreadContext g_thread_context;

void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
    m_loop.log() << "Uncaught exception in daemonized task.";
}

Connection::~Connection()
{
    // Shut down the RPC system first, since this will garbage collect Server
    // objects that were not freed before the connection was closed, some of
    // which may call addAsyncCleanup and add more cleanup callbacks which can
    // run below.
    m_rpc_system.reset();
    // ProxyClient cleanup handlers are in the sync list, and ProxyServer
    // cleanup handlers are in the async list.
    //
    // The ProxyClient cleanup handlers are synchronous because they are fast
    // and don't do anything besides release capnp resources and reset state,
    // so future calls to client methods immediately throw exceptions instead
    // of trying to communicate across the socket. The synchronous callbacks
    // set ProxyClient capability pointers to null, so new method calls on
    // client objects fail without triggering i/o or relying on the event
    // loop, which may go out of scope or trigger obscure capnp i/o errors.
    //
    // The ProxyServer cleanup handlers call user-defined destructors on
    // server objects, which can run arbitrary blocking bitcoin code, so they
    // have to run asynchronously in a different thread. The asynchronous
    // cleanup functions intentionally aren't started until after the
    // synchronous cleanup functions run, so client objects are fully
    // disconnected before bitcoin code in the destructors is run. This way,
    // if the bitcoin code tries to make client requests, the requests will
    // just fail immediately instead of sending i/o or accessing the event
    // loop.
    //
    // The context where Connection objects are destroyed and this destructor
    // is invoked is different depending on whether this is an outgoing
    // connection being used to make an Init.makeX() call (e.g. Init.makeNode
    // or Init.makeWalletClient) or an incoming connection implementing the
    // Init interface and handling the Init.makeX() calls.
    //
    // Either way, when a connection is closed, capnp behavior is to call all
    // ProxyServer object destructors first, and then trigger an onDisconnect
    // callback.
    //
    // On the incoming side of the connection, the onDisconnect callback is
    // written to delete the Connection object from m_incoming_connections and
    // call this destructor, which calls Connection::disconnect.
    //
    // On the outgoing side, the Connection object is owned by the top-level
    // client object, which the onDisconnect handler doesn't have ready access
    // to, so the onDisconnect handler just calls Connection::disconnect
    // directly instead.
    //
    // Either way, the disconnect code runs in the event loop thread and is
    // called on both clean and unclean shutdowns. In the unclean shutdown
    // case, when the connection is broken, the sync and async cleanup lists
    // will be filled with callbacks. In the clean shutdown case both lists
    // will be empty.
    while (!m_sync_cleanup_fns.empty()) {
        m_sync_cleanup_fns.front()();
        m_sync_cleanup_fns.pop_front();
    }
    while (!m_async_cleanup_fns.empty()) {
        const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
        m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
        m_async_cleanup_fns.pop_front();
    }
    std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    m_loop.startAsyncThread(lock);
    m_loop.removeClient(lock);
}
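
// Illustrative sketch (not part of this file; `client` and `server` are
// hypothetical objects): the two cleanup lists above are typically populated
// along these lines. A ProxyClient registers a fast synchronous handler that
// only nulls out its capability pointer, while a ProxyServer registers an
// asynchronous handler that runs its potentially blocking destructor.
//
//   m_cleanup_it = connection.addSyncCleanup([&client] { client.m_client = nullptr; }); // client side
//   connection.addAsyncCleanup([server] { delete server; });                            // server side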

CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
{
    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    // Add cleanup callbacks to the front of the list, so sync cleanup
    // functions run in LIFO order. This is a good approach because sync
    // cleanup functions are added as client objects are created, and it is
    // natural to clean up objects in the reverse order they were created. In
    // practice, however, order should not be significant because the cleanup
    // callbacks run synchronously in a single batch when the connection is
    // broken, and they only reset the connection pointers in the client
    // objects without actually deleting the client objects.
    return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
}

void Connection::removeSyncCleanup(CleanupIt it)
{
    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    m_sync_cleanup_fns.erase(it);
}

void Connection::addAsyncCleanup(std::function<void()> fn)
{
    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    // Add async cleanup callbacks to the back of the list. Unlike the sync
    // cleanup list, this list order is more significant because it determines
    // the order server objects are destroyed when there is a sudden
    // disconnect, and it is possible objects may need to be destroyed in a
    // certain order. This function is called in ProxyServerBase destructors,
    // and since capnp destroys ProxyServer objects in LIFO order, we should
    // preserve this order, and add cleanup callbacks to the end of the list so
    // they can be run starting from the beginning of the list.
    //
    // In bitcoin core, running these callbacks in the right order is
    // particularly important for the wallet process, because it uses blocking
    // shared_ptrs and requires Chain::Notification pointers owned by the node
    // process to be destroyed before the WalletLoader objects owned by the
    // node process, otherwise shared pointer counts of the CWallet objects
    // (which inherit from Chain::Notification) will not be 1 when the
    // WalletLoader destructor runs and it will wait forever for them to be
    // released.
    m_async_cleanup_fns.emplace(m_async_cleanup_fns.end(), std::move(fn));
}

EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
    : m_exe_name(exe_name),
      m_io_context(kj::setupAsyncIo()),
      m_task_set(new kj::TaskSet(m_error_handler)),
      m_log_fn(std::move(log_fn)),
      m_context(context)
{
    int fds[2];
    KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
    m_wait_fd = fds[0];
    m_post_fd = fds[1];
}

EventLoop::~EventLoop()
{
    if (m_async_thread.joinable()) m_async_thread.join();
    const std::lock_guard<std::mutex> lock(m_mutex);
    KJ_ASSERT(m_post_fn == nullptr);
    KJ_ASSERT(m_async_fns.empty());
    KJ_ASSERT(m_wait_fd == -1);
    KJ_ASSERT(m_post_fd == -1);
    KJ_ASSERT(m_num_clients == 0);
    // Spin the event loop to wait for any promises triggered by RPC shutdown.
    // auto cleanup = kj::evalLater([]{});
    // cleanup.wait(m_io_context.waitScope);
}

void EventLoop::loop()
{
    assert(!g_thread_context.loop_thread);
    g_thread_context.loop_thread = true;
    KJ_DEFER(g_thread_context.loop_thread = false);
    kj::Own<kj::AsyncIoStream> wait_stream{
        m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
    int post_fd{m_post_fd};
    char buffer = 0;
    for (;;) {
        const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_post_fn) {
            Unlock(lock, *m_post_fn);
            m_post_fn = nullptr;
            m_cv.notify_all();
        } else if (done(lock)) {
            // Intentionally do not break if m_post_fn was set, even if done()
            // would return true, to ensure that the removeClient write(post_fd)
            // call always succeeds and the loop does not exit between the time
            // that the done condition is set and the write call is made.
            break;
        }
    }
    log() << "EventLoop::loop done, cancelling event listeners.";
    m_task_set.reset();
    log() << "EventLoop::loop bye.";
    wait_stream = nullptr;
    KJ_SYSCALL(::close(post_fd));
    const std::unique_lock<std::mutex> lock(m_mutex);
    m_wait_fd = -1;
    m_post_fd = -1;
}

void EventLoop::post(const std::function<void()>& fn)
{
    if (std::this_thread::get_id() == m_thread_id) {
        fn();
        return;
    }
    std::unique_lock<std::mutex> lock(m_mutex);
    addClient(lock);
    m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
    m_post_fn = &fn;
    int post_fd{m_post_fd};
    Unlock(lock, [&] {
        char buffer = 0;
        KJ_SYSCALL(write(post_fd, &buffer, 1));
    });
    m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
    removeClient(lock);
}
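
// Illustrative usage sketch (not part of this file; `my_log_fn` is a
// hypothetical logging callback): loop() is meant to run on a dedicated
// thread, and other threads hand it work through post(), which blocks the
// caller until the event loop thread has executed the function.
//
//   EventLoop loop{"example-exe", my_log_fn};
//   std::thread loop_thread{[&] { loop.loop(); }};
//   loop.post([] { /* runs on the event loop thread */ });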

void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }

bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
{
    m_num_clients -= 1;
    if (done(lock)) {
        m_cv.notify_all();
        int post_fd{m_post_fd};
        lock.unlock();
        char buffer = 0;
        KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
        return true;
    }
    return false;
}

void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
{
    if (m_async_thread.joinable()) {
        m_cv.notify_all();
    } else if (!m_async_fns.empty()) {
        m_async_thread = std::thread([this] {
            std::unique_lock<std::mutex> lock(m_mutex);
            while (true) {
                if (!m_async_fns.empty()) {
                    addClient(lock);
                    const std::function<void()> fn = std::move(m_async_fns.front());
                    m_async_fns.pop_front();
                    Unlock(lock, fn);
                    if (removeClient(lock)) break;
                    continue;
                } else if (m_num_clients == 0) {
                    break;
                }
                m_cv.wait(lock);
            }
        });
    }
}

bool EventLoop::done(std::unique_lock<std::mutex>& lock) const
{
    assert(m_num_clients >= 0);
    assert(lock.owns_lock());
    assert(lock.mutex() == &m_mutex);
    return m_num_clients == 0 && m_async_fns.empty();
}

std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
{
    const std::unique_lock<std::mutex> lock(mutex);
    auto thread = threads.find(connection);
    if (thread != threads.end()) return {thread, false};
    thread = threads.emplace(
        std::piecewise_construct, std::forward_as_tuple(connection),
        std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
    thread->second.setCleanup([&threads, &mutex, thread] {
        // Note: it is safe to use the `thread` iterator in this cleanup
        // function, because the iterator would only be invalid if the map
        // entry was removed, and if the map entry is removed the
        // ProxyClient<Thread> destructor unregisters the cleanup.

        // The connection is being destroyed before the thread client is, so
        // reset the thread client's m_cleanup_it member so the thread client
        // destructor does not try to unregister this callback after the
        // connection is destroyed.
        thread->second.m_cleanup_it.reset();

        // Remove the connection pointer about to be destroyed from the map.
        const std::unique_lock<std::mutex> lock(mutex);
        threads.erase(thread);
    });
    return {thread, true};
}
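
// Illustrative sketch (not part of this file; `threads`, `mutex`,
// `connection`, and `make_remote_thread` are hypothetical placeholders):
// SetThread is a get-or-create lookup keyed by connection, so the factory
// callback only runs when no Thread client exists for the connection yet.
//
//   ConnThread it;
//   bool created;
//   std::tie(it, created) = SetThread(threads, mutex, &connection,
//       [&] { return make_remote_thread(); /* e.g. a ThreadMap.makeThread() call */ });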

ProxyClient<Thread>::~ProxyClient()
{
    // If the thread is being destroyed before the connection is destroyed,
    // remove the cleanup callback that was registered to handle the connection
    // being destroyed before the thread.
    if (m_cleanup_it) {
        m_context.connection->removeSyncCleanup(*m_cleanup_it);
    }
}

void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
{
    assert(fn);
    assert(!m_cleanup_it);
    m_cleanup_it = m_context.connection->addSyncCleanup(fn);
}

ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
    : m_thread_context(thread_context), m_thread(std::move(thread))
{
    assert(m_thread_context.waiter.get() != nullptr);
}

ProxyServer<Thread>::~ProxyServer()
{
    if (!m_thread.joinable()) return;
    // Stop the async thread and wait for it to exit. Need to wait because the
    // m_thread handle needs to outlive the thread to avoid a "terminate called
    // without an active exception" error. An alternative to waiting would be
    // to detach the thread, but this would introduce nondeterminism which
    // could make code harder to debug or extend.
    assert(m_thread_context.waiter.get());
    std::unique_ptr<Waiter> waiter;
    {
        const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
        //! Reset the thread context waiter pointer, as a shutdown signal for
        //! the done lambda passed as the waiter->wait() argument in the
        //! makeThread code below.
        waiter = std::move(m_thread_context.waiter);
        //! Assert the waiter is idle. This destructor shouldn't be getting
        //! called if it is busy.
        assert(!waiter->m_fn);
        // Clear client maps now to avoid deadlock in the m_thread.join() call
        // below. The maps contain Thread::Client objects that need to be
        // destroyed from the event loop thread (this thread), which can't
        // happen if this thread is busy calling join.
        m_thread_context.request_threads.clear();
        m_thread_context.callback_threads.clear();
        //! Ping the waiter.
        waiter->m_cv.notify_all();
    }
    m_thread.join();
}

kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
{
    context.getResults().setResult(m_thread_context.thread_name);
    return kj::READY_NOW;
}

ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}

kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
    const std::string from = context.getParams().getName();
    std::promise<ThreadContext*> thread_context;
    std::thread thread([&thread_context, from, this]() {
        g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
        g_thread_context.waiter = std::make_unique<Waiter>();
        thread_context.set_value(&g_thread_context);
        std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
        // Wait for the shutdown signal from the ProxyServer<Thread> destructor
        // (the signal is just the waiter getting set to null).
        g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
    });
    auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
    auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
    context.getResults().setResult(kj::mv(thread_client));
    return kj::READY_NOW;
}
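
// Illustrative client-side sketch (not part of this file; `thread_map` is a
// hypothetical ThreadMap::Client, and standard Cap'n Proto generated-API
// naming is assumed): each makeThread call spawns one dedicated server thread
// and returns a Thread capability that keeps it alive until the capability is
// dropped and ProxyServer<Thread> is destroyed.
//
//   auto request = thread_map.makeThreadRequest();
//   request.setName("worker");
//   Thread::Client thread = request.send().getResult();
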
std::atomic<int> server_reqs{0};

std::string LongThreadName(const char* exe_name)
{
    return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}
} // namespace mp