From e885005a0172543edc7cfcc9515468a5e482c318 Mon Sep 17 00:00:00 2001 From: huanchao Date: Sun, 24 Sep 2023 18:45:27 +0800 Subject: [PATCH] [Agent] add icmp srt --- agent/benches/flow_generator/flow_map.rs | 10 +- agent/crates/public/src/enums.rs | 2 + agent/src/common/lookup_key.rs | 4 + agent/src/common/meta_packet.rs | 117 ++++++-- agent/src/flow_generator/flow_map.rs | 253 ++++++++++++------ agent/src/flow_generator/flow_node.rs | 41 ++- agent/src/flow_generator/perf/icmp.rs | 152 +++++++++++ agent/src/flow_generator/perf/mod.rs | 17 +- agent/src/flow_generator/perf/tcp.rs | 100 +++++-- .../flow_generator/protocol_logs/parser.rs | 10 +- 10 files changed, 541 insertions(+), 165 deletions(-) create mode 100644 agent/src/flow_generator/perf/icmp.rs diff --git a/agent/benches/flow_generator/flow_map.rs b/agent/benches/flow_generator/flow_map.rs index 242a8ed6d86..e68ef131ff3 100644 --- a/agent/benches/flow_generator/flow_map.rs +++ b/agent/benches/flow_generator/flow_map.rs @@ -21,7 +21,7 @@ use criterion::*; use deepflow_agent::{ _FlowMapConfig as Config, _TcpFlags as TcpFlags, _Timestamp as Timestamp, _new_flow_map_and_receiver as new_flow_map_and_receiver, _new_meta_packet as new_meta_packet, - _reverse_meta_packet as reverse_meta_packet, + _reverse_meta_packet as reverse_meta_packet, common::meta_packet::ProtocolData, }; use public::proto::common::TridentType; @@ -84,7 +84,9 @@ pub(super) fn bench(c: &mut Criterion) { reverse_meta_packet(&mut pkt); pkt.lookup_key.src_port = dst_port; pkt.lookup_key.dst_port = src_port; - pkt.tcp_data.flags = TcpFlags::SYN_ACK; + if let ProtocolData::TcpHeader(tcp_data) = &mut pkt.protocol_data { + tcp_data.flags = TcpFlags::SYN_ACK; + } packets.push(pkt); for k in 2..10 { @@ -92,7 +94,9 @@ pub(super) fn bench(c: &mut Criterion) { pkt.lookup_key.timestamp += Timestamp::from_nanos(100 * (i + k)); pkt.lookup_key.src_port = src_port; pkt.lookup_key.dst_port = dst_port; - pkt.tcp_data.flags = TcpFlags::ACK; + if let ProtocolData::TcpHeader(tcp_data) = &mut pkt.protocol_data { + tcp_data.flags = TcpFlags::ACK; + } packets.push(pkt); } } diff --git a/agent/crates/public/src/enums.rs b/agent/crates/public/src/enums.rs index 41f462565ad..c49e674034d 100644 --- a/agent/crates/public/src/enums.rs +++ b/agent/crates/public/src/enums.rs @@ -127,6 +127,7 @@ pub enum L4Protocol { Unknown = 0, Tcp = 1, Udp = 2, + Icmp = 3, } impl From for L4Protocol { @@ -134,6 +135,7 @@ impl From for L4Protocol { match proto { IpProtocol::TCP => Self::Tcp, IpProtocol::UDP => Self::Udp, + IpProtocol::ICMPV4 | IpProtocol::ICMPV6 => Self::Icmp, _ => Self::Unknown, } } diff --git a/agent/src/common/lookup_key.rs b/agent/src/common/lookup_key.rs index ad289f337f8..e426392e4b7 100644 --- a/agent/src/common/lookup_key.rs +++ b/agent/src/common/lookup_key.rs @@ -206,6 +206,10 @@ impl LookupKey { self.proto == IpProtocol::TCP } + pub fn is_udp(&self) -> bool { + self.proto == IpProtocol::UDP + } + pub fn is_ipv4(&self) -> bool { self.eth_type == EthernetType::IPV4 } diff --git a/agent/src/common/meta_packet.rs b/agent/src/common/meta_packet.rs index 60c1a0d38b6..56ef6c314c8 100644 --- a/agent/src/common/meta_packet.rs +++ b/agent/src/common/meta_packet.rs @@ -132,7 +132,7 @@ pub struct MetaPacket<'a> { tcp_options_flag: u8, - pub tcp_data: MetaPacketTcpHeader, + pub protocol_data: ProtocolData, pub tap_port: TapPort, // packet与xflow复用 pub signal_source: SignalSource, pub payload_len: u16, @@ -208,19 +208,31 @@ impl<'a> MetaPacket<'a> { } pub fn is_syn(&self) -> bool { - self.tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN + if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data { + return tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN; + } + false } pub fn is_syn_ack(&self) -> bool { - self.tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN_ACK && self.payload_len == 0 + if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data { + return tcp_data.flags & TcpFlags::MASK == TcpFlags::SYN_ACK && self.payload_len == 0; + } + false } pub fn is_ack(&self) -> bool { - self.tcp_data.flags & TcpFlags::MASK == TcpFlags::ACK && self.payload_len == 0 + if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data { + return tcp_data.flags & TcpFlags::MASK == TcpFlags::ACK && self.payload_len == 0; + } + false } pub fn is_psh_ack(&self) -> bool { - self.tcp_data.flags & TcpFlags::MASK == TcpFlags::PSH_ACK && self.payload_len > 1 + if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data { + return tcp_data.flags & TcpFlags::MASK == TcpFlags::PSH_ACK && self.payload_len > 1; + } + false } pub fn has_valid_payload(&self) -> bool { @@ -246,7 +258,11 @@ impl<'a> MetaPacket<'a> { fn update_tcp_opt(&mut self, packet: &[u8]) { let mut offset = self.header_type.min_packet_size() + self.l2_l3_opt_size as usize; let payload_offset = (offset + self.l4_opt_size as usize).min(packet.len()); - + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &mut self.protocol_data { + tcp_data + } else { + unreachable!() + }; while offset + 1 < payload_offset { // 如果不足2B,EOL和NOP都可以忽略 let assume_length = packet[offset + 1].max(2) as usize; @@ -260,7 +276,7 @@ impl<'a> MetaPacket<'a> { let tcp_opt_mss_offset = offset + 2; self.tcp_options_flag |= TCP_OPT_FLAG_MSS; offset += TCP_OPT_MSS_LEN; - self.tcp_data.mss = u16::from_be_bytes( + tcp_data.mss = u16::from_be_bytes( *<&[u8; 2]>::try_from(&packet[tcp_opt_mss_offset..tcp_opt_mss_offset + 2]) .unwrap(), ); @@ -272,12 +288,12 @@ impl<'a> MetaPacket<'a> { let tcp_opt_win_scale_offset = offset + 2; self.tcp_options_flag |= TCP_OPT_FLAG_WIN_SCALE; offset += TCP_OPT_WIN_SCALE_LEN; - self.tcp_data.win_scale = packet[tcp_opt_win_scale_offset]; + tcp_data.win_scale = packet[tcp_opt_win_scale_offset]; } TcpOptionNumbers::SACK_PERMITTED => { self.tcp_options_flag |= TCP_OPT_FLAG_SACK_PERMIT; offset += 2; - self.tcp_data.sack_permitted = true; + tcp_data.sack_permitted = true; } TcpOptionNumbers::SACK => { if offset + assume_length > payload_offset { @@ -294,7 +310,7 @@ impl<'a> MetaPacket<'a> { sack.extend_from_slice( &packet[tcp_opt_sack_offset..tcp_opt_sack_offset + sack_size], ); - self.tcp_data.sack.replace(sack); + tcp_data.sack.replace(sack); } TcpOptionNumber(TCP_OPT_ADDRESS_HUAWEI) | TcpOptionNumber(TCP_OPT_ADDRESS_IPVS) => { if assume_length == TCP_TOA_LEN { @@ -646,6 +662,10 @@ impl<'a> MetaPacket<'a> { if size_checker < 0 { return Ok(()); } + let icmp_type_index = FIELD_OFFSET_ICMP_TYPE_CODE + self.l2_l3_opt_size as usize; + let mut icmp_data = IcmpData::default(); + icmp_data.icmp_type = packet[icmp_type_index]; + match IcmpType::new( packet[FIELD_OFFSET_ICMP_TYPE_CODE + self.l2_l3_opt_size as usize], ) { @@ -660,8 +680,16 @@ impl<'a> MetaPacket<'a> { return Ok(()); } } + IcmpTypes::EchoRequest => { + icmp_data.echo_id_seq = read_u32_be(&packet[icmp_type_index + 4..]); + } + IcmpTypes::EchoReply => { + icmp_data.echo_id_seq = read_u32_be(&packet[icmp_type_index + 4..]); + self.lookup_key.direction = PacketDirection::ServerToClient; + } _ => (), } + self.protocol_data = ProtocolData::IcmpData(icmp_data); self.payload_len = (self.packet_len as usize - (packet.len() - size_checker as usize)) as u16; self.header_type = HeaderType::Ipv4Icmp; @@ -758,30 +786,47 @@ impl<'a> MetaPacket<'a> { (self.packet_len - (packet.len() - size_checker as usize) as u32) as u16; self.payload_len = self.l4_payload_len as u16; self.header_type = header_type; - self.tcp_data.data_offset = data_offset; - self.tcp_data.win_size = - read_u16_be(&packet[win_off + self.l2_l3_opt_size as usize..]); - self.tcp_data.flags = - TcpFlags::from_bits_truncate(packet[flag_off + self.l2_l3_opt_size as usize]); - self.tcp_data.seq = read_u32_be(&packet[seq_off + self.l2_l3_opt_size as usize..]); - self.tcp_data.ack = read_u32_be(&packet[ack_off + self.l2_l3_opt_size as usize..]); + if let ProtocolData::TcpHeader(tcp_data) = &mut self.protocol_data { + tcp_data.data_offset = data_offset; + tcp_data.win_size = + read_u16_be(&packet[win_off + self.l2_l3_opt_size as usize..]); + tcp_data.flags = TcpFlags::from_bits_truncate( + packet[flag_off + self.l2_l3_opt_size as usize], + ); + tcp_data.seq = read_u32_be(&packet[seq_off + self.l2_l3_opt_size as usize..]); + tcp_data.ack = read_u32_be(&packet[ack_off + self.l2_l3_opt_size as usize..]); + tcp_data.data_offset = data_offset; + } if data_offset > 5 { self.update_tcp_opt(packet); } } IpProtocol::ICMPV6 => { + let mut icmp_data = IcmpData::default(); if size_checker > 0 { - // ICMPV6_TYPE_OFFSET使用ipv6的头长,实际ipv6比ipv4多的已经加在l3optSize中,这里再去掉 - self.nd_reply_or_arp_request = Icmpv6Type::new( - packet[ICMPV6_TYPE_OFFSET + self.l2_l3_opt_size as usize - - IPV6_HEADER_ADJUST], - ) == Icmpv6Types::NeighborAdvert; + let icmpv6_type_index = ICMPV6_TYPE_OFFSET + self.l2_l3_opt_size as usize; + icmp_data.icmp_type = packet[icmpv6_type_index]; + + match Icmpv6Type::new(packet[icmpv6_type_index]) { + Icmpv6Types::NeighborAdvert => { + self.nd_reply_or_arp_request = true; + } + Icmpv6Types::EchoRequest => { + icmp_data.echo_id_seq = read_u32_be(&packet[icmpv6_type_index + 4..]); + } + Icmpv6Types::EchoReply => { + icmp_data.echo_id_seq = read_u32_be(&packet[icmpv6_type_index + 4..]); + self.lookup_key.direction = PacketDirection::ServerToClient; + } + _ => {} + } // 忽略link-local address并只考虑ND reply, i.e. neighbour advertisement if let IpAddr::V6(ip) = self.lookup_key.src_ip { self.nd_reply_or_arp_request = self.nd_reply_or_arp_request && !is_unicast_link_local(&ip); } } + self.protocol_data = ProtocolData::IcmpData(icmp_data); self.payload_len = (self.packet_len - (packet.len() - size_checker as usize) as u32) as u16; return Ok(()); @@ -898,7 +943,9 @@ impl<'a> MetaPacket<'a> { PACKET_KNAME_MAX_PADDING, ); packet.socket_id = data.socket_id; - packet.tcp_data.seq = data.tcp_seq as u32; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet.protocol_data { + tcp_data.seq = data.tcp_seq as u32; + } packet.ebpf_type = EbpfType::try_from(data.source)?; packet.l7_protocol_from_ebpf = L7Protocol::from(data.l7_protocol_hint as u8); @@ -996,8 +1043,10 @@ impl<'a> fmt::Display for MetaPacket<'a> { if let Some(t) = &self.tunnel { write!(f, "\t\ttunnel: {}\n", t)?; } - if self.lookup_key.proto == IpProtocol::TCP { - write!(f, "\t\ttcp: {:?}\n", self.tcp_data)?; + if let ProtocolData::TcpHeader(tcp_data) = &self.protocol_data { + if self.lookup_key.proto == IpProtocol::TCP { + write!(f, "\t\ttcp: {:?}\n", tcp_data)?; + } } if let Some(r) = &self.raw { if r.len() > 0 { @@ -1026,6 +1075,24 @@ pub struct MetaPacketTcpHeader { pub sack: Option>, // sack value } +#[derive(Clone, Debug, Default)] +pub struct IcmpData { + pub icmp_type: u8, + pub echo_id_seq: u32, +} + +#[derive(Clone, Debug)] +pub enum ProtocolData { + TcpHeader(MetaPacketTcpHeader), + IcmpData(IcmpData), +} + +impl Default for ProtocolData { + fn default() -> Self { + Self::TcpHeader(MetaPacketTcpHeader::default()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/agent/src/flow_generator/flow_map.rs b/agent/src/flow_generator/flow_map.rs index df80db4943f..624eb24ad63 100644 --- a/agent/src/flow_generator/flow_map.rs +++ b/agent/src/flow_generator/flow_map.rs @@ -60,7 +60,7 @@ use crate::{ l7_protocol_info::{L7ProtocolInfo, L7ProtocolInfoInterface}, l7_protocol_log::{L7PerfCache, L7ProtocolParser, L7ProtocolParserInterface}, lookup_key::LookupKey, - meta_packet::{MetaPacket, MetaPacketTcpHeader}, + meta_packet::{MetaPacket, MetaPacketTcpHeader, ProtocolData}, tagged_flow::TaggedFlow, tap_port::TapPort, Timestamp, @@ -691,24 +691,26 @@ impl FlowMap { ))); } - let mini_meta_packet = packet_sequence_block::MiniMetaPacket::new( - node.tagged_flow.flow.flow_id, - meta_packet.lookup_key.direction as u8, - meta_packet.lookup_key.timestamp.into(), - meta_packet.payload_len, - meta_packet.tcp_data.seq, - meta_packet.tcp_data.ack, - meta_packet.tcp_data.win_size, - meta_packet.tcp_data.mss, - meta_packet.tcp_data.flags.bits(), - meta_packet.tcp_data.win_scale, - meta_packet.tcp_data.sack_permitted, - &meta_packet.tcp_data.sack, - ); - node.packet_sequence_block - .as_mut() - .unwrap() - .append_packet(mini_meta_packet, config.packet_sequence_flag); + if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + let mini_meta_packet = packet_sequence_block::MiniMetaPacket::new( + node.tagged_flow.flow.flow_id, + meta_packet.lookup_key.direction as u8, + meta_packet.lookup_key.timestamp.into(), + meta_packet.payload_len, + tcp_data.seq, + tcp_data.ack, + tcp_data.win_size, + tcp_data.mss, + tcp_data.flags.bits(), + tcp_data.win_scale, + tcp_data.sack_permitted, + &tcp_data.sack, + ); + node.packet_sequence_block + .as_mut() + .unwrap() + .append_packet(mini_meta_packet, config.packet_sequence_flag); + } } fn update_tcp_node( @@ -801,7 +803,9 @@ impl FlowMap { { node.timeout = config.flow.flow_timeout.established_rst; } - + if let Some(meta_flow_log) = node.meta_flow_log.as_mut() { + let _ = meta_flow_log.parse_l3(meta_packet); + } false } @@ -820,7 +824,11 @@ impl FlowMap { ) -> bool { let flow_config = config.flow; let direction = meta_packet.lookup_key.direction; - let pkt_tcp_flags = meta_packet.tcp_data.flags; + let pkt_tcp_flags = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + tcp_data.flags + } else { + unreachable!(); + }; node.tagged_flow.flow.flow_metrics_peers[direction as usize].tcp_flags |= pkt_tcp_flags; node.tagged_flow.flow.flow_metrics_peers[direction as usize].total_tcp_flags |= pkt_tcp_flags; @@ -861,42 +869,47 @@ impl FlowMap { let (next_tcp_seq0, next_tcp_seq1) = (node.next_tcp_seq0, node.next_tcp_seq1); + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + tcp_data + } else { + unreachable!() + }; // 记录下一次TCP Seq match meta_packet.lookup_key.direction { - PacketDirection::ClientToServer => node.next_tcp_seq1 = meta_packet.tcp_data.ack, - PacketDirection::ServerToClient => node.next_tcp_seq0 = meta_packet.tcp_data.ack, + PacketDirection::ClientToServer => node.next_tcp_seq1 = tcp_data.ack, + PacketDirection::ServerToClient => node.next_tcp_seq0 = tcp_data.ack, } // TCP Keepalive报文判断,并记录其TCP Seq if meta_packet.payload_len > 1 { return; } - if meta_packet.tcp_data.flags & (TcpFlags::SYN | TcpFlags::FIN | TcpFlags::RST) - != TcpFlags::empty() - { + if tcp_data.flags & (TcpFlags::SYN | TcpFlags::FIN | TcpFlags::RST) != TcpFlags::empty() { return; } - let (seq, ack) = (meta_packet.tcp_data.seq, meta_packet.tcp_data.ack); - if meta_packet.lookup_key.direction == PacketDirection::ClientToServer - && seq.wrapping_add(1) == next_tcp_seq0 + && tcp_data.seq.wrapping_add(1) == next_tcp_seq0 || meta_packet.lookup_key.direction == PacketDirection::ServerToClient - && seq.wrapping_add(1) == next_tcp_seq1 + && tcp_data.seq.wrapping_add(1) == next_tcp_seq1 { let flow = &mut node.tagged_flow.flow; - flow.last_keepalive_seq = seq; - flow.last_keepalive_ack = ack; + flow.last_keepalive_seq = tcp_data.seq; + flow.last_keepalive_ack = tcp_data.ack; } } fn update_syn_or_syn_ack_seq(&mut self, node: &mut FlowNode, meta_packet: &mut MetaPacket) { - let tcp_flag = meta_packet.tcp_data.flags; + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + tcp_data + } else { + unreachable!() + }; let flow = &mut node.tagged_flow.flow; - if tcp_flag == TcpFlags::SYN { - flow.syn_seq = meta_packet.tcp_data.seq; - } else if tcp_flag == TcpFlags::SYN_ACK && meta_packet.payload_len == 0 { - flow.synack_seq = meta_packet.tcp_data.seq; + if tcp_data.flags == TcpFlags::SYN { + flow.syn_seq = tcp_data.seq; + } else if tcp_data.flags == TcpFlags::SYN_ACK && meta_packet.payload_len == 0 { + flow.synack_seq = tcp_data.seq; } } @@ -1001,6 +1014,11 @@ impl FlowMap { } else { false }; + let flags = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + tcp_data.flags + } else { + TcpFlags::default() + }; let flow = Flow { flow_key: FlowKey { vtap_id: flow_config.vtap_id, @@ -1051,8 +1069,8 @@ impl FlowMap { l4_byte_count: meta_packet.l4_payload_len() as u64, first: lookup_key.timestamp.into(), last: lookup_key.timestamp.into(), - tcp_flags: meta_packet.tcp_data.flags, - total_tcp_flags: meta_packet.tcp_data.flags, + tcp_flags: flags, + total_tcp_flags: flags, ..Default::default() }, FlowMetricsPeer::default(), @@ -1478,8 +1496,12 @@ impl FlowMap { } else { reverse = self.update_l4_direction(meta_packet, &mut node, true); - let pkt_tcp_flags = meta_packet.tcp_data.flags; - if pkt_tcp_flags.is_invalid() { + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + tcp_data + } else { + unreachable!() + }; + if tcp_data.flags.is_invalid() { // exception timeout node.timeout = flow_config.flow_timeout.exception; node.flow_state = FlowState::Exception; @@ -1487,7 +1509,7 @@ impl FlowMap { self.update_flow_state_machine( flow_config, &mut node, - pkt_tcp_flags, + tcp_data.flags, meta_packet.lookup_key.direction, ); self.update_syn_or_syn_ack_seq(&mut node, meta_packet); @@ -1657,7 +1679,10 @@ impl FlowMap { let mut l7_stats = L7Stats::default(); let mut collect_stats = false; if config.collector_enabled - && (flow.flow_key.proto == IpProtocol::TCP || flow.flow_key.proto == IpProtocol::UDP) + && (flow.flow_key.proto == IpProtocol::TCP + || flow.flow_key.proto == IpProtocol::UDP + || flow.flow_key.proto == IpProtocol::ICMPV4 + || flow.flow_key.proto == IpProtocol::ICMPV6) { if let Some(perf) = node.meta_flow_log.as_mut() { collect_stats = true; @@ -1749,7 +1774,11 @@ impl FlowMap { } let mut l7_stats = L7Stats::default(); let mut collect_stats = false; - if flow.flow_key.proto == IpProtocol::TCP || flow.flow_key.proto == IpProtocol::UDP { + if flow.flow_key.proto == IpProtocol::TCP + || flow.flow_key.proto == IpProtocol::UDP + || flow.flow_key.proto == IpProtocol::ICMPV4 + || flow.flow_key.proto == IpProtocol::ICMPV6 + { if let Some(perf) = node.meta_flow_log.as_mut() { perf.copy_and_reset_l4_perf_data(flow.reversed, flow); let l7_timeout_count = self @@ -1762,17 +1791,13 @@ impl FlowMap { let flow_perf_stats = flow.flow_perf_stats.as_mut().unwrap(); flow_perf_stats.l7.sequential_merge(&l7_perf_stats); flow_perf_stats.l7_protocol = l7_protocol; - if flow.flow_key.proto == IpProtocol::TCP - || flow.flow_key.proto == IpProtocol::UDP - { - collect_stats = true; - l7_stats.stats = l7_perf_stats; - l7_stats.endpoint = flow.last_endpoint.clone(); - l7_stats.flow_id = flow.flow_id; - l7_stats.signal_source = flow.signal_source; - l7_stats.time_in_second = flow.flow_stat_time.into(); - l7_stats.l7_protocol = l7_protocol; - } + collect_stats = true; + l7_stats.stats = l7_perf_stats; + l7_stats.endpoint = flow.last_endpoint.clone(); + l7_stats.flow_id = flow.flow_id; + l7_stats.signal_source = flow.signal_source; + l7_stats.time_in_second = flow.flow_stat_time.into(); + l7_stats.l7_protocol = l7_protocol; } } // Unknown application only counts metrics, and the judgment condition needs to consider @@ -1864,12 +1889,17 @@ impl FlowMap { let (mut flow_src_score, mut flow_dst_score) = match lookup_key.proto { // TCP/UDP IpProtocol::TCP => { - let flags = meta_packet.tcp_data.flags; + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data + { + tcp_data + } else { + unreachable!() + }; self.service_table.get_tcp_score( is_first_packet, meta_packet.need_reverse_flow, lookup_key.direction, - flags, + tcp_data.flags, false, false, flow_src_key, @@ -2274,13 +2304,13 @@ pub fn _new_meta_packet<'a>() -> MetaPacket<'a> { packet.header_type = HeaderType::Ipv4Tcp; packet.tap_port = TapPort(65533); packet.packet_len = 128; - packet.tcp_data = MetaPacketTcpHeader { + packet.protocol_data = ProtocolData::TcpHeader(MetaPacketTcpHeader { data_offset: 5, flags: TcpFlags::SYN, ack: 0, seq: 0, ..Default::default() - }; + }); packet.endpoint_data = Some(EndpointDataPov::new(Arc::new(EndpointData { src_info: EndpointInfo { real_ip: Ipv4Addr::UNSPECIFIED.into(), @@ -2349,7 +2379,9 @@ mod tests { let mut packet0 = _new_meta_packet(); flow_map.inject_meta_packet(&config, &mut packet0); let mut packet1 = _new_meta_packet(); - packet1.tcp_data.flags = TcpFlags::RST; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data.flags = TcpFlags::RST; + } _reverse_meta_packet(&mut packet1); packet1.lookup_key.timestamp += DEFAULT_DURATION.into(); let flush_timestamp = packet1.lookup_key.timestamp.into(); @@ -2386,11 +2418,15 @@ mod tests { flow_map.inject_meta_packet(&config, &mut packet0); let mut packet1 = _new_meta_packet(); - packet1.tcp_data.flags = TcpFlags::PSH_ACK; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data.flags = TcpFlags::PSH_ACK; + } flow_map.inject_meta_packet(&config, &mut packet1); let mut packet2 = _new_meta_packet(); - packet2.tcp_data.flags = TcpFlags::FIN_ACK; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet2.protocol_data { + tcp_data.flags = TcpFlags::FIN_ACK; + } packet2.lookup_key.timestamp += Timestamp::from_millis(10); _reverse_meta_packet(&mut packet2); let flush_timestamp = packet2.lookup_key.timestamp.into(); @@ -2423,8 +2459,10 @@ mod tests { ebpf: None, }; let mut packet1 = _new_meta_packet(); - packet1.tcp_data.seq = 1111; - packet1.tcp_data.ack = 112; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data.seq = 1111; + tcp_data.ack = 112; + } packet1.lookup_key.timestamp = packet1.lookup_key.timestamp.round_to(TIME_UNIT.into()); let flush_timestamp = packet1.lookup_key.timestamp.into(); flow_map.inject_meta_packet(&config, &mut packet1); @@ -2450,24 +2488,45 @@ mod tests { ebpf: None, }; let mut packet0 = _new_meta_packet(); - packet0.tcp_data.flags = TcpFlags::SYN; - packet0.tcp_data.seq = 111; - packet0.tcp_data.ack = 0; + let tcp_data0 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet0.protocol_data { + tcp_data + } else { + unreachable!() + }; + tcp_data0.flags = TcpFlags::SYN; + tcp_data0.seq = 111; + tcp_data0.ack = 0; flow_map.inject_meta_packet(&config, &mut packet0); let mut packet1 = _new_meta_packet(); - packet1.tcp_data.flags = TcpFlags::SYN_ACK; + let tcp_data1 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data + } else { + unreachable!() + }; + tcp_data1.flags = TcpFlags::SYN_ACK; packet1.lookup_key.timestamp += Timestamp::from_millis(10); _reverse_meta_packet(&mut packet1); - packet1.tcp_data.seq = 1111; - packet1.tcp_data.ack = 112; + let tcp_data1 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data + } else { + unreachable!() + }; + tcp_data1.seq = 1111; + tcp_data1.ack = 112; + flow_map.inject_meta_packet(&config, &mut packet1); let mut packet2 = _new_meta_packet(); - packet2.tcp_data.flags = TcpFlags::ACK; + let tcp_data2 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet2.protocol_data { + tcp_data + } else { + unreachable!() + }; + tcp_data2.flags = TcpFlags::ACK; packet2.lookup_key.timestamp += Timestamp::from_millis(10 * 2); - packet2.tcp_data.seq = 112; - packet2.tcp_data.ack = 1112; + tcp_data2.seq = 112; + tcp_data2.ack = 1112; let flush_timestamp = packet2.lookup_key.timestamp.into(); flow_map.inject_meta_packet(&config, &mut packet2); @@ -2516,7 +2575,9 @@ mod tests { let mut policy_data1 = PolicyData::default(); policy_data1.merge_npb_action(&vec![npb_action], 11, None); let mut packet1 = _new_meta_packet(); - packet1.tcp_data.flags = TcpFlags::SYN_ACK; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data.flags = TcpFlags::SYN_ACK; + } _reverse_meta_packet(&mut packet1); packet1.lookup_key.direction = PacketDirection::ServerToClient; packet1.policy_data.replace(Arc::new(policy_data1)); @@ -2550,13 +2611,17 @@ mod tests { flow_map.inject_meta_packet(&config, &mut packet0); let mut packet1 = _new_meta_packet(); - packet1.tcp_data.flags = TcpFlags::SYN_ACK; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data.flags = TcpFlags::SYN_ACK; + } packet1.lookup_key.timestamp += Timestamp::from_millis(10); _reverse_meta_packet(&mut packet1); flow_map.inject_meta_packet(&config, &mut packet1); let mut packet2 = _new_meta_packet(); - packet2.tcp_data.flags = TcpFlags::ACK; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet2.protocol_data { + tcp_data.flags = TcpFlags::ACK; + } packet2.lookup_key.timestamp += Timestamp::from_millis(10); let flush_timestamp = packet2.lookup_key.timestamp.into(); flow_map.inject_meta_packet(&config, &mut packet2); @@ -2628,7 +2693,9 @@ mod tests { let mut packet1 = _new_meta_packet(); packet1.lookup_key.tap_type = TapType::Cloud; - packet1.tcp_data.flags = TcpFlags::RST; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data.flags = TcpFlags::RST; + } _reverse_meta_packet(&mut packet1); let flush_timestamp = packet1.lookup_key.timestamp.into(); flow_map.inject_meta_packet(&config, &mut packet1); @@ -2656,7 +2723,9 @@ mod tests { packet3.tap_port = TapPort(0x1234); packet3.lookup_key.l2_end_0 = true; packet3.lookup_key.l2_end_1 = false; - packet3.tcp_data.flags = TcpFlags::RST; + if let ProtocolData::TcpHeader(tcp_data) = &mut packet3.protocol_data { + tcp_data.flags = TcpFlags::RST; + } _reverse_meta_packet(&mut packet3); let flush_timestamp = packet3.lookup_key.timestamp.into(); flow_map.inject_meta_packet(&config, &mut packet3); @@ -2762,27 +2831,47 @@ mod tests { // SYN|ACK let mut packet1 = _new_meta_packet(); + let tcp_data1 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data + } else { + unreachable!() + }; packet1.lookup_key.timestamp = flush_timestamp; - packet1.tcp_data.flags = TcpFlags::SYN_ACK; + tcp_data1.flags = TcpFlags::SYN_ACK; _reverse_meta_packet(&mut packet1); flow_map.inject_meta_packet(&config, &mut packet1); // ACK let mut packet1 = _new_meta_packet(); + let tcp_data1 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data + } else { + unreachable!() + }; packet1.lookup_key.timestamp = flush_timestamp; - packet1.tcp_data.flags = TcpFlags::ACK; + tcp_data1.flags = TcpFlags::ACK; flow_map.inject_meta_packet(&config, &mut packet1); // FIN let mut packet1 = _new_meta_packet(); + let tcp_data1 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data + } else { + unreachable!() + }; packet1.lookup_key.timestamp = flush_timestamp; - packet1.tcp_data.flags = TcpFlags::FIN; + tcp_data1.flags = TcpFlags::FIN; _reverse_meta_packet(&mut packet1); flow_map.inject_meta_packet(&config, &mut packet1); // FIN let mut packet1 = _new_meta_packet(); + let tcp_data1 = if let ProtocolData::TcpHeader(tcp_data) = &mut packet1.protocol_data { + tcp_data + } else { + unreachable!() + }; packet1.lookup_key.timestamp = flush_timestamp; - packet1.tcp_data.flags = TcpFlags::FIN; + tcp_data1.flags = TcpFlags::FIN; _reverse_meta_packet(&mut packet1); flow_map.inject_meta_packet(&config, &mut packet1); diff --git a/agent/src/flow_generator/flow_node.rs b/agent/src/flow_generator/flow_node.rs index 27ac4360efa..1172035ec27 100644 --- a/agent/src/flow_generator/flow_node.rs +++ b/agent/src/flow_generator/flow_node.rs @@ -259,38 +259,29 @@ impl FlowNode { && flow_key.port_src == meta_lookup_key.src_port && flow_key.port_dst == meta_lookup_key.dst_port { - meta_packet.lookup_key.direction = PacketDirection::ClientToServer; - Self::endpoint_match_with_direction( - &flow.flow_metrics_peers, - meta_packet, - PacketDirection::ClientToServer, - ) && Self::mac_match_with_direction( - meta_packet, - flow_key.mac_src, - flow_key.mac_dst, - mac_match, - PacketDirection::ClientToServer, - ) + // l3 protocols, such as icmp, can determine the direction of packets according + // to icmp type, so there is no need to correct the direction of packets + if meta_lookup_key.is_tcp() || meta_lookup_key.is_udp() { + meta_packet.lookup_key.direction = PacketDirection::ClientToServer; + } } else if flow_key.ip_src == meta_lookup_key.dst_ip && flow_key.ip_dst == meta_lookup_key.src_ip && flow_key.port_src == meta_lookup_key.dst_port && flow_key.port_dst == meta_lookup_key.src_port { - meta_packet.lookup_key.direction = PacketDirection::ServerToClient; - Self::endpoint_match_with_direction( - &flow.flow_metrics_peers, - meta_packet, - PacketDirection::ServerToClient, - ) && Self::mac_match_with_direction( + if meta_lookup_key.is_tcp() || meta_lookup_key.is_udp() { + meta_packet.lookup_key.direction = PacketDirection::ServerToClient; + } + } else { + return false; + } + Self::endpoint_match_with_direction(&flow.flow_metrics_peers, meta_packet) + && Self::mac_match_with_direction( meta_packet, flow_key.mac_src, flow_key.mac_dst, mac_match, - PacketDirection::ServerToClient, ) - } else { - false - } } fn is_hyper_v(trident_type: TridentType) -> bool { @@ -344,9 +335,8 @@ impl FlowNode { flow_mac_src: MacAddr, flow_mac_dst: MacAddr, match_mac: MatchMac, - direction: PacketDirection, ) -> bool { - let (src_mac, dst_mac) = match direction { + let (src_mac, dst_mac) = match meta_packet.lookup_key.direction { PacketDirection::ClientToServer => (flow_mac_src, flow_mac_dst), PacketDirection::ServerToClient => (flow_mac_dst, flow_mac_src), }; @@ -365,7 +355,6 @@ impl FlowNode { fn endpoint_match_with_direction( peers: &[FlowMetricsPeer; 2], meta_packet: &MetaPacket, - direction: PacketDirection, ) -> bool { if meta_packet.tunnel.is_none() { return true; @@ -374,7 +363,7 @@ impl FlowNode { // 同一个TapPort上的流量,如果有隧道的话,当Port做发卡弯转发时,进出的内层流量完全一样 // 此时需要额外比较L2End确定哪股是进入的哪股是出去的 let lookup_key = &meta_packet.lookup_key; - match direction { + match meta_packet.lookup_key.direction { PacketDirection::ClientToServer => { lookup_key.l2_end_0 == peers[0].is_l2_end && lookup_key.l2_end_1 == peers[1].is_l2_end diff --git a/agent/src/flow_generator/perf/icmp.rs b/agent/src/flow_generator/perf/icmp.rs new file mode 100644 index 00000000000..410060f1a31 --- /dev/null +++ b/agent/src/flow_generator/perf/icmp.rs @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2023 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::cmp::max; + +use pnet::packet::{ + icmp::{IcmpType, IcmpTypes}, + icmpv6::{Icmpv6Type, Icmpv6Types}, +}; + +use crate::{ + common::{ + flow::{FlowPerfStats, L4Protocol}, + meta_packet::{MetaPacket, ProtocolData}, + Timestamp, + }, + flow_generator::error::{Error, Result}, +}; + +use super::{L4FlowPerf, ART_MAX}; + +const MAX_CACHE_COUNT: usize = 16; + +#[derive(Debug)] +struct LastIcmp { + timestamp: Timestamp, + id_and_seq: u32, +} + +#[derive(Debug, Default)] +pub struct IcmpPerf { + srt_max: Timestamp, + srt_sum: Timestamp, + srt_count: u32, + last_requests: Vec, + last_replies: Vec, + data_update_flag: bool, +} + +impl IcmpPerf { + pub fn new() -> Self { + IcmpPerf::default() + } +} + +impl L4FlowPerf for IcmpPerf { + fn parse(&mut self, packet: &MetaPacket, _: bool) -> Result<()> { + if packet.payload_len == 0 { + return Err(Error::ZeroPayloadLen); + } + let icmp_data = if let ProtocolData::IcmpData(icmp_data) = &packet.protocol_data { + icmp_data + } else { + return Err(Error::InvalidIpProtocol); + }; + let pkt_timestamp = packet.lookup_key.timestamp; + let (is_request, is_reply) = if packet.lookup_key.is_ipv4() { + let icmp_type = IcmpType::new(icmp_data.icmp_type); + ( + icmp_type == IcmpTypes::EchoRequest, + icmp_type == IcmpTypes::EchoReply, + ) + } else { + let icmp_v6_type = Icmpv6Type::new(icmp_data.icmp_type); + ( + icmp_v6_type == Icmpv6Types::EchoRequest, + icmp_v6_type == Icmpv6Types::EchoReply, + ) + }; + + if is_request { + if let Some(i) = self + .last_replies + .iter() + .position(|l| l.id_and_seq == icmp_data.echo_id_seq) + { + if pkt_timestamp <= self.last_replies[i].timestamp { + let srt = Timestamp::from(self.last_replies[i].timestamp - pkt_timestamp); + if srt <= ART_MAX { + self.srt_max = max(self.srt_max, srt); + self.srt_sum += srt; + self.srt_count += 1; + self.data_update_flag = true; + } + } + self.last_replies.remove(i); + } else { + if self.last_requests.len() >= MAX_CACHE_COUNT { + self.last_requests.remove(0); + } + self.last_requests.push(LastIcmp { + timestamp: pkt_timestamp, + id_and_seq: icmp_data.echo_id_seq, + }); + } + } else if is_reply { + if let Some(i) = self + .last_requests + .iter() + .position(|l| l.id_and_seq == icmp_data.echo_id_seq) + { + if pkt_timestamp >= self.last_requests[i].timestamp { + let srt = Timestamp::from(pkt_timestamp - self.last_requests[i].timestamp); + if srt <= ART_MAX { + self.srt_max = max(self.srt_max, srt); + self.srt_sum += srt; + self.srt_count += 1; + self.data_update_flag = true; + } + } + self.last_requests.remove(i); + } else { + if self.last_replies.len() >= MAX_CACHE_COUNT { + self.last_replies.remove(0); + } + self.last_replies.push(LastIcmp { + timestamp: pkt_timestamp, + id_and_seq: icmp_data.echo_id_seq, + }); + } + } + Ok(()) + } + + fn data_updated(&self) -> bool { + self.data_update_flag + } + + fn copy_and_reset_data(&mut self, _: bool) -> FlowPerfStats { + let mut stats = FlowPerfStats::default(); + stats.l4_protocol = L4Protocol::Icmp; + stats.tcp.srt_max = (self.srt_max.as_nanos() / Timestamp::from_micros(1).as_nanos()) as u32; + stats.tcp.srt_sum = (self.srt_sum.as_nanos() / Timestamp::from_micros(1).as_nanos()) as u32; + stats.tcp.srt_count = self.srt_count; + *self = IcmpPerf::default(); + + stats + } +} diff --git a/agent/src/flow_generator/perf/mod.rs b/agent/src/flow_generator/perf/mod.rs index ef95b2d0090..4dd3c07aa74 100644 --- a/agent/src/flow_generator/perf/mod.rs +++ b/agent/src/flow_generator/perf/mod.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +pub(crate) mod icmp; mod stats; pub mod tcp; pub(crate) mod udp; @@ -59,7 +60,7 @@ use crate::{ config::{handler::LogParserConfig, FlowConfig}, }; -use {tcp::TcpPerf, udp::UdpPerf}; +use {icmp::IcmpPerf, tcp::TcpPerf, udp::UdpPerf}; pub use stats::FlowPerfCounter; pub use stats::PerfStats; @@ -88,6 +89,7 @@ pub trait L7FlowPerf { pub enum L4FlowPerfTable { Tcp(Box), Udp(UdpPerf), + Icmp(IcmpPerf), } impl L4FlowPerf for L4FlowPerfTable { @@ -95,6 +97,7 @@ impl L4FlowPerf for L4FlowPerfTable { match self { Self::Tcp(p) => p.parse(packet, direction), Self::Udp(p) => p.parse(packet, direction), + Self::Icmp(p) => p.parse(packet, direction), } } @@ -102,6 +105,7 @@ impl L4FlowPerf for L4FlowPerfTable { match self { Self::Tcp(p) => p.data_updated(), Self::Udp(p) => p.data_updated(), + Self::Icmp(p) => p.data_updated(), } } @@ -109,6 +113,7 @@ impl L4FlowPerf for L4FlowPerfTable { match self { Self::Tcp(p) => p.copy_and_reset_data(flow_reversed), Self::Udp(p) => p.copy_and_reset_data(flow_reversed), + Self::Icmp(p) => p.copy_and_reset_data(flow_reversed), } } } @@ -153,7 +158,7 @@ impl L7ProtocolChecker { iter: match l4_protocol { L4Protocol::Tcp => self.tcp.iter(), L4Protocol::Udp => self.udp.iter(), - L4Protocol::Unknown => [].iter(), + _ => [].iter(), }, port, } @@ -483,6 +488,7 @@ impl FlowLog { .unwrap_or_else(|| Box::new(TcpPerf::new(counter))), )), L4Protocol::Udp => Some(L4FlowPerfTable::Udp(UdpPerf::new())), + L4Protocol::Icmp => Some(L4FlowPerfTable::Icmp(IcmpPerf::new())), _ => None, } } else { @@ -552,6 +558,13 @@ impl FlowLog { Ok(L7ParseResult::None) } + pub fn parse_l3(&mut self, packet: &mut MetaPacket) -> Result<()> { + if let Some(l4) = self.l4.as_mut() { + l4.parse(packet, false)?; + } + Ok(()) + } + pub fn copy_and_reset_l4_perf_data(&mut self, flow_reversed: bool, flow: &mut Flow) { if let Some(l4) = self.l4.as_mut() { if l4.data_updated() { diff --git a/agent/src/flow_generator/perf/tcp.rs b/agent/src/flow_generator/perf/tcp.rs index e423b77793a..a0bc3601336 100644 --- a/agent/src/flow_generator/perf/tcp.rs +++ b/agent/src/flow_generator/perf/tcp.rs @@ -27,7 +27,7 @@ use crate::{ enums::TcpFlags, flow::{FlowPerfStats, L4Protocol}, lookup_key::LookupKey, - meta_packet::{MetaPacket, MetaPacketTcpHeader}, + meta_packet::{MetaPacket, MetaPacketTcpHeader, ProtocolData}, Timestamp, }, flow_generator::error::{Error, Result}, @@ -109,15 +109,24 @@ impl SessionPeer { const SEQ_NUMBER_HIGH_THRESHOLD: u32 = 0xc0000000; fn is_sync_ack_ack_packet(&self, p: &MetaPacket) -> bool { - p.is_ack() && p.tcp_data.ack == self.seq_threshold + if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + return p.is_ack() && tcp_data.ack == self.seq_threshold; + } + false } fn is_reply_packet(&self, p: &MetaPacket) -> bool { - p.tcp_data.ack == self.seq.overflowing_add(self.payload_len).0 + if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + return tcp_data.ack == self.seq.overflowing_add(self.payload_len).0; + } + false } fn is_next_packet(&self, p: &MetaPacket) -> bool { - p.tcp_data.seq == self.seq.overflowing_add(self.payload_len).0 + if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + return tcp_data.seq == self.seq.overflowing_add(self.payload_len).0; + } + false } // merge array[index+1] into array[index] @@ -336,14 +345,18 @@ impl SessionPeer { // 在TCP_STATE_ESTABLISHED阶段更新数据 fn update_data(&mut self, p: &MetaPacket) { - let header = &p.tcp_data; + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + tcp_data + } else { + unreachable!(); + }; self.timestamp = p.lookup_key.timestamp.into(); self.payload_len = p.payload_len as u32; - if header.flags.contains(TcpFlags::SYN) { + if tcp_data.flags.contains(TcpFlags::SYN) { self.payload_len = 1; } - self.seq = header.seq; - self.win_size = header.win_size; + self.seq = tcp_data.seq; + self.win_size = tcp_data.win_size; // winScale不能在这里更新p.winScale = tcpHeader.WinScale } } @@ -609,6 +622,11 @@ impl TcpPerf { // fpd for first packet direction fn is_invalid_retrans_packet(&mut self, p: &MetaPacket, fpd: bool) -> (bool, bool) { + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + tcp_data + } else { + unreachable!(); + }; let (same_dir, oppo_dir) = if fpd { (&mut self.ctrl_info.0, &mut self.ctrl_info.1) } else { @@ -617,7 +635,7 @@ impl TcpPerf { if p.is_syn() { if same_dir.seq_threshold == 0 { // first SYN - same_dir.seq_threshold = p.tcp_data.seq + 1; + same_dir.seq_threshold = tcp_data.seq + 1; same_dir.first_syn_timestamp = p.lookup_key.timestamp.into(); self.handshaking = true; } else if same_dir.syn_transmitted { @@ -631,10 +649,10 @@ impl TcpPerf { if p.is_syn_ack() { if same_dir.seq_threshold == 0 { // first - same_dir.seq_threshold = p.tcp_data.seq + 1; + same_dir.seq_threshold = tcp_data.seq + 1; if oppo_dir.seq_threshold == 0 { // no syn before first syn/ack - oppo_dir.seq_threshold = p.tcp_data.ack; + oppo_dir.seq_threshold = tcp_data.ack; } else { oppo_dir.rtt_full_precondition = true; } @@ -658,7 +676,7 @@ impl TcpPerf { } // 连接建立后,即ESTABLISHED阶段,用SeqArray判断包重传 - match same_dir.assert_seq_number(&p.tcp_data, p.payload_len) { + match same_dir.assert_seq_number(tcp_data, p.payload_len) { PacketSeqType::Retrans => { // established retrans self.perf_data.calc_retrans(fpd); @@ -704,7 +722,10 @@ impl TcpPerf { oppo_dir: &mut SessionPeer, p: &MetaPacket, ) -> bool { - p.is_ack() && oppo_dir.seq_threshold == p.tcp_data.ack + if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + return p.is_ack() && oppo_dir.seq_threshold == tcp_data.ack; + } + false } fn flow_opening(&mut self, p: &MetaPacket, fpd: bool) -> bool { @@ -745,9 +766,14 @@ impl TcpPerf { } } + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + tcp_data + } else { + unreachable!(); + }; if p.is_syn() || p.is_syn_ack() { - if p.tcp_data.win_scale > 0 { - same_dir.win_scale = WIN_SCALE_FLAG | p.tcp_data.win_scale.min(WIN_SCALE_MAX); + if tcp_data.win_scale > 0 { + same_dir.win_scale = WIN_SCALE_FLAG | tcp_data.win_scale.min(WIN_SCALE_MAX); } same_dir.rtt_calculable = false; @@ -831,9 +857,13 @@ impl TcpPerf { oppo_dir.srt_calculable = false; oppo_dir.art_calculable = false; } - + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + tcp_data + } else { + unreachable!(); + }; // zero_win, psh_urg_count_0 - let mut win_size = p.tcp_data.win_size as u32; + let mut win_size = tcp_data.win_size as u32; if same_dir.win_scale & oppo_dir.win_scale & WIN_SCALE_FLAG > 0 { win_size <<= (same_dir.win_scale & WIN_SCALE_MASK) as u32; } @@ -843,7 +873,7 @@ impl TcpPerf { } // PSH/URG - if p.tcp_data.flags & TcpFlags::MASK == TcpFlags::PSH_ACK_URG { + if tcp_data.flags & TcpFlags::MASK == TcpFlags::PSH_ACK_URG { self.perf_data.calc_psh_urg(fpd); } // calculate client waiting time @@ -914,12 +944,17 @@ impl TcpPerf { // 异常flag判断,方向识别,payload_len计算等 // 去除功能不相关报文 fn is_interested_packet(&self, p: &MetaPacket) -> bool { - if p.tcp_data.data_offset == 0 { + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &p.protocol_data { + tcp_data + } else { + unreachable!(); + }; + if tcp_data.data_offset == 0 { // invalid tcp header or ip fragment return false; } - if !Self::is_interested_tcp_flags(p.tcp_data.flags) { + if !Self::is_interested_tcp_flags(tcp_data.flags) { self.counter .ignored_packet_count .fetch_add(1, Ordering::Relaxed); @@ -1102,13 +1137,13 @@ struct MiniMetaPacket { impl<'a> From for MetaPacket<'_> { fn from(m: MiniMetaPacket) -> Self { let mut packet = MetaPacket::empty(); - packet.tcp_data = MetaPacketTcpHeader { + packet.protocol_data = ProtocolData::TcpHeader(MetaPacketTcpHeader { data_offset: m.data_offset, flags: m.flags, seq: m.seq, ack: m.ack, ..Default::default() - }; + }); packet.lookup_key = LookupKey { timestamp: Timestamp::from_secs(m.timestamp), ..Default::default() @@ -1886,8 +1921,13 @@ mod tests { let first_packet = &packets[0]; for (i, packet) in packets.iter().enumerate() { + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &packet.protocol_data { + tcp_data + } else { + unreachable!(); + }; assert!( - packet.tcp_data.data_offset > 0, + tcp_data.data_offset > 0, "raw packet is not tcp, packet#{} is {}", i, packet @@ -1955,7 +1995,12 @@ mod tests { assert!(n < packets.len()); for i in 0..n { let packet = &packets[i]; - assert!(packet.tcp_data.data_offset != 0); + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &packet.protocol_data { + tcp_data + } else { + unreachable!(); + }; + assert!(tcp_data.data_offset != 0); perf.parse(&packet, first_packet_src_ip == packet.lookup_key.src_ip) .unwrap(); } @@ -1971,7 +2016,12 @@ mod tests { if i + 1 == ignore_nth_packet { continue; } - assert!(packet.tcp_data.data_offset != 0); + let tcp_data = if let ProtocolData::TcpHeader(tcp_data) = &packet.protocol_data { + tcp_data + } else { + unreachable!(); + }; + assert!(tcp_data.data_offset != 0); let _ = perf.parse(&*packet, first_packet_src_ip == packet.lookup_key.src_ip); if first_report_moment == i as isize + 1 { diff --git a/agent/src/flow_generator/protocol_logs/parser.rs b/agent/src/flow_generator/protocol_logs/parser.rs index 81e8f5772d4..0b1dd457c15 100644 --- a/agent/src/flow_generator/protocol_logs/parser.rs +++ b/agent/src/flow_generator/protocol_logs/parser.rs @@ -41,6 +41,7 @@ use crate::{ enums::EthernetType, flow::{get_uniq_flow_id_in_one_minute, PacketDirection, SignalSource}, l7_protocol_info::{L7ProtocolInfo, L7ProtocolInfoInterface}, + meta_packet::ProtocolData, MetaPacket, TaggedFlow, }, config::handler::LogParserAccess, @@ -150,8 +151,13 @@ impl MetaAppProto { } } + let seq = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + tcp_data.seq + } else { + 0 + }; if meta_packet.lookup_key.direction == PacketDirection::ClientToServer { - base_info.req_tcp_seq = meta_packet.tcp_data.seq + l7_info.tcp_seq_offset(); + base_info.req_tcp_seq = seq + l7_info.tcp_seq_offset(); // ebpf info base_info.syscall_trace_id_request = meta_packet.syscall_trace_id; @@ -167,7 +173,7 @@ impl MetaAppProto { ); } - base_info.resp_tcp_seq = meta_packet.tcp_data.seq + l7_info.tcp_seq_offset(); + base_info.resp_tcp_seq = seq + l7_info.tcp_seq_offset(); // ebpf info base_info.syscall_trace_id_response = meta_packet.syscall_trace_id;