Skip to content

Commit

Permalink
feat: agent add syscall_trace_id_disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
TomatoMr committed Jul 30, 2024
1 parent 78eed03 commit 04fae80
Show file tree
Hide file tree
Showing 17 changed files with 201 additions and 124 deletions.
9 changes: 6 additions & 3 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ pub struct EbpfYamlConfig {
pub syscall_out_of_order_cache_size: usize,
pub syscall_out_of_order_reassembly: Vec<String>,
pub syscall_segmentation_reassembly: Vec<String>,
pub syscall_trace_id_disabled: bool,
}

impl Default for EbpfYamlConfig {
Expand Down Expand Up @@ -431,6 +432,7 @@ impl Default for EbpfYamlConfig {
syscall_out_of_order_reassembly: vec![],
syscall_segmentation_reassembly: vec![],
syscall_out_of_order_cache_size: 16,
syscall_trace_id_disabled: false,
}
}
}
Expand Down Expand Up @@ -1412,8 +1414,7 @@ impl RuntimeConfig {
global_pps_threshold: 2000000,
#[cfg(target_os = "linux")]
extra_netns_regex: Default::default(),
tap_interface_regex: "^(tap.*|cali.*|veth.*|eth.*|en[ospx].*|lxc.*|lo|[0-9a-f]+_h)$"
.into(),
tap_interface_regex: "".into(),
host: Default::default(),
rsyslog_enabled: false,
output_vlan: 0,
Expand Down Expand Up @@ -1540,7 +1541,9 @@ impl RuntimeConfig {
)));
}

if regex::Regex::new(&self.tap_interface_regex).is_err() {
if !self.tap_interface_regex.is_empty()
&& regex::Regex::new(&self.tap_interface_regex).is_err()
{
return Err(ConfigError::RuntimeConfigInvalid(format!(
"malformed tap-interface-regex({})",
self.tap_interface_regex
Expand Down
7 changes: 7 additions & 0 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2324,6 +2324,13 @@ impl ConfigHandler {
callbacks.push(leaky_bucket_callback);
}

// FIXME: 仅做测试用
if new_config.dispatcher.tap_interface_regex
== "^(tap.*|cali.*|veth.*|eth.*|en[osipx].*|lxc.*|lo|[0-9a-f]+_h)$"
{
new_config.dispatcher.tap_interface_regex = "".to_string();
}

info!(
"dispatcher config change from {:#?} to {:#?}",
candidate_config.dispatcher, new_config.dispatcher
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/analyzer_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl AnalyzerModeDispatcher {
let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
let mut flow_map = FlowMap::new(
id as u32,
flow_output_queue,
Some(flow_output_queue),
l7_stats_output_queue,
policy_getter,
log_output_queue,
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/local_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl LocalModeDispatcher {
}
let mut flow_map = FlowMap::new(
base.id as u32,
base.flow_output_queue.clone(),
Some(base.flow_output_queue.clone()),
base.l7_stats_output_queue.clone(),
base.policy_getter,
base.log_output_queue.clone(),
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/local_plus_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl LocalPlusModeDispatcher {
let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
let mut flow_map = FlowMap::new(
id as u32,
flow_output_queue,
Some(flow_output_queue),
l7_stats_output_queue,
policy_getter,
log_output_queue,
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/mirror_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ impl MirrorModeDispatcher {

let mut flow_map = FlowMap::new(
self.base.id as u32,
self.base.flow_output_queue.clone(),
Some(self.base.flow_output_queue.clone()),
self.base.l7_stats_output_queue.clone(),
self.base.policy_getter,
self.base.log_output_queue.clone(),
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/mirror_plus_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl MirrorPlusModeDispatcher {
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
let mut flow_map = FlowMap::new(
id as u32,
flow_output_queue,
Some(flow_output_queue),
l7_stats_output_queue,
policy_getter,
log_output_queue,
Expand Down
13 changes: 7 additions & 6 deletions agent/src/ebpf_dispatcher/ebpf_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::common::l7_protocol_log::{
};
use crate::common::meta_packet::{MetaPacket, SegmentFlags};
use crate::common::proc_event::{BoxedProcEvents, EventType, ProcEvent};
use crate::common::{FlowAclListener, FlowAclListenerId, TaggedFlow};
use crate::common::{FlowAclListener, FlowAclListenerId};
use crate::config::handler::{CollectorAccess, EbpfAccess, EbpfConfig, LogParserAccess};
use crate::config::FlowAccess;
use crate::ebpf;
Expand Down Expand Up @@ -221,8 +221,7 @@ struct EbpfDispatcher {

config: EbpfAccess,
output: DebugSender<Box<AppProto>>, // Send AppProtos to the AppProtoLogsParser
flow_output: DebugSender<Arc<BatchedBox<TaggedFlow>>>, // Send TaggedFlows to the QuadrupleGenerator
l7_stats_output: DebugSender<BatchedBox<L7Stats>>, // Send L7Stats to the QuadrupleGenerator
l7_stats_output: DebugSender<BatchedBox<L7Stats>>, // Send L7Stats to the QuadrupleGenerator
stats_collector: Arc<stats::Collector>,
}

Expand Down Expand Up @@ -323,7 +322,7 @@ impl EbpfDispatcher {
);
let mut flow_map = FlowMap::new(
self.dispatcher_id as u32,
self.flow_output.clone(),
None,
self.l7_stats_output.clone(),
self.policy_getter,
self.output.clone(),
Expand Down Expand Up @@ -692,6 +691,10 @@ impl EbpfCollector {
}
}

if config.ebpf.syscall_trace_id_disabled {
ebpf::disable_syscall_trace_id();
}

if ebpf::running_socket_tracer(
Self::ebpf_l7_callback, /* 回调接口 rust -> C */
config.ebpf.thread_num as i32, /* 工作线程数,是指用户态有多少线程参与数据处理 */
Expand Down Expand Up @@ -861,7 +864,6 @@ impl EbpfCollector {
collector_config: CollectorAccess,
policy_getter: PolicyGetter,
output: DebugSender<Box<AppProto>>,
flow_output: DebugSender<Arc<BatchedBox<TaggedFlow>>>,
l7_stats_output: DebugSender<BatchedBox<L7Stats>>,
proc_event_output: DebugSender<BoxedProcEvents>,
ebpf_profile_sender: DebugSender<Profile>,
Expand Down Expand Up @@ -907,7 +909,6 @@ impl EbpfCollector {
config,
log_parser_config,
output,
flow_output,
l7_stats_output,
flow_map_config,
stats_collector,
Expand Down
38 changes: 25 additions & 13 deletions agent/src/flow_generator/flow_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub struct FlowMap {

tagged_flow_allocator: Allocator<TaggedFlow>,
l7_stats_allocator: Allocator<L7Stats>,
output_queue: DebugSender<Arc<BatchedBox<TaggedFlow>>>,
output_queue: Option<DebugSender<Arc<BatchedBox<TaggedFlow>>>>,
l7_stats_output_queue: DebugSender<BatchedBox<L7Stats>>,
out_log_queue: DebugSender<Box<AppProto>>,
output_buffer: Vec<Arc<BatchedBox<TaggedFlow>>>,
Expand Down Expand Up @@ -211,7 +211,7 @@ pub struct FlowMap {
impl FlowMap {
pub fn new(
id: u32,
output_queue: DebugSender<Arc<BatchedBox<TaggedFlow>>>,
output_queue: Option<DebugSender<Arc<BatchedBox<TaggedFlow>>>>,
l7_stats_output_queue: DebugSender<BatchedBox<L7Stats>>,
policy_getter: PolicyGetter,
app_proto_log_queue: DebugSender<Box<AppProto>>,
Expand Down Expand Up @@ -1837,8 +1837,13 @@ impl FlowMap {
self.l7_stats_buffer.clear();
}
}
if self.output_buffer.len() > 0 {
if let Err(e) = self.output_queue.send_all(&mut self.output_buffer) {
if self.output_queue.is_some() && self.output_buffer.len() > 0 {
if let Err(e) = self
.output_queue
.as_ref()
.unwrap()
.send_all(&mut self.output_buffer)
{
warn!(
"flow-map push tagged flows to queue failed, because {:?}",
e
Expand Down Expand Up @@ -1866,14 +1871,21 @@ impl FlowMap {
Ordering::Relaxed,
);

self.output_buffer.push(tagged_flow);
if self.output_buffer.len() >= QUEUE_BATCH_SIZE {
if let Err(e) = self.output_queue.send_all(&mut self.output_buffer) {
warn!(
"flow-map push tagged flows to queue failed, because {:?}",
e
);
self.output_buffer.clear();
if self.output_queue.is_some() {
self.output_buffer.push(tagged_flow);
if self.output_buffer.len() >= QUEUE_BATCH_SIZE {
if let Err(e) = self
.output_queue
.as_ref()
.unwrap()
.send_all(&mut self.output_buffer)
{
warn!(
"flow-map push tagged flows to queue failed, because {:?}",
e
);
self.output_buffer.clear();
}
}
}
}
Expand Down Expand Up @@ -2505,7 +2517,7 @@ pub fn _new_flow_map_and_receiver(
config.flow.trident_type = trident_type;
let flow_map = FlowMap::new(
0,
output_queue_sender,
Some(output_queue_sender),
l7_stats_output_queue_sender,
policy_getter,
app_proto_log_queue,
Expand Down
2 changes: 1 addition & 1 deletion agent/src/flow_generator/protocol_logs/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ impl MetaAppProto {
}

Some(Self {
base_info,
direction: meta_packet.lookup_key.direction,
base_info,
direction_score: flow.flow.direction_score,
l7_info,
})
Expand Down
18 changes: 15 additions & 3 deletions agent/src/metric/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use public::{
utils::net::MacAddr,
};

#[derive(Debug)]
#[derive(Serialize, Debug)]
pub struct Document {
pub timestamp: u32,
pub tagger: Tagger,
Expand Down Expand Up @@ -89,9 +89,20 @@ impl Sendable for BoxedDocument {
fn message_type(&self) -> SendMessageType {
SendMessageType::Metrics
}

fn to_kv_string(&self, dst: &mut String) {
let json = serde_json::to_string(&(*self.0)).unwrap();
dst.push_str(&json);
dst.push('\n');
}

fn file_name(&self) -> &str {
"flow_metrics"
}
}

bitflags! {
#[derive(Serialize)]
pub struct DocumentFlag: u32 {
const NONE = 0; // PER_MINUTE_METRICS
const PER_SECOND_METRICS = 1<<0;
Expand All @@ -105,6 +116,7 @@ impl Default for DocumentFlag {
}

bitflags! {
#[derive(Serialize)]
pub struct Code:u64 {
const NONE = 0;

Expand Down Expand Up @@ -144,7 +156,7 @@ impl Default for Code {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Direction {
None,
Expand Down Expand Up @@ -265,7 +277,7 @@ impl From<SpanKind> for TapSide {
}
}

#[derive(Debug, Clone)]
#[derive(Serialize, Debug, Clone)]
pub struct Tagger {
pub code: Code,

Expand Down
Loading

0 comments on commit 04fae80

Please sign in to comment.