Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "use tpu-client-next in send_transaction_service (#3515)" #4252

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions banks-server/src/banks_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use {
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
std::{
io,
Expand Down Expand Up @@ -455,16 +454,17 @@ pub async fn start_tcp_server(
.map(move |chan| {
let (sender, receiver) = unbounded();

let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_addr,
&bank_forks,
None,
None,
receiver,
connection_cache.clone(),
5_000,
0,
exit.clone(),
);

SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());

let server = BanksServer::new(
bank_forks.clone(),
block_commitment_cache.clone(),
Expand Down
31 changes: 1 addition & 30 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 53 additions & 24 deletions rpc/src/cluster_tpu_info.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use {
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
pubkey::Pubkey,
},
solana_send_transaction_service::tpu_info::TpuInfo,
std::{
collections::HashMap,
Expand Down Expand Up @@ -44,7 +47,7 @@ impl TpuInfo for ClusterTpuInfo {
.collect();
}

fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
Expand All @@ -64,23 +67,37 @@ impl TpuInfo for ClusterTpuInfo {
unique_leaders
}

fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
fn get_leader_tpus_with_slots(
&self,
max_count: u64,
protocol: Protocol,
) -> Vec<(&SocketAddr, Slot)> {
let recorder = self.poh_recorder.read().unwrap();
let leader_pubkeys: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
let leaders: Vec<_> = (0..max_count)
.rev()
.filter_map(|future_slot| {
NUM_CONSECUTIVE_LEADER_SLOTS
.checked_mul(future_slot)
.and_then(|slots_in_the_future| {
recorder.leader_and_slot_after_n_slots(slots_in_the_future)
})
})
.collect();
drop(recorder);
leader_pubkeys
.iter()
.filter_map(|leader_pubkey| {
let addrs_to_slots = leaders
.into_iter()
.filter_map(|(leader_id, leader_slot)| {
self.recent_peers
.get(leader_pubkey)
.map(|addr| match protocol {
Protocol::UDP => &addr.0,
Protocol::QUIC => &addr.1,
.get(&leader_id)
.map(|(udp_tpu, quic_tpu)| match protocol {
Protocol::UDP => (udp_tpu, leader_slot),
Protocol::QUIC => (quic_tpu, leader_slot),
})
})
.collect()
.collect::<HashMap<_, _>>();
let mut unique_leaders = Vec::from_iter(addrs_to_slots);
unique_leaders.sort_by_key(|(_addr, slot)| *slot);
unique_leaders
}
}

Expand Down Expand Up @@ -255,12 +272,12 @@ mod test {
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_unique_leader_tpus(1, Protocol::UDP),
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);
assert_eq!(
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
leader_info.get_leader_tpus_with_slots(1, Protocol::UDP),
vec![(&recent_peers.get(&first_leader).unwrap().0, 0)]
);

let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -274,12 +291,15 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_unique_leader_tpus(2, Protocol::UDP),
leader_info.get_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
);
assert_eq!(
leader_info.get_leader_tpus(2, Protocol::UDP),
leader_info.get_leader_tpus_with_slots(2, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -294,17 +314,26 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_unique_leader_tpus(3, Protocol::UDP),
leader_info.get_leader_tpus(3, Protocol::UDP),
expected_leader_sockets
);
// Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
// This assumption is safe. After all, leader schedule generation must be deterministic.
assert_eq!(
leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

for x in 4..8 {
assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
assert!(
leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
);
assert_eq!(
leader_info.get_leader_tpus(x, Protocol::UDP).len(),
x as usize
leader_info
.get_leader_tpus_with_slots(x, Protocol::UDP)
.len()
<= recent_peers.len()
);
}
}
Expand Down
39 changes: 17 additions & 22 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ use {
solana_runtime::commitment::CommitmentSlots,
solana_send_transaction_service::{
send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
solana_streamer::socket::SocketAddrSpace,
};
Expand Down Expand Up @@ -459,19 +458,14 @@ impl JsonRpcRequestProcessor {
.tpu(connection_cache.protocol())
.unwrap();
let (transaction_sender, transaction_receiver) = unbounded();

let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
None,
None,
1,
);
SendTransactionService::new(
&bank_forks,
None,
transaction_receiver,
client,
connection_cache,
1000,
1,
exit.clone(),
);

Expand Down Expand Up @@ -4569,9 +4563,7 @@ pub mod tests {
},
vote::state::VoteState,
},
solana_send_transaction_service::{
tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient,
},
solana_send_transaction_service::tpu_info::NullTpuInfo,
solana_transaction_status::{
EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
TransactionDetails,
Expand Down Expand Up @@ -6692,14 +6684,16 @@ pub mod tests {
Arc::new(PrioritizationFeeCache::default()),
service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
);
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
None,
receiver,
connection_cache,
1000,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let mut bad_transaction = system_transaction::transfer(
&mint_keypair,
Expand Down Expand Up @@ -6972,15 +6966,16 @@ pub mod tests {
Arc::new(PrioritizationFeeCache::default()),
service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
);
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
None,
receiver,
connection_cache,
1000,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

assert_eq!(
request_processor.get_block_commitment(0),
RpcBlockCommitment {
Expand Down
18 changes: 5 additions & 13 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ use {
exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
native_token::lamports_to_sol,
},
solana_send_transaction_service::{
send_transaction_service::{self, SendTransactionService},
transaction_client::ConnectionCacheClient,
},
solana_send_transaction_service::send_transaction_service::{self, SendTransactionService},
solana_storage_bigtable::CredentialType,
std::{
net::SocketAddr,
Expand Down Expand Up @@ -470,20 +467,15 @@ impl JsonRpcService {

let leader_info =
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
let client = ConnectionCacheClient::new(
connection_cache,
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
tpu_address,
send_transaction_service_config.tpu_peers.clone(),
leader_info,
send_transaction_service_config.leader_forward_count,
);
let _send_transaction_service = SendTransactionService::new_with_config(
&bank_forks,
leader_info,
receiver,
client,
connection_cache,
send_transaction_service_config,
exit,
);
));

#[cfg(test)]
let test_request_processor = request_processor.clone();
Expand Down
9 changes: 1 addition & 8 deletions send-transaction-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,20 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
crossbeam-channel = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-quic-client = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-tpu-client-next = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
solana-tpu-client = { workspace = true }

[dev-dependencies]
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }

[features]
dev-context-only-utils = []

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
Loading
Loading