Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deadlock Solution #19

Merged
merged 12 commits into from
Nov 13, 2024
2 changes: 1 addition & 1 deletion .github/workflows/unit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
runs-on: ubuntu-latest

steps:
Expand Down
38 changes: 30 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,31 +113,32 @@ Then you can run the unit tests with:
python3 -m unittest
```

`CICD` is for the GitHub Actions which run unit tests and integration tests.
The tests themselves are defined in `lambda_multiprocessing/test_main.py`.

`CICD` is for the GitHub Actions which run the unit tests and integration tests.
You probably don't need to touch those.


## Design

When you `__enter__` the pool, it creates several `Child`s.
These contain the actual child `Process`es,
plus a duplex pipe to send tasks to the child and get results back.
plus a duplex pipe to send `Task`s to the child and get `Response`s back.

The child process just waits for payloads to appear in the pipe.
It grabs the function and arguments from it, does the work,
catches any exception, then sends the exception or result back through the pipe.
Note that the function that the client gives to this library might return an Exception for some reason,
so we return either `[result, None]` or `[None, Exception]`, to differentiate.
Note that the arguments and return functions to this function could be anything.
(It's even possible that the function _returns_ an Exception, instead of throwing one.)

To close everything up when we're done, the parent sends a payload with a different structure (`payload[-1] == True`)
and then the child will gracefully exit.
To close everything up when we're done, the parent sends a different subclass of `Request`, which is `QuitSignal`. Upon receiving this, the child will gracefully exit.

We keep a counter of how many tasks we've given to the child, minus how many results we've got back.
When assigning work, we give it to a child chosen randomly from the set of children whose counter value is smallest.
(i.e. shortest backlog)

When passing the question and answer to the child and back, we pass around a UUID.
This is because the client may submit two tasks with apply_async, then request the result for the second one,
before the first.
This is because the client may submit two tasks with apply_async, then request the result for the second one, before the first.
We can't assume that the next result coming back from the child is the one we want,
since each child can have a backlog of a few tasks.

Expand All @@ -147,3 +148,24 @@ and passing pipes through pipes is unusually slow on low-memory Lambda functions

Note that `multiprocessing.Queue` doesn't work in Lambda functions.
So we can't use that to distribute work amongst the children.

### Deadlock

Note that we must be careful about a particular deadlocking scenario,
described in [this issue](https://github.com/mdavis-xyz/lambda_multiprocessing/issues/17#issuecomment-2468560515)

Writes to `Pipe`s are usually non-blocking. However if you're writing something large
(>90kB, IIRC) the Pipe's buffer will fill up. The writer will block,
waiting for the reader at the other end to start reading.

The situation which previously occured was:

* parent sends a task to the child
* child reads the task from the pipe, and starts working on it
* parent immediately sends the next task, which blocks because the object is larger than the buffer
* child tries sending the result from the first task, which blocks because the result is larger than the buffer

In this situation both processes are mid-way through writing something, and won't continue until the other starts reading from the other pipe.
Solutions like having the parent/child check if there's data to receive before sending data won't work because those two steps are not atomic. (I did not want to introduce locks, for performance reasons.)

The solution is that the child will read all available `Task` payloads sent from the parent, into a local buffer, without commencing work on them. Then once it receives a `RunBatchSignal`, it stops reading anything else from the parent, and starts work on the tasks in the buffer. By batching tasks in this way, we can prevent the deadlock, and also ensure that the `*async` functions are non-blocking for large payloads.
Loading
Loading