Skip to content

Commit

Permalink
Adding RabbitMQ RPC API
Browse files Browse the repository at this point in the history
Signed-off-by: Till Wegmueller <[email protected]>
  • Loading branch information
Toasterson committed Jul 21, 2023
1 parent bdf30f3 commit 6a214e1
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 61 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 96 additions & 24 deletions crates/identd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use axum::{Form, Json, Router};
use base64::{engine::general_purpose, Engine as _};
use bcrypt::{hash, verify, DEFAULT_COST};
use chrono::Utc;
use config::{Config, Environment};
use deadpool_lapin::Pool;
use futures::{join, StreamExt};
use lapin::message::Delivery;
use lapin::BasicProperties;
use lapin::{options::*, types::FieldTable};
use miette::Diagnostic;
use openidconnect::core::{
Expand Down Expand Up @@ -44,7 +45,7 @@ use tokio::sync::Mutex;

use url::Url;

const ADMIN_QUEUE_NAME: &str = "identd.admin";
pub const ADMIN_QUEUE_NAME: &str = "identd.admin";

#[derive(Debug, Error, Diagnostic)]
pub enum Error {
Expand Down Expand Up @@ -111,6 +112,9 @@ pub enum Error {
#[error(transparent)]
JsonWebTokenError(#[from] openidconnect::JsonWebTokenError),

#[error(transparent)]
ConfigError(#[from] config::ConfigError),

#[error("unautorized")]
Unauthorized,

Expand Down Expand Up @@ -173,7 +177,7 @@ pub struct ServerConfig {
pub domain: String,
pub use_ssl: bool,
pub realm_keys_base_path: PathBuf,
pub realms: Vec<ConfigRealm>,
pub realms: Option<Vec<ConfigRealm>>,
pub db_url: String,
pub realm_db_url: String,
}
Expand All @@ -186,7 +190,7 @@ impl Default for ServerConfig {
domain: String::from("localhost:4200"),
use_ssl: false,
realm_keys_base_path: Path::new("keys").to_path_buf(),
realms: vec![],
realms: None,
db_url: String::from("sqlite://identd.db?mode=rwc"),
realm_db_url: String::from("sqlite://"),
}
Expand Down Expand Up @@ -214,10 +218,33 @@ impl ServerConfig {
Ok(conn)
}
}

pub fn new(config_file: Option<String>) -> Result<Self> {
let mut cfg = Config::builder()
.add_source(config::File::with_name("/etc/identd.toml").required(false))
.add_source(config::File::with_name("identd.toml").required(false));

if let Some(path) = config_file {
cfg = cfg.add_source(config::File::with_name(&path));
}

cfg = cfg.add_source(Environment::with_prefix("identd"));
cfg = cfg.set_default("amqp_url", "amqp://dev:[email protected]:5672/master")?;
cfg = cfg.set_default("listen_addr", "127.0.0.1:4200")?;
cfg = cfg.set_default("domain", "localhost:4200")?;
cfg = cfg.set_default("use_ssl", false)?;
cfg = cfg.set_default("realm_keys_base_path", "keys")?;
cfg = cfg.set_default("db_url", "sqlite://identd.db?mode=rwc")?;
cfg = cfg.set_default("realm_db_url", "sqlite://")?;

let s = cfg.build()?;

Ok(s.try_deserialize()?)
}
}

// A realm as it is in the config file
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ConfigRealm {
pub name: String,
pub domain: Option<String>,
Expand Down Expand Up @@ -279,20 +306,19 @@ impl ServerState {
.create_pool(Some(deadpool_lapin::Runtime::Tokio1))
.map_err(|e| Error::ErrorValue(e.to_string()))?;

for cfg_realm in config.realms.iter() {
let domain_or_default = cfg_realm
.domain
.clone()
.unwrap_or(String::from("localhost:4200"));
Self::create_realm(
&realm_db,
&cfg_realm.name,
&domain_or_default,
helper_get_scheme_from_config(config.use_ssl),
cfg_realm.clients.clone(),
&config.realm_keys_base_path,
)
.await?;
if let Some(realms) = config.realms.clone() {
for cfg_realm in realms.iter() {
let domain_or_default = cfg_realm.domain.clone().unwrap_or(config.domain.clone());
Self::create_realm(
&realm_db,
&cfg_realm.name,
&domain_or_default,
helper_get_scheme_from_config(config.use_ssl),
cfg_realm.clients.clone(),
&config.realm_keys_base_path,
)
.await?;
}
}

Ok(Self {
Expand Down Expand Up @@ -502,6 +528,21 @@ pub enum AdminMessage {
},
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AdminResponse {
Success,
Error { message: String },
}

impl std::fmt::Display for AdminResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AdminResponse::Success => write!(f, "success"),
AdminResponse::Error { message } => write!(f, "{}", message),
}
}
}

async fn init_rmq_listen(pool: Pool, user_db: DatabaseConnection) -> Result<()> {
let rmq_con = pool
.get()
Expand Down Expand Up @@ -530,20 +571,51 @@ async fn init_rmq_listen(pool: Pool, user_db: DatabaseConnection) -> Result<()>
tracing::info!("rmq consumer connected, waiting for messages");
while let Some(delivery) = consumer.next().await {
if let Ok(delivery) = delivery {
let tag = delivery.delivery_tag.clone();
match handle_admin_message(&user_db, delivery).await {
Ok(_) => channel.basic_ack(tag, BasicAckOptions::default()).await?,
match handle_admin_message(&user_db, delivery.data.as_slice()).await {
Ok(_) => {
if let Some(reply_queue) = delivery.properties.reply_to() {
let err_msg = AdminResponse::Success;
let err_payload = serde_json::to_vec(&err_msg)?;
channel
.basic_publish(
"",
reply_queue.as_str(),
BasicPublishOptions::default(),
&err_payload,
BasicProperties::default(),
)
.await?;
}

delivery.ack(BasicAckOptions::default()).await?
}
Err(err) => {
tracing::error!(error = err.to_string(), "failed to handle message");
if let Some(reply_queue) = delivery.properties.reply_to() {
let err_msg = AdminResponse::Error {
message: err.to_string(),
};
let err_payload = serde_json::to_vec(&err_msg)?;
channel
.basic_publish(
"",
reply_queue.as_str(),
BasicPublishOptions::default(),
&err_payload,
BasicProperties::default(),
)
.await?;
}
delivery.ack(BasicAckOptions::default()).await?;
}
}
}
}
Ok(())
}

async fn handle_admin_message(user_db: &DatabaseConnection, delivery: Delivery) -> Result<()> {
let msg: AdminMessage = serde_json::from_slice(&delivery.data)?;
async fn handle_admin_message(user_db: &DatabaseConnection, data: &[u8]) -> Result<()> {
let msg: AdminMessage = serde_json::from_slice(data)?;
use user::user::Column;
use user::user::{ActiveModel as Model, Entity as UserEntity};
tracing::debug!("received message {:?}, processing.", msg);
Expand Down
9 changes: 5 additions & 4 deletions crates/identd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
struct Args {
#[command(subcommand)]
command: Commands,
config: Option<String>,
}

#[derive(Debug, Subcommand, Clone, Default)]
Expand All @@ -23,6 +24,7 @@ pub enum Commands {
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let cfg = ServerConfig::new(args.config)?;
match args.command {
Commands::Start => {
tracing_subscriber::registry()
Expand All @@ -35,7 +37,7 @@ async fn main() -> Result<()> {
)
.with(tracing_subscriber::fmt::layer())
.init();
let cfg = ServerConfig::default();

let server = ServerState::new(&cfg).await?;

identd::listen(server).await
Expand All @@ -53,7 +55,7 @@ async fn main() -> Result<()> {
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!("Starting migrations by manual command");
let cfg = ServerConfig::default();

tracing::info!("Connecting to database URL: {}", &cfg.db_url);
let conn = cfg.open_db_conn().await?;
tracing::debug!("Mark: Staring migration; connection was sucessfull");
Expand All @@ -73,8 +75,7 @@ async fn main() -> Result<()> {
)
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!("Starting migrations by manual command");
let cfg = ServerConfig::default();

tracing::info!("Connecting to database URL: sqlite://realms.db?mode=rwc");
let conn = cfg
.open_realm_db_conn(Some(String::from("realms.db")))
Expand Down
11 changes: 9 additions & 2 deletions crates/identd/tests/auth_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,22 @@ async fn setup_server() -> Result<()> {

cfg.db_url = String::from("sqlite://");

cfg.realms.push(identd::ConfigRealm {
let master_realm = identd::ConfigRealm {
name: String::from("master"),
domain: None,
clients: vec![identd::Client::new(
"client_id",
None,
"https://localhost:4300/callback",
)],
});
};

if let Some(mut realms) = cfg.realms.clone() {
realms.push(master_realm);
cfg.realms = Some(realms);
} else {
cfg.realms = Some(vec![master_realm])
}

let server = identd::ServerState::new(&cfg).await?;

Expand Down
7 changes: 7 additions & 0 deletions crates/realmadm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-global-executor = "2.3.1"
clap = { workspace = true, features = ["derive"] }
config.workspace = true
futures-lite = "1.13.0"
identd = { version = "0.1.0-dev", path = "../identd" }
lapin.workspace = true
miette = { workspace = true, features = ["fancy"] }
rand = "0.8.5"
rsa = "0.9.2"
serde_json.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
Loading

0 comments on commit 6a214e1

Please sign in to comment.