Skip to content

Commit 1af6e3f

Browse files
committed
tests: Add tests for the request-response protocol
1 parent 1da8911 commit 1af6e3f

File tree

1 file changed

+221
-2
lines changed

1 file changed

+221
-2
lines changed

tests/protocol/request_response.rs

+221-2
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,16 @@
2121
use litep2p::{
2222
config::Litep2pConfigBuilder,
2323
crypto::ed25519::Keypair,
24-
protocol::request_response::types::{Config as RequestResponseConfig, RequestResponseEvent},
24+
protocol::request_response::types::{
25+
Config as RequestResponseConfig, RequestResponseError, RequestResponseEvent,
26+
},
2527
transport::tcp::config::TransportConfig as TcpTransportConfig,
2628
types::protocol::ProtocolName,
2729
Litep2p, Litep2pEvent,
2830
};
31+
use tokio::time::sleep;
32+
33+
use std::{collections::HashMap, time::Duration};
2934

3035
async fn connect_peers(litep2p1: &mut Litep2p, litep2p2: &mut Litep2p) {
3136
let address = litep2p2.listen_addresses().next().unwrap().clone();
@@ -55,7 +60,7 @@ async fn connect_peers(litep2p1: &mut Litep2p, litep2p2: &mut Litep2p) {
5560
}
5661
}
5762

58-
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
63+
sleep(Duration::from_millis(100)).await;
5964
}
6065

6166
#[tokio::test]
@@ -126,3 +131,217 @@ async fn send_request_receive_response() {
126131
}
127132
);
128133
}
134+
135+
#[tokio::test]
136+
async fn reject_request() {
137+
let _ = tracing_subscriber::fmt()
138+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
139+
.try_init();
140+
141+
let (req_resp_config1, mut handle1) =
142+
RequestResponseConfig::new(ProtocolName::from("/protocol/1"), 64);
143+
let config1 = Litep2pConfigBuilder::new()
144+
.with_keypair(Keypair::generate())
145+
.with_tcp(TcpTransportConfig {
146+
listen_address: "/ip6/::1/tcp/0".parse().unwrap(),
147+
})
148+
.with_request_response_protocol(req_resp_config1)
149+
.build();
150+
151+
let (req_resp_config2, mut handle2) =
152+
RequestResponseConfig::new(ProtocolName::from("/protocol/1"), 64);
153+
let config2 = Litep2pConfigBuilder::new()
154+
.with_keypair(Keypair::generate())
155+
.with_tcp(TcpTransportConfig {
156+
listen_address: "/ip6/::1/tcp/0".parse().unwrap(),
157+
})
158+
.with_request_response_protocol(req_resp_config2)
159+
.build();
160+
161+
let mut litep2p1 = Litep2p::new(config1).await.unwrap();
162+
let mut litep2p2 = Litep2p::new(config2).await.unwrap();
163+
164+
let peer1 = *litep2p1.local_peer_id();
165+
let peer2 = *litep2p2.local_peer_id();
166+
167+
// wait until peers have connected
168+
connect_peers(&mut litep2p1, &mut litep2p2).await;
169+
tokio::spawn(async move {
170+
loop {
171+
tokio::select! {
172+
_ = litep2p1.next_event() => {},
173+
_ = litep2p2.next_event() => {},
174+
}
175+
}
176+
});
177+
178+
// send request to remote peer
179+
let request_id = handle1.send_request(peer2, vec![1, 3, 3, 7]).await.unwrap();
180+
if let RequestResponseEvent::RequestReceived {
181+
peer,
182+
request_id,
183+
request,
184+
} = handle2.next_event().await.unwrap()
185+
{
186+
assert_eq!(peer, peer1);
187+
assert_eq!(request, vec![1, 3, 3, 7]);
188+
handle2.reject_request(request_id).await;
189+
} else {
190+
panic!("invalid event received");
191+
};
192+
193+
assert_eq!(
194+
handle1.next_event().await.unwrap(),
195+
RequestResponseEvent::RequestFailed {
196+
peer: peer2,
197+
request_id,
198+
error: RequestResponseError::Rejected
199+
}
200+
);
201+
}
202+
203+
#[tokio::test]
204+
async fn multiple_simultaneous_requests() {
205+
let _ = tracing_subscriber::fmt()
206+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
207+
.try_init();
208+
209+
let (req_resp_config1, mut handle1) =
210+
RequestResponseConfig::new(ProtocolName::from("/protocol/1"), 64);
211+
let config1 = Litep2pConfigBuilder::new()
212+
.with_keypair(Keypair::generate())
213+
.with_tcp(TcpTransportConfig {
214+
listen_address: "/ip6/::1/tcp/0".parse().unwrap(),
215+
})
216+
.with_request_response_protocol(req_resp_config1)
217+
.build();
218+
219+
let (req_resp_config2, mut handle2) =
220+
RequestResponseConfig::new(ProtocolName::from("/protocol/1"), 64);
221+
let config2 = Litep2pConfigBuilder::new()
222+
.with_keypair(Keypair::generate())
223+
.with_tcp(TcpTransportConfig {
224+
listen_address: "/ip6/::1/tcp/0".parse().unwrap(),
225+
})
226+
.with_request_response_protocol(req_resp_config2)
227+
.build();
228+
229+
let mut litep2p1 = Litep2p::new(config1).await.unwrap();
230+
let mut litep2p2 = Litep2p::new(config2).await.unwrap();
231+
232+
let peer1 = *litep2p1.local_peer_id();
233+
let peer2 = *litep2p2.local_peer_id();
234+
235+
// wait until peers have connected
236+
connect_peers(&mut litep2p1, &mut litep2p2).await;
237+
tokio::spawn(async move {
238+
loop {
239+
tokio::select! {
240+
_ = litep2p1.next_event() => {},
241+
_ = litep2p2.next_event() => {},
242+
}
243+
}
244+
});
245+
246+
// send multiple requests to remote peer
247+
let request_id1 = handle1.send_request(peer2, vec![1, 3, 3, 6]).await.unwrap();
248+
let request_id2 = handle1.send_request(peer2, vec![1, 3, 3, 7]).await.unwrap();
249+
let request_id3 = handle1.send_request(peer2, vec![1, 3, 3, 8]).await.unwrap();
250+
let request_id4 = handle1.send_request(peer2, vec![1, 3, 3, 9]).await.unwrap();
251+
let expected: HashMap<usize, Vec<u8>> = HashMap::from_iter([
252+
(request_id1, vec![2, 3, 3, 6]),
253+
(request_id2, vec![2, 3, 3, 7]),
254+
(request_id3, vec![2, 3, 3, 8]),
255+
(request_id4, vec![2, 3, 3, 9]),
256+
]);
257+
258+
for i in 0..4 {
259+
if let RequestResponseEvent::RequestReceived {
260+
peer,
261+
request_id,
262+
request,
263+
} = handle2.next_event().await.unwrap()
264+
{
265+
assert_eq!(peer, peer1);
266+
assert_eq!(request, vec![1, 3, 3, 6 + i]);
267+
handle2
268+
.send_response(request_id, vec![2, 3, 3, 6 + i])
269+
.await
270+
.unwrap();
271+
} else {
272+
panic!("invalid event received");
273+
};
274+
}
275+
276+
for _ in 0..4 {
277+
if let RequestResponseEvent::ResponseReceived {
278+
peer,
279+
request_id,
280+
response,
281+
} = handle1.next_event().await.unwrap()
282+
{
283+
assert_eq!(peer, peer2);
284+
assert_eq!(response, expected.get(&request_id).unwrap().to_vec());
285+
} else {
286+
panic!("invalid event received");
287+
};
288+
}
289+
}
290+
291+
#[tokio::test]
292+
async fn request_timeout() {
293+
let _ = tracing_subscriber::fmt()
294+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
295+
.try_init();
296+
297+
let (req_resp_config1, mut handle1) =
298+
RequestResponseConfig::new(ProtocolName::from("/protocol/1"), 64);
299+
let config1 = Litep2pConfigBuilder::new()
300+
.with_keypair(Keypair::generate())
301+
.with_tcp(TcpTransportConfig {
302+
listen_address: "/ip6/::1/tcp/0".parse().unwrap(),
303+
})
304+
.with_request_response_protocol(req_resp_config1)
305+
.build();
306+
307+
let (req_resp_config2, _handle2) =
308+
RequestResponseConfig::new(ProtocolName::from("/protocol/1"), 64);
309+
let config2 = Litep2pConfigBuilder::new()
310+
.with_keypair(Keypair::generate())
311+
.with_tcp(TcpTransportConfig {
312+
listen_address: "/ip6/::1/tcp/0".parse().unwrap(),
313+
})
314+
.with_request_response_protocol(req_resp_config2)
315+
.build();
316+
317+
let mut litep2p1 = Litep2p::new(config1).await.unwrap();
318+
let mut litep2p2 = Litep2p::new(config2).await.unwrap();
319+
320+
let _peer1 = *litep2p1.local_peer_id();
321+
let peer2 = *litep2p2.local_peer_id();
322+
323+
// wait until peers have connected
324+
connect_peers(&mut litep2p1, &mut litep2p2).await;
325+
tokio::spawn(async move {
326+
loop {
327+
tokio::select! {
328+
_ = litep2p1.next_event() => {},
329+
_ = litep2p2.next_event() => {},
330+
}
331+
}
332+
});
333+
334+
// send request to remote peer and wait until the requet timeout occurs
335+
let request_id = handle1.send_request(peer2, vec![1, 3, 3, 7]).await.unwrap();
336+
337+
sleep(Duration::from_secs(7)).await;
338+
339+
assert_eq!(
340+
handle1.next_event().await.unwrap(),
341+
RequestResponseEvent::RequestFailed {
342+
peer: peer2,
343+
request_id,
344+
error: RequestResponseError::Timeout,
345+
}
346+
);
347+
}

0 commit comments

Comments
 (0)