forked from Thriftpy/thriftpy2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_framed_transport.py
140 lines (111 loc) · 4.19 KB
/
test_framed_transport.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import logging
import socket
import sys
import threading
import time
from os import path
from unittest import TestCase
import pytest
from tornado import ioloop
import thriftpy2
from thriftpy2._compat import CYTHON
from thriftpy2.tornado import make_server
from thriftpy2.rpc import make_client
from thriftpy2.transport.framed import TFramedTransportFactory
from thriftpy2.protocol.binary import TBinaryProtocolFactory
# asyncio is optional: when it cannot be imported the name is bound to None
# so later code can simply test ``if asyncio:``.
try:
    import asyncio
except ImportError:
    asyncio = None
# The server in these tests is attached with add_socket(); skip the whole
# module on Windows, where that call is reported as not implemented.
if sys.platform == "win32":
    pytest.skip("add_socket is not implemented on Windows",
                allow_module_level=True)

logging.basicConfig(level=logging.INFO)

# Thrift definitions loaded from the addressbook.thrift file that sits next
# to this module; every test below uses them.
addressbook = thriftpy2.load(path.join(path.dirname(__file__),
                                       "addressbook.thrift"))
class Dispatcher(object):
    """In-memory handler for the address book service calls used here.

    People are kept in ``registry``, a dict keyed by ``person.name``.
    """

    def __init__(self, io_loop):
        # The loop is only stored; no method of this class uses it.
        self.io_loop = io_loop
        self.registry = {}

    def add(self, person):
        """
        bool add(1: Person person);

        Store ``person`` under its name. Return False, leaving the existing
        entry untouched, when that name is already registered; otherwise
        store it and return True.
        """
        if person.name in self.registry:
            return False
        self.registry[person.name] = person
        return True

    def get(self, name):
        """
        Person get(1: string name)

        Return the person stored under ``name``. Raise
        ``addressbook.PersonNotExistsError`` when no such entry exists.
        """
        if name not in self.registry:
            raise addressbook.PersonNotExistsError()
        return self.registry[name]
class FramedTransportTestCase(TestCase):
    """Round-trip tests: synchronous clients against a Tornado server.

    ``setUp`` starts an AddressBookService server on a free localhost port
    in a background thread and creates two clients from
    ``TRANSPORT_FACTORY`` and ``PROTOCOL_FACTORY``. Subclasses may override
    those two class attributes to run the same tests with other factories.
    """

    TRANSPORT_FACTORY = TFramedTransportFactory()
    PROTOCOL_FACTORY = TBinaryProtocolFactory()

    def mk_server(self):
        """Bind a socket on a free port and serve it from a daemon thread."""
        sock = self.server_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)
        # Port 0 lets the OS choose a free port; remember which one it picked
        # so the clients know where to connect.
        sock.bind(('127.0.0.1', 0))
        sock.setblocking(False)
        self.port = sock.getsockname()[-1]

        self.server_thread = threading.Thread(target=self.listen)
        # Thread.setDaemon() is deprecated; set the attribute instead.
        self.server_thread.daemon = True
        self.server_thread.start()

    def listen(self):
        """Thread target: serve ``self.server_sock`` until the loop stops."""
        self.server_sock.listen(128)
        if asyncio:
            # In Tornado 5.0+, the asyncio event loop will be used
            # automatically by default
            asyncio.set_event_loop(asyncio.new_event_loop())

        self.io_loop = ioloop.IOLoop.current()
        server = make_server(addressbook.AddressBookService,
                             Dispatcher(self.io_loop), io_loop=self.io_loop)
        server.add_socket(self.server_sock)
        # Blocks here until tearDown asks the loop to stop.
        self.io_loop.start()

    def mk_client(self):
        """Return a client built from an explicit host and port."""
        return make_client(addressbook.AddressBookService,
                           '127.0.0.1', self.port,
                           proto_factory=self.PROTOCOL_FACTORY,
                           trans_factory=self.TRANSPORT_FACTORY)

    def mk_client_with_url(self):
        """Return a client built from a ``thrift://host:port`` URL."""
        return make_client(addressbook.AddressBookService,
                           proto_factory=self.PROTOCOL_FACTORY,
                           trans_factory=self.TRANSPORT_FACTORY,
                           url='thrift://127.0.0.1:{port}'.format(
                               port=self.port))

    def setUp(self):
        """Start the server, then create both kinds of client."""
        self.mk_server()
        # Give the server thread a moment to start listening and to publish
        # self.io_loop before anything connects.
        time.sleep(0.1)
        self.client = self.mk_client()
        self.client_created_using_url = self.mk_client_with_url()

    def tearDown(self):
        """Close the clients, stop the server loop and release the socket."""
        self.client.close()
        self.client_created_using_url.close()
        # The loop runs in the server thread. add_callback() is the IOLoop
        # method that is safe to call from another thread, so ask the loop
        # to stop itself rather than calling stop() directly from here.
        self.io_loop.add_callback(self.io_loop.stop)
        # Bounded wait so a stuck loop cannot hang the test run.
        self.server_thread.join(1)
        self.server_sock.close()

    def test_make_client(self):
        # Both clients talk to the same server, so the second add of the
        # same name must be rejected.
        linus = addressbook.Person('Linus Torvalds')
        success = self.client_created_using_url.add(linus)
        assert success
        success = self.client.add(linus)
        assert not success

    def test_able_to_communicate(self):
        # First add succeeds, a repeated add of the same name does not.
        dennis = addressbook.Person(name='Dennis Ritchie')
        success = self.client.add(dennis)
        assert success
        success = self.client.add(dennis)
        assert not success

    def test_zero_length_string(self):
        # An empty string must survive the round trip in both directions.
        dennis = addressbook.Person(name='')
        success = self.client.add(dennis)
        assert success
        person = self.client.get(name='')
        assert person
        assert person.name == ''
# Only define the Cython variant when thriftpy2._compat reports CYTHON as
# true; the Cy* factories are imported inside the guard for the same reason.
if CYTHON:
    from thriftpy2.transport.framed import TCyFramedTransportFactory
    from thriftpy2.protocol.cybin import TCyBinaryProtocolFactory

    class CyFramedTransportTestCase(FramedTransportTestCase):
        """Run the inherited tests with the Cy* protocol/transport factories."""

        PROTOCOL_FACTORY = TCyBinaryProtocolFactory()
        TRANSPORT_FACTORY = TCyFramedTransportFactory()