diff --git a/domain_connector/Cargo.toml b/domain_connector/Cargo.toml deleted file mode 100644 index 73dbbd9..0000000 --- a/domain_connector/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "oxifed_domain_connector" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -activitypub_federation = { version = "0.4.0", features = ["axum"], default-features = false } -async-trait = "0.1.68" -axum = "0.6.12" -axum-macros = "0.3.7" -chrono = { version = "0.4.24", features = ["serde"] } -clap = { version = "4.1.14", features = ["derive", "env"] } -config = "0.13.3" -enum_delegate = "0.2.0" -lapin = "2.2.1" -miette = { version = "5.6.0", features = ["fancy"] } -semver = { version = "1.0.17", features = ["serde"] } -serde = { version = "1.0.159", features = ["derive"] } -serde_json = "1.0.96" -thiserror = "1.0.40" -tokio = { version = "1.27.0", features = ["fs"] } -tracing = "0.1.37" -tracing-subscriber = "0.3.16" -url = { version = "2.3.1", features = ["serde"] } -uuid = { version = "1.3.3", features = ["serde", "v4"] } diff --git a/domain_connector/src/activities/create_post.rs b/domain_connector/src/activities/create_post.rs deleted file mode 100644 index c22b685..0000000 --- a/domain_connector/src/activities/create_post.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::objects::article::Article; -use crate::objects::{article::InternalArticle, person::InternalPerson}; -use crate::AppData; -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::activity::CreateType, - protocol::helpers::deserialize_one_or_many, - traits::{ActivityHandler, Object}, -}; -use serde::{Deserialize, Serialize}; -use url::Url; - -#[derive(Deserialize, Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct CreateArticle { - pub(crate) actor: ObjectId, - #[serde(deserialize_with = "deserialize_one_or_many")] - pub(crate) to: Vec, - pub(crate) object: Article, - #[serde(rename = "type")] - pub(crate) kind: CreateType, - pub(crate) id: Url, -} - -#[async_trait::async_trait] -impl ActivityHandler for CreateArticle { - type DataType = AppData; - type Error = crate::Error; - - fn id(&self) -> &Url { - &self.id - } - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn verify(&self, data: &Data) -> std::result::Result<(), Self::Error> { - InternalArticle::verify(&self.object, &self.id, data).await?; - Ok(()) - } - - async fn receive(self, data: &Data) -> std::result::Result<(), Self::Error> { - InternalArticle::from_json(self.object, data).await?; - Ok(()) - } -} diff --git a/domain_connector/src/activities/mod.rs b/domain_connector/src/activities/mod.rs deleted file mode 100644 index 7e15ee0..0000000 --- a/domain_connector/src/activities/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod create_post; diff --git a/domain_connector/src/lib.rs b/domain_connector/src/lib.rs deleted file mode 100644 index 350ae0a..0000000 --- a/domain_connector/src/lib.rs +++ /dev/null @@ -1,255 +0,0 @@ -mod activities; -mod objects; - -use crate::objects::person::PersonAcceptedActivities; -use activitypub_federation::{ - axum::{ - inbox::{receive_activity, ActivityData}, - json::FederationJson, - }, - config::{Data, FederationConfig, FederationMiddleware}, - fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger}, - protocol::context::WithContext, - traits::Object, -}; -use axum::{ - extract::{Path, Query}, - http::StatusCode, - response::IntoResponse, - routing::{get, post}, - Json, Router, -}; -use axum_macros::debug_handler; -use config::{builder::AsyncState, ConfigBuilder, File, FileFormat}; -use lapin::{ - options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions}, - types::FieldTable, - BasicProperties, Channel, Connection, ConnectionProperties, Consumer, Queue, -}; -use miette::Diagnostic; -use objects::person::{InternalPerson, Person}; -use serde::Deserialize; -use std::net::ToSocketAddrs; -use thiserror::Error; -use tracing::info; -use url::Url; - -#[derive(Debug, Error, Diagnostic)] -pub enum Error { - #[error(transparent)] - IOError(#[from] std::io::Error), - - #[error(transparent)] - VarError(#[from] std::env::VarError), - - #[error(transparent)] - URLParseError(#[from] url::ParseError), - - #[error(transparent)] - ActivityPubError(#[from] activitypub_federation::error::Error), - - #[error(transparent)] - ConfigError(#[from] config::ConfigError), - - #[error(transparent)] - JsonError(#[from] serde_json::Error), - - #[error("delivery error: {0}")] - DeliveryError(String), - - #[error("no such person {0}")] - NoSuchPerson(String), - - #[error("no reply from data server")] - NoReply, -} - -impl IntoResponse for Error { - fn into_response(self) -> axum::response::Response { - let error_str = format!("{}", &self); - (StatusCode::INTERNAL_SERVER_ERROR, error_str).into_response() - } -} - -pub type Result = miette::Result; - -#[derive(Debug)] -pub struct AppData { - pub(crate) mq_conn: Connection, - pub(crate) mq_chan: Channel, - pub(crate) mq_callback_queue: Queue, - pub(crate) mq_callback_consumer: Consumer, - pub(crate) domain: String, -} - -impl AppData { - pub async fn new() -> Result { - let builder = ConfigBuilder::::default() - .set_default("domain", "localhost")? - .set_default("amqp_addr", "amqp://127.0.0.1:5672")? - .add_source(File::new("config", FileFormat::Toml)); - let cfg = builder.build().await?; - - let conn = Connection::connect( - cfg.get_string("amqp_addr")?.as_str(), - ConnectionProperties::default(), - ) - .await?; - let channel = conn.create_channel().await?; - - let callback_queue = channel - .queue_declare( - "", - QueueDeclareOptions { - exclusive: true, - ..Default::default() - }, - FieldTable::default(), - ) - .await?; - - let callback_consumer = channel - .basic_consume( - callback_queue.name().as_str(), - "domain_connector_callback", - BasicConsumeOptions { - no_ack: true, - ..Default::default() - }, - FieldTable::default(), - ) - .await?; - - Ok(AppData { - mq_conn: conn, - mq_chan: channel, - mq_callback_queue: callback_queue, - mq_callback_consumer: callback_consumer, - domain: cfg.get_string("domain")?, - }) - } - - pub fn get_domain(&self) -> String { - self.domain.clone() - } - - pub async fn get_object_by_id(&mut self, object_id: Url, correlation_id: &str) -> Result { - self.mq_chan - .basic_publish( - "", - "get_obejct_by_id", - BasicPublishOptions::default(), - object_id.into(), - BasicProperties::default() - .with_reply_to(self.mq_callback_queue.name().clone()) - .with_correlation_id(correlation_id.into()), - ) - .await? - .await?; - while let Some(delivery) = self.mq_callback_consumer.next().await { - match delivery { - Ok(delivery) => { - if delivery.properties.correlation_id().as_ref() == Some(correlation_id) { - let object = serde_json::from_slice(delivery.data.as_slice())?; - delivery.ack(BasicAckOptions::default()).await?; - return Ok(object); - } - } - Err(error) => return Err(Error::DeliveryError(error)), - } - } - - Err(Error::NoReply) - } - - pub async fn receive_object<'de, T>(&mut self, object: &T, correlation_id: &str) -> Result - where - T: serde::Serialize + serde::Deserialize<'de>, - { - self.mq_chan - .basic_publish( - "", - "receive_obkect", - BasicPublishOptions::default(), - &serde_json::to_vec(object)?, - BasicProperties::default() - .with_reply_to(self.mq_callback_queue.name().clone()) - .with_correlation_id(correlation_id.into()), - ) - .await? - .await?; - while let Some(delivery) = self.mq_callback_consumer.next().await { - match delivery { - Ok(delivery) => { - if delivery.properties.correlation_id().as_ref() == Some(correlation_id) { - let object = serde_json::from_slice(delivery.data.as_slice())?; - delivery.ack(BasicAckOptions::default()).await?; - return Ok(object); - } - } - Err(error) => return Err(Error::DeliveryError(error)), - } - } - - Err(Error::NoReply) - } -} - -pub async fn listen(config: &FederationConfig) -> Result<()> { - let hostname = config.domain(); - info!("Listening on {hostname}"); - let config = config.clone(); - let app = Router::new() - .route("/:user/inbox", post(http_post_user_inbox)) - .route("/:user", get(http_get_user)) - .route("/.well-known/webfinger", get(webfinger)) - .layer(FederationMiddleware::new(config)); - - let addr = hostname - .to_socket_addrs()? - .next() - .expect("Failed to lookup domain name"); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await -} - -#[debug_handler] -async fn http_get_user( - Path(name): Path, - data: Data, -) -> std::result::Result>, Error> { - let db_user = data.get_user(&name).await?; - let json_user = db_user.into_json(&data).await?; - Ok(FederationJson(WithContext::new_default(json_user))) -} - -#[debug_handler] -async fn http_post_user_inbox( - data: Data, - activity_data: ActivityData, -) -> impl IntoResponse { - receive_activity::, InternalPerson, AppData>( - activity_data, - &data, - ) - .await -} - -#[derive(Deserialize)] -struct WebfingerQuery { - resource: String, -} - -#[debug_handler] -async fn webfinger( - Query(query): Query, - data: Data, -) -> std::result::Result, Error> { - let name = extract_webfinger_name(&query.resource, &data)?; - let db_user = data.read_user(&name)?; - Ok(Json(build_webfinger_response( - query.resource, - db_user.ap_id.into_inner(), - ))) -} diff --git a/domain_connector/src/main.rs b/domain_connector/src/main.rs deleted file mode 100644 index 6466c60..0000000 --- a/domain_connector/src/main.rs +++ /dev/null @@ -1,29 +0,0 @@ -use activitypub_federation::config::FederationConfig; -use miette::Context; -use miette::IntoDiagnostic; -use oxifed_server::*; -use tracing::{info, Level}; -use tracing_subscriber::FmtSubscriber; - -#[tokio::main] -async fn main() -> Result<()> { - let app_data = AppData::new().await?; - - let fedi_config = FederationConfig::builder() - .app_data(app_data) - .domain(app_data.get_domain()) - .build() - .into_diagnostic()?; - - // a builder for `FmtSubscriber`. - let subscriber = FmtSubscriber::builder() - // all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.) - // will be written to stdout. - .with_max_level(Level::TRACE) - // completes the builder. - .finish(); - - tracing::subscriber::set_global_default(subscriber).into_diagnostic()?; - - listen(&fedi_config).await -} diff --git a/domain_connector/src/objects/article.rs b/domain_connector/src/objects/article.rs deleted file mode 100644 index 359b68b..0000000 --- a/domain_connector/src/objects/article.rs +++ /dev/null @@ -1,72 +0,0 @@ -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::object::ArticleType, - protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match}, - traits::Object, -}; -use semver::Version; -use serde::{Deserialize, Serialize}; -use url::Url; - -use super::person::InternalPerson; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct InternalArticle { - pub id: super::ObjectId, - pub raw_content: String, - pub version: Version, - pub local: bool, - pub content: String, - pub author: Url, -} - -#[derive(Debug, Deserialize, Serialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Article { - id: ObjectId, - #[serde(rename = "type")] - kind: ArticleType, - name: String, - content: String, - pub(crate) attributed_to: ObjectId, - #[serde(deserialize_with = "deserialize_one_or_many")] - pub(crate) to: Vec, - pub(crate) cc: Option>, -} - -#[async_trait::async_trait] -impl Object for InternalArticle { - type DataType = crate::AppData; - type Kind = Article; - type Error = crate::Error; - - async fn read_from_id( - object_id: Url, - data: &Data, - ) -> std::result::Result, Self::Error> { - todo!() - } - - async fn into_json( - self, - _data: &Data, - ) -> std::result::Result { - todo!() - } - - async fn verify( - json: &Self::Kind, - expected_domain: &Url, - _data: &Data, - ) -> std::result::Result<(), Self::Error> { - todo!() - } - - async fn from_json( - json: Self::Kind, - data: &Data, - ) -> std::result::Result { - todo!() - } -} diff --git a/domain_connector/src/objects/mod.rs b/domain_connector/src/objects/mod.rs deleted file mode 100644 index 8c5f48f..0000000 --- a/domain_connector/src/objects/mod.rs +++ /dev/null @@ -1,76 +0,0 @@ -use activitypub_federation::{fetch::object_id::ObjectId as ActivityPubId, traits::Object}; -use serde::{Deserialize, Serialize}; -use std::fmt::Display; -use url::Url; - -pub(crate) mod article; -pub(crate) mod person; -pub(crate) mod post; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum ObjectKind { - Person, - Article, - Post, -} - -impl Display for ObjectKind { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ObjectKind::Person => write!(f, "person"), - ObjectKind::Article => write!(f, "article"), - ObjectKind::Post => write!(f, "post"), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum ObjectId { - Local { - id: uuid::Uuid, - domain: String, - kind: ObjectKind, - }, - Remote { - id: Url, - }, -} - -impl ObjectId { - pub fn to_object_id(&self) -> crate::Result> - where - K: Object, - for<'de2> ::Kind: Deserialize<'de2>, - K: Send, - { - match self { - ObjectId::Local { id, domain, kind } => Ok(ActivityPubId::::parse( - format!("//{}/{}/{}", domain, kind, id).as_str(), - )?), - ObjectId::Remote { id } => Ok(ActivityPubId::::parse(id.to_string().as_str())?), - } - } -} - -impl Display for ObjectId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ObjectId::Local { id, domain, kind } => write!(f, "//{}/{}/{}", domain, kind, id), - ObjectId::Remote { id } => write!(f, "{}", id), - } - } -} - -impl ObjectId { - pub fn new_local(domain: &str, kind: ObjectKind) -> Self { - Self::Local { - id: uuid::Uuid::new_v4(), - domain: domain.to_owned(), - kind, - } - } - - pub fn new_remote(url: Url) -> Self { - Self::Remote { id: url } - } -} diff --git a/domain_connector/src/objects/person.rs b/domain_connector/src/objects/person.rs deleted file mode 100644 index 0c149e1..0000000 --- a/domain_connector/src/objects/person.rs +++ /dev/null @@ -1,208 +0,0 @@ -use crate::activities::create_post::CreateArticle; -use crate::AppData; -use crate::{Error, Result}; -use activitypub_federation::config::Data; -use activitypub_federation::http_signatures::generate_actor_keypair; -use activitypub_federation::protocol::public_key::PublicKey; -use activitypub_federation::protocol::verification::verify_domains_match; -use activitypub_federation::traits::{Actor, Object}; -use activitypub_federation::{fetch::object_id::ObjectId, kinds::actor::PersonType}; - -use chrono::Local; -use chrono::NaiveDateTime; -use serde::{Deserialize, Serialize}; -use url::Url; -use Debug; - -use super::article::InternalArticle; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct InternalPerson { - pub prefered_username: String, - pub display_name: String, - pub password_hash: Option, - pub email: Option, - pub ap_id: Url, - pub inbox: Url, - pub outbox: Url, - pub local: bool, - pub public_key: String, - pub private_key: Option, - pub last_refreshed_at: NaiveDateTime, - pub followers: Vec, -} - -impl InternalPerson { - pub fn build_ap_id(hostname: &str, name: &str) -> Result { - Ok(Url::parse(&format!("//{}/{}", hostname, name))?) - } - - pub fn new_local(hostname: &str, name: String) -> Result { - let ap_id = InternalPerson::build_ap_id(hostname, &name)?.into(); - let inbox = Url::parse(&format!("//{}/{}/inbox", hostname, &name))?; - let outbox = Url::parse(&format!("//{}/{}/outbox", hostname, &name))?; - let keypair = generate_actor_keypair()?; - Ok(InternalPerson { - prefered_username: name.clone(), - ap_id, - inbox, - public_key: keypair.public_key, - private_key: Some(keypair.private_key), - last_refreshed_at: Local::now().naive_local(), - local: true, - display_name: name.clone(), - password_hash: None, - email: None, - outbox, - followers: vec![], - }) - } - - pub fn followers(&self) -> &Vec { - &self.followers - } - - pub fn followers_url(&self) -> Result { - Ok(Url::parse(&format!( - "{}/followers", - self.ap_id.to_string() - ))?) - } - - pub async fn follow(&self, other: &str, data: &Data) -> Result<()> { - /* - let other: InternalPerson = webfinger_resolve_actor(other, data).await?; - let id = generate_object_id(data.domain())?; - let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); - self.send(follow, vec![other.shared_inbox_or_inbox()], data) - .await?; - Ok(()) - */ - todo!() - } - - pub async fn post(&self, post: InternalArticle, data: &Data) -> Result<()> { - /* - let id = generate_object_id(data.domain())?; - let create = CreatePost::new(post.into_json(data).await?, id.clone()); - let mut inboxes = vec![]; - for f in self.followers.clone() { - let user: InternalPerson = ObjectId::from(f).dereference(data).await?; - inboxes.push(user.shared_inbox_or_inbox()); - } - self.send(create, inboxes, data).await?; - Ok(()) - */ - todo!() - } -} - -impl From for InternalPerson { - fn from(value: Person) -> Self { - InternalPerson { - prefered_username: value.preferred_username, - display_name: value.name, - password_hash: None, - email: None, - ap_id: value.id.inner().clone(), - inbox: value.inbox, - outbox: value.outbox, - local: false, - public_key: value.public_key.public_key_pem, - private_key: None, - last_refreshed_at: Local::now().naive_local(), - followers: vec![], - } - } -} - -#[async_trait::async_trait] -impl Object for InternalPerson { - type DataType = AppData; - - type Kind = Person; - - type Error = Error; - - async fn read_from_id( - object_id: Url, - data: &Data, - ) -> std::result::Result, Self::Error> { - //TODO: Implement Serializable Option Type so I can send over the wire of nothing got found - Ok(data.get_object_by_id(object_id, "person").await.ok()) - } - - async fn into_json( - self, - _data: &Data, - ) -> std::result::Result { - Ok(self.into()) - } - - async fn verify( - json: &Self::Kind, - expected_domain: &Url, - _data: &Data, - ) -> std::result::Result<(), Self::Error> { - verify_domains_match(json.id.inner(), expected_domain)?; - Ok(()) - } - - async fn from_json( - json: Self::Kind, - data: &Data, - ) -> std::result::Result { - Ok(data.receive_object(&json, "receive_person").await?.into()) - } -} - -impl Actor for InternalPerson { - fn id(&self) -> Url { - self.ap_id.clone() - } - - fn public_key_pem(&self) -> &str { - &self.public_key - } - - fn private_key_pem(&self) -> Option { - self.private_key.clone() - } - - fn inbox(&self) -> Url { - self.inbox.clone() - } -} - -#[derive(Deserialize, Serialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Person { - id: ObjectId, - #[serde(rename = "type")] - kind: PersonType, - preferred_username: String, - name: String, - inbox: Url, - outbox: Url, - public_key: PublicKey, -} - -impl From for Person { - fn from(value: InternalPerson) -> Self { - Self { - id: ObjectId::from(value.ap_id.clone()), - kind: PersonType::Person, - preferred_username: value.prefered_username.clone(), - name: value.display_name.clone(), - inbox: value.inbox.clone(), - outbox: value.outbox.clone(), - public_key: value.public_key().clone(), - } - } -} - -#[derive(Deserialize, Serialize, Debug)] -#[serde(untagged)] -pub enum PersonAcceptedActivities { - CreateArticle(CreateArticle), -} diff --git a/domain_connector/src/objects/post.rs b/domain_connector/src/objects/post.rs deleted file mode 100644 index 0d8bb23..0000000 --- a/domain_connector/src/objects/post.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::{objects::person::InternalPerson, AppData, Error}; -use activitypub_federation::fetch::object_id::ObjectId as ActivityPubId; -use activitypub_federation::{ - config::Data, - fetch::object_id::ObjectId, - kinds::{object::NoteType, public}, - protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match}, - traits::Object, -}; -use bonsaidb::core::schema::{Collection, SerializedCollection}; -use serde::{Deserialize, Serialize}; -use url::Url; - -#[derive(Clone, Debug, Collection, PartialEq, Serialize, Deserialize)] -#[collection(name = "Post", primary_key = String, natural_id = |post: &DbPost| Some(post.id.to_string()))] -pub struct DbPost { - pub id: super::ObjectId, - pub text: String, - pub creator: ObjectId, -} - -impl DbPost { - pub fn new_local(text: String, creator: ObjectId) -> Result { - let id = super::ObjectId::new_local( - creator.inner().domain().unwrap_or("localhost"), - super::ObjectKind::Post, - ); - Ok(DbPost { id, text, creator }) - } -} - -#[derive(Deserialize, Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct Note { - #[serde(rename = "type")] - kind: NoteType, - id: ObjectId, - pub(crate) attributed_to: ObjectId, - #[serde(deserialize_with = "deserialize_one_or_many")] - pub(crate) to: Vec, - pub(crate) cc: Option>, -} - -#[async_trait::async_trait] -impl Object for DbPost { - type DataType = AppData; - type Kind = Note; - type Error = Error; - - async fn read_from_id( - object_id: Url, - data: &Data, - ) -> Result, Self::Error> { - Ok(DbPost::get_async(object_id.to_string(), &data.db) - .await? - .map(|doc| doc.contents)) - } - - async fn into_json(self, data: &Data) -> Result { - let creator = self.creator.dereference_local(data).await?; - Ok(Note { - kind: Default::default(), - id: self.id.to_object_id::()?, - attributed_to: self.creator, - to: vec![public(), creator.followers_url()?], - content: self.text, - }) - } - - async fn verify( - json: &Self::Kind, - expected_domain: &Url, - _data: &Data, - ) -> Result<(), Self::Error> { - verify_domains_match(json.id.inner(), expected_domain)?; - Ok(()) - } - - async fn from_json(json: Self::Kind, data: &Data) -> Result { - let post = DbPost { - text: json.content, - ap_id: json.id, - creator: json.attributed_to, - local: false, - }; - - let mut lock = data.posts.lock().unwrap(); - lock.push(post.clone()); - Ok(post) - } -}