Skip to content

Commit

Permalink
GH-800: all tests in stream_handler_pool.rs are passing
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarshg6 committed Aug 16, 2024
1 parent 54040ae commit 31db7c4
Showing 1 changed file with 62 additions and 61 deletions.
123 changes: 62 additions & 61 deletions node/src/proxy_client/stream_handler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -952,77 +952,78 @@ mod tests {
}

#[test]
fn while_housekeeping_the_stream_senders_are_received_by_stream_handler_pool() {
fn while_housekeeping_the_shutdown_signal_is_sent() {
init_test_logging();
let test_name = "stream_handler_pool_sends_shutdown_signal_when_last_data_is_true";
let (shutdown_tx, mut shutdown_rx) = unbounded_channel();
let (stream_adder_tx, stream_adder_rx) = unbounded();
thread::spawn(move || {
let stream_key = StreamKey::make_meaningful_stream_key("I should die");
let client_request_payload = ClientRequestPayload_0v1 {
let mut system = System::new(test_name);
let stream_key = StreamKey::make_meaningful_stream_key("I should die");
let client_request_payload = ClientRequestPayload_0v1 {
stream_key,
sequenced_packet: SequencedPacket {
data: b"I'm gonna kill you stream key".to_vec(),
sequence_number: 0,
last_data: true,
},
target_hostname: Some(String::from("3.4.5.6:80")),
target_port: HTTP_PORT,
protocol: ProxyProtocol::HTTP,
originator_public_key: PublicKey::new(&b"brutal death"[..]),
};
let package = ExpiredCoresPackage::new(
SocketAddr::from_str("1.2.3.4:1234").unwrap(),
Some(make_wallet("consuming")),
make_meaningless_route(),
client_request_payload.into(),
0,
);
let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap();
let peer_actors = peer_actors_builder().build();
let mut subject = StreamHandlerPoolReal::new(
Box::new(ResolverWrapperMock::new()),
main_cryptde(),
peer_actors.accountant.report_exit_service_provided.clone(),
peer_actors.proxy_client_opt.unwrap().clone(),
100,
200,
);
{
let mut inner = subject.inner.lock().unwrap();
inner.logger = Logger::new(test_name);
inner.stream_writer_channels.insert(
stream_key,
sequenced_packet: SequencedPacket {
data: b"I'm gonna kill you stream key".to_vec(),
sequence_number: 0,
last_data: true,
StreamSenders {
writer_data: Box::new(SenderWrapperMock::new(peer_addr)),
reader_shutdown_tx: shutdown_tx,
},
target_hostname: Some(String::from("3.4.5.6:80")),
target_port: HTTP_PORT,
protocol: ProxyProtocol::HTTP,
originator_public_key: PublicKey::new(&b"brutal death"[..]),
};
let package = ExpiredCoresPackage::new(
SocketAddr::from_str("1.2.3.4:1234").unwrap(),
Some(make_wallet("consuming")),
make_meaningless_route(),
client_request_payload.into(),
0,
);
let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap();
let peer_actors = peer_actors_builder().build();
let mut subject = StreamHandlerPoolReal::new(
Box::new(ResolverWrapperMock::new()),
main_cryptde(),
peer_actors.accountant.report_exit_service_provided.clone(),
peer_actors.proxy_client_opt.unwrap().clone(),
100,
200,
);
subject.stream_adder_rx = stream_adder_rx;
{
let mut inner = subject.inner.lock().unwrap();
inner.logger = Logger::new(test_name);
inner.stream_writer_channels.insert(
stream_key,
StreamSenders {
writer_data: Box::new(SenderWrapperMock::new(peer_addr)),
reader_shutdown_tx: shutdown_tx,
},
);
inner.establisher_factory = Box::new(StreamEstablisherFactoryReal {
cryptde: main_cryptde(),
stream_adder_tx,
stream_killer_tx: unbounded().0,
proxy_client_subs: make_proxy_client_subs_from_recorder(
&make_recorder().0.start(),
),
logger: Logger::new("test"),
});
}
inner.establisher_factory = Box::new(StreamEstablisherFactoryReal {
cryptde: main_cryptde(),
stream_adder_tx,
stream_killer_tx: unbounded().0,
proxy_client_subs: make_proxy_client_subs_from_recorder(&make_recorder().0.start()),
logger: Logger::new("test"),
});
}
let paying_wallet = package.paying_wallet.clone();
let payload = match package.payload {
MessageType::ClientRequest(vd) => vd
.extract(&crate::sub_lib::migrations::client_request_payload::MIGRATIONS)
.unwrap(),
_ => panic!("Expected MessageType::ClientRequest, got something else"),
};

// TODO: GH-800: Make sure that the stream_adder_tx sends something to the receiver
subject.process_package(payload, paying_wallet);

run_process_package_in_actix(subject, package);
});
let received = shutdown_rx.poll().unwrap();
assert_eq!(received, Async::Ready(Some(())));
TestLogHandler::new().await_log_containing(
&format!(
"DEBUG: {test_name}: Removing StreamWriter and Shutting down StreamReader \
let future_result = shutdown_rx.and_then(|_| Ok(()));
let future_with_timeout =
tokio::timer::Timeout::new(future_result, Duration::from_secs(10)).into_future();
assert!(system.block_on(future_with_timeout).is_ok());
TestLogHandler::new().exists_log_containing(&format!(
"DEBUG: {test_name}: Removing StreamWriter and Shutting down StreamReader \
for oUHoHuDKHjeWq+BJzBIqHpPFBQw to 3.4.5.6:80"
),
500,
);
));
}

#[test]
Expand Down

0 comments on commit 31db7c4

Please sign in to comment.