diff --git a/main/Cargo.toml b/main/Cargo.toml index e31947a..834d022 100644 --- a/main/Cargo.toml +++ b/main/Cargo.toml @@ -14,6 +14,7 @@ log = "0.4" mockall = "0.12" prost = "0.12" serde = { version = "1.0", features = ["derive"] } +serde_json = "1" tansa-protocol = { path = "../protocol/rust" } thiserror = "1" tokio = { version = "1", features = ["fs", "process"] } diff --git a/main/src/network/ip_neighbor/linux.rs b/main/src/network/ip_neighbor/linux.rs new file mode 100644 index 0000000..e26eba1 --- /dev/null +++ b/main/src/network/ip_neighbor/linux.rs @@ -0,0 +1,152 @@ +use super::IpNeighbor; +use super::IpNeighborScanError; +use super::IpNeighborScanner; +use csv::ReaderBuilder; +use futures_util::future::BoxFuture; +use futures_util::FutureExt; +use serde::Deserialize; +use std::collections::HashMap; +use std::net::Ipv6Addr; + +pub struct IpRoute2IpNeighborScanner; + +impl IpRoute2IpNeighborScanner { + async fn scan() -> Result, IpNeighborScanError> { + let ip_links = crate::process::run("ip", &["-json", "link"]).await?; + let ip_neighbors = crate::process::run("ip", &["-family", "inet6", "neighbor"]).await?; + Self::parse_output(&ip_links, &ip_neighbors).map_err(Into::into) + } + + fn parse_output( + ip_link_output: &[u8], + ip_neighbor_output: &[u8], + ) -> Result, IpNeighborScanError> { + let neighbors = ReaderBuilder::new() + .has_headers(false) + .delimiter(b' ') + .from_reader(ip_neighbor_output) + .deserialize() + .collect::, _>>()?; + neighbors + .iter() + .for_each(|n| log::debug!("Scanned IP neighbor: {:?}", n)); + + let links: Vec = serde_json::from_reader(ip_link_output)?; + links + .iter() + .for_each(|l| log::debug!("Scanned IP link: {:?}", l)); + let links: HashMap<_, _> = links.into_iter().map(|l| (l.ifname.clone(), l)).collect(); + + let neighbors: Vec<_> = neighbors + .into_iter() + .filter(|n| n.state != "FAILED") + .filter(|n| n.ip.segments().starts_with(&[0xFE80, 0, 0, 0])) + .filter_map(|n| { + links.get(&n.ifname).map(|l| IpNeighbor { + address: n.ip, + network_interface_index: l.ifindex, + }) + }) + .collect(); + neighbors + .iter() + .for_each(|n| log::info!("Valid IP neighbor: {:?}", n)); + + Ok(neighbors) + } +} + +impl IpNeighborScanner for IpRoute2IpNeighborScanner { + fn supports_current_operating_system(&self) -> BoxFuture<'static, bool> { + crate::process::probe("ip", &["-Version"]).boxed() + } + + fn scan(&self) -> BoxFuture<'static, Result, IpNeighborScanError>> { + Self::scan().boxed() + } +} + +#[derive(Deserialize, Debug)] +#[allow(non_snake_case, dead_code)] +struct Neighbor { + ip: Ipv6Addr, + dev: String, + ifname: String, + lladdr: String, + mac: String, + router: String, + state: String, +} + +#[derive(Deserialize, Debug)] +#[allow(non_snake_case)] +struct Link { + ifindex: u32, + ifname: String, +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn scan() { + crate::test::init(); + + let scanner = IpRoute2IpNeighborScanner; + + if !scanner.supports_current_operating_system().await { + println!("`iproute2` does not exist, skipping."); + return; + } + + let neighbors = scanner.scan().await.unwrap(); + assert!(!neighbors.is_empty()); + } + + #[test] + fn parse_output() { + let ip_neighbor_output = r#" +fe80::1:abcd dev wlan0 lladdr 00:00:00:00:00:01 router REACHABLE +fe80::2:abcd dev eth0 lladdr 00:00:00:00:00:02 router STALE +fe80::3:abcd dev wlan2 lladdr 00:00:00:00:00:03 router STALE +fe02::4:abcd dev wlan0 lladdr 00:00:00:00:00:04 router REACHABLE +fe80::5:abcd dev wlan0 lladdr 00:00:00:00:00:05 router FAILED + "# + .trim(); + let ip_link_output = r#" + [ + { + "ifindex": 1, + "ifname": "wlan0", + "mtu": 1500 + }, + { + "ifindex": 2, + "ifname": "eth0", + "mtu": 1500 + } + ] + "#; + let expected_neighbors = vec![ + IpNeighbor { + network_interface_index: 1, + address: "fe80::1:abcd".parse().unwrap(), + }, + IpNeighbor { + network_interface_index: 2, + address: "fe80::2:abcd".parse().unwrap(), + }, + ]; + + // When + let actual_neighbors = IpRoute2IpNeighborScanner::parse_output( + ip_link_output.as_bytes(), + ip_neighbor_output.as_bytes(), + ) + .unwrap(); + + // Then + assert_eq!(actual_neighbors, expected_neighbors); + } +} diff --git a/main/src/network/ip_neighbor/mod.rs b/main/src/network/ip_neighbor/mod.rs index 09c7955..0b9ff38 100644 --- a/main/src/network/ip_neighbor/mod.rs +++ b/main/src/network/ip_neighbor/mod.rs @@ -1,5 +1,7 @@ +mod linux; mod windows; +use self::linux::IpRoute2IpNeighborScanner; use self::windows::PowerShellIpNeighborScanner; use crate::process::ProcessError; use futures_util::future::BoxFuture; @@ -11,8 +13,10 @@ use std::net::SocketAddrV6; use thiserror::Error; pub async fn ip_neighbor_scanner() -> Box { - let scanners: Vec> = - vec![Box::new(PowerShellIpNeighborScanner)]; + let scanners: Vec> = vec![ + Box::new(PowerShellIpNeighborScanner), + Box::new(IpRoute2IpNeighborScanner), + ]; if let Some(supported_scanner) = futures_util::stream::iter(scanners) .filter(|s| s.supports_current_operating_system()) .next() @@ -31,7 +35,10 @@ pub enum IpNeighborScanError { ChildProcess(#[from] ProcessError), #[error("Failed to parse the CSV output of a child process")] - ChildProcessCsvOutput(#[from] csv::Error), + ParseCsv(#[from] csv::Error), + + #[error("Failed to parse the JSON output of a child process")] + ParseJson(#[from] serde_json::Error), } #[automock] diff --git a/main/src/process.rs b/main/src/process.rs index 0bd8c2d..b1a6714 100644 --- a/main/src/process.rs +++ b/main/src/process.rs @@ -2,6 +2,7 @@ use std::process::Stdio; use thiserror::Error; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; +use tokio::process::Child; use tokio::process::Command; pub async fn eval(command: &str, args: &[&str], stdin: &[u8]) -> Result, ProcessError> { @@ -23,14 +24,22 @@ pub async fn eval(command: &str, args: &[&str], stdin: &[u8]) -> Result, return Err(ProcessError::ExternalCommand); } - let mut stdout = process - .stdout - .take() - .ok_or_else(|| ProcessError::StdioRedirection)?; + read_stdout(&mut process).await +} - let mut buffer = Default::default(); - stdout.read_to_end(&mut buffer).await?; - Ok(buffer) +pub async fn run(command: &str, args: &[&str]) -> Result, ProcessError> { + let mut process = Command::new(command) + .args(args) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn()?; + + if !process.wait().await?.success() { + return Err(ProcessError::ExternalCommand); + } + + read_stdout(&mut process).await } pub async fn probe(command: &str, args: &[&str]) -> bool { @@ -62,6 +71,17 @@ pub async fn probe(command: &str, args: &[&str]) -> bool { } } +async fn read_stdout(process: &mut Child) -> Result, ProcessError> { + let mut stdout = process + .stdout + .take() + .ok_or_else(|| ProcessError::StdioRedirection)?; + + let mut buffer = Default::default(); + stdout.read_to_end(&mut buffer).await?; + Ok(buffer) +} + #[derive(Error, Debug)] pub enum ProcessError { #[error("Failed in create a child process")]