Skip to content

Commit

Permalink
Merge pull request #94 from freifunk-saar/refactor
Browse files Browse the repository at this point in the history
a bit of refactoring
  • Loading branch information
RalfJung authored Dec 31, 2023
2 parents 6e7bf7e + 0c161bf commit b17b3dc
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 328 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ serde_json = "1.0"
serde_repr = "0.1"
rmp-serde = "1"
anyhow = "1.0.31"
thiserror = "1.0"
url = { version = "2.2", features = ["serde"] }
base64 = "0.21"
hex = "0.4.3"
Expand Down
7 changes: 4 additions & 3 deletions src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ use rmp_serde::to_vec as serialize_to_vec;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};

use crate::db::DbConn;
use crate::email::EmailAddress;
use crate::models::*;
use crate::schema::*;
use crate::util::EmailAddress;

#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Copy, Clone, FromFormField)]
#[repr(u8)]
Expand Down Expand Up @@ -73,14 +74,14 @@ impl Action {
}
}

pub async fn run(&self, db: &crate::DbConn) -> Result<bool> {
pub async fn run(&self, db: &DbConn) -> Result<bool> {
let op = self.op;
let node = self.node.clone();
let email = self.email.clone();
db.run(move |db| {
let m = Monitor {
id: node.as_str(),
email: email.as_str(),
email: &email,
};
Ok(match op {
Operation::Add => {
Expand Down
34 changes: 1 addition & 33 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::borrow::Cow;

use rocket::fairing::{AdHoc, Fairing};
use rocket::request::{self, FromRequest, Request};
use rocket::{self, http::uri, request::Outcome, State};
use rocket_dyn_templates::Template;
use rocket::http::uri;

use anyhow::{bail, Result};
use lettre::address::Address;
Expand Down Expand Up @@ -104,31 +100,3 @@ impl Config {
Ok(vals)
}
}

/// A request guard that makes the config available to all templates.
pub struct Renderer<'a>(&'a Config);

#[rocket::async_trait]
impl<'r> FromRequest<'r> for Renderer<'r> {
type Error = ();

async fn from_request(request: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
Outcome::Success(Renderer(
request
.guard::<&State<Config>>()
.await
.expect("config")
.inner(),
))
}
}

impl<'a> Renderer<'a> {
pub fn render(
&self,
name: impl Into<Cow<'static, str>>,
vals: serde_json::Value,
) -> Result<Template> {
Ok(Template::render(name, self.0.template_vals(vals)?))
}
}
233 changes: 113 additions & 120 deletions src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@ use std::collections::HashMap;

use anyhow::{bail, Result};
use diesel::prelude::*;
use rocket::uri;
use serde_json::{self, json};
use thiserror::Error;

use crate::config;
use rocket::uri;

use crate::db::DbConn;
use crate::email::EmailAddress;
use crate::models;
use crate::routes;
use crate::schema::*;
use crate::util::EmailAddress;
use crate::util::EmailSender;

#[derive(Debug, Error)]
enum NodeListError {
#[error("got unsupported version number {version}")]
UnsupportedVersion { version: usize },
}
use crate::util::Ctx;

mod json {
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -118,125 +112,123 @@ pub enum UpdateResult {
}

/// Fetch the latest nodelist, update node state and send out emails
pub async fn update_nodes(
db: &crate::DbConn,
config: &config::Config,
email_sender: EmailSender<'_>,
) -> Result<UpdateResult> {
let cur_nodes: json::Nodes = reqwest::get(config.urls.nodes.clone())
.await?
.json()
.await?;

if cur_nodes.version != 2 {
bail!(NodeListError::UnsupportedVersion {
version: cur_nodes.version
});
}
impl<'r> Ctx<'r> {
pub async fn update_nodes(&self, db: &DbConn) -> Result<UpdateResult> {
let config = self.config();
let cur_nodes: json::Nodes = reqwest::get(config.urls.nodes.clone())
.await?
.json()
.await?;

// Build node HashMap: map node ID to name and online state
let mut cur_nodes_map: HashMap<String, NodeData> = HashMap::new();
for cur_node in cur_nodes.nodes.into_iter() {
if let Some((id, data)) = json_to_node_data(cur_node) {
cur_nodes_map.insert(id, data);
if cur_nodes.version != 2 {
bail!(
"unsupported hopglass node list version: {}",
cur_nodes.version
);
}
}

// Stop here if nearly all nodes are offline
let online_nodes = cur_nodes_map.values().filter(|data| data.online).count();
if online_nodes < config.ui.min_online_nodes.unwrap_or(0) {
return Ok(UpdateResult::NotEnoughOnline(online_nodes));
}
// Build node HashMap: map node ID to name and online state
let mut cur_nodes_map: HashMap<String, NodeData> = HashMap::new();
for cur_node in cur_nodes.nodes.into_iter() {
if let Some((id, data)) = json_to_node_data(cur_node) {
cur_nodes_map.insert(id, data);
}
}

// Stop here if nearly all nodes are offline
let online_nodes = cur_nodes_map.values().filter(|data| data.online).count();
if online_nodes < config.ui.min_online_nodes.unwrap_or(0) {
return Ok(UpdateResult::NotEnoughOnline(online_nodes));
}

// Compute which nodes changed their state, also update node names in DB
let changed: Vec<(String, NodeData)> = db
.run(move |db| {
db.transaction::<_, anyhow::Error, _>(|db| {
{
let mut changed = Vec::new();

// Go over every node in the database
let db_nodes = nodes::table.load::<models::NodeQuery>(db)?;
for db_node in db_nodes.into_iter() {
let (id, db_data) = model_to_node_data(db_node);
if let Some(cur_data) = cur_nodes_map.remove(&id) {
// We already know this node.
// Did it change?
if cur_data != db_data {
// Update in database
diesel::update(nodes::table.find(id.as_str()))
.set((
nodes::name.eq(cur_data.name.as_str()),
nodes::online.eq(cur_data.online),
))
.execute(db)?;
// Compute which nodes changed their state, also update node names in DB
let changed: Vec<(String, NodeData)> = db
.run(move |db| {
db.transaction::<_, anyhow::Error, _>(|db| {
{
let mut changed = Vec::new();

// Go over every node in the database
let db_nodes = nodes::table.load::<models::NodeQuery>(db)?;
for db_node in db_nodes.into_iter() {
let (id, db_data) = model_to_node_data(db_node);
if let Some(cur_data) = cur_nodes_map.remove(&id) {
// We already know this node.
// Did it change?
if cur_data != db_data {
// Update in database
diesel::update(nodes::table.find(id.as_str()))
.set((
nodes::name.eq(cur_data.name.as_str()),
nodes::online.eq(cur_data.online),
))
.execute(db)?;
}
// Did its online status change?
if cur_data.online != db_data.online {
changed.push((id, cur_data));
}
} else {
// The node is in the DB but does not exist any more.
diesel::delete(nodes::table.find(id.as_str())).execute(db)?;
if db_data.online {
// The node was online, so it being gone is a change to offline
changed.push((
id,
NodeData {
online: false,
..db_data
},
));
}
}
// Did its online status change?
if cur_data.online != db_data.online {
}

// Go over nodes remaining in the hash map -- they are not in the DB
for (id, cur_data) in cur_nodes_map.into_iter() {
// Insert into DB
diesel::insert_into(nodes::table)
.values(&models::Node {
id: id.as_str(),
name: cur_data.name.as_str(),
online: cur_data.online,
})
.execute(db)?;
if cur_data.online {
// The node is online, so it appearing is a change from the implicit offline
// it was in when it did not exist.
changed.push((id, cur_data));
}
} else {
// The node is in the DB but does not exist any more.
diesel::delete(nodes::table.find(id.as_str())).execute(db)?;
if db_data.online {
// The node was online, so it being gone is a change to offline
changed.push((
id,
NodeData {
online: false,
..db_data
},
));
}
}
}

// Go over nodes remaining in the hash map -- they are not in the DB
for (id, cur_data) in cur_nodes_map.into_iter() {
// Insert into DB
diesel::insert_into(nodes::table)
.values(&models::Node {
id: id.as_str(),
name: cur_data.name.as_str(),
online: cur_data.online,
})
.execute(db)?;
if cur_data.online {
// The node is online, so it appearing is a change from the implicit offline
// it was in when it did not exist.
changed.push((id, cur_data));
}
Ok(changed)
}

Ok(changed)
}
})
})
.await?;

// Send out notifications (not in the transaction as we don't really care here -- also
// we have an external side-effect, the email, which we cannot roll back anyway)
for (id, cur_data) in changed.into_iter() {
// See who monitors this node
let watchers = db
.run({
let id = id.clone();
move |db| {
monitors::table
.filter(monitors::id.eq(id.as_str()))
.load::<models::MonitorQuery>(db)
}
})
})
.await?;
// Send them email
let node = cur_data.into_model(id);
for watcher in watchers.iter() {
// Generate email text
let email = EmailAddress::new(watcher.email.clone()).unwrap();
let list_url = config.urls.absolute(uri!(routes::list(email = &email)));
// Build and send email
email_sender
.email(

// Send out notifications (not in the transaction as we don't really care here -- also
// we have an external side-effect, the email, which we cannot roll back anyway)
for (id, cur_data) in changed.into_iter() {
// See who monitors this node
let watchers = db
.run({
let id = id.clone();
move |db| {
monitors::table
.filter(monitors::id.eq(id.as_str()))
.load::<models::MonitorQuery>(db)
}
})
.await?;
// Send them email
let node = cur_data.into_model(id);
for watcher in watchers.iter() {
// Generate email text
let email = EmailAddress::new(watcher.email.clone()).unwrap();
let list_url = config.urls.absolute(uri!(routes::list(email = &email)));
// Build and send email
self.email(
"notification",
json!({
"node": node,
Expand All @@ -245,8 +237,9 @@ pub async fn update_nodes(
watcher.email.as_str(),
)
.await?;
}
}
}

Ok(UpdateResult::AllOk)
Ok(UpdateResult::AllOk)
}
}
23 changes: 23 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use diesel_migrations::MigrationHarness;

use rocket::fairing::{AdHoc, Fairing};
use rocket_sync_db_pools::{database, diesel};

// DB connection guard type
#[database("postgres")]
pub struct DbConn(diesel::PgConnection);

pub fn migration() -> impl Fairing {
AdHoc::on_ignite("Run DB migrations", move |rocket| async move {
let migrations = diesel_migrations::FileBasedMigrations::find_migrations_directory()
.expect("could not load migrations");
let conn = DbConn::get_one(&rocket)
.await
.expect("could not connect to DB for migrations");
conn.run(move |db| {
db.run_pending_migrations(migrations).unwrap();
})
.await;
rocket
})
}
Loading

0 comments on commit b17b3dc

Please sign in to comment.