forked from Thriftpy/thriftpy2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_cytransport.py
55 lines (37 loc) · 1.34 KB
/
test_cytransport.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# -*- coding: utf-8 -*-
import pytest
from thriftpy2._compat import CYTHON
# Every test in this module exercises the Cython transport classes, so they
# are imported only when thriftpy2 reports that CYTHON is enabled.
if CYTHON:
    from thriftpy2.transport.framed import TCyFramedTransport
    from thriftpy2.transport.buffered import TCyBufferedTransport
    from thriftpy2.transport import TMemoryBuffer, TTransportException
else:
    # allow_module_level=True lets pytest.skip() be called outside a test
    # function; the whole module is reported as skipped.
    pytest.skip("cython not enabled.", allow_module_level=True)
def test_transport_mismatch():
    """Reading buffered-transport output through a framed transport fails.

    A payload is written and flushed through a ``TCyBufferedTransport`` into
    a shared ``TMemoryBuffer``.  Reading that same buffer through a
    ``TCyFramedTransport`` must raise ``TTransportException`` with a message
    containing "No frame".
    """
    shared_buffer = TMemoryBuffer()

    writer = TCyBufferedTransport(shared_buffer)
    payload = b"\x80\x01\x00\x01\x00\x00\x00\x04ping hello world"
    writer.write(payload)
    writer.flush()

    with pytest.raises(TTransportException) as exc_info:
        reader = TCyFramedTransport(shared_buffer)
        reader.read(4)

    # Checked after the ``with`` block: the captured exception is only
    # available on ``exc_info`` once the context manager has exited.
    message = str(exc_info.value)
    assert "No frame" in message
def test_buffered_read():
    """Bytes written and flushed through a buffered transport read back.

    Uses a ``TCyBufferedTransport`` wrapped around a ``TMemoryBuffer`` and
    checks that the four bytes written are returned by ``read(4)`` on the
    same transport.
    """
    transport = TCyBufferedTransport(TMemoryBuffer())

    transport.write(b"ping")
    transport.flush()

    received = transport.read(4)
    assert received == b"ping"
def test_transport_handle():
    """Cython transports expose the wrapped transport's handle as ``sock``.

    A ``TSocket`` is given the handle ``'the sock'``; buffered and framed
    transports wrapping it (directly or nested) must report that handle via
    ``.sock``.  A ``TCyMemoryBuffer``, alone or wrapped, must report ``None``.
    """
    # No CYTHON check is needed here: the module-level guard already skips
    # the whole module when Cython is disabled, so this function is only
    # ever defined when the Cython transports are importable.  The previous
    # in-function ``return`` was unreachable and would have reported a pass
    # instead of a skip.
    from thriftpy2.transport import TSocket
    from thriftpy2.transport.memory import TCyMemoryBuffer

    s = TSocket()
    s.set_handle('the sock')

    # Direct wrappers forward the socket's handle.
    assert TCyBufferedTransport(s).sock == 'the sock'
    assert TCyFramedTransport(s).sock == 'the sock'
    # A memory buffer has no underlying socket.
    assert TCyMemoryBuffer().sock is None
    # Nested wrappers report the innermost transport's handle.
    assert TCyBufferedTransport(TCyFramedTransport(s)).sock == 'the sock'
    assert TCyBufferedTransport(TCyMemoryBuffer()).sock is None