Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ForwardingStage #4508

Merged
merged 21 commits into from
Jan 30, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
basic count metrics
apfitzge committed Jan 16, 2025

Verified

This commit was signed with the committer’s verified signature.
apfitzge Andrew Fitzgerald
commit e439f53175cd27fbd1712fa533ec7076f9884d6a
170 changes: 146 additions & 24 deletions core/src/forwarding_stage.rs
Original file line number Diff line number Diff line change
@@ -58,6 +58,8 @@ pub struct ForwardingStage<F: ForwardAddressGetter> {
connection_cache: Arc<ConnectionCache>,
data_budget: DataBudget,
udp_socket: UdpSocket,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that logic for networking should be split:

  • one simple struct for votes (UDP)
  • one structure for other transactions (TPU)

See apfitzge#6

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cherry-picked the commit but had to modify some stuff due to conflicts. Test was also failing with your bind_to instead of bind_to_unspecified, so I reverted that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, why the test fails. Maybe leave this 0.0.0.0 as it is and I will try to change it to localhost in a separate PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least for now I'd like to keep the actual networking changes to a minimum - I'm trying to just move this out of banking stage, not fix all the issues with it.


metrics: ForwardingStageMetrics,
}

impl<F: ForwardAddressGetter> ForwardingStage<F> {
@@ -93,6 +95,7 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
connection_cache,
data_budget: DataBudget::default(),
udp_socket: bind_to_unspecified().unwrap(),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should stop binding on all interfaces @alexpyattaev

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should I use instead? I based the actual forwarding code in here on how we previously did it

Copy link

@KirillLykov KirillLykov Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I know that we do this way but we should stop doing this. Maybe Alex has in mind some opinion about what function to use for this (since he told me that 0.0.0.0 is a problem). I think that it would make sense to pass udp_socket address to the constructor, so that on the level of validator the bind address is selected, which by default should be 127.0.0.1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the stuff in agave you should prefer functions bind_in_range or bind_in_range_with_config. This will allow your code to respect bind-address CLI argument. Not respecting it will break firewall policies for some users.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I will abstract out all the network logic in this component, not sure, rebasing my old changes

metrics: ForwardingStageMetrics::default(),
}
}

@@ -103,24 +106,26 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
break;
}
self.forward_buffered_packets();
self.metrics.maybe_report();
}
}

fn receive_and_buffer(&mut self, bank: &Bank) -> bool {
let now = Instant::now();
const TIMEOUT: Duration = Duration::from_millis(10);
match self.receiver.recv_timeout(TIMEOUT) {
Ok((packet_batches, _tpu_vote_batch)) => {
self.buffer_packet_batches(packet_batches, bank);
Ok((packet_batches, tpu_vote_batch)) => {
self.metrics.did_something = true;
self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank);

// Drain the channel up to timeout
let timed_out = loop {
if now.elapsed() >= TIMEOUT {
break true;
}
match self.receiver.try_recv() {
Ok((packet_batches, _tpu_vote_batch)) => {
self.buffer_packet_batches(packet_batches, bank)
Ok((packet_batches, tpu_vote_batch)) => {
self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank)
}
Err(_) => break false,
}
@@ -130,7 +135,10 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
// packets in the channel.
if timed_out {
warn!("ForwardingStage is backed up, dropping packets");
while self.receiver.try_recv().is_ok() {}
while let Ok((packet_batch, _)) = self.receiver.try_recv() {
self.metrics.dropped_on_timeout +=
packet_batch.iter().map(|b| b.len()).sum::<usize>();
}
}

true
@@ -140,7 +148,12 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
}
}

fn buffer_packet_batches(&mut self, packet_batches: BankingPacketBatch, bank: &Bank) {
fn buffer_packet_batches(
&mut self,
packet_batches: BankingPacketBatch,
is_tpu_vote_batch: bool,
bank: &Bank,
) {
for batch in packet_batches.iter() {
for packet in batch.iter().filter(|p| Self::initial_packet_meta_filter(p)) {
let Some(packet_data) = packet.data(..) else {
@@ -149,24 +162,29 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
continue;
};

// Parse the transaction, make sure it passes basic sanitization checks.
let Ok(transaction) = SanitizedTransactionView::try_new_sanitized(packet_data)
let vote_count = usize::from(is_tpu_vote_batch);
let non_vote_count = usize::from(!is_tpu_vote_batch);

self.metrics.votes_received += vote_count;
self.metrics.non_votes_received += non_vote_count;

// Perform basic sanitization checks and calculate priority.
// If any steps fail, drop the packet.
let Some(priority) = SanitizedTransactionView::try_new_sanitized(packet_data)
.map_err(|_| ())
.and_then(|transaction| {
RuntimeTransaction::<SanitizedTransactionView<_>>::try_from(
transaction,
MessageHash::Compute,
Some(packet.meta().is_simple_vote_tx()),
)
.map_err(|_| ())
})
.ok()
.and_then(|transaction| calculate_priority(&transaction, bank))
else {
continue;
};

// Calculate static metadata for the transaction so that we
// are able to calculate fees for prioritization.
let Ok(transaction) = RuntimeTransaction::<SanitizedTransactionView<_>>::try_from(
transaction,
MessageHash::Compute,
Some(packet.meta().is_simple_vote_tx()),
) else {
continue;
};

// Calculate priority if we can, if this fails we drop.
let Some(priority) = calculate_priority(&transaction, bank) else {
self.metrics.votes_dropped_on_receive += vote_count;
self.metrics.non_votes_dropped_on_receive += non_vote_count;
continue;
};

@@ -183,6 +201,8 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
// If priority of current packet is not higher than the min
// drop the current packet.
if min_priority >= priority {
self.metrics.votes_dropped_on_capacity += vote_count;
self.metrics.non_votes_dropped_on_capacity += non_vote_count;
continue;
}

@@ -192,7 +212,11 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
.pop_min()
.expect("not empty")
.index;
self.packet_container.packets.remove(dropped_index);
let dropped_packet = self.packet_container.packets.remove(dropped_index);
self.metrics.votes_dropped_on_capacity +=
usize::from(dropped_packet.meta().is_simple_vote_tx());
self.metrics.non_votes_dropped_on_capacity +=
usize::from(!dropped_packet.meta().is_simple_vote_tx());
}

let entry = self.packet_container.packets.vacant_entry();
@@ -210,6 +234,7 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
}

fn forward_buffered_packets(&mut self) {
self.metrics.did_something |= !self.packet_container.priority_queue.is_empty();
self.refresh_data_budget();

// Get forwarding addresses otherwise return now.
@@ -236,6 +261,10 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {

// If it exceeds our data-budget, drop.
if !self.data_budget.take(packet.meta().size) {
self.metrics.votes_dropped_on_data_budget +=
usize::from(packet.meta().is_simple_vote_tx());
self.metrics.non_votes_dropped_on_data_budget +=
usize::from(!packet.meta().is_simple_vote_tx());
self.packet_container.packets.remove(priority_index.index);
continue;
}
@@ -254,6 +283,16 @@ impl<F: ForwardAddressGetter> ForwardingStage<F> {
}
}
}

// Send out remaining packets
if !vote_batch.is_empty() {
self.metrics.votes_forwarded += vote_batch.len();
self.send_vote_batch(tpu_vote, &mut vote_batch);
}
if !non_vote_batch.is_empty() {
self.metrics.non_votes_forwarded += non_vote_batch.len();
self.send_non_vote_batch(&mut non_vote_batch);
}
}

/// Re-fill the data budget if enough time has passed
@@ -357,3 +396,86 @@ fn calculate_priority(
.wrapping_div(cost.sum().saturating_add(1)),
)
}

struct ForwardingStageMetrics {
last_reported: Instant,
did_something: bool,

votes_received: usize,
votes_dropped_on_receive: usize,
votes_dropped_on_capacity: usize,
votes_dropped_on_data_budget: usize,
votes_forwarded: usize,

non_votes_received: usize,
non_votes_dropped_on_receive: usize,
non_votes_dropped_on_capacity: usize,
non_votes_dropped_on_data_budget: usize,
non_votes_forwarded: usize,

dropped_on_timeout: usize,
}

impl ForwardingStageMetrics {
fn maybe_report(&mut self) {
const REPORTING_INTERVAL: Duration = Duration::from_secs(1);
KirillLykov marked this conversation as resolved.
Show resolved Hide resolved

if self.last_reported.elapsed() > REPORTING_INTERVAL {
// Reset time and all counts.
let metrics = core::mem::take(self);

// Only report if something happened.
if !metrics.did_something {
return;
}

datapoint_info!(
"forwarding_stage",
("votes_received", metrics.votes_received, i64),
(
"votes_dropped_on_receive",
metrics.votes_dropped_on_receive,
i64
),
(
"votes_dropped_on_data_budget",
metrics.votes_dropped_on_data_budget,
i64
),
("votes_forwarded", metrics.votes_forwarded, i64),
("non_votes_received", metrics.non_votes_received, i64),
(
"votes_dropped_on_receive",
metrics.votes_dropped_on_receive,
i64
),
(
"votes_dropped_on_data_budget",
metrics.votes_dropped_on_data_budget,
i64
),
("votes_forwarded", metrics.votes_forwarded, i64),
);
}
}
}

impl Default for ForwardingStageMetrics {
fn default() -> Self {
Self {
last_reported: Instant::now(),
did_something: false,
votes_received: 0,
votes_dropped_on_receive: 0,
votes_dropped_on_capacity: 0,
votes_dropped_on_data_budget: 0,
votes_forwarded: 0,
non_votes_received: 0,
non_votes_dropped_on_receive: 0,
non_votes_dropped_on_capacity: 0,
non_votes_dropped_on_data_budget: 0,
non_votes_forwarded: 0,
dropped_on_timeout: 0,
}
}
}