diff --git a/agent/resources/test/flow_generator/tcp-segment.pcap b/agent/resources/test/flow_generator/tcp-segment.pcap new file mode 100644 index 00000000000..0cbf038ced4 Binary files /dev/null and b/agent/resources/test/flow_generator/tcp-segment.pcap differ diff --git a/agent/src/common/meta_packet.rs b/agent/src/common/meta_packet.rs index 87a2d1369cd..74c7006ef59 100644 --- a/agent/src/common/meta_packet.rs +++ b/agent/src/common/meta_packet.rs @@ -71,6 +71,7 @@ use reorder::{CacheItem, Downcast}; pub enum RawPacket<'a> { Borrowed(&'a [u8]), Owned(BatchedBuffer), + OwnedVec(Vec), } impl<'a> RawPacket<'a> { @@ -78,6 +79,30 @@ impl<'a> RawPacket<'a> { match self { Self::Borrowed(b) => b.len(), Self::Owned(o) => o.len(), + Self::OwnedVec(v) => v.len(), + } + } + + pub fn to_vec(&self) -> Vec { + match self { + Self::Borrowed(b) => b.to_vec(), + Self::Owned(o) => o.to_vec(), + Self::OwnedVec(v) => v.clone(), + } + } + + pub fn to_owned_vec(&mut self) { + match self { + Self::Borrowed(b) => *self = Self::OwnedVec(b.to_vec()), + Self::Owned(o) => *self = Self::OwnedVec(o.to_vec()), + _ => {} + } + } + + pub fn append(&mut self, payload: &[u8]) { + match self { + Self::OwnedVec(v) => v.append(&mut payload.to_vec()), + _ => {} } } } @@ -89,6 +114,7 @@ impl<'a> Deref for RawPacket<'a> { match self { Self::Borrowed(b) => b, Self::Owned(o) => &o, + Self::OwnedVec(v) => v.as_slice(), } } } @@ -99,6 +125,12 @@ impl<'a> From<&'a [u8]> for RawPacket<'a> { } } +impl<'a> From> for RawPacket<'a> { + fn from(b: Vec) -> Self { + Self::OwnedVec(b) + } +} + impl<'a> From> for RawPacket<'a> { fn from(b: BatchedBuffer) -> Self { Self::Owned(b) @@ -953,24 +985,56 @@ impl<'a> MetaPacket<'a> { 0 } - #[cfg(any(target_os = "linux", target_os = "android"))] + pub fn to_owned_vec_packet(&mut self) -> MetaPacket<'static> { + let raw = self.raw.as_ref().unwrap().to_vec(); + + MetaPacket { + lookup_key: self.lookup_key.clone(), + raw: Some(RawPacket::from(raw)), + packet_len: self.packet_len, + l3_payload_len: self.l3_payload_len, + l4_payload_len: self.l4_payload_len, + tcp_options_flag: self.tcp_options_flag, + protocol_data: self.protocol_data.clone(), + tap_port: self.tap_port, + signal_source: self.signal_source, + payload_len: self.payload_len, + sub_packets: self.sub_packets.clone(), + header_type: self.header_type, + l2_l3_opt_size: self.l2_l3_opt_size, + l4_opt_size: self.l4_opt_size, + ..Default::default() + } + } + pub fn merge(&mut self, packet: &mut MetaPacket) { - self.raw_from_ebpf.append(&mut packet.raw_from_ebpf); - self.sub_packets.push(SubPacket { - cap_seq: packet.cap_start_seq, - syscall_trace_id: packet.syscall_trace_id, - raw_from_ebpf_offset: self.l4_payload_len as usize, - timestamp: packet.lookup_key.timestamp, - tcp_seq: if let ProtocolData::TcpHeader(tcp_data) = &mut packet.protocol_data { - tcp_data.seq - } else { - 0 - }, - }); - self.packet_len += packet.packet_len - 54; - self.payload_len += packet.payload_len; - self.l4_payload_len += packet.l4_payload_len; - self.cap_end_seq = packet.cap_start_seq; + if self.ebpf_type != EbpfType::None { + self.raw_from_ebpf.append(&mut packet.raw_from_ebpf); + self.sub_packets.push(SubPacket { + cap_seq: packet.cap_start_seq, + syscall_trace_id: packet.syscall_trace_id, + raw_from_ebpf_offset: self.l4_payload_len as usize, + timestamp: packet.lookup_key.timestamp, + tcp_seq: if let ProtocolData::TcpHeader(tcp_data) = &mut packet.protocol_data { + tcp_data.seq + } else { + 0 + }, + }); + self.packet_len += packet.packet_len - 54; + self.payload_len += packet.payload_len; + self.l4_payload_len += packet.l4_payload_len; + self.cap_end_seq = packet.cap_start_seq; + } else { + if let Some(payload) = packet.get_l4_payload() { + if let Some(raw) = self.raw.as_mut() { + raw.append(payload); + self.packet_len += packet.payload_len as u32; + self.payload_len += packet.payload_len; + self.l4_payload_len += packet.l4_payload_len; + } + } + } } #[cfg(any(target_os = "linux", target_os = "android"))] diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 3dffaae4810..5b649f00c5e 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -639,6 +639,7 @@ pub struct YamlConfig { pub oracle_parse_config: OracleParseConfig, pub server_ports: Vec, pub consistent_timestamp_in_l7_metrics: bool, + pub packet_segmentation_reassembly: bool, } impl YamlConfig { @@ -1063,6 +1064,7 @@ impl Default for YamlConfig { ebpf_collector_queue_size: 65535, server_ports: vec![], consistent_timestamp_in_l7_metrics: false, + packet_segmentation_reassembly: false, } } } diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index c6aa6bcb795..3d623e6f1e4 100755 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -471,6 +471,8 @@ pub struct FlowConfig { pub obfuscate_enabled_protocols: L7ProtocolBitmap, pub server_ports: Vec, pub consistent_timestamp_in_l7_metrics: bool, + + pub packet_segmentation_reassembly: bool, } impl From<&RuntimeConfig> for FlowConfig { @@ -574,6 +576,7 @@ impl From<&RuntimeConfig> for FlowConfig { ), server_ports: conf.yaml_config.server_ports.clone(), consistent_timestamp_in_l7_metrics: conf.yaml_config.consistent_timestamp_in_l7_metrics, + packet_segmentation_reassembly: conf.yaml_config.packet_segmentation_reassembly, } } } @@ -623,6 +626,10 @@ impl fmt::Debug for FlowConfig { // .field("l7_protocol_parse_port_bitmap", &self.l7_protocol_parse_port_bitmap) .field("plugins", &self.plugins) .field("server_ports", &self.server_ports) + .field( + "packet_segmentation_reassembly", + &self.packet_segmentation_reassembly, + ) .finish() } } diff --git a/agent/src/flow_generator/flow_map.rs b/agent/src/flow_generator/flow_map.rs index cbc0accf2ad..ab9c0a64346 100644 --- a/agent/src/flow_generator/flow_map.rs +++ b/agent/src/flow_generator/flow_map.rs @@ -524,10 +524,10 @@ impl FlowMap { return false; } - let config = &config.flow; + let flow_config = &config.flow; // FlowMap 时间窗口无法推动 - if timestamp - config.packet_delay - TIME_UNIT < self.start_time { + if timestamp - flow_config.packet_delay - TIME_UNIT < self.start_time { return true; } @@ -550,7 +550,7 @@ impl FlowMap { ); // 根据包到达时间的容差调整 let next_start_time_in_unit = - ((timestamp - config.packet_delay).as_nanos() / TIME_UNIT.as_nanos()) as u64; + ((timestamp - flow_config.packet_delay).as_nanos() / TIME_UNIT.as_nanos()) as u64; debug!( "flow_map#{} ticker flush [{:?}, {:?}) at {:?} time diff is {:?}", self.id, @@ -614,7 +614,7 @@ impl FlowMap { continue; } // 未超时Flow的统计信息发送到队列下游 - self.node_updated_aftercare(&config, node, timestamp, None); + self.node_updated_aftercare(&flow_config, node, timestamp, None); // Enterprise Edition Feature: packet-sequence if self.packet_sequence_enabled { if let Some(block) = node.packet_sequence_block.take() { @@ -650,7 +650,7 @@ impl FlowMap { self.node_map.replace((node_map, time_set)); self.start_time_in_unit = next_start_time_in_unit; - self.flush_queue(&config, timestamp); + self.flush_queue(&flow_config, timestamp); self.flush_app_protolog(); @@ -765,7 +765,7 @@ impl FlowMap { let node = nodes.swap_remove(index); self.send_socket_close_event(&node); self.node_removed_aftercare( - &flow_config, + &config, node, meta_packet.lookup_key.timestamp.into(), Some(meta_packet), @@ -877,8 +877,15 @@ impl FlowMap { let collector_config = config.collector; let flow_closed = self.update_tcp_flow(config, meta_packet, node); if flow_config.collector_enabled { - let direction = meta_packet.lookup_key.direction == PacketDirection::ClientToServer; - self.collect_metric(config, node, meta_packet, direction, false); + if let Some(mut packets) = node.tcp_segments.inject(meta_packet) { + for packet in &mut packets { + let direction = packet.lookup_key.direction == PacketDirection::ClientToServer; + self.collect_metric(config, node, packet, direction, false); + } + } else { + let direction = meta_packet.lookup_key.direction == PacketDirection::ClientToServer; + self.collect_metric(config, node, meta_packet, direction, false); + } } // After collect_metric() is called for eBPF MetaPacket, its direction is determined. @@ -1253,6 +1260,8 @@ impl FlowMap { node.endpoint_data_cache = Default::default(); node.packet_sequence_block = None; // Enterprise Edition Feature: packet-sequence node.residual_request = 0; + node.tcp_segments + .init(config.flow.packet_segmentation_reassembly); #[cfg(any(target_os = "linux", target_os = "android"))] let local_epc_id = match config.ebpf.as_ref() { Some(c) => c.epc_id as i32, @@ -1313,6 +1322,7 @@ impl FlowMap { ); if need_reverse { node.tagged_flow.flow.reverse(true); + node.tcp_segments.reverse(); } node.tagged_flow.flow.direction_score = direction_score; } @@ -1739,7 +1749,14 @@ impl FlowMap { } if flow_config.collector_enabled { - self.collect_metric(config, &mut node, meta_packet, !reverse, true); + if let Some(mut packets) = node.tcp_segments.inject(meta_packet) { + for packet in &mut packets { + let direction = packet.lookup_key.direction == PacketDirection::ClientToServer; + self.collect_metric(config, &mut node, packet, direction, false); + } + } else { + self.collect_metric(config, &mut node, meta_packet, !reverse, true); + } } // After collect_metric() is called for eBPF MetaPacket, its direction is determined. @@ -1921,11 +1938,20 @@ impl FlowMap { // go 版本的removeAndOutput fn node_removed_aftercare( &mut self, - config: &FlowConfig, + config: &Config, mut node: Box, timeout: Duration, meta_packet: Option<&mut MetaPacket>, ) { + if config.flow.collector_enabled { + if let Some(mut packets) = node.tcp_segments.flush() { + for packet in &mut packets { + let direction = packet.lookup_key.direction == PacketDirection::ClientToServer; + self.collect_metric(config, &mut node, packet, direction, false); + } + } + } + // 统计数据输出前矫正流方向 self.update_flow_direction(&mut node, meta_packet); @@ -1952,7 +1978,7 @@ impl FlowMap { } let mut collect_stats = false; - if config.collector_enabled + if config.flow.collector_enabled && (flow.flow_key.proto == IpProtocol::TCP || flow.flow_key.proto == IpProtocol::UDP || flow.flow_key.proto == IpProtocol::ICMPV4 @@ -2247,6 +2273,7 @@ impl FlowMap { } node.tagged_flow.flow.reverse(is_first_packet); node.tagged_flow.tag.reverse(); + node.tcp_segments.reverse(); // Enterprise Edition Feature: packet-sequence if node.packet_sequence_block.is_some() { node.packet_sequence_block diff --git a/agent/src/flow_generator/flow_node.rs b/agent/src/flow_generator/flow_node.rs index 2f49c89b791..25075ae9108 100644 --- a/agent/src/flow_generator/flow_node.rs +++ b/agent/src/flow_generator/flow_node.rs @@ -21,11 +21,12 @@ use ahash::AHashMap; use super::{perf::FlowLog, FlowState, FLOW_METRICS_PEER_DST, FLOW_METRICS_PEER_SRC}; use crate::common::{ decapsulate::TunnelType, + ebpf::EbpfType, endpoint::EndpointDataPov, - enums::{EthernetType, TapType, TcpFlags}, + enums::{EthernetType, IpProtocol, TapType, TcpFlags}, flow::{FlowMetricsPeer, PacketDirection, SignalSource, TcpPerfStats}, lookup_key::LookupKey, - meta_packet::MetaPacket, + meta_packet::{MetaPacket, ProtocolData}, tagged_flow::TaggedFlow, TapPort, Timestamp, }; @@ -116,6 +117,108 @@ impl FlowMapKey { } } +#[derive(Default)] +pub struct PacketSegmentationReassembly { + tcp_seq: [u32; 2], + packets: [Option>; 2], + need_to_reassemble: bool, +} + +impl PacketSegmentationReassembly { + const THRESHOLD: u16 = 3000; + pub fn init(&mut self, need_to_reassemble: bool) { + self.need_to_reassemble = need_to_reassemble; + } + + pub fn reverse(&mut self) { + self.tcp_seq.swap(0, 1); + self.packets.swap(0, 1); + } + + pub fn flush(&mut self) -> Option>> { + let mut meta_packets = vec![]; + + if let Some(packet) = self.packets[0].take() { + meta_packets.push(packet); + } + + if let Some(packet) = self.packets[1].take() { + meta_packets.push(packet); + } + + if meta_packets.is_empty() { + return None; + } + + Some(meta_packets) + } + + pub fn inject(&mut self, meta_packet: &mut MetaPacket) -> Option>> { + if !self.need_to_reassemble { + return None; + } + + if meta_packet.lookup_key.proto != IpProtocol::TCP + || meta_packet.raw.is_none() + || meta_packet.ebpf_type != EbpfType::None + { + return None; + } + + let mut meta_packets = vec![]; + + let (seq, segments) = if meta_packet.lookup_key.direction == PacketDirection::ClientToServer + { + // 输出另一个方向的流量 + if let Some(packet) = self.packets[1].take() { + meta_packets.push(packet); + } + (&mut self.tcp_seq[0], &mut self.packets[0]) + } else { + // 输出另一个方向的流量 + if let Some(packet) = self.packets[0].take() { + meta_packets.push(packet); + } + (&mut self.tcp_seq[1], &mut self.packets[1]) + }; + + let tcp_seq = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data { + tcp_data.seq + } else { + unreachable!() + }; + + if let Some(mut packet) = segments.take() { + if tcp_seq != *seq { + // TCP Seq不连续, 输出上一个包, 保存下一个包 + meta_packets.push(packet); + + *segments = Some(meta_packet.to_owned_vec_packet()); + *seq = tcp_seq + meta_packet.payload_len as u32; + } else { + // TCP Seq连续时: + // 1. 聚合在一起输出 + // 2. 存储并等待和后续包聚合在一起 + if packet.get_pkt_size() < Self::THRESHOLD + && meta_packet.get_pkt_size() < Self::THRESHOLD + && packet.l4_payload_len() > 0 + { + packet.merge(meta_packet); + } + meta_packets.push(packet); + + *segments = Some(meta_packet.to_owned_vec_packet()); + *seq = tcp_seq + meta_packet.payload_len as u32; + } + } else { + *segments = Some(meta_packet.to_owned_vec_packet()); + *seq = tcp_seq + meta_packet.payload_len as u32; + } + + Some(meta_packets) + } +} + #[derive(Default)] pub struct FlowNode { pub tagged_flow: TaggedFlow, @@ -144,6 +247,9 @@ pub struct FlowNode { // Enterprise Edition Feature: packet-sequence pub packet_sequence_block: Option>, + + // tcp segments + pub tcp_segments: PacketSegmentationReassembly, } impl FlowNode { @@ -389,3 +495,36 @@ impl FlowNode { } } } + +#[cfg(test)] +mod tests { + use super::PacketSegmentationReassembly; + use crate::utils::test::Capture; + + #[test] + fn test_packet_segmentation_reassembly() { + let mut outputs = vec![]; + let mut tcp_segments = PacketSegmentationReassembly::default(); + tcp_segments.init(true); + + let capture = + Capture::load_pcap("resources/test/flow_generator/tcp-segment.pcap", Some(2000)); + let mut packets = capture.as_meta_packets(); + + for packet in &mut packets { + if let Some(mut packets) = tcp_segments.inject(packet) { + outputs.append(&mut packets); + } else { + outputs.push(packet.to_owned_vec_packet()); + } + } + + assert_eq!(outputs.len(), 2); + assert_eq!(outputs[0].payload_len, 2896); + assert_eq!(outputs[0].packet_len, 2962); + assert_eq!(outputs[0].get_l4_payload().as_ref().unwrap()[1447], 2); + assert_eq!(outputs[0].get_l4_payload().as_ref().unwrap()[1448], 0x2c); + assert_eq!(outputs[1].payload_len, 2896); + assert_eq!(outputs[1].packet_len, 2962); + } +} diff --git a/agent/src/flow_generator/flow_state.rs b/agent/src/flow_generator/flow_state.rs index d746d4608be..845cb361d73 100644 --- a/agent/src/flow_generator/flow_state.rs +++ b/agent/src/flow_generator/flow_state.rs @@ -825,6 +825,7 @@ mod tests { packet_in_tick: false, policy_in_tick: [false; 2], packet_sequence_block: Some(Box::new(PacketSequenceBlock::default())), // Enterprise Edition Feature: packet-sequence + ..Default::default() }; let peers = &mut flow_node.tagged_flow.flow.flow_metrics_peers; @@ -894,6 +895,7 @@ mod tests { packet_in_tick: false, policy_in_tick: [false; 2], packet_sequence_block: Some(Box::new(PacketSequenceBlock::default())), // Enterprise Edition Feature: packet-sequence + ..Default::default() }; let peers = &mut flow_node.tagged_flow.flow.flow_metrics_peers; diff --git a/agent/src/handler/mod.rs b/agent/src/handler/mod.rs index 0fd2c717cb8..bbe09d9da8d 100644 --- a/agent/src/handler/mod.rs +++ b/agent/src/handler/mod.rs @@ -109,6 +109,7 @@ impl<'a> MiniPacket<'a> { match &self.packet { RawPacket::Borrowed(r) => *r, RawPacket::Owned(r) => r.as_ref(), + RawPacket::OwnedVec(r) => r.as_slice(), } } } diff --git a/server/agent_config/config.go b/server/agent_config/config.go index 968d08c0d21..2659494c9fd 100644 --- a/server/agent_config/config.go +++ b/server/agent_config/config.go @@ -173,6 +173,7 @@ type StaticConfig struct { L7LogBlacklist map[string][]*L7LogBlacklist `yaml:"l7-log-blacklist,omitempty"` L7ProtocolAdvancedFeatures *L7ProtocolAdvancedFeatures `yaml:"l7-protocol-advanced-features,omitempty"` ConsistentTimestampInL7Metrics *bool `yaml:"consistent-timestamp-in-l7-metrics,omitempty"` + packet_segmentation_reassembly *bool `yaml:"packet-segmentation-reassembly,omitempty"` ServerPorts []uint16 `yaml:"server-ports,omitempty"` Ebpf *EbpfConfig `yaml:"ebpf,omitempty"` OsAppTagExecUser *string `yaml:"os-app-tag-exec-user,omitempty"` diff --git a/server/agent_config/example.yaml b/server/agent_config/example.yaml index df044260f6b..4d258fac94c 100644 --- a/server/agent_config/example.yaml +++ b/server/agent_config/example.yaml @@ -823,6 +823,10 @@ vtap_group_id: g-xxxxxx ## occurrence is used. #consistent-timestamp-in-l7-metrics: false + # Packet segmentation reassembly + # NOTE: After activation, it will aggregate two consecutive TCP packets together for application log parsing + #packet-segmentation-reassembly: false + ########## ## PCAP ## ##########