Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-800: Drain the Pipeline #450

Merged
merged 69 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
4cfa253
GH-800: Write the most basic TODOs for this card
utkarshg6 Jun 13, 2024
07cfef9
GH-800: Add todos for renaming purposes
utkarshg6 Jun 25, 2024
62db40f
GH-800: Add a message scheduler for purging the stream key
utkarshg6 Jun 27, 2024
e4199ae
GH-800: Improve the test handle_client_response_payload_purges_stream…
utkarshg6 Jun 28, 2024
ee8f897
GH-800: Make delay configurable via constant
utkarshg6 Jun 28, 2024
91323d4
GH-800: introduce StreamSenders
utkarshg6 Jul 2, 2024
7cc2fbb
GH-800: introduce channel for shutdown signal
utkarshg6 Jul 4, 2024
d784157
GH-800: add test connection_shutdown_test.rs
utkarshg6 Jul 6, 2024
14a5de3
GH-800: test drive the shutdown signal in StreamReader
utkarshg6 Jul 10, 2024
d8d1f10
GH-800: refactor test stream_reader_shuts_down_when_it_receives_the_s…
utkarshg6 Jul 10, 2024
ddf7d9c
GH-800: some refactor inside the file stream_reader
utkarshg6 Jul 10, 2024
a6f2538
GH-800: some more refactoring
utkarshg6 Jul 10, 2024
130a37c
GH-800: trying_to_write_a_test_for_stream_senders was a success
utkarshg6 Jul 11, 2024
6db4c0c
GH-800: did some refactoring for trying_to_write_a_test_for_stream_se…
utkarshg6 Jul 12, 2024
c3cc6a8
GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_t…
utkarshg6 Jul 18, 2024
75edd8f
GH-800: Minor refactor for the test stream_handler_pool_sends_shutdow…
utkarshg6 Jul 18, 2024
296f449
GH-800: more refactoring of the test stream_handler_pool_sends_shutdo…
utkarshg6 Jul 18, 2024
a2c763d
GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_t…
utkarshg6 Jul 18, 2024
3c223ea
GH-800: some more todos removed
utkarshg6 Jul 18, 2024
4605d83
GH-800: clean_up_dead_streams_logs_when_the_shutdown_channel_is_down …
utkarshg6 Jul 19, 2024
e1417c8
GH-800: tests are passing in StreamHandlerPool
utkarshg6 Jul 19, 2024
6e28ff1
GH-800: Wrote a test for the case when the shutdown signal channel is…
utkarshg6 Jul 19, 2024
6849db6
GH-800: make it easier to understand who experiences the write error
utkarshg6 Jul 22, 2024
af09d25
GH-800: trying to write test while_housekeeping_the_stream_senders_ar…
utkarshg6 Jul 22, 2024
b896db6
GH-800: add test add_new_streams_works
utkarshg6 Jul 26, 2024
acf2c89
GH-800: wip: add the log
utkarshg6 Jul 26, 2024
deb1535
GH-800: test proxy_client_stream_reader_dies_when_client_stream_is_ki…
utkarshg6 Jul 31, 2024
2ecfebc
GH-800: change the channel from crossbeam to tokio
utkarshg6 Jul 31, 2024
3b387a7
GH-800: Add fn send_shutdown_signal_to_stream_reader
utkarshg6 Aug 6, 2024
f94c8d3
GH-800: wip: fixing 2 tests
utkarshg6 Aug 7, 2024
54040ae
GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_t…
utkarshg6 Aug 7, 2024
31db7c4
GH-800: all tests in stream_handler_pool.rs are passing
utkarshg6 Aug 16, 2024
4f8c141
GH-800: add test for the logs; all tests passing
utkarshg6 Aug 27, 2024
1880006
GH-800: remove lookup_ip() mock fns from a test
utkarshg6 Aug 27, 2024
d81c2f0
GH-800: remove warnings
utkarshg6 Aug 27, 2024
ae08930
GH-800: use fn send_shutdown_signal_to_stream_reader in clean_up_dead…
utkarshg6 Aug 27, 2024
23fc9ab
Revert "GH-800: use fn send_shutdown_signal_to_stream_reader in clean…
utkarshg6 Sep 4, 2024
f9491e7
Revert "GH-800: remove warnings"
utkarshg6 Sep 4, 2024
3e3fa71
Revert "GH-800: remove lookup_ip() mock fns from a test"
utkarshg6 Sep 4, 2024
d4e8075
Revert "GH-800: add test for the logs; all tests passing"
utkarshg6 Sep 4, 2024
5aa1107
Revert "GH-800: all tests in stream_handler_pool.rs are passing"
utkarshg6 Sep 4, 2024
f2dbe7b
Revert "GH-800: stream_handler_pool_sends_shutdown_signal_when_last_d…
utkarshg6 Sep 4, 2024
a24dcda
Revert "GH-800: wip: fixing 2 tests"
utkarshg6 Sep 4, 2024
c61cc5a
Revert "GH-800: Add fn send_shutdown_signal_to_stream_reader"
utkarshg6 Sep 4, 2024
af9a673
Revert "GH-800: change the channel from crossbeam to tokio"
utkarshg6 Sep 4, 2024
5ba55df
GH-800: reverted the channel to crossbeam; remove warnings
utkarshg6 Sep 4, 2024
5579c12
GH-800: solve some TODOs
utkarshg6 Sep 4, 2024
3f34e83
GH-800: add refactored fn send_shutdown_signal_to_stream_reader; crti…
utkarshg6 Sep 4, 2024
681a79f
GH-800: write_failure_for_nonexistent_stream_generates_termination_me…
utkarshg6 Sep 4, 2024
0811b5e
GH-800: everything is tested
utkarshg6 Sep 5, 2024
d86918e
GH-800: fix proxy_server_receives_terminal_response_from_hopper
utkarshg6 Sep 6, 2024
53eadfa
GH-800: trigger actions
utkarshg6 Sep 10, 2024
74eb857
GH-800: assert on a different error on non-mac os
utkarshg6 Sep 10, 2024
92d46cd
Merge branch 'master' into GH-800
utkarshg6 Sep 10, 2024
ce0d3c5
GH-80: add changes of self-review
utkarshg6 Sep 11, 2024
7afe4c7
GH-800: some other missed changes
utkarshg6 Sep 11, 2024
87a9e20
GH-800: add review 1 changes
utkarshg6 Sep 12, 2024
34a42dc
GH-800: add review 2 changes
utkarshg6 Sep 19, 2024
434fb5b
GH-800: add review 3 changes
utkarshg6 Sep 20, 2024
1bfcfd8
GH-800: add review 4 changes
utkarshg6 Sep 24, 2024
4bfe788
Merge branch 'master' into GH-800
utkarshg6 Oct 8, 2024
cb50b90
Merge branch 'master' into GH-800
utkarshg6 Oct 17, 2024
36f08df
GH-800: add better logging
utkarshg6 Oct 17, 2024
540d259
GH-800: delay purge when the report_to_counterpart is false
utkarshg6 Oct 28, 2024
9dfd4ea
GH-800: final changes
utkarshg6 Oct 29, 2024
e7aaa34
GH-800: review changes
utkarshg6 Nov 18, 2024
7e616e2
GH-800: fix the final comment discussion changes
utkarshg6 Nov 25, 2024
0a7d8b8
GH-800: fix multiple_stream_zero_hop_test
utkarshg6 Nov 25, 2024
328c24e
Merge branch 'master' into GH-800
utkarshg6 Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions node/src/proxy_client/stream_establisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

use crate::proxy_client::stream_handler_pool::StreamSenders;
use crate::proxy_client::stream_reader::StreamReader;
use crate::proxy_client::stream_writer::StreamWriter;
use crate::sub_lib::channel_wrappers::FuturesChannelFactory;
Expand All @@ -14,15 +15,15 @@ use crate::sub_lib::stream_connector::StreamConnectorReal;
use crate::sub_lib::stream_key::StreamKey;
use crate::sub_lib::tokio_wrappers::ReadHalfWrapper;
use actix::Recipient;
use crossbeam_channel::Sender;
use crossbeam_channel::{unbounded, Receiver, Sender};
use masq_lib::logger::Logger;
use std::io;
use std::net::IpAddr;
use std::net::SocketAddr;

pub struct StreamEstablisher {
pub cryptde: &'static dyn CryptDE,
pub stream_adder_tx: Sender<(StreamKey, Box<dyn SenderWrapper<SequencedPacket>>)>,
pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>,
pub stream_killer_tx: Sender<(StreamKey, u64)>,
pub stream_connector: Box<dyn StreamConnector>,
pub proxy_client_sub: Recipient<InboundServerData>,
Expand Down Expand Up @@ -57,11 +58,13 @@ impl StreamEstablisher {
payload.target_port,
&self.logger,
)?;
let (shutdown_signal_tx, shutdown_signal_rx) = unbounded();

self.spawn_stream_reader(
&payload.clone(),
connection_info.reader,
connection_info.peer_addr,
shutdown_signal_rx,
);

let (tx_to_write, rx_to_write) = self.channel_factory.make(connection_info.peer_addr);
Expand All @@ -73,8 +76,13 @@ impl StreamEstablisher {
);
tokio::spawn(stream_writer);

let stream_senders = StreamSenders {
writer_data: tx_to_write.clone(),
reader_shutdown_tx: shutdown_signal_tx,
};

self.stream_adder_tx
.send((payload.stream_key, tx_to_write.clone()))
.send((payload.stream_key, stream_senders))
.expect("StreamHandlerPool died");
Ok(tx_to_write)
}
Expand All @@ -84,12 +92,14 @@ impl StreamEstablisher {
payload: &ClientRequestPayload_0v1,
read_stream: Box<dyn ReadHalfWrapper>,
peer_addr: SocketAddr,
shutdown_signal: Receiver<()>,
) {
let stream_reader = StreamReader::new(
payload.stream_key,
self.proxy_client_sub.clone(),
read_stream,
self.stream_killer_tx.clone(),
shutdown_signal,
peer_addr,
);
debug!(self.logger, "Spawning StreamReader for {}", peer_addr);
Expand All @@ -103,7 +113,7 @@ pub trait StreamEstablisherFactory: Send {

pub struct StreamEstablisherFactoryReal {
pub cryptde: &'static dyn CryptDE,
pub stream_adder_tx: Sender<(StreamKey, Box<dyn SenderWrapper<SequencedPacket>>)>,
pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>,
pub stream_killer_tx: Sender<(StreamKey, u64)>,
pub proxy_client_subs: ProxyClientSubs,
pub logger: Logger,
Expand Down Expand Up @@ -191,6 +201,7 @@ mod tests {
},
read_stream,
SocketAddr::from_str("1.2.3.4:5678").unwrap(),
unbounded().1,
);

proxy_client_awaiter.await_message_count(1);
Expand Down
Loading
Loading