-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_console.py
84 lines (66 loc) · 2.72 KB
/
client_console.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python
"""
Interactive console client using stdio and LineReceiver.
Note that the ClientConsole protocol could easily be used in e.g.
a telnet server instead; see the comments for details.
Based on an example by Abe Fettig.
"""
from twisted.internet import stdio, reactor
from twisted.protocols import basic
import client
class ClientConsole(basic.LineReceiver):
    """Line-oriented command console in front of a ``client.Connect`` connector.

    Every received line is split on whitespace; the first word selects a
    ``do_<word>`` method and the remaining words are passed to it as
    positional arguments.  Adding a command only requires adding another
    ``do_*`` method (its docstring is what ``help <command>`` prints).
    """

    # unix terminal style newlines. remove this line for use with Telnet
    # NOTE(review): a str delimiter assumes Python 2; Twisted on Python 3
    # works with bytes -- confirm before porting.
    delimiter = '\n'

    def __init__(self, remote_host='localhost', remote_port=10001):
        """Create the connector and remember the remote endpoint.

        remote_host -- host name handed to ``client.Connect``.
        remote_port -- port handed to ``client.Connect``.
        """
        # presumably client.Connect starts connecting right away -- verify
        # against the client module.
        self.connector = client.Connect(remote_host, remote_port)
        self.remoteHostName = remote_host
        self.remotePort = remote_port

    def connectionMade(self):
        """Print the banner and the configured remote host name."""
        self.sendLine('Client console. Type "help" for help.')
        self.sendLine('Remote server: ' + self.remoteHostName)

    def lineReceived(self, line):
        """Parse one input line and dispatch it to the matching do_* method.

        Blank and whitespace-only lines are ignored.  An unknown command or
        any exception raised by the handler is reported back on the console
        instead of propagating.
        """
        commandParts = line.split()
        # split() yields an empty list for '' as well as for '   ', so this
        # single check covers both (indexing [0] used to raise IndexError
        # on whitespace-only input).
        if not commandParts:
            return

        command = commandParts[0].lower()
        args = commandParts[1:]

        # Dispatch the command to the appropriate method.  Note that all you
        # need to do to implement a new command is add another do_* method.
        method = getattr(self, 'do_' + command, None)
        if method is None:
            self.sendLine('Error: no such command.')
            return

        try:
            method(*args)
        except Exception as e:
            # Deliberately broad: this is the console's top-level boundary,
            # and a bad command (e.g. wrong argument count) must not escape
            # into the protocol machinery.
            self.sendLine('Error: ' + str(e))

    def do_help(self, command=None):
        """help [command]: List commands, or show help on the given command"""
        if command:
            handler = getattr(self, 'do_' + command.lower(), None)
            if handler is None:
                self.sendLine('Error: no such command.')
                return
            # A handler without a docstring has __doc__ == None, which
            # sendLine cannot write; fall back to a fixed message.
            self.sendLine(handler.__doc__
                          or 'No help available for "%s".' % command)
        else:
            commands = [name[3:] for name in dir(self)
                        if name.startswith('do_')]
            self.sendLine("Valid commands: " + " ".join(commands))

    def do_quit(self):
        """quit: Quit this session"""
        self.sendLine('Goodbye.')
        self.transport.loseConnection()

    def do_send(self, data):
        """send <data>: Send a single word of data over the connection"""
        if self.connector.connection is None:
            self.sendLine('Connection not established')
            return
        self.connector.connection.Send(data)

    def do_nreq(self, data=None):
        """nreq: Send the string NREQ over the connection"""
        # ``data`` is accepted for backward compatibility but is not used.
        if self.connector.connection is None:
            self.sendLine('Connection not established')
            return
        self.connector.connection.Send('NREQ')

    def do_connect(self, data=None):
        """connect: Create a new connector if there is no connection"""
        # ``data`` is unused.  Nothing happens when a connection exists.
        if self.connector.connection is None:
            self.connector = client.Connect(self.remoteHostName,
                                            self.remotePort)

    def connectionLost(self, reason):
        """Stop the reactor when the console connection goes away."""
        # stop the reactor, only because this is meant to be run in Stdio.
        if reactor.running:
            reactor.stop()
if __name__ == "__main__":
    # Attach a console (default remote endpoint) to this process's
    # stdin/stdout, then hand control to the reactor.
    console = ClientConsole()
    stdio.StandardIO(console)
    reactor.run()