Skip to content

Commit

Permalink
Fix Activity Parsing
Browse files Browse the repository at this point in the history
Signed-off-by: Till Wegmueller <[email protected]>
  • Loading branch information
Toasterson committed Oct 31, 2023
1 parent 73dc420 commit 855e047
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 12 deletions.
36 changes: 36 additions & 0 deletions crates/activitypub/src/activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Activity {
Follow(Follow),
Accept(Accept),
Announce(Announce),
Like(Like),
EchoRequest(EchoRequest),
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand All @@ -18,6 +20,7 @@ pub enum CreateType {
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Create {
#[serde(rename = "@context")]
pub context: Context,
pub id: Url,
#[serde(rename = "type")]
Expand All @@ -35,6 +38,7 @@ pub enum FollowType {
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Follow {
#[serde(rename = "@context")]
pub context: Context,
pub id: Url,
#[serde(rename = "type")]
Expand All @@ -51,6 +55,7 @@ pub enum AcceptType {
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Accept {
#[serde(rename = "@context")]
pub context: Context,
pub id: Url,
#[serde(rename = "type")]
Expand All @@ -67,6 +72,7 @@ pub enum AnnounceType {

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Announce {
#[serde(rename = "@context")]
pub context: Context,
pub id: Url,
#[serde(rename = "type")]
Expand All @@ -77,3 +83,33 @@ pub struct Announce {
pub cc: Option<Vec<Url>>,
pub object: Url,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum EchoRequestType {
EchoRequest,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EchoRequest {
#[serde(rename = "@context")]
pub context: Context,
#[serde(rename = "type")]
pub kind: EchoRequestType,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum LikeType {
Like,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Like {
#[serde(rename = "@context")]
pub context: Context,
pub id: Url,
#[serde(rename = "type")]
pub kind: LikeType,
pub actor: Url,
#[serde(deserialize_with = "deserialize_skip_error", default)]
pub to: Vec<Url>,
}
54 changes: 42 additions & 12 deletions crates/domainservd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
};
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::debug;
use tracing_subscriber::prelude::*;
use url::Url;
use webfinger::{Link, Webfinger};
Expand Down Expand Up @@ -111,7 +112,7 @@ enum Command {
#[derive(Debug, Serialize, Deserialize)]
struct Config {
domain: String,
addr: Option<String>,
addr: String,
use_ssl: bool,
accounts: Option<Vec<ConfigUser>>,
amqp_url: String,
Expand Down Expand Up @@ -162,7 +163,7 @@ const INBOX_RECEIVE_QUEUE: &str = "inbox";
async fn load_config() -> Result<Config> {
use config::{builder::DefaultState, ConfigBuilder, File};
let builder = ConfigBuilder::<DefaultState>::default()
.set_default("addr", "127.0.0.1:3001")?
.set_default("addr", "0.0.0.0:3001")?
.set_default("domain", "localhost:3001")?
.set_default("use_ssl", false)?
.set_default("amqp_url", "amqp://dev:dev@localhost:5672/master")?
Expand All @@ -175,20 +176,14 @@ async fn load_config() -> Result<Config> {
}

async fn listen(cfg: &Config) -> Result<()> {
let addr = if let Some(addr) = &cfg.addr {
addr.clone()
} else {
String::from("localhost:3001")
};

let mut pool_config = deadpool_lapin::Config::default();
pool_config.url = Some(cfg.amqp_url.clone());

let mq_pool = pool_config
.create_pool(Some(deadpool_lapin::Runtime::Tokio1))
.map_err(|e| Error::InternalError(e.to_string()))?;

tracing::debug!("Opening RabbitMQ Connection");
tracing::debug!("Opening RabbitMQ Connection: {}", &cfg.amqp_url);
let conn = mq_pool.get().await?;
tracing::debug!(
"Connected to {} as {}",
Expand All @@ -198,7 +193,11 @@ async fn listen(cfg: &Config) -> Result<()> {

let channel = conn.create_channel().await?;

tracing::debug!("Defining inbox queue from channel id {}", channel.id());
tracing::debug!(
"Defining inbox: {} queue from channel id {}",
INBOX_RECEIVE_QUEUE,
channel.id()
);
channel
.queue_declare(
INBOX_RECEIVE_QUEUE,
Expand Down Expand Up @@ -236,8 +235,8 @@ async fn listen(cfg: &Config) -> Result<()> {
.layer(tower_http::trace::TraceLayer::new_for_http())
.with_state(Arc::new(Mutex::new(state)));

tracing::debug!("Starting Webserver");
axum::Server::try_bind(&addr.parse()?)
tracing::debug!("Starting Webserver on: {}", &cfg.addr);
axum::Server::try_bind(&cfg.addr.parse()?)
.map_err(|e| Error::HyperError(e.to_string()))?
.serve(app.into_make_service())
.await
Expand Down Expand Up @@ -336,6 +335,21 @@ async fn post_shared_inbox(
activity.cc = Some(filter_recipients(&accounts, &cc));
}

if activity.to.len() > 0 {
queue_activity(state, activity, INBOX_RECEIVE_QUEUE).await?;
Ok(StatusCode::CREATED)
} else {
Err(Error::UnknownUser)
}
}
Activity::EchoRequest(_) => {
debug!("Received an echo request");
Ok(StatusCode::CREATED)
}
Activity::Like(like) => {
let mut activity = like.clone();
activity.to = filter_recipients(&accounts, &activity.to);

if activity.to.len() > 0 {
queue_activity(state, activity, INBOX_RECEIVE_QUEUE).await?;
Ok(StatusCode::CREATED)
Expand Down Expand Up @@ -411,6 +425,22 @@ async fn post_inbox(
activity.cc = Some(filter_recipients(&accounts, &cc));
}

if activity.to.len() > 0 {
queue_activity(state, activity, INBOX_RECEIVE_QUEUE).await?;
Ok(StatusCode::CREATED)
} else {
Err(Error::UnknownUser)
}
}
Activity::EchoRequest(_) => {
debug!("Received an echo request");
Ok(StatusCode::CREATED)
}
Activity::Like(like) => {
let mut activity = like.clone();
let actor = format!("{}/actors/{}", &base_url, &actor);
activity.to = vec![actor.parse()?];

if activity.to.len() > 0 {
queue_activity(state, activity, INBOX_RECEIVE_QUEUE).await?;
Ok(StatusCode::CREATED)
Expand Down
2 changes: 2 additions & 0 deletions crates/identd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ impl ServerState {
provider_metadata: ActiveValue::Set(provider_meta_string),
jwks: ActiveValue::Set(jwks_string),
issuer_url: ActiveValue::Set(base_url),
biscuit_private_key: ActiveValue::Set(String::new()),
biscuit_public_key: ActiveValue::Set(String::new()),
};

realm.insert(realm_db).await?;
Expand Down
23 changes: 23 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,29 @@ services:
volumes:
- couchdb:/opt/couchdb/data

identd:
build:
context: .
args: ["PACKAGE=identd"]
ports:
- "3000:3000"

domainservd:
build:
context: .
args: ["PACKAGE=domainservd"]
ports:
- "3000:3000"
command:
- "start"

publisherd:
build:
context: .
args: ["PACKAGE=publisherd"]
ports:
- "3000:3000"

volumes:
rabbitmq:
couchdb:

0 comments on commit 855e047

Please sign in to comment.