-
Notifications
You must be signed in to change notification settings - Fork 318
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ForwardingStage #4508
base: master
Are you sure you want to change the base?
ForwardingStage #4508
Conversation
@tao-stones (and other reviewers!) - |
core/src/forwarding_stage.rs
Outdated
let _res = conn.send_data_batch_async(batch); | ||
} | ||
|
||
fn send_non_vote_batch(&self, non_vote_batch: &mut Vec<(Vec<u8>, SocketAddr)>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@KirillLykov, this PR replaced the old one (adding ForwardingStage separatel from the hookup/removal).
Is this still the way to send packets? I recall you had built upon the old PR, please let me know if this should change here or we can/will do it in follow-up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will try to move these changes on top of this PR Monday and will leave comments on the current PR.
core/src/forwarding_stage.rs
Outdated
forward_address_getter, | ||
connection_cache, | ||
data_budget: DataBudget::default(), | ||
udp_socket: bind_to_unspecified().unwrap(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should stop binding on all interfaces @alexpyattaev
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what should I use instead? I based the actual forwarding code in here on how we previously did it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I know that we do this way but we should stop doing this. Maybe Alex has in mind some opinion about what function to use for this (since he told me that 0.0.0.0 is a problem). I think that it would make sense to pass udp_socket address to the constructor, so that on the level of validator the bind address is selected, which by default should be 127.0.0.1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the stuff in agave you should prefer functions bind_in_range or bind_in_range_with_config. This will allow your code to respect bind-address CLI argument. Not respecting it will break firewall policies for some users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I will abstract out all the network logic in this component, not sure, rebasing my old changes
core/src/forwarding_stage.rs
Outdated
}); | ||
} | ||
|
||
fn send_vote_batch(&self, addr: SocketAddr, vote_batch: &mut Vec<Vec<u8>>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that votes are send using UDP while the rest using QUIC, looks like it is vise-versa now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah - tests were all using UDP (non-votes should go out over whatever the connection cache uses), so didn't catch that I reversed these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/src/forwarding_stage.rs
Outdated
} | ||
} | ||
|
||
struct PacketContainer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be its own file with unittests etc. Having it hanging with access to private fields is asking for trouble with silly bugs etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/src/forwarding_stage.rs
Outdated
forward_address_getter: F, | ||
connection_cache: Arc<ConnectionCache>, | ||
data_budget: DataBudget, | ||
udp_socket: UdpSocket, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that logic for networking should be split:
- one simple struct for votes (UDP)
- one structure for other transactions (TPU)
See apfitzge#6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cherry-picked the commit but had to modify some stuff due to conflicts. Test was also failing with your bind_to
instead of bind_to_unspecified
, so I reverted that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, why the test fails. Maybe leave this 0.0.0.0 as it is and I will try to change it to localhost in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at least for now I'd like to keep the actual networking changes to a minimum - I'm trying to just move this out of banking stage, not fix all the issues with it.
|
||
fn send_batch(&self, input_batch: &mut Vec<(Vec<u8>, SocketAddr)>) { | ||
let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); | ||
core::mem::swap(&mut batch, input_batch); // why do we swap? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here you picked also my comment // why do we swap?
which was referring to why can't we consume this vector. I later figured out that this problem might be resolved with adding a version of batch_send
that takes SocketAddr
and Vec<Vec<u8>>
instead of Vec or pairs as it is now.
) -> Self { | ||
Self { | ||
receiver, | ||
packet_container: PacketContainer::with_capacity(4 * 4096), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you document the choice of this number? Or it has been taken from Forwarder and it is currently unclear how was chosen?
} | ||
}; | ||
|
||
// If timeout waas reached, prevent backup by draining all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// If timeout waas reached, prevent backup by draining all | |
// If timeout was reached, prevent backup by draining all |
.filter(|p| initial_packet_meta_filter(p.meta())) | ||
{ | ||
let Some(packet_data) = packet.data(..) else { | ||
// should never occur since we've already checked the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it should never occur, shouldn't we assert?
} | ||
|
||
fn new( | ||
receiver: Receiver<(BankingPacketBatch, bool)>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder what will happen is this receiver
will grow over time and ForwardingStage
is not able to process packets with the desired speed. The processing seems to take O(num_txs_in_batch*tx_size)
but I don't know if this might become a problem. One strategy would be just use bound channel and drop on the sender side. Do we write the size of the channel in some metric?
Problem
Summary of Changes
ForwardingStage
for simple receiving, buffering, forwarding logic, but does NOT hook it up yet.ForwardingStage
with the removal of all forwarding logic inBankingStage
Fixes #