AsyncHttpConsumer regression and disconnect handling #1249

Open
jkarneges opened this issue Feb 17, 2019 · 6 comments

@jkarneges
Contributor

Hi folks,

I just discovered Channels >=2.1.6 introduced an incompatibility with the django-eventstream library. Client requests hang, due to what I believe is the consumer task ending too quickly.

Our library implements handle, which returns quickly and spawns a subtask to handle the ongoing stream. At the time the code was written, my understanding was that spawning a task was the proper way to handle streaming responses as it allowed the original task to keep running in order to detect disconnects.

However, as of commit a256139, disconnect is called immediately after handle returns. This breaks spawn-style streaming implementations.
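For readers unfamiliar with the spawn-style pattern, here is a minimal sketch of the idea in plain asyncio. It is not the django-eventstream code and does not use the Channels API: `SpawnStyleStreamer` and its attributes are hypothetical stand-ins, assumed only for illustration.

```python
import asyncio


class SpawnStyleStreamer:
    """Hypothetical stand-in for a consumer; not the Channels API."""

    def __init__(self):
        self.sent = []           # chunks "sent" to the client
        self.task = None         # background streaming task
        self.cleaned_up = False  # set once the stream has shut down

    async def handle(self, body):
        # Return quickly; the stream keeps running in a background task.
        self.task = asyncio.ensure_future(self._stream())

    async def _stream(self):
        try:
            n = 0
            while True:
                self.sent.append("data: %d\n\n" % n)
                n += 1
                await asyncio.sleep(0.01)
        finally:
            # Runs when the task is cancelled on disconnect.
            self.cleaned_up = True

    async def disconnect(self):
        # Stop the background stream and wait for its cleanup to finish.
        if self.task is not None:
            self.task.cancel()
            try:
                await self.task
            except asyncio.CancelledError:
                pass


async def demo():
    consumer = SpawnStyleStreamer()
    await consumer.handle(b"")
    await asyncio.sleep(0.05)    # client stays connected for a while
    await consumer.disconnect()  # client goes away
    return consumer


if __name__ == "__main__":
    finished = asyncio.run(demo())
    print(len(finished.sent), finished.cleaned_up)
```

In this model, the pattern only works if `disconnect` is called when the client actually leaves. If it is called as soon as `handle` returns, the background task is cancelled before it can stream anything.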

I'm attempting to adjust our code to work with the latest Channels behavior, essentially by not spawning a task (just run the streaming logic within handle without returning), but now I run into the problem that I can't cleanly handle disconnects. Instead, after the client disconnects, the task is eventually killed:

Application instance <Task pending coro=<SessionMiddlewareInstance.__call__() running at /home/justin/dev/django-eventstream/examples/chat2/venv/lib/python3.6/site-packages/channels/sessions.py:179> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fafd82f6ac8>()]>> for connection <WebRequest at 0x7fafd934e9b0 method=GET uri=/rooms/default/events/ clientproto=HTTP/1.1> took too long to shut down and was killed.

Unfortunately this doesn't give the task a chance to clean up properly.

This issue is also oddly familiar: #1051

@jkarneges
Contributor Author

The fix I'm looking at now is copying the older __call__ implementation into our library:

class EventsConsumer(AsyncHttpConsumer):
    ...

    # this function adapted from channels 2.1.5
    async def __call__(self, receive, send):
        """
        Async entrypoint - concatenates body fragments and hands off control
        to ``self.handle`` when the body has been completely received.
        """
        self.send = send
        body = []
        try:
            while True:
                message = await receive()
                if message["type"] == "http.disconnect":
                    return
                else:
                    if "body" in message:
                        body.append(message["body"])
                    if not message.get("more_body"):
                        await self.handle(b"".join(body))
        finally:
            await self.disconnect()

    ...

This seems to work, but I wonder if there's something I'm missing about how to handle disconnects with the newer Channels behavior.

@carltongibson
Member

Hi @jkarneges. Thanks for the report. Could you put together a minimal test case (in a PR?) that demonstrates the change in behaviour?

@jkarneges
Contributor Author

jkarneges commented Feb 18, 2019

I think it would be good to first understand the intended way to use AsyncHttpConsumer for streaming responses. For example, the docs suggest doing something like this:

import asyncio
from datetime import datetime

from channels.generic.http import AsyncHttpConsumer


class ServerSentEventsConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await self.send_headers(headers=[
            ("Cache-Control", "no-cache"),
            ("Content-Type", "text/event-stream"),
            ("Transfer-Encoding", "chunked"),
        ])
        while True:
            payload = "data: %s\n\n" % datetime.now().isoformat()
            await self.send_body(payload.encode("utf-8"), more_body=True)
            await asyncio.sleep(1)

In a real app you'd want this object to register with a data source somehow, so that it knows when there is data to send to the client. And you'd want to clean up that registration when the client disconnects. The docs suggest that the way to detect disconnects is to override the disconnect method. This may work for the WebSocket examples, but it won't work for ServerSentEventsConsumer because handle blocks and the task never has a chance to dispatch further events.
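To make the blocking problem concrete, here is a simplified model in plain asyncio. It assumes a dispatcher that awaits each handler inline before reading the next message; `dispatch_inline` and `blocking_handler` are hypothetical names, and this is not Channels' actual dispatch code.

```python
import asyncio


async def dispatch_inline(receive, handler):
    """Await each handler inline, so only one message is processed at a time."""
    seen = []
    while True:
        message = await receive()
        seen.append(message["type"])
        if message["type"] == "http.disconnect":
            return seen
        await handler()  # nothing else is read until this returns


async def demo():
    queue = asyncio.Queue()
    await queue.put({"type": "http.request"})
    await queue.put({"type": "http.disconnect"})

    async def blocking_handler():
        # Stands in for a `while True:` streaming loop that never returns.
        await asyncio.sleep(3600)

    task = asyncio.ensure_future(dispatch_inline(queue.get, blocking_handler))
    await asyncio.sleep(0.05)

    stuck = not task.done()  # the dispatcher is still inside the handler
    pending = queue.qsize()  # the disconnect message was never read

    # Tidy up the stuck dispatcher so the event loop can exit.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return stuck, pending


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under that assumption, the disconnect message stays in the queue for as long as the handler is running, so a disconnect hook has nothing to trigger it.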

This raises a question: should overriding disconnect in the ServerSentEventsConsumer example work? In other words, should disconnect get called even before handle returns? If so, I could write a test case in a PR for that, which could be used to confirm a future fix.

If, on the other hand, the proper way to detect disconnects is to not block during event handling, meaning an early return from handle is required, then I could write a test case for that instead, showing how early returns from that method no longer work. I'd probably also change the ServerSentEventsConsumer example code to return early from handle, to set up the user for success.

@carltongibson
Member

Hey @jkarneges. What I was initially interested in is exactly what changed. i.e. a test (for your use-case) that passes in 2.1.5 and fails in 2.1.6. It would help to be able to play with what you were doing.

I agree you've diagnosed the question here. Even if handle() does not return I'm expecting http_disconnect() to receive the http.disconnect event and call your disconnect() method:

    async def http_disconnect(self, message):
        """
        Let the user do their cleanup and close the consumer.
        """
        await self.disconnect()
        raise StopConsumer()
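For that expectation to hold while handle() is still running, something has to keep reading messages in the meantime. One possible shape, sketched in plain asyncio, is below. `dispatch_concurrently` is a hypothetical helper assumed for illustration; it is neither the Channels implementation nor necessarily how this issue gets resolved.

```python
import asyncio


async def dispatch_concurrently(receive, handle, disconnect):
    """Keep reading messages while the handler runs in its own task."""
    handler_task = None
    try:
        while True:
            message = await receive()
            if message["type"] == "http.disconnect":
                return
            if handler_task is None:
                # Start the (possibly never-ending) handler without awaiting it.
                handler_task = asyncio.ensure_future(handle())
    finally:
        # Stop the handler first, then let the user clean up.
        if handler_task is not None:
            handler_task.cancel()
            try:
                await handler_task
            except asyncio.CancelledError:
                pass
        await disconnect()


async def demo():
    events = []
    queue = asyncio.Queue()
    await queue.put({"type": "http.request"})

    async def handle():
        events.append("handle started")
        await asyncio.sleep(3600)  # stands in for an endless stream

    async def disconnect():
        events.append("disconnect called")

    runner = asyncio.ensure_future(
        dispatch_concurrently(queue.get, handle, disconnect)
    )
    await asyncio.sleep(0.05)
    await queue.put({"type": "http.disconnect"})  # client goes away
    await asyncio.wait_for(runner, timeout=1)
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the handler runs as a separate task, the loop can still receive the disconnect message, cancel the handler, and then call the cleanup hook.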

@jkarneges
Contributor Author

Okay here we go: #1250

@MarioRial22

This is addressed here! #1255

@carltongibson self-assigned this Aug 14, 2019