diff --git a/Cargo.lock b/Cargo.lock index 0d4121eb..e511901a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -914,6 +914,22 @@ dependencies = [ "subtle", ] +[[package]] +name = "firehose" +version = "0.1.0" +dependencies = [ + "anyhow", + "atrium-api", + "atrium-repo", + "chrono", + "futures", + "ipld-core", + "serde_ipld_dagcbor", + "tokio", + "tokio-tungstenite", + "trait-variant", +] + [[package]] name = "flate2" version = "1.0.33" @@ -962,6 +978,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -984,6 +1001,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -2483,6 +2511,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2724,6 +2763,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.12" @@ -2864,6 +2917,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "native-tls", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -2930,6 +3003,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index c8f11427..d0eea2db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,11 +9,14 @@ members = [ "atrium-oauth/oauth-client", "bsky-cli", "bsky-sdk", + + # "examples/concurrent", + "examples/firehose", ] # Examples show how to use the latest published crates, not the workspace state. exclude = [ - "examples/concurrent", - "examples/firehose", + # "examples/concurrent", + # "examples/firehose", ] resolver = "2" diff --git a/atrium-repo/src/lib.rs b/atrium-repo/src/lib.rs index 7302a900..cdf240fa 100644 --- a/atrium-repo/src/lib.rs +++ b/atrium-repo/src/lib.rs @@ -1,3 +1,5 @@ pub mod blockstore; pub mod mst; pub mod repo; + +pub use repo::Repository; diff --git a/atrium-repo/src/repo.rs b/atrium-repo/src/repo.rs index da8cba22..c4ff1576 100644 --- a/atrium-repo/src/repo.rs +++ b/atrium-repo/src/repo.rs @@ -2,7 +2,7 @@ use atrium_api::types::{ string::{Did, Nsid, RecordKey}, Collection, }; -use ipld_core::cid::Cid; +use ipld_core::{cid::Cid, ipld::Ipld}; use serde::{Deserialize, Serialize}; use crate::{ @@ -48,7 +48,7 @@ struct SignedCommit { /// pointer (by hash) to a previous commit object for this repository pub prev: Option, /// cryptographic signature of this commit, as raw bytes - pub sig: Vec, + pub sig: Ipld, } async fn read_record( @@ -65,13 +65,13 @@ async fn read_record( #[derive(Debug)] pub struct Repository { db: R, - latest_commit: Commit, + latest_commit: SignedCommit, } impl Repository { pub async fn new(mut db: R, root: Cid) -> Result { let commit_block = db.read_block(&root).await?; - let latest_commit: Commit = serde_ipld_dagcbor::from_reader(&commit_block[..])?; + let latest_commit: SignedCommit = serde_ipld_dagcbor::from_reader(&commit_block[..])?; Ok(Self { db, latest_commit }) } @@ -82,14 +82,10 @@ impl Repository { } /// Returns the specified record from the repository, or `None` if it does not exist. - pub async fn get( - &mut self, - rkey: &RecordKey, - ) -> Result, Error> { + pub async fn get(&mut self, rkey: &str) -> Result, Error> { let mut mst = mst::Tree::open(&mut self.db, self.latest_commit.data); - let key = C::repo_path(rkey); - if let Some(cid) = mst.get(&key).await? { + if let Some(cid) = mst.get(&rkey).await? { Ok(Some(read_record::(&mut self.db, cid).await?)) } else { Ok(None) @@ -111,8 +107,6 @@ fn parse_recordkey(key: &str) -> Result { /// Errors that can occur while interacting with a repository. #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("CAR error: {0}")] - Car(#[from] blockstore::CarError), #[error("Invalid key: {0}")] InvalidKey(#[from] std::str::Utf8Error), #[error("Invalid RecordKey: {0}")] @@ -134,7 +128,7 @@ mod test { /// Loads a repository from the given CAR file. async fn load(bytes: &[u8]) -> Result>>, Error> { - let db = CarStore::new(std::io::Cursor::new(bytes)).await?; + let db = CarStore::new(std::io::Cursor::new(bytes)).await.unwrap(); let root = db.header().roots[0]; Repository::new(db, root).await diff --git a/examples/firehose/Cargo.toml b/examples/firehose/Cargo.toml index 70bab61d..32a60068 100644 --- a/examples/firehose/Cargo.toml +++ b/examples/firehose/Cargo.toml @@ -6,12 +6,13 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +atrium-api = { version = "0.24", path = "../../atrium-api" } +atrium-repo = { version = "0", path = "../../atrium-repo" } + anyhow = "1.0.80" -atrium-api = { version = "0.18.1", features = ["dag-cbor"] } chrono = "0.4.34" futures = "0.3.30" -ipld-core = { version = "0.4.0", default-features = false, features = ["std"] } -rs-car = "0.4.1" +ipld-core = { version = "0.4.1", default-features = false, features = ["std"] } serde_ipld_dagcbor = { version = "0.6.0", default-features = false, features = ["std"] } tokio = { version = "1.36.0", features = ["full"] } tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } diff --git a/examples/firehose/src/main.rs b/examples/firehose/src/main.rs index e70a237e..3c76cb6a 100644 --- a/examples/firehose/src/main.rs +++ b/examples/firehose/src/main.rs @@ -1,7 +1,4 @@ use anyhow::{anyhow, Result}; -use atrium_api::app::bsky::feed::post::Record; -use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; -use atrium_api::types::{CidLink, Collection}; use chrono::Local; use firehose::stream::frames::Frame; use firehose::subscription::{CommitHandler, Subscription}; @@ -10,6 +7,12 @@ use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use atrium_api::app::bsky::feed; +use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; +use atrium_api::types::string::RecordKey; +use atrium_api::types::{CidLink, Collection}; +use atrium_repo::{blockstore::CarStore, Repository}; + struct RepoSubscription { stream: WebSocketStream>, } @@ -48,28 +51,50 @@ struct Firehose; impl CommitHandler for Firehose { async fn handle_commit(&self, commit: &Commit) -> Result<()> { + let mut repo = Repository::new( + CarStore::new(std::io::Cursor::new(commit.blocks.as_slice())).await?, + commit.commit.0, + ) + .await?; + for op in &commit.ops { let collection = op.path.split('/').next().expect("op.path is empty"); - if op.action != "create" || collection != atrium_api::app::bsky::feed::Post::NSID { + if op.action != "create" { continue; } - let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; - if let Some((_, item)) = items.iter().find(|(cid, _)| Some(CidLink(*cid)) == op.cid) { - let record = serde_ipld_dagcbor::from_reader::(&mut item.as_slice())?; - println!( - "{} - {}", - record.created_at.as_ref().with_timezone(&Local), - commit.repo.as_str() - ); - for line in record.text.split('\n') { - println!(" {line}"); + + match collection { + feed::Post::NSID => { + // N.B: We do _NOT_ read out the record using `op.cid` because that is insecure. + // It bypasses the MST, which means that we cannot ensure that the contents are + // signed by the owner of the repository. + // You will always want to read out records using the MST to ensure they haven't been + // tampered with. + if let Some(record) = repo.get::(&op.path).await? { + println!( + "{} - {} - {}", + record.created_at.as_ref().with_timezone(&Local), + commit.repo.as_str(), + op.path + ); + for line in record.text.split('\n') { + println!(" {line}"); + } + } else { + return Err(anyhow!( + "FAILED: could not find item with operation {}", + op.path + )); + } + } + _ => { + println!( + "{} - {} - {}", + commit.time.as_ref().with_timezone(&Local), + commit.repo.as_str(), + op.path + ); } - } else { - return Err(anyhow!( - "FAILED: could not find item with operation cid {:?} out of {} items", - op.cid, - items.len() - )); } } Ok(()) @@ -78,8 +103,5 @@ impl CommitHandler for Firehose { #[tokio::main] async fn main() -> Result<(), Box> { - RepoSubscription::new("bsky.network") - .await? - .run(Firehose) - .await + RepoSubscription::new("bsky.network").await?.run(Firehose).await }