Skip to content

Commit

Permalink
Firehose: Subscribe to labels; Lexicon: SubscribeLabels struct; Feedg…
Browse files Browse the repository at this point in the history
…en: Ingest labels for posts and filter out labeled posts in all-posts and trending
  • Loading branch information
rudyfraser committed Dec 15, 2024
1 parent 19ba7b1 commit 3f298e0
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 22 deletions.
2 changes: 1 addition & 1 deletion rsky-feedgen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsky-feedgen"
version = "1.0.4"
version = "1.1.0"
authors = ["Rudy Fraser <[email protected]>"]
description = "A framework for building AT Protocol feed generators, in Rust."
license = "Apache-2.0"
Expand Down
4 changes: 4 additions & 0 deletions rsky-feedgen/migrations/2024-12-14-193843_022/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Rolls back the `labels` migration on `post`: removes the
-- `no_nulls_in_labels` check constraint and then the `labels` column.
-- `IF EXISTS` keeps the rollback from failing when either object is already gone.
ALTER TABLE post
DROP CONSTRAINT IF EXISTS no_nulls_in_labels,
DROP COLUMN IF EXISTS labels;
4 changes: 4 additions & 0 deletions rsky-feedgen/migrations/2024-12-14-193843_022/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Adds a `labels` text array to `post`. The column is NOT NULL and defaults to
-- an empty array, so existing rows start out unlabeled.
--
-- The check constraint rejects arrays that contain NULL elements. It uses
-- array_position(), which compares with IS NOT DISTINCT FROM and can therefore
-- locate a NULL element. The overlap operator (`labels && ARRAY[NULL]`) treats
-- NULL as never equal to anything, so a check built on it accepts every row.
ALTER TABLE post
    ADD COLUMN labels TEXT [] NOT NULL DEFAULT '{}',
    ADD CONSTRAINT no_nulls_in_labels CHECK (array_position(labels, NULL) IS NULL);
41 changes: 37 additions & 4 deletions rsky-feedgen/src/apis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::common::env::env_int;
use crate::db::*;
use crate::explicit_slurs::contains_explicit_slurs;
use crate::models::create_request::CreateRecord;
use crate::models::Lexicon::{AppBskyFeedFollow, AppBskyFeedLike, AppBskyFeedPost};
use crate::models::*;
use crate::{FeedGenConfig, ReadReplicaConn, WriteDbConn};
use chrono::offset::Utc as UtcOffset;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use diesel::prelude::*;
use diesel::sql_query;
use diesel::sql_types::{Array, Bool, Nullable, Text};
use once_cell::sync::Lazy;
use rand::Rng;
use regex::Regex;
Expand All @@ -15,6 +18,7 @@ use rsky_lexicon::app::bsky::embed::{Embeds, MediaUnion};
use std::collections::HashSet;
use std::fmt::Write;
use std::time::SystemTime;
use diesel::dsl::sql;

#[allow(deprecated)]
pub async fn get_posts_by_membership(
Expand Down Expand Up @@ -226,7 +230,8 @@ pub async fn get_blacksky_trending(
\"externalThumb\",
\"quoteCid\",
\"quoteUri\",
\"createdAt\"
\"createdAt\",
labels
FROM ranked_posts
WHERE percentile_rank >= {:.4}",
random_percentile
Expand Down Expand Up @@ -340,6 +345,7 @@ pub async fn get_all_posts(
.limit(limit.unwrap_or(30))
.select(Post::as_select())
.order((PostSchema::createdAt.desc(), PostSchema::cid.desc()))
.filter(sql::<Bool>("array_length(labels, 1) IS NULL OR array_length(labels, 1) = 0"))
.into_boxed();

if let Some(lang) = lang {
Expand Down Expand Up @@ -542,9 +548,10 @@ pub async fn queue_creation(
quote_cid: None,
quote_uri: None,
created_at: format!("{}", dt.format("%+")), // use now() as a default
labels: vec![]
};

if let Lexicon::AppBskyFeedPost(post_record) = req.record {
if let CreateRecord::Lexicon(AppBskyFeedPost(post_record)) = req.record {
post_text_original = post_record.text.clone();
post_text = post_record.text.to_lowercase();
let post_created_at = format!("{}", post_record.created_at.format("%+"));
Expand Down Expand Up @@ -740,6 +747,7 @@ pub async fn queue_creation(
PostSchema::quoteCid.eq(new_post.quote_cid),
PostSchema::quoteUri.eq(new_post.quote_uri),
PostSchema::createdAt.eq(new_post.created_at),
PostSchema::labels.eq(new_post.labels),
);
new_posts.push(new_post);
new_images.extend(post_images);
Expand Down Expand Up @@ -860,7 +868,7 @@ pub async fn queue_creation(
body
.into_iter()
.map(|req| {
if let Lexicon::AppBskyFeedLike(like_record) = req.record {
if let CreateRecord::Lexicon(AppBskyFeedLike(like_record)) = req.record {
let subject_author: &String = &like_record.subject.uri[5..37].into(); // parse DID:PLC from URI
let member_of = is_included(vec![&req.author, subject_author].into(), conn).unwrap_or_else(|_| vec![]);
if !member_of.is_empty() {
Expand Down Expand Up @@ -897,7 +905,7 @@ pub async fn queue_creation(
body
.into_iter()
.map(|req| {
if let Lexicon::AppBskyFeedFollow(follow_record) = req.record {
if let CreateRecord::Lexicon(AppBskyFeedFollow(follow_record)) = req.record {
let member_of = is_included(vec![&req.author, &follow_record.subject].into(), conn).unwrap_or_else(|_| vec![]);
if !member_of.is_empty() {
let system_time = SystemTime::now();
Expand Down Expand Up @@ -926,6 +934,31 @@ pub async fn queue_creation(
.expect("Error inserting like records");
}
Ok(())
} else if lex == "labels" {
body
.into_iter()
.map(|req| {
if let CreateRecord::Label(label) = req.record {
// @TODO: Handle account-level labels; Maybe put label on membership
if req.uri.starts_with("did:plc:") {
()
} else {
// Raw SQL query
let updated_rows = sql_query(
"UPDATE post SET labels = labels || $1 WHERE uri = $2"
)
.bind::<Array<Nullable<Text>>, _>(vec![Some(&label.val)])
.bind::<Text, _>(&req.uri)
.execute(conn)
.expect("Error updating labels");
if updated_rows > 0 {
println!("@LOG: applied {} to {}", label.val, req.uri);
}
}
}
})
.for_each(drop);
Ok(())
} else {
Err(format!("Unknown lexicon received {lex:?}"))
}
Expand Down
11 changes: 10 additions & 1 deletion rsky-feedgen/src/models/create_request.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use rsky_lexicon::com::atproto::label::Label;

#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "$type")]
pub enum Lexicon {
Expand All @@ -12,6 +14,13 @@ pub enum Lexicon {
AppBskyFeedFollow(rsky_lexicon::app::bsky::graph::follow::Follow),
}

/// Payload stored in the `record` field of a `CreateRequest`.
///
/// The enum is `#[serde(untagged)]`: no discriminator is read for this enum
/// itself, and serde tries the variants in declaration order, keeping the first
/// one that deserializes successfully. Reordering the variants therefore changes
/// which one wins when a payload could match both.
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum CreateRecord {
// A record from the `Lexicon` enum, which is itself tagged by `$type`.
Lexicon(Lexicon),
// A `com.atproto.label` `Label` (imported from `rsky_lexicon`).
Label(Label),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateRequest {
#[serde(rename = "uri")]
Expand All @@ -25,5 +34,5 @@ pub struct CreateRequest {
#[serde(rename = "author")]
pub author: String,
#[serde(rename = "record")]
pub record: Lexicon,
pub record: CreateRecord,
}
9 changes: 9 additions & 0 deletions rsky-feedgen/src/models/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct Post {
pub quote_uri: Option<String>,
#[serde(rename = "createdAt")]
pub created_at: String,
pub labels: Vec<Option<String>>,
}

impl Queryable<post::SqlType, DB> for Post {
Expand All @@ -67,6 +68,7 @@ impl Queryable<post::SqlType, DB> for Post {
Option<String>,
Option<String>,
String,
Vec<Option<String>>,
);

fn build(row: Self::Row) -> deserialize::Result<Self> {
Expand All @@ -88,6 +90,7 @@ impl Queryable<post::SqlType, DB> for Post {
quote_cid: row.14,
quote_uri: row.15,
created_at: row.16,
labels: row.17,
})
}
}
Expand All @@ -114,6 +117,7 @@ where
post::quoteCid,
post::quoteUri,
post::createdAt,
post::labels,
);

fn construct_selection() -> Self::SelectExpression {
Expand All @@ -135,6 +139,7 @@ where
post::quoteCid,
post::quoteUri,
post::createdAt,
post::labels,
)
}
}
Expand All @@ -145,6 +150,8 @@ where
String: FromSql<diesel::dsl::SqlTypeOf<post::uri>, DB>,
Option<String>: FromSql<diesel::dsl::SqlTypeOf<post::replyParent>, DB>,
Option<i64>: FromSql<diesel::dsl::SqlTypeOf<post::sequence>, DB>,
Vec<Option<String>>:
FromSql<diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::sql_types::Text>>, DB>,
{
fn build<'a>(row: &impl NamedRow<'a, DB>) -> deserialize::Result<Self> {
let uri = NamedRow::get::<diesel::dsl::SqlTypeOf<post::uri>, _>(row, "uri")?;
Expand Down Expand Up @@ -177,6 +184,7 @@ where
NamedRow::get::<diesel::dsl::SqlTypeOf<post::quoteUri>, _>(row, "quoteUri")?;
let created_at =
NamedRow::get::<diesel::dsl::SqlTypeOf<post::createdAt>, _>(row, "createdAt")?;
let labels = NamedRow::get::<diesel::dsl::SqlTypeOf<post::labels>, _>(row, "labels")?;
Ok(Self {
uri,
cid,
Expand All @@ -195,6 +203,7 @@ where
quote_cid,
quote_uri,
created_at,
labels,
})
}
}
5 changes: 3 additions & 2 deletions rsky-feedgen/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ diesel::table! {
subjectCid -> Varchar,
subjectUri -> Varchar,
createdAt -> Varchar,
indexedAt -> Varchar,
indexedAt -> Text,
prev -> Nullable<Varchar>,
sequence -> Nullable<Int8>,
}
Expand All @@ -80,7 +80,7 @@ diesel::table! {
cid -> Varchar,
replyParent -> Nullable<Varchar>,
replyRoot -> Nullable<Varchar>,
indexedAt -> Varchar,
indexedAt -> Text,
prev -> Nullable<Varchar>,
sequence -> Nullable<Int8>,
text -> Nullable<Varchar>,
Expand All @@ -93,6 +93,7 @@ diesel::table! {
quoteCid -> Nullable<Varchar>,
quoteUri -> Nullable<Varchar>,
createdAt -> Varchar,
labels -> Array<Nullable<Text>>,
}
}

Expand Down
2 changes: 1 addition & 1 deletion rsky-firehose/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsky-firehose"
version = "0.1.1"
version = "0.2.0"
authors = ["Rudy Fraser <[email protected]>"]
description = "A framework for subscribing to the AT Protocol firehose, in Rust."
license = "Apache-2.0"
Expand Down
19 changes: 19 additions & 0 deletions rsky-firehose/src/firehose.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{bail, Result};
use rsky_lexicon::com::atproto::label::SubscribeLabels;
use rsky_lexicon::com::atproto::sync::SubscribeRepos;
use serde::Deserialize;
use std::io::Cursor;
Expand Down Expand Up @@ -50,3 +51,21 @@ pub fn read(data: &[u8]) -> Result<(Header, SubscribeRepos)> {

Ok((header, body))
}

/// Decodes one frame from `data` into its header and `SubscribeLabels` body.
///
/// The header is read first with `ciborium`; the body is then read from the
/// same cursor with `serde_ipld_dagcbor`, so it starts where the header ended.
///
/// # Errors
///
/// Returns an error when the header or body fails to decode, or when the
/// header type is anything other than `"#labels"`. In the latter case the
/// message is also written to stderr before returning.
pub fn read_labels(data: &[u8]) -> Result<(Header, SubscribeLabels)> {
    let mut cursor = Cursor::new(data);

    let header: Header = ciborium::de::from_reader(&mut cursor)?;

    // Guard clause: only `#labels` frames carry a body this function decodes.
    let frame_type = header.type_.as_str();
    if frame_type != "#labels" {
        let message = format!("Received unknown header {:?}", frame_type);
        eprintln!("{}", message);
        bail!(message);
    }

    let body = serde_ipld_dagcbor::from_reader(&mut cursor)?;
    Ok((header, body))
}
Loading

0 comments on commit 3f298e0

Please sign in to comment.