Skip to content

Commit

Permalink
gravelrpc: fd passing
Browse files Browse the repository at this point in the history
  • Loading branch information
zielmicha committed Jul 21, 2013
1 parent c8498c7 commit dcd200e
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Gravelfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: gravel-common
requires-apt: python2.7 python-tdb python-yaml libpcre3 python-bson-ext
requires-apt: python2.7 python-tdb python-yaml libpcre3 python-bson-ext python-passfd
symlinks:
- [dbtool.py, /usr/local/bin/graveldbtool]
- [rpctool.py, /usr/local/bin/gravelrpctool]
Expand Down
56 changes: 49 additions & 7 deletions gravelrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import bson
import os
import traceback
import passfd
import struct

from bson.binary import Binary

Expand All @@ -16,17 +18,20 @@ def server_bind(self):
SocketServer.UnixStreamServer.server_bind(self)

class RPCHandler(SocketServer.StreamRequestHandler):
allow_fd_passing = False

def handle(self):
req = bson.BSON(self.rfile.read()).decode()
req = read_bson(self.request, allow_fd_passing=self.allow_fd_passing)
try:
if 'fds' in req:
req['kwargs']['_fds'] = req['fds']
result = getattr(self, 'method_' + req['name'])(*req['args'], **req['kwargs'])
except Exception as err:
traceback.print_exc()
doc = dict(error=str(err))
else:
doc = dict(result=result)
self.wfile.write(bson.BSON.encode(doc))
self.wfile.close()
write_bson(self.request, doc)

@classmethod
def main(cls, name):
Expand All @@ -45,13 +50,50 @@ def _call(self, name, *args, **kwargs):
sock.connect(self._path)
f = sock.makefile('r', 0)
doc = dict(name=name, args=args, kwargs=kwargs)
sock.sendall(bson.BSON.encode(doc))
sock.shutdown(socket.SHUT_WR)
result = f.read()
result = bson.BSON(result).decode()
if '_fds' in kwargs:
doc['fds'] = kwargs['_fds']
write_bson(sock, doc)
result = read_bson(sock)
if 'error' in result:
raise RPCError(result['error'])
return result['result']

def __getattr__(self, name):
return functools.partial(self._call, name)

class FD(object):
def __init__(self, fileno):
self._fileno = fileno

def fileno(self):
return self._fileno

def open(self, *args, **kwargs):
return os.fdopen(self.fileno(), *args, **kwargs)

def write_bson(sock, doc):
fds = doc.get('fds', [])
doc['fds'] = len(fds)

sock.send(struct.pack('!I', len(fds)))
for fd in fds:
passfd.sendfd(sock, fd, 'whatever')

sock.sendall(bson.BSON.encode(doc))
sock.shutdown(socket.SHUT_WR)

def read_bson(sock, allow_fd_passing=False):
fd_count, = struct.unpack('!I', sock.recv(4))

if fd_count == 0 or allow_fd_passing:
fds = [ FD(passfd.recvfd(sock)[0]) for i in xrange(fd_count) ]
else:
raise IOError('client tried to pass fds')

raw = ''.join(iter(lambda: sock.recv(4096), ''))
result = bson.BSON(raw).decode()
if fd_count != 0:
result['fds'] = fds
elif 'fds' in result:
del result['fds']
return result
3 changes: 3 additions & 0 deletions testrpc_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# coding: utf-8
import gravelrpc
import sys

c = gravelrpc.Client('foo')
print c.hello(u'Michał!')

c.say_hello(_fds=[sys.stdout.fileno()])
10 changes: 8 additions & 2 deletions testrpc_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import gravelrpc

class Handler(gravelrpc.RPCHandler):
def method_hello(self, name):
return 'Hello, %s!' % name
allow_fd_passing = True

def method_hello(self, name):
return 'Hello, %s!' % name

def method_say_hello(self, _fds):
stdout, = _fds
stdout.open('w').write('HELLO!')

Handler.main('foo')

0 comments on commit dcd200e

Please sign in to comment.