Send usage data in 1-month portions #806

Merged
merged 5 commits on Jul 28, 2023

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions rita_client/Cargo.toml
@@ -22,6 +22,7 @@ antenna_forwarding_client = { path = "../antenna_forwarding_client" }
settings = { path = "../settings" }
sha3 = "0.10"
lettre = "0.10"
rand = "0.8.0"
phonenumber = "0.3"
babel_monitor = { path = "../babel_monitor" }
arrayvec = {version= "0.7", features = ["serde"]}
246 changes: 73 additions & 173 deletions rita_client/src/operator_update/mod.rs
@@ -1,4 +1,5 @@
//! This module is responsible for checking in with the operator server and getting updated local settings
pub mod tests;
pub mod update_loop;
pub mod updater;
extern crate openssh_keys;
@@ -19,6 +20,7 @@ use num256::Uint256;
use rita_common::rita_loop::is_gateway;
use rita_common::tunnel_manager::neighbor_status::get_neighbor_status;
use rita_common::tunnel_manager::shaping::flag_reset_shaper;
use rita_common::usage_tracker::UsageHour as RCUsageHour;
use rita_common::usage_tracker::UsageType::{Client, Relay};
use rita_common::usage_tracker::{get_current_hour, get_usage_data};
use rita_common::utils::option_convert;
@@ -168,53 +170,20 @@ pub async fn operator_update(ops_last_seen_usage_hour: Option<u64>) -> u64 {
}
};
let last_seen_hour = ops_last_seen_usage_hour.unwrap_or(0);
let mut hours_to_send: u64 = 0;
// first check what the last saved hour is and send everything from that hour on,
// but we need to save a var of the last x hours not seen and load that up with the next checkin cycle if >0.
if current_hour - last_seen_hour > 0 {
hours_to_send = current_hour - last_seen_hour;
}
// We check that the difference is >1 because we leave a 1 hour buffer to avoid sending an incomplete current hour
let send_hours = current_hour - last_seen_hour > 1;
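// Hypothetical worked example (numbers invented): with current_hour = 420_100 and
// last_seen_hour = 420_099 the difference is exactly 1, so send_hours stays false and the
// still-open hour is not sent; at current_hour = 420_101 the difference is 2 and send_hours is true.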
let mut usage_tracker_data: Option<UsageTracker> = None;
// only deal with this if we actually do need to send some hours
// if ops_last_seen_usage_hour is a None the thread has restarted and we are waiting for ops to tell us how much
// data we need to send, which will be populated with the next checkin cycle
if hours_to_send != 0 && ops_last_seen_usage_hour.is_some() {
let mut usage_data_client = get_usage_data(Client);
let mut usage_data_relay = get_usage_data(Relay);
let mut new_client_data: VecDeque<UsageHour> = VecDeque::new();
let mut new_relay_data: VecDeque<UsageHour> = VecDeque::new();
for _hour in 0..hours_to_send {
// pop front, add to front of new vecdeque if exists.
while let Some(data) = usage_data_client.pop_front() {
if data.index > last_seen_hour {
new_client_data.push_front(UsageHour {
up: data.up,
down: data.down,
price: data.price,
index: data.index,
});
break;
}
}

// pop front, add to front of new vecdeque if exists.
while let Some(data) = usage_data_relay.pop_front() {
if data.index > last_seen_hour {
new_relay_data.push_front(UsageHour {
up: data.up,
down: data.down,
price: data.price,
index: data.index,
});
break;
}
}
}
usage_tracker_data = Some(UsageTracker {
last_save_hour: current_hour,
client_bandwidth: new_client_data,
relay_bandwidth: new_relay_data,
});
// if ops_last_seen_usage_hour is None the thread has restarted and we are waiting for ops to tell us how much data we need to send,
// which will be populated with the next checkin cycle. We only send 1 month at a time if ops is requesting the full usage history.
if send_hours && ops_last_seen_usage_hour.is_some() {
let usage_data_client = get_usage_data(Client);
let usage_data_relay = get_usage_data(Relay);
usage_tracker_data = process_usage_data(
usage_data_client,
usage_data_relay,
last_seen_hour,
current_hour,
)
}

let exit_con = Some(ExitConnection {
@@ -680,134 +649,65 @@ fn contains_forbidden_key(map: Map<String, Value>, forbidden_values: &[&str]) ->
false
}

#[cfg(test)]
mod tests {
use std::{fs, io::Error, path::Path};

use serde_json::json;

use super::*;

const FORBIDDEN_MERGE_VALUES: [&str; 2] = ["test_key", "other_test_key"];

#[test]
fn test_contains_key() {
// exact key match should fail
let object = json!({"localization": { "wyre_enabled": true, "wyre_account_id": "test_key", "test_key": false}});
if let Value::Object(map) = object {
assert!(contains_forbidden_key(map, &FORBIDDEN_MERGE_VALUES));
} else {
panic!("Not a json map!");
}

// slightly modified key should not match
let object = json!({"localization": { "wyre_enabled": true, "wyre_account_id": "test_key", "test_key1": false}});
if let Value::Object(map) = object {
assert!(!contains_forbidden_key(map, &FORBIDDEN_MERGE_VALUES));
} else {
panic!("Not a json map!");
/// Given a vecdeque of usage hours, add up to a month's worth of hours to a returned vecdeque
pub fn iterate_month_usage_data(mut data: VecDeque<RCUsageHour>) -> VecDeque<UsageHour> {
// one month in hours
let max_hour_iterations: u32 = 730;
let mut client_iter = 0;
let mut res = VecDeque::new();
while let Some(hour) = data.pop_front() {
// either we hit max iterations or we are on the second to last entry.
res.push_back(UsageHour {
up: hour.up,
down: hour.down,
price: hour.price,
index: hour.index,
});
if client_iter >= max_hour_iterations || data.len() == 1 {
break;
}
client_iter += 1;
}
res
}
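A minimal usage sketch of the helper above (not part of this diff), assuming RCUsageHour's fields are the plain integer fields used in this function:

// illustrative only: build ten fake hours and confirm that everything except the
// newest, still-accumulating hour at the back of the deque gets copied
let mut data: VecDeque<RCUsageHour> = VecDeque::new();
for index in 0..10u64 {
    data.push_back(RCUsageHour { up: 1, down: 2, price: 3, index });
}
let sent = iterate_month_usage_data(data);
assert_eq!(sent.len(), 9);
assert_eq!(sent.front().unwrap().index, 0);
assert_eq!(sent.back().unwrap().index, 8);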
fn touch_temp_file(file_name: &str) -> &str {
let test_file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(file_name);
let operator_key = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIL+UBakquB9rJ7tA2H+U43H/xNmpJiHpOkHGpVfFUXgP OPERATOR";
writeln!(test_file.unwrap(), "{operator_key}").expect("setup failed to create temp file");
operator_key
}
fn remove_temp_file(file_name: &str) -> Result<(), Error> {
fs::remove_file(file_name)
}
fn parse_keys(file_name: &str) -> Vec<String> {
let mut temp = Vec::new();
let expected = File::open(file_name).unwrap();
let reader = BufReader::new(expected);
for key in reader.lines() {
temp.push(key.unwrap());
}
temp
}

#[test]
fn test_update_auth_keys() {
let added_keys = vec![String::from("ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHFgFrnSm9MFS1zpHHvwtfLohjqtsK13NyL41g/zyIhK test@hawk-net")];
let removed_keys = vec![];
let key_file: &str = "authorized_keys";
let operator_key = touch_temp_file(key_file);

let _update = update_authorized_keys(added_keys.clone(), removed_keys, key_file);
let result = parse_keys(key_file);
assert_eq!(result.len(), 2);
assert!(result.contains(&added_keys[0]));
assert!(result.contains(&operator_key.to_string()));
remove_temp_file(key_file).unwrap();
}

#[test]
fn test_update_auth_multiple_keys() {
let added_keys = vec![String::from("ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHFgFrnSm9MFS1zpHHvwtfLohjqtsK13NyL41g/zyIhK test@hawk-net"),
String::from("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDVF1POOko4/fTE/SowsURSmd+kAUFDX6VPNqICJjn8eQk8FZ15WsZKfBdrGXLhl2+pxM66VWMUVRQOq84iSRVSVPA3abz0H7JYIGzO8psTweSZfK1jwHfKDGQA1h1aPuspnPrX7dyS1qLZf3YeVUUi+BFsW2gSiMadbS4zal2c2F1AG5Ezr3zcRVA8y3D0bZxScPAEX74AeTFcimHpHFyzDtUsRpf0uSEXZcMFqX5j4ETKlIs28k1v8LlhHo91IQYHEtbyi/I1M0axbF4VCz5JlcbAs9LUEJg8Kx8LxzJSeSJbxVwyk5WiEDwVsCL2MAtaOcJ+/FhxLb0ZEELAHnXFNSqmY8QoHeSdHrGP7FmVCBjRb/AhVUHYvsG94rO3Ij4H5XsbsQbP3AHVKbvf387WB53Wga7VrBXvRC9aDisetdP9+4/seVIBbOIePotaiHoTyS1cJ+Jg0PkKy96enqwMt9T1Wt8jURB+s/A/bDGHkjB3dxomuGxux8dD6UNX54M= test-rsa@hawk-net"),
];
let removed_keys = vec![];
let key_file: &str = "add_keys";

let operator_key = touch_temp_file(key_file);

let _update = update_authorized_keys(added_keys.clone(), removed_keys, key_file);
let result = parse_keys(key_file);
assert!(result.contains(&added_keys[0]));
assert!(result.contains(&added_keys[1]));
assert!(result.contains(&operator_key.to_string()));
assert_eq!(result.len(), 3);
remove_temp_file(key_file).unwrap();
}

#[test]
fn test_update_auth_remove_keys() {
let added_keys = vec![];
let removed_keys = vec![
String::from("ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHFgFrnSm9MFS1zpHHvwtfLohjqtsK13NyL41g/zyIhK test@hawk-net"),
String::from("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDVF1POOko4/fTE/SowsURSmd+kAUFDX6VPNqICJjn8eQk8FZ15WsZKfBdrGXLhl2+pxM66VWMUVRQOq84iSRVSVPA3abz0H7JYIGzO8psTweSZfK1jwHfKDGQA1h1aPuspnPrX7dyS1qLZf3YeVUUi+BFsW2gSiMadbS4zal2c2F1AG5Ezr3zcRVA8y3D0bZxScPAEX74AeTFcimHpHFyzDtUsRpf0uSEXZcMFqX5j4ETKlIs28k1v8LlhHo91IQYHEtbyi/I1M0axbF4VCz5JlcbAs9LUEJg8Kx8LxzJSeSJbxVwyk5WiEDwVsCL2MAtaOcJ+/FhxLb0ZEELAHnXFNSqmY8QoHeSdHrGP7FmVCBjRb/AhVUHYvsG94rO3Ij4H5XsbsQbP3AHVKbvf387WB53Wga7VrBXvRC9aDisetdP9+4/seVIBbOIePotaiHoTyS1cJ+Jg0PkKy96enqwMt9T1Wt8jURB+s/A/bDGHkjB3dxomuGxux8dD6UNX54M= test-rsa@hawk-net"),
];
let key_file: &str = "auth_remove_keys";

let operator_key = touch_temp_file(key_file);

let _update = update_authorized_keys(added_keys, removed_keys, key_file);
let result = parse_keys(key_file);
assert!(result.contains(&operator_key.to_string()));

assert_eq!(result.len(), 1);

remove_temp_file(key_file).unwrap();
}
#[test]
fn test_removing_existing_key() {
let added_keys = vec![];
let key_file: &str = "remove_keys";

let operator_key = touch_temp_file(key_file);
let removed_keys = vec![String::from(operator_key)];
let _update = update_authorized_keys(added_keys, removed_keys.clone(), key_file);

let result = parse_keys(key_file);
for item in result {
assert_eq!(item, removed_keys[0].to_string());
}

remove_temp_file(key_file).unwrap();
}
#[test]
fn test_authorized_keys_create_if_missing() {
let added_keys = vec![
String::from("ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHFgFrnSm9MFS1zpHHvwtfLohjqtsK13NyL41g/zyIhK test@hawk-net ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDVF1POOko4/fTE/SowsURSmd+kAUFDX6VPNqICJjn8eQk8FZ15WsZKfBdrGXLhl2+pxM66VWMUVRQOq84iSRVSVPA3abz0H7JYIGzO8psTweSZfK1jwHfKDGQA1h1aPuspnPrX7dyS1qLZf3YeVUUi+BFsW2gSiMadbS4zal2c2F1AG5Ezr3zcRVA8y3D0bZxScPAEX74AeTFcimHpHFyzDtUsRpf0uSEXZcMFqX5j4ETKlIs28k1v8LlhHo91IQYHEtbyi/I1M0axbF4VCz5JlcbAs9LUEJg8Kx8LxzJSeSJbxVwyk5WiEDwVsCL2MAtaOcJ+/FhxLb0ZEELAHnXFNSqmY8QoHeSdHrGP7FmVCBjRb/AhVUHYvsG94rO3Ij4H5XsbsQbP3AHVKbvf387WB53Wga7VrBXvRC9aDisetdP9+4/seVIBbOIePotaiHoTyS1cJ+Jg0PkKy96enqwMt9T1Wt8jURB+s/A/bDGHkjB3dxomuGxux8dD6UNX54M= test-rsa@hawk-net"),
];
let removed_keys: Vec<String> = vec![];
let key_file: &str = "create_keys_file";
let _update = update_authorized_keys(added_keys, removed_keys, key_file);
assert!(Path::new(key_file).exists());
}
/// Given our saved usage data and our last seen value, process the vecdeques so that we only
/// send to ops new data since last seen.
pub fn process_usage_data(
mut usage_data_client: VecDeque<RCUsageHour>,
mut usage_data_relay: VecDeque<RCUsageHour>,
last_seen_hour: u64,
current_hour: u64,
) -> Option<UsageTracker> {
// sort client and relay data in case they have come out of order somehow. This sorts by index increasing so newest data at back
usage_data_relay
.make_contiguous()
.sort_by(|a, b| a.index.cmp(&b.index));
usage_data_client
.make_contiguous()
.sort_by(|a, b| a.index.cmp(&b.index));

// binary_search_by returns the index of last_seen_hour if it is present, or (in the Err case)
// the index where it would be inserted. We subtract 1 from the result (saturating at 0) because
// the Err index can be one past the end of the deque.
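// Hypothetical example: with stored indices [100, 101, 103] and last_seen_hour = 102,
// binary_search_by returns Err(2) and saturating_sub(1) gives 1, so the drain below keeps
// [101, 103]; we conservatively resend from one entry before the insertion point.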
let client_last_seen_index =
match usage_data_client.binary_search_by(|x| x.index.cmp(&last_seen_hour)) {
Ok(p) => p,
Err(p) => p.saturating_sub(1),
};
let relay_last_seen_index =
match usage_data_relay.binary_search_by(|x| x.index.cmp(&last_seen_hour)) {
Ok(p) => p,
Err(p) => p.saturating_sub(1),
};
// remove all data before the last seen index
usage_data_client.drain(0..client_last_seen_index);
usage_data_relay.drain(0..relay_last_seen_index);
let new_client_data = iterate_month_usage_data(usage_data_client);
let new_relay_data = iterate_month_usage_data(usage_data_relay);

Some(UsageTracker {
last_save_hour: current_hour,
client_bandwidth: new_client_data,
relay_bandwidth: new_relay_data,
})
}
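For illustration, a hedged sketch (not part of this diff) of how process_usage_data slices the saved history against ops' last seen hour; the hour numbers are invented and RCUsageHour's fields are assumed to be plain integers:

// ten hours of fake history, indices 100..=109, for both client and relay traffic
let mut client: VecDeque<RCUsageHour> = VecDeque::new();
let mut relay: VecDeque<RCUsageHour> = VecDeque::new();
for index in 100..110u64 {
    client.push_back(RCUsageHour { up: 1, down: 1, price: 0, index });
    relay.push_back(RCUsageHour { up: 1, down: 1, price: 0, index });
}
// ops last saw hour 105 and the router is currently in hour 110
let payload = process_usage_data(client, relay, 105, 110).unwrap();
// hours before the last seen hour are dropped and the newest hour is held back
assert_eq!(payload.client_bandwidth.front().unwrap().index, 105);
assert_eq!(payload.client_bandwidth.back().unwrap().index, 108);
assert_eq!(payload.last_save_hour, 110);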