Skip to content

Commit

Permalink
Added service multiplexing support.
Browse files Browse the repository at this point in the history
This introduces a `TMultiplexingProcessor` class which extends `TProcessor`.
This adds the ability for developers to develop services that implement
multiple Thrift interfaces. It is inspired by the `TMultiplexedProcessor`
from the Thrift Java library.
  • Loading branch information
Travis Mehlinger committed Feb 9, 2015
1 parent fbb7bfd commit 0877531
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 23 deletions.
7 changes: 7 additions & 0 deletions examples/multiplexer/dingdong.thrift
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# ding service demo
service DingService {
/*
* Sexy c style comment
*/
string ding(),
}
23 changes: 23 additions & 0 deletions examples/multiplexer/multiplexed_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-

import thriftpy
from thriftpy.rpc import client_context

dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift")
pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift")


def main():
with client_context(dd_thrift.DingService, '127.0.0.1', 9090) as c:
# ring that doorbell
dong = c.ding()
print(dong)

with client_context(pp_thrift.PingService, '127.0.0.1', 9090) as c:
# play table tennis like a champ
pong = c.ping()
print(pong)


if __name__ == '__main__':
main()
41 changes: 41 additions & 0 deletions examples/multiplexer/multiplexed_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-

import thriftpy
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.server import TThreadedServer
from thriftpy.thrift import TProcessor, TMultiplexingProcessor
from thriftpy.transport import TBufferedTransportFactory, TServerSocket


dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift")
pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift")


class DingDispatcher(object):
def ding(self):
print("ding dong!")
return 'dong'


class PingDispatcher(object):
def ping(self):
print("ping pong!")
return 'pong'


def main():
dd_proc = TProcessor(dd_thrift.DingService, DingDispatcher())
pp_proc = TProcessor(pp_thrift.PingService, PingDispatcher())

mux_proc = TMultiplexingProcessor()
mux_proc.register_processor(dd_proc)
mux_proc.register_processor(pp_proc)

server = TThreadedServer(mux_proc, TServerSocket(),
iprot_factory=TBinaryProtocolFactory(),
itrans_factory=TBufferedTransportFactory())
server.serve()


if __name__ == '__main__':
main()
7 changes: 7 additions & 0 deletions examples/multiplexer/pingpong.thrift
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# ping service demo
service PingService {
/*
* Sexy c style comment
*/
string ping(),
}
7 changes: 7 additions & 0 deletions tests/multiplexed.thrift
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
service ThingOneService {
bool doThingOne();
}

service ThingTwoService {
bool doThingTwo();
}
74 changes: 74 additions & 0 deletions tests/test_multiplexed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-

from __future__ import absolute_import

import os
import multiprocessing
import time

import pytest

import thriftpy
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.rpc import client_context
from thriftpy.server import TThreadedServer
from thriftpy.thrift import TProcessor, TMultiplexingProcessor
from thriftpy.transport import TBufferedTransportFactory, TServerSocket


mux = thriftpy.load(os.path.join(os.path.dirname(__file__),
"multiplexed.thrift"))
sock_path = "./thriftpy_test.sock"


class DispatcherOne(object):
def doThingOne(self):
return True


class DispatcherTwo(object):
def doThingTwo(self):
return True


@pytest.fixture(scope="module")
def server(request):
p1 = TProcessor(mux.ThingOneService, DispatcherOne())
p2 = TProcessor(mux.ThingTwoService, DispatcherTwo())

mux_proc = TMultiplexingProcessor()
mux_proc.register_processor(p1)
mux_proc.register_processor(p2)

_server = TThreadedServer(mux_proc, TServerSocket(unix_socket=sock_path),
iprot_factory=TBinaryProtocolFactory(),
itrans_factory=TBufferedTransportFactory())
ps = multiprocessing.Process(target=_server.serve)
ps.start()
time.sleep(0.1)

def fin():
if ps.is_alive():
ps.terminate()
try:
os.remove(sock_path)
except IOError:
pass
request.addfinalizer(fin)


def client_one(timeout=3000):
return client_context(mux.ThingOneService, unix_socket=sock_path,
timeout=timeout)


def client_two(timeout=3000):
return client_context(mux.ThingTwoService, unix_socket=sock_path,
timeout=timeout)


def test_multiplexed_server(server):
with client_one() as c:
assert c.doThingOne() is True
with client_two() as c:
assert c.doThingTwo() is True
96 changes: 73 additions & 23 deletions thriftpy/thrift.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from __future__ import absolute_import

import functools
import inspect

from ._compat import init_func_generator, with_metaclass

Expand Down Expand Up @@ -189,21 +190,20 @@ def process_in(self, iprot):
iprot.skip(TType.STRUCT)
iprot.read_message_end()
return api, seqid, TApplicationException(TApplicationException.UNKNOWN_METHOD), None # noqa
else:
args = getattr(self._service, api + "_args")()
args.read(iprot)
iprot.read_message_end()
result = getattr(self._service, api + "_result")()

# convert kwargs to args
api_args = [args.thrift_spec[k][1]
for k in sorted(args.thrift_spec)]
args = getattr(self._service, api + "_args")()
args.read(iprot)
iprot.read_message_end()
result = getattr(self._service, api + "_result")()

def call():
return getattr(self._handler, api)(
*(args.__dict__[k] for k in api_args)
)
return api, seqid, result, call
# convert kwargs to args
api_args = [args.thrift_spec[k][1]
for k in sorted(args.thrift_spec)]
def call():
return getattr(self._handler, api)(
*(args.__dict__[k] for k in api_args)
)
return api, seqid, result, call

def send_exception(self, oprot, api, exc, seqid):
oprot.write_message_begin(api, TMessageType.EXCEPTION, seqid)
Expand Down Expand Up @@ -233,16 +233,66 @@ def process(self, iprot, oprot):
api, seqid, result, call = self.process_in(iprot)

if isinstance(result, TApplicationException):
self.send_exception(oprot, api, result, seqid)
else:
try:
result.success = call()
except Exception as e:
# raise if api don't have throws
self.handle_exception(e, result)

if not result.oneway:
self.send_result(oprot, api, result, seqid)
return self.send_exception(oprot, api, result, seqid)

try:
result.success = call()
except Exception as e:
# raise if api don't have throws
self.handle_exception(e, result)

if not result.oneway:
self.send_result(oprot, api, result, seqid)


class TMultiplexingProcessor(TProcessor):
processors = {}
service_map = {}

def __init__(self):
pass

def register_processor(self, processor):
service = processor._service
module = inspect.getmodule(processor)
name = '{0}:{1}'.format(module.__name__, service.__name__)
if name in self.processors:
raise TApplicationException(
type=TApplicationException.INTERNAL_ERROR,
message='processor for `{0}` already registered'.format(name))

for srv in service.thrift_services:
if srv in self.service_map:
raise TApplicationException(
type=TApplicationException.INTERNAL_ERROR,
message='cannot multiplex processor for `{0}`; '
'`{1}` is already a registered method for `{2}`'
.format(name, srv, self.service_map[srv]))
self.service_map[srv] = name

self.processors[name] = processor

def process_in(self, iprot):
api, type, seqid = iprot.read_message_begin()
if api not in self.service_map:
iprot.skip(TType.STRUCT)
iprot.read_message_end()
e = TApplicationException(TApplicationException.UNKNOWN_METHOD)
return api, seqid, e, None # noqa

proc = self.processors[self.service_map[api]]
args = getattr(proc._service, api + "_args")()
args.read(iprot)
iprot.read_message_end()
result = getattr(proc._service, api + "_result")()

# convert kwargs to args
api_args = [args.thrift_spec[k][1]
for k in sorted(args.thrift_spec)]
call = lambda: getattr(proc._handler, api)(
*(args.__dict__[k] for k in api_args)
)
return api, seqid, result, call


class TException(TPayload, Exception):
Expand Down

0 comments on commit 0877531

Please sign in to comment.