Skip to content

Commit

Permalink
Scan IP neighbors on Linux
Browse files Browse the repository at this point in the history
  • Loading branch information
seamlik committed Mar 16, 2024
1 parent b50a582 commit 07097ef
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 10 deletions.
1 change: 1 addition & 0 deletions main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ log = "0.4"
mockall = "0.12"
prost = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
tansa-protocol = { path = "../protocol/rust" }
thiserror = "1"
tokio = { version = "1", features = ["fs", "process"] }
Expand Down
152 changes: 152 additions & 0 deletions main/src/network/ip_neighbor/linux.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use super::IpNeighbor;
use super::IpNeighborScanError;
use super::IpNeighborScanner;
use csv::ReaderBuilder;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use serde::Deserialize;
use std::collections::HashMap;
use std::net::Ipv6Addr;

pub struct IpRoute2IpNeighborScanner;

impl IpRoute2IpNeighborScanner {
async fn scan() -> Result<Vec<IpNeighbor>, IpNeighborScanError> {
let ip_links = crate::process::run("ip", &["-json", "link"]).await?;
let ip_neighbors = crate::process::run("ip", &["-family", "inet6", "neighbor"]).await?;
Self::parse_output(&ip_links, &ip_neighbors).map_err(Into::into)
}

fn parse_output(
ip_link_output: &[u8],
ip_neighbor_output: &[u8],
) -> Result<Vec<IpNeighbor>, IpNeighborScanError> {
let neighbors = ReaderBuilder::new()
.has_headers(false)
.delimiter(b' ')
.from_reader(ip_neighbor_output)
.deserialize()
.collect::<Result<Vec<Neighbor>, _>>()?;
neighbors
.iter()
.for_each(|n| log::debug!("Scanned IP neighbor: {:?}", n));

let links: Vec<Link> = serde_json::from_reader(ip_link_output)?;
links
.iter()
.for_each(|l| log::debug!("Scanned IP link: {:?}", l));
let links: HashMap<_, _> = links.into_iter().map(|l| (l.ifname.clone(), l)).collect();

let neighbors: Vec<_> = neighbors
.into_iter()
.filter(|n| n.state != "FAILED")
.filter(|n| n.ip.segments().starts_with(&[0xFE80, 0, 0, 0]))
.filter_map(|n| {
links.get(&n.ifname).map(|l| IpNeighbor {
address: n.ip,
network_interface_index: l.ifindex,
})
})
.collect();
neighbors
.iter()
.for_each(|n| log::info!("Valid IP neighbor: {:?}", n));

Ok(neighbors)
}
}

impl IpNeighborScanner for IpRoute2IpNeighborScanner {
fn supports_current_operating_system(&self) -> BoxFuture<'static, bool> {
crate::process::probe("ip", &["-Version"]).boxed()
}

fn scan(&self) -> BoxFuture<'static, Result<Vec<IpNeighbor>, IpNeighborScanError>> {
Self::scan().boxed()
}
}

#[derive(Deserialize, Debug)]
#[allow(non_snake_case, dead_code)]
struct Neighbor {
ip: Ipv6Addr,
dev: String,
ifname: String,
lladdr: String,
mac: String,
router: String,
state: String,
}

#[derive(Deserialize, Debug)]
#[allow(non_snake_case)]
struct Link {
ifindex: u32,
ifname: String,
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn scan() {
crate::test::init();

let scanner = IpRoute2IpNeighborScanner;

if !scanner.supports_current_operating_system().await {
println!("`iproute2` does not exist, skipping.");
return;
}

let neighbors = scanner.scan().await.unwrap();
assert!(!neighbors.is_empty());
}

#[test]
fn parse_output() {
let ip_neighbor_output = r#"
fe80::1:abcd dev wlan0 lladdr 00:00:00:00:00:01 router REACHABLE
fe80::2:abcd dev eth0 lladdr 00:00:00:00:00:02 router STALE
fe80::3:abcd dev wlan2 lladdr 00:00:00:00:00:03 router STALE
fe02::4:abcd dev wlan0 lladdr 00:00:00:00:00:04 router REACHABLE
fe80::5:abcd dev wlan0 lladdr 00:00:00:00:00:05 router FAILED
"#
.trim();
let ip_link_output = r#"
[
{
"ifindex": 1,
"ifname": "wlan0",
"mtu": 1500
},
{
"ifindex": 2,
"ifname": "eth0",
"mtu": 1500
}
]
"#;
let expected_neighbors = vec![
IpNeighbor {
network_interface_index: 1,
address: "fe80::1:abcd".parse().unwrap(),
},
IpNeighbor {
network_interface_index: 2,
address: "fe80::2:abcd".parse().unwrap(),
},
];

// When
let actual_neighbors = IpRoute2IpNeighborScanner::parse_output(
ip_link_output.as_bytes(),
ip_neighbor_output.as_bytes(),
)
.unwrap();

// Then
assert_eq!(actual_neighbors, expected_neighbors);
}
}
13 changes: 10 additions & 3 deletions main/src/network/ip_neighbor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod linux;
mod windows;

use self::linux::IpRoute2IpNeighborScanner;
use self::windows::PowerShellIpNeighborScanner;
use crate::process::ProcessError;
use futures_util::future::BoxFuture;
Expand All @@ -11,8 +13,10 @@ use std::net::SocketAddrV6;
use thiserror::Error;

pub async fn ip_neighbor_scanner() -> Box<dyn IpNeighborScanner + Send> {
let scanners: Vec<Box<dyn IpNeighborScanner + Send>> =
vec![Box::new(PowerShellIpNeighborScanner)];
let scanners: Vec<Box<dyn IpNeighborScanner + Send>> = vec![
Box::new(PowerShellIpNeighborScanner),
Box::new(IpRoute2IpNeighborScanner),
];
if let Some(supported_scanner) = futures_util::stream::iter(scanners)
.filter(|s| s.supports_current_operating_system())
.next()
Expand All @@ -31,7 +35,10 @@ pub enum IpNeighborScanError {
ChildProcess(#[from] ProcessError),

#[error("Failed to parse the CSV output of a child process")]
ChildProcessCsvOutput(#[from] csv::Error),
ParseCsv(#[from] csv::Error),

#[error("Failed to parse the JSON output of a child process")]
ParseJson(#[from] serde_json::Error),
}

#[automock]
Expand Down
34 changes: 27 additions & 7 deletions main/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::process::Stdio;
use thiserror::Error;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::process::Child;
use tokio::process::Command;

pub async fn eval(command: &str, args: &[&str], stdin: &[u8]) -> Result<Vec<u8>, ProcessError> {
Expand All @@ -23,14 +24,22 @@ pub async fn eval(command: &str, args: &[&str], stdin: &[u8]) -> Result<Vec<u8>,
return Err(ProcessError::ExternalCommand);
}

let mut stdout = process
.stdout
.take()
.ok_or_else(|| ProcessError::StdioRedirection)?;
read_stdout(&mut process).await
}

let mut buffer = Default::default();
stdout.read_to_end(&mut buffer).await?;
Ok(buffer)
pub async fn run(command: &str, args: &[&str]) -> Result<Vec<u8>, ProcessError> {
let mut process = Command::new(command)
.args(args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()?;

if !process.wait().await?.success() {
return Err(ProcessError::ExternalCommand);
}

read_stdout(&mut process).await
}

pub async fn probe(command: &str, args: &[&str]) -> bool {
Expand Down Expand Up @@ -62,6 +71,17 @@ pub async fn probe(command: &str, args: &[&str]) -> bool {
}
}

async fn read_stdout(process: &mut Child) -> Result<Vec<u8>, ProcessError> {
let mut stdout = process
.stdout
.take()
.ok_or_else(|| ProcessError::StdioRedirection)?;

let mut buffer = Default::default();
stdout.read_to_end(&mut buffer).await?;
Ok(buffer)
}

#[derive(Error, Debug)]
pub enum ProcessError {
#[error("Failed in create a child process")]
Expand Down

0 comments on commit 07097ef

Please sign in to comment.