
Pipe.send occasionally hangs when submitting a large job to child process #17

Open
lheinold opened this issue Nov 7, 2024 · 10 comments


@lheinold

lheinold commented Nov 7, 2024

This is an intermittent issue that seems to happen in the following circumstances:

  1. Job 1 is submitted to child A
  2. Job 1 starts running on child A
  3. Job 2 is submitted to child A
  4. Job 1 returns and calls Pipe.send to give results to parent
  5. Parent and child both hang

I believe this is because the write in step (3) fills up the pipe's buffer and therefore blocks until the data is read. Meanwhile, step (4) tries to write to the now-full pipe and also blocks, waiting to be read. Neither side ever reaches a read, so both hang.

The child process should check if there is data in the pipe (and read it if so) before sending the job results back to the parent.
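For illustration, a minimal sketch of that idea (conn stands for the child's end of the Pipe, and pending_jobs is just a hypothetical local list, not anything that exists in this library):

# Rough sketch only: drain anything the parent has already written before
# blocking on our own send.
def send_result(conn, result, pending_jobs):
    while conn.poll(0):                  # parent has queued data we haven't read yet
        pending_jobs.append(conn.recv())
    conn.send(result)                    # can still block if the parent writes again in between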

@mdavis-xyz
Owner

Interesting.

I've written a reproducible example.

import multiprocessing
import lambda_multiprocessing
from time import sleep

def work(x):
    print(f"work called with x = {x[:3]}...")
    sleep(1)
    print(f"work returning x = {x[:3]}...")
    return x

sz = 2**30  # 1 GiB per payload, far larger than the pipe's buffer
data = [b'x' * sz, b'y' * sz]


print("Using standard library")
with multiprocessing.Pool(processes=1) as p:
    results = p.map(work, data)

print("Using lambda library")
with lambda_multiprocessing.Pool(processes=1) as p:
    results = p.map(work, data)

This works for the standard library, and hangs for this library.

I think my understanding of pipes is limited. Are you saying that if there's lots of data in the pipe sent by the parent which the child hasn't read, then when the child sends data back to the parent, it gets blocked? Is .poll() the right function to check for this condition?

I suppose we could add some kind of buffer in the child, to read payloads into when this happens. Could we get a race condition? e.g. suppose the child wants to send a response back to the parent. It notices the pipe is full, so reads from the pipe to prematurely get the argument for the next task. Then between that happening and the child sending the response back down the pipe for the previous task, the parent writes a third argument into the pipe, so now the child is still blocked.

What if instead of using one pipe bidirectionally, we use two pipes, unidirectionally? So if the pipe from parent to child fills up, the pipe from child to parent is still free? (Although maybe then they can both hit a bottleneck where they are writing to a full pipe?)
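Something like this rough sketch, using the standard multiprocessing.Pipe with duplex=False (just to illustrate the idea, not code from this library):

from multiprocessing import Pipe

# Pipe(duplex=False) returns (receive_end, send_end)
task_recv, task_send = Pipe(duplex=False)      # parent -> child: parent keeps task_send
result_recv, result_send = Pipe(duplex=False)  # child -> parent: child keeps result_send

# parent: task_send.send(args) ... later ... result_recv.recv()
# child:  args = task_recv.recv(); result_send.send(work(args))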

@mdavis-xyz
Owner

I've had a think about this overnight.

I could be wrong, but I think that a bidirectional pipe actually has a separate buffer in each direction, and they can fill up separately. In this issue the pipe fills up in both directions. If I change this MWE to send big data and receive a small response, it doesn't hang. If I send small data and receive a big response, it also doesn't hang.

So the issue is that the child process is halfway through sending the response, waiting for the parent to read from the buffer in the return direction, and the parent is halfway through sending the next argument, waiting for the child to read from the buffer in the initial direction. That's the deadlock, and I'm not sure how to resolve it. Perhaps we could use locks for each pipe (one lock covering both directions) to make each send/recv atomic, with the child keeping a cache of unprocessed work from the parent and unsent results. Although locking seems like overkill, and if the work is slow, the parent might hog the lock and just keep sending data, so the child never gets a chance to send anything back and accumulates too much in memory.

Do you have any other ideas?

Here's another MWE, this time not using the lambda_multiprocessing library at all. It's just two processes which each try to send a large amount of data to the other, then receive a large amount. They deadlock.

from multiprocessing import Pipe, Process
import time
import select

num_payloads = 3
big_data = b'x' * (2**20)  # 1 MiB per payload, much larger than the pipe's buffer

def work(conn, label):
    print(f"Starting in proc {label}")
    num_sent = 0
    num_recvd = 0
    while (num_sent < num_payloads) and (num_recvd < num_payloads):
        readable, writable, _ = select.select(
            [conn],
            [conn],
            [], 
            0.1
        )
        if conn in writable:
            print(f"Sending Data from {label}")
            conn.send(big_data)
            print(f"Sent Data from {label}")
            num_sent += 1
        elif conn in readable:
            print(f"Receiving Data from {label}")
            conn.recv()
            print(f"Received Data from {label}")
            num_recvd += 1
    print(f"Process exiting from {label}")


def test_bidirectional_blocking():
    parent_conn, child_conn = Pipe(duplex=True)
    
    p1 = Process(target=work, args=(parent_conn, 'A'))
    p2 = Process(target=work, args=(child_conn, 'B'))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    test_bidirectional_blocking()

@lheinold
Author

lheinold commented Nov 8, 2024

So I did some more research into pipes and they do have separate read/write buffers. However, if the write buffer fills up, write blocks until the other side of the socket reads from it. So you can end up with a situation like this:

  1. Parent writes to child and waits for child to read (payload > buffer size)
  2. Child finishes _do_work and writes to parent (payload > buffer size)

Both the parent and the child are now blocked on their write call, since a single write filled up each of their send buffers and they can't do anything until the other side reads.

I tried adding a poll call before writing, but I ended up in the following situation:

  1. Child finishes _do_work and calls poll() to check if parent has sent anything
  2. poll() returns false
  3. Parent writes to child (payload > buffer size)
  4. Since poll() returned false, child writes to parent (payload > buffer size)

Since the parent and child are different processes, I'm not sure there's a way to ensure that the parent doesn't write to the pipe between the child calling poll() and calling write().

I took a look at the multiprocessing Pipe code and it looks like it uses socket.socketpair() and then explicitly sets both sockets to blocking. I changed that to non-blocking as a test, but it turns out that if a non-blocking socket writes something greater than the buffer size, it just returns an error. And since the socket on the other side is now non-blocking too, its recv() isn't waiting for data either. It's possible you could put recv() in a while loop until it receives data, but I didn't try this.

socket.settimeout() doesn't appear to be working for me - send(max_buffer) seems to return immediately with an error instead of waiting the specified amount of time. setsockopt does seem to work directly, so I tried this:

child_socket = socket.socket(fileno=self.child_conn.fileno())
# SO_SNDTIMEO takes a struct timeval: 8 zero bytes for tv_sec, then tv_usec=500000 little-endian
child_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, (8*b'\x00')+(500000).to_bytes(8, 'little'))

It's a bit annoying to read, but the "500000" is microseconds, which translates to a 0.5 s timeout.

I set SO_SNDTIMEO for both parent and child (I left the receive timeout alone since it doesn't really matter anyway). With that, send waits the appropriate amount of time before returning an error - though it raises a BlockingIOError rather than a timeout exception.
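A slightly more readable way to build that timeval, assuming 64-bit little-endian Linux where it is two native longs (this reuses child_socket from the snippet above):

import struct

# tv_sec=0, tv_usec=500000 -> 0.5 s send timeout (same bytes as above)
timeout = struct.pack('ll', 0, 500000)
child_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, timeout)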

I used the following logic for spin and it seemed to work (most of the time):

  1. Call recv
  2. Call _do_work
  3. Call send in a try/catch
  4. If exception: call recv and buffer the result
  5. Save the result of the _do_work call from (2) in a separate buffer

edit: hit enter too soon...

There is still an issue where if a write times out, partial writes have already been made, meaning that the other side reads the partial data and hangs waiting for the rest of it:

21715 14:47:30.032854 write(25, "\0\3e\267", 4) = 4               // [child] write length (222647)
 | 00000  00 03 65 b7                                       ..e.             |
21715 14:47:30.032896 write(25, "\200\4\225\367\16\1\0\0\0\0\0}\224\214\4uuid\224\214\4UUID\224\223\224)\201\224"..., 222647 <unfinished ...>
21715 14:47:30.534388 <... write resumed> ) = 219264         // [child] wrote 219264 / 222647
21715 14:47:30.539621 write(25, "\7Unrated\224\214\7Unrated\224\214\7Unrated\224\214\7U"..., 3383 <unfinished ...>
21626 14:47:30.031525 write(24, "\0\5)\t", 4) = 4                    // [parent] write length (338185)
 | 00000  00 05 29 09                                       ..).             |
21626 14:47:30.031572 write(24, "\200\4\225\270\34\1\0\0\0\0\0]\224((\214\4uuid\224\214\4UUID\224\223\224)"..., 338185 <unfinished ...>
21626 14:47:30.536515 <... write resumed> ) = 219264         // [parent] write 219264 / 338185
21626 14:47:30.539651 write(24, "ated\224\214\7Unrated\224\214\7Unrated\224\214\7Unrat"..., 118921 <unfinished ...>
21626 14:47:31.046494 <... write resumed> ) = -1 EAGAIN (Resource temporarily unavailable) // [parent] write failed
21715 14:47:31.046392 <... write resumed> ) = -1 EAGAIN (Resource temporarily unavailable) // [child] write failed
21715 14:47:31.047628 read(25, "\0\5)\t", 4) = 4                    // [child] since write failed, read length (338185)
 | 00000  00 05 29 09                                       ..).             |
21715 14:47:31.047703 read(25, "\200\4\225\270\34\1\0\0\0\0\0]\224((\214\4uuid\224\214\4UUID\224\223\224)"..., 338185) = 219264 // [child] got 219264 / 338185
21715 14:47:31.049758 read(25,  <unfinished ...> // [child] wait for the rest
21626 14:47:31.049999 poll([{fd=24, events=POLLIN}], 1, 0) = 1 ([{fd=24, revents=POLLIN}]) // [parent] poll child
21626 14:47:31.050200 read(24, "\0\3e\267", 4) = 4             // [parent] since write failed, read length (222647)
 | 00000  00 03 65 b7                                       ..e.             |
21626 14:47:31.050240 read(24, "\200\4\225\367\16\1\0\0\0\0\0}\224\214\4uuid\224\214\4UUID\224\223\224)\201\224"..., 222647) = 219264 // [parent] read 219264 / 222647
21626 14:47:31.052306 read(24, // [parent] wait for the rest

Not sure how to fix this one, still thinking about it...

edit 2: could probably be fixed by adding a timeout to recv too. duh

@mdavis-xyz
Owner

Thanks.

I spent some more time on this too. (It's an interesting problem. Although I really should be working on my university assignments instead of this.)

I had a similar idea to yours. Suppose you do (as you suggested):

  1. Call recv
  2. Call _do_work
  3. Call send in a try/catch
  4. If exception: call recv and buffer the result
  5. Save the result of the _do_work call from (2) in a separate buffer

The problem is that step 4 is not atomic. If the child notices that what it's sending is not being read and so reads the next payload written by the parent, then as soon as that payload is read and the buffer is empty, the parent is likely to immediately send another payload for the next task. So when the child writes the next result, we end up with the same deadlock.

With your timeout solution, I'm also worried about performance. Suppose there are 1000 tasks which should take 10 ms each. With the current implementation the parent will try to immediately write all 1000 payloads into the pipes before reading anything from the children, so this deadlock situation is actually quite likely (for large data or a large number of payloads), not an edge case. You'd end up paying the 0.5 second timeout on most task executions, which dwarfs the 10 ms of actual work (do_work).

I think the key challenge is that the parent is trying to give the next task to a child before it has finished working on the previous task.

I played around with modifying how the parent sends data. I wrote code to chunk up the pickled object into pieces no larger than the buffer size (which is difficult to measure/set), and had the parent poll for data coming from the child in between each chunk it writes (and call .recv() if there's something there). This still resulted in some deadlocks, but I'm not sure why.

Another idea I had was to write the actual payloads to a temporary file, and then pass a file handle (or file path). This reduces the size of the data sent through the pipe. However, for a sufficiently large number of (small) payloads, you could still fill up the buffer.
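As a rough sketch of that temporary-file idea (hypothetical helper names, not this library's API):

import pickle
import tempfile

# Hypothetical helpers: pickle the payload into a temp file and send only the
# (small) file path through the pipe.
def send_via_file(conn, payload):
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pkl') as f:
        pickle.dump(payload, f)
    conn.send(f.name)

def recv_via_file(conn):
    path = conn.recv()
    with open(path, 'rb') as f:
        return pickle.load(f)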

I came up with a solution for map and starmap (but not the async versions) where the scheduler that chooses which child to assign tasks to doesn't give a child its next task until after the response to its previous task has been received. I've written some code for this, and it does seem to resolve the issue. (I just need to tidy up the code before I push it.)
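Roughly the idea is something like this (a sketch with hypothetical child objects and wire format, not the actual code I'll push):

# Sketch: never give a child a second task until its previous result has been
# read back, so each pipe never holds more than one payload per direction.
def map_sketch(children, func, args_list):
    results = {}
    in_flight = {}                              # child -> index of its outstanding task
    for i, args in enumerate(args_list):
        child = children[i % len(children)]
        if child in in_flight:                  # drain the previous result first
            results[in_flight[child]] = child.conn.recv()
        child.conn.send((func, args))           # hypothetical wire format
        in_flight[child] = i
    for child, i in in_flight.items():
        results[i] = child.conn.recv()
    return [results[i] for i in range(len(args_list))]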

For the async ones, since the map_async call is supposed to return immediately, all the payloads must be sent from the parent at the start, which may be before any children have finished their first task. One idea I had was to send all the payloads to an additional (n+1)th process, and then that just tries to push the payloads into each worker process (blocking most of the time), and then the worker processes push the results back to the parent (with a local queue of unsent results so they can process the 2nd task before the parent has read the results of the first task). That results in each byte of input data being written through two pipes, not one, which is probably not ideal in terms of performance.
(Personally I don't ever use the async calls. I could just remove support for them? Although that would be a breaking change. Even with a small userbase I'd hate to do that.)

@lheinold
Author

I think I got this working with both recv and send having a timeout. Here's my code for spin (could totally be cleaned up but it seems to work so far):

job_list = []
result_buf = []
quit_signal = False  # initialise, in case the very first recv times out
# (PickleError here is pickle.PickleError)
while True:
    if len(job_list) == 0:
        try:
            (job, quit_signal) = self.child_conn.recv()
        except (BlockingIOError, PickleError, UnicodeDecodeError):
            pass
        else:
            job_list.append((job, quit_signal))
    if quit_signal:
        break
    else:
        result = None
        if len(result_buf) == 0 and len(job_list) > 0:
            (job, quit_signal) = job_list.pop(0)
            (id, func, args, kwds) = job
            result = self._do_work(id, func, args, kwds)
        elif len(result_buf) > 0:
            result = result_buf.pop(0)
        try:
            if result is not None:
                self.child_conn.send(result)
        except BlockingIOError as e:
            if self.child_conn.poll():
                try:
                    (job, quit_signal) = self.child_conn.recv()
                    if quit_signal:
                        break
                except (BlockingIOError, PickleError, UnicodeDecodeError):
                    pass
                else:
                    job_list.append((job, quit_signal))
            result_buf.append(result)
self.child_conn.close()

In submit:

job_sent = False
while not job_sent:
    try:
        self.parent_conn.send([(id, func, args, kwds), None])
        job_sent = True
    except BlockingIOError as e:
        self.flush()

And flush:

while (not self.parent_conn.closed) and (self.queue_sz > 0) and self.parent_conn.poll(0):
    try:
        result = self.parent_conn.recv()
    except (BlockingIOError, PickleError, UnicodeDecodeError):
        pass
    else:
        assert isinstance(list(result.keys())[0], UUID)
        self.result_cache.update(result)
        self.queue_sz -= 1

There are three outcomes from send/recv we have to worry about:

  1. Successful
  2. BlockingIOError (recv: nothing to recv, send: other side is busy)
  3. PickleError/UnicodeDecodeError: A send on the other side timed out, but it still wrote some data to the pipe. It's fine to discard this as the other side will send the relevant payload again

In the case of large jobs, there are two outcomes to a send call:

  1. Success
  2. BlockingIOError (other side is not currently calling recv and therefore cannot read the data)

I think that the submit call should always attempt to read results from the child if it fails to send a job - that way we are not sending all jobs before reading one result. Similarly, the child should always attempt to receive a new job if sending a result fails. If either recv call fails, that's fine, and it should go back to trying to send whatever it was trying to send.

I haven't run into any deadlocks with this code so far but I'm going to continue testing locally to see if I run into any. Thoughts?

@mdavis-xyz
Owner

Thanks for your help on this. I suspect there might still be a deadlock issue with this solution:

  1. child notices it can't send
  2. child pulls latest object from parent
  3. parent finishes sending object and immediately starts sending the next object
  4. child sends previous result, thinking the parent is not currently sending, but it is

I don't have much time at the moment to work on this.
I want to spend more time checking whether there's still a deadlock condition, and also checking the performance implications.
How urgent is this for you?

I noticed that the behavior of map_async and starmap_async in this library does not match the standard library. They should return a single AsyncResult, whose .get() returns a list. (Currently this library returns a list of AsyncResults.) Since no one else seems to have noticed, I assume no one else is using those functions.

I've pushed a new branch (see #19) with these new tests, and my proposed solution for map (which passes the tests) so that you can see. I haven't implemented either solution for map_async yet.

@lheinold
Author

I actually don't think that should cause a deadlock.

Step 4 has two possible outcomes:

  1. Sending the results to the parent does not fill up the pipe buffer. In this case, the results stay in the buffer until the parent reads them
  2. Sending the results to the parent fills up the pipe buffer. In this case, if the parent is currently sending, the child will get a BlockingIOError and will try again later

For me it's urgent-ish? I'm also happy to make the changes myself and open a PR for them, if you don't have the time.

I noticed that too (I was going to make a ticket for it, but I forgot...). I'm actually using the async functions but currently I just changed my code to work with a list of results.

@mdavis-xyz
Owner

I've tried to implement your solution in #20. I get an error.

OSError: [Errno 9] Bad file descriptor

I think I'm not setting the timeout where you wanted to set it.

I also tried to implement my proposed solution with an additional process in the middle that just acts as a buffer, reading all payloads from the parent before sending any to the child. That's more complicated than I thought, but still possible. (I've run out of time tonight though.)

Since you're in a hurry, I could just merge #19 tomorrow, which fixes this deadlock bug for the non-async calls (but the bug is still present for async). If you need this bug resolved for async urgently, you could perhaps write a workaround that writes each data payload to a temporary file (e.g. explicitly write to /tmp, or use the tempfile module), and then just send a small file handle/filename through the pipe. Then your child process reads from the file. Unless you're sending tens of thousands of large payloads per process (in a lambda?) that shouldn't fill up the buffer, hopefully?

Note that with your timeout solution, I think that map_async will not return immediately. (This is another bug with the current implementation, not introduced by your proposal.) It will actually be a blocking call, trying (and retrying) to send all payloads, and only returning once all payloads are sent. So if you have C child processes and N tasks taking T time each, map_async will return after roughly ceil(N / C)*T time (e.g. 1000 tasks of 10 ms each on 2 children means it returns after about 5 seconds). So it's not really async. (If each child reads in a few payloads at once into a buffer, maybe it will be. But I'm not sure that's guaranteed.)

@mdavis-xyz
Owner

I had an idea for a solution that doesn't involve timeouts, middleman processes, temporary files, etc. It also means that the *async functions are non-blocking even for large payloads.

The idea is that we send tasks to children in batches. Inside .spin() the child fetches several tasks in a row, saving them to a local buffer. Only after receiving a certain special payload (RunBatchSignal) does it start working on the tasks.
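In rough pseudocode, the child side of spin() becomes something like this (a sketch of the idea, not the exact code now in the repo; QuitSignal is a hypothetical quit sentinel):

# Sketch of the batching idea: read every task in the batch (keeping the
# parent's pipe drained) before sending any result back.
task_buffer = []
while True:
    payload = self.child_conn.recv()
    if isinstance(payload, QuitSignal):
        break
    if isinstance(payload, RunBatchSignal):
        for (id, func, args, kwds) in task_buffer:
            self.child_conn.send(self._do_work(id, func, args, kwds))
        task_buffer.clear()
    else:
        task_buffer.append(payload)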

It's relatively simple. One concern I had about the timeout solution was that the code for submit() and spin() became relatively complex. e.g. when I tried that solution, I added exponential backoff for the retries. I also spotted a bug where a quit signal might make the child finish the current task and then quit, prior to finishing the other tasks in the buffer. With the timeout and retry solution, I was worried that there would be edge cases that we haven't thought of. Thanks for your suggestions and work though! I really appreciate it.

So I've just implemented that simple batching solution. It passes the tests, and I've got no more time left to spend on this right now. I've released it now as version 1.0.

@lheinold
Author

That error is interesting. I was setting my timeouts before self.proc.start() and also called detach() on both sockets (not sure if this last part is necessary though).

I think that map_async not returning immediately is a fundamental problem with how Pipes work in Python - I've looked at the source code and both sockets used are explicitly set to the blocking type. I did initially try to set them to non-blocking using setsockopt, but unfortunately that seems to entirely prevent payloads larger than the buffer from being sent. Despite it not returning immediately, I did manage to do other work in the parent between calling map_async and calling get on the results. So not perfect, but the parent does still have a chance to do other work while the children complete the do_work calls.

I'll definitely pull your new version and see how it goes for my use case. Fingers crossed that it will "just work" - I'll let you know if I run into any more issues.
