Use a threadpool to handle incoming requests #27
Conversation
Adds a pool of worker threads to handle incoming requests. Uses a bounded channel to pass incoming requests to each worker. Closes #24.
@@ -18,9 +18,11 @@ atoi = "^0.3"
slog = "^2.5"
slog-async = "^2.5"
slog-term = "^2.6"
crossbeam-channel = "^0.4"
This is the version of crossbeam-channel we pull in through slog-async.
I definitely prefer something with backpressure to the pool.submit(move || handle_stream(...)) one, where some collection of unaccepted tasks grows unbounded out of our view.

I do agree that with the version in this PR we need some communication back up from threads which errored or panicked to the main thread, possibly crashing the process.
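(One hedged way to get that signal, for illustration only and not something this PR does: the threadpool crate tracks how many submitted jobs have panicked, so the accept loop could check it between connections and abort.)

// Illustration only: assumes `pool` is a threadpool::ThreadPool.
// panic_count() reports how many jobs submitted to the pool have panicked.
if pool.panic_count() > 0 {
    anyhow::bail!("aborting: a worker thread panicked");
}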
Err(channel::SendTimeoutError::Disconnected(_)) => {
    anyhow::bail!("aborting: worker channel is disconnected")
}
_ => { /* ok! */ }
Can we make the last clause just Ok? I'm worried about send_timeout starting to return new error types and us silently ignoring them. I'd rather have an exhaustive match block.
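(For illustration, the exhaustive version might look roughly like the sketch below; the surrounding names tx, stream, and SEND_TIMEOUT are assumptions, not the PR's actual code.)

// Sketch only: assumes `tx` is the bounded crossbeam_channel::Sender for the
// worker queue, `stream` is the accepted connection, and SEND_TIMEOUT is a
// std::time::Duration.
match tx.send_timeout(stream, SEND_TIMEOUT) {
    Ok(()) => { /* ok! handed off to a worker */ }
    Err(channel::SendTimeoutError::Timeout(_)) => {
        // Assumed handling: a full queue means workers are falling behind.
        anyhow::bail!("aborting: timed out handing a request to a worker")
    }
    Err(channel::SendTimeoutError::Disconnected(_)) => {
        anyhow::bail!("aborting: worker channel is disconnected")
    }
}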
let rx = rx.clone();

pool.execute(move || loop {
    let log = log.clone();
Why do we need another clone here?
let log = log.clone();
match rx.recv() {
    Ok(stream) => handle_stream(log, stream),
    Err(channel::RecvError) => break,
Can we log something useful on error?
Nope. It's just a sentinel telling us the channel is closed.
https://docs.rs/crossbeam-channel/0.5.1/crossbeam_channel/struct.RecvError.html
let logger = slog::Logger::root(drain, slog::o!());

let (pool, handle) = worker_pool(logger.clone(), N_WORKERS);
Can we call handle "channel" or "tx" or something instead? "handle" is so generic.
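(For context, a rough sketch of how the fragments above could fit together under the suggested tx naming; the exact types such as UnixStream, the threadpool crate, and QUEUE_DEPTH are assumptions rather than the PR's verbatim code.)

use std::os::unix::net::UnixStream;

use crossbeam_channel as channel;
use threadpool::ThreadPool;

// Assumed bound on how many accepted-but-unhandled requests may queue up.
const QUEUE_DEPTH: usize = 64;

// Stand-in for the PR's existing request handler.
fn handle_stream(_log: slog::Logger, _stream: UnixStream) {}

// Sketch: build a bounded channel and a pool of workers that drain it,
// returning the pool plus the sending side for the accept loop to push into.
fn worker_pool(
    log: slog::Logger,
    n_workers: usize,
) -> (ThreadPool, channel::Sender<UnixStream>) {
    let (tx, rx) = channel::bounded(QUEUE_DEPTH);
    let pool = ThreadPool::new(n_workers);
    for _ in 0..n_workers {
        let rx = rx.clone();
        let log = log.clone();
        pool.execute(move || loop {
            // clone again so each handled request gets its own logger
            let log = log.clone();
            match rx.recv() {
                Ok(stream) => handle_stream(log, stream),
                // RecvError only means the channel is closed; shut down.
                Err(channel::RecvError) => break,
            }
        });
    }
    (pool, tx)
}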
Closed in favor of #28
Adds a pool of worker threads to handle incoming requests. Uses a bounded channel to pass incoming requests to each worker. Closes #24.
Alternatives
This is an alternative to #25 which uses a semaphore to limit the number of requests in flight but still spawns a thread per request. This version uses existing dependencies instead of rolling our own semaphore.
There's another (easy) alternative to this approach that doesn't use a channel to pass the incoming UnixStream to worker threads:
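(Sketch for illustration; the threadpool crate's ThreadPool, the listener variable, and the existing handle_stream helper are assumed, and this is not the PR's exact snippet.)

// Illustration only: spawn the handler directly on the pool, with no
// intermediate channel. `listener` is assumed to be a UnixListener and
// `pool` a threadpool::ThreadPool::new(N_WORKERS).
for stream in listener.incoming() {
    let stream = stream?;
    let log = log.clone();
    // execute() queues the job on ThreadPool's internal unbounded channel,
    // so this call returns immediately no matter how far behind workers are.
    pool.execute(move || handle_stream(log, stream));
}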
Calling pool.execute like this doesn't apply any backpressure. ThreadPool is built on std::sync::mpsc unbounded channels, so pool.execute never blocks. Using this means we have no signal we can use to do something (log? crash the process?) if things get slow.

On the plus side, this approach is less code and handles panics gracefully - ThreadPool makes sure there are always N_WORKERS threads in the pool. With the approach in the PR, on panic we'll kill a worker thread but nothing starts a new worker task.
I don't have a strong preference between the threadpool approaches, but they both seem better than #25. The pool.execute version requires the least code but doesn't let us detect a hang and crash.