Skip to content

Commit

Permalink
GH-800: final changes
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarshg6 committed Oct 29, 2024
1 parent 540d259 commit 9dfd4ea
Showing 1 changed file with 8 additions and 67 deletions.
75 changes: 8 additions & 67 deletions node/src/proxy_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use tokio::prelude::Future;
pub const CRASH_KEY: &str = "PROXYSERVER";
pub const RETURN_ROUTE_TTL: Duration = Duration::from_secs(120);

pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(5);
pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(30);

struct ProxyServerOutSubs {
dispatcher: Recipient<TransmitDataMsg>,
Expand Down Expand Up @@ -1427,6 +1427,7 @@ mod tests {
fn constants_have_correct_values() {
assert_eq!(CRASH_KEY, "PROXYSERVER");
assert_eq!(RETURN_ROUTE_TTL, Duration::from_secs(120));
assert_eq!(STREAM_KEY_PURGE_DELAY, Duration::from_secs(30));
}

const STANDARD_CONSUMING_WALLET_BALANCE: i64 = 0;
Expand Down Expand Up @@ -3790,7 +3791,7 @@ mod tests {
*/

init_test_logging();
let unique_identifier =
let test_name =
"proxy_server_schedules_stream_key_purge_once_shutdown_order_is_received_for_stream";
let cryptde = main_cryptde();
let stream_key_purge_delay_in_millis = 500;
Expand All @@ -3803,9 +3804,9 @@ mod tests {
false,
);
subject.stream_key_purge_delay = Duration::from_millis(stream_key_purge_delay_in_millis);
subject.logger = Logger::new(&unique_identifier);
subject.logger = Logger::new(&test_name);
subject.subs = Some(make_proxy_server_out_subs());
let stream_key = StreamKey::make_meaningful_stream_key(&unique_identifier);
let stream_key = StreamKey::make_meaningful_stream_key(&test_name);
subject
.keys_and_addrs
.insert(stream_key.clone(), msg.peer_addr.clone());
Expand All @@ -3832,7 +3833,7 @@ mod tests {
let schedule_stream_key_purge_sub = proxy_server_addr.clone().recipient();
let mut peer_actors = peer_actors_builder().build();
peer_actors.proxy_server.schedule_stream_key_purge = schedule_stream_key_purge_sub;
let system = System::new(unique_identifier);
let system = System::new(test_name);
let bind_msg = BindMessage { peer_actors };
proxy_server_addr.try_send(bind_msg).unwrap();
let time_before_sending_package = SystemTime::now();
Expand All @@ -3857,7 +3858,7 @@ mod tests {
assert!(!proxy_server.stream_key_routes.is_empty());
assert!(!proxy_server.tunneled_hosts.is_empty());
TestLogHandler::new().exists_log_containing(&format!(
"DEBUG: {unique_identifier}: Client closed stream referenced by stream key {:?}, \
"DEBUG: {test_name}: Client closed stream referenced by stream key {:?}, \
which was tunneling to the host \"hostname\". \
It will be purged after {stream_key_purge_delay_in_millis}ms.",
stream_key
Expand All @@ -3877,7 +3878,7 @@ mod tests {
assert!(proxy_server.tunneled_hosts.is_empty());
assert!(proxy_server.stream_key_ttl.is_empty());
TestLogHandler::new().exists_log_containing(&format!(
"DEBUG: {unique_identifier}: Retiring stream key {:?}",
"DEBUG: {test_name}: Retiring stream key {:?}",
stream_key
));
System::current().stop();
Expand Down Expand Up @@ -5836,66 +5837,6 @@ mod tests {
));
}

#[test]
fn handle_stream_shutdown_msg_does_not_report_to_counterpart_when_unnecessary() {
let mut subject = ProxyServer::new(main_cryptde(), alias_cryptde(), true, None, false);
let unaffected_socket_addr = SocketAddr::from_str("2.3.4.5:6789").unwrap();
let unaffected_stream_key = StreamKey::make_meaningful_stream_key("unaffected");
let affected_socket_addr = SocketAddr::from_str("3.4.5.6:7890").unwrap();
let affected_stream_key = StreamKey::make_meaningful_stream_key("affected");
subject
.keys_and_addrs
.insert(unaffected_stream_key, unaffected_socket_addr);
subject
.keys_and_addrs
.insert(affected_stream_key, affected_socket_addr);
subject.stream_key_routes.insert(
unaffected_stream_key,
RouteQueryResponse {
route: Route { hops: vec![] },
expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234),
},
);
subject.stream_key_routes.insert(
affected_stream_key,
RouteQueryResponse {
route: Route { hops: vec![] },
expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234),
},
);
subject
.tunneled_hosts
.insert(unaffected_stream_key, "blah".to_string());
subject
.tunneled_hosts
.insert(affected_stream_key, "blah".to_string());

subject.handle_stream_shutdown_msg(StreamShutdownMsg {
peer_addr: affected_socket_addr,
stream_type: RemovedStreamType::NonClandestine(NonClandestineAttributes {
reception_port: HTTP_PORT,
sequence_number: 1234,
}),
report_to_counterpart: false,
});

// Subject is unbound but didn't panic; therefore, no attempt to send to Hopper: perfect!
assert!(subject
.keys_and_addrs
.a_to_b(&unaffected_stream_key)
.is_some());
assert!(subject
.stream_key_routes
.contains_key(&unaffected_stream_key));
assert!(subject.tunneled_hosts.contains_key(&unaffected_stream_key));
assert!(subject
.keys_and_addrs
.a_to_b(&affected_stream_key)
.is_none());
assert!(!subject.stream_key_routes.contains_key(&affected_stream_key));
assert!(!subject.tunneled_hosts.contains_key(&affected_stream_key));
}

#[test]
fn handle_stream_shutdown_msg_logs_errors_from_handling_normal_client_data() {
init_test_logging();
Expand Down

0 comments on commit 9dfd4ea

Please sign in to comment.