Skip to content

Commit ef1d248

Browse files
committed
IGNITE-18006 Add reproducers
1 parent a0309cd commit ef1d248

File tree

2 files changed

+212
-1
lines changed

2 files changed

+212
-1
lines changed

pyignite/connection/aio_connection.py

-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
# limitations under the License.
3030

3131
import asyncio
32-
from asyncio import CancelledError
3332
from collections import OrderedDict
3433
from typing import Union
3534

tests/custom/test_timeouts.py

+212
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import asyncio
16+
import sys
17+
import time
18+
from asyncio.exceptions import TimeoutError, InvalidStateError
19+
20+
import pytest
21+
22+
from pyignite import AioClient
23+
from tests.util import start_ignite_gen
24+
25+
26+
@pytest.fixture(autouse=True)
27+
def server1():
28+
yield from start_ignite_gen(idx=1)
29+
30+
31+
@pytest.fixture(autouse=True)
32+
async def proxy(event_loop, server1, cache):
33+
proxy = ProxyServer(("127.0.0.1", 10802), ("127.0.0.1", 10801))
34+
try:
35+
await proxy.start()
36+
yield proxy
37+
finally:
38+
await proxy.close()
39+
40+
41+
@pytest.fixture(autouse=True)
42+
async def cache(server1):
43+
c = AioClient(partition_aware=False)
44+
async with c.connect("127.0.0.1", 10801):
45+
try:
46+
cache = await c.get_or_create_cache("test")
47+
yield cache
48+
finally:
49+
await cache.destroy()
50+
51+
52+
@pytest.fixture(autouse=True)
53+
def invalid_states_errors():
54+
errors = []
55+
56+
def trace(_, event, arg):
57+
if event == 'exception':
58+
etype, _, _ = arg
59+
if etype is InvalidStateError:
60+
errors.append(arg)
61+
62+
return trace
63+
64+
try:
65+
sys.settrace(trace)
66+
yield errors
67+
finally:
68+
sys.settrace(None)
69+
70+
71+
@pytest.mark.asyncio
72+
async def test_cancellation_on_slow_response(event_loop, proxy, invalid_states_errors):
73+
c = AioClient(partition_aware=False)
74+
async with c.connect("127.0.0.1", 10802):
75+
cache = await c.get_cache("test")
76+
proxy.slow_response = True
77+
with pytest.raises(TimeoutError):
78+
await asyncio.wait_for(cache.put(1, 2), 0.1)
79+
80+
assert len(invalid_states_errors) == 0
81+
82+
83+
@pytest.mark.asyncio
84+
async def test_cancellation_on_disconnect(event_loop, proxy, invalid_states_errors):
85+
c = AioClient(partition_aware=False)
86+
async with c.connect("127.0.0.1", 10802):
87+
cache = await c.get_cache("test")
88+
proxy.discard_response = True
89+
90+
asyncio.ensure_future(asyncio.wait_for(cache.put(1, 2), 0.1))
91+
await asyncio.sleep(0.2)
92+
await proxy.disconnect_peers()
93+
94+
assert len(invalid_states_errors) == 0
95+
96+
97+
class ProxyServer:
98+
"""
99+
Proxy for simulating slow or discarding response ignite server
100+
Set `slow_response`, `discard_response` to `True` to simulate these conditions.
101+
Call `disconnect_peers()` in order to simulate lost connection to Ignite server.
102+
"""
103+
def __init__(self, local_host, remote_host):
104+
self.local_host = local_host
105+
self.remote_host = remote_host
106+
self.peers = {}
107+
self.slow_response = False
108+
self.discard_response = False
109+
self.server = None
110+
111+
async def start(self):
112+
loop = asyncio.get_event_loop()
113+
host, port = self.local_host
114+
self.server = await loop.create_server(
115+
lambda: ProxyTcpProtocol(self), host=host, port=port)
116+
117+
async def disconnect_peers(self):
118+
peers = dict(self.peers)
119+
for k, v in peers.items():
120+
if not v:
121+
return
122+
123+
local, remote = v
124+
if local:
125+
await remote.close()
126+
if remote:
127+
await local.close()
128+
129+
async def close(self):
130+
try:
131+
await self.disconnect_peers()
132+
except TimeoutError:
133+
pass
134+
135+
self.server.close()
136+
137+
138+
class ProxyTcpProtocol(asyncio.Protocol):
139+
def __init__(self, proxy):
140+
self.addr, self.port = proxy.remote_host
141+
self.proxy = proxy
142+
self.transport, self.remote_protocol, self.conn_info, self.close_fut = None, None, None, None
143+
super().__init__()
144+
145+
def connection_made(self, transport):
146+
self.transport = transport
147+
self.conn_info = transport.get_extra_info("peername")
148+
149+
def data_received(self, data):
150+
if self.remote_protocol and self.remote_protocol.transport:
151+
self.remote_protocol.transport.write(data)
152+
return
153+
154+
loop = asyncio.get_event_loop()
155+
self.remote_protocol = RemoteTcpProtocol(self.proxy, self, data)
156+
coro = loop.create_connection(lambda: self.remote_protocol, host=self.addr, port=self.port)
157+
asyncio.ensure_future(coro)
158+
159+
self.proxy.peers[self.conn_info] = (self, self.remote_protocol)
160+
161+
async def close(self):
162+
if not self.transport:
163+
return
164+
165+
self.close_fut = asyncio.get_event_loop().create_future()
166+
self.transport.close()
167+
168+
try:
169+
await asyncio.wait_for(self.close_fut, 0.1)
170+
except TimeoutError:
171+
pass
172+
173+
def connection_lost(self, exc):
174+
if self.close_fut:
175+
self.close_fut.done()
176+
177+
178+
class RemoteTcpProtocol(asyncio.Protocol):
179+
def __init__(self, proxy, proxy_protocol, data):
180+
self.proxy = proxy
181+
self.proxy_protocol = proxy_protocol
182+
self.data = data
183+
self.transport, self.close_fut = None, None
184+
super().__init__()
185+
186+
def connection_made(self, transport):
187+
self.transport = transport
188+
self.transport.write(self.data)
189+
190+
async def close(self):
191+
if not self.transport:
192+
return
193+
194+
self.close_fut = asyncio.get_event_loop().create_future()
195+
self.transport.close()
196+
try:
197+
await asyncio.wait_for(self.close_fut, 0.1)
198+
except TimeoutError:
199+
pass
200+
201+
def connection_lost(self, exc):
202+
if self.close_fut:
203+
self.close_fut.done()
204+
205+
def data_received(self, data):
206+
if self.proxy.discard_response:
207+
return
208+
209+
if self.proxy.slow_response:
210+
time.sleep(0.5)
211+
212+
self.proxy_protocol.transport.write(data)

0 commit comments

Comments
 (0)