Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support tcp packet reassembly #7942

Draft
wants to merge 1 commit into
base: v6.5
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
98 changes: 81 additions & 17 deletions agent/src/common/meta_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,38 @@ use reorder::{CacheItem, Downcast};
pub enum RawPacket<'a> {
Borrowed(&'a [u8]),
Owned(BatchedBuffer<u8>),
OwnedVec(Vec<u8>),
}

impl<'a> RawPacket<'a> {
pub fn len(&self) -> usize {
match self {
Self::Borrowed(b) => b.len(),
Self::Owned(o) => o.len(),
Self::OwnedVec(v) => v.len(),
}
}

pub fn to_vec(&self) -> Vec<u8> {
match self {
Self::Borrowed(b) => b.to_vec(),
Self::Owned(o) => o.to_vec(),
Self::OwnedVec(v) => v.clone(),
}
}

pub fn to_owned_vec(&mut self) {
match self {
Self::Borrowed(b) => *self = Self::OwnedVec(b.to_vec()),
Self::Owned(o) => *self = Self::OwnedVec(o.to_vec()),
_ => {}
}
}

pub fn append(&mut self, payload: &[u8]) {
match self {
Self::OwnedVec(v) => v.append(&mut payload.to_vec()),
_ => {}
}
}
}
Expand All @@ -89,6 +114,7 @@ impl<'a> Deref for RawPacket<'a> {
match self {
Self::Borrowed(b) => b,
Self::Owned(o) => &o,
Self::OwnedVec(v) => v.as_slice(),
}
}
}
Expand All @@ -99,6 +125,12 @@ impl<'a> From<&'a [u8]> for RawPacket<'a> {
}
}

impl<'a> From<Vec<u8>> for RawPacket<'a> {
fn from(b: Vec<u8>) -> Self {
Self::OwnedVec(b)
}
}

impl<'a> From<BatchedBuffer<u8>> for RawPacket<'a> {
fn from(b: BatchedBuffer<u8>) -> Self {
Self::Owned(b)
Expand Down Expand Up @@ -953,24 +985,56 @@ impl<'a> MetaPacket<'a> {
0
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn to_owned_vec_packet(&mut self) -> MetaPacket<'static> {
let raw = self.raw.as_ref().unwrap().to_vec();

MetaPacket {
lookup_key: self.lookup_key.clone(),
raw: Some(RawPacket::from(raw)),
packet_len: self.packet_len,
l3_payload_len: self.l3_payload_len,
l4_payload_len: self.l4_payload_len,
tcp_options_flag: self.tcp_options_flag,
protocol_data: self.protocol_data.clone(),
tap_port: self.tap_port,
signal_source: self.signal_source,
payload_len: self.payload_len,
sub_packets: self.sub_packets.clone(),
header_type: self.header_type,
l2_l3_opt_size: self.l2_l3_opt_size,
l4_opt_size: self.l4_opt_size,
..Default::default()
}
}

pub fn merge(&mut self, packet: &mut MetaPacket) {
self.raw_from_ebpf.append(&mut packet.raw_from_ebpf);
self.sub_packets.push(SubPacket {
cap_seq: packet.cap_start_seq,
syscall_trace_id: packet.syscall_trace_id,
raw_from_ebpf_offset: self.l4_payload_len as usize,
timestamp: packet.lookup_key.timestamp,
tcp_seq: if let ProtocolData::TcpHeader(tcp_data) = &mut packet.protocol_data {
tcp_data.seq
} else {
0
},
});
self.packet_len += packet.packet_len - 54;
self.payload_len += packet.payload_len;
self.l4_payload_len += packet.l4_payload_len;
self.cap_end_seq = packet.cap_start_seq;
if self.ebpf_type != EbpfType::None {
self.raw_from_ebpf.append(&mut packet.raw_from_ebpf);
self.sub_packets.push(SubPacket {
cap_seq: packet.cap_start_seq,
syscall_trace_id: packet.syscall_trace_id,
raw_from_ebpf_offset: self.l4_payload_len as usize,
timestamp: packet.lookup_key.timestamp,
tcp_seq: if let ProtocolData::TcpHeader(tcp_data) = &mut packet.protocol_data {
tcp_data.seq
} else {
0
},
});
self.packet_len += packet.packet_len - 54;
self.payload_len += packet.payload_len;
self.l4_payload_len += packet.l4_payload_len;
self.cap_end_seq = packet.cap_start_seq;
} else {
if let Some(payload) = packet.get_l4_payload() {
if let Some(raw) = self.raw.as_mut() {
raw.append(payload);
self.packet_len += packet.payload_len as u32;
self.payload_len += packet.payload_len;
self.l4_payload_len += packet.l4_payload_len;
}
}
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down
2 changes: 2 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ pub struct YamlConfig {
pub oracle_parse_config: OracleParseConfig,
pub server_ports: Vec<u16>,
pub consistent_timestamp_in_l7_metrics: bool,
pub packet_segmentation_reassembly: bool,
}

impl YamlConfig {
Expand Down Expand Up @@ -1063,6 +1064,7 @@ impl Default for YamlConfig {
ebpf_collector_queue_size: 65535,
server_ports: vec![],
consistent_timestamp_in_l7_metrics: false,
packet_segmentation_reassembly: false,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ pub struct FlowConfig {
pub obfuscate_enabled_protocols: L7ProtocolBitmap,
pub server_ports: Vec<u16>,
pub consistent_timestamp_in_l7_metrics: bool,

pub packet_segmentation_reassembly: bool,
}

impl From<&RuntimeConfig> for FlowConfig {
Expand Down Expand Up @@ -574,6 +576,7 @@ impl From<&RuntimeConfig> for FlowConfig {
),
server_ports: conf.yaml_config.server_ports.clone(),
consistent_timestamp_in_l7_metrics: conf.yaml_config.consistent_timestamp_in_l7_metrics,
packet_segmentation_reassembly: conf.yaml_config.packet_segmentation_reassembly,
}
}
}
Expand Down Expand Up @@ -623,6 +626,10 @@ impl fmt::Debug for FlowConfig {
// .field("l7_protocol_parse_port_bitmap", &self.l7_protocol_parse_port_bitmap)
.field("plugins", &self.plugins)
.field("server_ports", &self.server_ports)
.field(
"packet_segmentation_reassembly",
&self.packet_segmentation_reassembly,
)
.finish()
}
}
Expand Down
49 changes: 38 additions & 11 deletions agent/src/flow_generator/flow_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,10 @@ impl FlowMap {
return false;
}

let config = &config.flow;
let flow_config = &config.flow;

// FlowMap 时间窗口无法推动
if timestamp - config.packet_delay - TIME_UNIT < self.start_time {
if timestamp - flow_config.packet_delay - TIME_UNIT < self.start_time {
return true;
}

Expand All @@ -550,7 +550,7 @@ impl FlowMap {
);
// 根据包到达时间的容差调整
let next_start_time_in_unit =
((timestamp - config.packet_delay).as_nanos() / TIME_UNIT.as_nanos()) as u64;
((timestamp - flow_config.packet_delay).as_nanos() / TIME_UNIT.as_nanos()) as u64;
debug!(
"flow_map#{} ticker flush [{:?}, {:?}) at {:?} time diff is {:?}",
self.id,
Expand Down Expand Up @@ -614,7 +614,7 @@ impl FlowMap {
continue;
}
// 未超时Flow的统计信息发送到队列下游
self.node_updated_aftercare(&config, node, timestamp, None);
self.node_updated_aftercare(&flow_config, node, timestamp, None);
// Enterprise Edition Feature: packet-sequence
if self.packet_sequence_enabled {
if let Some(block) = node.packet_sequence_block.take() {
Expand Down Expand Up @@ -650,7 +650,7 @@ impl FlowMap {
self.node_map.replace((node_map, time_set));

self.start_time_in_unit = next_start_time_in_unit;
self.flush_queue(&config, timestamp);
self.flush_queue(&flow_config, timestamp);

self.flush_app_protolog();

Expand Down Expand Up @@ -765,7 +765,7 @@ impl FlowMap {
let node = nodes.swap_remove(index);
self.send_socket_close_event(&node);
self.node_removed_aftercare(
&flow_config,
&config,
node,
meta_packet.lookup_key.timestamp.into(),
Some(meta_packet),
Expand Down Expand Up @@ -877,8 +877,15 @@ impl FlowMap {
let collector_config = config.collector;
let flow_closed = self.update_tcp_flow(config, meta_packet, node);
if flow_config.collector_enabled {
let direction = meta_packet.lookup_key.direction == PacketDirection::ClientToServer;
self.collect_metric(config, node, meta_packet, direction, false);
if let Some(mut packets) = node.tcp_segments.inject(meta_packet) {
for packet in &mut packets {
let direction = packet.lookup_key.direction == PacketDirection::ClientToServer;
self.collect_metric(config, node, packet, direction, false);
}
} else {
let direction = meta_packet.lookup_key.direction == PacketDirection::ClientToServer;
self.collect_metric(config, node, meta_packet, direction, false);
}
}

// After collect_metric() is called for eBPF MetaPacket, its direction is determined.
Expand Down Expand Up @@ -1253,6 +1260,8 @@ impl FlowMap {
node.endpoint_data_cache = Default::default();
node.packet_sequence_block = None; // Enterprise Edition Feature: packet-sequence
node.residual_request = 0;
node.tcp_segments
.init(config.flow.packet_segmentation_reassembly);
#[cfg(any(target_os = "linux", target_os = "android"))]
let local_epc_id = match config.ebpf.as_ref() {
Some(c) => c.epc_id as i32,
Expand Down Expand Up @@ -1313,6 +1322,7 @@ impl FlowMap {
);
if need_reverse {
node.tagged_flow.flow.reverse(true);
node.tcp_segments.reverse();
}
node.tagged_flow.flow.direction_score = direction_score;
}
Expand Down Expand Up @@ -1739,7 +1749,14 @@ impl FlowMap {
}

if flow_config.collector_enabled {
self.collect_metric(config, &mut node, meta_packet, !reverse, true);
if let Some(mut packets) = node.tcp_segments.inject(meta_packet) {
for packet in &mut packets {
let direction = packet.lookup_key.direction == PacketDirection::ClientToServer;
self.collect_metric(config, &mut node, packet, direction, false);
}
} else {
self.collect_metric(config, &mut node, meta_packet, !reverse, true);
}
}

// After collect_metric() is called for eBPF MetaPacket, its direction is determined.
Expand Down Expand Up @@ -1921,11 +1938,20 @@ impl FlowMap {
// go 版本的removeAndOutput
fn node_removed_aftercare(
&mut self,
config: &FlowConfig,
config: &Config,
mut node: Box<FlowNode>,
timeout: Duration,
meta_packet: Option<&mut MetaPacket>,
) {
if config.flow.collector_enabled {
if let Some(mut packets) = node.tcp_segments.flush() {
for packet in &mut packets {
let direction = packet.lookup_key.direction == PacketDirection::ClientToServer;
self.collect_metric(config, &mut node, packet, direction, false);
}
}
}

// 统计数据输出前矫正流方向
self.update_flow_direction(&mut node, meta_packet);

Expand All @@ -1952,7 +1978,7 @@ impl FlowMap {
}

let mut collect_stats = false;
if config.collector_enabled
if config.flow.collector_enabled
&& (flow.flow_key.proto == IpProtocol::TCP
|| flow.flow_key.proto == IpProtocol::UDP
|| flow.flow_key.proto == IpProtocol::ICMPV4
Expand Down Expand Up @@ -2247,6 +2273,7 @@ impl FlowMap {
}
node.tagged_flow.flow.reverse(is_first_packet);
node.tagged_flow.tag.reverse();
node.tcp_segments.reverse();
// Enterprise Edition Feature: packet-sequence
if node.packet_sequence_block.is_some() {
node.packet_sequence_block
Expand Down
Loading
Loading