WebSocket server does not initiate a TCP Close after receiving WebSocket Close message (RFC 6455 violation) #2320
Do you by chance have a client to replicate the problem? I've never touched WebSockets, so this is a little new to me. I'm well versed in Wireshark/tcpdump/TCP and becoming more so in asyncio, so I should be able to run with it if you can provide a few details on a simple setup to replicate the issue. Don't assume I know what GNS3 does with WebSockets, so if there is something I need to do on that side, let me know. Good hunting!
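So, WebSockets is such a client. As far as I remember, they try to be as close to the standard (RFC) as possible (they call it …).

```python
import asyncio
import json
import logging

import websockets

# Logging setup assumed here; it was not part of the original snippet.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

# Endpoints as used later in this thread; x.x.x.x is the GNS3 server address.
CONTROLLER_WS = 'ws://x.x.x.x:3080/v2/notifications/ws'
COMPUTE_WS = 'ws://x.x.x.x:3080/v2/compute/notifications/ws'

# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
WS_CLOSE_TIMEOUT = 0.05
SILENCE_ACTIONS = [
    'compute.updated',
    'ping',
]
RECONNECT_TIMEOUT = 1.618


async def main() -> None:
    # asyncio.TaskGroup requires Python 3.11+
    async with asyncio.TaskGroup() as tasks:
        tasks.create_task(websocket_logger(CONTROLLER_WS))
        tasks.create_task(websocket_logger(COMPUTE_WS))


async def websocket_logger(endpoint: str) -> None:
    while True:
        try:
            async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
                async for message_text in websocket:
                    message = json.loads(message_text)
                    if message['action'] not in SILENCE_ACTIONS:
                        log.debug(f'{endpoint}: {message}')
        except ConnectionRefusedError:
            log.info(f'Connection to {endpoint!r} refused.')
            await asyncio.sleep(RECONNECT_TIMEOUT)


if __name__ == '__main__':
    asyncio.run(main())
```

Something like this. To replicate the issue, just remove … (i.e. set it to the default).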
I was really hoping for a fully working client replication. I'll play around with this, but it would be helpful if you fleshed out the entire thing. I'm doing a crash course in WebSockets and the ws endpoints in GNS3. I'll reply back when I think I have your code tied into a working replication.
I think I have a replication now. Just to verify: the issue is that the client calls close() and then the server shuts down the connection exactly 10 seconds later?
The server does NOT shut down the TCP connection (it does not become the active closer and does not initiate the TCP Close). The client does. That is the issue. The 10 seconds is just the client's default timeout.
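The following stdlib-only sketch models the two closing orders over a plain TCP socket on localhost. It makes simplifying assumptions. CLOSE_MSG stands in for a WebSocket Close frame, and no real framing is done. The names close_handshake(), server_closes_tcp and close_timeout are hypothetical and do not belong to websockets or aiohttp. The client waits up to close_timeout for the server to close TCP first. If that never happens, the client closes the connection itself. This roughly mirrors what the WebSockets client does with its own close_timeout.

```python
import asyncio
import time

# Hypothetical stand-in for a WebSocket Close frame: no real WebSocket framing
# is done, only the TCP-level closing order is modelled.
CLOSE_MSG = b"CLOSE\n"


async def close_handshake(server_closes_tcp: bool, close_timeout: float) -> tuple[str, float]:
    """Return which side closed TCP first and how long the client's close took."""

    async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        if await reader.readline() == CLOSE_MSG:
            writer.write(CLOSE_MSG)  # answer the Close message
            await writer.drain()
            if server_closes_tcp:
                writer.close()  # compliant: the server initiates the TCP close
                await writer.wait_closed()
                return
        await reader.read()  # non-compliant: wait until the client closes TCP
        writer.close()
        await writer.wait_closed()

    server = await asyncio.start_server(handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        start = time.perf_counter()
        writer.write(CLOSE_MSG)
        await writer.drain()
        await reader.readline()  # the server's Close reply
        try:
            # Wait up to close_timeout for the server's EOF (its TCP close).
            await asyncio.wait_for(reader.read(), close_timeout)
            closer = "server"
        except asyncio.TimeoutError:
            closer = "client"  # give up and become the active closer
        writer.close()
        await writer.wait_closed()
        return closer, time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(close_handshake(server_closes_tcp=True, close_timeout=1.0)))
    print(asyncio.run(close_handshake(server_closes_tcp=False, close_timeout=0.5)))
```

With server_closes_tcp=False, the client always waits the full close_timeout before closing. With the library default of 10 seconds, that delay is easy to notice. With server_closes_tcp=True, the close completes as soon as the server's EOF arrives.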
Hmm, pretty sure the issue is that you didn't submit a working replication and I'm attempting to put it together (and failing at it). Feel free to remove my guesswork at any time.
Is this the correct way to replicate this? Your code never seemed to come out of the loop, so I wasn't sure what was triggering the close call from the client. I removed the loop and just called close().

```python
import asyncio
import base64
import json
import websockets
import logging

logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

log = logging.getLogger(__name__)

# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
# Let's give WebSockets a chance to get data.
WS_CLOSE_TIMEOUT = 5
RECONNECT_TIMEOUT = 1.618
CONTROLLER_WS_API = '/v2/notifications/ws'
COMPUTE_WS = '/v2/compute/notifications/ws'
SERVER = 'x.x.x.x:3080'
USER = 'USER'
PASS = 'PASS'
CREDS = f'{USER}:{PASS}'
ENCODED_CREDS = base64.b64encode(CREDS.encode()).decode()
CONTROLLER_URI = f'ws://{SERVER}{CONTROLLER_WS_API}'
COMPUTE_URI = f'ws://{SERVER}{COMPUTE_WS}'


async def main() -> None:
    async with asyncio.TaskGroup() as tasks:
        tasks.create_task(websocket_logger(CONTROLLER_URI))


async def websocket_logger(endpoint: str) -> None:
    headers = {
        'Authorization': f'Basic {ENCODED_CREDS}'
    }
    try:
        # extra_headers and .closed belong to the legacy websockets client API.
        #async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
        async with websockets.connect(endpoint, extra_headers=headers) as websocket:
            async for message_text in websocket:
                print("Slow: We got a message, tell the server we're done")
                print(f'Slow: Is the socket closed? :{websocket.closed}:')
                reply = await websocket.close()
                print(f'Slow: We got a reply of :{reply}:. Did that take a while?')
                print(f'Slow: Is the socket closed? :{websocket.closed}:')
                break
    except ConnectionRefusedError:
        log.info(f'Connection to {endpoint!r} refused.')
        await asyncio.sleep(RECONNECT_TIMEOUT)

    print("\n\nSwitching to lower timeout.\n\n")
    try:
        async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
            async for message_text in websocket:
                print("Fast: We got a message, tell the server we're done")
                print(f'Fast: Is the socket closed? :{websocket.closed}:')
                reply = await websocket.close()
                print(f'Fast: We got a reply of :{reply}:. Did that take a while?')
                print(f'Fast: Is the socket closed? :{websocket.closed}:')
                break
    except ConnectionRefusedError:
        log.info(f'Connection to {endpoint!r} refused.')
        await asyncio.sleep(RECONNECT_TIMEOUT)


if __name__ == '__main__':
    asyncio.run(main())
```

It does seem like the server is ignoring the close call. Just thinking out loud: if the server ignored the close message, shouldn't close() return instantly? I poked around in GNS3 trying to wrap my head around how it's handling the close call and didn't make much progress.
This isn't from the run above, but these are the logs that fire while in debug mode.
Well, there are a lot of aiohttp bugs that talk about race conditions while closing. One of the fixes is adding an asyncio.sleep(0), but I'm not 100% sure where that would go; I'm thinking in the server shutdown section. To make sure things work in an ideal world, I (cough cough) made a server that traces which functions are called on shutdown.

```python
import aiohttp
from aiohttp import web
import logging
import sys

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def trace_calls(frame, event, arg):
    if event == "call":
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno
        func_name = frame.f_code.co_name
        logger.debug(f"Call to {func_name} on line {lineno} of {filename}")
    return trace_calls


async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    logger.debug('WebSocket connection opened')

    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            logger.debug('Message received: %s', msg.data)
            if msg.data == 'close':
                await ws.close()
                logger.debug('Close command received. Closing connection.')
            else:
                await ws.send_str("Message received: " + msg.data)
                logger.debug('Sent message back to client: %s', msg.data)
        elif msg.type == aiohttp.WSMsgType.ERROR:
            logger.error('WebSocket connection closed with exception %s', ws.exception())

    logger.debug('WebSocket connection closed')
    return ws


def main():
    # Set the trace function
    sys.settrace(trace_calls)
    app = web.Application()
    app.add_routes([web.get('/v2/notifications/ws', websocket_handler)])
    web.run_app(app, port=8000)


if __name__ == '__main__':
    main()
```

Here is the modified client.

```python
import asyncio
import base64
import json
import websockets
import logging

logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

log = logging.getLogger(__name__)

# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
# Let's give WebSockets a chance to get data.
WS_CLOSE_TIMEOUT = 5
# RFC compliant timeout value per 6455.
RECONNECT_TIMEOUT = 1.618
CONTROLLER_WS_API = '/v2/notifications/ws'
COMPUTE_WS = '/v2/compute/notifications/ws'
SERVER = '127.0.0.1:8000'
USER = 'XX'
PASS = 'XX'
CREDS = f'{USER}:{PASS}'
ENCODED_CREDS = base64.b64encode(CREDS.encode()).decode()
CONTROLLER_URI = f'ws://{SERVER}{CONTROLLER_WS_API}'
COMPUTE_URI = f'ws://{SERVER}{COMPUTE_WS}'


async def main() -> None:
    async with asyncio.TaskGroup() as tasks:
        tasks.create_task(websocket_logger(CONTROLLER_URI))


async def websocket_logger(endpoint: str) -> None:
    headers = {
        'Authorization': f'Basic {ENCODED_CREDS}'
    }
    try:
        #async with websockets.connect(endpoint, extra_headers=headers, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
        #async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
        async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
            #reply = await websocket.close()
            print("Call close")
            await websocket.close()
            print("close complete")
    except ConnectionRefusedError:
        log.info(f'Connection to {endpoint!r} refused.')
        await asyncio.sleep(RECONNECT_TIMEOUT)


if __name__ == '__main__':
    asyncio.run(main())
```
I've verified that the client sends the close and the server starts the socket shutdown. I'll check out the server debugs and compare them to what is happening in GNS3. That should shed some light on what's happening. I'll also try importing aiohttp from the GNS3 install and see if it still works.
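This is background on the asyncio.sleep(0) workaround mentioned above. Awaiting asyncio.sleep(0) suspends the current task for one pass of the event loop. Tasks that are already ready to run, such as one finishing a close, therefore get a turn first. The sketch below only demonstrates that scheduling effect, and its task names are hypothetical. It says nothing about where, if anywhere, such a call would belong in aiohttp or GNS3.

```python
import asyncio


async def demo(yield_first: bool) -> list[str]:
    events: list[str] = []

    async def closer() -> None:
        # Hypothetical stand-in for another task that finishes closing something.
        events.append("closer ran")

    task = asyncio.create_task(closer())  # scheduled, but not started yet
    if yield_first:
        await asyncio.sleep(0)  # yield once so the ready task runs first
    events.append("caller continued")
    await task
    return events


if __name__ == "__main__":
    print(asyncio.run(demo(yield_first=False)))
    print(asyncio.run(demo(yield_first=True)))
```

Without the yield, the caller keeps running until its next await, so the other task only runs afterwards.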
I think I've narrowed down what is going on, but I'm not seeing what the fix is. I believe something in the queue's context manager is preventing ws from closing correctly. In the while True loop, on the first pass I can add ws.close() anywhere and the client is shut down correctly. On the second pass we fall into the break statement, and close no longer works correctly. The queue doesn't seem to know about ws. So I'm thinking the cleanup from "with controller.notification.controller_queue() as queue:" is doing something to the client that breaks the ability to close. Any ideas? Here are all the debugs I have for kicking around this issue.

gns3server/handlers/api/controller/notification_handler.py (shown so the debugs at the end hopefully make sense):

```python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
import aiohttp
from aiohttp.web import WebSocketResponse
from gns3server.web.route import Route
from gns3server.controller import Controller

import logging
log = logging.getLogger(__name__)


async def process_websocket(ws):
    """
    Process ping / pong and close message
    """

    try:
        await ws.receive()
    except aiohttp.WSServerHandshakeError:
        pass


class NotificationHandler:

    @Route.get(
        r"/notifications",
        description="Receive notifications about the controller",
        status_codes={
            200: "End of stream"
        })
    async def notification(request, response):

        controller = Controller.instance()
        response.content_type = "application/json"
        response.set_status(200)
        response.enable_chunked_encoding()

        await response.prepare(request)
        with controller.notification.controller_queue() as queue:
            while True:
                msg = await queue.get_json(5)
                await response.write(("{}\n".format(msg)).encode("utf-8"))

    @Route.get(
        r"/notifications/ws",
        description="Receive notifications about controller from a Websocket",
        status_codes={
            200: "End of stream"
        })
    async def notification_ws(request, response):

        controller = Controller.instance()
        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(request)

        #request.app['websockets'].add(ws)
        asyncio.ensure_future(process_websocket(ws))
        log.info("New client has connected to controller WebSocket")
        try:
            print("while true")
            with controller.notification.controller_queue() as queue:
                while True:
                    print("await get_json")
                    notification = await queue.get_json(5)
                    print("if closed")
                    if ws.closed:
                        print(f"Excpt: {ws.exception()}")
                        print("break")
                        break
                    print(f"notification: {notification}")
                    await ws.send_str(notification)
                    #await ws.close()
        finally:
            log.info("Client has disconnected from controller WebSocket")
            print("if not ws.closed")
            if not ws.closed:
                print("ws.close")
                await ws.close()
            #request.app['websockets'].discard(ws)
        print("return ws")
        return ws
```

ws-client.py:

```python
import asyncio
import base64
import json
import websockets
import logging

logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

log = logging.getLogger(__name__)

# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
# Let's give WebSockets a chance to get data.
WS_CLOSE_TIMEOUT = 10
# RFC compliant timeout value per 6455.
RECONNECT_TIMEOUT = 1.618
CONTROLLER_WS_API = '/notifications/ws'
COMPUTE_WS = '/notifications/ws'
SERVER = '127.0.0.1:8080'
USER = 'XXX'
PASS = 'XXX'
CREDS = f'{USER}:{PASS}'
ENCODED_CREDS = base64.b64encode(CREDS.encode()).decode()
CONTROLLER_URI = f'ws://{SERVER}{CONTROLLER_WS_API}'
COMPUTE_URI = f'ws://{SERVER}{COMPUTE_WS}'


async def main() -> None:
    async with asyncio.TaskGroup() as tasks:
        tasks.create_task(websocket_logger(CONTROLLER_URI))


async def websocket_logger(endpoint: str) -> None:
    headers = {
        'Authorization': f'Basic {ENCODED_CREDS}'
    }
    try:
        #async with websockets.connect(endpoint, extra_headers=headers, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
        #async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
        async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
            #reply = await websocket.close()
            print("Call close")
            await websocket.close()
            print("close complete")
    except ConnectionRefusedError:
        log.info(f'Connection to {endpoint!r} refused.')
        await asyncio.sleep(RECONNECT_TIMEOUT)


if __name__ == '__main__':
    asyncio.run(main())
```

and main.py (I'm using this for the replication):

```python
import os
import sys
import aiohttp.web
import logging

# Assuming you're running this as admin2 and your home directory is /home/admin2
home_dir = os.path.expanduser('~')  # This will correctly resolve to /home/admin2
gns3server_path = os.path.join(home_dir, '.local/lib/python3.11/site-packages/gns3server/')
controller_path = os.path.join(home_dir, '.local/lib/python3.11/site-packages/gns3server/handlers/api/controller/')
sys.path.append(gns3server_path)
sys.path.append(controller_path)

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def trace_calls(frame, event, arg):
    if event == "call":
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno
        func_name = frame.f_code.co_name
        logger.debug(f"Call to {func_name} on line {lineno} of {filename}")
    return trace_calls


from notification_handler import NotificationHandler


async def init_app():
    app = aiohttp.web.Application()
    app.router.add_get('/notifications', NotificationHandler.notification)
    app.router.add_get('/notifications/ws', NotificationHandler.notification_ws)
    return app


def main():
    #sys.settrace(trace_calls)
    app = init_app()
    aiohttp.web.run_app(app, port=8080)


if __name__ == '__main__':
    main()
```

When I run the client, I get this output on the server.

gns3server/notification_queue.py:

```python
async def get(self, timeout):
    """
    When timeout is expire we send a ping notification with server information
    """

    # At first get we return a ping so the client immediately receives data
    if self._first:
        self._first = False
        return ("ping", self._getPing(), {})

    #pdb.set_trace()
    try:
        (action, msg, kwargs) = await asyncio.wait_for(super().get(), timeout)
        print(f"acction: {action} msg: {msg} kwargs: {kwargs}")
    except asyncio.TimeoutError:
        print("Timed out returning ping")
        return ("ping", self._getPing(), {})
    except Exception as e:
        print(f"Failed: {e}")
        raise  # re-raise; falling through would hit unassigned action/msg/kwargs
    print("Last return")
    return (action, msg, kwargs)
```
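The ping-on-timeout behavior of get() above can be sketched with a plain asyncio.Queue subclass. PingQueue, get_with_ping() and the empty ping payload are hypothetical simplifications, not GNS3's notification queue. Unexpected exceptions are simply allowed to propagate.

```python
import asyncio


class PingQueue(asyncio.Queue):
    """Hypothetical simplification of a notification queue that pings when idle."""

    def __init__(self) -> None:
        super().__init__()
        self._first = True

    def _ping(self) -> tuple[str, dict, dict]:
        # Placeholder payload; the real server sends its own status information.
        return ("ping", {}, {})

    async def get_with_ping(self, timeout: float) -> tuple[str, dict, dict]:
        # The very first call returns a ping so the client receives data at once.
        if self._first:
            self._first = False
            return self._ping()
        try:
            return await asyncio.wait_for(self.get(), timeout)
        except asyncio.TimeoutError:
            # Nothing arrived within `timeout`: send a keep-alive ping instead.
            return self._ping()
```

Because the loop in notification_ws only checks ws.closed after each get, a client that closes while the queue is idle is noticed at the next ping, up to timeout seconds later.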
I don't know how to do this just yet, but my goal is to asyncio.shield() ws's exit function inside the with statement. That should show whether running inside a context manager is what's causing this. As a side note, it seems a little strange that the with statement in gns3server/handlers/api/controller/notification_handler.py doesn't use async with, since gns3server/notification_queue.py inherits from asyncio.Queue. Maybe it's nothing, as this is the most I've ever done with a context manager.
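On the with versus async with point, a plain with is enough when entering and leaving the context never needs to await anything. This holds even if the object handed out is an asyncio.Queue. The minimal sketch below assumes registration is just adding the queue to a set. Notifier, queue() and emit() are hypothetical names, not GNS3's API.

```python
import asyncio
from contextlib import contextmanager
from typing import Iterator


class Notifier:
    def __init__(self) -> None:
        self.queues: set[asyncio.Queue] = set()

    @contextmanager
    def queue(self) -> Iterator[asyncio.Queue]:
        # Registering and unregistering are synchronous, so a plain `with` works.
        q: asyncio.Queue = asyncio.Queue()
        self.queues.add(q)
        try:
            yield q
        finally:
            self.queues.discard(q)

    def emit(self, msg: str) -> None:
        # put_nowait() is synchronous too; consumers still `await q.get()`.
        for q in self.queues:
            q.put_nowait(msg)


async def consume_one(notifier: Notifier) -> tuple[str, int, int]:
    with notifier.queue() as q:
        registered_inside = len(notifier.queues)
        notifier.emit("hello")
        msg = await q.get()
    return msg, registered_inside, len(notifier.queues)
```

An async context manager (contextlib.asynccontextmanager) would only be needed if entering or leaving had to await something, such as flushing pending messages.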
This is a band-aid for the issue. Everything I see says the server has shut down the WebSocket except for its TCP socket. This will force the server to close the WebSocket's TCP socket, which it should have done already anyway. @andrei-korshikov Can you give it a try? My latest theory is that ws.receive() in aiohttp is really to blame. I tried an aiohttp patch, but it didn't help. That being said, it was very late when I tried it. Maybe you want to give it a spin as well if you don't like the PR I've submitted? BTW, it also seems strange that we're using a non-async context manager to manage queue() but async calls to fetch JSON data.
After a little more digging, it finally dawned on me that the issue has been the receive() call the entire time. The patch I listed above seems like it was intended to fix this type of issue. However, I noticed that receive() sets closing = True on a close message. The close() method then returns too early when closing is true, which prevents the patch from closing the socket. See more details here.
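The receive()/close() interaction described above can be reduced to a toy model. It is a hypothetical simplification for illustration, not aiohttp's WebSocketResponse code. ToyWebSocket, receive_close_frame(), early_return and transport_closed are invented names. The model shows how a closing flag set on the receive path can make a later close() return before the transport is closed. It also shows a close() that avoids this by checking the transport itself.

```python
class ToyWebSocket:
    """Hypothetical model of a server-side WebSocket's closing state."""

    def __init__(self, early_return: bool) -> None:
        self.closing = False           # set when a Close frame is received
        self.closed = False
        self.transport_closed = False  # stands in for the TCP socket state
        self.early_return = early_return

    def receive_close_frame(self) -> None:
        # Receive path: the peer has started the closing handshake.
        self.closing = True

    def close(self) -> None:
        if self.closing and self.early_return:
            # Problem modelled here: "closing" is treated as "already handled",
            # so close() returns without touching the transport.
            self.closed = True
            return
        self.closed = True
        if not self.transport_closed:
            # The server becomes the active closer, as RFC 6455 7.1.1 expects.
            self.transport_closed = True


if __name__ == "__main__":
    for early_return in (True, False):
        ws = ToyWebSocket(early_return)
        ws.receive_close_frame()
        ws.close()
        print(early_return, ws.closed, ws.transport_closed)
```

In the early-return variant, the WebSocket reports itself closed while the TCP socket stays open. The client then waits out its close_timeout and closes TCP itself.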
Looks like I'll be opening a new issue for aiohttp, per request.
This is with aiohttp 3.9.3. See the thread here.
@andrei-korshikov It would be helpful if you could test this. The aiohttp folks are looking for feedback.
I will test it, of course, but in a week or so; I'm very sorry for the delay. Thank you for your investigations!
The aiohttp team is asking for an update. If you want this fix in the next version of aiohttp, you may want to test ASAP. I believe the goal is for it to be included in 3.9.4.
I have no access to my home laptop for an indefinite period. Anyway, if I'm the only one who cares about the issue, there is no point in fixing it.
OK, no problem. I'll pass that along. Thanks for the reply.
Version 2.2.44. The issue is the same as python-trio/trio-websocket#115.
Here is what RFC 6455 Section 7.1.1 "Close the WebSocket Connection" says:
The observed behavior is the opposite: the client is the active closer, so it is the client that holds the TIME_WAIT state.
That is our case. It can easily be seen with Wireshark and ss. The client logs "timed out waiting for TCP close" and "half-closing TCP connection". By default, close_timeout in WebSockets is 10 seconds, so the delay on application closure is very visible. I'm not familiar with aiohttp.web, so I can't say whether it is an upstream issue or whether it can be tweaked with some parameters.
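Besides ss and Wireshark, on Linux you can also check which side holds TIME_WAIT by reading /proc/net/tcp. In that file, state 06 means TIME_WAIT and ports are hexadecimal. The sketch below is minimal and Linux-only. time_wait_entries() is a hypothetical helper, and IPv6 sockets (listed in /proc/net/tcp6) are not handled.

```python
import os

TIME_WAIT = "06"  # TCP_TIME_WAIT in the Linux kernel's state numbering


def time_wait_entries(proc_text: str, port: int) -> list[tuple[int, int]]:
    """Return (local_port, remote_port) pairs in TIME_WAIT that involve `port`."""
    entries = []
    for line in proc_text.splitlines()[1:]:  # skip the header line
        fields = line.split()
        if len(fields) < 4 or fields[3] != TIME_WAIT:
            continue
        local_port = int(fields[1].split(":")[1], 16)
        remote_port = int(fields[2].split(":")[1], 16)
        if port in (local_port, remote_port):
            entries.append((local_port, remote_port))
    return entries


if __name__ == "__main__" and os.path.exists("/proc/net/tcp"):
    with open("/proc/net/tcp") as f:
        # 3080 is the GNS3 port used in this thread; adjust as needed.
        print(time_wait_entries(f.read(), 3080))
```

When client and server run on the same machine, an entry whose local port is 3080 means the server side holds TIME_WAIT. An entry whose remote port is 3080 means the client side does, which is the behavior reported here.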