Skip to content

Commit

Permalink
Update firehose example
Browse files Browse the repository at this point in the history
  • Loading branch information
DrChat committed Dec 26, 2024
1 parent 830aaf5 commit 4b54d5e
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 42 deletions.
79 changes: 79 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ members = [
"atrium-oauth/oauth-client",
"bsky-cli",
"bsky-sdk",

# "examples/concurrent",
"examples/firehose",
]
# Examples show how to use the latest published crates, not the workspace state.
exclude = [
"examples/concurrent",
"examples/firehose",
# "examples/concurrent",
# "examples/firehose",
]
resolver = "2"

Expand Down
2 changes: 2 additions & 0 deletions atrium-repo/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod blockstore;
pub mod mst;
pub mod repo;

pub use repo::Repository;
20 changes: 7 additions & 13 deletions atrium-repo/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use atrium_api::types::{
string::{Did, Nsid, RecordKey},
Collection,
};
use ipld_core::cid::Cid;
use ipld_core::{cid::Cid, ipld::Ipld};
use serde::{Deserialize, Serialize};

use crate::{
Expand Down Expand Up @@ -48,7 +48,7 @@ struct SignedCommit {
/// pointer (by hash) to a previous commit object for this repository
pub prev: Option<Cid>,
/// cryptographic signature of this commit, as raw bytes
pub sig: Vec<u8>,
pub sig: Ipld,
}

async fn read_record<C: Collection>(
Expand All @@ -65,13 +65,13 @@ async fn read_record<C: Collection>(
#[derive(Debug)]
pub struct Repository<R: AsyncBlockStoreRead> {
db: R,
latest_commit: Commit,
latest_commit: SignedCommit,
}

impl<R: AsyncBlockStoreRead> Repository<R> {
pub async fn new(mut db: R, root: Cid) -> Result<Self, Error> {
let commit_block = db.read_block(&root).await?;
let latest_commit: Commit = serde_ipld_dagcbor::from_reader(&commit_block[..])?;
let latest_commit: SignedCommit = serde_ipld_dagcbor::from_reader(&commit_block[..])?;

Ok(Self { db, latest_commit })
}
Expand All @@ -82,14 +82,10 @@ impl<R: AsyncBlockStoreRead> Repository<R> {
}

/// Returns the specified record from the repository, or `None` if it does not exist.
pub async fn get<C: Collection>(
&mut self,
rkey: &RecordKey,
) -> Result<Option<C::Record>, Error> {
pub async fn get<C: Collection>(&mut self, rkey: &str) -> Result<Option<C::Record>, Error> {
let mut mst = mst::Tree::open(&mut self.db, self.latest_commit.data);
let key = C::repo_path(rkey);

if let Some(cid) = mst.get(&key).await? {
if let Some(cid) = mst.get(&rkey).await? {
Ok(Some(read_record::<C>(&mut self.db, cid).await?))
} else {
Ok(None)
Expand All @@ -111,8 +107,6 @@ fn parse_recordkey(key: &str) -> Result<RecordKey, Error> {
/// Errors that can occur while interacting with a repository.
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("CAR error: {0}")]
Car(#[from] blockstore::CarError),
#[error("Invalid key: {0}")]
InvalidKey(#[from] std::str::Utf8Error),
#[error("Invalid RecordKey: {0}")]
Expand All @@ -134,7 +128,7 @@ mod test {

/// Loads a repository from the given CAR file.
async fn load(bytes: &[u8]) -> Result<Repository<CarStore<std::io::Cursor<&[u8]>>>, Error> {
let db = CarStore::new(std::io::Cursor::new(bytes)).await?;
let db = CarStore::new(std::io::Cursor::new(bytes)).await.unwrap();
let root = db.header().roots[0];

Repository::new(db, root).await
Expand Down
7 changes: 4 additions & 3 deletions examples/firehose/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
atrium-api = { version = "0.24", path = "../../atrium-api" }
atrium-repo = { version = "0", path = "../../atrium-repo" }

anyhow = "1.0.80"
atrium-api = { version = "0.18.1", features = ["dag-cbor"] }
chrono = "0.4.34"
futures = "0.3.30"
ipld-core = { version = "0.4.0", default-features = false, features = ["std"] }
rs-car = "0.4.1"
ipld-core = { version = "0.4.1", default-features = false, features = ["std"] }
serde_ipld_dagcbor = { version = "0.6.0", default-features = false, features = ["std"] }
tokio = { version = "1.36.0", features = ["full"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
Expand Down
70 changes: 46 additions & 24 deletions examples/firehose/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use anyhow::{anyhow, Result};
use atrium_api::app::bsky::feed::post::Record;
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
use atrium_api::types::{CidLink, Collection};
use chrono::Local;
use firehose::stream::frames::Frame;
use firehose::subscription::{CommitHandler, Subscription};
Expand All @@ -10,6 +7,12 @@ use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};

use atrium_api::app::bsky::feed;
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
use atrium_api::types::string::RecordKey;
use atrium_api::types::{CidLink, Collection};
use atrium_repo::{blockstore::CarStore, Repository};

struct RepoSubscription {
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
Expand Down Expand Up @@ -48,28 +51,50 @@ struct Firehose;

impl CommitHandler for Firehose {
async fn handle_commit(&self, commit: &Commit) -> Result<()> {
let mut repo = Repository::new(
CarStore::new(std::io::Cursor::new(commit.blocks.as_slice())).await?,
commit.commit.0,
)
.await?;

for op in &commit.ops {
let collection = op.path.split('/').next().expect("op.path is empty");
if op.action != "create" || collection != atrium_api::app::bsky::feed::Post::NSID {
if op.action != "create" {
continue;
}
let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
if let Some((_, item)) = items.iter().find(|(cid, _)| Some(CidLink(*cid)) == op.cid) {
let record = serde_ipld_dagcbor::from_reader::<Record, _>(&mut item.as_slice())?;
println!(
"{} - {}",
record.created_at.as_ref().with_timezone(&Local),
commit.repo.as_str()
);
for line in record.text.split('\n') {
println!(" {line}");

match collection {
feed::Post::NSID => {
// N.B: We do _NOT_ read out the record using `op.cid` because that is insecure.
// It bypasses the MST, which means that we cannot ensure that the contents are
// signed by the owner of the repository.
// You will always want to read out records using the MST to ensure they haven't been
// tampered with.
if let Some(record) = repo.get::<feed::Post>(&op.path).await? {
println!(
"{} - {} - {}",
record.created_at.as_ref().with_timezone(&Local),
commit.repo.as_str(),
op.path
);
for line in record.text.split('\n') {
println!(" {line}");
}
} else {
return Err(anyhow!(
"FAILED: could not find item with operation {}",
op.path
));
}
}
_ => {
println!(
"{} - {} - {}",
commit.time.as_ref().with_timezone(&Local),
commit.repo.as_str(),
op.path
);
}
} else {
return Err(anyhow!(
"FAILED: could not find item with operation cid {:?} out of {} items",
op.cid,
items.len()
));
}
}
Ok(())
Expand All @@ -78,8 +103,5 @@ impl CommitHandler for Firehose {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
RepoSubscription::new("bsky.network")
.await?
.run(Firehose)
.await
RepoSubscription::new("bsky.network").await?.run(Firehose).await
}

0 comments on commit 4b54d5e

Please sign in to comment.