Skip to content

Commit

Permalink
zeroconf: use min heap for timers
Browse files Browse the repository at this point in the history
  • Loading branch information
oysteintveit-nordicsemi committed Apr 15, 2024
1 parent d1eb6d3 commit 750a2b3
Showing 1 changed file with 41 additions and 26 deletions.
67 changes: 41 additions & 26 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use if_addrs::Interface;
use polling::Poller;
use socket2::{SockAddr, Socket};
use std::{
cmp,
collections::{HashMap, HashSet},
cmp::{self, Reverse},
collections::{BinaryHeap, HashMap, HashSet},
fmt,
io::Read,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
Expand Down Expand Up @@ -478,29 +478,20 @@ impl ServiceDaemon {
// Setup timer for IP checks.
const IP_CHECK_INTERVAL_MILLIS: u64 = 30_000;
let mut next_ip_check = current_time_millis() + IP_CHECK_INTERVAL_MILLIS;
zc.timers.push(next_ip_check);
zc.add_timer(next_ip_check);

// Start the run loop.

let mut events = Vec::new();
loop {
let now = current_time_millis();

let earliest_timer = zc
.timers
.iter()
.enumerate()
.min_by(|(_, a), (_, b)| a.cmp(b))
.map(|(i, v)| (i, *v));

let timeout = match earliest_timer {
Some((_, timer)) => {
let earliest_timer = zc.peek_earliest_timer();
let timeout = earliest_timer.map(|timer| {
// If `timer` already passed, set `timeout` to be 1ms.
let millis = if timer > now { timer - now } else { 1 };
Some(Duration::from_millis(millis))
}
None => None,
};
Duration::from_millis(millis)
});

// Process incoming packets, command events and optional timeout.
events.clear();
Expand All @@ -512,9 +503,9 @@ impl ServiceDaemon {
let now = current_time_millis();

// Remove the timer if already passed.
if let Some((min_index, timer)) = earliest_timer {
if let Some(timer) = earliest_timer {
if now >= timer {
zc.timers.remove(min_index);
zc.pop_earliest_timer();
}
}

Expand Down Expand Up @@ -622,7 +613,7 @@ impl ServiceDaemon {
if now > next_ip_check {
next_ip_check = now + IP_CHECK_INTERVAL_MILLIS;
zc.check_ip_changes();
zc.timers.push(next_ip_check);
zc.add_timer(next_ip_check);
}
}
}
Expand Down Expand Up @@ -731,6 +722,7 @@ impl ServiceDaemon {
UnregisterStatus::NotFound
}
Some((_k, info)) => {
let mut timers = Vec::new();
for (ip, intf_sock) in zc.intf_socks.iter() {
let packet = zc.unregister_service(&info, intf_sock);
// repeat for one time just in case some peers miss the message
Expand All @@ -740,9 +732,14 @@ impl ServiceDaemon {
next_time,
command: Command::UnregisterResend(packet, *ip),
});
zc.timers.push(next_time);
timers.push(next_time);
}
}

for t in timers {
zc.add_timer(t);
}

zc.increase_counter(Counter::Unregister, 1);
UnregisterStatus::OK
}
Expand Down Expand Up @@ -1077,7 +1074,7 @@ struct Zeroconf {
///
/// When the run loop goes through a single iteration, it will
/// set its timeout to the earliest timer in this list.
timers: Vec<u64>,
timers: BinaryHeap<Reverse<u64>>,

status: DaemonStatus,

Expand Down Expand Up @@ -1110,7 +1107,7 @@ impl Zeroconf {
let monitors = Vec::new();
let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;

let timers = vec![];
let timers = BinaryHeap::new();
let if_selections = vec![];

let status = DaemonStatus::Running;
Expand Down Expand Up @@ -1216,6 +1213,18 @@ impl Zeroconf {
key
}

fn add_timer(&mut self, next_time: u64) {
self.timers.push(Reverse(next_time));
}

fn peek_earliest_timer(&self) -> Option<u64> {
self.timers.peek().map(|Reverse(v)| *v)
}

fn pop_earliest_timer(&mut self) -> Option<u64> {
self.timers.pop().map(|Reverse(v)| v)
}

/// Apply all selections to `interfaces`.
///
/// For any interface, add it if selected but not bound yet,
Expand Down Expand Up @@ -1546,7 +1555,7 @@ impl Zeroconf {
self.hostname_resolvers
.insert(hostname, (listener, real_timeout));
if let Some(t) = real_timeout {
self.timers.push(t);
self.add_timer(t);
}
}

Expand Down Expand Up @@ -1843,15 +1852,16 @@ impl Zeroconf {
// each record immediately as the answers are likely related to each
// other.
let mut changes = Vec::new();
let mut timers = Vec::new();
for record in msg.answers {
if let Some((dns_record, true)) = self.cache.add_or_update(record) {
self.timers.push(dns_record.get_record().get_expire_time());
timers.push(dns_record.get_record().get_expire_time());

let ty = dns_record.get_type();
let name = dns_record.get_name();
if ty == TYPE_PTR {
if self.service_queriers.contains_key(name) {
self.timers.push(dns_record.get_record().get_refresh_time());
timers.push(dns_record.get_record().get_refresh_time());
}

// send ServiceFound
Expand All @@ -1875,6 +1885,11 @@ impl Zeroconf {
}
}

// Add timers for the new records.
for t in timers {
self.add_timer(t);
}

// Go through remaining changes to see if any hostname resolutions were found or updated.
changes
.iter()
Expand Down Expand Up @@ -2128,7 +2143,7 @@ impl Zeroconf {

fn add_retransmission(&mut self, next_time: u64, command: Command) {
self.retransmissions.push(ReRun { next_time, command });
self.timers.push(next_time);
self.add_timer(next_time);
}
}

Expand Down

0 comments on commit 750a2b3

Please sign in to comment.