From dcd200ecaabfda5f58a4ecd8c6a8d147ace4a91c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zieli=C5=84ski?= Date: Sun, 21 Jul 2013 20:50:54 +0200 Subject: [PATCH] gravelrpc: fd passing --- Gravelfile | 2 +- gravelrpc.py | 56 +++++++++++++++++++++++++++++++++++++++++------ testrpc_client.py | 3 +++ testrpc_server.py | 10 +++++++-- 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/Gravelfile b/Gravelfile index 360035d..e2e4127 100644 --- a/Gravelfile +++ b/Gravelfile @@ -1,5 +1,5 @@ name: gravel-common -requires-apt: python2.7 python-tdb python-yaml libpcre3 python-bson-ext +requires-apt: python2.7 python-tdb python-yaml libpcre3 python-bson-ext python-passfd symlinks: - [dbtool.py, /usr/local/bin/graveldbtool] - [rpctool.py, /usr/local/bin/gravelrpctool] diff --git a/gravelrpc.py b/gravelrpc.py index e5ecad9..25e8d13 100644 --- a/gravelrpc.py +++ b/gravelrpc.py @@ -4,6 +4,8 @@ import bson import os import traceback +import passfd +import struct from bson.binary import Binary @@ -16,17 +18,20 @@ def server_bind(self): SocketServer.UnixStreamServer.server_bind(self) class RPCHandler(SocketServer.StreamRequestHandler): + allow_fd_passing = False + def handle(self): - req = bson.BSON(self.rfile.read()).decode() + req = read_bson(self.request, allow_fd_passing=self.allow_fd_passing) try: + if 'fds' in req: + req['kwargs']['_fds'] = req['fds'] result = getattr(self, 'method_' + req['name'])(*req['args'], **req['kwargs']) except Exception as err: traceback.print_exc() doc = dict(error=str(err)) else: doc = dict(result=result) - self.wfile.write(bson.BSON.encode(doc)) - self.wfile.close() + write_bson(self.request, doc) @classmethod def main(cls, name): @@ -45,13 +50,50 @@ def _call(self, name, *args, **kwargs): sock.connect(self._path) f = sock.makefile('r', 0) doc = dict(name=name, args=args, kwargs=kwargs) - sock.sendall(bson.BSON.encode(doc)) - sock.shutdown(socket.SHUT_WR) - result = f.read() - result = bson.BSON(result).decode() + if '_fds' in kwargs: + doc['fds'] = kwargs['_fds'] + write_bson(sock, doc) + result = read_bson(sock) if 'error' in result: raise RPCError(result['error']) return result['result'] def __getattr__(self, name): return functools.partial(self._call, name) + +class FD(object): + def __init__(self, fileno): + self._fileno = fileno + + def fileno(self): + return self._fileno + + def open(self, *args, **kwargs): + return os.fdopen(self.fileno(), *args, **kwargs) + +def write_bson(sock, doc): + fds = doc.get('fds', []) + doc['fds'] = len(fds) + + sock.send(struct.pack('!I', len(fds))) + for fd in fds: + passfd.sendfd(sock, fd, 'whatever') + + sock.sendall(bson.BSON.encode(doc)) + sock.shutdown(socket.SHUT_WR) + +def read_bson(sock, allow_fd_passing=False): + fd_count, = struct.unpack('!I', sock.recv(4)) + + if fd_count == 0 or allow_fd_passing: + fds = [ FD(passfd.recvfd(sock)[0]) for i in xrange(fd_count) ] + else: + raise IOError('client tried to pass fds') + + raw = ''.join(iter(lambda: sock.recv(4096), '')) + result = bson.BSON(raw).decode() + if fd_count != 0: + result['fds'] = fds + elif 'fds' in result: + del result['fds'] + return result diff --git a/testrpc_client.py b/testrpc_client.py index c1e6bb0..985edb4 100644 --- a/testrpc_client.py +++ b/testrpc_client.py @@ -1,5 +1,8 @@ # coding: utf-8 import gravelrpc +import sys c = gravelrpc.Client('foo') print c.hello(u'MichaƂ!') + +c.say_hello(_fds=[sys.stdout.fileno()]) diff --git a/testrpc_server.py b/testrpc_server.py index 8bf7c5a..2aaa936 100644 --- a/testrpc_server.py +++ b/testrpc_server.py @@ -1,7 +1,13 @@ import gravelrpc class Handler(gravelrpc.RPCHandler): - def method_hello(self, name): - return 'Hello, %s!' % name + allow_fd_passing = True + + def method_hello(self, name): + return 'Hello, %s!' % name + + def method_say_hello(self, _fds): + stdout, = _fds + stdout.open('w').write('HELLO!') Handler.main('foo')