@@ -152,8 +152,7 @@ impl ClusterNodes<BroadcastStage> {
152
152
}
153
153
154
154
pub ( crate ) fn get_broadcast_peer ( & self , shred : & ShredId ) -> Option < & ContactInfo > {
155
- let shred_seed = shred. seed ( & self . pubkey ) ;
156
- let mut rng = ChaChaRng :: from_seed ( shred_seed) ;
155
+ let mut rng = get_seeded_rng ( /*leader:*/ & self . pubkey , shred) ;
157
156
let index = self . weighted_shuffle . first ( & mut rng) ?;
158
157
self . nodes [ index] . contact_info ( )
159
158
}
@@ -187,7 +186,6 @@ impl ClusterNodes<RetransmitStage> {
187
186
shred : & ShredId ,
188
187
fanout : usize ,
189
188
) -> Result < RetransmitPeers , Error > {
190
- let shred_seed = shred. seed ( slot_leader) ;
191
189
let mut weighted_shuffle = self . weighted_shuffle . clone ( ) ;
192
190
// Exclude slot leader from list of nodes.
193
191
if slot_leader == & self . pubkey {
@@ -200,7 +198,7 @@ impl ClusterNodes<RetransmitStage> {
200
198
weighted_shuffle. remove_index ( * index) ;
201
199
}
202
200
let mut addrs = HashMap :: < SocketAddr , Pubkey > :: with_capacity ( self . nodes . len ( ) ) ;
203
- let mut rng = ChaChaRng :: from_seed ( shred_seed ) ;
201
+ let mut rng = get_seeded_rng ( slot_leader , shred ) ;
204
202
let protocol = get_broadcast_protocol ( shred) ;
205
203
let nodes: Vec < _ > = weighted_shuffle
206
204
. shuffle ( & mut rng)
@@ -233,6 +231,43 @@ impl ClusterNodes<RetransmitStage> {
233
231
addrs,
234
232
} )
235
233
}
234
+
235
+ // Returns the parent node in the turbine broadcast tree.
236
+ // Returns None if the node is the root of the tree or if it is not staked.
237
+ #[ allow( unused) ]
238
+ fn get_retransmit_parent (
239
+ & self ,
240
+ leader : & Pubkey ,
241
+ shred : & ShredId ,
242
+ fanout : usize ,
243
+ ) -> Result < Option < Pubkey > , Error > {
244
+ // Exclude slot leader from list of nodes.
245
+ if leader == & self . pubkey {
246
+ return Err ( Error :: Loopback {
247
+ leader : * leader,
248
+ shred : * shred,
249
+ } ) ;
250
+ }
251
+ // Unstaked nodes' position in the turbine tree is not deterministic
252
+ // and depends on gossip propagation of contact-infos. Therefore, if
253
+ // this node is not staked return None.
254
+ if self . nodes [ self . index [ & self . pubkey ] ] . stake == 0 {
255
+ return Ok ( None ) ;
256
+ }
257
+ let mut weighted_shuffle = self . weighted_shuffle . clone ( ) ;
258
+ if let Some ( index) = self . index . get ( leader) . copied ( ) {
259
+ weighted_shuffle. remove_index ( index) ;
260
+ }
261
+ let mut rng = get_seeded_rng ( leader, shred) ;
262
+ // Only need shuffled nodes until this node itself.
263
+ let nodes: Vec < _ > = weighted_shuffle
264
+ . shuffle ( & mut rng)
265
+ . map ( |index| & self . nodes [ index] )
266
+ . take_while ( |node| node. pubkey ( ) != self . pubkey )
267
+ . collect ( ) ;
268
+ let parent = get_retransmit_parent ( fanout, nodes. len ( ) , & nodes) ;
269
+ Ok ( parent. map ( Node :: pubkey) )
270
+ }
236
271
}
237
272
238
273
pub fn new_cluster_nodes < T : ' static > (
@@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
296
331
. collect ( )
297
332
}
298
333
334
+ fn get_seeded_rng ( leader : & Pubkey , shred : & ShredId ) -> ChaChaRng {
335
+ let seed = shred. seed ( leader) ;
336
+ ChaChaRng :: from_seed ( seed)
337
+ }
338
+
299
339
// root : [0]
300
340
// 1st layer: [1, 2, ..., fanout]
301
341
// 2nd layer: [[fanout + 1, ..., fanout * 2],
@@ -327,6 +367,21 @@ fn get_retransmit_peers<T: Copy>(
327
367
. copied ( )
328
368
}
329
369
370
+ // Returns the parent node in the turbine broadcast tree.
371
+ // Returns None if the node is the root of the tree.
372
+ fn get_retransmit_parent < T : Copy > (
373
+ fanout : usize ,
374
+ index : usize , // Local node's index within the nodes slice.
375
+ nodes : & [ T ] ,
376
+ ) -> Option < T > {
377
+ // Node's index within its neighborhood.
378
+ let offset = index. saturating_sub ( 1 ) % fanout;
379
+ let index = index. checked_sub ( 1 ) ? / fanout;
380
+ let index = index - index. saturating_sub ( 1 ) % fanout;
381
+ let index = if index == 0 { index } else { index + offset } ;
382
+ nodes. get ( index) . copied ( )
383
+ }
384
+
330
385
impl < T > ClusterNodesCache < T > {
331
386
pub fn new (
332
387
// Capacity of underlying LRU-cache in terms of number of epochs.
@@ -527,7 +582,11 @@ pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &
527
582
528
583
#[ cfg( test) ]
529
584
mod tests {
530
- use super :: * ;
585
+ use {
586
+ super :: * ,
587
+ std:: { fmt:: Debug , hash:: Hash } ,
588
+ test_case:: test_case,
589
+ } ;
531
590
532
591
#[ test]
533
592
fn test_cluster_nodes_retransmit ( ) {
@@ -600,10 +659,42 @@ mod tests {
600
659
}
601
660
}
602
661
662
+ // Checks (1) computed retransmit children against expected children and
663
+ // (2) computed parent of each child against the expected parent.
664
+ fn check_retransmit_nodes < T > ( fanout : usize , nodes : & [ T ] , peers : Vec < Vec < T > > )
665
+ where
666
+ T : Copy + Eq + PartialEq + Debug + Hash ,
667
+ {
668
+ // Map node identities to their index within the shuffled tree.
669
+ let index: HashMap < _ , _ > = nodes
670
+ . iter ( )
671
+ . copied ( )
672
+ . enumerate ( )
673
+ . map ( |( k, node) | ( node, k) )
674
+ . collect ( ) ;
675
+ let offset = peers. len ( ) ;
676
+ // Root node's parent is None.
677
+ assert_eq ! ( get_retransmit_parent( fanout, /*index:*/ 0 , nodes) , None ) ;
678
+ for ( k, peers) in peers. into_iter ( ) . enumerate ( ) {
679
+ assert_eq ! (
680
+ get_retransmit_peers( fanout, k, nodes) . collect:: <Vec <_>>( ) ,
681
+ peers
682
+ ) ;
683
+ let parent = Some ( nodes[ k] ) ;
684
+ for peer in peers {
685
+ assert_eq ! ( get_retransmit_parent( fanout, index[ & peer] , nodes) , parent) ;
686
+ }
687
+ }
688
+ // Remaining nodes have no children.
689
+ for k in offset..=nodes. len ( ) {
690
+ assert_eq ! ( get_retransmit_peers( fanout, k, nodes) . next( ) , None ) ;
691
+ }
692
+ }
693
+
603
694
#[ test]
604
- fn test_get_retransmit_peers ( ) {
695
+ fn test_get_retransmit_nodes ( ) {
605
696
// fanout 2
606
- let index = vec ! [
697
+ let nodes = [
607
698
7 , // root
608
699
6 , 10 , // 1st layer
609
700
// 2nd layer
@@ -631,16 +722,9 @@ mod tests {
631
722
vec![ 16 , 9 ] ,
632
723
vec![ 8 ] ,
633
724
] ;
634
- for ( k, peers) in peers. into_iter ( ) . enumerate ( ) {
635
- let retransmit_peers = get_retransmit_peers ( /*fanout:*/ 2 , k, & index) ;
636
- assert_eq ! ( retransmit_peers. collect:: <Vec <_>>( ) , peers) ;
637
- }
638
- for k in 10 ..=index. len ( ) {
639
- let mut retransmit_peers = get_retransmit_peers ( /*fanout:*/ 2 , k, & index) ;
640
- assert_eq ! ( retransmit_peers. next( ) , None ) ;
641
- }
725
+ check_retransmit_nodes ( /*fanout:*/ 2 , & nodes, peers) ;
642
726
// fanout 3
643
- let index = vec ! [
727
+ let nodes = [
644
728
19 , // root
645
729
14 , 15 , 28 , // 1st layer
646
730
// 2nd layer
@@ -672,13 +756,84 @@ mod tests {
672
756
vec![ 24 , 32 ] ,
673
757
vec![ 34 ] ,
674
758
] ;
675
- for ( k, peers) in peers. into_iter ( ) . enumerate ( ) {
676
- let retransmit_peers = get_retransmit_peers ( /*fanout:*/ 3 , k, & index) ;
677
- assert_eq ! ( retransmit_peers. collect:: <Vec <_>>( ) , peers) ;
759
+ check_retransmit_nodes ( /*fanout:*/ 3 , & nodes, peers) ;
760
+ let nodes = [
761
+ 5 , // root
762
+ 34 , 52 , 8 , // 1st layer
763
+ // 2nd layar
764
+ 44 , 18 , 2 , // 1st neigborhood
765
+ 42 , 47 , 46 , // 2nd
766
+ 11 , 26 , 28 , // 3rd
767
+ // 3rd layer
768
+ 53 , 23 , 37 , // 1st neighborhood
769
+ 40 , 13 , 7 , // 2nd
770
+ 50 , 35 , 22 , // 3rd
771
+ 3 , 27 , 31 , // 4th
772
+ 10 , 48 , 15 , // 5th
773
+ 19 , 6 , 30 , // 6th
774
+ 36 , 45 , 1 , // 7th
775
+ 38 , 12 , 17 , // 8th
776
+ 4 , 32 , 16 , // 9th
777
+ // 4th layer
778
+ 41 , 49 , 24 , // 1st neighborhood
779
+ 14 , 9 , 0 , // 2nd
780
+ 29 , 21 , 39 , // 3rd
781
+ 43 , 51 , 33 , // 4th
782
+ 25 , 20 , // 5th
783
+ ] ;
784
+ let peers = vec ! [
785
+ vec![ 34 , 52 , 8 ] ,
786
+ vec![ 44 , 42 , 11 ] ,
787
+ vec![ 18 , 47 , 26 ] ,
788
+ vec![ 2 , 46 , 28 ] ,
789
+ vec![ 53 , 40 , 50 ] ,
790
+ vec![ 23 , 13 , 35 ] ,
791
+ vec![ 37 , 7 , 22 ] ,
792
+ vec![ 3 , 10 , 19 ] ,
793
+ vec![ 27 , 48 , 6 ] ,
794
+ vec![ 31 , 15 , 30 ] ,
795
+ vec![ 36 , 38 , 4 ] ,
796
+ vec![ 45 , 12 , 32 ] ,
797
+ vec![ 1 , 17 , 16 ] ,
798
+ vec![ 41 , 14 , 29 ] ,
799
+ vec![ 49 , 9 , 21 ] ,
800
+ vec![ 24 , 0 , 39 ] ,
801
+ vec![ 43 , 25 ] ,
802
+ vec![ 51 , 20 ] ,
803
+ vec![ 33 ] ,
804
+ ] ;
805
+ check_retransmit_nodes ( /*fanout:*/ 3 , & nodes, peers) ;
806
+ }
807
+
808
+ #[ test_case( 2 , 1_347 ) ]
809
+ #[ test_case( 3 , 1_359 ) ]
810
+ #[ test_case( 4 , 4_296 ) ]
811
+ #[ test_case( 5 , 3_925 ) ]
812
+ #[ test_case( 6 , 8_778 ) ]
813
+ #[ test_case( 7 , 9_879 ) ]
814
+ fn test_get_retransmit_nodes_round_trip ( fanout : usize , size : usize ) {
815
+ let mut rng = rand:: thread_rng ( ) ;
816
+ let mut nodes: Vec < _ > = ( 0 ..size) . collect ( ) ;
817
+ nodes. shuffle ( & mut rng) ;
818
+ // Map node identities to their index within the shuffled tree.
819
+ let index: HashMap < _ , _ > = nodes
820
+ . iter ( )
821
+ . copied ( )
822
+ . enumerate ( )
823
+ . map ( |( k, node) | ( node, k) )
824
+ . collect ( ) ;
825
+ // Root node's parent is None.
826
+ assert_eq ! ( get_retransmit_parent( fanout, /*index:*/ 0 , & nodes) , None ) ;
827
+ for k in 1 ..size {
828
+ let parent = get_retransmit_parent ( fanout, k, & nodes) . unwrap ( ) ;
829
+ let mut peers = get_retransmit_peers ( fanout, index[ & parent] , & nodes) ;
830
+ assert_eq ! ( peers. find( |& peer| peer == nodes[ k] ) , Some ( nodes[ k] ) ) ;
678
831
}
679
- for k in 13 ..=index. len ( ) {
680
- let mut retransmit_peers = get_retransmit_peers ( /*fanout:*/ 3 , k, & index) ;
681
- assert_eq ! ( retransmit_peers. next( ) , None ) ;
832
+ for k in 0 ..size {
833
+ let parent = Some ( nodes[ k] ) ;
834
+ for peer in get_retransmit_peers ( fanout, k, & nodes) {
835
+ assert_eq ! ( get_retransmit_parent( fanout, index[ & peer] , & nodes) , parent) ;
836
+ }
682
837
}
683
838
}
684
839
}
0 commit comments