
Use a threadpool to handle incoming requests #27

Closed · wants to merge 1 commit

Conversation

@blinsay (Collaborator) commented Jun 29, 2021

Adds a pool of worker threads to handle incoming requests. Uses a bounded channel to pass incoming requests to each worker. Closes #24.

Alternatives

This is an alternative to #25 which uses a semaphore to limit the number of requests in flight but still spawns a thread per request. This version uses existing dependencies instead of rolling our own semaphore.

There's another (easy) alternative to this approach that doesn't use a channel to pass the incoming UnixStream to worker threads:

    let pool = ThreadPool::new(N_WORKERS);

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                let logger = logger.clone();
                pool.execute(move || handle_stream(logger, stream));
            }
            Err(err) => {
                error!(logger, "error accepting connection"; "err" => %err);
                break;
            }
        }
    }

Calling pool.execute like this doesn't apply any backpressure. ThreadPool is built on std::sync::mpsc unbounded channels, so pool.execute never blocks. That means we have no signal we can use to do something (log? crash the process?) if things get slow.
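To illustrate why the unbounded queue hides the problem, here's a minimal standalone demo (not from the branch) of std's unbounded mpsc channel, which is what ThreadPool queues work on:

```rust
use std::sync::mpsc;

// std::sync::mpsc::channel() is unbounded: send() queues the value and returns
// immediately, even when nothing drains the receiver. The sender never learns
// that consumers have fallen behind, which is the missing backpressure signal.
fn queue_without_blocking(n: usize) -> usize {
    let (tx, _rx) = mpsc::channel();
    (0..n).filter(|i| tx.send(*i).is_ok()).count()
}

fn main() {
    // every send succeeds instantly; no blocking, no signal
    let queued = queue_without_blocking(1_000_000);
    println!("queued {} items without blocking", queued);
}
```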

On the plus side, this approach is less code and handles panics gracefully: ThreadPool makes sure there are always N_WORKERS threads in the pool. With the approach in this PR, a panic kills a worker thread and nothing starts a replacement.

I don't have a strong preference between the threadpool approaches, but they both seem better than #25. The pool.execute version requires the least code but doesn't let us detect a hang and crash.
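For reference, a minimal sketch of the bounded-channel pool this PR proposes, written against only the standard library so it stands alone. The branch itself uses crossbeam-channel, whose Receiver is Clone (multi-consumer); std's receiver is single-consumer, so this sketch shares it behind an Arc<Mutex<..>> instead. The names N_WORKERS and QUEUE_DEPTH are illustrative, not the branch's constants:

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

const N_WORKERS: usize = 4;
const QUEUE_DEPTH: usize = 8;

// Spawn N_WORKERS threads that all pull work off one bounded channel.
// Senders block once QUEUE_DEPTH items are queued: that's the backpressure.
fn worker_pool<T, F>(handle: F) -> (Vec<thread::JoinHandle<()>>, mpsc::SyncSender<T>)
where
    T: Send + 'static,
    F: Fn(T) + Send + Clone + 'static,
{
    let (tx, rx) = mpsc::sync_channel(QUEUE_DEPTH);
    let rx = Arc::new(Mutex::new(rx));
    let mut workers = Vec::with_capacity(N_WORKERS);
    for _ in 0..N_WORKERS {
        let rx = Arc::clone(&rx);
        let handle = handle.clone();
        workers.push(thread::spawn(move || loop {
            // the lock is held while pulling the next item off the queue
            let item = rx.lock().unwrap().recv();
            match item {
                Ok(item) => handle(item),
                // RecvError is just a sentinel: the channel is closed
                Err(mpsc::RecvError) => break,
            }
        }));
    }
    (workers, tx)
}

fn main() {
    let (workers, tx) = worker_pool(|n: u64| println!("handled {}", n));
    for n in 0..10 {
        tx.send(n).expect("worker channel is disconnected");
    }
    drop(tx); // closing the channel makes every worker's recv() return RecvError
    for w in workers {
        w.join().unwrap();
    }
}
```

Note the shutdown path: dropping the last sender closes the channel, and each worker exits its loop on RecvError, so the pool drains and stops cleanly.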

@blinsay blinsay requested review from leifwalsh and geofft June 29, 2021 04:43
    @@ -18,9 +18,11 @@
    atoi = "^0.3"
    slog = "^2.5"
    slog-async = "^2.5"
    slog-term = "^2.6"
    crossbeam-channel = "^0.4"
@blinsay (Collaborator, Author) commented Jun 29, 2021:
This is the version of crossbeam-channel we pull in through slog-async.

@leifwalsh (Collaborator) left a comment:

I definitely prefer something with backpressure to the pool.execute(move || handle_stream(...)) one, where some collection of unaccepted tasks grows unbounded out of our view.

I do agree, with the version in this PR, we need some communication back up from threads which errored or panicked to the main thread, possibly crashing the process.

    Err(channel::SendTimeoutError::Disconnected(_)) => {
        anyhow::bail!("aborting: worker channel is disconnected")
    }
    _ => { /*ok!*/ }
A collaborator commented:

Can we make the last clause just Ok? I'm worried about send_timeout starting to return new error types and us silently ignoring them. I'd rather have an exhaustive match block.
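As a sketch of what the exhaustive match buys, here's a standalone example using std's try_send and TrySendError, which has the same two-variant shape (a full/timeout case and a disconnected case) as crossbeam-channel's SendTimeoutError. The helper name offer is made up for illustration:

```rust
use std::sync::mpsc;

// Exhaustive match over a bounded-send error: with no wildcard `_` arm,
// a new variant added to the enum becomes a compile error at this match
// instead of being silently ignored.
fn offer(tx: &mpsc::SyncSender<u64>, n: u64) -> Result<(), String> {
    match tx.try_send(n) {
        Ok(()) => Ok(()),
        // the backpressure signal: workers are saturated
        Err(mpsc::TrySendError::Full(_)) => Err(format!("queue full, dropping {}", n)),
        // the fatal case: every worker is gone
        Err(mpsc::TrySendError::Disconnected(_)) => {
            Err("aborting: worker channel is disconnected".to_string())
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(1);
    assert!(offer(&tx, 1).is_ok());  // fits in the one-slot buffer
    assert!(offer(&tx, 2).is_err()); // buffer full: backpressure
    drop(rx);
    assert!(offer(&tx, 3).is_err()); // receiver gone: disconnected
}
```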

    let rx = rx.clone();

    pool.execute(move || loop {
        let log = log.clone();
A collaborator commented:
Why do we need another clone here?

    let log = log.clone();
    match rx.recv() {
        Ok(stream) => handle_stream(log, stream),
        Err(channel::RecvError) => break,
A collaborator commented:
Can we log something useful on error?

@blinsay (Collaborator, Author) replied:
Nope. It's just a sentinel telling us the channel is closed.

https://docs.rs/crossbeam-channel/0.5.1/crossbeam_channel/struct.RecvError.html


    let logger = slog::Logger::root(drain, slog::o!());

    let (pool, handle) = worker_pool(logger.clone(), N_WORKERS);
A collaborator commented:
Can we call handle channel or tx or something instead? "handle" is so generic

@blinsay (Collaborator, Author) commented Jul 2, 2021:

Closed in favor of #28

@blinsay blinsay closed this Jul 2, 2021
@blinsay blinsay deleted the benl/threadpool-crossbeam branch July 2, 2021 02:27
@leifwalsh leifwalsh mentioned this pull request Oct 15, 2022
Successfully merging this pull request may close: "fork rejected by pids controller"