diff --git a/multinode_integration_tests/tests/data_routing_test.rs b/multinode_integration_tests/tests/data_routing_test.rs index 0b3fa9d21..0c4cef279 100644 --- a/multinode_integration_tests/tests/data_routing_test.rs +++ b/multinode_integration_tests/tests/data_routing_test.rs @@ -316,7 +316,7 @@ fn multiple_stream_zero_hop_test() { let mut another_client = zero_hop_node.make_client(8080, STANDARD_CLIENT_TIMEOUT_MILLIS); one_client.send_chunk(b"GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n"); - another_client.send_chunk(b"GET /online/ HTTP/1.1\r\nHost: whatever.neverssl.com\r\n\r\n"); + another_client.send_chunk(b"GET /online/ HTTP/1.1\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r\nAccept-Language: cs-CZ,cs;q=0.9,en;q=0.8,sk;q=0.7\r\nCache-Control: max-age=0\r\nConnection: keep-alive\r\nHost: whatever.neverssl.com\r\nUpgrade-Insecure-Requests: 1\r\nUser-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36\r\n\r\n"); let one_response = one_client.wait_for_chunk(); let another_response = another_client.wait_for_chunk(); diff --git a/node/src/proxy_client/stream_establisher.rs b/node/src/proxy_client/stream_establisher.rs index f25d7a9fe..4d6fdad24 100644 --- a/node/src/proxy_client/stream_establisher.rs +++ b/node/src/proxy_client/stream_establisher.rs @@ -1,5 +1,6 @@ // Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. +use crate::proxy_client::stream_handler_pool::StreamSenders; use crate::proxy_client::stream_reader::StreamReader; use crate::proxy_client::stream_writer::StreamWriter; use crate::sub_lib::channel_wrappers::FuturesChannelFactory; @@ -14,7 +15,7 @@ use crate::sub_lib::stream_connector::StreamConnectorReal; use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::tokio_wrappers::ReadHalfWrapper; use actix::Recipient; -use crossbeam_channel::Sender; +use crossbeam_channel::{unbounded, Receiver, Sender}; use masq_lib::logger::Logger; use std::io; use std::net::IpAddr; @@ -22,7 +23,7 @@ use std::net::SocketAddr; pub struct StreamEstablisher { pub cryptde: &'static dyn CryptDE, - pub stream_adder_tx: Sender<(StreamKey, Box>)>, + pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>, pub stream_killer_tx: Sender<(StreamKey, u64)>, pub stream_connector: Box, pub proxy_client_sub: Recipient, @@ -57,11 +58,13 @@ impl StreamEstablisher { payload.target_port, &self.logger, )?; + let (shutdown_signal_tx, shutdown_signal_rx) = unbounded(); self.spawn_stream_reader( &payload.clone(), connection_info.reader, connection_info.peer_addr, + shutdown_signal_rx, ); let (tx_to_write, rx_to_write) = self.channel_factory.make(connection_info.peer_addr); @@ -73,8 +76,13 @@ impl StreamEstablisher { ); tokio::spawn(stream_writer); + let stream_senders = StreamSenders { + writer_data: tx_to_write.clone(), + reader_shutdown_tx: shutdown_signal_tx, + }; + self.stream_adder_tx - .send((payload.stream_key, tx_to_write.clone())) + .send((payload.stream_key, stream_senders)) .expect("StreamHandlerPool died"); Ok(tx_to_write) } @@ -84,12 +92,14 @@ impl StreamEstablisher { payload: &ClientRequestPayload_0v1, read_stream: Box, peer_addr: SocketAddr, + shutdown_signal: Receiver<()>, ) { let stream_reader = StreamReader::new( payload.stream_key, self.proxy_client_sub.clone(), read_stream, self.stream_killer_tx.clone(), + shutdown_signal, peer_addr, ); debug!(self.logger, "Spawning StreamReader for {}", peer_addr); @@ -103,7 +113,7 @@ pub trait StreamEstablisherFactory: Send { pub struct StreamEstablisherFactoryReal { pub cryptde: &'static dyn CryptDE, - pub stream_adder_tx: Sender<(StreamKey, Box>)>, + pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>, pub stream_killer_tx: Sender<(StreamKey, u64)>, pub proxy_client_subs: ProxyClientSubs, pub logger: Logger, @@ -191,6 +201,7 @@ mod tests { }, read_stream, SocketAddr::from_str("1.2.3.4:5678").unwrap(), + unbounded().1, ); proxy_client_awaiter.await_message_count(1); diff --git a/node/src/proxy_client/stream_handler_pool.rs b/node/src/proxy_client/stream_handler_pool.rs index 0c7f1f011..5ecbe9794 100644 --- a/node/src/proxy_client/stream_handler_pool.rs +++ b/node/src/proxy_client/stream_handler_pool.rs @@ -14,7 +14,7 @@ use crate::sub_lib::sequence_buffer::SequencedPacket; use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::wallet::Wallet; use actix::Recipient; -use crossbeam_channel::{unbounded, Receiver}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::future; use futures::future::Future; use masq_lib::logger::Logger; @@ -29,20 +29,28 @@ use tokio::prelude::future::{err, ok}; use trust_dns_resolver::error::ResolveError; use trust_dns_resolver::lookup_ip::LookupIp; +// TODO: This should be renamed to differentiate it from the other StreamHandlerPool, +// which, unlike this, is an actor. pub trait StreamHandlerPool { fn process_package(&self, payload: ClientRequestPayload_0v1, paying_wallet_opt: Option); } +#[derive(Debug)] +pub struct StreamSenders { + pub writer_data: Box>, + pub reader_shutdown_tx: Sender<()>, +} + pub struct StreamHandlerPoolReal { inner: Arc>, - stream_adder_rx: Receiver<(StreamKey, Box>)>, + stream_adder_rx: Receiver<(StreamKey, StreamSenders)>, stream_killer_rx: Receiver<(StreamKey, u64)>, } struct StreamHandlerPoolRealInner { accountant_sub: Recipient, proxy_client_subs: ProxyClientSubs, - stream_writer_channels: HashMap>>, + stream_writer_channels: HashMap, resolver: Box, logger: Logger, establisher_factory: Box, @@ -148,6 +156,30 @@ impl StreamHandlerPoolReal { }; } + fn send_shutdown_signal_to_stream_reader( + reader_shutdown_tx: Sender<()>, + stream_key: &StreamKey, + logger: &Logger, + ) { + match reader_shutdown_tx.try_send(()) { + Ok(()) => { + debug!( + logger, + "A shutdown signal was sent to the StreamReader for stream key {:?}.", + stream_key + ); + } + Err(_e) => { + debug!( + logger, + "Unable to send a shutdown signal to the StreamReader for \ + stream key {:?}. The channel is already gone.", + stream_key + ); + } + } + } + fn clean_up_bad_stream( inner_arc: Arc>, stream_key: &StreamKey, @@ -159,12 +191,17 @@ impl StreamHandlerPoolReal { inner.logger, "Couldn't process request from CORES package: {}", error ); - if let Some(sender_wrapper) = inner.stream_writer_channels.remove(stream_key) { + if let Some(stream_senders) = inner.stream_writer_channels.remove(stream_key) { debug!( inner.logger, "Removing stream writer for {}", - sender_wrapper.peer_addr() + stream_senders.writer_data.peer_addr() ); + Self::send_shutdown_signal_to_stream_reader( + stream_senders.reader_shutdown_tx, + stream_key, + &inner.logger, + ) } Self::send_terminating_package( stream_key, @@ -185,20 +222,6 @@ impl StreamHandlerPoolReal { Self::perform_write(payload.sequenced_packet, sender_wrapper.clone()).and_then(move |_| { let mut inner = inner_arc.lock().expect("Stream handler pool is poisoned"); - if last_data { - match inner.stream_writer_channels.remove(&stream_key) { - Some(channel) => debug!( - inner.logger, - "Removing StreamWriter {:?} to {}", - stream_key, - channel.peer_addr() - ), - None => debug!( - inner.logger, - "Trying to remove StreamWriter {:?}, but it's already gone", stream_key - ), - } - } if payload_size > 0 { match paying_wallet_opt { Some(wallet) => inner @@ -218,6 +241,30 @@ impl StreamHandlerPoolReal { ), } } + if last_data { + match inner.stream_writer_channels.remove(&stream_key) { + Some(stream_senders) => { + debug!( + inner.logger, + "Removing StreamWriter and Shutting down StreamReader for {:?} to {}", + stream_key, + stream_senders.writer_data.peer_addr() + ); + Self::send_shutdown_signal_to_stream_reader( + stream_senders.reader_shutdown_tx, + &stream_key, + &inner.logger, + ) + } + None => { + debug!( + inner.logger, + "Trying to remove StreamWriter and StreamReader for stream key {:?}, but it's already gone", stream_key + ) + } + } + } + Ok(()) }) } @@ -385,7 +432,8 @@ impl StreamHandlerPoolReal { ) -> Option>> { let inner = inner_arc.lock().expect("Stream handler pool is poisoned"); let sender_wrapper_opt = inner.stream_writer_channels.get(stream_key); - sender_wrapper_opt.map(|sender_wrapper_box_ref| sender_wrapper_box_ref.as_ref().clone()) + sender_wrapper_opt + .map(|sender_wrapper_box_ref| sender_wrapper_box_ref.writer_data.as_ref().clone()) } fn make_logger_copy(inner_arc: &Arc>) -> Logger { @@ -430,7 +478,7 @@ impl StreamHandlerPoolReal { let mut inner = self.inner.lock().expect("Stream handler pool is poisoned"); while let Ok((stream_key, sequence_number)) = self.stream_killer_rx.try_recv() { match inner.stream_writer_channels.remove(&stream_key) { - Some(writer_channel) => { + Some(stream_senders) => { inner .proxy_client_subs .inbound_server_data @@ -438,14 +486,20 @@ impl StreamHandlerPoolReal { stream_key, last_data: true, sequence_number, - source: writer_channel.peer_addr(), + source: stream_senders.writer_data.peer_addr(), data: vec![], }) .expect("ProxyClient is dead"); + Self::send_shutdown_signal_to_stream_reader( + stream_senders.reader_shutdown_tx, + &stream_key, + &inner.logger, + ); debug!( inner.logger, - "Killed StreamWriter to {} and sent server-drop report", - writer_channel.peer_addr() + "Killed StreamWriter and StreamReader for the stream key {:?} to {} and sent server-drop report", + stream_key, + stream_senders.writer_data.peer_addr() ) } None => debug!( @@ -461,16 +515,16 @@ impl StreamHandlerPoolReal { loop { match self.stream_adder_rx.try_recv() { Err(_) => break, - Ok((stream_key, stream_writer_channel)) => { + Ok((stream_key, stream_senders)) => { debug!( inner.logger, "Persisting StreamWriter to {} under key {:?}", - stream_writer_channel.peer_addr(), + stream_senders.writer_data.peer_addr(), stream_key ); inner .stream_writer_channels - .insert(stream_key, stream_writer_channel) + .insert(stream_key, stream_senders) } }; } @@ -519,7 +573,7 @@ mod tests { use crate::proxy_client::local_test_utils::make_send_error; use crate::proxy_client::local_test_utils::ResolverWrapperMock; use crate::proxy_client::stream_establisher::StreamEstablisher; - use crate::sub_lib::channel_wrappers::FuturesChannelFactoryReal; + use crate::sub_lib::channel_wrappers::{FuturesChannelFactoryReal, SenderWrapperReal}; use crate::sub_lib::cryptde::PublicKey; use crate::sub_lib::hopper::ExpiredCoresPackage; use crate::sub_lib::hopper::MessageType; @@ -658,8 +712,9 @@ mod tests { originator_public_key: PublicKey::new(&b"men's souls"[..]), }; let write_parameters = Arc::new(Mutex::new(vec![])); + let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); let tx_to_write = Box::new( - SenderWrapperMock::new(SocketAddr::from_str("1.2.3.4:5678").unwrap()) + SenderWrapperMock::new(peer_addr) .unbounded_send_result(Ok(())) .unbounded_send_params(&write_parameters), ); @@ -682,12 +737,13 @@ mod tests { 100, 200, ); - subject - .inner - .lock() - .unwrap() - .stream_writer_channels - .insert(stream_key, tx_to_write); + subject.inner.lock().unwrap().stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: tx_to_write, + reader_shutdown_tx: unbounded().0, + }, + ); run_process_package_in_actix(subject, package); }); @@ -703,9 +759,11 @@ mod tests { #[test] fn write_failure_for_nonexistent_stream_generates_termination_message() { init_test_logging(); + let test_name = "write_failure_for_nonexistent_stream_generates_termination_message"; let cryptde = main_cryptde(); let (proxy_client, proxy_client_awaiter, proxy_client_recording_arc) = make_recorder(); let originator_key = PublicKey::new(&b"men's souls"[..]); + let (reader_shutdown_tx, reader_shutdown_rx) = unbounded(); thread::spawn(move || { let client_request_payload = ClientRequestPayload_0v1 { stream_key: StreamKey::make_meaningless_stream_key(), @@ -727,13 +785,11 @@ mod tests { 0, ); let peer_actors = peer_actors_builder().proxy_client(proxy_client).build(); - let resolver = ResolverWrapperMock::new() - .lookup_ip_success(vec![IpAddr::from_str("2.3.4.5").unwrap()]); - - let tx_to_write = SenderWrapperMock::new(SocketAddr::from_str("2.3.4.5:80").unwrap()) - .unbounded_send_result(make_send_error( - client_request_payload.sequenced_packet.clone(), - )); + let peer_addr = SocketAddr::from_str("2.3.4.5:80").unwrap(); + let resolver = ResolverWrapperMock::new().lookup_ip_success(vec![peer_addr.ip()]); + let tx_to_write = SenderWrapperMock::new(peer_addr).unbounded_send_result( + make_send_error(client_request_payload.sequenced_packet.clone()), + ); let subject = StreamHandlerPoolReal::new( Box::new(resolver), @@ -743,17 +799,24 @@ mod tests { 100, 200, ); - subject - .inner - .lock() - .unwrap() - .stream_writer_channels - .insert(client_request_payload.stream_key, Box::new(tx_to_write)); + { + let mut inner = subject.inner.lock().unwrap(); + inner.stream_writer_channels.insert( + client_request_payload.stream_key, + StreamSenders { + writer_data: Box::new(tx_to_write), + reader_shutdown_tx, + }, + ); + inner.logger = Logger::new(test_name); + } run_process_package_in_actix(subject, package); }); proxy_client_awaiter.await_message_count(1); let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); + let received = reader_shutdown_rx.try_recv(); + assert_eq!(received, Ok(())); assert_eq!( proxy_client_recording.get_record::(0), &InboundServerData { @@ -764,13 +827,15 @@ mod tests { data: vec![], } ); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: A shutdown signal was sent to the StreamReader \ + for stream key AAAAAAAAAAAAAAAAAAAAAAAAAAA." + )); } #[test] fn when_hostname_is_ip_establish_stream_without_dns_lookup() { let cryptde = main_cryptde(); - let lookup_ip_parameters = Arc::new(Mutex::new(vec![])); - let expected_lookup_ip_parameters = lookup_ip_parameters.clone(); let write_parameters = Arc::new(Mutex::new(vec![])); let expected_write_parameters = write_parameters.clone(); let (proxy_client, proxy_client_awaiter, proxy_client_recording_arc) = make_recorder(); @@ -795,12 +860,6 @@ mod tests { client_request_payload.into(), 0, ); - let resolver = ResolverWrapperMock::new() - .lookup_ip_parameters(&lookup_ip_parameters) - .lookup_ip_success(vec![ - IpAddr::from_str("2.3.4.5").unwrap(), - IpAddr::from_str("3.4.5.6").unwrap(), - ]); let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap(); let first_read_result = b"HTTP/1.1 200 OK\r\n\r\n"; let reader = ReadHalfWrapperMock { @@ -818,7 +877,7 @@ mod tests { shutdown_results: Arc::new(Mutex::new(vec![])), }; let mut subject = StreamHandlerPoolReal::new( - Box::new(resolver), + Box::new(ResolverWrapperMock::new()), cryptde, peer_actors.accountant.report_exit_service_provided.clone(), peer_actors.proxy_client_opt.unwrap().clone(), @@ -854,10 +913,6 @@ mod tests { }); proxy_client_awaiter.await_message_count(1); - assert_eq!( - expected_lookup_ip_parameters.lock().unwrap().deref(), - &(vec![] as Vec) - ); assert_eq!( expected_write_parameters.lock().unwrap().remove(0), b"These are the times".to_vec() @@ -875,6 +930,126 @@ mod tests { ); } + #[test] + fn stream_handler_pool_sends_shutdown_signal_when_last_data_is_true() { + init_test_logging(); + let test_name = "stream_handler_pool_sends_shutdown_signal_when_last_data_is_true"; + let (shutdown_tx, shutdown_rx) = unbounded(); + thread::spawn(move || { + let stream_key = StreamKey::make_meaningful_stream_key("I should die"); + let client_request_payload = ClientRequestPayload_0v1 { + stream_key, + sequenced_packet: SequencedPacket { + data: b"I'm gonna kill you stream key".to_vec(), + sequence_number: 0, + last_data: true, + }, + target_hostname: Some(String::from("3.4.5.6:80")), + target_port: HTTP_PORT, + protocol: ProxyProtocol::HTTP, + originator_public_key: PublicKey::new(&b"brutal death"[..]), + }; + let package = ExpiredCoresPackage::new( + SocketAddr::from_str("1.2.3.4:1234").unwrap(), + Some(make_wallet("consuming")), + make_meaningless_route(), + client_request_payload.into(), + 0, + ); + let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap(); + let peer_actors = peer_actors_builder().build(); + let subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided.clone(), + peer_actors.proxy_client_opt.unwrap().clone(), + 100, + 200, + ); + { + let mut inner = subject.inner.lock().unwrap(); + inner.logger = Logger::new(test_name); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: shutdown_tx, + }, + ); + } + + run_process_package_in_actix(subject, package); + }); + let received = shutdown_rx.recv(); + assert_eq!(received, Ok(())); + TestLogHandler::new().await_log_containing( + &format!( + "DEBUG: {test_name}: Removing StreamWriter and Shutting down StreamReader \ + for oUHoHuDKHjeWq+BJzBIqHpPFBQw to 3.4.5.6:80" + ), + 500, + ); + } + + #[test] + fn stream_handler_pool_logs_when_shutdown_channel_is_broken() { + init_test_logging(); + let test_name = "stream_handler_pool_logs_when_shutdown_channel_is_broken"; + let broken_shutdown_channel_tx = unbounded().0; + thread::spawn(move || { + let stream_key = StreamKey::make_meaningful_stream_key("I should die"); + let client_request_payload = ClientRequestPayload_0v1 { + stream_key, + sequenced_packet: SequencedPacket { + data: b"I'm gonna kill you stream key".to_vec(), + sequence_number: 0, + last_data: true, + }, + target_hostname: Some(String::from("3.4.5.6:80")), + target_port: HTTP_PORT, + protocol: ProxyProtocol::HTTP, + originator_public_key: PublicKey::new(&b"brutal death"[..]), + }; + let package = ExpiredCoresPackage::new( + SocketAddr::from_str("1.2.3.4:1234").unwrap(), + Some(make_wallet("consuming")), + make_meaningless_route(), + client_request_payload.into(), + 0, + ); + let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap(); + let peer_actors = peer_actors_builder().build(); + let subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided.clone(), + peer_actors.proxy_client_opt.unwrap().clone(), + 100, + 200, + ); + { + let mut inner = subject.inner.lock().unwrap(); + inner.logger = Logger::new(test_name); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: broken_shutdown_channel_tx, + }, + ); + } + + run_process_package_in_actix(subject, package); + }); + TestLogHandler::new().await_log_containing( + &format!( + "DEBUG: {test_name}: Unable to send a shutdown signal to the StreamReader \ + for stream key oUHoHuDKHjeWq+BJzBIqHpPFBQw. The channel is already gone." + ), + 500, + ); + } + #[test] fn ip_is_parsed_even_without_port() { let cryptde = main_cryptde(); @@ -1610,12 +1785,13 @@ mod tests { 100, 200, ); - subject - .inner - .lock() - .unwrap() - .stream_writer_channels - .insert(stream_key, Box::new(sender_wrapper)); + subject.inner.lock().unwrap().stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(sender_wrapper), + reader_shutdown_tx: unbounded().0, + }, + ); run_process_package_in_actix(subject, package); }); @@ -1709,11 +1885,16 @@ mod tests { subject.stream_killer_rx = stream_killer_rx; let stream_key = StreamKey::make_meaningless_stream_key(); let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); + let (shutdown_tx, shutdown_rx) = unbounded(); { let mut inner = subject.inner.lock().unwrap(); - inner - .stream_writer_channels - .insert(stream_key, Box::new(SenderWrapperMock::new(peer_addr))); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: shutdown_tx, + }, + ); } stream_killer_tx.send((stream_key, 47)).unwrap(); @@ -1723,6 +1904,8 @@ mod tests { system.run(); let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); let report = proxy_client_recording.get_record::(0); + let shutdown_signal_received = shutdown_rx.recv(); + assert_eq!(shutdown_signal_received, Ok(())); assert_eq!( report, &InboundServerData { @@ -1735,6 +1918,49 @@ mod tests { ); } + #[test] + fn clean_up_dead_streams_logs_when_the_stream_reader_is_down() { + init_test_logging(); + let test_name = "clean_up_dead_streams_logs_when_the_shutdown_channel_is_down"; + let system = System::new(test_name); + let peer_actors = peer_actors_builder().build(); + let mut subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided, + peer_actors.proxy_client_opt.unwrap(), + 0, + 0, + ); + let (stream_killer_tx, stream_killer_rx) = unbounded(); + subject.stream_killer_rx = stream_killer_rx; + let stream_key = StreamKey::make_meaningful_stream_key("I'll be gone well before then."); + let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); + let broken_shutdown_channel_tx = unbounded().0; + { + let mut inner = subject.inner.lock().unwrap(); + inner.logger = Logger::new(test_name); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: broken_shutdown_channel_tx, + }, + ); + } + stream_killer_tx.send((stream_key, 47)).unwrap(); + + subject.clean_up_dead_streams(); + + System::current().stop_with_code(0); + system.run(); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: Unable to send a shutdown signal \ + to the StreamReader for stream key cv9IZ5fizc4kZmR+0d+OQGXr3bw. \ + The channel is already gone." + )); + } + #[test] fn clean_up_dead_streams_does_not_send_server_drop_report_if_dead_stream_is_gone_already() { let system = System::new("test"); @@ -1760,4 +1986,77 @@ mod tests { let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); assert_eq!(proxy_client_recording.len(), 0); } + + #[test] + fn add_new_streams_works() { + init_test_logging(); + let test_name = "add_new_streams_works"; + let (stream_adder_tx, stream_adder_rx) = unbounded(); + let peer_actors = peer_actors_builder().build(); + let mut subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided, + peer_actors.proxy_client_opt.unwrap(), + 0, + 0, + ); + subject.stream_adder_rx = stream_adder_rx; + { + subject.inner.lock().unwrap().logger = Logger::new(test_name); + } + let first_stream_key = StreamKey::make_meaningful_stream_key("first_stream_key"); + let (first_writer_data_tx, _first_writer_data_rx) = futures::sync::mpsc::unbounded(); + let (first_shutdown_tx, _first_shutdown_rx) = unbounded(); + let first_stream_senders = StreamSenders { + writer_data: Box::new(SenderWrapperReal::new( + SocketAddr::from_str("1.2.3.4:5678").unwrap(), + first_writer_data_tx, + )), + reader_shutdown_tx: first_shutdown_tx, + }; + let (second_writer_data_tx, _second_writer_data_rx) = futures::sync::mpsc::unbounded(); + let (second_shutdown_tx, _second_shutdown_rx) = unbounded(); + let second_stream_key = StreamKey::make_meaningful_stream_key("second_stream_key"); + let second_stream_senders = StreamSenders { + writer_data: Box::new(SenderWrapperReal::new( + SocketAddr::from_str("2.3.4.5:6789").unwrap(), + second_writer_data_tx, + )), + reader_shutdown_tx: second_shutdown_tx, + }; + stream_adder_tx + .send((first_stream_key.clone(), first_stream_senders)) + .unwrap(); + stream_adder_tx + .send((second_stream_key.clone(), second_stream_senders)) + .unwrap(); + + subject.add_new_streams(); + + let mut inner = subject.inner.lock().unwrap(); + let actual_first_stream_senders = inner + .stream_writer_channels + .remove(&first_stream_key) + .unwrap(); + let actual_second_stream_senders = inner + .stream_writer_channels + .remove(&second_stream_key) + .unwrap(); + assert_eq!( + actual_first_stream_senders.writer_data.peer_addr(), + SocketAddr::from_str("1.2.3.4:5678").unwrap() + ); + assert_eq!( + actual_second_stream_senders.writer_data.peer_addr(), + SocketAddr::from_str("2.3.4.5:6789").unwrap() + ); + let tlh = TestLogHandler::new(); + tlh.exists_log_containing(&format!( + "DEBUG: {test_name}: Persisting StreamWriter to 1.2.3.4:5678 under key gY2vJ+OwPuItsBcFhbilDI61LGo" + )); + tlh.exists_log_containing(&format!( + "DEBUG: {test_name}: Persisting StreamWriter to 2.3.4.5:6789 under key 1Kbv+3/MIN4/1hLQXLeNPgdDM58" + )); + } } diff --git a/node/src/proxy_client/stream_reader.rs b/node/src/proxy_client/stream_reader.rs index 3f3d82477..992b58dbf 100644 --- a/node/src/proxy_client/stream_reader.rs +++ b/node/src/proxy_client/stream_reader.rs @@ -3,10 +3,9 @@ use crate::sub_lib::proxy_client::InboundServerData; use crate::sub_lib::sequencer::Sequencer; use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::tokio_wrappers::ReadHalfWrapper; -use crate::sub_lib::utils; use crate::sub_lib::utils::indicates_dead_stream; use actix::Recipient; -use crossbeam_channel::Sender; +use crossbeam_channel::{Receiver, Sender}; use masq_lib::logger::Logger; use std::net::SocketAddr; use tokio::prelude::Async; @@ -17,6 +16,7 @@ pub struct StreamReader { proxy_client_sub: Recipient, stream: Box, stream_killer: Sender<(StreamKey, u64)>, + shutdown_signal: Receiver<()>, peer_addr: SocketAddr, logger: Logger, sequencer: Sequencer, @@ -29,6 +29,13 @@ impl Future for StreamReader { fn poll(&mut self) -> Result::Item>, ::Error> { let mut buf: [u8; 16384] = [0; 16384]; loop { + if self.shutdown_signal.try_recv().is_ok() { + info!( + self.logger, + "Shutting down for stream: {:?}", self.stream_key + ); + return Ok(Async::Ready(())); + } match self.stream.poll_read(&mut buf) { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(0)) => { @@ -44,10 +51,9 @@ impl Future for StreamReader { if self.logger.trace_enabled() { trace!( self.logger, - "Read {}-byte chunk from {}: {}", + "Read {}-byte chunk from {}", len, - self.peer_addr, - utils::to_string(&Vec::from(&buf[0..len])) + self.peer_addr ); } let stream_key = self.stream_key; @@ -82,15 +88,19 @@ impl StreamReader { proxy_client_sub: Recipient, stream: Box, stream_killer: Sender<(StreamKey, u64)>, + shutdown_signal: Receiver<()>, peer_addr: SocketAddr, ) -> StreamReader { + let logger = Logger::new(&format!("StreamReader for {:?}/{}", stream_key, peer_addr)[..]); + debug!(logger, "Initialised StreamReader"); StreamReader { stream_key, proxy_client_sub, stream, stream_killer, + shutdown_signal, peer_addr, - logger: Logger::new(&format!("StreamReader for {:?}/{}", stream_key, peer_addr)[..]), + logger, sequencer: Sequencer::new(), } } @@ -120,7 +130,7 @@ mod tests { use crate::test_utils::recorder::make_recorder; use crate::test_utils::recorder::peer_actors_builder; use crate::test_utils::tokio_wrapper_mocks::ReadHalfWrapperMock; - use actix::System; + use actix::{Actor, System}; use crossbeam_channel::unbounded; use masq_lib::test_utils::logging::init_test_logging; use masq_lib::test_utils::logging::TestLogHandler; @@ -175,6 +185,7 @@ mod tests { proxy_client_sub, stream, stream_killer, + shutdown_signal: unbounded().1, peer_addr: SocketAddr::from_str("8.7.4.3:50").unwrap(), logger: Logger::new("test"), sequencer: Sequencer::new(), @@ -253,15 +264,12 @@ mod tests { }); let proxy_client_sub = rx.recv().unwrap(); let (stream_killer, stream_killer_params) = unbounded(); - let mut subject = StreamReader { - stream_key: StreamKey::make_meaningless_stream_key(), - proxy_client_sub, - stream: Box::new(stream), - stream_killer, - peer_addr: SocketAddr::from_str("5.7.9.0:95").unwrap(), - logger: Logger::new("test"), - sequencer: Sequencer::new(), - }; + let peer_addr = SocketAddr::from_str("5.7.9.0:95").unwrap(); + let mut subject = make_subject(); + subject.proxy_client_sub = proxy_client_sub; + subject.stream = Box::new(stream); + subject.stream_killer = stream_killer; + subject.peer_addr = peer_addr; let result = subject.poll(); @@ -274,7 +282,7 @@ mod tests { stream_key: StreamKey::make_meaningless_stream_key(), last_data: false, sequence_number: 0, - source: SocketAddr::from_str("5.7.9.0:95").unwrap(), + source: peer_addr, data: b"HTTP/1.1 200".to_vec() } ); @@ -284,7 +292,7 @@ mod tests { stream_key: StreamKey::make_meaningless_stream_key(), last_data: false, sequence_number: 1, - source: SocketAddr::from_str("5.7.9.0:95").unwrap(), + source: peer_addr, data: b" OK\r\n\r\nHTTP/1.1 40".to_vec() } ); @@ -294,7 +302,7 @@ mod tests { stream_key: StreamKey::make_meaningless_stream_key(), last_data: false, sequence_number: 2, - source: SocketAddr::from_str("5.7.9.0:95").unwrap(), + source: peer_addr, data: b"4 File not found\r\n\r\nHTTP/1.1 503 Server error\r\n\r\n".to_vec() } ); @@ -312,49 +320,53 @@ mod tests { #[test] fn receiving_0_bytes_kills_stream() { init_test_logging(); + let test_name = "receiving_0_bytes_kills_stream"; let stream_key = StreamKey::make_meaningless_stream_key(); let (stream_killer, kill_stream_params) = unbounded(); let mut stream = ReadHalfWrapperMock::new(); stream.poll_read_results = vec![(vec![], Ok(Async::Ready(0)))]; - - let system = System::new("receiving_0_bytes_sends_empty_cores_response_and_kills_stream"); - let peer_actors = peer_actors_builder().build(); + let peer_addr = SocketAddr::from_str("5.3.4.3:654").unwrap(); + let system = System::new(test_name); let mut sequencer = Sequencer::new(); sequencer.next_sequence_number(); sequencer.next_sequence_number(); let mut subject = StreamReader { stream_key, - proxy_client_sub: peer_actors.proxy_client_opt.unwrap().inbound_server_data, + proxy_client_sub: make_recorder().0.start().recipient(), stream: Box::new(stream), stream_killer, - peer_addr: SocketAddr::from_str("5.3.4.3:654").unwrap(), - logger: Logger::new("test"), + shutdown_signal: unbounded().1, + peer_addr, + logger: Logger::new(test_name), sequencer, }; - System::current().stop_with_code(0); + System::current().stop(); system.run(); let result = subject.poll(); assert_eq!(result, Ok(Async::Ready(()))); assert_eq!(kill_stream_params.try_recv().unwrap(), (stream_key, 2)); - TestLogHandler::new() - .exists_log_containing("Stream from 5.3.4.3:654 was closed: (0-byte read)"); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: Stream from {peer_addr} was closed: (0-byte read)" + )); } #[test] fn non_dead_stream_read_errors_log_but_do_not_shut_down() { init_test_logging(); + let test_name = "non_dead_stream_read_errors_log_but_do_not_shut_down"; let (proxy_client, proxy_client_awaiter, proxy_client_recording_arc) = make_recorder(); let stream_key = StreamKey::make_meaningless_stream_key(); let (stream_killer, _) = unbounded(); let mut stream = ReadHalfWrapperMock::new(); + let http_response = b"HTTP/1.1 200 OK\r\n\r\n"; stream.poll_read_results = vec![ (vec![], Err(Error::from(ErrorKind::Other))), ( - Vec::from(&b"HTTP/1.1 200 OK\r\n\r\n"[..]), - Ok(Async::Ready(b"HTTP/1.1 200 OK\r\n\r\n".len())), + http_response.to_vec(), + Ok(Async::Ready(http_response.len())), ), (vec![], Err(Error::from(ErrorKind::BrokenPipe))), ]; @@ -371,13 +383,15 @@ mod tests { }); let proxy_client_sub = rx.recv().unwrap(); + let peer_addr = SocketAddr::from_str("6.5.4.1:8325").unwrap(); let mut subject = StreamReader { stream_key, proxy_client_sub, stream: Box::new(stream), stream_killer, - peer_addr: SocketAddr::from_str("6.5.4.1:8325").unwrap(), - logger: Logger::new("test"), + shutdown_signal: unbounded().1, + peer_addr, + logger: Logger::new(test_name), sequencer: Sequencer::new(), }; @@ -386,18 +400,50 @@ mod tests { assert_eq!(result, Err(())); proxy_client_awaiter.await_message_count(1); TestLogHandler::new().exists_log_containing( - "WARN: test: Continuing after read error on stream from 6.5.4.1:8325: other error", + &format!("WARN: {test_name}: Continuing after read error on stream from {peer_addr}: other error"), ); let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); assert_eq!( proxy_client_recording.get_record::(0), &InboundServerData { - stream_key: StreamKey::make_meaningless_stream_key(), + stream_key, last_data: false, sequence_number: 0, - source: SocketAddr::from_str("6.5.4.1:8325").unwrap(), - data: b"HTTP/1.1 200 OK\r\n\r\n".to_vec() + source: peer_addr, + data: http_response.to_vec() } ); } + + #[test] + fn stream_reader_shuts_down_when_it_receives_the_shutdown_signal() { + init_test_logging(); + let test_name = "stream_reader_shuts_down_when_it_receives_the_shutdown_signal"; + let (shutdown_tx, shutdown_rx) = unbounded(); + let mut subject = make_subject(); + subject.shutdown_signal = shutdown_rx; + subject.logger = Logger::new(test_name); + shutdown_tx.send(()).unwrap(); + + let result = subject.poll(); + + assert_eq!(result, Ok(Async::Ready(()))); + TestLogHandler::new().exists_log_containing(&format!( + "INFO: {test_name}: Shutting down for stream: {:?}", + subject.stream_key + )); + } + + pub fn make_subject() -> StreamReader { + StreamReader { + stream_key: StreamKey::make_meaningless_stream_key(), + proxy_client_sub: make_recorder().0.start().recipient(), + stream: Box::new(ReadHalfWrapperMock::new()), + stream_killer: unbounded().0, + shutdown_signal: unbounded().1, + peer_addr: SocketAddr::from_str("9.8.7.6:5432").unwrap(), + logger: Logger::new("test"), + sequencer: Sequencer::new(), + } + } } diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 058c7c12f..9a519c90e 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -29,8 +29,8 @@ use crate::sub_lib::neighborhood::{ExpectedServices, RatePack}; use crate::sub_lib::neighborhood::{NRMetadataChange, RouteQueryMessage}; use crate::sub_lib::peer_actors::BindMessage; use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, DnsResolveFailure_0v1}; -use crate::sub_lib::proxy_server::AddReturnRouteMessage; use crate::sub_lib::proxy_server::ProxyServerSubs; +use crate::sub_lib::proxy_server::{AddReturnRouteMessage, StreamKeyPurge}; use crate::sub_lib::proxy_server::{ AddRouteResultMessage, ClientRequestPayload_0v1, ProxyProtocol, }; @@ -38,13 +38,13 @@ use crate::sub_lib::route::Route; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::ttl_hashmap::TtlHashMap; -use crate::sub_lib::utils::{handle_ui_crash_request, NODE_MAILBOX_CAPACITY}; +use crate::sub_lib::utils::{handle_ui_crash_request, MessageScheduler, NODE_MAILBOX_CAPACITY}; use crate::sub_lib::wallet::Wallet; -use actix::Addr; use actix::Context; use actix::Handler; use actix::Recipient; use actix::{Actor, MailboxError}; +use actix::{Addr, AsyncContext}; use masq_lib::logger::Logger; use masq_lib::ui_gateway::NodeFromUiMessage; use masq_lib::utils::MutabilityConflictHelper; @@ -58,6 +58,8 @@ use tokio::prelude::Future; pub const CRASH_KEY: &str = "PROXYSERVER"; pub const RETURN_ROUTE_TTL: Duration = Duration::from_secs(120); +pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(30); + struct ProxyServerOutSubs { dispatcher: Recipient, hopper: Recipient, @@ -67,6 +69,7 @@ struct ProxyServerOutSubs { add_return_route: Recipient, stream_shutdown_sub: Recipient, route_result_sub: Recipient, + schedule_stream_key_purge: Recipient>, } pub struct ProxyServer { @@ -77,6 +80,7 @@ pub struct ProxyServer { tunneled_hosts: HashMap, dns_failure_retries: HashMap, stream_key_routes: HashMap, + stream_key_ttl: HashMap, is_decentralized: bool, consuming_wallet_balance: Option, main_cryptde: &'static dyn CryptDE, @@ -86,6 +90,7 @@ pub struct ProxyServer { route_ids_to_return_routes: TtlHashMap, browser_proxy_sequence_offset: bool, inbound_client_data_helper_opt: Option>, + stream_key_purge_delay: Duration, } impl Actor for ProxyServer { @@ -106,6 +111,7 @@ impl Handler for ProxyServer { add_return_route: msg.peer_actors.proxy_server.add_return_route, stream_shutdown_sub: msg.peer_actors.proxy_server.stream_shutdown_sub, route_result_sub: msg.peer_actors.proxy_server.route_result_sub, + schedule_stream_key_purge: msg.peer_actors.proxy_server.schedule_stream_key_purge, }; self.subs = Some(subs); } @@ -220,6 +226,25 @@ impl Handler for ProxyServer { } } +impl Handler for ProxyServer { + type Result = (); + + fn handle(&mut self, msg: StreamKeyPurge, _ctx: &mut Self::Context) -> Self::Result { + self.purge_stream_key(&msg.stream_key, "scheduled message"); + } +} + +impl Handler> for ProxyServer +where + ProxyServer: Handler, +{ + type Result = (); + + fn handle(&mut self, msg: MessageScheduler, ctx: &mut Self::Context) -> Self::Result { + ctx.notify_later(msg.scheduled_msg, msg.delay); + } +} + impl ProxyServer { pub fn new( main_cryptde: &'static dyn CryptDE, @@ -236,6 +261,7 @@ impl ProxyServer { tunneled_hosts: HashMap::new(), dns_failure_retries: HashMap::new(), stream_key_routes: HashMap::new(), + stream_key_ttl: HashMap::new(), is_decentralized, consuming_wallet_balance, main_cryptde, @@ -245,6 +271,7 @@ impl ProxyServer { route_ids_to_return_routes: TtlHashMap::new(RETURN_ROUTE_TTL), browser_proxy_sequence_offset: false, inbound_client_data_helper_opt: Some(Box::new(IBCDHelperReal::new())), + stream_key_purge_delay: STREAM_KEY_PURGE_DELAY, } } @@ -258,6 +285,7 @@ impl ProxyServer { stream_shutdown_sub: recipient!(addr, StreamShutdownMsg), node_from_ui: recipient!(addr, NodeFromUiMessage), route_result_sub: recipient!(addr, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(addr, MessageScheduler), } } @@ -298,12 +326,7 @@ impl ProxyServer { } fn retire_stream_key(&mut self, stream_key: &StreamKey) { - warning!( - self.logger, - "Retiring stream key {}: DnsResolveFailure", - stream_key - ); - self.purge_stream_key(stream_key); + self.purge_stream_key(stream_key, "DNS resolution failure"); } fn send_dns_failure_response_to_the_browser( @@ -410,6 +433,48 @@ impl ProxyServer { } } + fn schedule_stream_key_purge(&mut self, stream_key: StreamKey) { + let host_info = match self.tunneled_hosts.get(&stream_key) { + None => String::from(""), + Some(hostname) => format!(", which was tunneling to the host {:?}", hostname), + }; + debug!( + self.logger, + "Client closed stream referenced by stream key {:?}{}. It will be purged after {:?}.", + &stream_key, + host_info, + self.stream_key_purge_delay + ); + self.stream_key_ttl.insert(stream_key, SystemTime::now()); + self.subs + .as_ref() + .expect("ProxyServer Subs Unbound") + .schedule_stream_key_purge + .try_send(MessageScheduler { + scheduled_msg: StreamKeyPurge { stream_key }, + delay: self.stream_key_purge_delay, + }) + .expect("ProxyServer is dead"); + } + + fn log_straggling_packet( + &self, + stream_key: &StreamKey, + packet_len: usize, + old_timestamp: &SystemTime, + ) { + let duration_since = SystemTime::now() + .duration_since(*old_timestamp) + .expect("time calculation error"); + warning!( + self.logger, + "Straggling packet of length {} received for a stream key {:?} after a delay of {:?}", + packet_len, + stream_key, + duration_since + ); + } + fn handle_client_response_payload( &mut self, msg: ExpiredCoresPackage, @@ -437,7 +502,8 @@ impl ProxyServer { response.sequenced_packet.data.len(), payload_data_len, ); - match self.remove_dns_failure_retry(&response.stream_key) { + let stream_key = response.stream_key; + match self.remove_dns_failure_retry(&stream_key) { Ok(_) => { debug!(self.logger, "Successful attempt of DNS resolution, removing DNS retry entry for stream key: {}", &response.stream_key) } @@ -449,10 +515,12 @@ impl ProxyServer { ) } } - match self.keys_and_addrs.a_to_b(&response.stream_key) { + if let Some(old_timestamp) = self.stream_key_ttl.get(&stream_key) { + self.log_straggling_packet(&stream_key, payload_data_len, old_timestamp) + } + match self.keys_and_addrs.a_to_b(&stream_key) { Some(socket_addr) => { let last_data = response.sequenced_packet.last_data; - let stream_key = response.stream_key; let sequence_number = Some( response.sequenced_packet.sequence_number + self.browser_proxy_sequence_offset as u64, @@ -469,11 +537,7 @@ impl ProxyServer { }) .expect("Dispatcher is dead"); if last_data { - debug!( - self.logger, - "Retiring stream key {}: no more data", &stream_key - ); - self.purge_stream_key(&stream_key); + self.purge_stream_key(&stream_key, "last data received from the exit node"); } } None => { @@ -551,6 +615,7 @@ impl ProxyServer { } Some(sk) => sk, }; + self.schedule_stream_key_purge(stream_key); if msg.report_to_counterpart { debug!( self.logger, @@ -570,12 +635,6 @@ impl ProxyServer { { error!(self.logger, "{}", e) }; - } else { - debug!( - self.logger, - "Retiring stream key {}: StreamShutdownMsg for peer {}", &stream_key, msg.peer_addr - ); - self.purge_stream_key(&stream_key); } } @@ -602,10 +661,15 @@ impl ProxyServer { } } - fn purge_stream_key(&mut self, stream_key: &StreamKey) { + fn purge_stream_key(&mut self, stream_key: &StreamKey, reason: &str) { + debug!( + self.logger, + "Retiring stream key {} due to {}", &stream_key, reason + ); let _ = self.keys_and_addrs.remove_a(stream_key); let _ = self.stream_key_routes.remove(stream_key); let _ = self.tunneled_hosts.remove(stream_key); + let _ = self.stream_key_ttl.remove(stream_key); } fn make_payload( @@ -1363,6 +1427,7 @@ mod tests { fn constants_have_correct_values() { assert_eq!(CRASH_KEY, "PROXYSERVER"); assert_eq!(RETURN_ROUTE_TTL, Duration::from_secs(120)); + assert_eq!(STREAM_KEY_PURGE_DELAY, Duration::from_secs(30)); } const STANDARD_CONSUMING_WALLET_BALANCE: i64 = 0; @@ -1379,6 +1444,7 @@ mod tests { add_return_route: recipient!(addr, AddReturnRouteMessage), stream_shutdown_sub: recipient!(addr, StreamShutdownMsg), route_result_sub: recipient!(addr, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(addr, MessageScheduler), } } @@ -3534,7 +3600,8 @@ mod tests { #[test] fn proxy_server_receives_terminal_response_from_hopper() { init_test_logging(); - let system = System::new("proxy_server_receives_response_from_hopper"); + let test_name = "proxy_server_receives_terminal_response_from_hopper"; + let system = System::new(test_name); let (dispatcher, _, dispatcher_recording_arc) = make_recorder(); let cryptde = main_cryptde(); let mut subject = ProxyServer::new( @@ -3544,8 +3611,9 @@ mod tests { Some(STANDARD_CONSUMING_WALLET_BALANCE), false, ); + subject.logger = Logger::new(test_name); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); - let stream_key = StreamKey::make_meaningless_stream_key(); + let stream_key = StreamKey::make_meaningful_stream_key(test_name); subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); @@ -3579,17 +3647,42 @@ mod tests { let peer_actors = peer_actors_builder().dispatcher(dispatcher).build(); subject_addr.try_send(BindMessage { peer_actors }).unwrap(); - subject_addr.try_send(first_expired_cores_package).unwrap(); - subject_addr.try_send(second_expired_cores_package).unwrap(); // should generate log because stream key is now unknown + subject_addr.try_send(first_expired_cores_package).unwrap(); // This will purge the stream key records + subject_addr.try_send(second_expired_cores_package).unwrap(); // This will be discarded System::current().stop(); system.run(); let dispatcher_recording = dispatcher_recording_arc.lock().unwrap(); - let record = dispatcher_recording.get_record::(0); - assert_eq!(record.endpoint, Endpoint::Socket(socket_addr)); - assert_eq!(record.last_data, true); - assert_eq!(record.data, b"16 bytes of data".to_vec()); - TestLogHandler::new().exists_log_containing(&format!("WARN: ProxyServer: Discarding 16-byte packet 12345678 from an unrecognized stream key: {:?}", stream_key)); + let transmit_data_msg = dispatcher_recording.get_record::(0); + assert_eq!(transmit_data_msg.endpoint, Endpoint::Socket(socket_addr)); + assert_eq!(transmit_data_msg.last_data, true); + assert_eq!(transmit_data_msg.data, b"16 bytes of data".to_vec()); + let tlh = TestLogHandler::new(); + tlh.exists_log_containing(&format!( + "DEBUG: {test_name}: Retiring stream key {:?} due to last data received from the exit node", + stream_key + )); + tlh.exists_log_containing(&format!( + "WARN: {test_name}: Discarding 16-byte packet 12345678 from an unrecognized stream key: {:?}", + stream_key + )); + } + + #[test] + #[should_panic(expected = "time calculation error")] + fn log_straggling_packet_panics_if_timestamp_is_wrong() { + let subject = ProxyServer::new( + main_cryptde(), + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + ); + let stream_key = StreamKey::make_meaningless_stream_key(); + let timestamp = SystemTime::now() + .checked_add(Duration::from_secs(10)) + .unwrap(); + let _ = subject.log_straggling_packet(&stream_key, 10, ×tamp); } #[test] @@ -3651,6 +3744,234 @@ mod tests { assert!(subject.tunneled_hosts.is_empty()); } + #[test] + fn proxy_server_schedules_stream_key_purge_once_shutdown_order_is_received_for_stream() { + let common_msg = StreamShutdownMsg { + peer_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), + stream_type: RemovedStreamType::NonClandestine(NonClandestineAttributes { + reception_port: 0, + sequence_number: 0, + }), + report_to_counterpart: true, + }; + assert_stream_is_purged_with_a_delay(StreamShutdownMsg { + report_to_counterpart: true, + ..common_msg.clone() + }); + assert_stream_is_purged_with_a_delay(StreamShutdownMsg { + report_to_counterpart: false, + ..common_msg + }); + } + + fn assert_stream_is_purged_with_a_delay(msg: StreamShutdownMsg) { + /* + +------------------------------------------------------------------+ + | (0ms) | + | Stream shutdown is ordered | + +------------------------------------------------------------------+ + | + v + +------------------------------------------------------------------+ + | (400ms) (stream_key_purge_delay_in_millis - offset_in_millis) | + | Pre-purge assertion message finds records | + +------------------------------------------------------------------+ + | + v + +------------------------------------------------------------------+ + | (500ms) (stream_key_purge_delay_in_millis) | + | Stream is purged | + +------------------------------------------------------------------+ + | + v + +------------------------------------------------------------------+ + | (600ms) (stream_key_purge_delay_in_millis + offset_in_millis) | + | Post-purge assertion message finds no records | + +------------------------------------------------------------------+ + */ + + init_test_logging(); + let test_name = + "proxy_server_schedules_stream_key_purge_once_shutdown_order_is_received_for_stream"; + let cryptde = main_cryptde(); + let stream_key_purge_delay_in_millis = 500; + let offset_in_millis = 100; + let mut subject = ProxyServer::new( + cryptde, + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + ); + subject.stream_key_purge_delay = Duration::from_millis(stream_key_purge_delay_in_millis); + subject.logger = Logger::new(&test_name); + subject.subs = Some(make_proxy_server_out_subs()); + let stream_key = StreamKey::make_meaningful_stream_key(&test_name); + subject + .keys_and_addrs + .insert(stream_key.clone(), msg.peer_addr.clone()); + subject.stream_key_routes.insert( + stream_key.clone(), + RouteQueryResponse { + route: Route { hops: vec![] }, + expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + }, + ); + subject + .tunneled_hosts + .insert(stream_key.clone(), "hostname".to_string()); + subject.route_ids_to_return_routes.insert( + 1234, + AddReturnRouteMessage { + return_route_id: 1234, + expected_services: vec![], + protocol: ProxyProtocol::HTTP, + hostname_opt: None, + }, + ); + let proxy_server_addr = subject.start(); + let schedule_stream_key_purge_sub = proxy_server_addr.clone().recipient(); + let mut peer_actors = peer_actors_builder().build(); + peer_actors.proxy_server.schedule_stream_key_purge = schedule_stream_key_purge_sub; + let system = System::new(test_name); + let bind_msg = BindMessage { peer_actors }; + proxy_server_addr.try_send(bind_msg).unwrap(); + let time_before_sending_package = SystemTime::now(); + + proxy_server_addr.try_send(msg).unwrap(); + + let time_after_sending_package = time_before_sending_package + .checked_add(Duration::from_secs(1)) + .unwrap(); + let pre_purge_assertions = AssertionsMessage { + assertions: Box::new(move |proxy_server: &mut ProxyServer| { + let purge_timestamp = proxy_server + .stream_key_ttl + .get(&stream_key) + .unwrap() + .clone(); + assert!( + time_before_sending_package <= purge_timestamp + && purge_timestamp <= time_after_sending_package + ); + assert!(!proxy_server.keys_and_addrs.is_empty()); + assert!(!proxy_server.stream_key_routes.is_empty()); + assert!(!proxy_server.tunneled_hosts.is_empty()); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: Client closed stream referenced by stream key {:?}, \ + which was tunneling to the host \"hostname\". \ + It will be purged after {stream_key_purge_delay_in_millis}ms.", + stream_key + )); + }), + }; + proxy_server_addr + .try_send(MessageScheduler { + scheduled_msg: pre_purge_assertions, + delay: Duration::from_millis(stream_key_purge_delay_in_millis - offset_in_millis), // 400ms + }) + .unwrap(); + let post_purge_assertions = AssertionsMessage { + assertions: Box::new(move |proxy_server: &mut ProxyServer| { + assert!(proxy_server.keys_and_addrs.is_empty()); + assert!(proxy_server.stream_key_routes.is_empty()); + assert!(proxy_server.tunneled_hosts.is_empty()); + assert!(proxy_server.stream_key_ttl.is_empty()); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: Retiring stream key {:?}", + stream_key + )); + System::current().stop(); + }), + }; + proxy_server_addr + .try_send(MessageScheduler { + scheduled_msg: post_purge_assertions, + delay: Duration::from_millis(stream_key_purge_delay_in_millis + offset_in_millis), // 600ms + }) + .unwrap(); + system.run(); + } + + #[test] + fn straggling_packets_are_logged() { + init_test_logging(); + let test_name = "straggling_packets_are_logged"; + let cryptde = main_cryptde(); + let mut subject = ProxyServer::new( + cryptde, + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + ); + subject.logger = Logger::new(test_name); + subject.subs = Some(make_proxy_server_out_subs()); + let stream_key = StreamKey::make_meaningful_stream_key(test_name); + let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); + subject + .keys_and_addrs + .insert(stream_key.clone(), socket_addr.clone()); + subject.stream_key_routes.insert( + stream_key.clone(), + RouteQueryResponse { + route: Route { hops: vec![] }, + expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + }, + ); + subject + .tunneled_hosts + .insert(stream_key.clone(), "hostname".to_string()); + subject.route_ids_to_return_routes.insert( + 1234, + AddReturnRouteMessage { + return_route_id: 1234, + expected_services: vec![], + protocol: ProxyProtocol::HTTP, + hostname_opt: None, + }, + ); + let proxy_server_addr = subject.start(); + let schedule_stream_key_purge_sub = proxy_server_addr.clone().recipient(); + let mut peer_actors = peer_actors_builder().build(); + peer_actors.proxy_server.schedule_stream_key_purge = schedule_stream_key_purge_sub; + + let system = System::new(test_name); + let bind_msg = BindMessage { peer_actors }; + proxy_server_addr.try_send(bind_msg).unwrap(); + let stream_shutdown_msg = StreamShutdownMsg { + peer_addr: socket_addr, + stream_type: RemovedStreamType::NonClandestine(NonClandestineAttributes { + reception_port: 0, + sequence_number: 0, + }), + report_to_counterpart: true, + }; + let client_response_payload = ClientResponsePayload_0v1 { + stream_key: stream_key.clone(), + sequenced_packet: SequencedPacket::new(vec![], 1, true), + }; + let expired_cores_package: ExpiredCoresPackage = + ExpiredCoresPackage::new( + SocketAddr::from_str("1.2.3.4:1234").unwrap(), + Some(make_wallet("irrelevant")), + return_route_with_id(cryptde, 1234), + client_response_payload.into(), + 0, + ); + proxy_server_addr.try_send(stream_shutdown_msg).unwrap(); + + proxy_server_addr.try_send(expired_cores_package).unwrap(); + + System::current().stop(); + system.run(); + TestLogHandler::new().exists_log_containing(&format!( + "WARN: {test_name}: Straggling packet of length 0 received for a \ + stream key {:?} after a delay of", + stream_key + )); + } + #[test] fn proxy_server_receives_nonterminal_response_from_hopper() { let system = System::new("proxy_server_receives_nonterminal_response_from_hopper"); @@ -4981,7 +5302,7 @@ mod tests { let make_params = make_params_arc.lock().unwrap(); assert_eq!(make_params.len(), 3); TestLogHandler::new().exists_log_containing(&format!( - "WARN: {test_name}: Retiring stream key {stream_key_clone}: DnsResolveFailure" + "DEBUG: {test_name}: Retiring stream key {stream_key_clone} due to DNS resolution failure" )); } @@ -5394,7 +5715,10 @@ mod tests { #[test] fn handle_stream_shutdown_msg_reports_to_counterpart_without_tunnel_when_necessary() { - let system = System::new("test"); + init_test_logging(); + let test_name = + "handle_stream_shutdown_msg_reports_to_counterpart_without_tunnel_when_necessary"; + let system = System::new(test_name); let mut subject = ProxyServer::new( main_cryptde(), alias_cryptde(), @@ -5451,6 +5775,7 @@ mod tests { ), }, ); + subject.logger = Logger::new(test_name); let subject_addr = subject.start(); let (hopper, _, hopper_recording_arc) = make_recorder(); let (proxy_server, _, proxy_server_recording_arc) = make_recorder(); @@ -5505,72 +5830,18 @@ mod tests { report_to_counterpart: false } ); - } - - #[test] - fn handle_stream_shutdown_msg_does_not_report_to_counterpart_when_unnecessary() { - let mut subject = ProxyServer::new(main_cryptde(), alias_cryptde(), true, None, false); - let unaffected_socket_addr = SocketAddr::from_str("2.3.4.5:6789").unwrap(); - let unaffected_stream_key = StreamKey::make_meaningful_stream_key("unaffected"); - let affected_socket_addr = SocketAddr::from_str("3.4.5.6:7890").unwrap(); - let affected_stream_key = StreamKey::make_meaningful_stream_key("affected"); - subject - .keys_and_addrs - .insert(unaffected_stream_key, unaffected_socket_addr); - subject - .keys_and_addrs - .insert(affected_stream_key, affected_socket_addr); - subject.stream_key_routes.insert( - unaffected_stream_key, - RouteQueryResponse { - route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), - }, - ); - subject.stream_key_routes.insert( - affected_stream_key, - RouteQueryResponse { - route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), - }, - ); - subject - .tunneled_hosts - .insert(unaffected_stream_key, "blah".to_string()); - subject - .tunneled_hosts - .insert(affected_stream_key, "blah".to_string()); - - subject.handle_stream_shutdown_msg(StreamShutdownMsg { - peer_addr: affected_socket_addr, - stream_type: RemovedStreamType::NonClandestine(NonClandestineAttributes { - reception_port: HTTP_PORT, - sequence_number: 1234, - }), - report_to_counterpart: false, - }); - - // Subject is unbound but didn't panic; therefore, no attempt to send to Hopper: perfect! - assert!(subject - .keys_and_addrs - .a_to_b(&unaffected_stream_key) - .is_some()); - assert!(subject - .stream_key_routes - .contains_key(&unaffected_stream_key)); - assert!(subject.tunneled_hosts.contains_key(&unaffected_stream_key)); - assert!(subject - .keys_and_addrs - .a_to_b(&affected_stream_key) - .is_none()); - assert!(!subject.stream_key_routes.contains_key(&affected_stream_key)); - assert!(!subject.tunneled_hosts.contains_key(&affected_stream_key)); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: Client closed stream referenced by stream key {:?}. \ + It will be purged after {:?}.", + &affected_stream_key, STREAM_KEY_PURGE_DELAY + )); } #[test] fn handle_stream_shutdown_msg_logs_errors_from_handling_normal_client_data() { init_test_logging(); let mut subject = ProxyServer::new(main_cryptde(), alias_cryptde(), true, Some(0), false); + subject.subs = Some(make_proxy_server_out_subs()); let helper = IBCDHelperMock::default() .handle_normal_client_data_result(Err("Our help is not welcome".to_string())); subject.inbound_client_data_helper_opt = Some(Box::new(helper)); @@ -5595,6 +5866,7 @@ mod tests { fn stream_shutdown_msg_populates_correct_inbound_client_data_msg() { let help_to_handle_normal_client_data_params_arc = Arc::new(Mutex::new(vec![])); let mut subject = ProxyServer::new(main_cryptde(), alias_cryptde(), true, Some(0), false); + subject.subs = Some(make_proxy_server_out_subs()); let icd_helper = IBCDHelperMock::default() .handle_normal_client_data_params(&help_to_handle_normal_client_data_params_arc) .handle_normal_client_data_result(Ok(())); diff --git a/node/src/stream_handler_pool.rs b/node/src/stream_handler_pool.rs index 3d5512892..db9bdd4e3 100644 --- a/node/src/stream_handler_pool.rs +++ b/node/src/stream_handler_pool.rs @@ -105,6 +105,7 @@ impl Display for StreamWriterKey { } } +// TODO: To avoid confusion with ProxyClient's StreamHandlerPool, rename this one or the other for easy identification. // It is used to store streams for both neighbors and browser. pub struct StreamHandlerPool { stream_writers: HashMap>>>, @@ -374,8 +375,31 @@ impl StreamHandlerPool { stream_writer_key ); let report_to_counterpart = match self.stream_writers.remove(&stream_writer_key) { - None | Some(None) => false, - Some(Some(_sender_wrapper)) => true, + None => { + trace!( + self.logger, + "While handling RemoveStreamMsg: Stream Writers did not contain any entry for key {}", + stream_writer_key + ); + false + } + Some(None) => { + error!( + self.logger, + "An unpopulated entry in stream_writers was found for a {:?} stream ({:?}) from \ + a client. This shouldn't be possible. Investigate!", + msg.stream_type, stream_writer_key + ); + false + } + Some(Some(_sender_wrapper)) => { + trace!( + self.logger, + "While handling RemoveStreamMsg: Stream Writers contained an entry for key {}, also found stream writer; removing", + stream_writer_key + ); + true + } }; let stream_shutdown_msg = StreamShutdownMsg { peer_addr: msg.peer_addr, @@ -1270,10 +1294,13 @@ mod tests { #[test] fn handle_remove_stream_msg_handles_stream_waiting_for_connect_scenario() { + init_test_logging(); + let test_name = "handle_remove_stream_msg_handles_stream_waiting_for_connect_scenario"; let (recorder, _, recording_arc) = make_recorder(); - let system = System::new("test"); + let system = System::new(test_name); let sub = recorder.start().recipient::(); let mut subject = StreamHandlerPool::new(vec![], false); + subject.logger = Logger::new(test_name); let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); let local_addr = SocketAddr::from_str("127.0.0.1:0").unwrap(); let sw_key = StreamWriterKey::from(peer_addr); @@ -1299,6 +1326,11 @@ mod tests { report_to_counterpart: false } ); + TestLogHandler::new().exists_log_containing(&format!( + "ERROR: {}: An unpopulated entry in stream_writers was found for a \ + Clandestine stream ({:?}) from a client. This shouldn't be possible. Investigate!", + test_name, sw_key + )); } #[test] diff --git a/node/src/sub_lib/proxy_server.rs b/node/src/sub_lib/proxy_server.rs index 78c61ee72..c3042859f 100644 --- a/node/src/sub_lib/proxy_server.rs +++ b/node/src/sub_lib/proxy_server.rs @@ -9,6 +9,7 @@ use crate::sub_lib::peer_actors::BindMessage; use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, DnsResolveFailure_0v1}; use crate::sub_lib::sequence_buffer::SequencedPacket; use crate::sub_lib::stream_key::StreamKey; +use crate::sub_lib::utils::MessageScheduler; use crate::sub_lib::versioned_data::VersionedData; use actix::Message; use actix::Recipient; @@ -68,6 +69,11 @@ pub struct AddRouteResultMessage { pub result: Result, } +#[derive(Message, Debug, PartialEq, Eq)] +pub struct StreamKeyPurge { + pub stream_key: StreamKey, +} + #[derive(Clone, PartialEq, Eq)] pub struct ProxyServerSubs { // ProxyServer will handle these messages: @@ -79,6 +85,7 @@ pub struct ProxyServerSubs { pub stream_shutdown_sub: Recipient, pub node_from_ui: Recipient, pub route_result_sub: Recipient, + pub schedule_stream_key_purge: Recipient>, } impl Debug for ProxyServerSubs { @@ -110,6 +117,7 @@ mod tests { stream_shutdown_sub: recipient!(recorder, StreamShutdownMsg), node_from_ui: recipient!(recorder, NodeFromUiMessage), route_result_sub: recipient!(recorder, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(recorder, MessageScheduler), }; assert_eq!(format!("{:?}", subject), "ProxyServerSubs"); diff --git a/node/src/sub_lib/utils.rs b/node/src/sub_lib/utils.rs index f6de206d7..d68d721bb 100644 --- a/node/src/sub_lib/utils.rs +++ b/node/src/sub_lib/utils.rs @@ -245,7 +245,7 @@ pub fn db_connection_launch_panic(err: InitializationError, data_directory: &Pat ) } -#[derive(Message, Clone, PartialEq, Eq)] +#[derive(Message, Debug, Clone, PartialEq, Eq)] pub struct MessageScheduler { pub scheduled_msg: M, pub delay: Duration, diff --git a/node/src/test_utils/recorder.rs b/node/src/test_utils/recorder.rs index 4219b2d08..da1aed944 100644 --- a/node/src/test_utils/recorder.rs +++ b/node/src/test_utils/recorder.rs @@ -39,7 +39,9 @@ use crate::sub_lib::peer_actors::PeerActors; use crate::sub_lib::peer_actors::{BindMessage, NewPublicIp, StartMessage}; use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, InboundServerData}; use crate::sub_lib::proxy_client::{DnsResolveFailure_0v1, ProxyClientSubs}; -use crate::sub_lib::proxy_server::{AddReturnRouteMessage, ClientRequestPayload_0v1}; +use crate::sub_lib::proxy_server::{ + AddReturnRouteMessage, ClientRequestPayload_0v1, StreamKeyPurge, +}; use crate::sub_lib::proxy_server::{AddRouteResultMessage, ProxyServerSubs}; use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; @@ -398,6 +400,7 @@ pub fn make_proxy_server_subs_from_recorder(addr: &Addr) -> ProxyServe stream_shutdown_sub: recipient!(addr, StreamShutdownMsg), node_from_ui: recipient!(addr, NodeFromUiMessage), route_result_sub: recipient!(addr, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(addr, MessageScheduler), } } diff --git a/node/tests/connection_shutdown_test.rs b/node/tests/connection_shutdown_test.rs new file mode 100644 index 000000000..24cd9d25c --- /dev/null +++ b/node/tests/connection_shutdown_test.rs @@ -0,0 +1,81 @@ +// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +pub mod utils; + +use crossbeam_channel::{unbounded, Sender}; +use masq_lib::utils::find_free_port; +use std::io::{Read, Write}; +use std::net::{IpAddr, TcpListener, TcpStream}; +use std::net::{Shutdown, SocketAddr}; +use std::str::FromStr; +use std::time::Duration; +use std::{io, thread}; + +// 'node' below must not be named '_' alone or disappear, or the MASQNode will be immediately reclaimed. +#[test] +fn proxy_client_stream_reader_dies_when_client_stream_is_killed_integration() { + let _node = utils::MASQNode::start_standard( + "proxy_client_stream_reader_dies_when_client_stream_is_killed_integration", + None, + true, + true, + false, + true, + ); + let (server_write_error_tx, server_write_error_rx) = unbounded(); + let server_port = find_free_port(); + let join_handle = thread::spawn(move || { + endless_write_server(server_port, server_write_error_tx); + }); + let mut browser_stream = + TcpStream::connect(SocketAddr::from_str("127.0.0.1:80").unwrap()).unwrap(); + browser_stream + .set_read_timeout(Some(Duration::from_millis(1000))) + .unwrap(); + let request = format!("GET / HTTP/1.1\r\nHost: 127.0.0.1:{server_port}\r\n\r\n"); + browser_stream.write(request.as_bytes()).unwrap(); + let mut buf = [0u8; 16384]; + // We want to make sure the Server is sending before we shutdown the stream + browser_stream.read(&mut buf).unwrap(); + + browser_stream.shutdown(Shutdown::Write).unwrap(); + + let write_error = server_write_error_rx + .recv_timeout(Duration::from_secs(60)) + .unwrap(); + if cfg!(target_os = "macos") { + assert_eq!(write_error.kind(), io::ErrorKind::BrokenPipe); + } else { + assert_eq!(write_error.kind(), io::ErrorKind::ConnectionReset); + } + + join_handle.join().unwrap(); +} + +fn endless_write_server(port: u16, write_error_tx: Sender) { + let listener = TcpListener::bind(SocketAddr::new( + IpAddr::from_str("127.0.0.1").unwrap(), + port, + )) + .unwrap(); + let mut buf = [0u8; 16_384]; + let (mut stream, _) = listener.accept().unwrap(); + stream + .set_write_timeout(Some(Duration::from_secs(1))) + .unwrap(); + let _ = stream.read(&mut buf).unwrap(); + stream + .write("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n".as_bytes()) + .unwrap(); + let msg = "Chancellor on brink of second bailout for banks"; + let msg_len = msg.len(); + let chunk_body = format!("{msg_len}\r\n{msg}\r\n"); + loop { + if let Err(e) = stream.write(chunk_body.as_bytes()) { + write_error_tx.send(e).unwrap(); + break; + } + + thread::sleep(Duration::from_millis(250)); + } +}