diff --git a/.travis.yml b/.travis.yml index 67c4269..4a7ec64 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ language: python python: + - "3.7" - "3.6" - "3.5" - "3.4" @@ -7,10 +8,10 @@ python: # Install dependencies install: - - pip install tornado ptyprocess + - pip install tox-travis # command to run tests -script: py.test +script: tox # Enable new Travis stack, should speed up builds sudo: false diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..62e0002 --- /dev/null +++ b/setup.py @@ -0,0 +1,24 @@ +import setuptools + +setuptools.setup( + name="terminado", + version="0.9.0", + author="Jupyter Development Team", + author_email="jupyter@googlegroups.com", + description="A websocket backend for the Xterm.js JavaScript terminal emulator library.", + url="https://github.com/jupyter/terminado", + packages=setuptools.find_packages(exclude=["doc", "demos", "terminado/_static"]), + classifiers=[ + "Environment :: Web Environment", + "License :: OSI Approved :: BSD License", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 3", + "Topic :: Terminals :: Terminal Emulators/X Terminals", + ], + license="MIT", + install_requires=[ + "ptyprocess;os_name!='nt'", + "pywinpty (>=0.5);os_name=='nt'", + "tornado (>=4)", + ] +) diff --git a/terminado/__init__.py b/terminado/__init__.py index 643ebfd..8ea209e 100644 --- a/terminado/__init__.py +++ b/terminado/__init__.py @@ -12,4 +12,4 @@ # Prevent a warning about no attached handlers in Python 2 logging.getLogger(__name__).addHandler(logging.NullHandler()) -__version__ = '0.8.2' +__version__ = '0.9.0' diff --git a/terminado/tests/basic_test.py b/terminado/tests/basic_test.py deleted file mode 100644 index 1b58ab6..0000000 --- a/terminado/tests/basic_test.py +++ /dev/null @@ -1,247 +0,0 @@ -# basic_tests.py -- Basic unit tests for Terminado - -# Copyright (c) Jupyter Development Team -# Copyright (c) 2014, Ramalingam Saravanan -# Distributed under the terms of the Simplified BSD License. - -from __future__ import absolute_import, print_function - -import unittest -from terminado import * -import tornado -import tornado.httpserver -from tornado.httpclient import HTTPError -from tornado.ioloop import IOLoop -import tornado.testing -import datetime -import logging -import json -import os -import re - - -# -# The timeout we use to assume no more messages are coming -# from the sever. -# -DONE_TIMEOUT = 1.0 -os.environ['ASYNC_TEST_TIMEOUT'] = "20" # Global test case timeout - -MAX_TERMS = 3 # Testing thresholds - -class TestTermClient(object): - """Test connection to a terminal manager""" - def __init__(self, websocket): - self.ws = websocket - self.pending_read = None - - @tornado.gen.coroutine - def read_msg(self): - - # Because the Tornado Websocket client has no way to cancel - # a pending read, we have to keep track of them... - if self.pending_read is None: - self.pending_read = self.ws.read_message() - - response = yield self.pending_read - self.pending_read = None - if response: - response = json.loads(response) - raise tornado.gen.Return(response) - - @tornado.gen.coroutine - def read_all_msg(self, timeout=DONE_TIMEOUT): - """Read messages until read times out""" - msglist = [] - delta = datetime.timedelta(seconds=timeout) - while True: - try: - mf = self.read_msg() - msg = yield tornado.gen.with_timeout(delta, mf) - except tornado.gen.TimeoutError: - raise tornado.gen.Return(msglist) - - msglist.append(msg) - - def write_msg(self, msg): - self.ws.write_message(json.dumps(msg)) - - @tornado.gen.coroutine - def read_stdout(self, timeout=DONE_TIMEOUT): - """Read standard output until timeout read reached, - return stdout and any non-stdout msgs received.""" - msglist = yield self.read_all_msg(timeout) - stdout = "".join([msg[1] for msg in msglist if msg[0] == 'stdout']) - othermsg = [msg for msg in msglist if msg[0] != 'stdout'] - raise tornado.gen.Return((stdout, othermsg)) - - def write_stdin(self, data): - """Write to terminal stdin""" - self.write_msg(['stdin', data]) - - @tornado.gen.coroutine - def get_pid(self): - """Get process ID of terminal shell process""" - yield self.read_stdout() # Clear out any pending - self.write_stdin("echo $$\r") - (stdout, extra) = yield self.read_stdout() - if os.name == 'nt': - match = re.search(r'echo \$\$\x1b\[0K\r\n(\d+)', stdout) - pid = int(match.groups()[0]) - else: - pid = int(stdout.split('\n')[1]) - raise tornado.gen.Return(pid) - - def close(self): - self.ws.close() - -class TermTestCase(tornado.testing.AsyncHTTPTestCase): - - # Factory for TestTermClient, because it has to be a Tornado co-routine. - # See: https://github.com/tornadoweb/tornado/issues/1161 - @tornado.gen.coroutine - def get_term_client(self, path): - port = self.get_http_port() - url = 'ws://127.0.0.1:%d%s' % (port, path) - request = tornado.httpclient.HTTPRequest(url, - headers={'Origin' : 'http://127.0.0.1:%d' % port}) - - ws = yield tornado.websocket.websocket_connect(request) - raise tornado.gen.Return(TestTermClient(ws)) - - @tornado.gen.coroutine - def get_term_clients(self, paths): - tms = yield [self.get_term_client(path) for path in paths] - raise tornado.gen.Return(tms) - - @tornado.gen.coroutine - def get_pids(self, tm_list): - pids = [] - for tm in tm_list: # Must be sequential, in case terms are shared - pid = yield tm.get_pid() - pids.append(pid) - - raise tornado.gen.Return(pids) - - def get_app(self): - self.named_tm = NamedTermManager(shell_command=['bash'], - max_terminals=MAX_TERMS, - ioloop=self.io_loop) - self.single_tm = SingleTermManager(shell_command=['bash'], - ioloop=self.io_loop) - self.unique_tm = UniqueTermManager(shell_command=['bash'], - max_terminals=MAX_TERMS, - ioloop=self.io_loop) - - named_tm = self.named_tm - class NewTerminalHandler(tornado.web.RequestHandler): - """Create a new named terminal, return redirect""" - def get(self): - name, terminal = named_tm.new_named_terminal() - self.redirect("/named/" + name, permanent=False) - - return tornado.web.Application([ - (r"/new", NewTerminalHandler), - (r"/named/(\w+)", TermSocket, {'term_manager': self.named_tm}), - (r"/single", TermSocket, {'term_manager': self.single_tm}), - (r"/unique", TermSocket, {'term_manager': self.unique_tm}) - ], debug=True) - - test_urls = ('/named/term1', '/unique', '/single') - -class CommonTests(TermTestCase): - @tornado.testing.gen_test - def test_basic(self): - for url in self.test_urls: - tm = yield self.get_term_client(url) - response = yield tm.read_msg() - self.assertEqual(response, ['setup', {}]) - - # Check for initial shell prompt - response = yield tm.read_msg() - self.assertEqual(response[0], 'stdout') - self.assertGreater(len(response[1]), 0) - tm.close() - - @tornado.testing.gen_test - def test_basic_command(self): - for url in self.test_urls: - tm = yield self.get_term_client(url) - yield tm.read_all_msg() - tm.write_stdin("whoami\n") - (stdout, other) = yield tm.read_stdout() - if os.name == 'nt': - assert 'whoami' in stdout - else: - assert stdout.startswith('who') - assert other == [] - tm.close() - -class NamedTermTests(TermTestCase): - def test_new(self): - response = self.fetch("/new", follow_redirects=False) - self.assertEqual(response.code, 302) - url = response.headers["Location"] - - # Check that the new terminal was created - name = url.split('/')[2] - self.assertIn(name, self.named_tm.terminals) - - @tornado.testing.gen_test - def test_namespace(self): - names = ["/named/1"]*2 + ["/named/2"]*2 - tms = yield self.get_term_clients(names) - pids = yield self.get_pids(tms) - - self.assertEqual(pids[0], pids[1]) - self.assertEqual(pids[2], pids[3]) - self.assertNotEqual(pids[0], pids[3]) - - @tornado.testing.gen_test - def test_max_terminals(self): - urls = ["/named/%d" % i for i in range(MAX_TERMS+1)] - tms = yield self.get_term_clients(urls[:MAX_TERMS]) - pids = yield self.get_pids(tms) - - # MAX_TERMS+1 should fail - tm = yield self.get_term_client(urls[MAX_TERMS]) - msg = yield tm.read_msg() - self.assertEqual(msg, None) # Connection closed - -class SingleTermTests(TermTestCase): - @tornado.testing.gen_test - def test_single_process(self): - tms = yield self.get_term_clients(["/single", "/single"]) - pids = yield self.get_pids(tms) - self.assertEqual(pids[0], pids[1]) - -class UniqueTermTests(TermTestCase): - @tornado.testing.gen_test - def test_unique_processes(self): - tms = yield self.get_term_clients(["/unique", "/unique"]) - pids = yield self.get_pids(tms) - self.assertNotEqual(pids[0], pids[1]) - - @tornado.testing.gen_test - def test_max_terminals(self): - tms = yield self.get_term_clients(['/unique'] * MAX_TERMS) - pids = yield self.get_pids(tms) - self.assertEqual(len(set(pids)), MAX_TERMS) # All PIDs unique - - # MAX_TERMS+1 should fail - tm = yield self.get_term_client("/unique") - msg = yield tm.read_msg() - self.assertEqual(msg, None) # Connection closed - - # Close one - tms[0].close() - msg = yield tms[0].read_msg() # Closed - self.assertEquals(msg, None) - - # Should be able to open back up to MAX_TERMS - tm = yield self.get_term_client("/unique") - msg = yield tm.read_msg() - self.assertEquals(msg[0], 'setup') - -if __name__ == '__main__': - unittest.main() diff --git a/terminado/tests/test_basic.py b/terminado/tests/test_basic.py new file mode 100644 index 0000000..2409b6b --- /dev/null +++ b/terminado/tests/test_basic.py @@ -0,0 +1,250 @@ +# test_basic.py -- Basic unit tests for Terminado with python 3.4 or lower + +# Copyright (c) Jupyter Development Team +# Copyright (c) 2014, Ramalingam Saravanan +# Distributed under the terms of the Simplified BSD License. + +from __future__ import absolute_import, print_function + +# Tests for older python (pre-async, etc) +import sys +if sys.version_info < (3,5): + + import unittest + from terminado import * + import tornado + import tornado.httpserver + from tornado.httpclient import HTTPError + from tornado.ioloop import IOLoop + import tornado.testing + import datetime + import logging + import json + import os + import re + + # + # The timeout we use to assume no more messages are coming + # from the sever. + # + DONE_TIMEOUT = 1.0 + os.environ['ASYNC_TEST_TIMEOUT'] = "20" # Global test case timeout + + MAX_TERMS = 3 # Testing thresholds + + class TestTermClient(object): + """Test connection to a terminal manager""" + def __init__(self, websocket): + self.ws = websocket + self.pending_read = None + + @tornado.gen.coroutine + def read_msg(self): + + # Because the Tornado Websocket client has no way to cancel + # a pending read, we have to keep track of them... + if self.pending_read is None: + self.pending_read = self.ws.read_message() + + response = yield self.pending_read + self.pending_read = None + if response: + response = json.loads(response) + raise tornado.gen.Return(response) + + @tornado.gen.coroutine + def read_all_msg(self, timeout=DONE_TIMEOUT): + """Read messages until read times out""" + msglist = [] + delta = datetime.timedelta(seconds=timeout) + while True: + try: + mf = self.read_msg() + msg = yield tornado.gen.with_timeout(delta, mf) + except tornado.gen.TimeoutError: + raise tornado.gen.Return(msglist) + + msglist.append(msg) + + def write_msg(self, msg): + self.ws.write_message(json.dumps(msg)) + + @tornado.gen.coroutine + def read_stdout(self, timeout=DONE_TIMEOUT): + """Read standard output until timeout read reached, + return stdout and any non-stdout msgs received.""" + msglist = yield self.read_all_msg(timeout) + stdout = "".join([msg[1] for msg in msglist if msg[0] == 'stdout']) + othermsg = [msg for msg in msglist if msg[0] != 'stdout'] + raise tornado.gen.Return((stdout, othermsg)) + + def write_stdin(self, data): + """Write to terminal stdin""" + self.write_msg(['stdin', data]) + + @tornado.gen.coroutine + def get_pid(self): + """Get process ID of terminal shell process""" + yield self.read_stdout() # Clear out any pending + self.write_stdin("echo $$\r") + (stdout, extra) = yield self.read_stdout() + if os.name == 'nt': + match = re.search(r'echo \$\$\x1b\[0K\r\n(\d+)', stdout) + pid = int(match.groups()[0]) + else: + pid = int(stdout.split('\n')[1]) + raise tornado.gen.Return(pid) + + def close(self): + self.ws.close() + + class TermTestCase(tornado.testing.AsyncHTTPTestCase): + + # Factory for TestTermClient, because it has to be a Tornado co-routine. + # See: https://github.com/tornadoweb/tornado/issues/1161 + @tornado.gen.coroutine + def get_term_client(self, path): + port = self.get_http_port() + url = 'ws://127.0.0.1:%d%s' % (port, path) + request = tornado.httpclient.HTTPRequest(url, + headers={'Origin' : 'http://127.0.0.1:%d' % port}) + + ws = yield tornado.websocket.websocket_connect(request) + raise tornado.gen.Return(TestTermClient(ws)) + + @tornado.gen.coroutine + def get_term_clients(self, paths): + tms = yield [self.get_term_client(path) for path in paths] + raise tornado.gen.Return(tms) + + @tornado.gen.coroutine + def get_pids(self, tm_list): + pids = [] + for tm in tm_list: # Must be sequential, in case terms are shared + pid = yield tm.get_pid() + pids.append(pid) + + raise tornado.gen.Return(pids) + + def get_app(self): + self.named_tm = NamedTermManager(shell_command=['bash'], + max_terminals=MAX_TERMS, + ioloop=self.io_loop) + self.single_tm = SingleTermManager(shell_command=['bash'], + ioloop=self.io_loop) + self.unique_tm = UniqueTermManager(shell_command=['bash'], + max_terminals=MAX_TERMS, + ioloop=self.io_loop) + + named_tm = self.named_tm + class NewTerminalHandler(tornado.web.RequestHandler): + """Create a new named terminal, return redirect""" + def get(self): + name, terminal = named_tm.new_named_terminal() + self.redirect("/named/" + name, permanent=False) + + return tornado.web.Application([ + (r"/new", NewTerminalHandler), + (r"/named/(\w+)", TermSocket, {'term_manager': self.named_tm}), + (r"/single", TermSocket, {'term_manager': self.single_tm}), + (r"/unique", TermSocket, {'term_manager': self.unique_tm}) + ], debug=True) + + test_urls = ('/named/term1', '/unique', '/single') + + class CommonTests(TermTestCase): + @tornado.testing.gen_test + def test_basic(self): + for url in self.test_urls: + tm = yield self.get_term_client(url) + response = yield tm.read_msg() + self.assertEqual(response, ['setup', {}]) + + # Check for initial shell prompt + response = yield tm.read_msg() + self.assertEqual(response[0], 'stdout') + self.assertGreater(len(response[1]), 0) + tm.close() + + @tornado.testing.gen_test + def test_basic_command(self): + for url in self.test_urls: + tm = yield self.get_term_client(url) + yield tm.read_all_msg() + tm.write_stdin("whoami\n") + (stdout, other) = yield tm.read_stdout() + if os.name == 'nt': + assert 'whoami' in stdout + else: + assert stdout.startswith('who') + assert other == [] + tm.close() + + class NamedTermTests(TermTestCase): + def test_new(self): + response = self.fetch("/new", follow_redirects=False) + self.assertEqual(response.code, 302) + url = response.headers["Location"] + + # Check that the new terminal was created + name = url.split('/')[2] + self.assertIn(name, self.named_tm.terminals) + + @tornado.testing.gen_test + def test_namespace(self): + names = ["/named/1"]*2 + ["/named/2"]*2 + tms = yield self.get_term_clients(names) + pids = yield self.get_pids(tms) + + self.assertEqual(pids[0], pids[1]) + self.assertEqual(pids[2], pids[3]) + self.assertNotEqual(pids[0], pids[3]) + + @tornado.testing.gen_test + def test_max_terminals(self): + urls = ["/named/%d" % i for i in range(MAX_TERMS+1)] + tms = yield self.get_term_clients(urls[:MAX_TERMS]) + pids = yield self.get_pids(tms) + + # MAX_TERMS+1 should fail + tm = yield self.get_term_client(urls[MAX_TERMS]) + msg = yield tm.read_msg() + self.assertEqual(msg, None) # Connection closed + + class SingleTermTests(TermTestCase): + @tornado.testing.gen_test + def test_single_process(self): + tms = yield self.get_term_clients(["/single", "/single"]) + pids = yield self.get_pids(tms) + self.assertEqual(pids[0], pids[1]) + + class UniqueTermTests(TermTestCase): + @tornado.testing.gen_test + def test_unique_processes(self): + tms = yield self.get_term_clients(["/unique", "/unique"]) + pids = yield self.get_pids(tms) + self.assertNotEqual(pids[0], pids[1]) + + @tornado.testing.gen_test + def test_max_terminals(self): + tms = yield self.get_term_clients(['/unique'] * MAX_TERMS) + pids = yield self.get_pids(tms) + self.assertEqual(len(set(pids)), MAX_TERMS) # All PIDs unique + + # MAX_TERMS+1 should fail + tm = yield self.get_term_client("/unique") + msg = yield tm.read_msg() + self.assertEqual(msg, None) # Connection closed + + # Close one + tms[0].close() + msg = yield tms[0].read_msg() # Closed + self.assertEquals(msg, None) + + # Should be able to open back up to MAX_TERMS + tm = yield self.get_term_client("/unique") + msg = yield tm.read_msg() + self.assertEquals(msg[0], 'setup') + + if __name__ == '__main__': + unittest.main() diff --git a/terminado/tests/test_basic_py3.py b/terminado/tests/test_basic_py3.py new file mode 100644 index 0000000..5926280 --- /dev/null +++ b/terminado/tests/test_basic_py3.py @@ -0,0 +1,259 @@ +# test_basic_py3.py -- Basic unit tests for Terminado with python 3.5+ + +# Copyright (c) Jupyter Development Team +# Copyright (c) 2014, Ramalingam Saravanan +# Distributed under the terms of the Simplified BSD License. + +from __future__ import absolute_import, print_function + +import sys +import traceback + +from tornado.simple_httpclient import HTTPStreamClosedError + +if sys.version_info >= (3,5): + + import asyncio + import unittest + from terminado import * + import tornado + import tornado.httpserver + from tornado.httpclient import HTTPError + import tornado.testing + import datetime + import json + import os + import re + + + # + # The timeout we use to assume no more messages are coming + # from the sever. + # + DONE_TIMEOUT = 1.0 + os.environ['ASYNC_TEST_TIMEOUT'] = "20" # Global test case timeout + + MAX_TERMS = 3 # Testing thresholds + + class TestTermClient(object): + __test__ = False + + """Test connection to a terminal manager""" + def __init__(self, websocket): + self.ws = websocket + self.pending_read = None + + async def read_msg(self): + + # Because the Tornado Websocket client has no way to cancel + # a pending read, we have to keep track of them... + if self.pending_read is None: + self.pending_read = self.ws.read_message() + + response = await self.pending_read + self.pending_read = None + if response: + response = json.loads(response) + return response + + async def read_all_msg(self, timeout=DONE_TIMEOUT): + """Read messages until read times out""" + msglist = [] + delta = datetime.timedelta(seconds=timeout) + while True: + try: + mf = self.read_msg() + msg = await tornado.gen.with_timeout(delta, mf) + except tornado.gen.TimeoutError: + return msglist + + msglist.append(msg) + + def write_msg(self, msg): + self.ws.write_message(json.dumps(msg)) + + async def read_stdout(self, timeout=DONE_TIMEOUT): + """Read standard output until timeout read reached, + return stdout and any non-stdout msgs received.""" + msglist = await self.read_all_msg(timeout) + stdout = "".join([msg[1] for msg in msglist if msg[0] == 'stdout']) + othermsg = [msg for msg in msglist if msg[0] != 'stdout'] + return (stdout, othermsg) + + def write_stdin(self, data): + """Write to terminal stdin""" + self.write_msg(['stdin', data]) + + async def get_pid(self): + """Get process ID of terminal shell process""" + await self.read_stdout() # Clear out any pending + self.write_stdin("echo $$\r") + (stdout, extra) = await self.read_stdout() + if os.name == 'nt': + match = re.search(r'echo \$\$\x1b\[0K\r\n(\d+)', stdout) + pid = int(match.groups()[0]) + else: + pid = int(stdout.split('\n')[1]) + return pid + + def close(self): + self.ws.close() + + class TermTestCase(tornado.testing.AsyncHTTPTestCase): + + # Factory for TestTermClient, because it has to be a Tornado co-routine. + # See: https://github.com/tornadoweb/tornado/issues/1161 + async def get_term_client(self, path): + port = self.get_http_port() + url = 'ws://127.0.0.1:%d%s' % (port, path) + request = tornado.httpclient.HTTPRequest(url, + headers={'Origin' : 'http://127.0.0.1:%d' % port}) + + ws = await tornado.websocket.websocket_connect(request) + return TestTermClient(ws) + + async def get_term_clients(self, paths): + tms = await asyncio.gather(*[self.get_term_client(path) for path in paths]) + return tms + + async def get_pids(self, tm_list): + pids = [] + for tm in tm_list: # Must be sequential, in case terms are shared + pid = await tm.get_pid() + pids.append(pid) + + return pids + + def get_app(self): + self.named_tm = NamedTermManager(shell_command=['bash'], + max_terminals=MAX_TERMS, + ioloop=self.io_loop) + self.single_tm = SingleTermManager(shell_command=['bash'], + ioloop=self.io_loop) + self.unique_tm = UniqueTermManager(shell_command=['bash'], + max_terminals=MAX_TERMS, + ioloop=self.io_loop) + + named_tm = self.named_tm + class NewTerminalHandler(tornado.web.RequestHandler): + """Create a new named terminal, return redirect""" + def get(self): + name, terminal = named_tm.new_named_terminal() + self.redirect("/named/" + name, permanent=False) + + return tornado.web.Application([ + (r"/new", NewTerminalHandler), + (r"/named/(\w+)", TermSocket, {'term_manager': self.named_tm}), + (r"/single", TermSocket, {'term_manager': self.single_tm}), + (r"/unique", TermSocket, {'term_manager': self.unique_tm}) + ], debug=True) + + test_urls = ('/named/term1', '/unique', '/single') + + class CommonTests(TermTestCase): + @tornado.testing.gen_test + async def test_basic(self): + for url in self.test_urls: + tm = await self.get_term_client(url) + response = await tm.read_msg() + self.assertEqual(response, ['setup', {}]) + + # Check for initial shell prompt + response = await tm.read_msg() + self.assertEqual(response[0], 'stdout') + self.assertGreater(len(response[1]), 0) + tm.close() + + @tornado.testing.gen_test + async def test_basic_command(self): + for url in self.test_urls: + tm = await self.get_term_client(url) + await tm.read_all_msg() + tm.write_stdin("whoami\n") + (stdout, other) = await tm.read_stdout() + if os.name == 'nt': + assert 'whoami' in stdout + else: + assert stdout.startswith('who') + assert other == [] + tm.close() + + class NamedTermTests(TermTestCase): + def test_new(self): + response = self.fetch("/new", follow_redirects=False) + self.assertEqual(response.code, 302) + url = response.headers["Location"] + + # Check that the new terminal was created + name = url.split('/')[2] + self.assertIn(name, self.named_tm.terminals) + + @tornado.testing.gen_test + async def test_namespace(self): + names = ["/named/1"]*2 + ["/named/2"]*2 + tms = await self.get_term_clients(names) + pids = await self.get_pids(tms) + + self.assertEqual(pids[0], pids[1]) + self.assertEqual(pids[2], pids[3]) + self.assertNotEqual(pids[0], pids[3]) + + [tm.close() for tm in tms] + + @tornado.testing.gen_test + async def test_max_terminals(self): + urls = ["/named/%d" % i for i in range(MAX_TERMS+1)] + tms = await self.get_term_clients(urls[:MAX_TERMS]) + pids = await self.get_pids(tms) + + # MAX_TERMS+1 should fail + tm = await self.get_term_client(urls[MAX_TERMS]) + msg = await tm.read_msg() + self.assertEqual(msg, None) # Connection closed + + [tm.close() for tm in tms] + + class SingleTermTests(TermTestCase): + @tornado.testing.gen_test + async def test_single_process(self): + tms = await self.get_term_clients(["/single", "/single"]) + pids = await self.get_pids(tms) + self.assertEqual(pids[0], pids[1]) + + [tm.close() for tm in tms] + + class UniqueTermTests(TermTestCase): + @tornado.testing.gen_test + async def test_unique_processes(self): + tms = await self.get_term_clients(["/unique", "/unique"]) + pids = await self.get_pids(tms) + self.assertNotEqual(pids[0], pids[1]) + + [tm.close() for tm in tms] + + @tornado.testing.gen_test + async def test_max_terminals(self): + tms = await self.get_term_clients(['/unique'] * MAX_TERMS) + pids = await self.get_pids(tms) + self.assertEqual(len(set(pids)), MAX_TERMS) # All PIDs unique + + # MAX_TERMS+1 should fail + tm = await self.get_term_client("/unique") + msg = await tm.read_msg() + self.assertEqual(msg, None) # Connection closed + + # Close one + tms[0].close() + msg = await tms[0].read_msg() # Closed + self.assertEqual(msg, None) + + # Should be able to open back up to MAX_TERMS + tm = await self.get_term_client("/unique") + msg = await tm.read_msg() + self.assertEqual(msg[0], 'setup') + + [tm.close() for tm in tms] + tm.close() + + if __name__ == '__main__': + unittest.main() diff --git a/terminado/websocket.py b/terminado/websocket.py index 3d3fe39..2f247cb 100644 --- a/terminado/websocket.py +++ b/terminado/websocket.py @@ -18,6 +18,9 @@ import tornado.web import tornado.websocket +from .management import MaxTerminalsReached + + def _cast_unicode(s): if isinstance(s, bytes): return s.decode('utf-8') @@ -51,7 +54,12 @@ def open(self, url_component=None): url_component = _cast_unicode(url_component) self.term_name = url_component or 'tty' - self.terminal = self.term_manager.get_terminal(url_component) + try: + self.terminal = self.term_manager.get_terminal(url_component) + except MaxTerminalsReached as err: + # Raise HTTPError as "Too Many Requests" instead of having tornado have "unhandled exception" + raise tornado.web.HTTPError(429, str(err)) + for s in self.terminal.read_buffer: self.on_pty_read(s) self.terminal.clients.append(self) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..f00514c --- /dev/null +++ b/tox.ini @@ -0,0 +1,24 @@ +[tox] +# Due to flit setup dependency, py27 env won't built/run properly +#envlist = {py27}-{tornado5},{py36}-{tornado5,tornado6},{py37,py38}-{tornado5,tornado6} +envlist = {py36}-{tornado5,tornado6},{py37,py38}-{tornado5,tornado6} +skip_missing_interpreters = True + +[testenv] +deps = + mock + pytest +setenv = + #tornado4: TORNADO_VERSION=>=4.0.0,<5.0.0 + tornado5: TORNADO_VERSION=>=5.0.0,<6.0.0 + tornado6: TORNADO_VERSION=>=6.0.0,<7.0.0 +commands = + pip install "tornado{env:TORNADO_VERSION}" flit ptyprocess + py.test +passenv = + TORNADO_VERSION + +[testenv:py27-tornado5] +commands = + pip install "tornado{env:TORNADO_VERSION}" + py.test