Skip to content

Commit

Permalink
Add processing receiver channels
Browse files Browse the repository at this point in the history
Signed-off-by: Till Wegmueller <[email protected]>
  • Loading branch information
Toasterson committed Jan 30, 2024
1 parent 491887c commit 1df3be8
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 2 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async-graphql.workspace = true
lapin.workspace = true
deadpool-lapin.workspace = true
deadpool.workspace = true
scanf = "1.2.1"

[workspace.dependencies]
activitypub_federation = { version = "0.5.0", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions src/activitypub/activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use serde::Serialize;
use serde_json::Value;
use std::collections::HashMap;

pub const PUBLIC_TARGET: &str = "https://www.w3.org/ns/activitystreams#Public";

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Activity {
Expand Down
3 changes: 3 additions & 0 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ use crate::prisma::actor::Data;
pub struct Actor {
name: String,
domain: String,
handle: String,
}

impl From<Data> for Actor {
fn from(value: Data) -> Self {
let handle = value.handle.clone();
let (_, domain) = value.handle.split_once('@').unwrap_or(("", "localhost"));
Self {
name: value.display_name,
handle,
domain: domain.to_owned(),
}
}
Expand Down
175 changes: 175 additions & 0 deletions src/domainservd/inbox.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use super::{Result, SharedState};
use crate::activitypub::activities::{Activity, PUBLIC_TARGET};
use axum::{
extract::{Host, Path, State},
Json,
};
use lapin::{options::BasicPublishOptions, protocol::basic::AMQPProperties};
use scanf::sscanf;
use url::Url;

fn get_target_actors(
optional_target: Option<String>,
domain: String,
to: Vec<Url>,
cc: Vec<Url>,
) -> Result<Vec<String>> {
let mut actor_list: Vec<String> = vec![];
for receipient in vec![to, cc].concat() {
let recp_str = receipient.to_string();
if &recp_str == PUBLIC_TARGET {
continue;
}
let mut actor_name: String = String::new();
let mut scheme: String = String::new();
let mut domain_scanned: String = String::new();
sscanf!(
&recp_str,
"{}://{}/actors/{}",
scheme,
domain_scanned,
actor_name
)?;
if &domain == &domain_scanned {
actor_list.push(actor_name);
}
}
if let Some(actor) = optional_target {
actor_list.push(actor);
}
Ok(actor_list)
}

pub async fn post_inbox(
Host(domain): Host,
Path(actor): Path<String>,
State(state): State<SharedState>,
Json(activity): Json<Activity>,
) -> Result<()> {
post_inbox_handler(domain, Some(actor), state, activity).await
}

pub async fn post_shared_inbox(
Host(domain): Host,
State(state): State<SharedState>,
Json(activity): Json<Activity>,
) -> Result<()> {
post_inbox_handler(domain, None, state, activity).await
}

async fn post_inbox_handler(
domain: String,
actor: Option<String>,
state: SharedState,
activity: Activity,
) -> Result<()> {
let mongo_client = state.lock().await.mongo.clone();
let conn = state.lock().await.rbmq_pool.get().await?;
let channel = conn.create_channel().await?;
let base_exchange = state.lock().await.acitivity_process_channel.clone();
//TODO: HTTP Signature verification

match activity {
Activity::Create(create) => {
// Actor is receiving a message from someone
let targets = get_target_actors(
actor,
domain.clone(),
create.to.clone(),
create.cc.clone().unwrap_or(vec![]),
)?;
let exchange = format!("{base_exchange}.create");
for target in targets {
let handle = format!("{target}@{domain}");

let timeline_collection = mongo_client
.database("inboxes")
.collection::<Activity>(&handle);
let doc = Activity::Create(create.clone());
timeline_collection.insert_one(&doc, None).await?;
channel
.basic_publish(
&exchange,
"",
BasicPublishOptions::default(),
&serde_json::to_vec(&doc)?,
AMQPProperties::default(),
)
.await?;
}
Ok(())
}
Activity::Follow(follow) => {
let exchange = format!("{base_exchange}.follow");
channel
.basic_publish(
&exchange,
"",
BasicPublishOptions::default(),
&serde_json::to_vec(&follow)?,
AMQPProperties::default(),
)
.await?;

Ok(())
}
Activity::Accept(accept) => {
let exchange = format!("{base_exchange}.accept");

channel
.basic_publish(
&exchange,
"",
BasicPublishOptions::default(),
&serde_json::to_vec(&accept)?,
AMQPProperties::default(),
)
.await?;

Ok(())
}
Activity::Announce(announce) => {
let targets = get_target_actors(
actor,
domain.clone(),
announce.to.clone(),
announce.cc.clone().unwrap_or(vec![]),
)?;
let exchange = format!("{base_exchange}.announce");
for target in targets {
let handle = format!("{target}@{domain}");

let timeline_collection = mongo_client
.database("inboxes")
.collection::<Activity>(&handle);
let doc = Activity::Announce(announce.clone());
timeline_collection.insert_one(&doc, None).await?;
channel
.basic_publish(
&exchange,
"",
BasicPublishOptions::default(),
&serde_json::to_vec(&doc)?,
AMQPProperties::default(),
)
.await?;
}
Ok(())
}
Activity::Like(like) => {
let exchange = format!("{base_exchange}.like");
channel
.basic_publish(
&exchange,
"",
BasicPublishOptions::default(),
&serde_json::to_vec(&like)?,
AMQPProperties::default(),
)
.await?;

Ok(())
}
Activity::EchoRequest(_) => Ok(()),
}
}
56 changes: 54 additions & 2 deletions src/domainservd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
mod actors;
mod collections;
mod inbox;
mod webfinger;

use crate::{
domainservd::{actors::get_actor, collections::get_outbox_collection},
domainservd::{
actors::get_actor,
collections::get_outbox_collection,
inbox::{post_inbox, post_shared_inbox},
},
PrismaClient,
};
use axum::{http::StatusCode, response::IntoResponse, routing::get, Json, Router};
use axum::{
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use clap::{Parser, Subcommand};
use config::File;
use lapin::{options::ExchangeDeclareOptions, types::FieldTable};
use miette::Diagnostic;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand Down Expand Up @@ -43,6 +54,18 @@ pub enum DomainServdError {
#[error(transparent)]
MongoValueAccessError(#[from] mongodb::bson::raw::ValueAccessError),

#[error(transparent)]
CreatePoolError(#[from] deadpool_lapin::CreatePoolError),

#[error(transparent)]
LapinError(#[from] lapin::Error),

#[error(transparent)]
LapinPoolError(#[from] deadpool_lapin::PoolError),

#[error(transparent)]
SerdeJson(#[from] serde_json::Error),

#[error("could not find {0}")]
NotFound(String),
}
Expand Down Expand Up @@ -85,6 +108,8 @@ pub struct Args {
pub struct Config {
pub postgres: PostgresConfig,
pub mongodb: MongoDBConfig,
pub rabbitmq: deadpool_lapin::Config,
pub acitivity_process_channel: String,
pub listen: String,
pub use_ssl: bool,
}
Expand All @@ -109,6 +134,8 @@ struct ServerState {
use_ssl: bool,
prisma: crate::PrismaClient,
mongo: mongodb::Client,
rbmq_pool: deadpool_lapin::Pool,
acitivity_process_channel: String,
}

type SharedState = Arc<Mutex<ServerState>>;
Expand All @@ -125,6 +152,8 @@ pub fn read_config(args: &Args) -> Result<Config> {
)?
.set_default("listen", "127.0.0.1:3000")?
.set_default("use_ssl", false)?
.set_default("rabbitmq.url", "amqp://dev:dev@localhost:5672/dev")?
.set_default("acitivity_process_channel", "activity.process")?
.add_source(File::with_name("domainservd").required(false))
.add_source(File::with_name("/etc/oxifed/domainservd").required(false))
.set_override_option("connection_string", args.connection_string.clone())?
Expand All @@ -145,12 +174,35 @@ pub async fn listen(cfg: Config) -> Result<()> {
use_ssl: cfg.use_ssl,
prisma: prisam_client,
mongo: mongo_client.clone(),
rbmq_pool: cfg.rabbitmq.create_pool(Some(deadpool::Runtime::Tokio1))?,
acitivity_process_channel: cfg.acitivity_process_channel,
}));

{
let conn = shared_state.lock().await.rbmq_pool.get().await?;
let init_channel = conn.create_channel().await?;
let base_queue_name = shared_state.lock().await.acitivity_process_channel.clone();
debug!("Initializing processing exchanges");
for kind in vec!["create", "follow", "accept", "announce", "like"] {
let exchange_name = format!("{base_queue_name}.{kind}");

init_channel
.exchange_declare(
&exchange_name,
lapin::ExchangeKind::Fanout,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
}
}

let app = Router::new()
.route("/.well-known/webfinger", get(get_webfinger))
.route("/actors/:actor", get(get_actor))
.route("/actors/:actor/outbox", get(get_outbox_collection))
.route("/actors/:actor/inbox", post(post_inbox))
.route("/inbox", post(post_shared_inbox))
.with_state(shared_state);

let listener = tokio::net::TcpListener::bind(&cfg.listen).await?;
Expand Down

0 comments on commit 1df3be8

Please sign in to comment.