diff --git a/Cargo.lock b/Cargo.lock index 28d789852..81808e21d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,7 +215,7 @@ dependencies = [ "rand", "safelog", "serde", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-basic-utils", "tor-chanmgr", @@ -1629,6 +1629,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs-err" version = "2.11.0" @@ -1650,7 +1656,7 @@ dependencies = [ "once_cell", "pwd-grp", "serde", - "thiserror 2.0.8", + "thiserror 2.0.12", "walkdir", ] @@ -1681,7 +1687,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f261f25f1e94963fe8f72863f4da841b280fa3b5a573990b425a26b585a54578" dependencies = [ "fslock-arti-fork", - "thiserror 2.0.8", + "thiserror 2.0.12", "winapi", ] @@ -2353,10 +2359,11 @@ checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" [[package]] name = "js-sys" -version = "0.3.70" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -2450,8 +2457,7 @@ dependencies = [ [[package]] name = "libsqlite3-sys" version = "0.30.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +source = "git+https://github.com/yukibtc/rusqlite?rev=846aeb3e4088b11d8804bd3261464ba84352a2f6#846aeb3e4088b11d8804bd3261464ba84352a2f6" dependencies = [ "cc", "pkg-config", @@ -2745,6 +2751,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "nostr-gossip" +version = "0.39.0" +dependencies = [ + "async-utility", + "lru", + "nostr", + "rusqlite", + "tokio", + "tracing", +] + [[package]] name = "nostr-indexeddb" version = "0.39.0" @@ -2834,6 +2852,7 @@ dependencies = [ "nostr", "nostr-connect", "nostr-database", + "nostr-gossip", "nostr-indexeddb", "nostr-lmdb", "nostr-ndb", @@ -2885,7 +2904,7 @@ dependencies = [ "flatbuffers", "futures", "libc", - "thiserror 2.0.8", + "thiserror 2.0.12", "tokio", "tracing", "tracing-subscriber", @@ -3811,8 +3830,7 @@ dependencies = [ [[package]] name = "rusqlite" version = "0.32.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +source = "git+https://github.com/yukibtc/rusqlite?rev=846aeb3e4088b11d8804bd3261464ba84352a2f6#846aeb3e4088b11d8804bd3261464ba84352a2f6" dependencies = [ "bitflags 2.6.0", "fallible-iterator", @@ -3820,6 +3838,7 @@ dependencies = [ "hashlink", "libsqlite3-sys", "smallvec", + "sqlite-wasm-rs", "time", ] @@ -3956,7 +3975,7 @@ dependencies = [ "educe", "either", "fluid-let", - "thiserror 2.0.8", + "thiserror 2.0.12", ] [[package]] @@ -4295,7 +4314,7 @@ dependencies = [ "paste", "serde", "slotmap", - "thiserror 2.0.8", + "thiserror 2.0.12", "void", ] @@ -4343,6 +4362,24 @@ dependencies = [ "der", ] +[[package]] +name = "sqlite-wasm-rs" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84473e8335ed2f167c6f8040970d976990fb66aa62292ae58735de6d61ecfb73" +dependencies = [ + "fragile", + "js-sys", + "once_cell", + "parking_lot", + "thiserror 2.0.12", + "tokio", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "xshell", +] + [[package]] name = "ssh-cipher" version = "0.2.0" @@ -4541,11 +4578,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.8" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f5383f3e0071702bf93ab5ee99b52d26936be9dedd9413067cbdcddcb6141a" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl 2.0.8", + "thiserror-impl 2.0.12", ] [[package]] @@ -4561,9 +4598,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.8" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f357fcec90b3caef6623a099691be676d033b40a058ac95d2a6ade6fa0c943" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", @@ -4772,7 +4809,7 @@ dependencies = [ "oneshot-fused-workaround", "pin-project", "postage", - "thiserror 2.0.8", + "thiserror 2.0.12", "void", ] @@ -4792,7 +4829,7 @@ dependencies = [ "serde", "slab", "smallvec", - "thiserror 2.0.8", + "thiserror 2.0.12", ] [[package]] @@ -4807,7 +4844,7 @@ dependencies = [ "educe", "getrandom 0.2.15", "safelog", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-error", "tor-llcrypto", "zeroize", @@ -4829,7 +4866,7 @@ dependencies = [ "paste", "rand", "smallvec", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-basic-utils", "tor-bytes", "tor-cert", @@ -4852,7 +4889,7 @@ dependencies = [ "derive_builder_fork_arti", "derive_more", "digest", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-bytes", "tor-checkable", "tor-llcrypto", @@ -4874,7 +4911,7 @@ dependencies = [ "rand", "safelog", "serde", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-basic-utils", "tor-cell", @@ -4900,7 +4937,7 @@ checksum = "cd3d9898abee1d7dd03dee82809bd261274bd04f1174f042aa8ab4fdfb0d18b4" dependencies = [ "humantime", "signature", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-llcrypto", ] @@ -4930,7 +4967,7 @@ dependencies = [ "safelog", "serde", "static_assertions", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-basic-utils", "tor-chanmgr", @@ -4976,7 +5013,7 @@ dependencies = [ "serde-value", "serde_ignored", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "toml 0.8.19", "tor-basic-utils", "tor-error", @@ -4995,7 +5032,7 @@ dependencies = [ "once_cell", "serde", "shellexpand", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-error", "tor-general-addr", ] @@ -5008,7 +5045,7 @@ checksum = "bb5933975e5a89df3d68de12c70f7b48252327c2fe24e67a8e1abc3d3e2be348" dependencies = [ "digest", "hex", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-llcrypto", ] @@ -5028,7 +5065,7 @@ dependencies = [ "httpdate", "itertools 0.14.0", "memchr", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-circmgr", "tor-error", "tor-hscrypto", @@ -5072,7 +5109,7 @@ dependencies = [ "serde", "signature", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "time", "tor-async-utils", "tor-basic-utils", @@ -5105,7 +5142,7 @@ dependencies = [ "retry-error", "static_assertions", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "tracing", "void", ] @@ -5117,7 +5154,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "736453e89f894e1967266e4bdcf7ac3e2c3be908f0cb02f669e60e4d7420cda8" dependencies = [ "derive_more", - "thiserror 2.0.8", + "thiserror 2.0.12", "void", ] @@ -5146,7 +5183,7 @@ dependencies = [ "safelog", "serde", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-basic-utils", "tor-config", @@ -5183,7 +5220,7 @@ dependencies = [ "safelog", "slotmap-careful", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-basic-utils", "tor-bytes", @@ -5223,7 +5260,7 @@ dependencies = [ "safelog", "signature", "subtle", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-basic-utils", "tor-bytes", "tor-error", @@ -5249,7 +5286,7 @@ dependencies = [ "safelog", "serde", "serde_with", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-cell", "tor-config", @@ -5294,7 +5331,7 @@ dependencies = [ "serde", "serde_with", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-basic-utils", "tor-bytes", @@ -5333,7 +5370,7 @@ dependencies = [ "rand", "signature", "ssh-key", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-cert", "tor-error", "tor-llcrypto", @@ -5362,7 +5399,7 @@ dependencies = [ "serde", "signature", "ssh-key", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-basic-utils", "tor-config", "tor-config-path", @@ -5394,7 +5431,7 @@ dependencies = [ "serde", "serde_with", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-basic-utils", "tor-bytes", "tor-config", @@ -5430,7 +5467,7 @@ dependencies = [ "sha3", "signature", "subtle", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-memquota", "visibility", "x25519-dalek", @@ -5446,7 +5483,7 @@ dependencies = [ "futures", "humantime", "once_cell", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-error", "tor-rtcompat", "tracing", @@ -5470,7 +5507,7 @@ dependencies = [ "serde", "slotmap-careful", "static_assertions", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-async-utils", "tor-basic-utils", "tor-config", @@ -5500,7 +5537,7 @@ dependencies = [ "serde", "static_assertions", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "time", "tor-basic-utils", "tor-error", @@ -5539,7 +5576,7 @@ dependencies = [ "signature", "smallvec", "subtle", - "thiserror 2.0.8", + "thiserror 2.0.12", "time", "tinystr", "tor-basic-utils", @@ -5578,7 +5615,7 @@ dependencies = [ "sanitize-filename", "serde", "serde_json", - "thiserror 2.0.8", + "thiserror 2.0.12", "time", "tor-async-utils", "tor-basic-utils", @@ -5613,7 +5650,7 @@ dependencies = [ "safelog", "slotmap-careful", "subtle", - "thiserror 2.0.8", + "thiserror 2.0.12", "tokio", "tokio-util", "tor-async-utils", @@ -5646,7 +5683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d85616d04baa5940d19877394f9f19640cc2f50e74766d679f7e35350816720" dependencies = [ "caret", - "thiserror 2.0.8", + "thiserror 2.0.12", ] [[package]] @@ -5681,7 +5718,7 @@ dependencies = [ "paste", "pin-project", "rustls-pki-types", - "thiserror 2.0.8", + "thiserror 2.0.12", "tokio", "tokio-util", "tor-error", @@ -5710,7 +5747,7 @@ dependencies = [ "priority-queue", "slotmap-careful", "strum", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-error", "tor-general-addr", "tor-rtcompat", @@ -5731,7 +5768,7 @@ dependencies = [ "educe", "safelog", "subtle", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-bytes", "tor-error", ] @@ -5744,7 +5781,7 @@ checksum = "7cbb818d417a039a4201c92c86adef77871ed2fc0421ac0706795ebb1ea6903f" dependencies = [ "derive-deftly", "derive_more", - "thiserror 2.0.8", + "thiserror 2.0.12", "tor-memquota", ] @@ -5858,7 +5895,7 @@ dependencies = [ "rustls", "rustls-pki-types", "sha1", - "thiserror 2.0.8", + "thiserror 2.0.12", "utf-8", ] @@ -6196,24 +6233,24 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.93" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ "cfg-if", "once_cell", + "rustversion", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.93" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", "syn 2.0.90", @@ -6222,21 +6259,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.43" +version = "0.4.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" dependencies = [ "cfg-if", "js-sys", + "once_cell", "wasm-bindgen", "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.93" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -6244,9 +6282,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.93" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", @@ -6257,9 +6295,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.93" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] [[package]] name = "weak-table" @@ -6269,9 +6310,9 @@ checksum = "323f4da9523e9a669e1eaf9c6e763892769b1d38c623913647bfdc1532fe4549" [[package]] name = "web-sys" -version = "0.3.70" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" dependencies = [ "js-sys", "wasm-bindgen", @@ -6667,6 +6708,21 @@ dependencies = [ "untrusted 0.7.1", ] +[[package]] +name = "xshell" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e7290c623014758632efe00737145b6867b66292c42167f2ec381eb566a373d" +dependencies = [ + "xshell-macros", +] + +[[package]] +name = "xshell-macros" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32ac00cd3f8ec9c1d33fb3e7958a82df6989c42d747bd326c822b1d625283547" + [[package]] name = "xxhash-rust" version = "0.8.12" diff --git a/Cargo.toml b/Cargo.toml index 56e1ff71c..49296565f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ negentropy-deprecated = { package = "negentropy", version = "0.3", default-featu nostr = { version = "0.39", path = "./crates/nostr", default-features = false } nostr-connect = { version = "0.39", path = "./crates/nostr-connect", default-features = false } nostr-database = { version = "0.39", path = "./crates/nostr-database", default-features = false } +nostr-gossip = { version = "0.39", path = "./crates/nostr-gossip", default-features = false } nostr-indexeddb = { version = "0.39", path = "./crates/nostr-indexeddb", default-features = false } nostr-lmdb = { version = "0.39", path = "./crates/nostr-lmdb", default-features = false } nostr-ndb = { version = "0.39", path = "./crates/nostr-ndb", default-features = false } @@ -46,6 +47,7 @@ web-sys = { version = "0.3", default-features = false } [patch.crates-io] # Patch needed to reduce bindings size bip39 = { git = "https://github.com/yukibtc/rust-bip39", rev = "eade7c56eff5f320e8eb5beee23dd8fb46413938" } # Uses bitcoin_hashes v0.14 +rusqlite = { git = "https://github.com/yukibtc/rusqlite", rev = "846aeb3e4088b11d8804bd3261464ba84352a2f6" } # rusqlite v0.32 with WASM support [profile.release] lto = true diff --git a/README.md b/README.md index 80ba399c3..da399c026 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ The project is split up into several crates in the `crates/` directory: * [**nostr-lmdb**](./crates/nostr-lmdb): LMDB storage backend * [**nostr-ndb**](./crates/nostr-ndb): [nostrdb](https://github.com/damus-io/nostrdb) storage backend * [**nostr-indexeddb**](./crates/nostr-indexeddb): IndexedDB storage backend + * [**nostr-gossip**](./crates/nostr-gossip): Gossip tracker and storage * [**nostr-relay-pool**](./crates/nostr-relay-pool): Nostr Relay Pool * [**nostr-sdk**](./crates/nostr-sdk): High level client library * [**nwc**](./crates/nwc): Nostr Wallet Connect (NWC) client diff --git a/bindings/nostr-sdk-js/.cargo/config.toml b/bindings/nostr-sdk-js/.cargo/config.toml index 28d7affbb..af9762956 100644 --- a/bindings/nostr-sdk-js/.cargo/config.toml +++ b/bindings/nostr-sdk-js/.cargo/config.toml @@ -3,11 +3,14 @@ opt-level = 'z' # Optimize for size. lto = true # Enable Link Time Optimization codegen-units = 1 # Reduce number of codegen units to increase optimizations. panic = "abort" # Abort on panic -strip = true # Strip symbols from binary +strip = "debuginfo" # Strip debug symbols from binary. Full strip will cause issues with SQLite [build] target = "wasm32-unknown-unknown" -rustflags = ["-C", "panic=abort"] +rustflags = [ + "-Cpanic=abort", + "-Ctarget-feature=+bulk-memory" # Required for SQLite WASM +] [unstable] unstable-options = true diff --git a/bindings/nostr-sdk-js/Cargo.toml b/bindings/nostr-sdk-js/Cargo.toml index bf22d95ea..c49c2e216 100644 --- a/bindings/nostr-sdk-js/Cargo.toml +++ b/bindings/nostr-sdk-js/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib"] console_error_panic_hook = "0.1" js-sys.workspace = true nostr-connect.workspace = true -nostr-sdk = { workspace = true, default-features = false, features = ["all-nips", "indexeddb"] } +nostr-sdk = { workspace = true, default-features = false, features = ["all-nips", "indexeddb", "gossip"] } nwc.workspace = true tracing.workspace = true tracing-subscriber.workspace = true @@ -19,7 +19,7 @@ wasm-bindgen = { workspace = true, features = ["std"] } wasm-bindgen-futures.workspace = true [package.metadata.wasm-pack.profile.profiling] -wasm-opt = true +wasm-opt = false # Optimizations causes issues to SQLite gossip storage [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(wasm_bindgen_unstable_test_coverage)'] } diff --git a/bindings/nostr-sdk-js/examples/gossip.js b/bindings/nostr-sdk-js/examples/gossip.js new file mode 100644 index 000000000..eec480662 --- /dev/null +++ b/bindings/nostr-sdk-js/examples/gossip.js @@ -0,0 +1,31 @@ +const { Keys, Client, NostrSigner, PublicKey, EventBuilder, loadWasmAsync, initLogger, Gossip, LogLevel, Tag } = require("../"); + +async function main() { + await loadWasmAsync(); + + initLogger(LogLevel.info()); + + let keys = Keys.parse("nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85"); + let signer = NostrSigner.keys(keys); + + let gossip = Gossip.inMemory() + + let client = Client.builder().signer(signer).gossip(gossip).build(); + + await client.addDiscoveryRelay("wss://relay.damus.io"); + await client.addDiscoveryRelay("wss://purplepag.es"); + + await client.connect(); + + let pk = PublicKey.parse("npub1drvpzev3syqt0kjrls50050uzf25gehpz9vgdw08hvex7e0vgfeq0eseet"); + + let builder = EventBuilder.textNote( + "Hello world nostr:npub1drvpzev3syqt0kjrls50050uzf25gehpz9vgdw08hvex7e0vgfeq0eseet", + ).tags([Tag.publicKey(pk)]); + let output = await client.sendEventBuilder(builder); + console.log("Event ID", output.id.toBech32()); + console.log("Successfully sent to:", output.success); + console.log("Failed to sent to:", output.failed); +} + +main(); \ No newline at end of file diff --git a/bindings/nostr-sdk-js/src/client/builder.rs b/bindings/nostr-sdk-js/src/client/builder.rs index 84d5078ff..6920271c6 100644 --- a/bindings/nostr-sdk-js/src/client/builder.rs +++ b/bindings/nostr-sdk-js/src/client/builder.rs @@ -10,6 +10,7 @@ use wasm_bindgen::prelude::*; use super::options::JsOptions; use super::{JsClient, JsNostrSigner}; use crate::database::JsNostrDatabase; +use crate::gossip::JsGossip; use crate::policy::{FFI2RustAdmitPolicy, JsAdmitPolicy}; #[wasm_bindgen(js_name = ClientBuilder)] @@ -41,6 +42,10 @@ impl JsClientBuilder { self.inner.database(database.deref().clone()).into() } + pub fn gossip(self, gossip: &JsGossip) -> Self { + self.inner.gossip(gossip.deref().clone()).into() + } + #[wasm_bindgen(js_name = admitPolicy)] pub fn admit_policy(self, policy: JsAdmitPolicy) -> Self { self.inner diff --git a/bindings/nostr-sdk-js/src/client/options.rs b/bindings/nostr-sdk-js/src/client/options.rs index 04082a5aa..97311f96d 100644 --- a/bindings/nostr-sdk-js/src/client/options.rs +++ b/bindings/nostr-sdk-js/src/client/options.rs @@ -52,11 +52,6 @@ impl JsOptions { self.inner.automatic_authentication(enabled).into() } - /// Enable gossip model (default: false) - pub fn gossip(self, enable: bool) -> Self { - self.inner.gossip(enable).into() - } - /// Set custom relay limits #[wasm_bindgen(js_name = relayLimits)] pub fn relay_limits(self, limits: &JsRelayLimits) -> Self { diff --git a/bindings/nostr-sdk-js/src/gossip.rs b/bindings/nostr-sdk-js/src/gossip.rs new file mode 100644 index 000000000..49c88d726 --- /dev/null +++ b/bindings/nostr-sdk-js/src/gossip.rs @@ -0,0 +1,31 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +use std::ops::Deref; + +use nostr_sdk::prelude::*; +use wasm_bindgen::prelude::*; + +#[wasm_bindgen(js_name = Gossip)] +pub struct JsGossip { + inner: Gossip, +} + +impl Deref for JsGossip { + type Target = Gossip; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[wasm_bindgen(js_class = Gossip)] +impl JsGossip { + #[wasm_bindgen(js_name = inMemory)] + pub fn in_memory() -> Self { + Self { + inner: Gossip::in_memory(), + } + } +} diff --git a/bindings/nostr-sdk-js/src/lib.rs b/bindings/nostr-sdk-js/src/lib.rs index 1b0556477..2143954fe 100644 --- a/bindings/nostr-sdk-js/src/lib.rs +++ b/bindings/nostr-sdk-js/src/lib.rs @@ -19,6 +19,7 @@ pub mod connect; pub mod database; pub mod duration; pub mod error; +pub mod gossip; pub mod logger; pub mod nwc; pub mod policy; diff --git a/contrib/scripts/check-crates.sh b/contrib/scripts/check-crates.sh index 2f943d331..dbc03eecc 100755 --- a/contrib/scripts/check-crates.sh +++ b/contrib/scripts/check-crates.sh @@ -45,8 +45,10 @@ buildargs=( "-p nostr-relay-builder" "-p nostr-connect" "-p nwc" + "-p nostr-gossip" "-p nostr-sdk" # No default features "-p nostr-sdk --features all-nips" # Only NIPs features + "-p nostr-sdk --features gossip" # Gossip stuff "-p nostr-sdk --features tor" # Embedded tor client "-p nostr-sdk --all-features" # All features "-p nostr-cli" diff --git a/contrib/scripts/release.sh b/contrib/scripts/release.sh index 3eb82bcc1..e0ae90294 100755 --- a/contrib/scripts/release.sh +++ b/contrib/scripts/release.sh @@ -12,6 +12,7 @@ args=( "-p nostr-relay-pool" "-p nwc" "-p nostr-connect" + "-p nostr-gossip" "-p nostr-sdk" "-p nostr-cli" ) diff --git a/crates/nostr-gossip/Cargo.toml b/crates/nostr-gossip/Cargo.toml new file mode 100644 index 000000000..b7c89c8b8 --- /dev/null +++ b/crates/nostr-gossip/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "nostr-gossip" +version = "0.39.0" +edition = "2021" +description = "Gossip storage for nostr apps" +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +readme = "README.md" +rust-version.workspace = true +keywords = ["nostr", "gossip"] + +[dependencies] +async-utility.workspace = true +lru.workspace = true +nostr = { workspace = true, features = ["std"] } +tracing = { workspace = true, features = ["std"] } + +[target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies] +rusqlite = { version = "0.32", features = ["bundled"] } + +[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] +rusqlite = { version = "0.32", features = ["precompiled-wasm"] } +tokio = { workspace = true, features = ["sync"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/crates/nostr-gossip/README.md b/crates/nostr-gossip/README.md new file mode 100644 index 000000000..ea95c417d --- /dev/null +++ b/crates/nostr-gossip/README.md @@ -0,0 +1,13 @@ +# Nostr Gossip + +## State + +**This library is in an ALPHA state**, things that are implemented generally work but the API will change in breaking ways. + +## Donations + +`rust-nostr` is free and open-source. This means we do not earn any revenue by selling it. Instead, we rely on your financial support. If you actively use any of the `rust-nostr` libs/software/services, then please [donate](https://rust-nostr.org/donate). + +## License + +This project is distributed under the MIT software license - see the [LICENSE](../../LICENSE) file for details diff --git a/crates/nostr-gossip/migrations/001_init.sql b/crates/nostr-gossip/migrations/001_init.sql new file mode 100644 index 000000000..13ad135ac --- /dev/null +++ b/crates/nostr-gossip/migrations/001_init.sql @@ -0,0 +1,51 @@ +-- Database settings +PRAGMA journal_mode = WAL; +PRAGMA synchronous = NORMAL; +PRAGMA foreign_keys = ON; +PRAGMA user_version = 1; -- Schema version + +CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + public_key BLOB NOT NULL UNIQUE +); + +CREATE TABLE IF NOT EXISTS relays ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + relay_url TEXT NOT NULL UNIQUE +); + +CREATE TABLE IF NOT EXISTS lists ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pkid INTEGER NOT NULL, -- Public Key ID + kind INTEGER NOT NULL, -- Kind of the list (i.e., 10002 for NIP65, 10050 for NIP17) + created_at INTEGER NOT NULL DEFAULT 0, -- Event list created at (`created_at` field of event) + last_check INTEGER NOT NULL DEFAULT 0, -- The timestamp of the last check + FOREIGN KEY(pkid) REFERENCES users(id) ON DELETE CASCADE ON UPDATE NO ACTION +); + +CREATE UNIQUE INDEX IF NOT EXISTS pubkey_list_idx ON lists(pkid,kind); + +CREATE TABLE IF NOT EXISTS relays_by_list ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pkid INTEGER NOT NULL, -- Public Key ID + listid INTEGER NOT NULL, -- List ID + relayid INTEGER NOT NULL, -- Relay ID + metadata TEXT DEFAULT NULL, -- NIP65 metadata: read, write or NULL + FOREIGN KEY(pkid) REFERENCES users(id) ON DELETE CASCADE ON UPDATE NO ACTION, + FOREIGN KEY(listid) REFERENCES lists(id) ON DELETE CASCADE ON UPDATE NO ACTION, + FOREIGN KEY(relayid) REFERENCES relays(id) +); + +CREATE UNIQUE INDEX IF NOT EXISTS pubkey_list_relay_idx ON relays_by_list(pkid,listid,relayid); + +-- CREATE TABLE IF NOT EXISTS tracker ( +-- id INTEGER PRIMARY KEY AUTOINCREMENT, +-- pkid INTEGER NOT NULL, -- Public Key ID +-- relayid INTEGER NOT NULL, -- Relay ID +-- last_event INTEGER NOT NULL DEFAULT 0, -- Timestamp of the last event seen for the public key on the relay +-- score INTEGER NOT NULL DEFAULT 5, -- Score +-- FOREIGN KEY(pkid) REFERENCES users(id) ON DELETE CASCADE ON UPDATE NO ACTION, +-- FOREIGN KEY(relayid) REFERENCES relays(id) +-- ); +-- +-- CREATE UNIQUE INDEX IF NOT EXISTS pubkey_relay_idx ON tracker(pkid,relayid); diff --git a/crates/nostr-gossip/src/constant.rs b/crates/nostr-gossip/src/constant.rs new file mode 100644 index 000000000..ec8198096 --- /dev/null +++ b/crates/nostr-gossip/src/constant.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +use std::num::NonZeroUsize; +use std::time::Duration; + +/// Max number of relays allowed in NIP17/NIP65 lists +pub(crate) const MAX_RELAYS_LIST: usize = 5; +pub(crate) const PUBKEY_METADATA_OUTDATED_AFTER: Duration = Duration::from_secs(60 * 60); // 60 min +pub(crate) const CHECK_OUTDATED_INTERVAL: Duration = Duration::from_secs(60 * 5); // 5 min + +pub(crate) const CACHE_SIZE: Option = NonZeroUsize::new(10_000); diff --git a/crates/nostr-gossip/src/lib.rs b/crates/nostr-gossip/src/lib.rs new file mode 100644 index 000000000..2269ac697 --- /dev/null +++ b/crates/nostr-gossip/src/lib.rs @@ -0,0 +1,808 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +//! Nostr gossip + +#![forbid(unsafe_code)] +#![warn(missing_docs)] +#![warn(rustdoc::bare_urls)] +#![warn(clippy::large_futures)] +#![allow(unknown_lints)] // TODO: remove when MSRV >= 1.72.0, required for `clippy::arc_with_non_send_sync` +#![allow(clippy::arc_with_non_send_sync)] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +#![doc = include_str!("../README.md")] + +use std::borrow::Cow; +use std::collections::{BTreeSet, HashMap, HashSet}; +use std::num::NonZeroUsize; +#[cfg(not(all(target_family = "wasm", target_os = "unknown")))] +use std::path::Path; +#[cfg(not(all(target_family = "wasm", target_os = "unknown")))] +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +#[cfg(not(all(target_family = "wasm", target_os = "unknown")))] +use std::thread; + +use async_utility::task; +use lru::LruCache; +use nostr::prelude::*; +use rusqlite::{Connection, OpenFlags}; +#[cfg(all(target_family = "wasm", target_os = "unknown"))] +use tokio::sync::mpsc; + +mod constant; +pub mod prelude; +mod store; + +use self::constant::{CACHE_SIZE, CHECK_OUTDATED_INTERVAL, PUBKEY_METADATA_OUTDATED_AFTER}; +pub use self::store::error::Error; +use self::store::Store; + +const P_TAG: SingleLetterTag = SingleLetterTag::lowercase(Alphabet::P); +const SQLITE_IN_MEMORY_URI: &str = "file:memdb?mode=memory&cache=shared"; + +/// Broken-down filters +#[derive(Debug)] +pub enum BrokenDownFilters { + /// Filters by url + Filters(HashMap), + /// Filters that match a certain pattern but where no relays are available + Orphan(Filter), + /// Filters that can be sent to read relays (generic query, not related to public keys) + Other(Filter), +} + +#[derive(Debug, Clone)] +struct Pool { + /// Store + store: Arc>, + /// Event ingester + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + tx: mpsc::Sender, + #[cfg(all(target_family = "wasm", target_os = "unknown"))] + tx: mpsc::UnboundedSender, +} + +impl Pool { + fn new(s1: Store, s2: Store) -> Self { + // Create new asynchronous channel + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + let (tx, rx) = mpsc::channel(); + #[cfg(all(target_family = "wasm", target_os = "unknown"))] + let (tx, rx) = mpsc::unbounded_channel(); + + // Spawn ingester with the store and the channel receiver + Self::spawn_ingester(s1, rx); + + Self { + store: Arc::new(Mutex::new(s2)), + tx, + } + } + + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + async fn interact(&self, f: F) -> Result + where + F: FnOnce(&mut Store) -> R + Send + 'static, + R: Send + 'static, + { + let store: Arc> = self.store.clone(); + Ok(task::spawn_blocking(move || { + let mut conn = store.lock().expect("Failed to lock store"); + f(&mut conn) + }) + .await?) + } + + #[cfg(all(target_family = "wasm", target_os = "unknown"))] + async fn interact(&self, f: F) -> Result + where + F: FnOnce(&mut Store) -> R + Send + 'static, + R: Send + 'static, + { + let store: Arc> = self.store.clone(); + Ok(task::spawn(async move { + let mut conn = store.lock().expect("Failed to lock store"); + f(&mut conn) + }) + .join() + .await?) + } + + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + fn spawn_ingester(mut store: Store, rx: mpsc::Receiver) { + thread::spawn(move || { + #[cfg(debug_assertions)] + tracing::debug!("Gossip ingester thread started"); + + let size: NonZeroUsize = CACHE_SIZE.unwrap(); + let mut cache: LruCache = LruCache::new(size); + + // Listen for items + while let Ok(event) = rx.recv() { + // Update cache and check if was already processed + if cache.put(event.id, ()).is_none() { + // Process event + if let Err(e) = store.process_event(&event) { + tracing::error!(error = %e, "Gossip event ingestion failed."); + } + } + } + + #[cfg(debug_assertions)] + tracing::debug!("Gossip ingester thread exited"); + }); + } + + #[cfg(all(target_family = "wasm", target_os = "unknown"))] + fn spawn_ingester(mut store: Store, mut rx: mpsc::UnboundedReceiver) { + task::spawn(async move { + #[cfg(debug_assertions)] + tracing::debug!("Gossip ingester task started"); + + let size: NonZeroUsize = CACHE_SIZE.unwrap(); + let mut cache: LruCache = LruCache::new(size); + + // Listen for items + while let Some(event) = rx.recv().await { + // Update cache and check if was already processed + if cache.put(event.id, ()).is_none() { + // Process event + if let Err(e) = store.process_event(&event) { + tracing::error!(error = %e, "Gossip event ingestion failed."); + } + } + } + + #[cfg(debug_assertions)] + tracing::debug!("Gossip ingester task exited"); + }); + } +} + +/// Gossip tracker +#[derive(Debug, Clone)] +pub struct Gossip { + pool: Pool, +} + +impl Gossip { + /// New in-memory gossip storage + pub fn in_memory() -> Self { + let s1: Connection = Connection::open_with_flags( + SQLITE_IN_MEMORY_URI, + OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_READ_WRITE, + ) + .expect("Failed to open in-memory database"); + let s1: Store = Store::new(s1); + + s1.migrate().expect("Failed to run migrations"); + + let s2: Connection = Connection::open_with_flags( + SQLITE_IN_MEMORY_URI, + OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_READ_WRITE, + ) + .expect("Failed to open in-memory database"); + let s2: Store = Store::new(s2); + + Self { + pool: Pool::new(s1, s2), + } + } + + /// New persistent gossip storage + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + pub async fn persistent

(path: P) -> Result + where + P: AsRef + Send + 'static, + { + let (s1, s2) = task::spawn_blocking(move || { + let path = path.as_ref(); + + let s1: Connection = Connection::open(path)?; + let s1: Store = Store::new(s1); + + s1.migrate()?; + + let s2: Connection = Connection::open(path)?; + let s2: Store = Store::new(s2); + + Ok::<(Store, Store), Error>((s1, s2)) + }) + .await??; + + Ok(Self { + pool: Pool::new(s1, s2), + }) + } + + /// Process an [`Event`] + /// + /// Only the first [`MAX_RELAYS_LIST`] relays will be used for [`Kind::RelayList`] and [`Kind::InboxRelays`] lists. + pub fn process_event(&self, event: Cow) { + // Check if the event can be processed + if event.kind != Kind::RelayList && event.kind != Kind::InboxRelays { + return; + } + + // Send to event ingester + // An event clone may occur here + let _ = self.pool.tx.send(event.into_owned()); + } + + /// Check for what public keys the metadata are outdated or not existent (both for NIP17 and NIP65) + pub async fn check_outdated( + &self, + public_keys: BTreeSet, + kinds: Vec, + ) -> Result, Error> { + self.pool + .interact(move |store| { + let now: Timestamp = Timestamp::now(); + + let mut outdated: HashSet = HashSet::new(); + + for public_key in public_keys.into_iter() { + for kind in kinds.iter().copied() { + let timestamps = store.get_timestamps(&public_key, kind)?; + + if timestamps.last_check + CHECK_OUTDATED_INTERVAL > now { + continue; + } + + // Check if expired + if timestamps.created_at + PUBKEY_METADATA_OUTDATED_AFTER < now { + outdated.insert(public_key); + } + } + } + + Ok(outdated) + }) + .await? + } + + /// Update last check + pub async fn update_last_check( + &self, + public_keys: HashSet, + kinds: Vec, + ) -> Result<(), Error> { + self.pool + .interact(move |store| { + let now: Timestamp = Timestamp::now(); + + for public_key in public_keys.into_iter() { + store.update_last_check(&public_key, &kinds, &now)?; + } + + Ok(()) + }) + .await? + } + + fn get_nip17_relays( + store: &mut Store, + public_keys: &BTreeSet, + ) -> Result, Error> { + let mut urls: HashSet = HashSet::new(); + + for public_key in public_keys.iter() { + if let Some((pkid, listid)) = + store.get_pkid_and_listid(public_key, Kind::InboxRelays)? + { + let relays = store.get_relays_url(pkid, listid)?; + + urls.extend(relays); + } + } + + Ok(urls) + } + + fn get_nip65_relays( + store: &mut Store, + public_keys: &BTreeSet, + metadata: Option, + ) -> Result, Error> { + let mut urls: HashSet = HashSet::new(); + + for public_key in public_keys.iter() { + if let Some((pkid, listid)) = store.get_pkid_and_listid(public_key, Kind::RelayList)? { + let relays = match metadata { + Some(metadata) => { + store.get_nip65_relays_url_by_metadata(pkid, listid, metadata)? + } + None => store.get_relays_url(pkid, listid)?, + }; + + urls.extend(relays); + } + } + + Ok(urls) + } + + fn map_nip17_relays( + store: &mut Store, + public_keys: &BTreeSet, + ) -> Result>, Error> { + let mut urls: HashMap> = HashMap::new(); + + for public_key in public_keys.iter() { + if let Some((pkid, listid)) = + store.get_pkid_and_listid(public_key, Kind::InboxRelays)? + { + let relays = store.get_relays_url(pkid, listid)?; + + for url in relays.into_iter() { + urls.entry(url) + .and_modify(|s| { + s.insert(*public_key); + }) + .or_default() + .insert(*public_key); + } + } + } + + Ok(urls) + } + + fn map_nip65_relays( + store: &mut Store, + public_keys: &BTreeSet, + metadata: RelayMetadata, + ) -> Result>, Error> { + let mut urls: HashMap> = HashMap::new(); + + for public_key in public_keys.iter() { + if let Some((pkid, listid)) = store.get_pkid_and_listid(public_key, Kind::RelayList)? { + let relays = store.get_nip65_relays_url_by_metadata(pkid, listid, metadata)?; + + for url in relays.into_iter() { + urls.entry(url) + .and_modify(|s| { + s.insert(*public_key); + }) + .or_default() + .insert(*public_key); + } + } + } + + Ok(urls) + } + + /// Get outbox (write) relays for public keys + #[inline] + pub async fn get_nip65_outbox_relays( + &self, + public_keys: BTreeSet, + ) -> Result, Error> { + self.pool + .interact(move |store| { + Self::get_nip65_relays(store, &public_keys, Some(RelayMetadata::Write)) + }) + .await? + } + + /// Get inbox (read) relays for public keys + #[inline] + pub async fn get_nip65_inbox_relays( + &self, + public_keys: BTreeSet, + ) -> Result, Error> { + self.pool + .interact(move |store| { + Self::get_nip65_relays(store, &public_keys, Some(RelayMetadata::Read)) + }) + .await? + } + + /// Get NIP17 inbox (read) relays for public keys + #[inline] + pub async fn get_nip17_inbox_relays( + &self, + public_keys: BTreeSet, + ) -> Result, Error> { + self.pool + .interact(move |store| Self::get_nip17_relays(store, &public_keys)) + .await? + } + + /// Map outbox (write) relays for public keys + #[inline] + fn map_nip65_outbox_relays( + store: &mut Store, + public_keys: &BTreeSet, + ) -> Result>, Error> { + Self::map_nip65_relays(store, public_keys, RelayMetadata::Write) + } + + /// Map NIP65 inbox (read) relays for public keys + #[inline] + fn map_nip65_inbox_relays( + store: &mut Store, + public_keys: &BTreeSet, + ) -> Result>, Error> { + Self::map_nip65_relays(store, public_keys, RelayMetadata::Read) + } + + /// Get NIP65 **outbox** + NIP17 relays + async fn map_outbox_relays( + &self, + public_keys: BTreeSet, + ) -> Result>, Error> { + self.pool + .interact(move |store| { + // Get map of outbox relays + let mut relays = Self::map_nip65_outbox_relays(store, &public_keys)?; + + // Extend with NIP17 relays + let nip17 = Self::map_nip17_relays(store, &public_keys)?; + relays.extend(nip17); + + Ok::<_, Error>(relays) + }) + .await? + } + + /// Get NIP65 **inbox** + NIP17 relays + async fn map_inbox_relays( + &self, + public_keys: BTreeSet, + ) -> Result>, Error> { + self.pool + .interact(move |store| { + // Get map of inbox relays + let mut relays = Self::map_nip65_inbox_relays(store, &public_keys)?; + + // Extend with NIP17 relays + let nip17 = Self::map_nip17_relays(store, &public_keys)?; + relays.extend(nip17); + + Ok::<_, Error>(relays) + }) + .await? + } + + /// Get NIP65 + NIP17 relays + async fn get_relays( + &self, + public_keys: BTreeSet, + ) -> Result, Error> { + self.pool + .interact(move |store| { + // Get map of outbox and inbox relays + let mut relays: HashSet = + Self::get_nip65_relays(store, &public_keys, None)?; + + // Extend with NIP17 relays + let nip17 = Self::get_nip17_relays(store, &public_keys)?; + relays.extend(nip17); + + Ok::<_, Error>(relays) + }) + .await? + } + + /// Break down filters + /// + /// The additional relays will always be used + pub async fn break_down_filter( + &self, + filter: Filter, + additional_relays: I, + ) -> Result + where + I: IntoIterator, + { + // Extract `p` tag from generic tags and parse public key hex + let p_tag: Option> = filter.generic_tags.get(&P_TAG).map(|s| { + s.iter() + .filter_map(|p| PublicKey::from_hex(p).ok()) + .collect() + }); + + // Match pattern + match (filter.authors.as_ref().cloned(), p_tag) { + (Some(authors), None) => { + let additional_relays: HashMap> = additional_relays + .into_iter() + .map(|r| (r, authors.clone())) + .collect(); + + let mut outbox: HashMap> = + self.map_outbox_relays(authors).await?; + + // Extend with additional relays + outbox.extend(additional_relays); + + // No relay available for the authors + if outbox.is_empty() { + return Ok(BrokenDownFilters::Orphan(filter)); + } + + let mut map: HashMap = HashMap::with_capacity(outbox.len()); + + // Construct new filters + for (relay, pk_set) in outbox.into_iter() { + // Clone filter and change authors + let mut new_filter: Filter = filter.clone(); + new_filter.authors = Some(pk_set); + + // Update map + map.insert(relay, new_filter); + } + + Ok(BrokenDownFilters::Filters(map)) + } + (None, Some(p_public_keys)) => { + let additional_relays: HashMap> = additional_relays + .into_iter() + .map(|r| (r, p_public_keys.clone())) + .collect(); + + let mut inbox: HashMap> = + self.map_inbox_relays(p_public_keys).await?; + + // Extend with additional relays + inbox.extend(additional_relays); + + // No relay available for the p tags + if inbox.is_empty() { + return Ok(BrokenDownFilters::Orphan(filter)); + } + + let mut map: HashMap = HashMap::with_capacity(inbox.len()); + + // Construct new filters + for (relay, pk_set) in inbox.into_iter() { + // Clone filter and change p tags + let mut new_filter: Filter = filter.clone(); + new_filter + .generic_tags + .insert(P_TAG, pk_set.into_iter().map(|p| p.to_string()).collect()); + + // Update map + map.insert(relay, new_filter); + } + + Ok(BrokenDownFilters::Filters(map)) + } + (Some(authors), Some(p_public_keys)) => { + let mut union: BTreeSet = authors; + union.extend(p_public_keys); + + let mut relays: HashSet = self.get_relays(union).await?; + + // Extend with additional relays + relays.extend(additional_relays); + + // No relay available for the authors and p tags + if relays.is_empty() { + return Ok(BrokenDownFilters::Orphan(filter)); + } + + let mut map: HashMap = HashMap::with_capacity(relays.len()); + + for relay in relays.into_iter() { + // Update map + map.insert(relay, filter.clone()); + } + + Ok(BrokenDownFilters::Filters(map)) + } + // Nothing to do, add to `other` list + (None, None) => Ok(BrokenDownFilters::Other(filter)), + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + const SECRET_KEY_A: &str = "nsec1j4c6269y9w0q2er2xjw8sv2ehyrtfxq3jwgdlxj6qfn8z4gjsq5qfvfk99"; // aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4 + const SECRET_KEY_B: &str = "nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85"; // 79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3 + + const KEY_A_RELAYS: [(&str, Option); 4] = [ + ("wss://relay.damus.io", None), + ("wss://relay.nostr.bg", None), + ("wss://nos.lol", Some(RelayMetadata::Write)), + ("wss://nostr.mom", Some(RelayMetadata::Read)), + ]; + + const KEY_B_RELAYS: [(&str, Option); 4] = [ + ("wss://relay.damus.io", Some(RelayMetadata::Write)), + ("wss://relay.nostr.info", None), + ("wss://relay.rip", Some(RelayMetadata::Write)), + ("wss://relay.snort.social", Some(RelayMetadata::Read)), + ]; + + fn build_relay_list_event( + secret_key: &str, + relays: Vec<(&str, Option)>, + ) -> Event { + let keys = Keys::parse(secret_key).unwrap(); + let list = relays + .into_iter() + .filter_map(|(url, m)| Some((RelayUrl::parse(url).ok()?, m))); + EventBuilder::relay_list(list) + .sign_with_keys(&keys) + .unwrap() + } + + async fn setup_graph() -> Gossip { + let graph = Gossip::in_memory(); + + let events = vec![ + build_relay_list_event(SECRET_KEY_A, KEY_A_RELAYS.to_vec()), + build_relay_list_event(SECRET_KEY_B, KEY_B_RELAYS.to_vec()), + ]; + + for event in events { + graph.process_event(Cow::Owned(event)); + } + + // Wait to allow to process events + tokio::time::sleep(Duration::from_secs(1)).await; + + graph + } + + #[tokio::test] + async fn test_break_down_filter() { + let keys_a = Keys::parse(SECRET_KEY_A).unwrap(); + let keys_b = Keys::parse(SECRET_KEY_B).unwrap(); + + let damus_url = RelayUrl::parse("wss://relay.damus.io").unwrap(); + let nostr_bg_url = RelayUrl::parse("wss://relay.nostr.bg").unwrap(); + let nos_lol_url = RelayUrl::parse("wss://nos.lol").unwrap(); + let nostr_mom_url = RelayUrl::parse("wss://nostr.mom").unwrap(); + let nostr_info_url = RelayUrl::parse("wss://relay.nostr.info").unwrap(); + let relay_rip_url = RelayUrl::parse("wss://relay.rip").unwrap(); + let snort_url = RelayUrl::parse("wss://relay.snort.social").unwrap(); + + let graph = setup_graph().await; + + // Single author + let filter = Filter::new().author(keys_a.public_key); + match graph + .break_down_filter(filter.clone(), HashSet::new()) + .await + .unwrap() + { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_bg_url).unwrap(), &filter); + assert_eq!(map.get(&nos_lol_url).unwrap(), &filter); + assert!(!map.contains_key(&nostr_mom_url)); + } + _ => panic!("Expected filters"), + } + + // Single author with additional relays + let filter = Filter::new().author(keys_a.public_key); + match graph + .break_down_filter(filter.clone(), HashSet::new()) + .await + .unwrap() + { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_bg_url).unwrap(), &filter); + assert_eq!(map.get(&nos_lol_url).unwrap(), &filter); + assert!(!map.contains_key(&nostr_mom_url)); + } + _ => panic!("Expected filters"), + } + + // Multiple authors + let additional_relay = RelayUrl::parse("wss://relay.example.com").unwrap(); + let authors_filter = Filter::new().authors([keys_a.public_key, keys_b.public_key]); + match graph + .break_down_filter( + authors_filter.clone(), + HashSet::from([additional_relay.clone()]), + ) + .await + .unwrap() + { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &authors_filter); + assert_eq!( + map.get(&nostr_bg_url).unwrap(), + &Filter::new().author(keys_a.public_key) + ); + assert_eq!( + map.get(&nos_lol_url).unwrap(), + &Filter::new().author(keys_a.public_key) + ); + assert!(!map.contains_key(&nostr_mom_url)); + assert_eq!( + map.get(&nostr_info_url).unwrap(), + &Filter::new().author(keys_b.public_key) + ); + assert_eq!( + map.get(&relay_rip_url).unwrap(), + &Filter::new().author(keys_b.public_key) + ); + assert_eq!(map.get(&additional_relay).unwrap(), &authors_filter); + assert!(!map.contains_key(&snort_url)); + } + _ => panic!("Expected filters"), + } + + // Other filter + let search_filter = Filter::new().search("Test").limit(10); + match graph + .break_down_filter(search_filter.clone(), HashSet::new()) + .await + .unwrap() + { + BrokenDownFilters::Other(filter) => { + assert_eq!(filter, search_filter); + } + _ => panic!("Expected other"), + } + + // Single p tags + let p_tag_filter = Filter::new().pubkey(keys_a.public_key); + match graph + .break_down_filter(p_tag_filter.clone(), HashSet::new()) + .await + .unwrap() + { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &p_tag_filter); + assert_eq!(map.get(&nostr_bg_url).unwrap(), &p_tag_filter); + assert_eq!(map.get(&nostr_mom_url).unwrap(), &p_tag_filter); + assert!(!map.contains_key(&nos_lol_url)); + assert!(!map.contains_key(&nostr_info_url)); + assert!(!map.contains_key(&relay_rip_url)); + assert!(!map.contains_key(&snort_url)); + } + _ => panic!("Expected filters"), + } + + // Both author and p tag + let filter = Filter::new() + .author(keys_a.public_key) + .pubkey(keys_b.public_key); + match graph + .break_down_filter(filter.clone(), HashSet::new()) + .await + .unwrap() + { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_bg_url).unwrap(), &filter); + assert_eq!(map.get(&nos_lol_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_mom_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_info_url).unwrap(), &filter); + assert_eq!(map.get(&relay_rip_url).unwrap(), &filter); + assert_eq!(map.get(&snort_url).unwrap(), &filter); + } + _ => panic!("Expected filters"), + } + + // test orphan filters + let random_keys = Keys::generate(); + let filter = Filter::new().author(random_keys.public_key); + match graph + .break_down_filter(filter.clone(), HashSet::new()) + .await + .unwrap() + { + BrokenDownFilters::Orphan(f) => { + assert_eq!(f, filter); + } + _ => panic!("Expected filters"), + } + } +} diff --git a/crates/nostr-gossip/src/prelude.rs b/crates/nostr-gossip/src/prelude.rs new file mode 100644 index 000000000..53ad21855 --- /dev/null +++ b/crates/nostr-gossip/src/prelude.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +//! Prelude + +#![allow(unknown_lints)] +#![allow(ambiguous_glob_reexports)] +#![doc(hidden)] + +// External crates +pub use nostr::prelude::*; + +// Internal modules +pub use crate::*; diff --git a/crates/nostr-gossip/src/store/error.rs b/crates/nostr-gossip/src/store/error.rs new file mode 100644 index 000000000..4a6e632b2 --- /dev/null +++ b/crates/nostr-gossip/src/store/error.rs @@ -0,0 +1,70 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +use std::fmt; + +#[cfg(all(target_family = "wasm", target_os = "unknown"))] +use async_utility::task::Error as JoinError; +#[cfg(not(all(target_family = "wasm", target_os = "unknown")))] +use async_utility::tokio::task::JoinError; +use nostr::types::url; +use rusqlite::types::FromSqlError; + +/// Store error +#[derive(Debug)] +pub enum Error { + /// Sqlite error + Sqlite(rusqlite::Error), + /// Pool error + Thread(JoinError), + /// From SQL error + FromSql(FromSqlError), + /// Not found + RelayUrl(url::Error), + /// Migration error + NewerDbVersion { + /// Current version + current: usize, + /// Other version + other: usize, + }, +} + +impl std::error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Sqlite(e) => write!(f, "{e}"), + Self::Thread(e) => write!(f, "{e}"), + Self::FromSql(e) => write!(f, "{e}"), + Self::RelayUrl(e) => write!(f, "{e}"), + Self::NewerDbVersion { current, other } => write!(f, "Database version is newer than supported by this executable (v{current} > v{other})"), + } + } +} + +impl From for Error { + fn from(err: rusqlite::Error) -> Self { + Self::Sqlite(err) + } +} + +impl From for Error { + fn from(err: JoinError) -> Self { + Self::Thread(err) + } +} + +impl From for Error { + fn from(err: FromSqlError) -> Self { + Self::FromSql(err) + } +} + +impl From for Error { + fn from(e: url::Error) -> Self { + Self::RelayUrl(e) + } +} diff --git a/crates/nostr-gossip/src/store/migrate.rs b/crates/nostr-gossip/src/store/migrate.rs new file mode 100644 index 000000000..89f656ca7 --- /dev/null +++ b/crates/nostr-gossip/src/store/migrate.rs @@ -0,0 +1,86 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +use std::cmp::Ordering; + +use rusqlite::Connection; + +use super::Error; + +/// Latest database version +pub const DB_VERSION: usize = 1; + +/// Startup DB Pragmas +pub const STARTUP_SQL: &str = r##" +PRAGMA main.synchronous=NORMAL; +PRAGMA foreign_keys = ON; +PRAGMA journal_size_limit=32768; +pragma mmap_size = 17179869184; -- cap mmap at 16GB +"##; + +/// Determine the current application database schema version. +pub fn curr_db_version(conn: &Connection) -> Result { + let query = "PRAGMA user_version;"; + let curr_version = conn.query_row(query, [], |row| row.get(0))?; + Ok(curr_version) +} + +/// Upgrade DB to latest version, and execute pragma settings +pub(crate) fn run(conn: &Connection) -> Result<(), Error> { + // check the version. + let mut curr_version = curr_db_version(conn)?; + tracing::info!("DB version = {:?}", curr_version); + + match curr_version.cmp(&DB_VERSION) { + // Database version is new or not current + Ordering::Less => { + // initialize from scratch + if curr_version == 0 { + curr_version = mig_init(conn)?; + } + + // for initialized but out-of-date schemas, proceed to + // upgrade sequentially until we are current. + // if curr_version == 1 { + // curr_version = mig_1_to_2(conn)?; + // } + // + // if curr_version == 2 { + // curr_version = mig_2_to_3(conn)?; + // } + + if curr_version == DB_VERSION { + tracing::info!("All migration scripts completed successfully (v{DB_VERSION})"); + } + } + // Same database version + Ordering::Equal => { + tracing::debug!("Database version was already current (v{DB_VERSION})"); + } + // Database version is newer than what this code understands, return error + Ordering::Greater => { + return Err(Error::NewerDbVersion { + current: curr_version, + other: DB_VERSION, + }); + } + } + + // Setup PRAGMA + conn.execute_batch(STARTUP_SQL)?; + tracing::debug!("SQLite PRAGMA startup completed"); + Ok(()) +} + +fn mig_init(conn: &Connection) -> Result { + conn.execute_batch(include_str!("../../migrations/001_init.sql"))?; + tracing::info!("database schema initialized to v1"); + Ok(1) +} + +// fn mig_1_to_2(conn: &mut Connection) -> Result { +// conn.execute_batch(include_str!("002_notifications.sql"))?; +// tracing::info!("database schema upgraded v1 -> v2"); +// Ok(2) +// } diff --git a/crates/nostr-gossip/src/store/mod.rs b/crates/nostr-gossip/src/store/mod.rs new file mode 100644 index 000000000..3269778c1 --- /dev/null +++ b/crates/nostr-gossip/src/store/mod.rs @@ -0,0 +1,282 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +use std::collections::HashSet; + +use nostr::nips::nip65::RelayMetadata; +use nostr::nips::{nip17, nip65}; +use nostr::{Event, Kind, PublicKey, RelayUrl, Timestamp}; +use rusqlite::{Connection, OptionalExtension, Rows, Transaction}; + +pub(super) mod error; +mod migrate; + +use self::error::Error; +use crate::constant::MAX_RELAYS_LIST; + +pub(super) struct ListTimestamps { + pub(super) created_at: Timestamp, + pub(super) last_check: Timestamp, +} + +#[derive(Debug)] +pub struct Store { + conn: Connection, +} + +impl Store { + #[inline] + pub fn new(conn: Connection) -> Self { + Self { conn } + } + + #[inline] + pub(super) fn migrate(&self) -> Result<(), Error> { + migrate::run(&self.conn) + } + + pub(super) fn get_pkid_and_listid( + &mut self, + pk: &PublicKey, + kind: Kind, + ) -> Result, Error> { + let tx = self.conn.transaction()?; + + let mut res: Option<(u64, u64)> = None; + + if let Some(pkid) = get_pkid(&tx, pk)? { + if let Some(listid) = get_list_id(&tx, pkid, kind)? { + res = Some((pkid, listid)); + } + } + + tx.commit()?; + + Ok(res) + } + + pub(super) fn update_last_check( + &mut self, + public_key: &PublicKey, + kinds: &[Kind], + last_check: &Timestamp, + ) -> Result<(), Error> { + let tx = self.conn.transaction()?; + + insert_public_key(&tx, public_key)?; + let pkid: u64 = get_pkid(&tx, public_key)?.expect("The public key ID must exist"); + + { + let mut stmt = + tx.prepare_cached("UPDATE lists SET last_check = ?1 WHERE pkid=?2 AND kind=?3")?; + for kind in kinds { + stmt.execute((last_check.as_u64(), pkid, kind.as_u16()))?; + } + } + + tx.commit()?; + + Ok(()) + } + + pub(super) fn get_timestamps( + &mut self, + public_key: &PublicKey, + kind: Kind, + ) -> Result { + let tx = self.conn.transaction()?; + + insert_public_key(&tx, public_key)?; + let pkid: u64 = get_pkid(&tx, public_key)?.expect("The public key ID must exist"); + + insert_list(&tx, pkid, kind)?; + let listid: u64 = get_list_id(&tx, pkid, kind)?.expect("The list ID must exist"); + + let (created_at, last_check): (u64, u64) = { + let mut stmt = tx.prepare_cached( + "SELECT created_at, last_check FROM lists WHERE id=?1 AND pkid=?2", + )?; + stmt.query_row([listid, pkid], |row| Ok((row.get(0)?, row.get(1)?)))? + }; + + tx.commit()?; + + Ok(ListTimestamps { + created_at: Timestamp::from_secs(created_at), + last_check: Timestamp::from_secs(last_check), + }) + } + + /// Get relay URLs related to the list + pub(super) fn get_relays_url( + &self, + pkid: u64, + list_id: u64, + ) -> Result, Error> { + let mut stmt = self.conn.prepare_cached("SELECT r.relay_url FROM relays_by_list AS rbl JOIN relays AS r ON r.id = rbl.relayid WHERE rbl.pkid = ?1 AND rbl.listid = ?2;")?; + let rows = stmt.query([pkid, list_id])?; + extract_relay_urls(rows) + } + + pub(super) fn get_nip65_relays_url_by_metadata( + &self, + pkid: u64, + list_id: u64, + metadata: RelayMetadata, + ) -> Result, Error> { + let mut stmt = self.conn.prepare_cached("SELECT r.relay_url FROM relays_by_list AS rbl JOIN relays AS r ON r.id = rbl.relayid WHERE rbl.pkid = ?1 AND rbl.listid = ?2 AND (metadata = ?3 OR metadata IS NULL);")?; + let rows = stmt.query((pkid, list_id, metadata.as_str()))?; + extract_relay_urls(rows) + } + + pub(super) fn process_event(&mut self, event: &Event) -> Result<(), Error> { + // Begin a transaction on the underlying connection + let tx = self.conn.transaction()?; + + insert_public_key(&tx, &event.pubkey)?; + let pkid: u64 = get_pkid(&tx, &event.pubkey)?.expect("The public key ID must exist"); + + insert_list(&tx, pkid, event.kind)?; + let listid: u64 = get_list_id(&tx, pkid, event.kind)?.expect("The list ID must exist"); + + let created_at: Timestamp = get_created_at(&tx, pkid, event.kind)?; + + // Check if can update + if created_at > event.created_at { + return Ok(()); + } + + // Delete relays + delete_relays_by_list(&tx, pkid, listid)?; + + match event.kind { + Kind::RelayList => { + let iter = nip65::extract_relay_list(event).take(MAX_RELAYS_LIST); + + for (relay_url, metadata) in iter { + insert_relay(&tx, relay_url)?; + let relayid: u64 = get_relay_id(&tx, relay_url)?; + + insert_relay_for_list(&tx, pkid, listid, relayid, *metadata)?; + } + } + Kind::InboxRelays => { + let iter = nip17::extract_relay_list(event).take(MAX_RELAYS_LIST); + + for relay_url in iter { + insert_relay(&tx, relay_url)?; + let relayid: u64 = get_relay_id(&tx, relay_url)?; + + insert_relay_for_list(&tx, pkid, listid, relayid, None)?; + } + } + _ => {} + } + + update_created_at(&tx, pkid, event.kind, &event.created_at)?; + + // Commit + tx.commit()?; + + Ok(()) + } +} + +fn insert_public_key(tx: &Transaction, public_key: &PublicKey) -> Result<(), Error> { + let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO users (public_key) VALUES (?1)")?; + stmt.execute([public_key.as_bytes()])?; + Ok(()) +} + +/// Get public key ID +fn get_pkid(tx: &Transaction, public_key: &PublicKey) -> Result, Error> { + let mut stmt = tx.prepare_cached("SELECT id FROM users WHERE public_key=?1")?; + let row_id: Option = stmt + .query_row([public_key.as_bytes()], |row| row.get(0)) + .optional()?; + Ok(row_id) +} + +fn insert_relay(tx: &Transaction, relay_url: &RelayUrl) -> Result<(), Error> { + let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO relays (relay_url) VALUES (?1)")?; + stmt.execute([relay_url.as_str_without_trailing_slash()])?; + Ok(()) +} + +/// Get relay ID +fn get_relay_id(tx: &Transaction, relay_url: &RelayUrl) -> Result { + let mut stmt = tx.prepare_cached("SELECT id FROM relays WHERE relay_url=?1")?; + let row_id: u64 = stmt.query_row([relay_url.as_str_without_trailing_slash()], |row| { + row.get(0) + })?; + Ok(row_id) +} + +fn insert_list(tx: &Transaction, pkid: u64, kind: Kind) -> Result<(), Error> { + let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO lists (pkid, kind) VALUES (?1, ?2)")?; + stmt.execute((pkid, kind.as_u16()))?; + Ok(()) +} + +/// Get list ID +fn get_list_id(tx: &Transaction, pkid: u64, kind: Kind) -> Result, Error> { + let mut stmt = tx.prepare_cached("SELECT id FROM lists WHERE pkid=?1 AND kind=?2")?; + let row_id: Option = stmt + .query_row((pkid, kind.as_u16()), |row| row.get(0)) + .optional()?; + Ok(row_id) +} + +fn get_created_at(tx: &Transaction, pkid: u64, kind: Kind) -> Result { + let mut stmt = tx.prepare_cached("SELECT created_at FROM lists WHERE pkid=?1 AND kind=?2")?; + let timestamp: u64 = stmt.query_row((pkid, kind.as_u16()), |row| row.get(0))?; + Ok(Timestamp::from_secs(timestamp)) +} + +fn update_created_at( + tx: &Transaction, + pkid: u64, + kind: Kind, + created_at: &Timestamp, +) -> Result<(), Error> { + let mut stmt = + tx.prepare_cached("UPDATE lists SET created_at = ?1 WHERE pkid=?2 AND kind=?3")?; + stmt.execute((created_at.as_u64(), pkid, kind.as_u16()))?; + Ok(()) +} + +fn insert_relay_for_list( + tx: &Transaction, + pkid: u64, + list_id: u64, + relay_id: u64, + metadata: Option, +) -> Result<(), Error> { + let metadata: Option<&str> = metadata.as_ref().map(|m| m.as_str()); + + let mut stmt = tx.prepare_cached( + "INSERT INTO relays_by_list (pkid, listid, relayid, metadata) VALUES (?1, ?2, ?3, ?4)", + )?; + stmt.execute((pkid, list_id, relay_id, metadata))?; + + Ok(()) +} + +fn delete_relays_by_list(tx: &Transaction, pkid: u64, list_id: u64) -> Result<(), Error> { + let mut stmt = tx.prepare_cached("DELETE FROM relays_by_list WHERE pkid=?1 AND listid=?2")?; + stmt.execute((pkid, list_id))?; + Ok(()) +} + +fn extract_relay_urls(mut rows: Rows) -> Result, Error> { + let mut relays = HashSet::new(); + + while let Some(row) = rows.next()? { + let relay_url: &str = row.get_ref(0)?.as_str()?; + let relay_url: RelayUrl = RelayUrl::parse(relay_url)?; + relays.insert(relay_url); + } + + Ok(relays) +} diff --git a/crates/nostr-sdk/Cargo.toml b/crates/nostr-sdk/Cargo.toml index 78e65bbab..0a8c98e68 100644 --- a/crates/nostr-sdk/Cargo.toml +++ b/crates/nostr-sdk/Cargo.toml @@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] [features] default = [] +gossip = ["dep:nostr-gossip"] tor = ["nostr-relay-pool/tor"] lmdb = ["dep:nostr-lmdb"] ndb = ["dep:nostr-ndb"] @@ -40,6 +41,7 @@ nip98 = ["nostr/nip98"] async-utility.workspace = true nostr = { workspace = true, features = ["std"] } nostr-database.workspace = true +nostr-gossip = { workspace = true, optional = true } nostr-relay-pool.workspace = true tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true, features = ["std", "attributes"] } @@ -74,7 +76,7 @@ name = "fetch-events" [[example]] name = "gossip" -required-features = ["all-nips"] +required-features = ["all-nips", "gossip"] [[example]] name = "nostr-connect" diff --git a/crates/nostr-sdk/README.md b/crates/nostr-sdk/README.md index 70302e0c2..d7da1f5c3 100644 --- a/crates/nostr-sdk/README.md +++ b/crates/nostr-sdk/README.md @@ -104,6 +104,7 @@ The following crate feature flags are available: | Feature | Default | Description | |-------------|:-------:|----------------------------------------------------------------------------------------------| +| `gossip` | No | Enable gossip support | | `tor` | No | Enable support for embedded tor client | | `lmdb` | No | Enable LMDB storage backend | | `ndb` | No | Enable [nostrdb](https://github.com/damus-io/nostrdb) storage backend | diff --git a/crates/nostr-sdk/examples/bot.rs b/crates/nostr-sdk/examples/bot.rs index ff22a2f2d..bc0e20296 100644 --- a/crates/nostr-sdk/examples/bot.rs +++ b/crates/nostr-sdk/examples/bot.rs @@ -9,10 +9,7 @@ async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let keys = Keys::parse("nsec12kcgs78l06p30jz7z7h3n2x2cy99nw2z6zspjdp7qc206887mwvs95lnkx")?; - let client = Client::builder() - .signer(keys.clone()) - .opts(Options::new().gossip(true)) - .build(); + let client = Client::builder().signer(keys.clone()).build(); println!("Bot public key: {}", keys.public_key().to_bech32()?); diff --git a/crates/nostr-sdk/examples/gossip.rs b/crates/nostr-sdk/examples/gossip.rs index a18a14617..b2bbfcdd6 100644 --- a/crates/nostr-sdk/examples/gossip.rs +++ b/crates/nostr-sdk/examples/gossip.rs @@ -11,8 +11,8 @@ async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let keys = Keys::parse("nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85")?; - let opts = Options::new().gossip(true); - let client = Client::builder().signer(keys).opts(opts).build(); + let gossip = Gossip::persistent("./db/gossip.db").await?; + let client = Client::builder().signer(keys).gossip(gossip).build(); client.add_discovery_relay("wss://relay.damus.io").await?; client.add_discovery_relay("wss://purplepag.es").await?; diff --git a/crates/nostr-sdk/src/client/builder.rs b/crates/nostr-sdk/src/client/builder.rs index b1327dade..fad5ca7d6 100644 --- a/crates/nostr-sdk/src/client/builder.rs +++ b/crates/nostr-sdk/src/client/builder.rs @@ -9,6 +9,8 @@ use std::sync::Arc; use nostr::signer::{IntoNostrSigner, NostrSigner}; use nostr_database::memory::MemoryDatabase; use nostr_database::{IntoNostrDatabase, NostrDatabase}; +#[cfg(feature = "gossip")] +use nostr_gossip::Gossip; use nostr_relay_pool::policy::AdmitPolicy; use nostr_relay_pool::transport::websocket::{ DefaultWebsocketTransport, IntoWebSocketTransport, WebSocketTransport, @@ -27,6 +29,9 @@ pub struct ClientBuilder { pub admit_policy: Option>, /// Database pub database: Arc, + /// Gossip tracker + #[cfg(feature = "gossip")] + pub gossip: Option, /// Client options pub opts: Options, } @@ -38,6 +43,8 @@ impl Default for ClientBuilder { websocket_transport: Arc::new(DefaultWebsocketTransport), admit_policy: None, database: Arc::new(MemoryDatabase::default()), + #[cfg(feature = "gossip")] + gossip: None, opts: Options::default(), } } @@ -101,6 +108,14 @@ impl ClientBuilder { self } + /// Set gossip tracker + #[inline] + #[cfg(feature = "gossip")] + pub fn gossip(mut self, gossip: Gossip) -> Self { + self.gossip = Some(gossip); + self + } + /// Set opts #[inline] pub fn opts(mut self, opts: Options) -> Self { diff --git a/crates/nostr-sdk/src/client/error.rs b/crates/nostr-sdk/src/client/error.rs index 10e70d3d4..a5bd451f3 100644 --- a/crates/nostr-sdk/src/client/error.rs +++ b/crates/nostr-sdk/src/client/error.rs @@ -7,6 +7,8 @@ use std::fmt; use nostr::prelude::*; use nostr::serde_json; use nostr_database::prelude::*; +#[cfg(feature = "gossip")] +use nostr_gossip as gossip; use nostr_relay_pool::__private::SharedStateError; use nostr_relay_pool::prelude::*; @@ -25,6 +27,9 @@ pub enum Error { EventBuilder(event::builder::Error), /// Json error Json(serde_json::Error), + /// Gossip error + #[cfg(feature = "gossip")] + Gossip(gossip::Error), /// Shared state error SharedState(SharedStateError), /// NIP59 @@ -51,6 +56,8 @@ impl fmt::Display for Error { Self::Signer(e) => write!(f, "{e}"), Self::EventBuilder(e) => write!(f, "{e}"), Self::Json(e) => write!(f, "{e}"), + #[cfg(feature = "gossip")] + Self::Gossip(e) => write!(f, "{e}"), Self::SharedState(e) => write!(f, "{e}"), #[cfg(feature = "nip59")] Self::NIP59(e) => write!(f, "{e}"), @@ -104,6 +111,13 @@ impl From for Error { } } +#[cfg(feature = "gossip")] +impl From for Error { + fn from(e: gossip::Error) -> Self { + Self::Gossip(e) + } +} + impl From for Error { fn from(e: SharedStateError) -> Self { Self::SharedState(e) diff --git a/crates/nostr-sdk/src/client/middleware.rs b/crates/nostr-sdk/src/client/middleware.rs new file mode 100644 index 000000000..28f372115 --- /dev/null +++ b/crates/nostr-sdk/src/client/middleware.rs @@ -0,0 +1,40 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2025 Rust Nostr Developers +// Distributed under the MIT software license + +use std::borrow::Cow; +use std::sync::Arc; + +use nostr::util::BoxedFuture; +use nostr::{Event, RelayUrl, SubscriptionId}; +use nostr_gossip::Gossip; +use nostr_relay_pool::policy::{AdmitPolicy, AdmitStatus, PolicyError}; + +#[derive(Debug)] +pub(super) struct Middleware { + pub(super) gossip: Option, + pub(super) external_policy: Option>, +} + +impl AdmitPolicy for Middleware { + fn admit_event<'a>( + &'a self, + relay_url: &'a RelayUrl, + subscription_id: &'a SubscriptionId, + event: &'a Event, + ) -> BoxedFuture<'a, Result> { + Box::pin(async move { + if let Some(gossip) = &self.gossip { + gossip.process_event(Cow::Borrowed(event)); + } + + if let Some(external_policy) = &self.external_policy { + return external_policy + .admit_event(relay_url, subscription_id, event) + .await; + } + + Ok(AdmitStatus::Success) + }) + } +} diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 5cba4ddf8..90b59edb1 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -4,33 +4,44 @@ //! Client -use std::collections::{HashMap, HashSet}; +#[cfg(feature = "gossip")] +use std::borrow::Cow; +use std::collections::HashMap; +#[cfg(feature = "gossip")] +use std::collections::{BTreeSet, HashSet}; use std::future::Future; +#[cfg(feature = "gossip")] use std::iter; use std::sync::Arc; use std::time::Duration; use nostr::prelude::*; use nostr_database::prelude::*; +#[cfg(feature = "gossip")] +use nostr_gossip::prelude::*; use nostr_relay_pool::prelude::*; use tokio::sync::broadcast; pub mod builder; mod error; +#[cfg(feature = "gossip")] +mod middleware; pub mod options; pub use self::builder::ClientBuilder; pub use self::error::Error; +#[cfg(feature = "gossip")] +use self::middleware::Middleware; pub use self::options::Options; #[cfg(not(target_arch = "wasm32"))] pub use self::options::{Connection, ConnectionTarget}; -use crate::gossip::{BrokenDownFilters, Gossip}; /// Nostr client #[derive(Debug, Clone)] pub struct Client { pool: RelayPool, - gossip: Gossip, + #[cfg(feature = "gossip")] + gossip: Option, opts: Options, } @@ -65,13 +76,10 @@ impl Client { /// /// # Example /// ```rust,no_run - /// use std::time::Duration; - /// /// use nostr_sdk::prelude::*; /// /// let signer = Keys::generate(); - /// let opts = Options::default().gossip(true); - /// let client: Client = Client::builder().signer(signer).opts(opts).build(); + /// let client: Client = Client::builder().signer(signer).build(); /// ``` #[inline] pub fn builder() -> ClientBuilder { @@ -79,10 +87,20 @@ impl Client { } fn from_builder(builder: ClientBuilder) -> Self { + // Middleware + #[cfg(feature = "gossip")] + let policy = Middleware { + gossip: builder.gossip.clone(), + external_policy: builder.admit_policy, + }; + // Construct relay pool builder let pool_builder: RelayPoolBuilder = RelayPoolBuilder { websocket_transport: builder.websocket_transport, + #[cfg(not(feature = "gossip"))] admit_policy: builder.admit_policy, + #[cfg(feature = "gossip")] + admit_policy: Some(Arc::new(policy)), opts: builder.opts.pool, __database: builder.database, __signer: builder.signer, @@ -91,7 +109,8 @@ impl Client { // Construct client Self { pool: pool_builder.build(), - gossip: Gossip::new(), + #[cfg(feature = "gossip")] + gossip: builder.gossip, opts: builder.opts, } } @@ -156,6 +175,13 @@ impl Client { self.pool.database() } + /// Get gossip instance + #[inline] + #[cfg(feature = "gossip")] + pub fn gossip(&self) -> Option<&Gossip> { + self.gossip.as_ref() + } + /// Reset the client /// /// This method resets the client to simplify the switch to another account. @@ -178,7 +204,7 @@ impl Client { /// Completely shutdown client #[inline] pub async fn shutdown(&self) { - self.pool.shutdown().await + self.pool.shutdown().await; } /// Get new notification listener @@ -349,6 +375,7 @@ impl Client { } #[inline] + #[cfg(feature = "gossip")] async fn add_gossip_relay(&self, url: U) -> Result where U: TryIntoUrl, @@ -576,11 +603,12 @@ impl Client { ) -> Result, Error> { let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts); - if self.opts.gossip { - self.gossip_subscribe(id, filter, opts).await - } else { - Ok(self.pool.subscribe_with_id(id, filter, opts).await?) + #[cfg(feature = "gossip")] + if let Some(gossip) = &self.gossip { + return self.gossip_subscribe(gossip, id, filter, opts).await; } + + Ok(self.pool.subscribe_with_id(id, filter, opts).await?) } /// Subscribe to filters to specific relays @@ -674,8 +702,9 @@ impl Client { filter: Filter, opts: &SyncOptions, ) -> Result, Error> { - if self.opts.gossip { - return self.gossip_sync_negentropy(filter, opts).await; + #[cfg(feature = "gossip")] + if let Some(gossip) = &self.gossip { + return self.gossip_sync_negentropy(gossip, filter, opts).await; } Ok(self.pool.sync(filter, opts).await?) @@ -730,9 +759,10 @@ impl Client { /// # } /// ``` pub async fn fetch_events(&self, filter: Filter, timeout: Duration) -> Result { - if self.opts.gossip { + #[cfg(feature = "gossip")] + if let Some(gossip) = &self.gossip { return self - .gossip_fetch_events(filter, timeout, ReqExitPolicy::ExitOnEOSE) + .gossip_fetch_events(gossip, filter, timeout, ReqExitPolicy::ExitOnEOSE) .await; } @@ -843,16 +873,17 @@ impl Client { filter: Filter, timeout: Duration, ) -> Result, Error> { - // Check if gossip is enabled - if self.opts.gossip { - self.gossip_stream_events(filter, timeout, ReqExitPolicy::ExitOnEOSE) - .await - } else { - Ok(self - .pool - .stream_events(filter, timeout, ReqExitPolicy::ExitOnEOSE) - .await?) + #[cfg(feature = "gossip")] + if let Some(gossip) = &self.gossip { + return self + .gossip_stream_events(gossip, filter, timeout, ReqExitPolicy::ExitOnEOSE) + .await; } + + Ok(self + .pool + .stream_events(filter, timeout, ReqExitPolicy::ExitOnEOSE) + .await?) } /// Stream events from specific relays @@ -941,16 +972,17 @@ impl Client { /// - the gossip data will be updated, if the [`Event`] is a NIP17/NIP65 relay list. #[inline] pub async fn send_event(&self, event: &Event) -> Result, Error> { - // NOT gossip, send event to all relays - if !self.opts.gossip { - return Ok(self.pool.send_event(event).await?); - } + #[cfg(feature = "gossip")] + if let Some(gossip) = &self.gossip { + // Update gossip graph + gossip.process_event(Cow::Borrowed(event)); - // Update gossip graph - self.gossip.process_event(event).await; + // Send event using gossip + return self.gossip_send_event(gossip, event, false).await; + } - // Send event using gossip - self.gossip_send_event(event, false).await + // NOT gossip, send event to all relays + Ok(self.pool.send_event(event).await?) } /// Send event to specific relays @@ -970,9 +1002,10 @@ impl Client { U: TryIntoUrl, pool::Error: From<::Err>, { - // If gossip is enabled, update the gossip graph - if self.opts.gossip { - self.gossip.process_event(event).await; + // If gossip is enabled, update the gossip data + #[cfg(feature = "gossip")] + if let Some(gossip) = &self.gossip { + gossip.process_event(Cow::Borrowed(event)); } // Send event to relays @@ -1190,12 +1223,12 @@ impl Client { let event: Event = EventBuilder::private_msg(&signer, receiver, message, rumor_extra_tags).await?; - // NOT gossip, send to all relays - if !self.opts.gossip { - return self.send_event(&event).await; + #[cfg(feature = "gossip")] + if let Some(gossip) = &self.gossip { + return self.gossip_send_event(gossip, &event, true).await; } - self.gossip_send_event(&event, true).await + self.send_event(&event).await } /// Send a private direct message to specific relays @@ -1313,27 +1346,36 @@ impl Client { } // Gossip +#[cfg(feature = "gossip")] impl Client { /// Check if there are outdated public keys and update them - async fn check_and_update_gossip(&self, public_keys: I) -> Result<(), Error> - where - I: IntoIterator, - { + async fn check_and_update_gossip( + &self, + gossip: &Gossip, + public_keys: BTreeSet, + ) -> Result<(), Error> { + let kinds: Vec = vec![Kind::RelayList, Kind::InboxRelays]; + let outdated_public_keys: HashSet = - self.gossip.check_outdated(public_keys).await; + gossip.check_outdated(public_keys, kinds.clone()).await?; // No outdated public keys, immediately return. if outdated_public_keys.is_empty() { return Ok(()); } + // TODO: use a negentropy sync + // Compose filters let filter: Filter = Filter::default() .authors(outdated_public_keys.clone()) - .kinds([Kind::RelayList, Kind::InboxRelays]); + .kinds(kinds.iter().copied()); - // Query from database + // Query from database and process events let stored_events: Events = self.database().query(filter.clone()).await?; + for event in stored_events.into_iter() { + gossip.process_event(Cow::Owned(event)); + } // Get DISCOVERY and READ relays let urls: Vec = self @@ -1345,37 +1387,41 @@ impl Client { .await; // Get events from discovery and read relays - let events: Events = self - .fetch_events_from(urls, filter, Duration::from_secs(10)) + let mut stream = self + .stream_events_from(urls, filter, Duration::from_secs(10)) .await?; + while let Some(..) = stream.next().await {} // Update last check for these public keys - self.gossip.update_last_check(outdated_public_keys).await; - - // Merge database and relays events - let merged: Events = events.merge(stored_events); - - // Update gossip graph - self.gossip.update(merged).await; + gossip + .update_last_check(outdated_public_keys, kinds) + .await?; Ok(()) } - /// Break down filters for gossip and discovery relays - async fn break_down_filter(&self, filter: Filter) -> Result, Error> { + /// Break down filters for gossip + async fn break_down_filter( + &self, + gossip: &Gossip, + filter: Filter, + ) -> Result, Error> { // Extract all public keys from filters let public_keys = filter.extract_public_keys(); // Check and update outdated public keys - self.check_and_update_gossip(public_keys).await?; + self.check_and_update_gossip(gossip, public_keys).await?; + + // Get read relays + let read_relays: Vec = self.pool.__read_relay_urls().await; // Broken-down filters - let filters: HashMap = match self.gossip.break_down_filter(filter).await { + let filters: HashMap = match gossip + .break_down_filter(filter, read_relays.clone()) + .await? + { BrokenDownFilters::Filters(filters) => filters, BrokenDownFilters::Orphan(filter) | BrokenDownFilters::Other(filter) => { - // Get read relays - let read_relays: Vec = self.pool.__read_relay_urls().await; - let mut map = HashMap::with_capacity(read_relays.len()); for url in read_relays.into_iter() { map.insert(url, filter.clone()); @@ -1392,7 +1438,6 @@ impl Client { } // Check if filters are empty - // TODO: this can't be empty, right? if filters.is_empty() { return Err(Error::GossipFiltersEmpty); } @@ -1402,34 +1447,36 @@ impl Client { async fn gossip_send_event( &self, + gossip: &Gossip, event: &Event, is_nip17: bool, ) -> Result, Error> { let is_gift_wrap: bool = event.kind == Kind::GiftWrap; - // Get involved public keys and check what are up to date in the gossip graph and which ones require an update. - if is_gift_wrap { + // Get involved public keys + let public_keys: BTreeSet = if is_gift_wrap { // Get only p tags since the author of a gift wrap is randomized - let public_keys = event.tags.public_keys().copied(); - self.check_and_update_gossip(public_keys).await?; + event.tags.public_keys().copied().collect() } else { // Get all public keys involved in the event: author + p tags - let public_keys = event + event .tags .public_keys() .copied() - .chain(iter::once(event.pubkey)); - self.check_and_update_gossip(public_keys).await?; + .chain(iter::once(event.pubkey)) + .collect() }; + // Check what are up to date in the gossip storage and which ones require an update. + self.check_and_update_gossip(gossip, public_keys).await?; + // Check if NIP17 or NIP65 let urls: HashSet = if is_nip17 && is_gift_wrap { // Get NIP17 relays // Get only for relays for p tags since gift wraps are signed with random key (random author) - let relays = self - .gossip - .get_nip17_inbox_relays(event.tags.public_keys()) - .await; + let relays = gossip + .get_nip17_inbox_relays(event.tags.public_keys().copied().collect()) + .await?; // Clients SHOULD publish kind 14 events to the 10050-listed relays. // If that is not found, that indicates the user is not ready to receive messages under this NIP and clients shouldn't try. @@ -1449,11 +1496,12 @@ impl Client { relays } else { // Get NIP65 relays - let mut outbox = self.gossip.get_nip65_outbox_relays(&[event.pubkey]).await; - let inbox = self - .gossip - .get_nip65_inbox_relays(event.tags.public_keys()) - .await; + let mut outbox = gossip + .get_nip65_outbox_relays(BTreeSet::from([event.pubkey])) + .await?; + let inbox = gossip + .get_nip65_inbox_relays(event.tags.public_keys().copied().collect()) + .await?; // Add outbox and inbox relays for url in outbox.iter().chain(inbox.iter()) { @@ -1481,11 +1529,12 @@ impl Client { async fn gossip_stream_events( &self, + gossip: &Gossip, filter: Filter, timeout: Duration, policy: ReqExitPolicy, ) -> Result, Error> { - let filters = self.break_down_filter(filter).await?; + let filters = self.break_down_filter(gossip, filter).await?; // Stream events let stream: ReceiverStream = self @@ -1498,6 +1547,7 @@ impl Client { async fn gossip_fetch_events( &self, + gossip: &Gossip, filter: Filter, timeout: Duration, policy: ReqExitPolicy, @@ -1505,8 +1555,9 @@ impl Client { let mut events: Events = Events::new(&filter); // Stream events - let mut stream: ReceiverStream = - self.gossip_stream_events(filter, timeout, policy).await?; + let mut stream: ReceiverStream = self + .gossip_stream_events(gossip, filter, timeout, policy) + .await?; while let Some(event) = stream.next().await { // To find out more about why the `force_insert` was used, search for EVENTS_FORCE_INSERT ine the code. @@ -1518,21 +1569,23 @@ impl Client { async fn gossip_subscribe( &self, + gossip: &Gossip, id: SubscriptionId, filter: Filter, opts: SubscribeOptions, ) -> Result, Error> { - let filters = self.break_down_filter(filter).await?; + let filters = self.break_down_filter(gossip, filter).await?; Ok(self.pool.subscribe_targeted(id, filters, opts).await?) } async fn gossip_sync_negentropy( &self, + gossip: &Gossip, filter: Filter, opts: &SyncOptions, ) -> Result, Error> { // Break down filter - let temp_filters = self.break_down_filter(filter).await?; + let temp_filters = self.break_down_filter(gossip, filter).await?; let database = self.database(); let mut filters: HashMap)> = diff --git a/crates/nostr-sdk/src/client/options.rs b/crates/nostr-sdk/src/client/options.rs index 6106da689..c0e681000 100644 --- a/crates/nostr-sdk/src/client/options.rs +++ b/crates/nostr-sdk/src/client/options.rs @@ -16,7 +16,6 @@ use nostr_relay_pool::prelude::*; #[derive(Debug, Clone, Default)] pub struct Options { pub(super) autoconnect: bool, - pub(super) gossip: bool, #[cfg(not(target_arch = "wasm32"))] pub(super) connection: Connection, pub(super) relay_limits: RelayLimits, @@ -65,9 +64,8 @@ impl Options { } /// Enable gossip model (default: false) - #[inline] - pub fn gossip(mut self, enable: bool) -> Self { - self.gossip = enable; + #[deprecated(since = "0.40.0", note = "Use ClientBuilder::gossip instead")] + pub fn gossip(self, _enable: bool) -> Self { self } diff --git a/crates/nostr-sdk/src/gossip/constant.rs b/crates/nostr-sdk/src/gossip/constant.rs deleted file mode 100644 index 7033dd138..000000000 --- a/crates/nostr-sdk/src/gossip/constant.rs +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright (c) 2022-2023 Yuki Kishimoto -// Copyright (c) 2023-2025 Rust Nostr Developers -// Distributed under the MIT software license - -use std::time::Duration; - -/// Max number of relays allowed in NIP17/NIP65 lists -pub const MAX_RELAYS_LIST: usize = 5; -pub const PUBKEY_METADATA_OUTDATED_AFTER: Duration = Duration::from_secs(60 * 60); // 60 min -pub const CHECK_OUTDATED_INTERVAL: Duration = Duration::from_secs(60 * 5); // 5 min diff --git a/crates/nostr-sdk/src/gossip/mod.rs b/crates/nostr-sdk/src/gossip/mod.rs deleted file mode 100644 index de3c389d8..000000000 --- a/crates/nostr-sdk/src/gossip/mod.rs +++ /dev/null @@ -1,622 +0,0 @@ -// Copyright (c) 2022-2023 Yuki Kishimoto -// Copyright (c) 2023-2025 Rust Nostr Developers -// Distributed under the MIT software license - -use std::collections::{BTreeSet, HashMap, HashSet}; -use std::sync::Arc; - -use nostr::prelude::*; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; - -pub mod constant; - -use self::constant::{CHECK_OUTDATED_INTERVAL, MAX_RELAYS_LIST, PUBKEY_METADATA_OUTDATED_AFTER}; - -const P_TAG: SingleLetterTag = SingleLetterTag::lowercase(Alphabet::P); - -#[derive(Debug)] -pub enum BrokenDownFilters { - /// Filters by url - Filters(HashMap), - /// Filters that match a certain pattern but where no relays are available - Orphan(Filter), - /// Filters that can be sent to read relays (generic query, not related to public keys) - Other(Filter), -} - -#[derive(Debug, Clone, Default)] -struct RelayList { - pub collection: T, - /// Timestamp of when the event metadata was created - pub event_created_at: Timestamp, - /// Timestamp of when the metadata was updated - pub last_update: Timestamp, -} - -#[derive(Debug, Clone, Default)] -struct RelayLists { - pub nip17: RelayList>, - pub nip65: RelayList>>, - /// Timestamp of the last check - pub last_check: Timestamp, -} - -type PublicKeyMap = HashMap; - -/// Gossip tracker -#[derive(Debug, Clone)] -pub struct Gossip { - /// Keep track of seen public keys and of their NIP65 - public_keys: Arc>, -} - -impl Gossip { - pub fn new() -> Self { - Self { - public_keys: Arc::new(RwLock::new(HashMap::new())), - } - } - - pub async fn process_event(&self, event: &Event) { - // Check if the event can be processed - // This avoids the acquire of the lock for every event processed that is not a NIP17 or NIP65 - if event.kind != Kind::RelayList && event.kind != Kind::InboxRelays { - return; - } - - // Acquire write lock - let mut public_keys = self.public_keys.write().await; - - // Update - self.update_event(&mut public_keys, event); - } - - /// Update graph - /// - /// Only the first [`MAX_RELAYS_LIST`] relays will be used. - pub async fn update(&self, events: I) - where - I: IntoIterator, - { - let mut public_keys = self.public_keys.write().await; - - for event in events.into_iter() { - self.update_event(&mut public_keys, &event); - } - } - - fn update_event(&self, public_keys: &mut RwLockWriteGuard, event: &Event) { - if event.kind == Kind::RelayList { - public_keys - .entry(event.pubkey) - .and_modify(|lists| { - // Update only if new metadata has more recent timestamp - if event.created_at >= lists.nip65.event_created_at { - lists.nip65 = RelayList { - collection: nip65::extract_relay_list(event) - .take(MAX_RELAYS_LIST) - .map(|(u, m)| (u.clone(), *m)) - .collect(), - event_created_at: event.created_at, - last_update: Timestamp::now(), - }; - } - }) - .or_insert_with(|| RelayLists { - nip65: RelayList { - collection: nip65::extract_relay_list(event) - .take(MAX_RELAYS_LIST) - .map(|(u, m)| (u.clone(), *m)) - .collect(), - event_created_at: event.created_at, - last_update: Timestamp::now(), - }, - ..Default::default() - }); - } else if event.kind == Kind::InboxRelays { - public_keys - .entry(event.pubkey) - .and_modify(|lists| { - // Update only if new metadata has more recent timestamp - if event.created_at >= lists.nip17.event_created_at { - lists.nip17 = RelayList { - collection: nip17::extract_relay_list(event) - .take(MAX_RELAYS_LIST) - .cloned() - .collect(), - event_created_at: event.created_at, - last_update: Timestamp::now(), - }; - } - }) - .or_insert_with(|| RelayLists { - nip17: RelayList { - collection: nip17::extract_relay_list(event) - .take(MAX_RELAYS_LIST) - .cloned() - .collect(), - event_created_at: event.created_at, - last_update: Timestamp::now(), - }, - ..Default::default() - }); - } - } - - /// Check for what public keys the metadata are outdated or not existent (both for NIP17 and NIP65) - pub async fn check_outdated(&self, public_keys: I) -> HashSet - where - I: IntoIterator, - { - let map = self.public_keys.read().await; - let now = Timestamp::now(); - - let mut outdated: HashSet = HashSet::new(); - - for public_key in public_keys.into_iter() { - match map.get(&public_key) { - Some(lists) => { - if lists.last_check + CHECK_OUTDATED_INTERVAL > now { - continue; - } - - // Check if collections are empty - let empty: bool = - lists.nip17.collection.is_empty() || lists.nip65.collection.is_empty(); - - // Check if expired - let expired: bool = lists.nip17.last_update + PUBKEY_METADATA_OUTDATED_AFTER - < now - || lists.nip65.last_update + PUBKEY_METADATA_OUTDATED_AFTER < now; - - if empty || expired { - outdated.insert(public_key); - } - } - None => { - // Public key not found, insert into outdated - outdated.insert(public_key); - } - } - } - - outdated - } - - pub async fn update_last_check(&self, public_keys: I) - where - I: IntoIterator, - { - let mut map = self.public_keys.write().await; - let now = Timestamp::now(); - - for public_key in public_keys.into_iter() { - map.entry(public_key) - .and_modify(|lists| { - lists.last_check = now; - }) - .or_insert_with(|| RelayLists { - last_check: now, - ..Default::default() - }); - } - } - - fn get_nip17_relays<'a, I>( - &self, - txn: &RwLockReadGuard, - public_keys: I, - ) -> HashSet - where - I: IntoIterator, - { - let mut urls: HashSet = HashSet::new(); - - for public_key in public_keys.into_iter() { - if let Some(lists) = txn.get(public_key) { - for url in lists.nip17.collection.iter() { - urls.insert(url.clone()); - } - } - } - - urls - } - - fn get_nip65_relays<'a, I>( - &self, - txn: &RwLockReadGuard, - public_keys: I, - metadata: Option, - ) -> HashSet - where - I: IntoIterator, - { - let mut urls: HashSet = HashSet::new(); - - for public_key in public_keys.into_iter() { - if let Some(lists) = txn.get(public_key) { - for (url, m) in lists.nip65.collection.iter() { - let insert: bool = match m { - Some(val) => match metadata { - Some(metadata) => val == &metadata, - None => true, - }, - None => true, - }; - - if insert { - urls.insert(url.clone()); - } - } - } - } - - urls - } - - fn map_nip17_relays<'a, I>( - &self, - txn: &RwLockReadGuard, - public_keys: I, - ) -> HashMap> - where - I: IntoIterator, - { - let mut urls: HashMap> = HashMap::new(); - - for public_key in public_keys.into_iter() { - if let Some(lists) = txn.get(public_key) { - for url in lists.nip17.collection.iter() { - urls.entry(url.clone()) - .and_modify(|s| { - s.insert(*public_key); - }) - .or_default() - .insert(*public_key); - } - } - } - - urls - } - - fn map_nip65_relays<'a, I>( - &self, - txn: &RwLockReadGuard, - public_keys: I, - metadata: RelayMetadata, - ) -> HashMap> - where - I: IntoIterator, - { - let mut urls: HashMap> = HashMap::new(); - - for public_key in public_keys.into_iter() { - if let Some(lists) = txn.get(public_key) { - for (url, m) in lists.nip65.collection.iter() { - let insert: bool = match m { - Some(val) => val == &metadata, - None => true, - }; - - if insert { - urls.entry(url.clone()) - .and_modify(|s| { - s.insert(*public_key); - }) - .or_default() - .insert(*public_key); - } - } - } - } - - urls - } - - /// Get outbox (write) relays for public keys - #[inline] - pub async fn get_nip65_outbox_relays<'a, I>(&self, public_keys: I) -> HashSet - where - I: IntoIterator, - { - let txn = self.public_keys.read().await; - self.get_nip65_relays(&txn, public_keys, Some(RelayMetadata::Write)) - } - - /// Get inbox (read) relays for public keys - #[inline] - pub async fn get_nip65_inbox_relays<'a, I>(&self, public_keys: I) -> HashSet - where - I: IntoIterator, - { - let txn = self.public_keys.read().await; - self.get_nip65_relays(&txn, public_keys, Some(RelayMetadata::Read)) - } - - /// Get NIP17 inbox (read) relays for public keys - #[inline] - pub async fn get_nip17_inbox_relays<'a, I>(&self, public_keys: I) -> HashSet - where - I: IntoIterator, - { - let txn = self.public_keys.read().await; - self.get_nip17_relays(&txn, public_keys) - } - - /// Map outbox (write) relays for public keys - #[inline] - fn map_nip65_outbox_relays<'a, I>( - &self, - txn: &RwLockReadGuard, - public_keys: I, - ) -> HashMap> - where - I: IntoIterator, - { - self.map_nip65_relays(txn, public_keys, RelayMetadata::Write) - } - - /// Map NIP65 inbox (read) relays for public keys - #[inline] - fn map_nip65_inbox_relays<'a, I>( - &self, - txn: &RwLockReadGuard, - public_keys: I, - ) -> HashMap> - where - I: IntoIterator, - { - self.map_nip65_relays(txn, public_keys, RelayMetadata::Read) - } - - pub async fn break_down_filter(&self, filter: Filter) -> BrokenDownFilters { - let txn = self.public_keys.read().await; - - // Extract `p` tag from generic tags and parse public key hex - let p_tag: Option> = filter.generic_tags.get(&P_TAG).map(|s| { - s.iter() - .filter_map(|p| PublicKey::from_hex(p).ok()) - .collect() - }); - - // Match pattern - match (&filter.authors, &p_tag) { - (Some(authors), None) => { - // Get map of outbox relays - let mut outbox: HashMap> = - self.map_nip65_outbox_relays(&txn, authors); - - // Extend with NIP17 relays - outbox.extend(self.map_nip17_relays(&txn, authors)); - - // No relay available for the authors - if outbox.is_empty() { - return BrokenDownFilters::Orphan(filter); - } - - let mut map: HashMap = HashMap::with_capacity(outbox.len()); - - // Construct new filters - for (relay, pk_set) in outbox.into_iter() { - // Clone filter and change authors - let mut new_filter: Filter = filter.clone(); - new_filter.authors = Some(pk_set); - - // Update map - map.insert(relay, new_filter); - } - - BrokenDownFilters::Filters(map) - } - (None, Some(p_public_keys)) => { - // Get map of inbox relays - let mut inbox: HashMap> = - self.map_nip65_inbox_relays(&txn, p_public_keys); - - // Extend with NIP17 relays - inbox.extend(self.map_nip17_relays(&txn, p_public_keys)); - - // No relay available for the p tags - if inbox.is_empty() { - return BrokenDownFilters::Orphan(filter); - } - - let mut map: HashMap = HashMap::with_capacity(inbox.len()); - - // Construct new filters - for (relay, pk_set) in inbox.into_iter() { - // Clone filter and change p tags - let mut new_filter: Filter = filter.clone(); - new_filter - .generic_tags - .insert(P_TAG, pk_set.into_iter().map(|p| p.to_string()).collect()); - - // Update map - map.insert(relay, new_filter); - } - - BrokenDownFilters::Filters(map) - } - (Some(authors), Some(p_public_keys)) => { - // Get map of outbox and inbox relays - let mut relays: HashSet = - self.get_nip65_relays(&txn, authors.union(p_public_keys), None); - - // Extend with NIP17 relays - relays.extend(self.get_nip17_relays(&txn, authors.union(p_public_keys))); - - // No relay available for the authors and p tags - if relays.is_empty() { - return BrokenDownFilters::Orphan(filter); - } - - let mut map: HashMap = HashMap::with_capacity(relays.len()); - - for relay in relays.into_iter() { - // Update map - map.insert(relay, filter.clone()); - } - - BrokenDownFilters::Filters(map) - } - // Nothing to do, add to `other` list - (None, None) => BrokenDownFilters::Other(filter), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - const SECRET_KEY_A: &str = "nsec1j4c6269y9w0q2er2xjw8sv2ehyrtfxq3jwgdlxj6qfn8z4gjsq5qfvfk99"; // aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4 - const SECRET_KEY_B: &str = "nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85"; // 79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3 - - const KEY_A_RELAYS: [(&str, Option); 4] = [ - ("wss://relay.damus.io", None), - ("wss://relay.nostr.bg", None), - ("wss://nos.lol", Some(RelayMetadata::Write)), - ("wss://nostr.mom", Some(RelayMetadata::Read)), - ]; - - const KEY_B_RELAYS: [(&str, Option); 4] = [ - ("wss://relay.damus.io", Some(RelayMetadata::Write)), - ("wss://relay.nostr.info", None), - ("wss://relay.rip", Some(RelayMetadata::Write)), - ("wss://relay.snort.social", Some(RelayMetadata::Read)), - ]; - - fn build_relay_list_event( - secret_key: &str, - relays: Vec<(&str, Option)>, - ) -> Event { - let keys = Keys::parse(secret_key).unwrap(); - let list = relays - .into_iter() - .filter_map(|(url, m)| Some((RelayUrl::parse(url).ok()?, m))); - EventBuilder::relay_list(list) - .sign_with_keys(&keys) - .unwrap() - } - - async fn setup_graph() -> Gossip { - let graph = Gossip::new(); - - let events = vec![ - build_relay_list_event(SECRET_KEY_A, KEY_A_RELAYS.to_vec()), - build_relay_list_event(SECRET_KEY_B, KEY_B_RELAYS.to_vec()), - ]; - - graph.update(events).await; - - graph - } - - #[tokio::test] - async fn test_break_down_filter() { - let keys_a = Keys::parse(SECRET_KEY_A).unwrap(); - let keys_b = Keys::parse(SECRET_KEY_B).unwrap(); - - let damus_url = RelayUrl::parse("wss://relay.damus.io").unwrap(); - let nostr_bg_url = RelayUrl::parse("wss://relay.nostr.bg").unwrap(); - let nos_lol_url = RelayUrl::parse("wss://nos.lol").unwrap(); - let nostr_mom_url = RelayUrl::parse("wss://nostr.mom").unwrap(); - let nostr_info_url = RelayUrl::parse("wss://relay.nostr.info").unwrap(); - let relay_rip_url = RelayUrl::parse("wss://relay.rip").unwrap(); - let snort_url = RelayUrl::parse("wss://relay.snort.social").unwrap(); - - let graph = setup_graph().await; - - // Single author - let filter = Filter::new().author(keys_a.public_key); - match graph.break_down_filter(filter.clone()).await { - BrokenDownFilters::Filters(map) => { - assert_eq!(map.get(&damus_url).unwrap(), &filter); - assert_eq!(map.get(&nostr_bg_url).unwrap(), &filter); - assert_eq!(map.get(&nos_lol_url).unwrap(), &filter); - assert!(!map.contains_key(&nostr_mom_url)); - } - _ => panic!("Expected filters"), - } - - // Multiple authors - let authors_filter = Filter::new().authors([keys_a.public_key, keys_b.public_key]); - match graph.break_down_filter(authors_filter.clone()).await { - BrokenDownFilters::Filters(map) => { - assert_eq!(map.get(&damus_url).unwrap(), &authors_filter); - assert_eq!( - map.get(&nostr_bg_url).unwrap(), - &Filter::new().author(keys_a.public_key) - ); - assert_eq!( - map.get(&nos_lol_url).unwrap(), - &Filter::new().author(keys_a.public_key) - ); - assert!(!map.contains_key(&nostr_mom_url)); - assert_eq!( - map.get(&nostr_info_url).unwrap(), - &Filter::new().author(keys_b.public_key) - ); - assert_eq!( - map.get(&relay_rip_url).unwrap(), - &Filter::new().author(keys_b.public_key) - ); - assert!(!map.contains_key(&snort_url)); - } - _ => panic!("Expected filters"), - } - - // Other filter - let search_filter = Filter::new().search("Test").limit(10); - match graph.break_down_filter(search_filter.clone()).await { - BrokenDownFilters::Other(filter) => { - assert_eq!(filter, search_filter); - } - _ => panic!("Expected other"), - } - - // Single p tags - let p_tag_filter = Filter::new().pubkey(keys_a.public_key); - match graph.break_down_filter(p_tag_filter.clone()).await { - BrokenDownFilters::Filters(map) => { - assert_eq!(map.get(&damus_url).unwrap(), &p_tag_filter); - assert_eq!(map.get(&nostr_bg_url).unwrap(), &p_tag_filter); - assert_eq!(map.get(&nostr_mom_url).unwrap(), &p_tag_filter); - assert!(!map.contains_key(&nos_lol_url)); - assert!(!map.contains_key(&nostr_info_url)); - assert!(!map.contains_key(&relay_rip_url)); - assert!(!map.contains_key(&snort_url)); - } - _ => panic!("Expected filters"), - } - - // Both author and p tag - let filter = Filter::new() - .author(keys_a.public_key) - .pubkey(keys_b.public_key); - match graph.break_down_filter(filter.clone()).await { - BrokenDownFilters::Filters(map) => { - assert_eq!(map.get(&damus_url).unwrap(), &filter); - assert_eq!(map.get(&nostr_bg_url).unwrap(), &filter); - assert_eq!(map.get(&nos_lol_url).unwrap(), &filter); - assert_eq!(map.get(&nostr_mom_url).unwrap(), &filter); - assert_eq!(map.get(&nostr_info_url).unwrap(), &filter); - assert_eq!(map.get(&relay_rip_url).unwrap(), &filter); - assert_eq!(map.get(&snort_url).unwrap(), &filter); - } - _ => panic!("Expected filters"), - } - - // test orphan filters - let random_keys = Keys::generate(); - let filter = Filter::new().author(random_keys.public_key); - match graph.break_down_filter(filter.clone()).await { - BrokenDownFilters::Orphan(f) => { - assert_eq!(f, filter); - } - _ => panic!("Expected filters"), - } - } -} diff --git a/crates/nostr-sdk/src/lib.rs b/crates/nostr-sdk/src/lib.rs index 168d02f0c..d6ae6b079 100644 --- a/crates/nostr-sdk/src/lib.rs +++ b/crates/nostr-sdk/src/lib.rs @@ -35,7 +35,6 @@ pub use nostr_relay_pool::{ }; pub mod client; -mod gossip; pub mod prelude; pub use self::client::{Client, ClientBuilder, Options}; diff --git a/crates/nostr-sdk/src/prelude.rs b/crates/nostr-sdk/src/prelude.rs index a60b82e1e..e5b3ffefc 100644 --- a/crates/nostr-sdk/src/prelude.rs +++ b/crates/nostr-sdk/src/prelude.rs @@ -11,6 +11,8 @@ // External crates pub use nostr::prelude::*; pub use nostr_database::prelude::*; +#[cfg(feature = "gossip")] +pub use nostr_gossip::prelude::*; pub use nostr_relay_pool::prelude::*; // Internal modules