@@ -36,6 +36,7 @@ use crate::{
36
36
Direction , TransportEvent , TransportService ,
37
37
} ,
38
38
substream:: Substream ,
39
+ transport:: Endpoint ,
39
40
types:: SubstreamId ,
40
41
PeerId ,
41
42
} ;
@@ -220,14 +221,18 @@ impl Kademlia {
220
221
}
221
222
222
223
/// Connection established to remote peer.
223
- fn on_connection_established ( & mut self , peer : PeerId ) -> crate :: Result < ( ) > {
224
+ fn on_connection_established ( & mut self , peer : PeerId , endpoint : Endpoint ) -> crate :: Result < ( ) > {
224
225
tracing:: trace!( target: LOG_TARGET , ?peer, "connection established" ) ;
225
226
226
227
match self . peers . entry ( peer) {
227
228
Entry :: Vacant ( entry) => {
228
- if let KBucketEntry :: Occupied ( entry) = self . routing_table . entry ( Key :: from ( peer) ) {
229
- entry. connection = ConnectionType :: Connected ;
230
- }
229
+ // Set the conenction type to connected and potentially save the address in the
230
+ // table.
231
+ //
232
+ // Note: this happens regardless of the state of the kademlia managed peers, because
233
+ // an already occupied entry in the `self.peers` map does not mean that we are
234
+ // no longer interested in the address / connection type of the peer.
235
+ self . routing_table . on_connection_established ( Key :: from ( peer) , endpoint) ;
231
236
232
237
let Some ( actions) = self . pending_dials . remove ( & peer) else {
233
238
entry. insert ( PeerContext :: new ( ) ) ;
@@ -262,7 +267,21 @@ impl Kademlia {
262
267
entry. insert ( context) ;
263
268
Ok ( ( ) )
264
269
}
265
- Entry :: Occupied ( _) => Err ( Error :: PeerAlreadyExists ( peer) ) ,
270
+ Entry :: Occupied ( _) => {
271
+ tracing:: warn!(
272
+ target: LOG_TARGET ,
273
+ ?peer,
274
+ ?endpoint,
275
+ "connection already exists, discarding opening substreams, this is unexpected"
276
+ ) ;
277
+
278
+ // Update the connection in the routing table, similar as above. The function call
279
+ // happens in two places to avoid unnecessary cloning of the endpoint for logging
280
+ // purposes.
281
+ self . routing_table . on_connection_established ( Key :: from ( peer) , endpoint) ;
282
+
283
+ Err ( Error :: PeerAlreadyExists ( peer) )
284
+ }
266
285
}
267
286
}
268
287
@@ -395,12 +414,13 @@ impl Kademlia {
395
414
. await ;
396
415
397
416
for info in peers {
398
- self . service . add_known_address ( & info. peer , info. addresses . iter ( ) . cloned ( ) ) ;
417
+ let addresses = info. addresses ( ) ;
418
+ self . service . add_known_address ( & info. peer , addresses. clone ( ) . into_iter ( ) ) ;
399
419
400
420
if std:: matches!( self . update_mode, RoutingTableUpdateMode :: Automatic ) {
401
421
self . routing_table . add_known_peer (
402
422
info. peer ,
403
- info . addresses . clone ( ) ,
423
+ addresses,
404
424
self . peers
405
425
. get ( & info. peer )
406
426
. map_or ( ConnectionType :: NotConnected , |_| ConnectionType :: Connected ) ,
@@ -529,13 +549,15 @@ impl Kademlia {
529
549
) ;
530
550
531
551
match ( providers. len ( ) , providers. pop ( ) ) {
532
- ( 1 , Some ( provider) ) =>
552
+ ( 1 , Some ( provider) ) => {
553
+ let addresses = provider. addresses ( ) ;
554
+
533
555
if provider. peer == peer {
534
556
self . store . put_provider (
535
557
key. clone ( ) ,
536
558
ContentProvider {
537
559
peer,
538
- addresses : provider . addresses . clone ( ) ,
560
+ addresses : addresses. clone ( ) ,
539
561
} ,
540
562
) ;
541
563
@@ -545,7 +567,7 @@ impl Kademlia {
545
567
provided_key : key,
546
568
provider : ContentProvider {
547
569
peer : provider. peer ,
548
- addresses : provider . addresses ,
570
+ addresses,
549
571
} ,
550
572
} )
551
573
. await ;
@@ -556,7 +578,8 @@ impl Kademlia {
556
578
provider = ?provider. peer,
557
579
"ignoring `ADD_PROVIDER` message with `publisher` != `provider`"
558
580
)
559
- } ,
581
+ }
582
+ }
560
583
( n, _) => {
561
584
tracing:: trace!(
562
585
target: LOG_TARGET ,
@@ -669,8 +692,10 @@ impl Kademlia {
669
692
}
670
693
671
694
/// Handle dial failure.
672
- fn on_dial_failure ( & mut self , peer : PeerId , address : Multiaddr ) {
673
- tracing:: trace!( target: LOG_TARGET , ?peer, ?address, "failed to dial peer" ) ;
695
+ fn on_dial_failure ( & mut self , peer : PeerId , addresses : Vec < Multiaddr > ) {
696
+ tracing:: trace!( target: LOG_TARGET , ?peer, ?addresses, "failed to dial peer" ) ;
697
+
698
+ self . routing_table . on_dial_failure ( Key :: from ( peer) , & addresses) ;
674
699
675
700
let Some ( actions) = self . pending_dials . remove ( & peer) else {
676
701
return ;
@@ -682,7 +707,7 @@ impl Kademlia {
682
707
target: LOG_TARGET ,
683
708
?peer,
684
709
query = ?query_id,
685
- ?address ,
710
+ ?addresses ,
686
711
"report failure for pending query" ,
687
712
) ;
688
713
@@ -774,7 +799,10 @@ impl Kademlia {
774
799
. send ( KademliaEvent :: FindNodeSuccess {
775
800
target,
776
801
query_id : query,
777
- peers : peers. into_iter ( ) . map ( |info| ( info. peer , info. addresses ) ) . collect ( ) ,
802
+ peers : peers
803
+ . into_iter ( )
804
+ . map ( |info| ( info. peer , info. addresses ( ) ) )
805
+ . collect ( ) ,
778
806
} )
779
807
. await ;
780
808
Ok ( ( ) )
@@ -889,8 +917,8 @@ impl Kademlia {
889
917
890
918
tokio:: select! {
891
919
event = self . service. next( ) => match event {
892
- Some ( TransportEvent :: ConnectionEstablished { peer, .. } ) => {
893
- if let Err ( error) = self . on_connection_established( peer) {
920
+ Some ( TransportEvent :: ConnectionEstablished { peer, endpoint } ) => {
921
+ if let Err ( error) = self . on_connection_established( peer, endpoint ) {
894
922
tracing:: debug!(
895
923
target: LOG_TARGET ,
896
924
?error,
@@ -923,8 +951,8 @@ impl Kademlia {
923
951
Some ( TransportEvent :: SubstreamOpenFailure { substream, error } ) => {
924
952
self . on_substream_open_failure( substream, error) . await ;
925
953
}
926
- Some ( TransportEvent :: DialFailure { peer, address , .. } ) =>
927
- self . on_dial_failure( peer, address ) ,
954
+ Some ( TransportEvent :: DialFailure { peer, addresses } ) =>
955
+ self . on_dial_failure( peer, addresses ) ,
928
956
None => return Err ( Error :: EssentialTaskClosed ) ,
929
957
} ,
930
958
context = self . executor. next( ) => {
@@ -1048,7 +1076,7 @@ impl Kademlia {
1048
1076
1049
1077
match self . routing_table. entry( Key :: from( peer) ) {
1050
1078
KBucketEntry :: Occupied ( entry) => Some ( entry. clone( ) ) ,
1051
- KBucketEntry :: Vacant ( entry) if !entry. addresses . is_empty( ) =>
1079
+ KBucketEntry :: Vacant ( entry) if !entry. address_store . is_empty( ) =>
1052
1080
Some ( entry. clone( ) ) ,
1053
1081
_ => None ,
1054
1082
}
@@ -1237,8 +1265,11 @@ mod tests {
1237
1265
KEEP_ALIVE_TIMEOUT ,
1238
1266
} ,
1239
1267
types:: protocol:: ProtocolName ,
1240
- BandwidthSink ,
1268
+ BandwidthSink , ConnectionId ,
1241
1269
} ;
1270
+ use multiaddr:: Protocol ;
1271
+ use multihash:: Multihash ;
1272
+ use std:: str:: FromStr ;
1242
1273
use tokio:: sync:: mpsc:: channel;
1243
1274
1244
1275
#[ allow( unused) ]
@@ -1379,4 +1410,89 @@ mod tests {
1379
1410
// Check the local storage should not get updated.
1380
1411
assert ! ( kademlia. store. get( & key) . is_none( ) ) ;
1381
1412
}
1413
+
1414
+ #[ tokio:: test]
1415
+ async fn check_address_store_routing_table_updates ( ) {
1416
+ let ( mut kademlia, _context, _manager) = make_kademlia ( ) ;
1417
+
1418
+ let peer = PeerId :: random ( ) ;
1419
+ let address_a = Multiaddr :: from_str ( "/dns/domain1.com/tcp/30333" ) . unwrap ( ) . with (
1420
+ Protocol :: P2p ( Multihash :: from_bytes ( & peer. to_bytes ( ) ) . unwrap ( ) ) ,
1421
+ ) ;
1422
+ let address_b = Multiaddr :: from_str ( "/dns/domain1.com/tcp/30334" ) . unwrap ( ) . with (
1423
+ Protocol :: P2p ( Multihash :: from_bytes ( & peer. to_bytes ( ) ) . unwrap ( ) ) ,
1424
+ ) ;
1425
+ let address_c = Multiaddr :: from_str ( "/dns/domain1.com/tcp/30339" ) . unwrap ( ) . with (
1426
+ Protocol :: P2p ( Multihash :: from_bytes ( & peer. to_bytes ( ) ) . unwrap ( ) ) ,
1427
+ ) ;
1428
+
1429
+ // Added only with address a.
1430
+ kademlia. routing_table . add_known_peer (
1431
+ peer,
1432
+ vec ! [ address_a. clone( ) ] ,
1433
+ ConnectionType :: NotConnected ,
1434
+ ) ;
1435
+
1436
+ // Check peer addresses.
1437
+ match kademlia. routing_table . entry ( Key :: from ( peer) ) {
1438
+ KBucketEntry :: Occupied ( entry) => {
1439
+ assert_eq ! ( entry. addresses( ) , vec![ address_a. clone( ) ] ) ;
1440
+ }
1441
+ _ => panic ! ( "Peer not found in routing table" ) ,
1442
+ } ;
1443
+
1444
+ // Report successful connection with address b via dialer endpoint.
1445
+ let _ = kademlia. on_connection_established (
1446
+ peer,
1447
+ Endpoint :: Dialer {
1448
+ address : address_b. clone ( ) ,
1449
+ connection_id : ConnectionId :: from ( 0 ) ,
1450
+ } ,
1451
+ ) ;
1452
+
1453
+ // Address B has a higher priority, as it was detected via the dialing mechanism of the
1454
+ // transport manager, while address A is not dialed yet.
1455
+ match kademlia. routing_table . entry ( Key :: from ( peer) ) {
1456
+ KBucketEntry :: Occupied ( entry) => {
1457
+ assert_eq ! (
1458
+ entry. addresses( ) ,
1459
+ vec![ address_b. clone( ) , address_a. clone( ) ]
1460
+ ) ;
1461
+ }
1462
+ _ => panic ! ( "Peer not found in routing table" ) ,
1463
+ } ;
1464
+
1465
+ // Report successful connection with a random address via listener endpoint.
1466
+ let _ = kademlia. on_connection_established (
1467
+ peer,
1468
+ Endpoint :: Listener {
1469
+ address : address_c. clone ( ) ,
1470
+ connection_id : ConnectionId :: from ( 0 ) ,
1471
+ } ,
1472
+ ) ;
1473
+ // Address C was not added, as the peer has dialed us possibly on an ephemeral port.
1474
+ match kademlia. routing_table . entry ( Key :: from ( peer) ) {
1475
+ KBucketEntry :: Occupied ( entry) => {
1476
+ assert_eq ! (
1477
+ entry. addresses( ) ,
1478
+ vec![ address_b. clone( ) , address_a. clone( ) ]
1479
+ ) ;
1480
+ }
1481
+ _ => panic ! ( "Peer not found in routing table" ) ,
1482
+ } ;
1483
+
1484
+ // Address B fails two times (which gives it a lower score than A) and
1485
+ // makes it subject to removal.
1486
+ kademlia. on_dial_failure ( peer, vec ! [ address_b. clone( ) , address_b. clone( ) ] ) ;
1487
+
1488
+ match kademlia. routing_table . entry ( Key :: from ( peer) ) {
1489
+ KBucketEntry :: Occupied ( entry) => {
1490
+ assert_eq ! (
1491
+ entry. addresses( ) ,
1492
+ vec![ address_a. clone( ) , address_b. clone( ) ]
1493
+ ) ;
1494
+ }
1495
+ _ => panic ! ( "Peer not found in routing table" ) ,
1496
+ } ;
1497
+ }
1382
1498
}
0 commit comments