read() and read_multiple() block when consuming from python-managed pipes or sockets #354

fungs opened this issue Feb 14, 2024 · 2 comments

fungs commented Feb 14, 2024

Hi, I know that there is ongoing work to remove or reduce the dependency on raw file descriptors in #283 and #311. However, the issue I have is with the existing implementation (reference version 2.0.0b2) and how it works with pipe (os.pipe) and socket (socket.socket) objects in Python, which do expose usable raw file descriptors via fileno().

We can construct a pipe using

pipe_read, pipe_write = os.pipe()

Data can be generated or read in one thread (or process) like

chunk_size = 1024**2  # 1 MiB
with os.fdopen(pipe_write, mode = "wb") as write_file:
  while chunk := file_like_source.read(chunk_size):
    write_file.write(chunk)

and be consumed in another thread like

with os.fdopen(pipe_read, mode = "rb") as read_file:
  for item in MyStruct.read_multiple(read_file):
    do_something(item)

What I observe is that both MyStruct.read_multiple(read_file) and write_file.write(chunk) block if chunk_size is smaller than the serialized size of a struct item. I hypothesize that this has to do with how the reader peeks into the data, which is in fact a stream, without actually consuming it, but I don't know.

Strangely, if a process outside Python generates the stream via process = subprocess.Popen() and writes it to standard output with stdout=subprocess.PIPE, read_multiple() can consume it from process.stdout without issues.
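For reference, a rough sketch of that working subprocess variant (the producer command is hypothetical; MyStruct and do_something are the placeholders from above):

import subprocess

# hypothetical external producer writing serialized messages to its stdout
process = subprocess.Popen(["producer-command"], stdout=subprocess.PIPE)

for item in MyStruct.read_multiple(process.stdout):
    do_something(item)

process.wait()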

Maybe someone has an idea why this happens and how it could be worked around or fixed? Happy to hear your thoughts.

LasseBlaauwbroek commented Feb 14, 2024

My first guess would be that your second snippet is somehow blocking in the C++ code, while not relinquishing the GIL. That would prevent the other thread from continuing, creating a deadlock. However, the following snippet shows that the GIL is being released since #308:

pycapnp/capnp/lib/capnp.pyx, lines 3872 to 3873 in 1fb1687:

with nogil:
    self.thisptr = new schema_cpp.InputStreamMessageReader(stream, opts)

Are you sure you are using a pycapnp version that is new enough?
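A minimal way to check the installed version, using only the standard library (not a pycapnp-specific API):

from importlib.metadata import version

# print the installed pycapnp version to verify it contains the #308 change
print(version("pycapnp"))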

fungs commented Feb 16, 2024

I finally figured out how to make this approach work using processes, not threads!

Most importantly, for the pipe version, buffering on the write side has to be disabled using os.fdopen(pipe_write, mode="wb", buffering=0):

chunk_size = 1024**2  # 1 MiB
with os.fdopen(pipe_write, mode = "wb", buffering=0) as write_file:
  while chunk := file_like_source.read(chunk_size):
    write_file.write(chunk)

The socket version using socket.socket seems to work with buffering enabled on both the sender and the receiver side.
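A minimal sketch of the socket variant, assuming socket.socketpair() and reusing the placeholder names from above; each side runs in its own process:

import socket

sock_recv, sock_send = socket.socketpair()

# producer side: wrap the socket in a buffered binary file object
with sock_send.makefile(mode="wb") as write_file:
    while chunk := file_like_source.read(chunk_size):
        write_file.write(chunk)
# close the socket itself so the reader sees EOF (makefile() does not close it)
sock_send.close()

# consumer side: read_multiple() consumes messages until the peer closes
with sock_recv.makefile(mode="rb") as read_file:
    for item in MyStruct.read_multiple(read_file):
        do_something(item)
sock_recv.close()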

There are also pitfalls when using a process, in particular with os.pipe(): depending on the start method and platform (fork vs. spawn), you may have to transfer the file descriptors into the child process, or make sure to close the unused descriptor ends in both the parent and the child process; otherwise the reader will wait for EOF and never terminate (see the sketch below).
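To illustrate the fd handling, a rough sketch using multiprocessing.Process, assuming the fork start method on Linux and the placeholder names from above:

import multiprocessing
import os

pipe_read, pipe_write = os.pipe()

def produce():
    # child process: close the inherited read end and write unbuffered
    os.close(pipe_read)
    with os.fdopen(pipe_write, mode="wb", buffering=0) as write_file:
        while chunk := file_like_source.read(chunk_size):
            write_file.write(chunk)

producer = multiprocessing.Process(target=produce)
producer.start()

# parent process: close its copy of the write end, otherwise
# read_multiple() never sees EOF and does not terminate
os.close(pipe_write)
with os.fdopen(pipe_read, mode="rb") as read_file:
    for item in MyStruct.read_multiple(read_file):
        do_something(item)
producer.join()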

However, when I run the exact same code using threading.Thread() instead of multiprocessing.Process, the data channel still blocks. To me, this is a strong indication that there is an issue involving the GIL. In my case, the parent is actually a thread spawned from the main process, which could also be an issue?
