Add unit test for usage tracker to ops updates
ch-iara committed Jul 28, 2023
1 parent b12fd32 commit 0b7ba63
Showing 5 changed files with 247 additions and 180 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions rita_client/Cargo.toml
@@ -22,6 +22,7 @@ antenna_forwarding_client = { path = "../antenna_forwarding_client" }
settings = { path = "../settings" }
sha3 = "0.10"
lettre = "0.10"
rand = "0.8.0"
phonenumber = "0.3"
babel_monitor = { path = "../babel_monitor" }
arrayvec = {version= "0.7", features = ["serde"]}
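The new rand dependency backs the shuffle-and-sort test added in the rita_client changes below. A minimal sketch of the two rand 0.8 APIs that test leans on (gen_range and shuffle); the values here are hypothetical:

    use rand::seq::SliceRandom;
    use rand::{thread_rng, Rng};

    fn main() {
        // in rand 0.8, gen_range takes a standard range (0.7 took two arguments)
        let n: u16 = thread_rng().gen_range(50..8760);

        // shuffle works on any mutable slice, e.g. the output of VecDeque::make_contiguous()
        let mut hours: Vec<u64> = (0..n as u64).collect();
        hours.shuffle(&mut thread_rng());
        println!("{} hours, first index after shuffle: {}", n, hours[0]);
    }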
232 changes: 148 additions & 84 deletions rita_client/src/operator_update/mod.rs
@@ -172,65 +172,36 @@ pub async fn operator_update(ops_last_seen_usage_hour: Option<u64>) -> u64 {
// We check that the difference is >1 because we leave a 1 hour buffer, to avoid sending an incomplete current hour
let send_hours = current_hour - last_seen_hour > 1;
let mut usage_tracker_data: Option<UsageTracker> = None;
// if ops_last_seen_usage_hour is None, the thread has restarted and we are waiting for ops to tell us how much
// data we need to send, which will be populated with the next checkin cycle. 730 is the average number of hours
// in a month, and we only send 1 month at a time if ops is requesting the full usage history.
// if ops_last_seen_usage_hour is None, the thread has restarted and we are waiting for ops to tell us how much data we need to send,
// which will be populated with the next checkin cycle. We only send 1 month at a time if ops is requesting the full usage history.
if send_hours && ops_last_seen_usage_hour.is_some() {
let mut usage_data_client = get_usage_data(Client);
let mut usage_data_relay = get_usage_data(Relay);
let mut new_client_data: VecDeque<UsageHour> = VecDeque::new();
let mut new_relay_data: VecDeque<UsageHour> = VecDeque::new();

// ops expects data as [oldest..newest] but we currently send newest..oldest
let client_oldest = match usage_data_client.back() {
Some(x) => x.index,
None => 0,
};
let relay_oldest = match usage_data_relay.back() {
Some(x) => x.index,
None => 0,
};
// if the last seen hour is earlier (lower) than our earliest saved index, we are uploading the entire history, so don't worry about the position since last seen
if last_seen_hour < client_oldest {
new_client_data = iterate_month_usage_data(usage_data_client);
} else if client_oldest == last_seen_hour + 2 {
// we are simply updating the second most recent (because the most recent may be incomplete until it rolls over)
if let Some(data) = usage_data_client.get(1) {
if data.index == last_seen_hour + 1 {
new_client_data.push_front(UsageHour {
up: data.up,
down: data.down,
price: data.price,
index: data.index,
});
}
}
} else {
// binary search until we find the last seen hour, or the hour in the vecdeque with the index just before it (in case of gaps in the data)
let pos = find_position_since_last_seen(&usage_data_client, last_seen_hour);
// drain all older data as it has been uploaded to ops already
usage_data_client.drain(pos..);
new_client_data = iterate_month_usage_data(usage_data_client);
}
// then repeat for relay data
if last_seen_hour < relay_oldest {
new_relay_data = iterate_month_usage_data(usage_data_relay);
} else if relay_oldest == last_seen_hour + 2 {
if let Some(data) = usage_data_relay.get(1) {
if data.index == last_seen_hour + 1 {
new_relay_data.push_front(UsageHour {
up: data.up,
down: data.down,
price: data.price,
index: data.index,
});
}
}
} else {
let pos = find_position_since_last_seen(&usage_data_relay, last_seen_hour);
usage_data_relay.drain(pos..);
new_relay_data = iterate_month_usage_data(usage_data_relay);
}
// sort client and relay data in case they have somehow come out of order. This sorts by index increasing, so the newest data is at the back
usage_data_relay
.make_contiguous()
.sort_by(|a, b| a.index.cmp(&b.index));
usage_data_client
.make_contiguous()
.sort_by(|a, b| a.index.cmp(&b.index));

// binary_search_by returns the position of the last seen hour, or, in an Err, the position of the next highest hour.
// we take the result -1 (saturating at 0) since it's possible we get back a position out of bounds at the back.
let client_last_seen_index =
match usage_data_client.binary_search_by(|x| x.index.cmp(&last_seen_hour)) {
Ok(p) => p,
Err(p) => p.saturating_sub(1),
};
let relay_last_seen_index =
match usage_data_relay.binary_search_by(|x| x.index.cmp(&last_seen_hour)) {
Ok(p) => p,
Err(p) => p.saturating_sub(1),
};
usage_data_client.drain(0..client_last_seen_index);
usage_data_relay.drain(0..relay_last_seen_index);
let new_client_data = iterate_month_usage_data(usage_data_client);
let new_relay_data = iterate_month_usage_data(usage_data_relay);

usage_tracker_data = Some(UsageTracker {
last_save_hour: current_hour,
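The Ok/Err handling above can be checked in isolation. A self-contained sketch (toy hour indexes, not real usage data) of how binary_search_by behaves on the ascending-sorted deque, covering both the gap case and the past-the-end case the saturating subtraction guards against:

    use std::collections::VecDeque;

    fn main() {
        // hour indexes sorted ascending, with a gap where 103 would be
        let hours: VecDeque<u64> = VecDeque::from(vec![100, 101, 102, 104, 105]);

        // exact hit: Ok(position)
        assert_eq!(hours.binary_search_by(|x| x.cmp(&102)), Ok(2));

        // miss inside a gap: Err(insertion point); subtracting 1 lands on the
        // next-oldest stored hour (102, at position 2)
        let pos = match hours.binary_search_by(|x| x.cmp(&103)) {
            Ok(p) => p,
            Err(p) => p.saturating_sub(1),
        };
        assert_eq!(hours[pos], 102);

        // miss past the back: Err(len); the -1 keeps the position in bounds
        let pos = match hours.binary_search_by(|x| x.cmp(&200)) {
            Ok(p) => p,
            Err(p) => p.saturating_sub(1),
        };
        assert_eq!(hours[pos], 105);
    }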
@@ -702,38 +673,21 @@ fn contains_forbidden_key(map: Map<String, Value>, forbidden_values: &[&str]) ->
false
}

/// Binary search the usage data until we get close enough to the last seen hour (either an exact match or the next oldest
/// data). Usage data is saved newest at the front, oldest at the back, meaning the largest indexes are at the front. Returns
/// the index of the position from which to iterate.
fn find_position_since_last_seen(usage: &VecDeque<RCUsageHour>, last_seen: u64) -> usize {
let mut left = 0;
let mut right = usage.len();

while left < right {
let mid = left + (right - left) / 2;
if usage[mid].index > last_seen {
left = mid + 1;
} else {
right = mid;
}
}
left
}
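The helper being removed here can likewise be exercised on its own. A toy sketch (hypothetical Hour type and index values) of the same hand-rolled search over the old descending-order storage, confirming the "exact match or next oldest" behavior the doc comment describes:

    use std::collections::VecDeque;

    // toy stand-in for RCUsageHour (index only)
    struct Hour {
        index: u64,
    }

    // same logic as find_position_since_last_seen: deque sorted descending,
    // newest (largest index) at the front
    fn find_pos(usage: &VecDeque<Hour>, last_seen: u64) -> usize {
        let mut left = 0;
        let mut right = usage.len();
        while left < right {
            let mid = left + (right - left) / 2;
            if usage[mid].index > last_seen {
                left = mid + 1;
            } else {
                right = mid;
            }
        }
        left
    }

    fn main() {
        let hours: VecDeque<Hour> = [105u64, 104, 102, 101]
            .iter()
            .map(|&i| Hour { index: i })
            .collect();
        // an exact match lands on hour 102 at position 2...
        assert_eq!(find_pos(&hours, 102), 2);
        // ...and a gap value (103) lands there too, i.e. on the next-oldest stored hour
        assert_eq!(find_pos(&hours, 103), 2);
    }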

fn iterate_month_usage_data(mut data: VecDeque<RCUsageHour>) -> VecDeque<UsageHour> {
/// Given a vecdeque of usage hours, add up to a month's worth of hours to a returned vecdeque
pub fn iterate_month_usage_data(mut data: VecDeque<RCUsageHour>) -> VecDeque<UsageHour> {
// one month in hours
let max_hour_iterations: u32 = 730;
let mut client_iter = 0;
let mut res = VecDeque::new();
while let Some(hour) = data.pop_back() {
// either we hit max iterations or we are on the second to last entry
while let Some(hour) = data.pop_front() {
// either we hit max iterations or we are on the second to last entry.
res.push_back(UsageHour {
up: hour.up,
down: hour.down,
price: hour.price,
index: hour.index,
});
if client_iter >= max_hour_iterations || data.len() == 1 {
res.push_front(UsageHour {
up: hour.up,
down: hour.down,
price: hour.price,
index: hour.index,
});
break;
}
client_iter += 1;
@@ -743,9 +697,13 @@ fn iterate_month_usage_data(mut data: VecDeque<RCUsageHour>) -> VecDeque<UsageHour> {

#[cfg(test)]
mod tests {
use std::{fs, io::Error, path::Path};

use rand::seq::SliceRandom;
// TODO: Why is this import broken?
//use rita_common::usage_tracker::generate_dummy_usage_tracker;
use rita_common::usage_tracker::UsageHour as RCUsageHour;
use rita_common::usage_tracker::UsageTracker as RCUsageTracker;
use serde_json::json;
use std::{fs, io::Error, path::Path};

use super::*;

@@ -871,4 +829,110 @@ mod tests {
let _update = update_authorized_keys(added_keys, removed_keys, key_file);
assert!(Path::new(key_file).exists());
}
#[test]
fn test_usage_data_processing() {
// this tests the flow used in rita client's operator update loop to process usage data sent up to ops
let dummy_usage_tracker = generate_dummy_usage_tracker_temp();
let mut usage_data_client = dummy_usage_tracker.client_bandwidth.clone();
// Test the sort function first:
// shuffle the data because it's currently ordered
usage_data_client
.make_contiguous()
.shuffle(&mut rand::thread_rng());
println!(
"Sample of current shuffle is {} {} {}",
usage_data_client.get(0).unwrap().index,
usage_data_client.get(1).unwrap().index,
usage_data_client.get(2).unwrap().index
);
// Sort by index increasing so the newest data is at the back. Note that usage hours are stored to disk in
// the opposite order, where the newest are added to the front, so this is inefficient.
// Options to optimize here are either a) write my own binary search again, comparing against the existing structure
// where the saved vecdeque runs highest index to lowest index; b) rework usage tracker so that we save data lowest index
// to highest index; or c) the current solution (inefficient, as we will be fully reversing the whole vecdeque of each
// client and relay every checkin): sort the entire list in reverse order to use the builtin binary search from vecdeque

// this sorts from lowest index to highest index, so we end up with a vecdeque that we can binary search
usage_data_client
.make_contiguous()
.sort_by(|a, b| a.index.cmp(&b.index));
println!(
"Sample of sorted list is {} {} {}",
usage_data_client.get(0).unwrap().index,
usage_data_client.get(1).unwrap().index,
usage_data_client.get(2).unwrap().index
);
assert!(
usage_data_client.get(0).unwrap().index < usage_data_client.get(1).unwrap().index
&& usage_data_client.get(1).unwrap().index
< usage_data_client.get(2).unwrap().index
);
// Next test the binary search
// Case A: no gaps in data when searching through for the last seen hour
// for the purposes of this test we will look at the 10th entry in the list
let client_oldest = match dummy_usage_tracker.client_bandwidth.front() {
Some(x) => x.index,
None => 0,
};
let last_seen_hour = client_oldest - 10;
// binary_search_by returns the position of the last seen hour, or, in an Err, the position of the next highest hour.
// we take the result -1 (saturating at 0) since it's possible we get back a position out of bounds at the back.
let last_seen_position =
match usage_data_client.binary_search_by(|x| x.index.cmp(&last_seen_hour)) {
Ok(p) => p,
Err(p) => p.saturating_sub(1),
};
assert!(usage_data_client.get(last_seen_position).unwrap().index == last_seen_hour);
// now for Case B: we have a gap in the data
usage_data_client.remove(last_seen_position);
let last_seen_position =
match usage_data_client.binary_search_by(|x| x.index.cmp(&last_seen_hour)) {
Ok(p) => p,
Err(p) => p.saturating_sub(1),
};
// so we must retrieve the next earliest entry from where the last seen would be:
assert!(usage_data_client.get(last_seen_position).unwrap().index == last_seen_hour - 1);

// now that we have the position of where to start (keep in mind these are sorted vecdeques, and we only need larger
// (later) indexes than the last seen), we can drain any earlier entries up to the last seen and send off the result
// to the iterate function
usage_data_client.drain(0..last_seen_position);
let new_client_data = iterate_month_usage_data(usage_data_client);
// finally, check that the returned list to be sent back to ops is sorted as intended:
assert!(
new_client_data.get(0).unwrap().index < new_client_data.get(1).unwrap().index
&& new_client_data.get(1).unwrap().index < new_client_data.get(2).unwrap().index
);
}
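The capping behavior the test relies on can also be shown in isolation. A toy version of iterate_month_usage_data with a simplified hour type (hypothetical values), demonstrating that it stops one entry short so the newest, possibly-incomplete hour is withheld:

    use std::collections::VecDeque;

    struct Hour {
        index: u64,
    }

    // mirrors iterate_month_usage_data: pop oldest-first from an ascending-sorted
    // deque, stopping at roughly one month of entries or at the second-to-last entry
    fn take_month(mut data: VecDeque<Hour>) -> VecDeque<Hour> {
        let max_hour_iterations: u32 = 730;
        let mut count = 0;
        let mut res = VecDeque::new();
        while let Some(hour) = data.pop_front() {
            res.push_back(hour);
            if count >= max_hour_iterations || data.len() == 1 {
                break;
            }
            count += 1;
        }
        res
    }

    fn main() {
        let hours: VecDeque<Hour> = (0..10u64).map(|i| Hour { index: i }).collect();
        let month = take_month(hours);
        // only 9 of the 10 hours come back; the newest (index 9) stays behind
        assert_eq!(month.len(), 9);
        assert_eq!(month.back().unwrap().index, 8);
    }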

// generates a usage tracker struct for testing, without payments, since these do not get sent up in ops updates.
// using this while I can't get the import working... as a note, the original function needs to be updated to push to the back
// instead of the front, as it generates data in the wrong order
fn generate_dummy_usage_tracker_temp() -> RCUsageTracker {
let current_hour = get_current_hour().unwrap();
RCUsageTracker {
last_save_hour: current_hour,
client_bandwidth: generate_bandwidth(current_hour),
relay_bandwidth: generate_bandwidth(current_hour),
exit_bandwidth: generate_bandwidth(current_hour),
payments: VecDeque::new(),
}
}
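As a side note to the ordering remark above, a quick sketch (toy values) of why the push direction matters when indexes are generated newest-first, as generate_bandwidth does:

    use std::collections::VecDeque;

    fn main() {
        let current_hour = 100u64;
        let mut via_back = VecDeque::new();
        let mut via_front = VecDeque::new();
        for i in 0..3u64 {
            // generating newest-first: push_back keeps the newest at the front...
            via_back.push_back(current_hour - i);
            // ...while push_front reverses it, leaving the oldest at the front
            via_front.push_front(current_hour - i);
        }
        assert_eq!(Vec::from(via_back), vec![100, 99, 98]);
        assert_eq!(Vec::from(via_front), vec![98, 99, 100]);
    }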
#[cfg(test)]
// generates dummy usage hour data randomly
fn generate_bandwidth(starting_hour: u64) -> VecDeque<RCUsageHour> {
use rand::{thread_rng, Rng};
// 8760 is the max number of saved usage entries (1 year)
let num_to_generate: u16 = thread_rng().gen_range(50..8760);
let mut output = VecDeque::new();
for i in 0..num_to_generate {
output.push_back(RCUsageHour {
index: starting_hour - i as u64,
up: rand::random(),
down: rand::random(),
price: rand::random(),
});
}
output
}
}
3 changes: 2 additions & 1 deletion rita_common/src/payment_validator/mod.rs
@@ -904,8 +904,9 @@ mod tests {
use cosmos_sdk_proto_althea::cosmos::bank::v1beta1::MsgSend;
use deep_space::utils::decode_any;

use crate::usage_tracker::random_identity;

use super::*;
use crate::usage_tracker::tests::random_identity;

fn generate_fake_payment() -> ToValidate {
let amount: u128 = rand::random();