ForwardingStage #4508

Open: wants to merge 18 commits into base: master


apfitzge

Problem

  • Forwarding logic makes BankingStage far more complicated than it needs to be.
  • Most nodes run without forwarding non-vote transactions at all.

Summary of Changes

  • Create a new ForwardingStage in the TPU that is intended to run in parallel with BankingStage.
  • Since the message types sent from SigVerify to BankingStage are wrapped in an Arc, we can (optionally) clone them and send them to ForwardingStage.
  • This initial PR introduces ForwardingStage with simple receiving, buffering, and forwarding logic, but does NOT hook it up yet.
  • A follow-up PR will hook up this ForwardingStage and, in the same change, remove all forwarding logic from BankingStage.
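
To illustrate the Arc point in the list above: cloning an Arc only bumps a reference count, so the same batch can be handed to two stages without copying packet data. The following is a minimal sketch under stated assumptions, not the code in this PR. `PacketBatch` and `fan_out` are hypothetical names, and std channels stand in for whatever channel type the real stages use.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;

// Hypothetical stand-in for the Arc-wrapped batch type sent by sigverify.
type PacketBatch = Arc<Vec<Vec<u8>>>;

/// Sends the batch to banking and, if a forwarding channel is configured,
/// also sends an `Arc` clone there (a reference-count bump, not a deep copy).
/// Returns false if the banking receiver has hung up.
fn fan_out(
    batch: PacketBatch,
    banking_tx: &Sender<PacketBatch>,
    forwarding_tx: Option<&Sender<PacketBatch>>,
) -> bool {
    if let Some(tx) = forwarding_tx {
        // Forwarding is optional, so a disconnected receiver is ignored here.
        let _ = tx.send(Arc::clone(&batch));
    }
    banking_tx.send(batch).is_ok()
}

fn main() {
    let (banking_tx, banking_rx) = channel();
    let (forwarding_tx, forwarding_rx) = channel();
    let batch: PacketBatch = Arc::new(vec![vec![1, 2, 3]]);

    assert!(fan_out(batch, &banking_tx, Some(&forwarding_tx)));

    // Both receivers observe the same allocation.
    let a = banking_rx.recv().unwrap();
    let b = forwarding_rx.recv().unwrap();
    assert!(Arc::ptr_eq(&a, &b));
}
```

Passing `None` for the forwarding sender gives the "optionally" part: nodes that do not forward pay nothing extra.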

Fixes #

@apfitzge apfitzge self-assigned this Jan 16, 2025
@apfitzge apfitzge mentioned this pull request Jan 16, 2025
@apfitzge
Author

@tao-stones (and other reviewers!) - f292ed1 (#4518) has an update of the documentation. Not including it in this PR since this PR does not actually hook up the ForwardingStage to SigVerify.

@apfitzge apfitzge marked this pull request as ready for review January 17, 2025 15:12
@apfitzge apfitzge requested a review from tao-stones January 17, 2025 15:12
let _res = conn.send_data_batch_async(batch);
}

fn send_non_vote_batch(&self, non_vote_batch: &mut Vec<(Vec<u8>, SocketAddr)>) {

Author

@KirillLykov, this PR replaced the old one (adding ForwardingStage separately from the hookup/removal).
Is this still the way to send packets? I recall you had built upon the old PR. Please let me know whether this should change here or whether we can/will do it in a follow-up.

@KirillLykov KirillLykov Jan 17, 2025

I will try to move these changes on top of this PR Monday and will leave comments on the current PR.

forward_address_getter,
connection_cache,
data_budget: DataBudget::default(),
udp_socket: bind_to_unspecified().unwrap(),

I think we should stop binding on all interfaces @alexpyattaev

Author

what should I use instead? I based the actual forwarding code in here on how we previously did it

@KirillLykov KirillLykov Jan 22, 2025

Yeah, I know that we do it this way, but we should stop. Maybe Alex has an opinion about which function to use for this (since he told me that 0.0.0.0 is a problem). I think it would make sense to pass the udp_socket address to the constructor, so that the bind address is selected at the validator level; by default it should be 127.0.0.1.

For the stuff in agave you should prefer the functions bind_in_range or bind_in_range_with_config. This will allow your code to respect the bind-address CLI argument. Not respecting it will break firewall policies for some users.
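
The idea of choosing the bind address at the validator level and passing it down can be sketched with the standard library alone. This is only an illustration under assumptions: `ForwardSocket` is a hypothetical struct, and the sketch deliberately does not use bind_in_range or bind_in_range_with_config, whose exact signatures are not shown in this thread.

```rust
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};

// Hypothetical holder for the forwarding socket; not the PR's actual struct.
struct ForwardSocket {
    udp_socket: UdpSocket,
}

impl ForwardSocket {
    /// Binds to the caller-supplied IP on an OS-assigned port (port 0),
    /// instead of hard-coding the unspecified address 0.0.0.0.
    fn new(bind_ip: IpAddr) -> io::Result<Self> {
        let udp_socket = UdpSocket::bind(SocketAddr::new(bind_ip, 0))?;
        Ok(Self { udp_socket })
    }
}

fn main() -> io::Result<()> {
    // The caller (for example the validator setup code) decides the address.
    let socket = ForwardSocket::new(IpAddr::V4(Ipv4Addr::LOCALHOST))?;
    println!("bound to {}", socket.udp_socket.local_addr()?);
    Ok(())
}
```

Returning `io::Result` instead of calling `unwrap()` also lets the caller decide how to handle a failed bind.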

Maybe I will abstract out all the network logic in this component, not sure, rebasing my old changes

});
}

fn send_vote_batch(&self, addr: SocketAddr, vote_batch: &mut Vec<Vec<u8>>) {

I thought that votes are sent using UDP while the rest use QUIC; it looks like it is vice versa now.

Author

yeah - tests were all using UDP (non-votes should go out over whatever the connection cache uses), so didn't catch that I reversed these.

}
}

struct PacketContainer {

@alexpyattaev alexpyattaev Jan 27, 2025

This should probably be its own file with unit tests etc. Having it hanging with access to private fields is asking for trouble with silly bugs etc.

Author

Module separation: fb06eef
Added tests: 24630c8

forward_address_getter: F,
connection_cache: Arc<ConnectionCache>,
data_budget: DataBudget,
udp_socket: UdpSocket,

I think that logic for networking should be split:

  • one simple struct for votes (UDP)
  • one structure for other transactions (TPU)

See apfitzge#6

Author

I cherry-picked the commit but had to modify some stuff due to conflicts. Test was also failing with your bind_to instead of bind_to_unspecified, so I reverted that.

Interesting that the test fails; I wonder why. Maybe leave this 0.0.0.0 as it is and I will try to change it to localhost in a separate PR.

Author

at least for now I'd like to keep the actual networking changes to a minimum - I'm trying to just move this out of banking stage, not fix all the issues with it.


fn send_batch(&self, input_batch: &mut Vec<(Vec<u8>, SocketAddr)>) {
let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE);
core::mem::swap(&mut batch, input_batch); // why do we swap?

Here you also picked up my comment // why do we swap?, which was asking why we can't consume this vector. I later figured out that this problem might be resolved by adding a version of batch_send that takes a SocketAddr and a Vec<Vec<u8>> instead of a Vec of pairs as it does now.
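
For context on the swap question, here is a minimal sketch of what the pattern does, assuming the function only has `&mut` access to the caller's vector. The value of `FORWARD_BATCH_SIZE` and the helper name `take_batch` are hypothetical; only the swap itself mirrors the snippet under review.

```rust
use std::net::SocketAddr;

// Hypothetical value; the real constant is defined in the PR.
const FORWARD_BATCH_SIZE: usize = 128;

/// Moves the accumulated packets out from behind the `&mut` reference and
/// leaves an empty vector with pre-allocated capacity in their place.
fn take_batch(input_batch: &mut Vec<(Vec<u8>, SocketAddr)>) -> Vec<(Vec<u8>, SocketAddr)> {
    let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE);
    std::mem::swap(&mut batch, input_batch);
    batch
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:8000".parse().unwrap();
    let mut pending = vec![(vec![1u8, 2, 3], addr)];

    let owned = take_batch(&mut pending);

    // The caller's buffer is empty again but already has room for a new batch.
    assert_eq!(owned.len(), 1);
    assert!(pending.is_empty());
    assert!(pending.capacity() >= FORWARD_BATCH_SIZE);
}
```

`std::mem::take` would also hand over ownership, but it leaves a default vector with no reserved capacity, so the next batch would have to reallocate as it grows.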

) -> Self {
Self {
receiver,
packet_container: PacketContainer::with_capacity(4 * 4096),

Could you document the choice of this number? Or was it taken from Forwarder, and it is currently unclear how it was chosen?

}
};

// If timeout waas reached, prevent backup by draining all

Suggested change
- // If timeout waas reached, prevent backup by draining all
+ // If timeout was reached, prevent backup by draining all

.filter(|p| initial_packet_meta_filter(p.meta()))
{
let Some(packet_data) = packet.data(..) else {
// should never occur since we've already checked the

if it should never occur, shouldn't we assert?
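
One possible shape for this, sketched under assumptions rather than taken from the PR: keep the let-else, but add a `debug_assert!` so that debug and test builds fail loudly while release builds still skip the packet. `collect_payloads` and its `Option<Vec<u8>>` input are hypothetical stand-ins for the real packet type and its data accessor.

```rust
/// Collects packet payloads. A `None` entry models a packet whose data is
/// unavailable, which the earlier filter is supposed to have excluded.
fn collect_payloads(packets: &[Option<Vec<u8>>]) -> Vec<Vec<u8>> {
    let mut payloads = Vec::with_capacity(packets.len());
    for packet in packets {
        let Some(data) = packet else {
            // Panics in debug/test builds; compiled out in release builds,
            // where the packet is simply skipped.
            debug_assert!(false, "packet without data passed the filter");
            continue;
        };
        payloads.push(data.clone());
    }
    payloads
}

fn main() {
    let packets = vec![Some(vec![1u8, 2]), Some(vec![3u8])];
    let payloads = collect_payloads(&packets);
    assert_eq!(payloads.len(), 2);
}
```

A plain `assert!` or `unreachable!` would be stricter, at the cost of taking down a production node if the invariant is ever violated.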

}

fn new(
receiver: Receiver<(BankingPacketBatch, bool)>,

I wonder what will happen if this channel's backlog grows over time because ForwardingStage is not able to process packets at the desired speed. The processing seems to take O(num_txs_in_batch*tx_size), but I don't know whether this might become a problem. One strategy would be to use a bounded channel and drop on the sender side. Do we record the size of the channel in some metric?
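
The bounded-channel strategy mentioned above could look roughly like the following. This is a sketch using the standard library's `sync_channel`, which may differ from the channel type the real stage uses; `send_or_drop` and the `dropped` counter are hypothetical names, and the counter stands in for whatever metric would be reported.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};

/// Tries to enqueue without blocking. If the channel is full (or the receiver
/// is gone), the item is dropped and the drop counter is incremented.
/// Returns true if the item was enqueued.
fn send_or_drop<T>(tx: &SyncSender<T>, item: T, dropped: &AtomicUsize) -> bool {
    match tx.try_send(item) {
        Ok(()) => true,
        Err(_) => {
            dropped.fetch_add(1, Ordering::Relaxed);
            false
        }
    }
}

fn main() {
    // Capacity of 2 is arbitrary and only chosen to make the example small.
    let (tx, rx) = sync_channel::<Vec<u8>>(2);
    let dropped = AtomicUsize::new(0);

    for i in 0..3u8 {
        send_or_drop(&tx, vec![i], &dropped);
    }

    // Two batches fit; the third was dropped on the sender side.
    assert_eq!(dropped.load(Ordering::Relaxed), 1);
    assert_eq!(rx.try_iter().count(), 2);
}
```

Dropping on the sender side keeps memory bounded when the receiver falls behind, and the counter makes the loss visible instead of silent.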
