forked from Thriftpy/thriftpy2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_multiplexed.py
90 lines (67 loc) · 2.66 KB
/
test_multiplexed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import multiprocessing
import os
import sys
import time
import pytest
import thriftpy2
from thriftpy2.protocol import (
TBinaryProtocolFactory,
TMultiplexedProtocolFactory
)
from thriftpy2.rpc import client_context
from thriftpy2.server import TThreadedServer
from thriftpy2.thrift import TProcessor, TMultiplexedProcessor
from thriftpy2.transport import TBufferedTransportFactory, TServerSocket
if sys.platform == "win32":
pytest.skip("requires fork", allow_module_level=True)
mux = thriftpy2.load(os.path.join(os.path.dirname(__file__),
"multiplexed.thrift"))
sock_path = "/tmp/thriftpy_test.sock"
class DispatcherOne(object):
def doThingOne(self):
return True
class DispatcherTwo(object):
def doThingTwo(self):
return True
@pytest.fixture(scope="module")
def server(request):
p1 = TProcessor(mux.ThingOneService, DispatcherOne())
p2 = TProcessor(mux.ThingTwoService, DispatcherTwo())
mux_proc = TMultiplexedProcessor()
mux_proc.register_processor("ThingOneService", p1)
mux_proc.register_processor("ThingTwoService", p2)
_server = TThreadedServer(mux_proc, TServerSocket(unix_socket=sock_path),
iprot_factory=TBinaryProtocolFactory(),
itrans_factory=TBufferedTransportFactory())
ps = multiprocessing.Process(target=_server.serve)
ps.start()
time.sleep(0.1)
def fin():
if ps.is_alive():
ps.terminate()
try:
os.remove(sock_path)
except IOError:
pass
request.addfinalizer(fin)
def client_one(timeout=3000):
binary_factory = TBinaryProtocolFactory()
multiplexing_factory = TMultiplexedProtocolFactory(binary_factory,
"ThingOneService")
return client_context(mux.ThingOneService, unix_socket=sock_path,
socket_timeout=timeout, connect_timeout=timeout,
proto_factory=multiplexing_factory)
def client_two(timeout=3000):
binary_factory = TBinaryProtocolFactory()
multiplexing_factory = TMultiplexedProtocolFactory(binary_factory,
"ThingTwoService")
return client_context(mux.ThingTwoService, unix_socket=sock_path,
socket_timeout=timeout, connect_timeout=timeout,
proto_factory=multiplexing_factory)
def test_multiplexed_server(server):
with client_one() as c:
assert c.doThingOne() is True
with client_two() as c:
assert c.doThingTwo() is True