Skip to content

Commit

Permalink
GH-800: remove warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarshg6 committed Aug 27, 2024
1 parent 1880006 commit d81c2f0
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 35 deletions.
7 changes: 1 addition & 6 deletions node/src/proxy_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use crate::sub_lib::stream_key::StreamKey;
use crate::sub_lib::utils::{handle_ui_crash_request, NODE_MAILBOX_CAPACITY};
use crate::sub_lib::versioned_data::VersionedData;
use crate::sub_lib::wallet::Wallet;
use actix::Actor;
use actix::Addr;
use actix::Context;
use actix::Handler;
use actix::Recipient;
use actix::{Actor, Message};
use masq_lib::logger::Logger;
use masq_lib::ui_gateway::NodeFromUiMessage;
use pretty_hex::PrettyHex;
Expand All @@ -48,11 +48,6 @@ use trust_dns_resolver::config::ResolverOpts;

pub const CRASH_KEY: &str = "PROXYCLIENT";

#[derive(Message)]
struct StopListeningForThisStream {
stream_key: StreamKey,
}

pub struct ProxyClient {
dns_servers: Vec<SocketAddr>,
resolver_wrapper_factory: Box<dyn ResolverWrapperFactory>,
Expand Down
2 changes: 1 addition & 1 deletion node/src/proxy_client/stream_establisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use crate::proxy_client::stream_handler_pool::StreamSenders;
use crate::proxy_client::stream_reader::StreamReader;
use crate::proxy_client::stream_writer::StreamWriter;
use crate::sub_lib::channel_wrappers::FuturesChannelFactory;
use crate::sub_lib::channel_wrappers::FuturesChannelFactoryReal;
use crate::sub_lib::channel_wrappers::SenderWrapper;
use crate::sub_lib::channel_wrappers::{FuturesChannelFactory, SenderWrapperReal};
use crate::sub_lib::cryptde::CryptDE;
use crate::sub_lib::proxy_client::{InboundServerData, ProxyClientSubs};
use crate::sub_lib::proxy_server::ClientRequestPayload_0v1;
Expand Down
47 changes: 21 additions & 26 deletions node/src/proxy_client/stream_handler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use crate::sub_lib::proxy_server::ClientRequestPayload_0v1;
use crate::sub_lib::sequence_buffer::SequencedPacket;
use crate::sub_lib::stream_key::StreamKey;
use crate::sub_lib::wallet::Wallet;
use actix::{Message, Recipient};
use actix::Recipient;
use crossbeam_channel::{unbounded, Receiver};
use futures::future;
use futures::future::Future;
use futures::{future, Sink};
use masq_lib::logger::Logger;
use std::collections::HashMap;
use std::io;
Expand All @@ -26,8 +26,7 @@ use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::prelude::future::FutureResult;
use tokio::prelude::future::{err, ok};
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio::sync::mpsc::UnboundedSender;
use trust_dns_resolver::error::ResolveError;
use trust_dns_resolver::lookup_ip::LookupIp;

Expand Down Expand Up @@ -193,7 +192,7 @@ impl StreamHandlerPoolReal {
inner.logger,
"Couldn't process request from CORES package: {}", error
);
if let Some(mut stream_senders) = inner.stream_writer_channels.remove(stream_key) {
if let Some(stream_senders) = inner.stream_writer_channels.remove(stream_key) {
debug!(
inner.logger,
"Removing stream writer for {}",
Expand Down Expand Up @@ -248,7 +247,7 @@ impl StreamHandlerPoolReal {
if last_data {
debug!(test_logger, "last_data = true");
match inner.stream_writer_channels.remove(&stream_key) {
Some(mut stream_senders) => {
Some(stream_senders) => {
debug!(
inner.logger,
"Removing StreamWriter and Shutting down StreamReader for {:?} to {}",
Expand Down Expand Up @@ -500,7 +499,8 @@ impl StreamHandlerPoolReal {
data: vec![],
})
.expect("ProxyClient is dead");
if let Err(e) = stream_senders.reader_shutdown_tx.try_send(()) {
// TODO: GH-800: Perhaps you want the function that you created over here
if let Err(_e) = stream_senders.reader_shutdown_tx.try_send(()) {
debug!(inner.logger, "Unable to send a shutdown signal to the StreamReader for stream key {:?}. The channel is already gone.", stream_key)
};
// Test should have a fake server, and the (read and write should be different) server
Expand Down Expand Up @@ -578,7 +578,6 @@ impl StreamHandlerPoolFactory for StreamHandlerPoolFactoryReal {
#[cfg(test)]
mod tests {
use super::*;
use crate::match_every_type_id;
use crate::node_test_utils::check_timestamp;
use crate::proxy_client::local_test_utils::make_send_error;
use crate::proxy_client::local_test_utils::ResolverWrapperMock;
Expand All @@ -597,14 +596,11 @@ mod tests {
use crate::test_utils::make_wallet;
use crate::test_utils::recorder::peer_actors_builder;
use crate::test_utils::recorder::{make_proxy_client_subs_from_recorder, make_recorder};
use crate::test_utils::recorder_stop_conditions::StopCondition;
use crate::test_utils::recorder_stop_conditions::StopConditions;
use crate::test_utils::stream_connector_mock::StreamConnectorMock;
use crate::test_utils::tokio_wrapper_mocks::ReadHalfWrapperMock;
use crate::test_utils::tokio_wrapper_mocks::WriteHalfWrapperMock;
use actix::{Actor, System};
use core::any::TypeId;
use futures::{Poll, Stream};
use futures::Stream;
use masq_lib::constants::HTTP_PORT;
use masq_lib::test_utils::logging::init_test_logging;
use masq_lib::test_utils::logging::TestLogHandler;
Expand All @@ -613,12 +609,11 @@ mod tests {
use std::io::ErrorKind;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::ops::{Add, Deref};
use std::ptr::addr_of;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{sync, thread};
use std::thread;
use std::time::Duration;
use tokio;
use tokio::prelude::Async;
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -945,8 +940,8 @@ mod tests {
fn while_housekeeping_the_shutdown_signal_is_sent() {
init_test_logging();
let test_name = "stream_handler_pool_sends_shutdown_signal_when_last_data_is_true";
let (shutdown_tx, mut shutdown_rx) = unbounded_channel();
let (stream_adder_tx, stream_adder_rx) = unbounded();
let (shutdown_tx, shutdown_rx) = unbounded_channel();
let (stream_adder_tx, _stream_adder_rx) = unbounded();
let mut system = System::new(test_name);
let stream_key = StreamKey::make_meaningful_stream_key("I should die");
let client_request_payload = ClientRequestPayload_0v1 {
Expand All @@ -970,7 +965,7 @@ mod tests {
);
let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap();
let peer_actors = peer_actors_builder().build();
let mut subject = StreamHandlerPoolReal::new(
let subject = StreamHandlerPoolReal::new(
Box::new(ResolverWrapperMock::new()),
main_cryptde(),
peer_actors.accountant.report_exit_service_provided.clone(),
Expand Down Expand Up @@ -1021,7 +1016,7 @@ mod tests {
init_test_logging();
let test_name = "stream_handler_pool_sends_shutdown_signal_when_last_data_is_true";
let mut system = System::new(test_name);
let (shutdown_tx, mut shutdown_rx) = unbounded_channel();
let (shutdown_tx, shutdown_rx) = unbounded_channel();
let stream_key = StreamKey::make_meaningful_stream_key("I should die");
let client_request_payload = ClientRequestPayload_0v1 {
stream_key,
Expand All @@ -1044,7 +1039,7 @@ mod tests {
);
let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap();
let peer_actors = peer_actors_builder().build();
let mut subject = StreamHandlerPoolReal::new(
let subject = StreamHandlerPoolReal::new(
Box::new(ResolverWrapperMock::new()),
main_cryptde(),
peer_actors.accountant.report_exit_service_provided.clone(),
Expand Down Expand Up @@ -1116,7 +1111,7 @@ mod tests {
);
let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap();
let peer_actors = peer_actors_builder().build();
let mut subject = StreamHandlerPoolReal::new(
let subject = StreamHandlerPoolReal::new(
Box::new(ResolverWrapperMock::new()),
main_cryptde(),
peer_actors.accountant.report_exit_service_provided.clone(),
Expand Down Expand Up @@ -2108,17 +2103,17 @@ mod tests {
subject.inner.lock().unwrap().logger = Logger::new(test_name);
}
let first_stream_key = StreamKey::make_meaningful_stream_key("first_stream_key");
let (first_writer_data_tx, first_writer_data_rx) = futures::sync::mpsc::unbounded();
let (first_shutdown_tx, first_shutdown_rx) = unbounded_channel();
let (first_writer_data_tx, _first_writer_data_rx) = futures::sync::mpsc::unbounded();
let (first_shutdown_tx, _first_shutdown_rx) = unbounded_channel();
let first_stream_senders = StreamSenders {
writer_data: Box::new(SenderWrapperReal::new(
SocketAddr::from_str("1.2.3.4:5678").unwrap(),
first_writer_data_tx,
)),
reader_shutdown_tx: first_shutdown_tx,
};
let (second_writer_data_tx, second_writer_data_rx) = futures::sync::mpsc::unbounded();
let (second_shutdown_tx, second_shutdown_rx) = unbounded_channel();
let (second_writer_data_tx, _second_writer_data_rx) = futures::sync::mpsc::unbounded();
let (second_shutdown_tx, _second_shutdown_rx) = unbounded_channel();
let second_stream_key = StreamKey::make_meaningful_stream_key("second_stream_key");
let second_stream_senders = StreamSenders {
writer_data: Box::new(SenderWrapperReal::new(
Expand Down
4 changes: 2 additions & 2 deletions node/src/proxy_client/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::sub_lib::tokio_wrappers::ReadHalfWrapper;
use crate::sub_lib::utils;
use crate::sub_lib::utils::indicates_dead_stream;
use actix::Recipient;
use crossbeam_channel::{Receiver, Sender, TryRecvError};
use futures::{Poll, Stream};
use crossbeam_channel::Sender;
use futures::Stream;
use masq_lib::logger::Logger;
use std::net::SocketAddr;
use tokio::prelude::Async;
Expand Down

0 comments on commit d81c2f0

Please sign in to comment.