diff --git a/.gitignore b/.gitignore index 638826373..3866526a4 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,11 @@ perf.data* zerostate.json zerostate.boc + +config.json +global-config.json +keys.json +logger.json + +db/ +.temp/ diff --git a/Cargo.lock b/Cargo.lock index de66f1b2e..3b4190fde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -338,7 +338,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", ] [[package]] @@ -347,7 +347,7 @@ version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.60", @@ -481,6 +481,41 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn 1.0.109", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core", + "quote", + "syn 1.0.109", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -533,6 +568,31 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derive_builder" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0" +dependencies = [ + "darling", + "derive_builder_core", + "proc-macro2", + "quote", 
+ "syn 1.0.109", +] + +[[package]] +name = "derive_builder_core" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -554,6 +614,18 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "dns-lookup" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" +dependencies = [ + "cfg-if", + "libc", + "socket2 0.4.10", + "winapi", +] + [[package]] name = "ed25519" version = "2.2.3" @@ -570,6 +642,24 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "enum-as-inner" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570d109b813e904becc80d8d5da38376818a143348413f7149f1340fe04754d4" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -604,7 +694,7 @@ dependencies = [ [[package]] name = "everscale-types" version = "0.1.0-rc.6" -source = "git+https://github.com/broxus/everscale-types.git?branch=tycho#640ed863dd20e38964798ec7d9ae2bada5b2b20a" +source = "git+https://github.com/broxus/everscale-types.git?branch=tycho#13fb3ec43853df22a5204ba48ada1daf3a2f6cde" dependencies = [ "ahash", "base64 0.21.7", @@ -624,7 +714,7 @@ dependencies = [ [[package]] name = "everscale-types-proc" version = "0.1.4" -source = 
"git+https://github.com/broxus/everscale-types.git?branch=tycho#640ed863dd20e38964798ec7d9ae2bada5b2b20a" +source = "git+https://github.com/broxus/everscale-types.git?branch=tycho#13fb3ec43853df22a5204ba48ada1daf3a2f6cde" dependencies = [ "proc-macro2", "quote", @@ -662,12 +752,69 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38793c55593b33412e3ae40c2c9781ffaa6f438f6f8c10f24e71846fbd7ae01e" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = 
"futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -697,10 +844,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -745,6 +895,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -763,12 +919,110 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.6", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-system-resolver" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eea26c5d0b6ab9d72219f65000af310f042a740926f7b2fa3553e774036e2e7" +dependencies = [ + "derive_builder", + "dns-lookup", + "hyper", + "tokio", + "tower-service", + "tracing", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + +[[package]] +name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "2.2.6" @@ -779,6 +1033,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "ipnet" +version = "2.9.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "itertools" version = "0.12.1" @@ -909,6 +1169,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matches" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" + [[package]] name = "memchr" version = "2.7.2" @@ -971,6 +1237,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nom" version = "7.1.3" @@ -1114,6 +1389,12 @@ dependencies = [ "serde", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pest" version = "2.7.9" @@ -1159,6 +1440,26 @@ dependencies = [ "sha2", ] +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -1230,6 +1531,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "public-ip" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7b4c40db5262d93298c363a299f8bc1b3a956a78eecddba3bc0e58b76e2f419a" +dependencies = [ + "dns-lookup", + "futures-core", + "futures-util", + "http", + "hyper", + "hyper-system-resolver", + "pin-project-lite", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "trust-dns-client", + "trust-dns-proto", +] + [[package]] name = "quanta" version = "0.12.3" @@ -1299,7 +1621,7 @@ checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" dependencies = [ "bytes", "libc", - "socket2", + "socket2 0.5.6", "tracing", "windows-sys 0.48.0", ] @@ -1313,6 +1635,16 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -1686,6 +2018,16 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.6" @@ -1724,6 +2066,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "strsim" version = "0.11.1" @@ -1953,7 +2301,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.6", "tokio-macros", "windows-sys 0.48.0", ] @@ -1983,6 
+2331,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" @@ -2027,6 +2381,18 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "futures", + "futures-task", + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -2089,6 +2455,57 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "trust-dns-client" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b4ef9b9bde0559b78a4abb00339143750085f05e5a453efb7b8bef1061f09dc" +dependencies = [ + "cfg-if", + "data-encoding", + "futures-channel", + "futures-util", + "lazy_static", + "log", + "radix_trie", + "rand", + "thiserror", + "time", + "tokio", + "trust-dns-proto", +] + +[[package]] +name = "trust-dns-proto" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca94d4e9feb6a181c690c4040d7a24ef34018d8313ac5044a61d21222ae24e31" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.2.3", + "ipnet", + "lazy_static", + "log", + "rand", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "url", +] + +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "tycho-block-util" version = "0.0.1" @@ -2114,17 +2531,26 @@ dependencies = [ "clap", "everscale-crypto", "everscale-types", + "futures-util", "hex", + 
"public-ip", "rand", "rustc_version", "serde", "serde_json", "serde_path_to_error", "sha2", + "tempfile", "thiserror", "tikv-jemallocator", "tokio", + "tracing", + "tracing-subscriber", + "tycho-block-util", + "tycho-collator", + "tycho-core", "tycho-network", + "tycho-storage", "tycho-util", ] @@ -2148,6 +2574,7 @@ dependencies = [ "tracing-subscriber", "tracing-test", "tycho-block-util", + "tycho-collator", "tycho-core", "tycho-network", "tycho-storage", @@ -2231,7 +2658,7 @@ dependencies = [ "rustls-webpki", "serde", "serde_json", - "socket2", + "socket2 0.5.6", "thiserror", "tl-proto", "tokio", @@ -2301,6 +2728,7 @@ name = "tycho-util" version = "0.0.1" dependencies = [ "ahash", + "anyhow", "castaway", "dashmap", "everscale-crypto", @@ -2310,6 +2738,8 @@ dependencies = [ "libc", "rand", "serde", + "serde_json", + "serde_path_to_error", "thiserror", "tokio", "tracing", @@ -2328,12 +2758,27 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "unicode-bidi" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-xid" version = "0.2.4" @@ -2352,6 +2797,17 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "url" +version = "2.5.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +dependencies = [ + "form_urlencoded", + "idna 0.5.0", + "percent-encoding", +] + [[package]] name = "utf8parse" version = "0.2.1" @@ -2385,6 +2841,15 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 7f95dd9ce..9aa8b08f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ num-traits = "0.2.18" parking_lot = "0.12.1" parking_lot_core = "0.9.9" pin-project-lite = "0.2" +public-ip = "0.2" pkcs8 = "0.10" quick_cache = "0.4.1" quinn = { version = "0.10", default-features = false, features = ["runtime-tokio", "tls-rustls"] } diff --git a/block-util/src/block/block_stuff.rs b/block-util/src/block/block_stuff.rs index e02c6c437..4c7c8a456 100644 --- a/block-util/src/block/block_stuff.rs +++ b/block-util/src/block/block_stuff.rs @@ -39,7 +39,7 @@ impl BlockStuff { let cell = CellBuilder::build_from(&block).unwrap(); let root_hash = *cell.repr_hash(); - let file_hash = sha2::Sha256::digest(Boc::encode(&cell)).into(); + let file_hash = Boc::file_hash(Boc::encode(&cell)); let block_id = BlockId { shard: block_info.shard, diff --git a/cli/Cargo.toml b/cli/Cargo.toml index ce4df01fc..021185c2d 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -20,21 +20,30 @@ base64 = { workspace = true } clap = { workspace = true } everscale-crypto = { workspace = true } everscale-types = { workspace = true } +futures-util = { workspace = true } hex = { workspace = true } +public-ip = { workspace 
= true } rand = { workspace = true } serde = { workspace = true } serde_path_to_error = { workspace = true } serde_json = { workspace = true, features = ["preserve_order"] } sha2 = { workspace = true } +tempfile = { workspace = true } thiserror = { workspace = true } tikv-jemallocator = { workspace = true, features = [ "unprefixed_malloc_on_supported_platforms", "background_threads", ], optional = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } # local deps +tycho-block-util = { workspace = true } +tycho-collator = { workspace = true } +tycho-core = { workspace = true } tycho-network = { workspace = true } +tycho-storage = { workspace = true } tycho-util = { workspace = true } [build-dependencies] diff --git a/cli/src/main.rs b/cli/src/main.rs index f9d1ecf49..d18b11d1e 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -11,6 +11,7 @@ mod tools { pub mod gen_zerostate; } +mod node; mod util; #[cfg(feature = "jemalloc")] @@ -50,9 +51,8 @@ impl App { #[derive(Subcommand)] enum Cmd { - Init(InitCmd), - - Run(RunCmd), + #[clap(subcommand)] + Node(NodeCmd), #[clap(subcommand)] Tool(ToolCmd), @@ -61,20 +61,25 @@ enum Cmd { impl Cmd { fn run(self) -> Result<()> { match self { - Cmd::Init(_cmd) => Ok(()), // todo - Cmd::Run(_cmd) => Ok(()), // todo + Cmd::Node(cmd) => cmd.run(), Cmd::Tool(cmd) => cmd.run(), } } } -/// Initialize a node environment -#[derive(Parser)] -struct InitCmd {} +/// Node commands +#[derive(Subcommand)] +enum NodeCmd { + Run(node::CmdRun), +} -/// Run a node -#[derive(Parser)] -struct RunCmd {} +impl NodeCmd { + fn run(self) -> Result<()> { + match self { + NodeCmd::Run(cmd) => cmd.run(), + } + } +} /// A collection of tools #[derive(Subcommand)] diff --git a/cli/src/node/config.rs b/cli/src/node/config.rs new file mode 100644 index 000000000..02f55336e --- /dev/null +++ b/cli/src/node/config.rs @@ -0,0 +1,90 @@ +use std::net::Ipv4Addr; +use 
std::path::Path; + +use anyhow::Result; +use everscale_crypto::ed25519; +use everscale_types::cell::HashBytes; +use serde::{Deserialize, Serialize}; +use tycho_core::block_strider::BlockchainBlockProviderConfig; +use tycho_core::blockchain_rpc::BlockchainRpcServiceConfig; +use tycho_core::overlay_client::PublicOverlayClientConfig; +use tycho_network::{DhtConfig, NetworkConfig, OverlayConfig, PeerResolverConfig}; +use tycho_storage::StorageConfig; + +#[derive(Debug, Deserialize)] +pub struct NodeKeys { + pub secret: HashBytes, +} + +impl NodeKeys { + pub fn from_file>(path: P) -> Result { + tycho_util::serde_helpers::load_json_from_file(path) + } + + pub fn as_secret(&self) -> ed25519::SecretKey { + ed25519::SecretKey::from_bytes(self.secret.0) + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct NodeConfig { + /// Public IP address of the node. + /// + /// Default: resolved automatically. + pub public_ip: Option, + + /// Ip address to listen on. + /// + /// Default: 0.0.0.0 + pub local_ip: Ipv4Addr, + + /// Default: 30000. 
+ pub port: u16, + + pub network: NetworkConfig, + + pub dht: DhtConfig, + + pub peer_resolver: PeerResolverConfig, + + pub overlay: OverlayConfig, + + pub public_overlay_client: PublicOverlayClientConfig, + + pub storage: StorageConfig, + + pub blockchain_rpc_service: BlockchainRpcServiceConfig, + + pub blockchain_block_provider: BlockchainBlockProviderConfig, +} + +impl Default for NodeConfig { + fn default() -> Self { + Self { + public_ip: None, + local_ip: Ipv4Addr::UNSPECIFIED, + port: 30000, + network: NetworkConfig::default(), + dht: DhtConfig::default(), + peer_resolver: PeerResolverConfig::default(), + overlay: OverlayConfig::default(), + public_overlay_client: PublicOverlayClientConfig::default(), + storage: StorageConfig::default(), + blockchain_rpc_service: BlockchainRpcServiceConfig::default(), + blockchain_block_provider: BlockchainBlockProviderConfig::default(), + } + } +} + +impl NodeConfig { + pub fn from_file>(path: P) -> Result { + tycho_util::serde_helpers::load_json_from_file(path) + } + + pub fn save_to_file>(&self, path: P) -> Result<()> { + let data = serde_json::to_string_pretty(self)?; + std::fs::write(path, data)?; + Ok(()) + } +} diff --git a/cli/src/node/mod.rs b/cli/src/node/mod.rs new file mode 100644 index 000000000..bd345aab9 --- /dev/null +++ b/cli/src/node/mod.rs @@ -0,0 +1,665 @@ +use std::net::{Ipv4Addr, SocketAddr}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use clap::Parser; +use everscale_crypto::ed25519; +use everscale_types::models::*; +use everscale_types::prelude::*; +use futures_util::future::BoxFuture; +use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; +use tycho_collator::collator::CollatorStdImplFactory; +use tycho_collator::manager::CollationManager; +use tycho_collator::mempool::MempoolAdapterStdImpl; +use tycho_collator::msg_queue::MessageQueueAdapterStdImpl; +use tycho_collator::state_node::{StateNodeAdapter, StateNodeAdapterStdImpl}; +use 
tycho_collator::types::{CollationConfig, ValidatorNetwork}; +use tycho_collator::validator::config::ValidatorConfig; +use tycho_collator::validator::validator::ValidatorStdImplFactory; +use tycho_core::block_strider::{ + BlockProvider, BlockStrider, BlockchainBlockProvider, BlockchainBlockProviderConfig, + OptionalBlockStuff, PersistentBlockStriderState, StateSubscriber, StateSubscriberContext, + StorageBlockProvider, +}; +use tycho_core::blockchain_rpc::{BlockchainRpcClient, BlockchainRpcService}; +use tycho_core::global_config::{GlobalConfig, ZerostateId}; +use tycho_core::overlay_client::PublicOverlayClient; +use tycho_network::{ + DhtClient, DhtService, Network, OverlayService, PeerResolver, PublicOverlay, Router, +}; +use tycho_storage::{BlockMetaData, Storage}; +use tycho_util::FastHashMap; + +use crate::util::error::ResultExt; +use crate::util::logger::LoggerConfig; + +use self::config::{NodeConfig, NodeKeys}; + +mod config; + +const SERVICE_NAME: &str = "tycho-node"; + +/// Run a Tycho node. +#[derive(Parser)] +pub struct CmdRun { + /// dump the template of the zero state config + #[clap( + short = 'i', + long, + conflicts_with_all = ["config", "global_config", "keys", "logger_config", "import_zerostate"] + )] + init_config: Option, + + /// overwrite the existing config + #[clap(short, long)] + force: bool, + + /// path to the node config + #[clap(long, required_unless_present = "init_config")] + config: Option, + + /// path to the global config + #[clap(long, required_unless_present = "init_config")] + global_config: Option, + + /// path to the node keys + #[clap(long, required_unless_present = "init_config")] + keys: Option, + + /// path to the logger config + #[clap(long)] + logger_config: Option, + + /// list of zerostate files to import + #[clap(long)] + import_zerostate: Option>, +} + +impl CmdRun { + pub fn run(self) -> Result<()> { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()? 
+ .block_on(self.run_impl()) + } + + async fn run_impl(self) -> Result<()> { + if let Some(init_config_path) = self.init_config { + return NodeConfig::default() + .save_to_file(init_config_path) + .wrap_err("failed to save node config"); + } + + init_logger(self.logger_config)?; + + let node = { + let node_config = NodeConfig::from_file(self.config.unwrap()) + .wrap_err("failed to load node config")?; + + let global_config = GlobalConfig::from_file(self.global_config.unwrap()) + .wrap_err("failed to load global config")?; + + let keys = config::NodeKeys::from_file(&self.keys.unwrap()) + .wrap_err("failed to load node keys")?; + + let public_ip = resolve_public_ip(node_config.public_ip).await?; + let socket_addr = SocketAddr::new(public_ip.into(), node_config.port); + + Node::new(socket_addr, keys, node_config, global_config)? + }; + + let init_block_id = node + .try_init(self.import_zerostate) + .await + .wrap_err("failed to init node")?; + tracing::info!(%init_block_id, "node initialized"); + + node.run(&init_block_id).await?; + + Ok(()) + } +} + +fn init_logger(logger_config: Option) -> Result<()> { + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::{fmt, reload, EnvFilter}; + + let try_make_filter = { + let logger_config = logger_config.clone(); + move || { + Ok::<_, anyhow::Error>(match &logger_config { + None => EnvFilter::builder() + .with_default_directive(tracing::Level::INFO.into()) + .from_env_lossy(), + Some(path) => LoggerConfig::load_from(path) + .wrap_err("failed to load logger config")? 
+ .build_subscriber(), + }) + } + }; + + let (layer, handle) = reload::Layer::new(try_make_filter()?); + + let subscriber = tracing_subscriber::registry() + .with(layer) + .with(fmt::layer()); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + if let Some(logger_config) = logger_config { + tokio::spawn(async move { + tracing::info!( + logger_config = %logger_config.display(), + "started watching for changes in logger config" + ); + + let get_metadata = move || { + std::fs::metadata(&logger_config) + .ok() + .and_then(|m| m.modified().ok()) + }; + + let mut last_modified = get_metadata(); + + let mut interval = tokio::time::interval(Duration::from_secs(10)); + loop { + interval.tick().await; + + let modified = get_metadata(); + if last_modified == modified { + continue; + } + last_modified = modified; + + match try_make_filter() { + Ok(filter) => { + if handle.reload(filter).is_err() { + break; + } + tracing::info!("reloaded logger config"); + } + Err(e) => tracing::error!(%e, "failed to reload logger config"), + } + } + + tracing::info!("stopped watching for changes in logger config"); + }); + } + + std::panic::set_hook(Box::new(|info| { + use std::io::Write; + + tracing::error!("PANIC: {}", info); + std::io::stderr().flush().ok(); + std::io::stdout().flush().ok(); + std::process::exit(1); + })); + + Ok(()) +} + +async fn resolve_public_ip(ip: Option) -> Result { + match ip { + Some(address) => Ok(address), + None => match public_ip::addr_v4().await { + Some(address) => Ok(address), + None => anyhow::bail!("failed to resolve public IP address"), + }, + } +} + +pub struct Node { + pub keypair: Arc, + + pub zerostate: ZerostateId, + + pub network: Network, + pub dht_client: DhtClient, + pub peer_resolver: PeerResolver, + pub overlay_service: OverlayService, + pub storage: Storage, + pub blockchain_rpc_client: BlockchainRpcClient, + + pub state_tracker: MinRefMcStateTracker, + pub blockchain_block_provider_config: BlockchainBlockProviderConfig, +} + 
+impl Node { + pub fn new( + public_addr: SocketAddr, + keys: NodeKeys, + node_config: NodeConfig, + global_config: GlobalConfig, + ) -> Result { + // Setup network + let keypair = Arc::new(ed25519::KeyPair::from(&keys.as_secret())); + let local_id = keypair.public_key.into(); + + let (dht_tasks, dht_service) = DhtService::builder(local_id) + .with_config(node_config.dht) + .build(); + + let (overlay_tasks, overlay_service) = OverlayService::builder(local_id) + .with_config(node_config.overlay) + .with_dht_service(dht_service.clone()) + .build(); + + let router = Router::builder() + .route(dht_service.clone()) + .route(overlay_service.clone()) + .build(); + + let local_addr = SocketAddr::from((node_config.local_ip, node_config.port)); + + let network = Network::builder() + .with_config(node_config.network) + .with_private_key(keys.secret.0) + .with_service_name(SERVICE_NAME) + .with_remote_addr(public_addr) + .build(local_addr, router) + .wrap_err("failed to build node network")?; + + dht_tasks.spawn(&network); + overlay_tasks.spawn(&network); + + let dht_client = dht_service.make_client(&network); + let peer_resolver = dht_service + .make_peer_resolver() + .with_config(node_config.peer_resolver) + .build(&network); + + let mut bootstrap_peers = 0usize; + for peer in global_config.bootstrap_peers { + let is_new = dht_client.add_peer(Arc::new(peer))?; + bootstrap_peers += is_new as usize; + } + + tracing::info!( + %local_id, + %local_addr, + %public_addr, + bootstrap_peers, + "initialized network" + ); + + // Setup storage + let storage = Storage::new(node_config.storage).wrap_err("failed to create storage")?; + tracing::info!( + root_dir = %storage.root().path().display(), + "initialized storage" + ); + + // Setup blockchain rpc + let blockchain_rpc_service = + BlockchainRpcService::new(storage.clone(), node_config.blockchain_rpc_service); + + let public_overlay = + PublicOverlay::builder(global_config.zerostate.compute_public_overlay_id()) + 
.with_peer_resolver(peer_resolver.clone()) + .build(blockchain_rpc_service); + overlay_service.add_public_overlay(&public_overlay); + + let blockchain_rpc_client = BlockchainRpcClient::new(PublicOverlayClient::new( + network.clone(), + public_overlay, + node_config.public_overlay_client, + )); + + tracing::info!( + overlay_id = %blockchain_rpc_client.overlay().overlay_id(), + "initialized blockchain rpc" + ); + + // Setup block strider + let state_tracker = MinRefMcStateTracker::default(); + + Ok(Self { + keypair, + zerostate: global_config.zerostate, + network, + dht_client, + peer_resolver, + overlay_service, + blockchain_rpc_client, + storage, + state_tracker, + blockchain_block_provider_config: node_config.blockchain_block_provider, + }) + } + + /// Initialize the node and return the init block id. + async fn try_init(&self, zerostates: Option>) -> Result { + let node_state = self.storage.node_state(); + + match node_state.load_init_mc_block_id() { + Some(block_id) => { + tracing::info!("warm init"); + Ok(block_id) + } + None => { + tracing::info!("cold init"); + + let zerostate_id = if let Some(zerostates) = zerostates { + self.import_zerostates(zerostates).await? 
+ } else { + // TODO: Download zerostates + anyhow::bail!("zerostates not provided (STUB)"); + }; + + node_state.store_init_mc_block_id(&zerostate_id); + + // TODO: Only store this if it is a zerostate + node_state.store_last_mc_block_id(&zerostate_id); + + Ok(zerostate_id) + } + } + } + + async fn import_zerostates(&self, paths: Vec) -> Result { + // Use a separate tracker for zerostates + let tracker = MinRefMcStateTracker::default(); + + // Read all zerostates + let mut zerostates = FastHashMap::default(); + for path in paths { + let state = load_zerostate(&tracker, &path) + .wrap_err_with(|| format!("failed to load zerostate {}", path.display()))?; + + if let Some(prev) = zerostates.insert(*state.block_id(), state) { + anyhow::bail!("duplicate zerostate {}", prev.block_id()); + } + } + + // Find the masterchain zerostate + let zerostate_id = self.zerostate.as_block_id(); + let Some(masterchain_zerostate) = zerostates.remove(&zerostate_id) else { + anyhow::bail!("missing mc zerostate for {zerostate_id}"); + }; + + // Prepare the list of zerostates to import + let mut to_import = vec![masterchain_zerostate.clone()]; + + let global_id = masterchain_zerostate.state().global_id; + let gen_utime = masterchain_zerostate.state().gen_utime; + + for entry in masterchain_zerostate.shards()?.iter() { + let (shard_ident, descr) = entry.wrap_err("invalid mc zerostate")?; + anyhow::ensure!(descr.seqno == 0, "invalid shard description {shard_ident}"); + + let block_id = BlockId { + shard: shard_ident, + seqno: 0, + root_hash: descr.root_hash, + file_hash: descr.file_hash, + }; + + let state = match zerostates.remove(&block_id) { + Some(existing) => { + tracing::debug!(block_id = %block_id, "using custom zerostate"); + existing + } + None => { + tracing::debug!(block_id = %block_id, "creating default zerostate"); + let state = + make_shard_state(&self.state_tracker, global_id, shard_ident, gen_utime) + .wrap_err("failed to create shard zerostate")?; + + anyhow::ensure!( + 
state.block_id() == &block_id, + "custom zerostate must be provided for {shard_ident}", + ); + + state + } + }; + + to_import.push(state); + } + + anyhow::ensure!( + zerostates.is_empty(), + "unused zerostates left: {}", + zerostates.len() + ); + + // Import all zerostates + let handle_storage = self.storage.block_handle_storage(); + let state_storage = self.storage.shard_state_storage(); + + for state in to_import { + let (handle, status) = handle_storage.create_or_load_handle( + state.block_id(), + BlockMetaData { + is_key_block: true, + gen_utime, + mc_ref_seqno: 0, + }, + ); + + let stored = state_storage + .store_state(&handle, &state) + .await + .wrap_err_with(|| { + format!("failed to import zerostate for {}", state.block_id().shard) + })?; + + tracing::debug!( + block_id = %state.block_id(), + handle_status = ?status, + stored, + "importing zerostate" + ); + } + + tracing::info!("imported zerostates"); + Ok(zerostate_id) + } + + async fn run(&self, _init_block_id: &BlockId) -> Result<()> { + // Ensure that there are some neighbours + tracing::info!("waiting for initial neighbours"); + self.blockchain_rpc_client + .overlay_client() + .neighbours() + .wait_for_peers(1) + .await; + tracing::info!("found initial neighbours"); + + // Create collator + tracing::info!("starting collator"); + + // TODO: move into config + let collation_config = CollationConfig { + key_pair: self.keypair.clone(), + mc_block_min_interval_ms: 10000, + max_mc_block_delta_from_bc_to_await_own: 2, + supported_block_version: 50, + supported_capabilities: supported_capabilities(), + max_collate_threads: 1, + // test_validators_keypairs: vec![], + }; + + let collation_manager = CollationManager::start( + collation_config, + Arc::new(MessageQueueAdapterStdImpl::new()), + |listener| StateNodeAdapterStdImpl::new(listener, self.storage.clone()), + MempoolAdapterStdImpl::new, + ValidatorStdImplFactory { + network: ValidatorNetwork { + overlay_service: self.overlay_service.clone(), + 
peer_resolver: self.peer_resolver.clone(), + dht_client: self.dht_client.clone(), + }, + // TODO: Move into node config + config: ValidatorConfig { + base_loop_delay: Duration::from_millis(50), + max_loop_delay: Duration::from_secs(10), + }, + }, + CollatorStdImplFactory, + ); + + let collator_state_subscriber = CollatorStateSubscriber { + adapter: collation_manager.state_node_adapter().clone(), + }; + + // TEMP + { + let masterchain_zerostate = self + .storage + .shard_state_storage() + .load_state(&self.zerostate.as_block_id()) + .await?; + + collator_state_subscriber + .adapter + .handle_state(&masterchain_zerostate) + .await?; + } + + tracing::info!("collator started"); + + // Create block strider + let blockchain_block_provider = BlockchainBlockProvider::new( + self.blockchain_rpc_client.clone(), + self.storage.clone(), + self.blockchain_block_provider_config.clone(), + ); + + let storage_block_provider = StorageBlockProvider::new(self.storage.clone()); + + let collator_block_provider = CollatorBlockProvider { + adapter: collation_manager.state_node_adapter().clone(), + }; + + let strider_state = + PersistentBlockStriderState::new(self.zerostate.as_block_id(), self.storage.clone()); + + let block_strider = BlockStrider::builder() + .with_provider(( + (blockchain_block_provider, storage_block_provider), + collator_block_provider, + )) + .with_state(strider_state) + .with_state_subscriber( + self.state_tracker.clone(), + self.storage.clone(), + collator_state_subscriber, + ) + .build(); + + // Run block strider + tracing::info!("block strider started"); + block_strider.run().await?; + tracing::info!("block strider finished"); + + Ok(()) + } +} + +struct CollatorStateSubscriber { + adapter: Arc, +} + +impl StateSubscriber for CollatorStateSubscriber { + type HandleStateFut<'a> = BoxFuture<'a, Result<()>>; + + fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> { + self.adapter.handle_state(&cx.state) + } +} + +struct 
CollatorBlockProvider { + adapter: Arc, +} + +impl BlockProvider for CollatorBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; + type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + self.adapter.wait_for_block(prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + self.adapter.wait_for_block(block_id) + } +} + +fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result { + let data = std::fs::read(path).wrap_err("failed to read file")?; + let file_hash = Boc::file_hash(&data); + + let root = Boc::decode(data).wrap_err("failed to decode BOC")?; + let root_hash = *root.repr_hash(); + + let state = root + .parse::() + .wrap_err("failed to parse state")?; + + anyhow::ensure!(state.seqno == 0, "not a zerostate"); + + let block_id = BlockId { + shard: state.shard_ident, + seqno: state.seqno, + root_hash, + file_hash, + }; + + ShardStateStuff::new(block_id, root, &tracker) +} + +fn make_shard_state( + tracker: &MinRefMcStateTracker, + global_id: i32, + shard_ident: ShardIdent, + now: u32, +) -> Result { + let state = ShardStateUnsplit { + global_id, + shard_ident, + gen_utime: now, + min_ref_mc_seqno: u32::MAX, + ..Default::default() + }; + + let root = CellBuilder::build_from(&state)?; + let root_hash = *root.repr_hash(); + let file_hash = Boc::file_hash(Boc::encode(&root)); + + let block_id = BlockId { + shard: state.shard_ident, + seqno: state.seqno, + root_hash, + file_hash, + }; + + ShardStateStuff::new(block_id, root, &tracker) +} + +fn supported_capabilities() -> u64 { + GlobalCapabilities::from([ + GlobalCapability::CapCreateStatsEnabled, + GlobalCapability::CapBounceMsgBody, + GlobalCapability::CapReportVersion, + GlobalCapability::CapShortDequeue, + GlobalCapability::CapInitCodeHash, + GlobalCapability::CapOffHypercube, + GlobalCapability::CapFixTupleIndexBug, + 
GlobalCapability::CapFastStorageStat, + GlobalCapability::CapMyCode, + GlobalCapability::CapFullBodyInBounced, + GlobalCapability::CapStorageFeeToTvm, + GlobalCapability::CapWorkchains, + GlobalCapability::CapStcontNewFormat, + GlobalCapability::CapFastStorageStatBugfix, + GlobalCapability::CapResolveMerkleCell, + GlobalCapability::CapFeeInGasUnits, + GlobalCapability::CapBounceAfterFailedAction, + GlobalCapability::CapSuspendedList, + GlobalCapability::CapsTvmBugfixes2022, + ]) + .into_inner() +} diff --git a/cli/src/tools/gen_zerostate.rs b/cli/src/tools/gen_zerostate.rs index cb38490a9..13d3e96e8 100644 --- a/cli/src/tools/gen_zerostate.rs +++ b/cli/src/tools/gen_zerostate.rs @@ -12,12 +12,17 @@ use serde::{Deserialize, Serialize}; use sha2::Digest; use crate::util::compute_storage_used; +use crate::util::error::ResultExt; /// Generate a zero state for a network. #[derive(clap::Parser)] pub struct Cmd { /// dump the template of the zero state config - #[clap(short = 'i', long, exclusive = true)] + #[clap( + short = 'i', + long, + conflicts_with_all = ["config", "output", "now"] + )] init_config: Option, /// path to the zero state config @@ -78,25 +83,23 @@ fn generate_zerostate( config .prepare_config_params(now) - .map_err(|e| GenError::new("validator config is invalid", e))?; + .wrap_err("validator config is invalid")?; config .add_required_accounts() - .map_err(|e| GenError::new("failed to add required accounts", e))?; + .wrap_err("failed to add required accounts")?; let state = config .build_masterchain_state(now) - .map_err(|e| GenError::new("failed to build masterchain zerostate", e))?; + .wrap_err("failed to build masterchain zerostate")?; - let boc = CellBuilder::build_from(&state) - .map_err(|e| GenError::new("failed to serialize zerostate", e))?; + let boc = CellBuilder::build_from(&state).wrap_err("failed to serialize zerostate")?; let root_hash = *boc.repr_hash(); let data = Boc::encode(&boc); let file_hash = 
HashBytes::from(sha2::Sha256::digest(&data)); - std::fs::write(output_path, data) - .map_err(|e| GenError::new("failed to write masterchain zerostate", e))?; + std::fs::write(output_path, data).wrap_err("failed to write masterchain zerostate")?; let hashes = serde_json::json!({ "root_hash": root_hash, @@ -297,12 +300,43 @@ impl ZerostateConfig { state.total_balance = state .total_balance .checked_add(&account.balance) - .map_err(|e| GenError::new("failed ot compute total balance", e))?; + .wrap_err("failed ot compute total balance")?; } } + let workchains = self.params.get::()?.unwrap(); + let mut shards = Vec::new(); + for entry in workchains.iter() { + let (workchain, descr) = entry?; + shards.push(( + ShardIdent::new_full(workchain), + ShardDescription { + seqno: 0, + reg_mc_seqno: 0, + start_lt: 0, + end_lt: 0, + root_hash: descr.zerostate_root_hash, + file_hash: descr.zerostate_file_hash, + before_split: false, + before_merge: false, + want_split: false, + want_merge: false, + nx_cc_updated: true, + next_catchain_seqno: 0, + next_validator_shard: ShardIdent::PREFIX_FULL, + min_ref_mc_seqno: u32::MAX, + gen_utime: now, + split_merge_at: None, + fees_collected: CurrencyCollection::ZERO, + funds_created: CurrencyCollection::ZERO, + copyleft_rewards: Dict::new(), + proof_chain: None, + }, + )); + } + state.custom = Some(Lazy::new(&McStateExtra { - shards: Default::default(), + shards: ShardHashes::from_shards(shards.iter().map(|(ident, descr)| (ident, descr)))?, config: BlockchainConfig { address: self.params.get::()?.unwrap(), params: self.params.clone(), @@ -708,23 +742,6 @@ fn zero_public_key() -> &'static ed25519::PublicKey { KEY.get_or_init(|| ed25519::PublicKey::from_bytes([0; 32]).unwrap()) } -#[derive(thiserror::Error, Debug)] -#[error("{context}: {source}")] -struct GenError { - context: String, - #[source] - source: anyhow::Error, -} - -impl GenError { - fn new(context: impl Into, source: impl Into) -> Self { - Self { - context: context.into(), - source: 
source.into(), - } - } -} - mod serde_account_states { use super::*; diff --git a/cli/src/util/error.rs b/cli/src/util/error.rs new file mode 100644 index 000000000..3007f924d --- /dev/null +++ b/cli/src/util/error.rs @@ -0,0 +1,42 @@ +pub trait ResultExt { + fn wrap_err(self, context: impl Into) -> anyhow::Result; + + fn wrap_err_with(self, f: F) -> anyhow::Result + where + F: FnOnce() -> R, + R: Into; +} + +impl> ResultExt for Result { + fn wrap_err(self, context: impl Into) -> anyhow::Result { + self.map_err(|e| { + ErrorWithContext { + context: context.into(), + source: e.into(), + } + .into() + }) + } + + fn wrap_err_with(self, f: F) -> anyhow::Result + where + F: FnOnce() -> R, + R: Into, + { + self.map_err(|e| { + ErrorWithContext { + context: f().into(), + source: e.into(), + } + .into() + }) + } +} + +#[derive(thiserror::Error, Debug)] +#[error("{context}: {source}")] +pub struct ErrorWithContext { + context: String, + #[source] + source: anyhow::Error, +} diff --git a/cli/src/util/logger.rs b/cli/src/util/logger.rs new file mode 100644 index 000000000..190c886cf --- /dev/null +++ b/cli/src/util/logger.rs @@ -0,0 +1,60 @@ +use std::path::Path; + +use anyhow::Result; +use serde::de::Visitor; +use serde::{Deserialize, Deserializer}; +use tracing_subscriber::filter::Directive; + +pub struct LoggerConfig { + directives: Vec, +} + +impl LoggerConfig { + pub fn load_from>(path: P) -> Result { + tycho_util::serde_helpers::load_json_from_file(path) + } + + pub fn build_subscriber(&self) -> tracing_subscriber::filter::EnvFilter { + let mut builder = tracing_subscriber::filter::EnvFilter::default(); + for item in &self.directives { + builder = builder.add_directive(item.clone()); + } + builder + } +} + +impl<'de> Deserialize<'de> for LoggerConfig { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct LoggerVisitor; + + impl<'de> Visitor<'de> for LoggerVisitor { + type Value = LoggerConfig; + + fn expecting(&self, f: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("a list of targets") + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut directives = Vec::new(); + + while let Some((target, level)) = map.next_entry::()? { + let directive = format!("{}={}", target, level) + .parse::() + .map_err(serde::de::Error::custom)?; + + directives.push(directive); + } + + Ok(LoggerConfig { directives }) + } + } + + deserializer.deserialize_map(LoggerVisitor) + } +} diff --git a/cli/src/util/mod.rs b/cli/src/util/mod.rs index 0dad67020..a7b339b42 100644 --- a/cli/src/util/mod.rs +++ b/cli/src/util/mod.rs @@ -5,6 +5,9 @@ use everscale_types::models::{Account, StorageUsed}; use everscale_types::num::VarUint56; use everscale_types::prelude::*; +pub mod error; +pub mod logger; + // TODO: move into types pub fn compute_storage_used(account: &Account) -> Result { let cell = { diff --git a/collator/Cargo.toml b/collator/Cargo.toml index 7cfdbc84b..ecb7ad10f 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -38,6 +38,7 @@ tempfile = { workspace = true } tokio = { version = "1", features = ["rt-multi-thread"] } tracing-test = { workspace = true } tycho-block-util = { workspace = true, features = ["test"] } +tycho-collator = { workspace = true, features = ["test"] } tycho-core = { workspace = true, features = ["test"] } tycho-storage = { workspace = true, features = ["test"] } tycho-util = { workspace = true, features = ["test"] } diff --git a/collator/src/collator/build_block.rs b/collator/src/collator/build_block.rs index 514eeabae..4e5fcba5b 100644 --- a/collator/src/collator/build_block.rs +++ b/collator/src/collator/build_block.rs @@ -150,17 +150,10 @@ impl CollatorStdImpl { .total_balance .try_add_assign(&value_flow.fees_collected)?; - // TODO got error 'result error! 
underlying integer is too large to fit in target type' without checking - if let Err(err) = new_state - .total_validator_fees - .checked_sub(&value_flow.recovered) - { - tracing::warn!("Error: {}", err); - - new_state - .total_validator_fees - .checked_sub(&new_state.total_validator_fees)?; - } + // TODO: + // new_state + // .total_validator_fees + // .try_sub_assign(&value_flow.recovered)?; if self.shard_id.is_masterchain() { new_state.libraries = diff --git a/collator/src/lib.rs b/collator/src/lib.rs index f493c3b1f..d486fa2a6 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -4,10 +4,12 @@ pub mod manager; pub mod mempool; pub mod msg_queue; pub mod state_node; -pub mod test_utils; pub mod types; pub mod validator; +#[cfg(any(test, feature = "test"))] +pub mod test_utils; + mod tracing_targets; mod utils; diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 760ec1d88..9efe6031b 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -668,6 +668,8 @@ where new_session_seqno, ))?; + tracing::warn!("SUBSET: {subset:?}"); + //TEST: override with test subset with test keypairs defined on test run #[cfg(feature = "test")] let subset = if self.config.test_validators_keypairs.is_empty() { @@ -697,6 +699,7 @@ where let local_pubkey_opt = find_us_in_collators_set(&self.config, &subset); let new_session_info = Arc::new(CollationSessionInfo::new( + shard_id.workchain(), new_session_seqno, ValidatorSubsetInfo { validators: subset, diff --git a/collator/src/test_utils.rs b/collator/src/test_utils.rs index 63c4ca35b..44f4035a0 100644 --- a/collator/src/test_utils.rs +++ b/collator/src/test_utils.rs @@ -5,13 +5,11 @@ use everscale_crypto::ed25519; use everscale_types::boc::Boc; use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, ShardStateUnsplit}; -use futures_util::future::BoxFuture; -use futures_util::FutureExt; use sha2::Digest; use tycho_block_util::state::{MinRefMcStateTracker, 
ShardStateStuff}; use tycho_network::{DhtConfig, DhtService, Network, OverlayService, PeerId, Router}; -use tycho_storage::{BlockMetaData, Db, DbOptions, Storage}; +use tycho_storage::{BlockMetaData, Storage}; use crate::types::NodeNetwork; @@ -68,9 +66,7 @@ pub fn create_node_network() -> NodeNetwork { } pub async fn prepare_test_storage() -> anyhow::Result { - let temp = tempfile::tempdir().unwrap(); - let db = Db::open(temp.path().to_path_buf(), DbOptions::default()).unwrap(); - let storage = Storage::new(db, temp.path().join("file"), 1_000_000).unwrap(); + let (storage, _tmp_dir) = Storage::new_temp()?; let tracker = MinRefMcStateTracker::default(); // master state diff --git a/collator/src/types.rs b/collator/src/types.rs index 9e7a5ea5d..1ee829506 100644 --- a/collator/src/types.rs +++ b/collator/src/types.rs @@ -178,22 +178,28 @@ pub(crate) type CollationSessionId = (ShardIdent, u32); #[derive(Clone)] pub struct CollationSessionInfo { /// Sequence number of the collation session + workchain: i32, seqno: u32, collators: ValidatorSubsetInfo, current_collator_keypair: Option>, } impl CollationSessionInfo { pub fn new( + workchain: i32, seqno: u32, collators: ValidatorSubsetInfo, current_collator_keypair: Option>, ) -> Self { Self { + workchain, seqno, collators, current_collator_keypair, } } + pub fn workchain(&self) -> i32 { + self.workchain + } pub fn seqno(&self) -> u32 { self.seqno } diff --git a/collator/src/validator/network/network_service.rs b/collator/src/validator/network/network_service.rs index b912d2e95..ff1ae3ea2 100644 --- a/collator/src/validator/network/network_service.rs +++ b/collator/src/validator/network/network_service.rs @@ -52,7 +52,9 @@ impl Service for NetworkService { signatures, } = query; { - let session = state.get_session(session_seqno).await; + let session = state + .get_session(block_id_short.shard.workchain(), session_seqno) + .await; match handle_signatures_query( session, session_seqno, diff --git 
a/collator/src/validator/state.rs b/collator/src/validator/state.rs index e9ce47ec7..db507a80b 100644 --- a/collator/src/validator/state.rs +++ b/collator/src/validator/state.rs @@ -35,12 +35,14 @@ pub trait ValidationState: Send + Sync { /// Retrieves an immutable reference to a session by its ID. fn get_session( &self, + workchain: i32, session_id: u32, ) -> impl std::future::Future>> + Send; } /// Holds information about a validation session. pub struct SessionInfo { + workchain: i32, seqno: u32, max_weight: u64, blocks_signatures: FastDashMap, @@ -51,6 +53,7 @@ pub struct SessionInfo { impl SessionInfo { pub fn new( + workchain: i32, seqno: u32, validation_session_info: Arc, private_overlay: PrivateOverlay, @@ -61,6 +64,7 @@ impl SessionInfo { .map(|vi| vi.weight) .sum(); Arc::new(Self { + workchain, seqno, max_weight, blocks_signatures: Default::default(), @@ -70,6 +74,10 @@ impl SessionInfo { }) } + pub fn workchain(&self) -> i32 { + self.workchain + } + pub fn get_seqno(&self) -> u32 { self.seqno } @@ -377,7 +385,7 @@ impl SessionInfo { /// Standard implementation of `ValidationState`. 
pub struct ValidationStateStdImpl { - sessions: RwLock>>, + sessions: RwLock>>, } impl ValidationState for ValidationStateStdImpl { @@ -388,19 +396,27 @@ impl ValidationState for ValidationStateStdImpl { } async fn try_add_session(&self, session: Arc) -> anyhow::Result<()> { + let workchain = session.workchain; let seqno = session.seqno; - let session = self.sessions.write().await.insert(seqno, session); + let session = self + .sessions + .write() + .await + .insert((workchain, seqno), session); if session.is_some() { - warn!(target: tracing_targets::VALIDATOR, "Session already exists with seqno: {seqno}"); - // bail!("Session already exists with seqno: {seqno}"); + bail!("Session already exists with seqno: ({workchain}, {seqno})"); } Ok(()) } - async fn get_session(&self, session_id: u32) -> Option> { - self.sessions.read().await.get(&session_id).cloned() + async fn get_session(&self, workchain: i32, session_id: u32) -> Option> { + self.sessions + .read() + .await + .get(&(workchain, session_id)) + .cloned() } } diff --git a/collator/src/validator/types.rs b/collator/src/validator/types.rs index 01945c7d3..852aacf04 100644 --- a/collator/src/validator/types.rs +++ b/collator/src/validator/types.rs @@ -38,6 +38,7 @@ impl TryFrom<&ValidatorDescription> for ValidatorInfo { } pub struct ValidationSessionInfo { + pub workchain: i32, pub seqno: u32, pub validators: ValidatorsMap, } @@ -64,6 +65,7 @@ impl TryFrom> for ValidationSessionInfo { } let validation_session = ValidationSessionInfo { + workchain: session_info.workchain(), seqno: session_info.seqno(), validators, }; @@ -99,6 +101,7 @@ impl BlockValidationCandidate { #[derive(TlWrite, TlRead)] #[tl(boxed, id = 0x12341111)] pub(crate) struct OverlayNumber { + pub workchain: i32, pub session_seqno: u32, } diff --git a/collator/src/validator/validator.rs b/collator/src/validator/validator.rs index 2940b8f04..f93c6f007 100644 --- a/collator/src/validator/validator.rs +++ b/collator/src/validator/validator.rs @@ -132,7 
+132,7 @@ impl Validator for ValidatorStdImpl { async fn validate(&self, candidate: BlockId, session_seqno: u32) -> Result<()> { let session = self .validation_state - .get_session(session_seqno) + .get_session(candidate.shard.workchain(), session_seqno) .await .ok_or_else(|| { anyhow::anyhow!("Validation session not found for seqno: {}", session_seqno) @@ -171,6 +171,7 @@ impl Validator for ValidatorStdImpl { }; let overlay_id = OverlayNumber { + workchain: validators_session_info.workchain, session_seqno: validators_session_info.seqno, }; trace!(target: tracing_targets::VALIDATOR, overlay_id = ?validators_session_info.seqno, "Creating private overlay"); @@ -194,6 +195,7 @@ impl Validator for ValidatorStdImpl { } let session_info = SessionInfo::new( + validators_session_info.workchain, validators_session_info.seqno, validators_session_info.clone(), private_overlay.clone(), @@ -341,7 +343,7 @@ async fn start_candidate_validation( match response { Ok(Ok(response)) => { if let Ok(signatures) = response.parse_tl::() { - trace!(target: tracing_targets::VALIDATOR, "Received signatures from validator {:?}", validator.public_key.to_bytes()); + trace!(target: tracing_targets::VALIDATOR, "Received signatures from validator {}", validator.public_key); let is_finished = process_candidate_signature_response( cloned_session.clone(), @@ -359,14 +361,14 @@ async fn start_candidate_validation( } } Err(e) => { - warn!(target: tracing_targets::VALIDATOR, "Elapsed validator response {:?}: {:?}", validator.public_key.to_bytes(), e); + warn!(target: tracing_targets::VALIDATOR, "Elapsed validator response {}: {e}", validator.public_key); let delay = delay * 2_u32.pow(attempt); let delay = std::cmp::min(delay, max_delay); tokio::time::sleep(delay).await; attempt += 1; } Ok(Err(e)) => { - warn!(target: tracing_targets::VALIDATOR, "Error receiving signatures from validator {:?}: {:?}", validator.public_key.to_bytes(), e); + warn!(target: tracing_targets::VALIDATOR, "Error receiving 
signatures from validator {}: {e}", validator.public_key); let delay = delay * 2_u32.pow(attempt); let delay = std::cmp::min(delay, max_delay); tokio::time::sleep(delay).await; diff --git a/core/Cargo.toml b/core/Cargo.toml index 3dcbbdd4e..514bc3f52 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -35,10 +35,10 @@ tycho-util = { workspace = true } [dev-dependencies] bytesize = { workspace = true } everscale-crypto = { workspace = true } -tycho-util = { workspace = true, features = ["test"] } -tycho-storage = { workspace = true, features = ["test"] } tempfile = { workspace = true } tracing-test = { workspace = true } +tycho-storage = { workspace = true, features = ["test"] } +tycho-util = { workspace = true, features = ["test"] } [features] test = [] diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index f98103d8e..51b6ec51a 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -9,8 +9,8 @@ use tycho_storage::Storage; use tycho_util::FastHashMap; pub use self::provider::{ - BlockProvider, BlockchainBlockProvider, BlockchainBlockProviderConfig, EmptyBlockProvider, - OptionalBlockStuff, + BlockProvider, BlockProviderExt, BlockchainBlockProvider, BlockchainBlockProviderConfig, + ChainBlockProvider, EmptyBlockProvider, OptionalBlockStuff, StorageBlockProvider, }; pub use self::state::{BlockStriderState, PersistentBlockStriderState, TempBlockStriderState}; pub use self::state_applier::ShardStateApplier; diff --git a/core/src/block_strider/provider/blockchain_provider.rs b/core/src/block_strider/provider/blockchain_provider.rs index b04581de9..3f6628381 100644 --- a/core/src/block_strider/provider/blockchain_provider.rs +++ b/core/src/block_strider/provider/blockchain_provider.rs @@ -5,6 +5,7 @@ use futures_util::future::BoxFuture; use serde::{Deserialize, Serialize}; use tycho_block_util::block::{BlockStuff, BlockStuffAug}; use tycho_storage::Storage; +use tycho_util::serde_helpers; use 
crate::block_strider::provider::OptionalBlockStuff; use crate::block_strider::BlockProvider; @@ -20,11 +21,13 @@ pub struct BlockchainBlockProviderConfig { /// Polling interval for `get_next_block` method. /// /// Default: 1 second. + #[serde(with = "serde_helpers::humantime")] pub get_next_block_polling_interval: Duration, /// Polling interval for `get_block` method. /// /// Default: 1 second. + #[serde(with = "serde_helpers::humantime")] pub get_block_polling_interval: Duration, } @@ -59,6 +62,7 @@ impl BlockchainBlockProvider { // TODO: Validate block with proof. async fn get_next_block_impl(&self, prev_block_id: &BlockId) -> OptionalBlockStuff { let mut interval = tokio::time::interval(self.config.get_next_block_polling_interval); + loop { let res = self.client.get_next_block_full(prev_block_id).await; let block = match res { diff --git a/core/src/block_strider/provider/mod.rs b/core/src/block_strider/provider/mod.rs index beffde6f3..ca0048db0 100644 --- a/core/src/block_strider/provider/mod.rs +++ b/core/src/block_strider/provider/mod.rs @@ -1,12 +1,14 @@ use std::future::Future; +use std::pin::pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use everscale_types::models::BlockId; -use futures_util::future::BoxFuture; +use futures_util::future::{self, BoxFuture}; use tycho_block_util::block::BlockStuffAug; pub use self::blockchain_provider::{BlockchainBlockProvider, BlockchainBlockProviderConfig}; +pub use self::storage_provider::StorageBlockProvider; #[cfg(any(test, feature = "test"))] pub use self::archive_provider::ArchiveBlockProvider; @@ -54,6 +56,20 @@ impl BlockProvider for Arc { } } +pub trait BlockProviderExt: Sized { + fn chain(self, other: T) -> ChainBlockProvider; +} + +impl BlockProviderExt for B { + fn chain(self, other: T) -> ChainBlockProvider { + ChainBlockProvider { + left: self, + right: other, + is_right: AtomicBool::new(false), + } + } +} + // === Provider combinators === #[derive(Debug, Clone, Copy)] pub struct 
EmptyBlockProvider; @@ -71,7 +87,7 @@ impl BlockProvider for EmptyBlockProvider { } } -struct ChainBlockProvider { +pub struct ChainBlockProvider { left: T1, right: T2, is_right: AtomicBool, @@ -105,6 +121,47 @@ impl BlockProvider for ChainBlockProvider< } } +impl BlockProvider for (T1, T2) { + type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; + type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + let left = self.0.get_next_block(prev_block_id); + let right = self.1.get_next_block(prev_block_id); + + Box::pin(async move { + match future::select(pin!(left), pin!(right)).await { + future::Either::Left((res, right)) => match res { + Some(res) => Some(res), + None => right.await, + }, + future::Either::Right((res, left)) => match res { + Some(res) => Some(res), + None => left.await, + }, + } + }) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + let left = self.0.get_block(block_id); + let right = self.1.get_block(block_id); + + Box::pin(async move { + match future::select(pin!(left), pin!(right)).await { + future::Either::Left((res, right)) => match res { + Some(res) => Some(res), + None => right.await, + }, + future::Either::Right((res, left)) => match res { + Some(res) => Some(res), + None => left.await, + }, + } + }) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/core/src/block_strider/provider/storage_provider.rs b/core/src/block_strider/provider/storage_provider.rs index 284deef12..29625fa52 100644 --- a/core/src/block_strider/provider/storage_provider.rs +++ b/core/src/block_strider/provider/storage_provider.rs @@ -7,13 +7,23 @@ use crate::block_strider::BlockProvider; // TODO: Add an explicit storage provider type -impl BlockProvider for Storage { +pub struct StorageBlockProvider { + storage: Storage, +} + +impl StorageBlockProvider { + pub fn new(storage: Storage) -> Self { + Self { storage } + } +} 
+ +impl BlockProvider for StorageBlockProvider { type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { Box::pin(async { - let block_storage = self.block_storage(); + let block_storage = self.storage.block_storage(); let get_next_block = || async { let rx = block_storage @@ -34,7 +44,7 @@ impl BlockProvider for Storage { fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { Box::pin(async { - let block_storage = self.block_storage(); + let block_storage = self.storage.block_storage(); let get_block = || async { let rx = block_storage.subscribe_to_block(*block_id).await?; diff --git a/core/src/block_strider/state_applier.rs b/core/src/block_strider/state_applier.rs index bc99ded10..936cdc161 100644 --- a/core/src/block_strider/state_applier.rs +++ b/core/src/block_strider/state_applier.rs @@ -223,7 +223,7 @@ pub mod test { use everscale_types::cell::HashBytes; use everscale_types::models::*; use tracing_test::traced_test; - use tycho_storage::{BlockMetaData, Db, DbOptions, Storage}; + use tycho_storage::{BlockMetaData, Storage}; use super::*; use crate::block_strider::subscriber::test::PrintSubscriber; @@ -271,9 +271,8 @@ pub mod test { pub async fn prepare_state_apply() -> Result<(ArchiveBlockProvider, Storage)> { let data = include_bytes!("../../tests/data/00001"); let provider = ArchiveBlockProvider::new(data).unwrap(); - let temp = tempfile::tempdir().unwrap(); - let db = Db::open(temp.path().to_path_buf(), DbOptions::default()).unwrap(); - let storage = Storage::new(db, temp.path().join("file"), 1_000_000).unwrap(); + + let (storage, _tmp_dir) = Storage::new_temp()?; let master = include_bytes!("../../tests/data/everscale_zerostate.boc"); let shard = include_bytes!("../../tests/data/everscale_shard_zerostate.boc"); diff --git a/core/src/blockchain_rpc/client.rs 
b/core/src/blockchain_rpc/client.rs index 1d177f70a..acb291488 100644 --- a/core/src/blockchain_rpc/client.rs +++ b/core/src/blockchain_rpc/client.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use anyhow::Result; use everscale_types::models::BlockId; +use tycho_network::PublicOverlay; -use crate::overlay_client::{PublicOverlayClient, QueryResponse}; +use crate::overlay_client::{Error, PublicOverlayClient, QueryResponse}; use crate::proto::blockchain::*; #[derive(Clone)] @@ -23,6 +24,10 @@ impl BlockchainRpcClient { } } + pub fn overlay(&self) -> &PublicOverlay { + self.inner.overlay_client.overlay() + } + pub fn overlay_client(&self) -> &PublicOverlayClient { &self.inner.overlay_client } @@ -31,7 +36,7 @@ impl BlockchainRpcClient { &self, block: &BlockId, max_size: u32, - ) -> Result> { + ) -> Result, Error> { let client = &self.inner.overlay_client; let data = client .query::<_, KeyBlockIds>(&rpc::GetNextKeyBlockIds { @@ -42,7 +47,7 @@ impl BlockchainRpcClient { Ok(data) } - pub async fn get_block_full(&self, block: &BlockId) -> Result> { + pub async fn get_block_full(&self, block: &BlockId) -> Result, Error> { let client = &self.inner.overlay_client; let data = client .query::<_, BlockFull>(&rpc::GetBlockFull { block_id: *block }) @@ -53,7 +58,7 @@ impl BlockchainRpcClient { pub async fn get_next_block_full( &self, prev_block: &BlockId, - ) -> Result> { + ) -> Result, Error> { let client = &self.inner.overlay_client; let data = client .query::<_, BlockFull>(&rpc::GetNextBlockFull { @@ -63,7 +68,10 @@ impl BlockchainRpcClient { Ok(data) } - pub async fn get_archive_info(&self, mc_seqno: u32) -> Result> { + pub async fn get_archive_info( + &self, + mc_seqno: u32, + ) -> Result, Error> { let client = &self.inner.overlay_client; let data = client .query::<_, ArchiveInfo>(&rpc::GetArchiveInfo { mc_seqno }) @@ -76,7 +84,7 @@ impl BlockchainRpcClient { archive_id: u64, offset: u64, max_size: u32, - ) -> Result> { + ) -> Result, Error> { let client = &self.inner.overlay_client; 
let data = client .query::<_, Data>(&rpc::GetArchiveSlice { @@ -94,7 +102,7 @@ impl BlockchainRpcClient { block: &BlockId, offset: u64, max_size: u64, - ) -> Result> { + ) -> Result, Error> { let client = &self.inner.overlay_client; let data = client .query::<_, PersistentStatePart>(&rpc::GetPersistentStatePart { diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service.rs index ebac72a7a..0e6fbd3b7 100644 --- a/core/src/blockchain_rpc/service.rs +++ b/core/src/blockchain_rpc/service.rs @@ -71,6 +71,9 @@ impl Service for BlockchainRpcService { }; tycho_network::match_tl_request!(body, tag = constructor, { + overlay::Ping as _ => BoxFutureOrNoop::future(async { + Some(Response::from_tl(overlay::Pong)) + }), rpc::GetNextKeyBlockIds as req => { tracing::debug!( block_id = %req.block_id, diff --git a/core/src/global_config.rs b/core/src/global_config.rs new file mode 100644 index 000000000..8c3f08e5d --- /dev/null +++ b/core/src/global_config.rs @@ -0,0 +1,52 @@ +use std::path::Path; + +use anyhow::Result; +use everscale_types::cell::HashBytes; +use everscale_types::models::{BlockId, ShardIdent}; +use serde::{Deserialize, Serialize}; +use tycho_network::{OverlayId, PeerInfo}; + +use crate::proto::blockchain::OverlayIdData; + +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct GlobalConfig { + pub bootstrap_peers: Vec, + pub zerostate: ZerostateId, +} + +impl GlobalConfig { + pub fn from_file>(path: P) -> Result { + tycho_util::serde_helpers::load_json_from_file(path) + } + + pub fn validate(&self, now: u32) -> Result<()> { + for peer in &self.bootstrap_peers { + anyhow::ensure!(peer.is_valid(now), "invalid peer info for {}", peer.id); + } + Ok(()) + } +} + +#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ZerostateId { + pub root_hash: HashBytes, + pub file_hash: HashBytes, +} + +impl ZerostateId { + pub fn as_block_id(&self) -> BlockId { + BlockId { + shard: ShardIdent::MASTERCHAIN, + seqno: 0, + root_hash: 
self.root_hash, + file_hash: self.file_hash, + } + } + + pub fn compute_public_overlay_id(&self) -> OverlayId { + OverlayId(tl_proto::hash(OverlayIdData { + zerostate_root_hash: self.root_hash.0, + zerostate_file_hash: self.file_hash.0, + })) + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 31a27e05c..ae9166cfd 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,4 +1,5 @@ pub mod block_strider; pub mod blockchain_rpc; +pub mod global_config; pub mod overlay_client; pub mod proto; diff --git a/core/src/overlay_client/mod.rs b/core/src/overlay_client/mod.rs index 6f43db315..1d1452ed9 100644 --- a/core/src/overlay_client/mod.rs +++ b/core/src/overlay_client/mod.rs @@ -136,12 +136,39 @@ impl Clone for Inner { impl Inner { async fn ping_neighbours_task(self) { + let req = Request::from_tl(overlay::Ping); + + // Start pinging neighbours let mut interval = tokio::time::interval(self.config.neighbours_ping_interval); loop { interval.tick().await; - if let Err(e) = self.query::<_, overlay::Pong>(overlay::Ping).await { - tracing::error!("failed to ping random neighbour: {e}"); + let Some(neighbour) = self.neighbours.choose().await else { + continue; + }; + + let peer_id = *neighbour.peer_id(); + match self.query_impl(neighbour.clone(), req.clone()).await { + Ok(res) => match tl_proto::deserialize::(&res.data) { + Ok(_) => { + res.accept(); + tracing::debug!(%peer_id, "pinged neighbour"); + } + Err(e) => { + tracing::warn!( + %peer_id, + "received an invalid ping response: {e}", + ); + res.reject(); + } + }, + Err(e) => { + tracing::warn!( + %peer_id, + "failed to ping neighbour: {e}", + ); + continue; + } } } } @@ -151,10 +178,22 @@ impl Inner { let max_neighbours = self.config.max_neighbours; let default_roundtrip = self.config.default_roundtrip; + let mut overlay_peers_added = self.overlay.entires_added().notified(); + let mut overlay_peer_count = self.overlay.read_entries().len(); + let mut interval = 
tokio::time::interval(self.config.neighbours_update_interval); loop { - interval.tick().await; + if overlay_peer_count < max_neighbours { + tracing::info!("not enough neighbours, waiting for more"); + + overlay_peers_added.await; + overlay_peers_added = self.overlay.entires_added().notified(); + + overlay_peer_count = self.overlay.read_entries().len(); + } else { + interval.tick().await; + } let active_neighbours = self.neighbours.get_active_neighbours().await.len(); let neighbours_to_get = max_neighbours + (max_neighbours - active_neighbours); diff --git a/core/src/overlay_client/neighbours.rs b/core/src/overlay_client/neighbours.rs index 7aa5eed55..4f6be743f 100644 --- a/core/src/overlay_client/neighbours.rs +++ b/core/src/overlay_client/neighbours.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use rand::distributions::uniform::{UniformInt, UniformSampler}; use rand::Rng; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, Notify}; use crate::overlay_client::neighbour::Neighbour; @@ -22,10 +22,27 @@ impl Neighbours { max_neighbours, entries: Mutex::new(entries), selection_index: Mutex::new(selection_index), + changed: Notify::new(), }), } } + pub async fn wait_for_peers(&self, count: usize) { + loop { + let changed = self.inner.changed.notified(); + + if self.inner.entries.lock().await.len() >= count { + break; + } + + changed.await; + } + } + + pub fn changed(&self) -> &Notify { + &self.inner.changed + } + pub async fn choose(&self) -> Option { let selection_index = self.inner.selection_index.lock().await; selection_index.get(&mut rand::thread_rng()) @@ -53,9 +70,15 @@ impl Neighbours { pub async fn update(&self, new: Vec) { let now = tycho_util::time::now_sec(); + let mut changed = false; + let mut guard = self.inner.entries.lock().await; // remove unreliable and expired neighbours - guard.retain(|x| x.is_reliable() && x.expires_at_secs() > now); + guard.retain(|x| { + let retain = x.is_reliable() && x.expires_at_secs() > now; + changed |= !retain; + retain + }); // if 
all neighbours are reliable and valid then remove the worst if guard.len() >= self.inner.max_neighbours { @@ -65,6 +88,7 @@ impl Neighbours { { if let Some(index) = guard.iter().position(|x| x.peer_id() == worst.peer_id()) { guard.remove(index); + changed = true; } } } @@ -75,11 +99,16 @@ impl Neighbours { } if guard.len() < self.inner.max_neighbours { guard.push(n); + changed = true; } } drop(guard); self.update_selection_index().await; + + if changed { + self.inner.changed.notify_waiters(); + } } pub async fn remove_outdated_neighbours(&self) { @@ -97,6 +126,7 @@ struct Inner { max_neighbours: usize, entries: Mutex>, selection_index: Mutex, + changed: Notify, } struct SelectionIndex { diff --git a/core/src/proto.tl b/core/src/proto.tl index 082db0c07..7ffccf834 100644 --- a/core/src/proto.tl +++ b/core/src/proto.tl @@ -22,6 +22,14 @@ overlay.response.err code:int = overlay.Response T; ---types--- +/** +* Data for computing a public overlay id +*/ +blockchain.overlayIdData + zerostate_root_hash:int256 + zerostate_file_hash:int256 + = blockchain.OverlayIdData; + /** * A full block id */ diff --git a/core/src/proto/blockchain.rs b/core/src/proto/blockchain.rs index a75713638..ce9d93b7a 100644 --- a/core/src/proto/blockchain.rs +++ b/core/src/proto/blockchain.rs @@ -3,6 +3,14 @@ use tl_proto::{TlRead, TlWrite}; use crate::proto::{tl_block_id, tl_block_id_vec}; +/// Data for computing a public overlay id. 
+#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, id = "blockchain.overlayIdData", scheme = "proto.tl")] +pub struct OverlayIdData { + pub zerostate_root_hash: [u8; 32], + pub zerostate_file_hash: [u8; 32], +} + #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, id = "blockchain.data", scheme = "proto.tl")] pub struct Data { diff --git a/core/tests/block_strider.rs b/core/tests/block_strider.rs index c3220a1e2..8fee0f4e3 100644 --- a/core/tests/block_strider.rs +++ b/core/tests/block_strider.rs @@ -3,7 +3,7 @@ use std::time::Duration; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; -use tycho_core::block_strider::{BlockProvider, BlockchainBlockProvider}; +use tycho_core::block_strider::{BlockProvider, BlockchainBlockProvider, StorageBlockProvider}; use tycho_core::blockchain_rpc::BlockchainRpcClient; use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig}; use tycho_network::PeerId; @@ -14,12 +14,14 @@ mod common; async fn storage_block_strider() -> anyhow::Result<()> { tycho_util::test::init_logger("storage_block_strider"); - let (storage, tmp_dir) = common::storage::init_storage().await?; + let (storage, _tmp_dir) = common::storage::init_storage().await?; + + let storage_provider = StorageBlockProvider::new(storage); let archive = common::storage::get_archive()?; for (block_id, data) in archive.blocks { if block_id.shard.is_masterchain() { - let block = storage.get_block(&block_id).await; + let block = storage_provider.get_block(&block_id).await; assert!(block.is_some()); if let Some(block) = block { @@ -31,8 +33,6 @@ async fn storage_block_strider() -> anyhow::Result<()> { } } - tmp_dir.close()?; - tracing::info!("done!"); Ok(()) } diff --git a/core/tests/common/storage.rs b/core/tests/common/storage.rs index bef51964a..a2ba16506 100644 --- a/core/tests/common/storage.rs +++ b/core/tests/common/storage.rs @@ -1,34 +1,11 @@ use anyhow::{Context, Result}; -use 
bytesize::ByteSize; use tempfile::TempDir; use tycho_block_util::archive::ArchiveData; use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff}; -use tycho_storage::{BlockMetaData, Db, DbOptions, Storage}; +use tycho_storage::{BlockMetaData, Storage}; use crate::common::*; -pub(crate) async fn init_empty_storage() -> Result<(Storage, TempDir)> { - let tmp_dir = tempfile::tempdir()?; - let root_path = tmp_dir.path(); - - // Init rocksdb - let db_options = DbOptions { - rocksdb_lru_capacity: ByteSize::kb(1024), - cells_cache_size: ByteSize::kb(1024), - }; - let db = Db::open(root_path.join("db_storage"), db_options)?; - - // Init storage - let storage = Storage::new( - db, - root_path.join("file_storage"), - db_options.cells_cache_size.as_u64(), - )?; - assert!(storage.node_state().load_init_mc_block_id().is_none()); - - Ok((storage, tmp_dir)) -} - pub(crate) fn get_archive() -> Result { let data = include_bytes!("../../tests/data/00001"); let archive = archive::Archive::new(data)?; @@ -37,7 +14,7 @@ pub(crate) fn get_archive() -> Result { } pub(crate) async fn init_storage() -> Result<(Storage, TempDir)> { - let (storage, tmp_dir) = init_empty_storage().await?; + let (storage, tmp_dir) = Storage::new_temp()?; let data = include_bytes!("../../tests/data/00001"); let provider = archive::Archive::new(data)?; diff --git a/core/tests/overlay_server.rs b/core/tests/overlay_server.rs index 3e3f439fb..935009545 100644 --- a/core/tests/overlay_server.rs +++ b/core/tests/overlay_server.rs @@ -9,6 +9,7 @@ use tycho_core::blockchain_rpc::BlockchainRpcClient; use tycho_core::overlay_client::PublicOverlayClient; use tycho_core::proto::blockchain::{BlockFull, KeyBlockIds, PersistentStatePart}; use tycho_network::PeerId; +use tycho_storage::Storage; use crate::common::archive::*; @@ -24,7 +25,7 @@ async fn overlay_server_with_empty_storage() -> Result<()> { known_by: usize, } - let (storage, tmp_dir) = common::storage::init_empty_storage().await?; + let 
(storage, _tmp_dir) = Storage::new_temp()?; const NODE_COUNT: usize = 10; let nodes = common::node::make_network(storage, NODE_COUNT); @@ -134,8 +135,6 @@ async fn overlay_server_with_empty_storage() -> Result<()> { let result = client.get_archive_slice(0, 0, 100).await; assert!(result.is_err()); - tmp_dir.close()?; - tracing::info!("done!"); Ok(()) } diff --git a/justfile b/justfile index 5179c61d9..227e10aba 100644 --- a/justfile +++ b/justfile @@ -20,4 +20,73 @@ docs: check_format cargo doc --no-deps --document-private-items --all-features --workspace test: lint - cargo test --all-targets --all-features --workspace \ No newline at end of file + cargo test --all-targets --all-features --workspace + +gen_network n: build_debug + #!/usr/bin/env bash + TEMP_DIR="./.temp" + TYCHO_BIN="./target/debug/tycho" + + mkdir -p "$TEMP_DIR" + + N={{n}} + + GLOBAL_CONFIG='{}' + NODE_CONFIG=$(cat ./config.json) + + for i in $(seq $N); + do + $TYCHO_BIN tool gen-key > "$TEMP_DIR/keys${i}.json" + + PORT=$((20000 + i)) + + KEY=$(jq -r .secret < "$TEMP_DIR/keys${i}.json") + DHT_ENTRY=$($TYCHO_BIN tool gen-dht "127.0.0.1:$PORT" --key "$KEY") + + GLOBAL_CONFIG=$(echo "$GLOBAL_CONFIG" | jq ".bootstrap_peers += [$DHT_ENTRY]") + + NODE_CONFIG=$(echo "$NODE_CONFIG" | jq ".port = $PORT | .storage.root_dir = \"$TEMP_DIR/db${i}\"") + echo "$NODE_CONFIG" > "$TEMP_DIR/config${i}.json" + done + + ZEROSTATE=$(cat zerostate.json | jq '.validators = []') + for i in $(seq $N); + do + PUBKEY=$(jq .public < "$TEMP_DIR/keys${i}.json") + ZEROSTATE=$(echo "$ZEROSTATE" | jq ".validators += [$PUBKEY]") + done + + echo "$ZEROSTATE" > "$TEMP_DIR/zerostate.json" + ZEROSTATE_ID=$( + $TYCHO_BIN tool gen-zerostate "$TEMP_DIR/zerostate.json" \ + --output "$TEMP_DIR/zerostate.boc" \ + --force + ) + + GLOBAL_CONFIG=$(echo "$GLOBAL_CONFIG" | jq ".zerostate = $ZEROSTATE_ID") + echo "$GLOBAL_CONFIG" > "$TEMP_DIR/global-config.json" + +node n: build_debug + #!/usr/bin/env bash + TEMP_DIR="./.temp" + 
TYCHO_BIN="./target/debug/tycho" + + $TYCHO_BIN node run \ + --keys "$TEMP_DIR/keys{{n}}.json" \ + --config "$TEMP_DIR/config{{n}}.json" \ + --global-config "$TEMP_DIR/global-config.json" \ + --import-zerostate "$TEMP_DIR/zerostate.boc" \ + --logger-config ./logger.json \ + +init_node_config: build_debug + #!/usr/bin/env bash + TYCHO_BIN="./target/debug/tycho" + $TYCHO_BIN node run --init-config "./config.json" + +init_zerostate_config: build_debug + #!/usr/bin/env bash + TYCHO_BIN="./target/debug/tycho" + $TYCHO_BIN tool gen-zerostate --init-config "./zerostate.json" + +build_debug: + cargo build --bin tycho diff --git a/network/src/dht/mod.rs b/network/src/dht/mod.rs index 23d2163a2..b1d6eb4ad 100644 --- a/network/src/dht/mod.rs +++ b/network/src/dht/mod.rs @@ -20,7 +20,9 @@ use crate::types::{PeerId, PeerInfo, Request, Response, Service, ServiceRequest} use crate::util::{NetworkExt, Routable}; pub use self::config::DhtConfig; -pub use self::peer_resolver::{PeerResolver, PeerResolverBuilder, PeerResolverHandle}; +pub use self::peer_resolver::{ + PeerResolver, PeerResolverBuilder, PeerResolverConfig, PeerResolverHandle, +}; pub use self::query::DhtQueryMode; pub use self::storage::{DhtValueMerger, DhtValueSource, StorageError}; diff --git a/network/src/dht/peer_resolver.rs b/network/src/dht/peer_resolver.rs index 0f9068628..2a23c868b 100644 --- a/network/src/dht/peer_resolver.rs +++ b/network/src/dht/peer_resolver.rs @@ -4,10 +4,11 @@ use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use exponential_backoff::Backoff; +use serde::{Deserialize, Serialize}; use tokio::sync::{Notify, Semaphore}; use tycho_util::futures::JoinTask; use tycho_util::time::now_sec; -use tycho_util::FastDashMap; +use tycho_util::{serde_helpers, FastDashMap}; use crate::dht::DhtService; use crate::network::{KnownPeerHandle, KnownPeersError, Network, PeerBannedError, WeakNetwork}; @@ -20,77 +21,66 @@ pub struct PeerResolverBuilder { } impl PeerResolverBuilder { + pub fn 
with_config(mut self, config: PeerResolverConfig) -> Self { + self.inner = config; + self + } + + pub fn build(self, network: &Network) -> PeerResolver { + let semaphore = Semaphore::new(self.inner.max_parallel_resolve_requests); + + PeerResolver { + inner: Arc::new(PeerResolverInner { + weak_network: Network::downgrade(network), + dht_service: self.dht_service, + config: Default::default(), + tasks: Default::default(), + semaphore, + }), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct PeerResolverConfig { + /// Maximum number of parallel resolve requests. + /// + /// Default: 100. + pub max_parallel_resolve_requests: usize, + /// Minimal time-to-live for the resolved peer info. /// /// Default: 600 seconds. - pub fn with_min_ttl_sec(mut self, ttl_sec: u32) -> Self { - self.inner.min_ttl_sec = ttl_sec; - self - } + pub min_ttl_sec: u32, /// Time before the expiration when the peer info should be updated. /// /// Default: 1200 seconds. - pub fn with_update_before_sec(mut self, update_before_sec: u32) -> Self { - self.inner.update_before_sec = update_before_sec; - self - } + pub update_before_sec: u32, /// Number of fast retries before switching to the stale retry interval. /// /// Default: 10. - pub fn with_fast_retry_count(mut self, fast_retry_count: u32) -> Self { - self.inner.fast_retry_count = fast_retry_count; - self - } + pub fast_retry_count: u32, /// Minimal interval between the fast retries. /// /// Default: 1 second. - pub fn with_min_retry_interval(mut self, min_retry_interval: Duration) -> Self { - self.inner.min_retry_interval = min_retry_interval; - self - } + #[serde(with = "serde_helpers::humantime")] + pub min_retry_interval: Duration, /// Maximal interval between the fast retries. /// /// Default: 120 seconds. 
- pub fn with_max_retry_interval(mut self, max_retry_interval: Duration) -> Self { - self.inner.max_retry_interval = max_retry_interval; - self - } + #[serde(with = "serde_helpers::humantime")] + pub max_retry_interval: Duration, /// Interval between the stale retries. /// /// Default: 600 seconds. - pub fn with_stale_retry_interval(mut self, stale_retry_interval: Duration) -> Self { - self.inner.stale_retry_interval = stale_retry_interval; - self - } - - pub fn build(self, network: &Network) -> PeerResolver { - let semaphore = Semaphore::new(self.inner.max_parallel_resolve_requests); - - PeerResolver { - inner: Arc::new(PeerResolverInner { - weak_network: Network::downgrade(network), - dht_service: self.dht_service, - config: Default::default(), - tasks: Default::default(), - semaphore, - }), - } - } -} - -struct PeerResolverConfig { - max_parallel_resolve_requests: usize, - min_ttl_sec: u32, - update_before_sec: u32, - fast_retry_count: u32, - min_retry_interval: Duration, - max_retry_interval: Duration, - stale_retry_interval: Duration, + #[serde(with = "serde_helpers::humantime")] + pub stale_retry_interval: Duration, } impl Default for PeerResolverConfig { diff --git a/network/src/lib.rs b/network/src/lib.rs index c276c25cf..efd61227a 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -8,7 +8,8 @@ pub use self::util::{check_peer_signature, NetworkExt, Routable, Router, RouterB pub use dht::{ xor_distance, DhtClient, DhtConfig, DhtQueryBuilder, DhtQueryMode, DhtQueryWithDataBuilder, DhtService, DhtServiceBackgroundTasks, DhtServiceBuilder, DhtValueMerger, DhtValueSource, - FindValueError, PeerResolver, PeerResolverBuilder, PeerResolverHandle, StorageError, + FindValueError, PeerResolver, PeerResolverBuilder, PeerResolverConfig, PeerResolverHandle, + StorageError, }; pub use network::{ ActivePeers, Connection, KnownPeerHandle, KnownPeers, KnownPeersError, Network, NetworkBuilder, diff --git a/network/src/network/mod.rs b/network/src/network/mod.rs index 
84d8ab073..1e699d4c4 100644 --- a/network/src/network/mod.rs +++ b/network/src/network/mod.rs @@ -38,6 +38,7 @@ pub struct NetworkBuilder { #[derive(Default)] struct BuilderFields { config: Option, + remote_addr: Option
, } impl NetworkBuilder { @@ -45,6 +46,11 @@ impl NetworkBuilder { self.optional_fields.config = Some(config); self } + + pub fn with_remote_addr>(mut self, addr: T) -> Self { + self.optional_fields.remote_addr = Some(addr.into()); + self + } } impl NetworkBuilder<((), T2)> { @@ -129,6 +135,12 @@ impl NetworkBuilder { let weak_active_peers = ActivePeers::downgrade(&active_peers); let known_peers = KnownPeers::new(); + let remote_addr = self.optional_fields.remote_addr.unwrap_or_else(|| { + let addr = endpoint.local_addr(); + tracing::debug!(%addr, "using local address as remote address"); + addr.into() + }); + let inner = Arc::new_cyclic(move |_weak| { let service = service.boxed_clone(); @@ -144,6 +156,7 @@ impl NetworkBuilder { NetworkInner { config, + remote_addr, endpoint, active_peers: weak_active_peers, known_peers, @@ -181,6 +194,10 @@ impl Network { } } + pub fn remote_addr(&self) -> &Address { + self.0.remote_addr() + } + pub fn local_addr(&self) -> SocketAddr { self.0.local_addr() } @@ -232,7 +249,7 @@ impl Network { pub fn sign_peer_info(&self, now: u32, ttl: u32) -> PeerInfo { let mut res = PeerInfo { id: *self.0.peer_id(), - address_list: vec![self.local_addr().into()].into_boxed_slice(), + address_list: vec![self.remote_addr().clone()].into_boxed_slice(), created_at: now, expires_at: now.saturating_add(ttl), signature: Box::new([0; 64]), @@ -248,6 +265,7 @@ impl Network { struct NetworkInner { config: Arc, + remote_addr: Address, endpoint: Arc, active_peers: WeakActivePeers, known_peers: KnownPeers, @@ -256,6 +274,10 @@ struct NetworkInner { } impl NetworkInner { + fn remote_addr(&self) -> &Address { + &self.remote_addr + } + fn local_addr(&self) -> SocketAddr { self.endpoint.local_addr() } @@ -355,7 +377,7 @@ mod tests { fn make_peer_info(network: &Network) -> Arc { Arc::new(PeerInfo { id: *network.peer_id(), - address_list: vec![network.local_addr().into()].into_boxed_slice(), + address_list: vec![network.remote_addr().clone()].into_boxed_slice(), 
created_at: 0, expires_at: u32::MAX, signature: Box::new([0; 64]), diff --git a/network/src/overlay/background_tasks.rs b/network/src/overlay/background_tasks.rs index 58bdae552..333d95105 100644 --- a/network/src/overlay/background_tasks.rs +++ b/network/src/overlay/background_tasks.rs @@ -230,7 +230,7 @@ impl OverlayServiceInner { entries.extend( all_entries .choose_multiple(rng, n) - .filter(|&item| (item.entry.peer_id != target_peer_id)) + .filter(|&item| item.entry.peer_id != target_peer_id) .map(|item| item.entry.clone()) .take(n - 1), ); @@ -259,7 +259,7 @@ impl OverlayServiceInner { count = entries.len(), "received public entries" ); - overlay.add_untrusted_entries(&entries, now_sec()); + overlay.add_untrusted_entries(&self.local_id, &entries, now_sec()); } PublicEntriesResponse::OverlayNotFound => { tracing::debug!( @@ -311,7 +311,7 @@ impl OverlayServiceInner { } }; - overlay.add_untrusted_entries(&entries, now_sec()); + overlay.add_untrusted_entries(&self.local_id, &entries, now_sec()); tracing::debug!(count = entries.len(), "discovered public entries"); Ok(()) diff --git a/network/src/overlay/mod.rs b/network/src/overlay/mod.rs index c8abd000d..5af17e7dc 100644 --- a/network/src/overlay/mod.rs +++ b/network/src/overlay/mod.rs @@ -317,7 +317,7 @@ impl OverlayServiceInner { }; // Add proposed entries to the overlay - overlay.add_untrusted_entries(&req.entries, now_sec()); + overlay.add_untrusted_entries(&self.local_id, &req.entries, now_sec()); // Collect proposed entries to exclude from the response let requested_ids = req diff --git a/network/src/overlay/public_overlay.rs b/network/src/overlay/public_overlay.rs index a9dcfae32..bca628299 100644 --- a/network/src/overlay/public_overlay.rs +++ b/network/src/overlay/public_overlay.rs @@ -202,7 +202,12 @@ impl PublicOverlay { /// Adds the given entries to the overlay. /// /// NOTE: Will deadlock if called while `PublicOverlayEntriesReadGuard` is held. 
- pub(crate) fn add_untrusted_entries(&self, entries: &[Arc], now: u32) { + pub(crate) fn add_untrusted_entries( + &self, + local_id: &PeerId, + entries: &[Arc], + now: u32, + ) { if entries.is_empty() { return; } @@ -239,6 +244,7 @@ impl PublicOverlay { for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) { if entry.is_expired(now, this.entry_ttl_sec) || self.inner.banned_peer_ids.contains(&entry.peer_id) + || entry.peer_id == local_id { // Skip expired or banned peers early continue; @@ -579,16 +585,17 @@ mod tests { #[test] fn min_capacity_works_with_single_thread() { let now = now_sec(); + let local_id: PeerId = rand::random(); // Add with small portions { let overlay = make_overlay_with_min_capacity(10); let entries = generate_public_entries(&overlay, now, 10); - overlay.add_untrusted_entries(&entries[..5], now); + overlay.add_untrusted_entries(&local_id, &entries[..5], now); assert_eq!(count_entries(&overlay), 5); - overlay.add_untrusted_entries(&entries[5..], now); + overlay.add_untrusted_entries(&local_id, &entries[5..], now); assert_eq!(count_entries(&overlay), 10); } @@ -596,7 +603,7 @@ mod tests { { let overlay = make_overlay_with_min_capacity(10); let entries = generate_public_entries(&overlay, now, 10); - overlay.add_untrusted_entries(&entries, now); + overlay.add_untrusted_entries(&local_id, &entries, now); assert_eq!(count_entries(&overlay), 10); } @@ -604,7 +611,7 @@ mod tests { { let overlay = make_overlay_with_min_capacity(10); let entries = generate_public_entries(&overlay, now, 20); - overlay.add_untrusted_entries(&entries, now); + overlay.add_untrusted_entries(&local_id, &entries, now); assert_eq!(count_entries(&overlay), 10); } @@ -612,7 +619,7 @@ mod tests { { let overlay = make_overlay_with_min_capacity(0); let entries = generate_public_entries(&overlay, now, 10); - overlay.add_untrusted_entries(&entries, now); + overlay.add_untrusted_entries(&local_id, &entries, now); assert_eq!(count_entries(&overlay), 0); } @@ -622,7 
+629,7 @@ mod tests { let entries = (0..10) .map(|_| generate_invalid_public_entry(now)) .collect::>(); - overlay.add_untrusted_entries(&entries, now); + overlay.add_untrusted_entries(&local_id, &entries, now); assert_eq!(count_entries(&overlay), 0); } @@ -641,7 +648,7 @@ mod tests { generate_invalid_public_entry(now), generate_public_entry(&overlay, now), ]; - overlay.add_untrusted_entries(&entries, now); + overlay.add_untrusted_entries(&local_id, &entries, now); assert_eq!(count_entries(&overlay), 5); } @@ -660,7 +667,7 @@ mod tests { generate_public_entry(&overlay, now), generate_public_entry(&overlay, now), ]; - overlay.add_untrusted_entries(&entries, now); + overlay.add_untrusted_entries(&local_id, &entries, now); assert_eq!(count_entries(&overlay), 3); } } @@ -668,6 +675,7 @@ mod tests { #[test] fn min_capacity_works_with_multi_thread() { let now = now_sec(); + let local_id: PeerId = rand::random(); let overlay = make_overlay_with_min_capacity(201); let entries = generate_public_entries(&overlay, now, 7 * 3 * 10); @@ -676,7 +684,7 @@ mod tests { for entries in entries.chunks_exact(7 * 3) { s.spawn(|| { for entries in entries.chunks_exact(7) { - overlay.add_untrusted_entries(entries, now); + overlay.add_untrusted_entries(&local_id, entries, now); } }); } diff --git a/network/src/overlay/tasks_stream.rs b/network/src/overlay/tasks_stream.rs index e484eaf71..39e7bf895 100644 --- a/network/src/overlay/tasks_stream.rs +++ b/network/src/overlay/tasks_stream.rs @@ -105,7 +105,7 @@ impl TasksStream { let overlay_id = *overlay_id; async move { if let Err(e) = fut.await { - tracing::error!(task, %overlay_id, "task failed: {e:?}"); + tracing::error!(task, %overlay_id, "task failed: {e}"); } overlay_id } diff --git a/storage/src/config.rs b/storage/src/config.rs new file mode 100644 index 000000000..f22b87373 --- /dev/null +++ b/storage/src/config.rs @@ -0,0 +1,85 @@ +use std::path::{Path, PathBuf}; + +use bytesize::ByteSize; +use serde::{Deserialize, Serialize}; + +use 
crate::db::DbConfig; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields, default)] +pub struct StorageConfig { + /// Path to the root directory of the storage. + /// + /// Default: `./db`. + pub root_dir: PathBuf, + + /// Runtime cells cache size. + /// + /// Default: calculated based on the available memory. + pub cells_cache_size: ByteSize, + + /// RocksDB configuration. + pub db_config: DbConfig, +} + +impl StorageConfig { + /// Creates a new storage config with very low cache sizes. + pub fn new_potato(path: &Path) -> Self { + Self { + root_dir: path.to_owned(), + cells_cache_size: ByteSize::kb(1024), + db_config: DbConfig { + rocksdb_lru_capacity: ByteSize::kb(1024), + }, + } + } +} + +impl Default for StorageConfig { + fn default() -> Self { + // Fetch the currently available memory in bytes + let available = { + let mut sys = sysinfo::System::new(); + sys.refresh_memory(); + sys.available_memory() + }; + + // Estimated memory usage of components other than cache: + // - 2 GiBs for write buffers(4 if we are out of luck and all memtables are being flushed at the same time) + // - 2 GiBs for indexer logic + // - 10 bits per cell for bloom filter. Realistic case is 100M cells, so 0.25 GiBs + // - 1/3 of all available memory is reserved for kernel buffers + const WRITE_BUFFERS: ByteSize = ByteSize::gib(2); + const INDEXER_LOGIC: ByteSize = ByteSize::gib(2); + const BLOOM_FILTER: ByteSize = ByteSize::mib(256); + let estimated_memory_usage = WRITE_BUFFERS + INDEXER_LOGIC + BLOOM_FILTER + available / 3; + + // Reduce the available memory by the fixed offset + let available = available + .checked_sub(estimated_memory_usage.as_u64()) + .unwrap_or_else(|| { + tracing::error!( + "Not enough memory for cache, using 1/4 of all available memory. \ + Tweak `db_options` in config to improve performance." + ); + available / 4 + }); + + // We will use 3/4 of available memory for the cells cache (at most 4 GB). 
+ let cells_cache_size = std::cmp::min(ByteSize(available * 4 / 3), ByteSize::gib(4)); + + // The reset of the memory is used for LRU cache (at least 128 MB) + let rocksdb_lru_capacity = std::cmp::max( + ByteSize(available.saturating_sub(cells_cache_size.as_u64())), + ByteSize::mib(128), + ); + + Self { + root_dir: PathBuf::from("./db"), + cells_cache_size, + db_config: DbConfig { + rocksdb_lru_capacity, + }, + } + } +} diff --git a/storage/src/db/kv_db/config.rs b/storage/src/db/kv_db/config.rs index bd2d53b2d..93839d9e4 100644 --- a/storage/src/db/kv_db/config.rs +++ b/storage/src/db/kv_db/config.rs @@ -3,53 +3,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Copy, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields, default)] -pub struct DbOptions { +pub struct DbConfig { pub rocksdb_lru_capacity: ByteSize, pub cells_cache_size: ByteSize, } - -impl Default for DbOptions { - fn default() -> Self { - // Fetch the currently available memory in bytes - let available = { - let mut sys = sysinfo::System::new(); - sys.refresh_memory(); - sys.available_memory() - }; - - // Estimated memory usage of components other than cache: - // - 2 GiBs for write buffers(4 if we are out of luck and all memtables are being flushed at the same time) - // - 2 GiBs for indexer logic - // - 10 bits per cell for bloom filter. Realistic case is 100M cells, so 0.25 GiBs - // - 1/3 of all available memory is reserved for kernel buffers - const WRITE_BUFFERS: ByteSize = ByteSize::gib(2); - const INDEXER_LOGIC: ByteSize = ByteSize::gib(2); - const BLOOM_FILTER: ByteSize = ByteSize::mib(256); - let estimated_memory_usage = WRITE_BUFFERS + INDEXER_LOGIC + BLOOM_FILTER + available / 3; - - // Reduce the available memory by the fixed offset - let available = available - .checked_sub(estimated_memory_usage.as_u64()) - .unwrap_or_else(|| { - tracing::error!( - "Not enough memory for cache, using 1/4 of all available memory. \ - Tweak `db_options` in config to improve performance." 
- ); - available / 4 - }); - - // We will use 3/4 of available memory for the cells cache (at most 4 GB). - let cells_cache_size = std::cmp::min(ByteSize(available * 4 / 3), ByteSize::gib(4)); - - // The reset of the memory is used for LRU cache (at least 128 MB) - let rocksdb_lru_capacity = std::cmp::max( - ByteSize(available.saturating_sub(cells_cache_size.as_u64())), - ByteSize::mib(128), - ); - - Self { - rocksdb_lru_capacity, - cells_cache_size, - } - } -} diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index aeed753fd..801b06759 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -4,19 +4,22 @@ use std::thread::available_parallelism; use anyhow::{Context, Result}; use bytesize::ByteSize; +use serde::{Deserialize, Serialize}; use weedb::{Caches, WeeDb}; pub use weedb::Stats as RocksdbStats; pub use weedb::{rocksdb, BoundedCfHandle, ColumnFamily, Table}; -pub use self::config::DbOptions; - pub mod refcount; pub mod tables; -mod config; mod migrations; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DbConfig { + pub rocksdb_lru_capacity: ByteSize, +} + pub struct Db { pub archives: Table, pub block_handles: Table, @@ -35,10 +38,9 @@ pub struct Db { } impl Db { - pub fn open(path: PathBuf, options: DbOptions) -> Result> { + pub fn open(path: PathBuf, options: DbConfig) -> Result> { tracing::info!( rocksdb_lru_capacity = %options.rocksdb_lru_capacity, - cells_cache_size = %options.cells_cache_size, "opening DB" ); diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 974d0361a..b51cf7675 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,12 +1,13 @@ -use std::path::PathBuf; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; +pub use self::config::*; pub use self::db::*; pub use self::models::*; pub use self::store::*; +mod config; mod db; mod models; mod store; @@ -17,6 +18,9 @@ mod util { mod stored_value; } +const DB_SUBDIR: &str = "rocksdb"; +const FILES_SUBDIR: 
&str = "files"; + #[derive(Clone)] #[repr(transparent)] pub struct Storage { @@ -24,30 +28,35 @@ pub struct Storage { } impl Storage { - pub fn new(db: Arc<Db>, file_db_path: PathBuf, max_cell_cache_size_bytes: u64) -> Result<Self> { - let files_dir = FileDb::new(file_db_path); + pub fn new(config: StorageConfig) -> Result<Self> { + let root = FileDb::new(&config.root_dir); + + let files_db = root.subdir(FILES_SUBDIR); + let kv_db = Db::open(config.root_dir.join(DB_SUBDIR), config.db_config) + .context("failed to open a rocksdb")?; - let block_handle_storage = Arc::new(BlockHandleStorage::new(db.clone())); - let block_connection_storage = Arc::new(BlockConnectionStorage::new(db.clone())); + let block_handle_storage = Arc::new(BlockHandleStorage::new(kv_db.clone())); + let block_connection_storage = Arc::new(BlockConnectionStorage::new(kv_db.clone())); let runtime_storage = Arc::new(RuntimeStorage::new(block_handle_storage.clone())); let block_storage = Arc::new(BlockStorage::new( - db.clone(), + kv_db.clone(), block_handle_storage.clone(), block_connection_storage.clone(), )?); let shard_state_storage = ShardStateStorage::new( - db.clone(), - &files_dir, + kv_db.clone(), + &files_db, block_handle_storage.clone(), block_storage.clone(), - max_cell_cache_size_bytes, + config.cells_cache_size.as_u64(), )?; let persistent_state_storage = - PersistentStateStorage::new(db.clone(), &files_dir, block_handle_storage.clone())?; - let node_state_storage = NodeStateStorage::new(db); + PersistentStateStorage::new(kv_db.clone(), &files_db, block_handle_storage.clone())?; + let node_state_storage = NodeStateStorage::new(kv_db); Ok(Self { inner: Arc::new(Inner { + root, block_handle_storage, block_storage, shard_state_storage, @@ -65,28 +74,15 @@ impl Storage { /// otherwise compaction filter will not work.
#[cfg(any(test, feature = "test"))] pub fn new_temp() -> Result<(Self, tempfile::TempDir)> { - use bytesize::ByteSize; - let tmp_dir = tempfile::tempdir()?; - let root_path = tmp_dir.path(); - - // Init rocksdb - let db_options = DbOptions { - rocksdb_lru_capacity: ByteSize::kb(1024), - cells_cache_size: ByteSize::kb(1024), - }; - let db = Db::open(root_path.join("db_storage"), db_options)?; - - // Init storage - let storage = Storage::new( - db, - root_path.join("file_storage"), - db_options.cells_cache_size.as_u64(), - )?; - + let storage = Storage::new(StorageConfig::new_potato(tmp_dir.path()))?; Ok((storage, tmp_dir)) } + pub fn root(&self) -> &FileDb { + &self.inner.root + } + pub fn runtime_storage(&self) -> &RuntimeStorage { &self.inner.runtime_storage } @@ -117,6 +113,7 @@ impl Storage { } struct Inner { + root: FileDb, runtime_storage: Arc<RuntimeStorage>, block_handle_storage: Arc<BlockHandleStorage>, block_connection_storage: Arc<BlockConnectionStorage>, diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs index 6d56d4bec..c6fedfa2a 100644 --- a/storage/src/store/shard_state/cell_storage.rs +++ b/storage/src/store/shard_state/cell_storage.rs @@ -658,6 +658,7 @@ impl StorageCellReferenceData { struct RawCellsCache(Cache); impl RawCellsCache { + #[allow(unused)] pub(crate) fn hit_ratio(&self) -> f64 { (if self.0.hits() > 0 { self.0.hits() as f64 / (self.0.hits() + self.0.misses()) as f64 diff --git a/storage/tests/mod.rs b/storage/tests/mod.rs index b945e216a..edfe5f14f 100644 --- a/storage/tests/mod.rs +++ b/storage/tests/mod.rs @@ -1,12 +1,11 @@ use std::str::FromStr; use anyhow::Result; -use bytesize::ByteSize; use everscale_types::boc::Boc; use everscale_types::cell::{Cell, DynCell}; use everscale_types::models::{BlockId, ShardState}; use tycho_block_util::state::ShardStateStuff; -use tycho_storage::{BlockMetaData, Db, DbOptions, Storage}; +use tycho_storage::{BlockMetaData, Storage}; #[derive(Clone)] struct ShardStateCombined { @@ -57,22 +56,7 @@ fn
compare_cells(orig_cell: &DynCell, stored_cell: &DynCell) { async fn persistent_storage_everscale() -> Result<()> { tracing_subscriber::fmt::try_init().ok(); - let tmp_dir = tempfile::tempdir()?; - let root_path = tmp_dir.path(); - - // Init rocksdb - let db_options = DbOptions { - rocksdb_lru_capacity: ByteSize::kb(1024), - cells_cache_size: ByteSize::kb(1024), - }; - let db = Db::open(root_path.join("db_storage"), db_options)?; - - // Init storage - let storage = Storage::new( - db, - root_path.join("file_storage"), - db_options.cells_cache_size.as_u64(), - )?; + let (storage, _tmp_dir) = Storage::new_temp()?; assert!(storage.node_state().load_init_mc_block_id().is_none()); // Read zerostate @@ -159,8 +143,5 @@ async fn persistent_storage_everscale() -> Result<()> { let cell = Boc::decode(&persistent_state_data)?; assert_eq!(&cell, zerostate.root_cell()); - // Clear files for test - tmp_dir.close()?; - Ok(()) } diff --git a/util/Cargo.toml b/util/Cargo.toml index 5511f9023..1ea918b08 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -11,6 +11,7 @@ license.workspace = true [dependencies] # crates.io deps ahash = { workspace = true } +anyhow = { workspace = true } castaway = { workspace = true } dashmap = { workspace = true } everscale-crypto = { workspace = true } @@ -20,6 +21,8 @@ humantime = { workspace = true } libc = { workspace = true } rand = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +serde_path_to_error = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["time", "sync", "rt"] } tracing = { workspace = true } diff --git a/util/src/serde_helpers.rs b/util/src/serde_helpers.rs index 0d2ddd9ce..c0387633a 100644 --- a/util/src/serde_helpers.rs +++ b/util/src/serde_helpers.rs @@ -1,10 +1,22 @@ use std::borrow::Cow; use std::marker::PhantomData; +use std::path::Path; use std::str::FromStr; +use anyhow::Result; use serde::de::{Error, Expected, Visitor}; 
use serde::{Deserialize, Deserializer, Serialize, Serializer}; +pub fn load_json_from_file<T, P>(path: P) -> Result<T> +where + for<'de> T: Deserialize<'de>, + P: AsRef<Path>, +{ + let data = std::fs::read_to_string(path)?; + let de = &mut serde_json::Deserializer::from_str(&data); + serde_path_to_error::deserialize(de).map_err(Into::into) +} + pub mod socket_addr { use std::net::SocketAddr;