Skip to content

Commit

Permalink
[Agent] add icmp srt
Browse files Browse the repository at this point in the history
  • Loading branch information
TomatoMr authored and rvql committed Sep 28, 2023
1 parent 6a3f245 commit e885005
Show file tree
Hide file tree
Showing 10 changed files with 541 additions and 165 deletions.
10 changes: 7 additions & 3 deletions agent/benches/flow_generator/flow_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use criterion::*;
use deepflow_agent::{
_FlowMapConfig as Config, _TcpFlags as TcpFlags, _Timestamp as Timestamp,
_new_flow_map_and_receiver as new_flow_map_and_receiver, _new_meta_packet as new_meta_packet,
_reverse_meta_packet as reverse_meta_packet,
_reverse_meta_packet as reverse_meta_packet, common::meta_packet::ProtocolData,
};

use public::proto::common::TridentType;
Expand Down Expand Up @@ -84,15 +84,19 @@ pub(super) fn bench(c: &mut Criterion) {
reverse_meta_packet(&mut pkt);
pkt.lookup_key.src_port = dst_port;
pkt.lookup_key.dst_port = src_port;
pkt.tcp_data.flags = TcpFlags::SYN_ACK;
if let ProtocolData::TcpHeader(tcp_data) = &mut pkt.protocol_data {
tcp_data.flags = TcpFlags::SYN_ACK;
}
packets.push(pkt);

for k in 2..10 {
let mut pkt = new_meta_packet();
pkt.lookup_key.timestamp += Timestamp::from_nanos(100 * (i + k));
pkt.lookup_key.src_port = src_port;
pkt.lookup_key.dst_port = dst_port;
pkt.tcp_data.flags = TcpFlags::ACK;
if let ProtocolData::TcpHeader(tcp_data) = &mut pkt.protocol_data {
tcp_data.flags = TcpFlags::ACK;
}
packets.push(pkt);
}
}
Expand Down
2 changes: 2 additions & 0 deletions agent/crates/public/src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,15 @@ pub enum L4Protocol {
Unknown = 0,
Tcp = 1,
Udp = 2,
Icmp = 3,
}

impl From<IpProtocol> for L4Protocol {
fn from(proto: IpProtocol) -> Self {
match proto {
IpProtocol::TCP => Self::Tcp,
IpProtocol::UDP => Self::Udp,
IpProtocol::ICMPV4 | IpProtocol::ICMPV6 => Self::Icmp,
_ => Self::Unknown,
}
}
Expand Down
4 changes: 4 additions & 0 deletions agent/src/common/lookup_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ impl LookupKey {
self.proto == IpProtocol::TCP
}

pub fn is_udp(&self) -> bool {
self.proto == IpProtocol::UDP
}

pub fn is_ipv4(&self) -> bool {
self.eth_type == EthernetType::IPV4
}
Expand Down
117 changes: 92 additions & 25 deletions agent/src/common/meta_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub struct MetaPacket<'a> {

tcp_options_flag: u8,

pub tcp_data: MetaPacketTcpHeader,
pub protocol_data: ProtocolData,
pub tap_port: TapPort, // packet与xflow复用
pub signal_source: SignalSource,
pub payload_len: u16,
Expand Down Expand Up @@ -208,19 +208,31 @@ impl<'a> MetaPacket<'a> {
}

pub fn is_syn(&self) -> bool {
self.tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN
if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data {
return tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN;
}
false
}

pub fn is_syn_ack(&self) -> bool {
self.tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN_ACK && self.payload_len == 0
if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data {
return tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN_ACK && self.payload_len == 0;
}
false
}

pub fn is_ack(&self) -> bool {
self.tcp_data.flags & TcpFlags::MASK == TcpFlags::ACK && self.payload_len == 0
if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data {
return tcp_data.flags & TcpFlags::MASK == TcpFlags::ACK && self.payload_len == 0;
}
false
}

pub fn is_psh_ack(&self) -> bool {
self.tcp_data.flags & TcpFlags::MASK == TcpFlags::PSH_ACK && self.payload_len > 1
if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data {
return tcp_data.flags & TcpFlags::MASK == TcpFlags::PSH_ACK && self.payload_len > 1;
}
false
}

pub fn has_valid_payload(&self) -> bool {
Expand All @@ -246,7 +258,11 @@ impl<'a> MetaPacket<'a> {
fn update_tcp_opt(&mut self, packet: &[u8]) {
let mut offset = self.header_type.min_packet_size() + self.l2_l3_opt_size as usize;
let payload_offset = (offset + self.l4_opt_size as usize).min(packet.len());

let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &mut self.protocol_data {
tcp_data
} else {
unreachable!()
};
while offset + 1 < payload_offset {
// 如果不足2B,EOL和NOP都可以忽略
let assume_length = packet[offset + 1].max(2) as usize;
Expand All @@ -260,7 +276,7 @@ impl<'a> MetaPacket<'a> {
let tcp_opt_mss_offset = offset + 2;
self.tcp_options_flag |= TCP_OPT_FLAG_MSS;
offset += TCP_OPT_MSS_LEN;
self.tcp_data.mss = u16::from_be_bytes(
tcp_data.mss = u16::from_be_bytes(
*<&[u8; 2]>::try_from(&packet[tcp_opt_mss_offset..tcp_opt_mss_offset + 2])
.unwrap(),
);
Expand All @@ -272,12 +288,12 @@ impl<'a> MetaPacket<'a> {
let tcp_opt_win_scale_offset = offset + 2;
self.tcp_options_flag |= TCP_OPT_FLAG_WIN_SCALE;
offset += TCP_OPT_WIN_SCALE_LEN;
self.tcp_data.win_scale = packet[tcp_opt_win_scale_offset];
tcp_data.win_scale = packet[tcp_opt_win_scale_offset];
}
TcpOptionNumbers::SACK_PERMITTED => {
self.tcp_options_flag |= TCP_OPT_FLAG_SACK_PERMIT;
offset += 2;
self.tcp_data.sack_permitted = true;
tcp_data.sack_permitted = true;
}
TcpOptionNumbers::SACK => {
if offset + assume_length > payload_offset {
Expand All @@ -294,7 +310,7 @@ impl<'a> MetaPacket<'a> {
sack.extend_from_slice(
&packet[tcp_opt_sack_offset..tcp_opt_sack_offset + sack_size],
);
self.tcp_data.sack.replace(sack);
tcp_data.sack.replace(sack);
}
TcpOptionNumber(TCP_OPT_ADDRESS_HUAWEI) | TcpOptionNumber(TCP_OPT_ADDRESS_IPVS) => {
if assume_length == TCP_TOA_LEN {
Expand Down Expand Up @@ -646,6 +662,10 @@ impl<'a> MetaPacket<'a> {
if size_checker < 0 {
return Ok(());
}
let icmp_type_index = FIELD_OFFSET_ICMP_TYPE_CODE + self.l2_l3_opt_size as usize;
let mut icmp_data = IcmpData::default();
icmp_data.icmp_type = packet[icmp_type_index];

match IcmpType::new(
packet[FIELD_OFFSET_ICMP_TYPE_CODE + self.l2_l3_opt_size as usize],
) {
Expand All @@ -660,8 +680,16 @@ impl<'a> MetaPacket<'a> {
return Ok(());
}
}
IcmpTypes::EchoRequest => {
icmp_data.echo_id_seq = read_u32_be(&packet[icmp_type_index + 4..]);
}
IcmpTypes::EchoReply => {
icmp_data.echo_id_seq = read_u32_be(&packet[icmp_type_index + 4..]);
self.lookup_key.direction = PacketDirection::ServerToClient;
}
_ => (),
}
self.protocol_data = ProtocolData::IcmpData(icmp_data);
self.payload_len =
(self.packet_len as usize - (packet.len() - size_checker as usize)) as u16;
self.header_type = HeaderType::Ipv4Icmp;
Expand Down Expand Up @@ -758,30 +786,47 @@ impl<'a> MetaPacket<'a> {
(self.packet_len - (packet.len() - size_checker as usize) as u32) as u16;
self.payload_len = self.l4_payload_len as u16;
self.header_type = header_type;
self.tcp_data.data_offset = data_offset;
self.tcp_data.win_size =
read_u16_be(&packet[win_off + self.l2_l3_opt_size as usize..]);
self.tcp_data.flags =
TcpFlags::from_bits_truncate(packet[flag_off + self.l2_l3_opt_size as usize]);
self.tcp_data.seq = read_u32_be(&packet[seq_off + self.l2_l3_opt_size as usize..]);
self.tcp_data.ack = read_u32_be(&packet[ack_off + self.l2_l3_opt_size as usize..]);
if let ProtocolData::TcpHeader(tcp_data) = &mut self.protocol_data {
tcp_data.data_offset = data_offset;
tcp_data.win_size =
read_u16_be(&packet[win_off + self.l2_l3_opt_size as usize..]);
tcp_data.flags = TcpFlags::from_bits_truncate(
packet[flag_off + self.l2_l3_opt_size as usize],
);
tcp_data.seq = read_u32_be(&packet[seq_off + self.l2_l3_opt_size as usize..]);
tcp_data.ack = read_u32_be(&packet[ack_off + self.l2_l3_opt_size as usize..]);
tcp_data.data_offset = data_offset;
}
if data_offset > 5 {
self.update_tcp_opt(packet);
}
}
IpProtocol::ICMPV6 => {
let mut icmp_data = IcmpData::default();
if size_checker > 0 {
// ICMPV6_TYPE_OFFSET使用ipv6的头长,实际ipv6比ipv4多的已经加在l3optSize中,这里再去掉
self.nd_reply_or_arp_request = Icmpv6Type::new(
packet[ICMPV6_TYPE_OFFSET + self.l2_l3_opt_size as usize
- IPV6_HEADER_ADJUST],
) == Icmpv6Types::NeighborAdvert;
let icmpv6_type_index = ICMPV6_TYPE_OFFSET + self.l2_l3_opt_size as usize;
icmp_data.icmp_type = packet[icmpv6_type_index];

match Icmpv6Type::new(packet[icmpv6_type_index]) {
Icmpv6Types::NeighborAdvert => {
self.nd_reply_or_arp_request = true;
}
Icmpv6Types::EchoRequest => {
icmp_data.echo_id_seq = read_u32_be(&packet[icmpv6_type_index + 4..]);
}
Icmpv6Types::EchoReply => {
icmp_data.echo_id_seq = read_u32_be(&packet[icmpv6_type_index + 4..]);
self.lookup_key.direction = PacketDirection::ServerToClient;
}
_ => {}
}
// 忽略link-local address并只考虑ND reply, i.e. neighbour advertisement
if let IpAddr::V6(ip) = self.lookup_key.src_ip {
self.nd_reply_or_arp_request =
self.nd_reply_or_arp_request && !is_unicast_link_local(&ip);
}
}
self.protocol_data = ProtocolData::IcmpData(icmp_data);
self.payload_len =
(self.packet_len - (packet.len() - size_checker as usize) as u32) as u16;
return Ok(());
Expand Down Expand Up @@ -898,7 +943,9 @@ impl<'a> MetaPacket<'a> {
PACKET_KNAME_MAX_PADDING,
);
packet.socket_id = data.socket_id;
packet.tcp_data.seq = data.tcp_seq as u32;
if let ProtocolData::TcpHeader(tcp_data) = &mut packet.protocol_data {
tcp_data.seq = data.tcp_seq as u32;
}
packet.ebpf_type = EbpfType::try_from(data.source)?;
packet.l7_protocol_from_ebpf = L7Protocol::from(data.l7_protocol_hint as u8);

Expand Down Expand Up @@ -996,8 +1043,10 @@ impl<'a> fmt::Display for MetaPacket<'a> {
if let Some(t) = &self.tunnel {
write!(f, "\t\ttunnel: {}\n", t)?;
}
if self.lookup_key.proto == IpProtocol::TCP {
write!(f, "\t\ttcp: {:?}\n", self.tcp_data)?;
if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data {
if self.lookup_key.proto == IpProtocol::TCP {
write!(f, "\t\ttcp: {:?}\n", tcp_data)?;
}
}
if let Some(r) = &self.raw {
if r.len() > 0 {
Expand Down Expand Up @@ -1026,6 +1075,24 @@ pub struct MetaPacketTcpHeader {
pub sack: Option<Vec<u8>>, // sack value
}

#[derive(Clone, Debug, Default)]
pub struct IcmpData {
pub icmp_type: u8,
pub echo_id_seq: u32,
}

#[derive(Clone, Debug)]
pub enum ProtocolData {
TcpHeader(MetaPacketTcpHeader),
IcmpData(IcmpData),
}

impl Default for ProtocolData {
fn default() -> Self {
Self::TcpHeader(MetaPacketTcpHeader::default())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit e885005

Please sign in to comment.