diff --git a/Cargo.lock b/Cargo.lock index 32a3c4b8f98c..cd33c20433c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1262,9 +1262,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2 1.0.82", "quote 1.0.37", @@ -1440,7 +1440,7 @@ dependencies = [ "lazy_static", "lazycell", "peeking_take_while", - "prettyplease 0.2.12", + "prettyplease", "proc-macro2 1.0.82", "quote 1.0.37", "regex", @@ -2364,12 +2364,6 @@ dependencies = [ "tuplex", ] -[[package]] -name = "bs58" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "771fe0050b883fcc3ea2359b1a96bcfbc090b7116eae7c3c512c7a083fdf23d3" - [[package]] name = "bs58" version = "0.5.1" @@ -2431,9 +2425,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.6.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "bzip2-sys" @@ -3564,21 +3558,6 @@ dependencies = [ "wasmtime-types", ] -[[package]] -name = "crc" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2b432c56615136f8dba245fed7ec3d5518c500a31108661067e61e72fe7e6bc" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32fast" version = "1.3.2" @@ -5418,7 +5397,7 @@ dependencies = [ "blake2 0.10.6", "file-guard", "fs-err", - "prettyplease 0.2.12", + "prettyplease", "proc-macro2 1.0.82", "quote 1.0.37", "syn 2.0.65", @@ -5657,21 +5636,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "fork-tree" version = "13.0.0" @@ -6696,6 +6660,51 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" +[[package]] +name = "hickory-proto" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07698b8420e2f0d6447a436ba999ec85d8fbf2a398bbd737b82cac4a2e96e512" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner 0.6.0", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.4.0", + "ipnet", + "once_cell", + "rand", + "thiserror", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28757f23aa75c98f254cf0405e6d8c25b831b32921b050a66692427679b1f243" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot 0.12.3", + "rand", + "resolv-conf", + "smallvec", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -7652,9 +7661,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.155" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "libflate" @@ -7847,7 +7856,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55cca1eb2bc1fd29f099f3daaab7effd01e1a54b7c577d0ed082521034d912e8" dependencies = [ - "bs58 0.5.1", + "bs58", "ed25519-dalek", "hkdf", "multihash 0.19.1", @@ -7984,7 +7993,7 @@ dependencies = [ "libp2p-tls", "log", "parking_lot 0.12.3", - "quinn 0.10.2", + "quinn", "rand", "ring 0.16.20", "rustls 0.21.7", @@ -8305,21 +8314,22 @@ dependencies = [ [[package]] name = "litep2p" -version = "0.6.2" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f46c51c205264b834ceed95c8b195026e700494bc3991aaba3b4ea9e20626d9" +checksum = "5b67484b8ac41e1cfdf012f65fa81e88c2ef5f8a7d6dec0e2678c2d06dc04530" dependencies = [ "async-trait", - "bs58 0.4.0", + "bs58", "bytes", "cid 0.10.1", "ed25519-dalek", "futures", "futures-timer", "hex-literal", + "hickory-resolver", "indexmap 2.2.3", "libc", - "mockall 0.12.1", + "mockall 0.13.1", "multiaddr 0.17.1", "multihash 0.17.0", "network-interface", @@ -8327,8 +8337,7 @@ dependencies = [ "parking_lot 0.12.3", "pin-project", "prost 0.12.6", - "prost-build 0.11.9", - "quinn 0.9.4", + "prost-build 0.13.3", "rand", "rcgen", "ring 0.16.20", @@ -8340,18 +8349,15 @@ dependencies = [ "snow", "socket2 0.5.7", "static_assertions", - "str0m", "thiserror", "tokio", "tokio-stream", "tokio-tungstenite", "tokio-util", "tracing", - "trust-dns-resolver", "uint", "unsigned-varint 0.8.0", "url", - "webpki", "x25519-dalek", "x509-parser 0.16.0", "yasna", @@ -8798,15 +8804,14 @@ dependencies = [ [[package]] name = "mockall" -version = "0.12.1" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" dependencies = [ "cfg-if", "downcast", "fragile", - "lazy_static", - "mockall_derive 0.12.1", + "mockall_derive 0.13.1", "predicates 3.0.3", "predicates-tree", ] @@ -8825,9 +8830,9 @@ dependencies = [ [[package]] name = "mockall_derive" -version = "0.12.1" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" dependencies = [ "cfg-if", "proc-macro2 1.0.82", @@ -9536,47 +9541,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "openssl" -version = "0.10.64" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2 1.0.82", - "quote 1.0.37", - "syn 2.0.65", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-src" -version = "300.2.3+3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cff92b6f71555b61bb9315f7c64da3ca43d87531622120fea0195fc761b4843" -dependencies = [ - "cc", -] - [[package]] name = "openssl-sys" version = "0.9.102" @@ -9585,7 +9555,6 @@ checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" dependencies = [ "cc", "libc", - "openssl-src", "pkg-config", "vcpkg", ] @@ -13673,7 +13642,7 @@ name = "polkadot-node-metrics" version = "18.0.0" dependencies = [ "assert_cmd", - "bs58 0.5.1", + "bs58", "futures", "futures-timer", "http-body-util", @@ -14131,7 +14100,7 @@ dependencies = [ name = "polkadot-runtime-metrics" version = "17.0.0" dependencies = [ - "bs58 0.5.1", + "bs58", "frame-benchmarking", "parity-scale-codec", "polkadot-primitives", @@ -15450,16 +15419,6 @@ dependencies = [ "yansi", ] -[[package]] -name = "prettyplease" -version = "0.1.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" -dependencies = [ - "proc-macro2 1.0.82", - "syn 1.0.109", -] - [[package]] name = "prettyplease" version = "0.2.12" @@ -15705,33 +15664,42 @@ dependencies = [ "prost-derive 0.12.6", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive 0.13.3", +] + [[package]] name = "prost-build" -version = "0.11.9" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.1", - "itertools 0.10.5", - "lazy_static", + "heck 0.5.0", + "itertools 0.11.0", "log", "multimap", + "once_cell", "petgraph", - "prettyplease 0.1.25", - "prost 0.11.9", - "prost-types 0.11.9", + "prettyplease", + "prost 0.12.6", + "prost-types 0.12.4", "regex", - "syn 1.0.109", + "syn 2.0.65", "tempfile", - "which", ] [[package]] name = "prost-build" -version = "0.12.4" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck 0.5.0", @@ -15740,9 +15708,9 @@ dependencies = [ "multimap", "once_cell", "petgraph", - "prettyplease 0.2.12", - "prost 0.12.6", - "prost-types 0.12.4", + "prettyplease", + "prost 0.13.3", + "prost-types 0.13.3", "regex", "syn 2.0.65", "tempfile", @@ -15775,12 +15743,16 @@ dependencies = [ ] [[package]] -name = "prost-types" -version = "0.11.9" +name = "prost-derive" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ - "prost 0.11.9", + "anyhow", + "itertools 0.11.0", + "proc-macro2 1.0.82", + "quote 1.0.37", + "syn 2.0.65", ] [[package]] @@ -15792,6 +15764,15 @@ dependencies = [ "prost 0.12.6", ] +[[package]] +name = "prost-types" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +dependencies = [ + "prost 0.13.3", +] + [[package]] name = "psm" version = "0.1.21" @@ -15897,24 +15878,6 @@ dependencies = [ "rand", ] -[[package]] -name = "quinn" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e8b432585672228923edbbf64b8b12c14e1112f62e88737655b4a083dbcd78e" -dependencies = [ - "bytes", - "pin-project-lite", - "quinn-proto 0.9.6", - "quinn-udp 0.3.2", - "rustc-hash 1.1.0", - "rustls 0.20.9", - "thiserror", - "tokio", - "tracing", - "webpki", -] - [[package]] name = "quinn" version = "0.10.2" @@ -15924,8 +15887,8 @@ dependencies = [ "bytes", "futures-io", "pin-project-lite", - "quinn-proto 0.10.6", - "quinn-udp 0.4.1", + "quinn-proto", + "quinn-udp", "rustc-hash 1.1.0", "rustls 0.21.7", "thiserror", @@ -15933,24 +15896,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "quinn-proto" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b0b33c13a79f669c85defaf4c275dc86a0c0372807d0ca3d78e0bb87274863" -dependencies = [ - "bytes", - "rand", - "ring 0.16.20", - "rustc-hash 1.1.0", - "rustls 0.20.9", - "slab", - "thiserror", - "tinyvec", - "tracing", - "webpki", -] - [[package]] name = "quinn-proto" version = "0.10.6" @@ -15968,19 +15913,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "quinn-udp" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4" -dependencies = [ - "libc", - "quinn-proto 0.9.6", - "socket2 0.4.9", - "tracing", - "windows-sys 0.42.0", -] - [[package]] name = "quinn-udp" version = "0.4.1" @@ -18095,7 +18027,7 @@ dependencies = [ name = "sc-network-types" version = "0.12.1" dependencies = [ - "bs58 0.5.1", + "bs58", "ed25519-dalek", "libp2p-identity", "litep2p", @@ -18762,21 +18694,6 @@ dependencies = [ "untrusted 0.7.1", ] -[[package]] -name = "sctp-proto" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6220f78bb44c15f326b0596113305f6101097a18755d53727a575c97e09fb24" -dependencies = [ - "bytes", - "crc", - "fxhash", - "log", - "rand", - "slab", - "thiserror", -] - [[package]] name = "sec1" version = "0.7.3" @@ -19102,18 +19019,6 @@ dependencies = [ "opaque-debug 0.3.0", ] -[[package]] -name = "sha-1" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", - "sha1-asm", -] - [[package]] name = "sha1" version = "0.10.6" @@ -19125,15 +19030,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "sha1-asm" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ba6947745e7f86be3b8af00b7355857085dbdf8901393c89514510eb61f4e21" -dependencies = [ - "cc", -] - [[package]] name = "sha2" version = "0.9.9" @@ -19263,9 +19159,9 @@ dependencies = [ [[package]] name = "simple-dns" -version = "0.5.7" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae9a3fcdadafb6d97f4c0e007e4247b114ee0f119f650c3cbf3a8b3a1479694" +checksum = "4c80e565e7dcc4f1ef247e2f395550d4cf7d777746d5988e7e4e3156b71077fc" dependencies = [ "bitflags 2.6.0", ] @@ -19376,7 +19272,7 @@ dependencies = [ "base64 0.21.2", "bip39", "blake2-rfc", - "bs58 0.5.1", + "bs58", "chacha20", "crossbeam-queue", "derive_more", @@ -19830,7 +19726,7 @@ dependencies = [ "httparse", "log", "rand", - "sha-1 0.9.8", + "sha-1", ] [[package]] @@ -20189,7 +20085,7 @@ dependencies = [ "bitflags 1.3.2", "blake2 0.10.6", "bounded-collections", - "bs58 0.5.1", + "bs58", "criterion", "dyn-clonable", "ed25519-zebra", @@ -21077,26 +20973,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "str0m" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6706347e49b13373f7ddfafad47df7583ed52083d6fc8a594eb2c80497ef959d" -dependencies = [ - "combine", - "crc", - "fastrand 2.1.0", - "hmac 0.12.1", - "once_cell", - "openssl", - "openssl-sys", - "sctp-proto", - "serde", - "sha-1 0.10.1", - "thiserror", - "tracing", -] - [[package]] name = "string-interner" version = "0.17.0" @@ -23598,17 +23474,6 @@ dependencies = [ "westend-emulated-chain", ] -[[package]] -name = "which" -version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269" -dependencies = [ - "either", - "libc", - "once_cell", -] - [[package]] name = "wide" version = "0.7.11" @@ -23703,21 +23568,6 @@ dependencies = [ "windows-targets 0.52.0", ] -[[package]] -name = "windows-sys" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-sys" version = "0.45.0" diff --git a/Cargo.toml b/Cargo.toml index 1e0d2b5f7183..c4fe97be6016 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -830,7 +830,7 @@ linked-hash-map = { version = "0.5.4" } linked_hash_set = { version = "0.1.4" } linregress = { version = "0.5.1" } lite-json = { version = "0.2.0", default-features = false } -litep2p = { version = "0.6.2" } +litep2p = { version = "0.8.1", features = ["websocket"] } log = { version = "0.4.22", default-features = false } macro_magic = { version = "0.5.1" } maplit = { version = "1.0.2" } diff --git a/prdoc/pr_6497.prdoc b/prdoc/pr_6497.prdoc new file mode 100644 index 000000000000..16e219461a93 --- /dev/null +++ b/prdoc/pr_6497.prdoc @@ -0,0 +1,13 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Bring litep2p fixes and latest version to stable2409 + +doc: + - audience: [ Node Dev, Node Operator ] + description: | + This PR affects only litep2p (experimental) backend and contains critical fixes for connection stability and memory leaks. + +crates: + - name: sc-network + bump: patch diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 62d5f0fb6f06..9043f9420e8d 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -95,15 +95,6 @@ pub enum DiscoveryEvent { /// Peer ID. peer: PeerId, - /// Identify protocol version. - protocol_version: Option, - - /// Identify user agent version. - user_agent: Option, - - /// Observed address. - observed_address: Multiaddr, - /// Listen addresses. listen_addresses: Vec, @@ -125,7 +116,16 @@ pub enum DiscoveryEvent { /// New external address discovered. ExternalAddressDiscovered { - /// Discovered addresses. + /// Discovered address. + address: Multiaddr, + }, + + /// The external address has expired. + /// + /// This happens when the internal buffers exceed the maximum number of external addresses, + /// and this address is the oldest one. + ExternalAddressExpired { + /// Expired address. address: Multiaddr, }, @@ -162,6 +162,9 @@ pub enum DiscoveryEvent { /// Discovery. pub struct Discovery { + /// Local peer ID. + local_peer_id: litep2p::PeerId, + /// Ping event stream. ping_event_stream: Box + Send + Unpin>, @@ -233,6 +236,7 @@ impl Discovery { /// Enables `/ipfs/ping/1.0.0` and `/ipfs/identify/1.0.0` by default and starts /// the mDNS peer discovery if it was enabled. pub fn new + Clone>( + local_peer_id: litep2p::PeerId, config: &NetworkConfiguration, genesis_hash: Hash, fork_id: Option<&str>, @@ -243,11 +247,9 @@ impl Discovery { ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option) { let (ping_config, ping_event_stream) = PingConfig::default(); let user_agent = format!("{} ({})", config.client_version, config.node_name); - let (identify_config, identify_event_stream) = IdentifyConfig::new( - "/substrate/1.0".to_string(), - Some(user_agent), - config.public_addresses.clone().into_iter().map(Into::into).collect(), - ); + + let (identify_config, identify_event_stream) = + IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent)); let (mdns_config, mdns_event_stream) = match config.transport { crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns { @@ -275,6 +277,7 @@ impl Discovery { ( Self { + local_peer_id, ping_event_stream, identify_event_stream, mdns_event_stream, @@ -434,7 +437,13 @@ impl Discovery { } /// Check if `address` can be considered a new external address. - fn is_new_external_address(&mut self, address: &Multiaddr, peer: PeerId) -> bool { + /// + /// If this address replaces an older address, the expired address is returned. + fn is_new_external_address( + &mut self, + address: &Multiaddr, + peer: PeerId, + ) -> (bool, Option) { log::trace!(target: LOG_TARGET, "verify new external address: {address}"); // is the address one of our known addresses @@ -445,7 +454,7 @@ impl Discovery { .chain(self.public_addresses.iter()) .any(|known_address| Discovery::is_known_address(&known_address, &address)) { - return true + return (true, None) } match self.address_confirmations.get(address) { @@ -453,15 +462,31 @@ impl Discovery { confirmations.insert(peer); if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS { - return true + return (true, None) } }, None => { + let oldest = (self.address_confirmations.len() >= + self.address_confirmations.limiter().max_length() as usize) + .then(|| { + self.address_confirmations.pop_oldest().map(|(address, peers)| { + if peers.len() >= MIN_ADDRESS_CONFIRMATIONS { + return Some(address) + } else { + None + } + }) + }) + .flatten() + .flatten(); + self.address_confirmations.insert(address.clone(), Default::default()); + + return (false, oldest) }, } - false + (false, None) } } @@ -533,7 +558,7 @@ impl Stream for Discovery { return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records })); }, - Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) => + Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) => return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })), Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => { match this.find_node_query_id == Some(query_id) { @@ -556,6 +581,9 @@ impl Stream for Discovery { return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record })) }, + // Content provider events are ignored for now. + Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) | + Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {}, } match Pin::new(&mut this.identify_event_stream).poll_next(cx) { @@ -563,24 +591,53 @@ impl Stream for Discovery { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(IdentifyEvent::PeerIdentified { peer, - protocol_version, - user_agent, listen_addresses, supported_protocols, observed_address, + .. })) => { - if this.is_new_external_address(&observed_address, peer) { - this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered { - address: observed_address.clone(), - }); + let observed_address = + if let Some(Protocol::P2p(peer_id)) = observed_address.iter().last() { + if peer_id != *this.local_peer_id.as_ref() { + log::warn!( + target: LOG_TARGET, + "Discovered external address for a peer that is not us: {observed_address}", + ); + None + } else { + Some(observed_address) + } + } else { + Some(observed_address.with(Protocol::P2p(this.local_peer_id.into()))) + }; + + // Ensure that an external address with a different peer ID does not have + // side effects of evicting other external addresses via `ExternalAddressExpired`. + if let Some(observed_address) = observed_address { + let (is_new, expired_address) = + this.is_new_external_address(&observed_address, peer); + + if let Some(expired_address) = expired_address { + log::trace!( + target: LOG_TARGET, + "Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}", + ); + + this.pending_events.push_back(DiscoveryEvent::ExternalAddressExpired { + address: expired_address, + }); + } + + if is_new { + this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered { + address: observed_address.clone(), + }); + } } return Poll::Ready(Some(DiscoveryEvent::Identified { peer, - protocol_version, - user_agent, listen_addresses, - observed_address, supported_protocols, })); }, diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 521f1a5fe0f7..81a07485d5e9 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -54,6 +54,7 @@ use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey}; use litep2p::{ config::ConfigBuilder, crypto::ed25519::Keypair, + error::{DialError, NegotiationError}, executor::Executor, protocol::{ libp2p::{ @@ -64,15 +65,14 @@ use litep2p::{ }, transport::{ tcp::config::Config as TcpTransportConfig, - websocket::config::Config as WebSocketTransportConfig, Endpoint, + websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint, }, types::{ multiaddr::{Multiaddr, Protocol}, ConnectionId, }, - Error as Litep2pError, Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName, + Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName, }; -use parking_lot::RwLock; use prometheus_endpoint::Registry; use sc_client_api::BlockBackend; @@ -183,9 +183,6 @@ pub struct Litep2pNetworkBackend { /// Prometheus metrics. metrics: Option, - - /// External addresses. - external_addresses: Arc>>, } impl Litep2pNetworkBackend { @@ -543,6 +540,7 @@ impl NetworkBackend for Litep2pNetworkBac let listen_addresses = Arc::new(Default::default()); let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) = Discovery::new( + local_peer_id, &network_config, params.genesis_hash, params.fork_id.as_deref(), @@ -557,6 +555,9 @@ impl NetworkBackend for Litep2pNetworkBac .with_libp2p_ping(ping_config) .with_libp2p_identify(identify_config) .with_libp2p_kademlia(kademlia_config) + .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections( + Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize), + )) .with_executor(executor); if let Some(config) = maybe_mdns_config { @@ -570,15 +571,22 @@ impl NetworkBackend for Litep2pNetworkBac let litep2p = Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?; - let external_addresses: Arc>> = Arc::new(RwLock::new( - HashSet::from_iter(network_config.public_addresses.iter().cloned().map(Into::into)), - )); litep2p.listen_addresses().for_each(|address| { log::debug!(target: LOG_TARGET, "listening on: {address}"); listen_addresses.write().insert(address.clone()); }); + let public_addresses = litep2p.public_addresses(); + for address in network_config.public_addresses.iter() { + if let Err(err) = public_addresses.add_address(address.clone().into()) { + log::warn!( + target: LOG_TARGET, + "failed to add public address {address:?}: {err:?}", + ); + } + } + let network_service = Arc::new(Litep2pNetworkService::new( local_peer_id, keypair.clone(), @@ -588,7 +596,7 @@ impl NetworkBackend for Litep2pNetworkBac block_announce_protocol.clone(), request_response_senders, Arc::clone(&listen_addresses), - Arc::clone(&external_addresses), + public_addresses, )); // register rest of the metrics now that `Litep2p` has been created @@ -614,7 +622,6 @@ impl NetworkBackend for Litep2pNetworkBac event_streams: out_events::OutChannels::new(None)?, peers: HashMap::new(), litep2p, - external_addresses, }) } @@ -913,14 +920,39 @@ impl NetworkBackend for Litep2pNetworkBac } } } - Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => { + Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols }) => { self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await; } Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => { - let mut addresses = self.external_addresses.write(); + match self.litep2p.public_addresses().add_address(address.clone().into()) { + Ok(inserted) => if inserted { + log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}"); + }, + Err(err) => { + log::warn!( + target: LOG_TARGET, + "🔍 Failed to add discovered external address {address:?}: {err:?}", + ); + }, + } + } + Some(DiscoveryEvent::ExternalAddressExpired{ address }) => { + let local_peer_id = self.litep2p.local_peer_id(); + + // Litep2p requires the peer ID to be present in the address. + let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) { + address.with(Protocol::P2p(*local_peer_id.as_ref())) + } else { + address + }; - if addresses.insert(address.clone()) { - log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}"); + if self.litep2p.public_addresses().remove_address(&address) { + log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}"); + } else { + log::warn!( + target: LOG_TARGET, + "🔍 Failed to remove expired external address {address:?}" + ); } } Some(DiscoveryEvent::Ping { peer, rtt }) => { @@ -1006,23 +1038,49 @@ impl NetworkBackend for Litep2pNetworkBac } } Some(Litep2pEvent::DialFailure { address, error }) => { - log::trace!( + log::debug!( target: LOG_TARGET, "failed to dial peer at {address:?}: {error:?}", ); - let reason = match error { - Litep2pError::PeerIdMismatch(_, _) => "invalid-peer-id", - Litep2pError::Timeout | Litep2pError::TransportError(_) | - Litep2pError::IoError(_) | Litep2pError::WebSocket(_) => "transport-error", - _ => "other", - }; + if let Some(metrics) = &self.metrics { + let reason = match error { + DialError::Timeout => "timeout", + DialError::AddressError(_) => "invalid-address", + DialError::DnsError(_) => "cannot-resolve-dns", + DialError::NegotiationError(error) => match error { + NegotiationError::Timeout => "timeout", + NegotiationError::PeerIdMissing => "missing-peer-id", + NegotiationError::StateMismatch => "state-mismatch", + NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch", + NegotiationError::MultistreamSelectError(_) => "multistream-select-error", + NegotiationError::SnowError(_) => "noise-error", + NegotiationError::ParseError(_) => "parse-error", + NegotiationError::IoError(_) => "io-error", + NegotiationError::WebSocket(_) => "webscoket-error", + } + }; + + metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc(); + } + } + Some(Litep2pEvent::ListDialFailures { errors }) => { + log::debug!( + target: LOG_TARGET, + "failed to dial peer on multiple addresses {errors:?}", + ); if let Some(metrics) = &self.metrics { - metrics.pending_connections_errors_total.with_label_values(&[reason]).inc(); + metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc(); } } - _ => {} + None => { + log::error!( + target: LOG_TARGET, + "Litep2p backend terminated" + ); + return + } }, } } diff --git a/substrate/client/network/src/litep2p/service.rs b/substrate/client/network/src/litep2p/service.rs index 67fc44e6bfe0..693217f5ad94 100644 --- a/substrate/client/network/src/litep2p/service.rs +++ b/substrate/client/network/src/litep2p/service.rs @@ -36,7 +36,10 @@ use crate::litep2p::Record; use codec::DecodeAll; use futures::{channel::oneshot, stream::BoxStream}; use libp2p::{identity::SigningError, kad::record::Key as KademliaKey}; -use litep2p::{crypto::ed25519::Keypair, types::multiaddr::Multiaddr as LiteP2pMultiaddr}; +use litep2p::{ + addresses::PublicAddresses, crypto::ed25519::Keypair, + types::multiaddr::Multiaddr as LiteP2pMultiaddr, +}; use parking_lot::RwLock; use sc_network_common::{ @@ -196,7 +199,7 @@ pub struct Litep2pNetworkService { listen_addresses: Arc>>, /// External addresses. - external_addresses: Arc>>, + external_addresses: PublicAddresses, } impl Litep2pNetworkService { @@ -210,7 +213,7 @@ impl Litep2pNetworkService { block_announce_protocol: ProtocolName, request_response_protocols: HashMap>, listen_addresses: Arc>>, - external_addresses: Arc>>, + external_addresses: PublicAddresses, ) -> Self { Self { local_peer_id, @@ -323,9 +326,8 @@ impl NetworkStatusProvider for Litep2pNetworkService { .collect(), external_addresses: self .external_addresses - .read() - .iter() - .cloned() + .get_addresses() + .into_iter() .map(|a| Multiaddr::from(a).into()) .collect(), connected_peers: HashMap::new(), @@ -491,7 +493,7 @@ impl NetworkEventStream for Litep2pNetworkService { impl NetworkStateInfo for Litep2pNetworkService { fn external_addresses(&self) -> Vec { - self.external_addresses.read().iter().cloned().map(Into::into).collect() + self.external_addresses.get_addresses().into_iter().map(Into::into).collect() } fn listen_addresses(&self) -> Vec { diff --git a/substrate/client/network/src/litep2p/shim/notification/peerset.rs b/substrate/client/network/src/litep2p/shim/notification/peerset.rs index 2fd7920909e3..fb822794ccf0 100644 --- a/substrate/client/network/src/litep2p/shim/notification/peerset.rs +++ b/substrate/client/network/src/litep2p/shim/notification/peerset.rs @@ -88,6 +88,8 @@ const DISCONNECT_ADJUSTMENT: Reputation = Reputation::new(-256, "Peer disconnect const OPEN_FAILURE_ADJUSTMENT: Reputation = Reputation::new(-1024, "Open failure"); /// Is the peer reserved? +/// +/// Regular peers count towards slot allocation. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum Reserved { Yes, @@ -118,6 +120,15 @@ pub enum Direction { Outbound(Reserved), } +impl Direction { + fn set_reserved(&mut self, new_reserved: Reserved) { + match self { + Direction::Inbound(ref mut reserved) | Direction::Outbound(ref mut reserved) => + *reserved = new_reserved, + } + } +} + impl From for traits::Direction { fn from(direction: Direction) -> traits::Direction { match direction { @@ -784,7 +795,9 @@ impl Peerset { } /// Calculate how many of the connected peers were counted as normal inbound/outbound peers - /// which is needed to adjust slot counts when new reserved peers are added + /// which is needed to adjust slot counts when new reserved peers are added. + /// + /// If the peer is not already in the [`Peerset`], it is added as a disconnected peer. fn calculate_slot_adjustment<'a>( &'a mut self, peers: impl Iterator, @@ -819,6 +832,26 @@ impl Peerset { }) } + /// Checks if the peer should be disconnected based on the current state of the [`Peerset`] + /// and the provided direction. + /// + /// Note: The role of the peer is not checked. + fn should_disconnect(&self, direction: Direction) -> bool { + match direction { + Direction::Inbound(_) => self.num_in >= self.max_in, + Direction::Outbound(_) => self.num_out >= self.max_out, + } + } + + /// Increment the slot count for given peer. + fn increment_slot(&mut self, direction: Direction) { + match direction { + Direction::Inbound(Reserved::No) => self.num_in += 1, + Direction::Outbound(Reserved::No) => self.num_out += 1, + _ => {}, + } + } + /// Get the number of inbound peers. #[cfg(test)] pub fn num_in(&self) -> usize { @@ -949,8 +982,9 @@ impl Stream for Peerset { }, // set new reserved peers for the protocol // - // current reserved peers not in the new set are disconnected and the new reserved - // peers are scheduled for outbound substreams + // Current reserved peers not in the new set are moved to the regular set of peers + // or disconnected (if there are no slots available). The new reserved peers are + // scheduled for outbound substreams PeersetCommand::SetReservedPeers { peers } => { log::debug!(target: LOG_TARGET, "{}: set reserved peers {peers:?}", self.protocol); @@ -960,39 +994,58 @@ impl Stream for Peerset { // // calculate how many of the previously connected peers were counted as regular // peers and substract these counts from `num_out`/`num_in` + // + // If a reserved peer is not already tracked, it is added as disconnected by + // `calculate_slot_adjustment`. This ensures at the next slot allocation (1sec) + // that we'll try to establish a connection with the reserved peer. let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter()); self.num_out -= out_peers; self.num_in -= in_peers; - // add all unknown peers to `self.peers` - peers.iter().for_each(|peer| { - if !self.peers.contains_key(peer) { - self.peers.insert(*peer, PeerState::Disconnected); - } - }); - - // collect all peers who are not in the new reserved set - let peers_to_remove = self - .peers - .iter() - .filter_map(|(peer, _)| (!peers.contains(peer)).then_some(*peer)) - .collect::>(); + // collect all *reserved* peers who are not in the new reserved set + let reserved_peers_maybe_remove = + self.reserved_peers.difference(&peers).cloned().collect::>(); self.reserved_peers = peers; - let peers = peers_to_remove + let peers_to_remove = reserved_peers_maybe_remove .into_iter() .filter(|peer| { match self.peers.remove(&peer) { - Some(PeerState::Connected { direction }) => { - log::trace!( - target: LOG_TARGET, - "{}: close connection to {peer:?}, direction {direction:?}", - self.protocol, - ); - - self.peers.insert(*peer, PeerState::Closing { direction }); - true + Some(PeerState::Connected { mut direction }) => { + // The direction contains a `Reserved::Yes` flag, because this + // is a reserve peer that we want to close. + // The `Reserved::Yes` ensures we don't adjust the slot count + // when the substream is closed. + + let disconnect = + self.reserved_only || self.should_disconnect(direction); + + if disconnect { + log::trace!( + target: LOG_TARGET, + "{}: close connection to previously reserved {peer:?}, direction {direction:?}", + self.protocol, + ); + + self.peers.insert(*peer, PeerState::Closing { direction }); + true + } else { + log::trace!( + target: LOG_TARGET, + "{}: {peer:?} is no longer reserved, move to regular peers, direction {direction:?}", + self.protocol, + ); + + // The peer is kept connected as non-reserved. This will + // further count towards the slot count. + direction.set_reserved(Reserved::No); + self.increment_slot(direction); + + self.peers + .insert(*peer, PeerState::Connected { direction }); + false + } }, // substream might have been opening but not yet fully open when // the protocol request the reserved set to be changed @@ -1021,11 +1074,13 @@ impl Stream for Peerset { log::trace!( target: LOG_TARGET, - "{}: close substreams to {peers:?}", + "{}: close substreams to {peers_to_remove:?}", self.protocol, ); - return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream { peers })) + return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream { + peers: peers_to_remove, + })) }, PeersetCommand::AddReservedPeers { peers } => { log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol); @@ -1102,6 +1157,7 @@ impl Stream for Peerset { self.peers.insert(*peer, PeerState::Backoff); None }, + // if there is a rapid change in substream state, the peer may // be canceled when the substream is asked to be closed. // @@ -1122,6 +1178,7 @@ impl Stream for Peerset { self.peers.insert(*peer, PeerState::Canceled { direction }); None }, + // substream to the peer might have failed to open which caused // the peer to be backed off // @@ -1138,6 +1195,7 @@ impl Stream for Peerset { self.peers.insert(*peer, PeerState::Disconnected); None }, + // if a node disconnects, it's put into `PeerState::Closing` // which indicates that `Peerset` wants the substream closed and // has asked litep2p to close it but it hasn't yet received a @@ -1167,125 +1225,70 @@ impl Stream for Peerset { // if there are enough slots, the peer is just converted to // a regular peer and the used slot count is increased and if the // peer cannot be accepted, litep2p is asked to close the substream. - PeerState::Connected { direction } => match direction { - Direction::Inbound(_) => match self.num_in < self.max_in { - true => { - log::trace!( - target: LOG_TARGET, - "{}: {peer:?} converted to regular inbound peer (inbound open)", - self.protocol, - ); - - self.num_in += 1; - self.peers.insert( - *peer, - PeerState::Connected { - direction: Direction::Inbound(Reserved::No), - }, - ); - - None - }, - false => { - self.peers.insert( - *peer, - PeerState::Closing { - direction: Direction::Inbound(Reserved::Yes), - }, - ); - - Some(*peer) - }, - }, - Direction::Outbound(_) => match self.num_out < self.max_out { - true => { - log::trace!( - target: LOG_TARGET, - "{}: {peer:?} converted to regular outbound peer (outbound open)", - self.protocol, - ); - - self.num_out += 1; - self.peers.insert( - *peer, - PeerState::Connected { - direction: Direction::Outbound(Reserved::No), - }, - ); - - None - }, - false => { - self.peers.insert( - *peer, - PeerState::Closing { - direction: Direction::Outbound(Reserved::Yes), - }, - ); - - Some(*peer) - }, - }, + PeerState::Connected { mut direction } => { + let disconnect = self.should_disconnect(direction); + + if disconnect { + log::trace!( + target: LOG_TARGET, + "{}: close connection to removed reserved {peer:?}, direction {direction:?}", + self.protocol, + ); + + self.peers.insert(*peer, PeerState::Closing { direction }); + Some(*peer) + } else { + log::trace!( + target: LOG_TARGET, + "{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}", + self.protocol, + ); + + // The peer is kept connected as non-reserved. This will + // further count towards the slot count. + direction.set_reserved(Reserved::No); + self.increment_slot(direction); + + self.peers + .insert(*peer, PeerState::Connected { direction }); + + None + } }, - PeerState::Opening { direction } => match direction { - Direction::Inbound(_) => match self.num_in < self.max_in { - true => { - log::trace!( - target: LOG_TARGET, - "{}: {peer:?} converted to regular inbound peer (inbound opening)", - self.protocol, - ); - - self.num_in += 1; - self.peers.insert( - *peer, - PeerState::Opening { - direction: Direction::Inbound(Reserved::No), - }, - ); - - None - }, - false => { - self.peers.insert( - *peer, - PeerState::Canceled { - direction: Direction::Inbound(Reserved::Yes), - }, - ); - - None - }, - }, - Direction::Outbound(_) => match self.num_out < self.max_out { - true => { - log::trace!( - target: LOG_TARGET, - "{}: {peer:?} converted to regular outbound peer (outbound opening)", - self.protocol, - ); - - self.num_out += 1; - self.peers.insert( - *peer, - PeerState::Opening { - direction: Direction::Outbound(Reserved::No), - }, - ); - - None - }, - false => { - self.peers.insert( - *peer, - PeerState::Canceled { - direction: Direction::Outbound(Reserved::Yes), - }, - ); - - None - }, - }, + + PeerState::Opening { mut direction } => { + let disconnect = self.should_disconnect(direction); + + if disconnect { + log::trace!( + target: LOG_TARGET, + "{}: cancel substream to disconnect removed reserved peer {peer:?}, direction {direction:?}", + self.protocol, + ); + + self.peers.insert( + *peer, + PeerState::Canceled { + direction + }, + ); + } else { + log::trace!( + target: LOG_TARGET, + "{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}", + self.protocol, + ); + + // The peer is kept connected as non-reserved. This will + // further count towards the slot count. + direction.set_reserved(Reserved::No); + self.increment_slot(direction); + + self.peers + .insert(*peer, PeerState::Opening { direction }); + } + + None }, } }) @@ -1373,12 +1376,17 @@ impl Stream for Peerset { // if the number of outbound peers is lower than the desired amount of outbound peers, // query `PeerStore` and try to get a new outbound candidated. if self.num_out < self.max_out && !self.reserved_only { + // From the candidates offered by the peerstore we need to ignore: + // - all peers that are not in the `PeerState::Disconnected` state (ie they are + // connected / closing) + // - reserved peers since we initiated a connection to them in the previous step let ignore: HashSet = self .peers .iter() .filter_map(|(peer, state)| { (!std::matches!(state, PeerState::Disconnected)).then_some(*peer) }) + .chain(self.reserved_peers.iter().cloned()) .collect(); let peers: Vec<_> = diff --git a/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs b/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs index 4f7bfffaa1fc..295a5b441b3e 100644 --- a/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs +++ b/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs @@ -794,8 +794,6 @@ async fn set_reserved_peers_but_available_slots() { // when `Peerset` is polled (along with two random peers) and later on `SetReservedPeers` // is called with the common peer and with two new random peers let common_peer = *known_peers.iter().next().unwrap(); - let disconnected_peers = known_peers.iter().skip(1).copied().collect::>(); - assert_eq!(disconnected_peers.len(), 2); let (mut peerset, to_peerset) = Peerset::new( ProtocolName::from("/notif/1"), @@ -809,6 +807,8 @@ async fn set_reserved_peers_but_available_slots() { assert_eq!(peerset.num_in(), 0usize); assert_eq!(peerset.num_out(), 0usize); + // We have less than 25 outbound peers connected. At the next slot allocation we + // query the `peerstore_handle` for more peers to connect to. match peerset.next().await { Some(PeersetNotificationCommand::OpenSubstream { peers: out_peers }) => { assert_eq!(out_peers.len(), 3); @@ -845,29 +845,167 @@ async fn set_reserved_peers_but_available_slots() { .unbounded_send(PeersetCommand::SetReservedPeers { peers: reserved_peers.clone() }) .unwrap(); + // The command `SetReservedPeers` might evict currently reserved peers if + // we don't have enough slot capacity to move them to regular nodes. + // In this case, we did not have previously any reserved peers. match peerset.next().await { - Some(PeersetNotificationCommand::CloseSubstream { peers: out_peers }) => { - assert_eq!(out_peers.len(), 2); + Some(PeersetNotificationCommand::CloseSubstream { peers }) => { + // This ensures we don't disconnect peers when receiving `SetReservedPeers`. + assert_eq!(peers.len(), 0); + }, + event => panic!("invalid event: {event:?}"), + } - for peer in &out_peers { - assert!(disconnected_peers.contains(peer)); + // verify that `Peerset` is aware of five peers, with two of them as outbound. + assert_eq!(peerset.peers().len(), 5); + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 2usize); + assert_eq!(peerset.reserved_peers().len(), 3usize); + + match peerset.next().await { + Some(PeersetNotificationCommand::OpenSubstream { peers }) => { + assert_eq!(peers.len(), 2); + assert!(!peers.contains(&common_peer)); + + for peer in &peers { + assert!(reserved_peers.contains(peer)); + assert!(peerset.reserved_peers().contains(peer)); assert_eq!( peerset.peers().get(peer), - Some(&PeerState::Closing { direction: Direction::Outbound(Reserved::No) }), + Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }), + ); + } + }, + event => panic!("invalid event: {event:?}"), + } + + assert_eq!(peerset.peers().len(), 5); + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 2usize); + assert_eq!(peerset.reserved_peers().len(), 3usize); +} + +#[tokio::test] +async fn set_reserved_peers_move_previously_reserved() { + sp_tracing::try_init_simple(); + + let peerstore_handle = Arc::new(peerstore_handle_test()); + let known_peers = (0..3) + .map(|_| { + let peer = PeerId::random(); + peerstore_handle.add_known_peer(peer); + peer + }) + .collect::>(); + + // We'll keep this peer as reserved and move the the others to regular nodes. + let common_peer = *known_peers.iter().next().unwrap(); + let moved_peers = known_peers.iter().skip(1).copied().collect::>(); + let known_peers = known_peers.into_iter().collect::>(); + assert_eq!(moved_peers.len(), 2); + + let (mut peerset, to_peerset) = Peerset::new( + ProtocolName::from("/notif/1"), + 25, + 25, + false, + known_peers.clone(), + Default::default(), + peerstore_handle, + ); + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + + // We are not connected to the reserved peers. + match peerset.next().await { + Some(PeersetNotificationCommand::OpenSubstream { peers: out_peers }) => { + assert_eq!(out_peers.len(), 3); + + for peer in &out_peers { + assert_eq!( + peerset.peers().get(&peer), + Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) ); } }, event => panic!("invalid event: {event:?}"), } - // verify that `Peerset` is aware of five peers, with two of them as outbound - // (the two disconnected peers) + // verify all three peers are marked as reserved peers and they don't count towards + // slot allocation. + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + assert_eq!(peerset.reserved_peers().len(), 3usize); + + // report that all substreams were opened + for peer in &known_peers { + assert!(std::matches!( + peerset.report_substream_opened(*peer, traits::Direction::Outbound), + OpenResult::Accept { .. } + )); + assert_eq!( + peerset.peers().get(peer), + Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) }) + ); + } + + // set reserved peers with `common_peer` being one of them + let reserved_peers = HashSet::from_iter([common_peer, PeerId::random(), PeerId::random()]); + to_peerset + .unbounded_send(PeersetCommand::SetReservedPeers { peers: reserved_peers.clone() }) + .unwrap(); + + // The command `SetReservedPeers` might evict currently reserved peers if + // we don't have enough slot capacity to move them to regular nodes. + // In this case, we have enough capacity. + match peerset.next().await { + Some(PeersetNotificationCommand::CloseSubstream { peers }) => { + // This ensures we don't disconnect peers when receiving `SetReservedPeers`. + assert_eq!(peers.len(), 0); + }, + event => panic!("invalid event: {event:?}"), + } + + // verify that `Peerset` is aware of five peers. + // 2 of the previously reserved peers are moved as outbound regular peers and + // count towards slot allocation. assert_eq!(peerset.peers().len(), 5); assert_eq!(peerset.num_in(), 0usize); assert_eq!(peerset.num_out(), 2usize); + assert_eq!(peerset.reserved_peers().len(), 3usize); + + // Ensure the previously reserved are not regular nodes. + for (peer, state) in peerset.peers() { + // This peer was previously reserved and remained reserved after `SetReservedPeers`. + if peer == &common_peer { + assert_eq!( + state, + &PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) } + ); + continue + } + + // Part of the new reserved nodes. + if reserved_peers.contains(peer) { + assert_eq!(state, &PeerState::Disconnected); + continue + } + + // Previously reserved, but remained connected. + if moved_peers.contains(peer) { + // This was previously `Reseved::Yes` but moved to regular nodes. + assert_eq!( + state, + &PeerState::Connected { direction: Direction::Outbound(Reserved::No) } + ); + continue + } + panic!("Invalid state peer={peer:?} state={state:?}"); + } match peerset.next().await { Some(PeersetNotificationCommand::OpenSubstream { peers }) => { + // Open desires with newly reserved. assert_eq!(peers.len(), 2); assert!(!peers.contains(&common_peer)); @@ -885,7 +1023,103 @@ async fn set_reserved_peers_but_available_slots() { assert_eq!(peerset.peers().len(), 5); assert_eq!(peerset.num_in(), 0usize); - - // two substreams are closing still closing assert_eq!(peerset.num_out(), 2usize); + assert_eq!(peerset.reserved_peers().len(), 3usize); +} + +#[tokio::test] +async fn set_reserved_peers_cannot_move_previously_reserved() { + sp_tracing::try_init_simple(); + + let peerstore_handle = Arc::new(peerstore_handle_test()); + let known_peers = (0..3) + .map(|_| { + let peer = PeerId::random(); + peerstore_handle.add_known_peer(peer); + peer + }) + .collect::>(); + + // We'll keep this peer as reserved and move the the others to regular nodes. + let common_peer = *known_peers.iter().next().unwrap(); + let moved_peers = known_peers.iter().skip(1).copied().collect::>(); + let known_peers = known_peers.into_iter().collect::>(); + assert_eq!(moved_peers.len(), 2); + + // We don't have capacity to move peers. + let (mut peerset, to_peerset) = Peerset::new( + ProtocolName::from("/notif/1"), + 0, + 0, + false, + known_peers.clone(), + Default::default(), + peerstore_handle, + ); + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + + // We are not connected to the reserved peers. + match peerset.next().await { + Some(PeersetNotificationCommand::OpenSubstream { peers: out_peers }) => { + assert_eq!(out_peers.len(), 3); + + for peer in &out_peers { + assert_eq!( + peerset.peers().get(&peer), + Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) + ); + } + }, + event => panic!("invalid event: {event:?}"), + } + + // verify all three peers are marked as reserved peers and they don't count towards + // slot allocation. + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + assert_eq!(peerset.reserved_peers().len(), 3usize); + + // report that all substreams were opened + for peer in &known_peers { + assert!(std::matches!( + peerset.report_substream_opened(*peer, traits::Direction::Outbound), + OpenResult::Accept { .. } + )); + assert_eq!( + peerset.peers().get(peer), + Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) }) + ); + } + + // set reserved peers with `common_peer` being one of them + let reserved_peers = HashSet::from_iter([common_peer, PeerId::random(), PeerId::random()]); + to_peerset + .unbounded_send(PeersetCommand::SetReservedPeers { peers: reserved_peers.clone() }) + .unwrap(); + + // The command `SetReservedPeers` might evict currently reserved peers if + // we don't have enough slot capacity to move them to regular nodes. + // In this case, we don't have enough capacity. + match peerset.next().await { + Some(PeersetNotificationCommand::CloseSubstream { peers }) => { + // This ensures we don't disconnect peers when receiving `SetReservedPeers`. + assert_eq!(peers.len(), 2); + + for peer in peers { + // Ensure common peer is not disconnected. + assert_ne!(common_peer, peer); + + assert_eq!( + peerset.peers().get(&peer), + Some(&PeerState::Closing { direction: Direction::Outbound(Reserved::Yes) }) + ); + } + }, + event => panic!("invalid event: {event:?}"), + } + + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + assert_eq!(peerset.reserved_peers().len(), 3usize); } diff --git a/substrate/client/network/src/litep2p/shim/request_response/mod.rs b/substrate/client/network/src/litep2p/shim/request_response/mod.rs index a77acb464144..bfd7a60ef9fe 100644 --- a/substrate/client/network/src/litep2p/shim/request_response/mod.rs +++ b/substrate/client/network/src/litep2p/shim/request_response/mod.rs @@ -29,8 +29,10 @@ use crate::{ use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt}; use litep2p::{ + error::{ImmediateDialError, NegotiationError, SubstreamError}, protocol::request_response::{ - DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle, + DialOptions, RejectReason, RequestResponseError, RequestResponseEvent, + RequestResponseHandle, }, types::RequestId, }; @@ -372,7 +374,32 @@ impl RequestResponseProtocol { let status = match error { RequestResponseError::NotConnected => Some((RequestFailure::NotConnected, "not-connected")), - RequestResponseError::Rejected => Some((RequestFailure::Refused, "rejected")), + RequestResponseError::Rejected(reason) => { + let reason = match reason { + RejectReason::ConnectionClosed => "connection-closed", + RejectReason::SubstreamClosed => "substream-closed", + RejectReason::SubstreamOpenError(substream_error) => match substream_error { + SubstreamError::NegotiationError(NegotiationError::Timeout) => + "substream-timeout", + _ => "substream-open-error", + }, + RejectReason::DialFailed(None) => "dial-failed", + RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) => + "dial-already-connected", + RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) => + "dial-peerid-missing", + RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) => + "dial-tried-to-dial-self", + RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) => + "dial-no-address-available", + RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) => + "dial-task-closed", + RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) => + "dial-channel-clogged", + }; + + Some((RequestFailure::Refused, reason)) + }, RequestResponseError::Timeout => Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")), RequestResponseError::Canceled => { diff --git a/substrate/client/network/src/litep2p/shim/request_response/tests.rs b/substrate/client/network/src/litep2p/shim/request_response/tests.rs index e3e82aa395c5..78b6ef0a481c 100644 --- a/substrate/client/network/src/litep2p/shim/request_response/tests.rs +++ b/substrate/client/network/src/litep2p/shim/request_response/tests.rs @@ -271,7 +271,12 @@ async fn too_many_inbound_requests() { match handle2.next().await { Some(RequestResponseEvent::RequestFailed { peer, error, .. }) => { assert_eq!(peer, peer1); - assert_eq!(error, RequestResponseError::Rejected); + assert_eq!( + error, + RequestResponseError::Rejected( + litep2p::protocol::request_response::RejectReason::SubstreamClosed + ) + ); }, event => panic!("inavlid event: {event:?}"), } diff --git a/substrate/client/tracing/src/logging/mod.rs b/substrate/client/tracing/src/logging/mod.rs index 74ce5f90ede9..33fec2d41881 100644 --- a/substrate/client/tracing/src/logging/mod.rs +++ b/substrate/client/tracing/src/logging/mod.rs @@ -138,6 +138,9 @@ where .add_directive( parse_default_directive("trust_dns_proto=off").expect("provided directive is valid"), ) + .add_directive( + parse_default_directive("hickory_proto=off").expect("provided directive is valid"), + ) .add_directive( parse_default_directive("libp2p_mdns::behaviour::iface=off") .expect("provided directive is valid"),