From 64065011ae630d188fa81e6935a088b3d71cd525 Mon Sep 17 00:00:00 2001 From: yuanchao Date: Tue, 24 Sep 2024 17:52:20 +0800 Subject: [PATCH] feat: add process listener --- agent/src/config/config.rs | 332 ++++++++++++++++- agent/src/config/handler.rs | 85 +++-- agent/src/config/mod.rs | 8 +- agent/src/ebpf/mod.rs | 62 ++++ agent/src/ebpf_dispatcher.rs | 63 +++- agent/src/platform/mod.rs | 5 +- .../platform/platform_synchronizer/linux.rs | 41 ++- .../platform_synchronizer/linux_process.rs | 341 +++++------------- .../platform_synchronizer/linux_socket.rs | 247 +++++++------ .../src/platform/platform_synchronizer/mod.rs | 2 +- agent/src/platform/synchronizer.rs | 19 +- agent/src/trident.rs | 40 +- agent/src/utils/process/linux.rs | 220 ++++++++++- agent/src/utils/process/process.rs | 151 -------- server/agent_config/template.yaml | 18 +- 15 files changed, 1037 insertions(+), 597 deletions(-) diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 5e714935afa..92b6f352647 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -23,8 +23,12 @@ use std::net::{IpAddr, ToSocketAddrs}; use std::path::Path; use std::time::Duration; +#[cfg(any(target_os = "linux", target_os = "android"))] +use envmnt::{ExpandOptions, ExpansionType}; use log::{debug, error, info, warn}; use md5::{Digest, Md5}; +#[cfg(any(target_os = "linux", target_os = "android"))] +use procfs::process::Process; use regex::Regex; use serde::{ de::{self, Unexpected}, @@ -36,6 +40,8 @@ use tokio::runtime::Runtime; use crate::common::l7_protocol_log::L7ProtocolParser; use crate::dispatcher::recv_engine::DEFAULT_BLOCK_SIZE; use crate::flow_generator::{DnsLog, OracleLog, TlsLog}; +#[cfg(any(target_os = "linux", target_os = "android"))] +use crate::platform::{get_container_id, OsAppTag, ProcessData}; use crate::{ common::{ decapsulate::TunnelType, enums::CaptureNetworkType, DEFAULT_LOG_FILE, @@ -46,6 +52,7 @@ use crate::{ rpc::Session, trident::RunningMode, }; + use public::{ bitmap::Bitmap, consts::NPB_DEFAULT_PORT, @@ -309,10 +316,45 @@ impl Default for TagExtraction { } #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +pub enum ProcessMatchType { + Cmd, + ProcessName, + ParentProcessName, + Tag, +} + +impl From<&str> for ProcessMatchType { + fn from(value: &str) -> Self { + match value { + OS_PROC_REGEXP_MATCH_TYPE_CMD => Self::Cmd, + OS_PROC_REGEXP_MATCH_TYPE_PARENT_PROC_NAME => Self::ParentProcessName, + OS_PROC_REGEXP_MATCH_TYPE_TAG => Self::Tag, + _ => Self::ProcessName, + } + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +pub enum ProcessMatchAction { + Accept, + Drop, +} + +impl From<&str> for ProcessMatchAction { + fn from(value: &str) -> Self { + match value { + OS_PROC_REGEXP_MATCH_ACTION_DROP => ProcessMatchAction::Drop, + _ => ProcessMatchAction::Accept, + } + } +} + +#[derive(Clone, Debug, Deserialize)] #[serde(default)] pub struct ProcessMatcher { - pub match_regex: String, - pub match_type: String, + #[serde(deserialize_with = "to_match_regex")] + pub match_regex: Regex, + pub match_type: ProcessMatchType, pub match_languages: Vec, pub match_usernames: Vec, pub only_in_container: bool, @@ -320,14 +362,46 @@ pub struct ProcessMatcher { pub ignore: bool, pub rewrite_name: String, pub enabled_features: Vec, - pub action: String, + pub action: ProcessMatchAction, +} + +impl Eq for ProcessMatcher {} + +impl PartialEq for ProcessMatcher { + fn eq(&self, other: &Self) -> bool { + self.match_regex.as_str() == other.match_regex.as_str() + && self.match_type == other.match_type + && self.match_languages == other.match_languages + && self.match_usernames == other.match_usernames + && self.only_in_container == other.only_in_container + && self.only_with_tag == other.only_with_tag + && self.ignore == other.ignore + && self.rewrite_name == other.rewrite_name + && self.enabled_features == other.enabled_features + && self.action == other.action + } +} + +fn to_match_regex<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let raw = String::deserialize(deserializer)?; + if let Ok(regex) = Regex::new(raw.as_str()) { + Ok(regex) + } else { + Err(de::Error::invalid_value( + Unexpected::Str(raw.as_str()), + &"See: https://regexr.com/", + )) + } } impl Default for ProcessMatcher { fn default() -> Self { Self { - match_regex: "deepflow-*".to_string(), - match_type: "cmdline".to_string(), + match_regex: Regex::new("deepflow-*").unwrap(), + match_type: ProcessMatchType::Cmd, match_languages: vec![], match_usernames: vec![], only_in_container: false, @@ -338,7 +412,122 @@ impl Default for ProcessMatcher { "ebpf.profile.on_cpu".to_string(), "ebpf.profile.off_cpu".to_string(), ], - action: OS_PROC_REGEXP_MATCH_ACTION_ACCEPT.to_string(), + action: ProcessMatchAction::Accept, + } + } +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +impl ProcessMatcher { + // TODO: match_languages + pub fn get_process_data( + &self, + process: &Process, + tag_map: &HashMap, + ) -> Option { + if self.only_in_container && get_container_id(process).is_none() { + return None; + } + if self.only_with_tag && !tag_map.contains_key(&(process.pid as u64)) { + return None; + } + + let env_rewrite = |r: String| { + envmnt::expand( + r.as_str(), + Some(ExpandOptions { + expansion_type: Some(ExpansionType::Windows), + default_to_empty: true, + }), + ) + }; + let Ok(mut process_data) = ProcessData::try_from(process) else { + return None; + }; + let mut match_replace_fn = + |reg: &Regex, act: &ProcessMatchAction, s: &String, replace: &String| { + if reg.is_match(s.as_str()) { + if act == &ProcessMatchAction::Accept && !replace.is_empty() { + process_data.name = reg.replace_all(s.as_str(), replace).to_string(); + } + true + } else { + false + } + }; + let replace = env_rewrite(self.rewrite_name.clone()); + + match self.match_type { + ProcessMatchType::Cmd => { + if match_replace_fn( + &self.match_regex, + &self.action, + &process_data.cmd.join(" "), + &replace, + ) { + Some(process_data) + } else { + None + } + } + ProcessMatchType::ProcessName => { + if match_replace_fn( + &self.match_regex, + &self.action, + &process_data.process_name, + &replace, + ) { + Some(process_data) + } else { + None + } + } + ProcessMatchType::ParentProcessName => { + fn match_parent(proc: &ProcessData, reg: &Regex) -> Option { + const MAX_DEPTH: usize = 10; + let mut ppid = proc.ppid; + let mut pid = proc.pid; + for _ in 0..MAX_DEPTH { + if ppid == 0 { + return None; + } + + let Ok(parent) = Process::new(ppid as i32) else { + return None; + }; + + let Ok(parent_data) = ProcessData::try_from(&parent) else { + error!("pid {} have no parent proc with ppid: {}", pid, ppid); + return None; + }; + + if reg.is_match(&parent_data.process_name.as_str()) { + return Some(parent_data); + } + ppid = parent_data.ppid; + pid = parent_data.pid; + } + + None + } + + match_parent(&process_data, &self.match_regex) + } + ProcessMatchType::Tag => { + if let Some(tag) = tag_map.get(&process_data.pid) { + let mut found = None; + for tag_kv in tag.tags.iter() { + let composed = format!("{}:{}", &tag_kv.key, &tag_kv.value); + if self.match_regex.is_match(&composed.as_str()) { + found = Some(process_data); + break; + } + } + found + } else { + None + } + } } } } @@ -2229,25 +2418,136 @@ impl From<&RuntimeConfig> for UserConfig { process_matcher: { let mut matchers = vec![]; if !rc.yaml_config.ebpf.on_cpu_profile.disabled { - matchers.push(ProcessMatcher { - match_regex: rc.yaml_config.ebpf.on_cpu_profile.regex.clone(), - enabled_features: vec!["ebpf.profile.on_cpu".to_string()], - ..Default::default() - }); + if let Ok(regex) = + Regex::new(rc.yaml_config.ebpf.on_cpu_profile.regex.as_str()) + { + matchers.push(ProcessMatcher { + match_regex: regex, + match_type: ProcessMatchType::ProcessName, + enabled_features: vec!["ebpf.profile.on_cpu".to_string()], + ..Default::default() + }); + } else { + warn!( + "Invalid on_cpu_profile regex: {}", + rc.yaml_config.ebpf.on_cpu_profile.regex + ); + } } if !rc.yaml_config.ebpf.off_cpu_profile.disabled { + if let Ok(regex) = + Regex::new(rc.yaml_config.ebpf.off_cpu_profile.regex.as_str()) + { + matchers.push(ProcessMatcher { + match_regex: regex, + match_type: ProcessMatchType::ProcessName, + enabled_features: vec!["ebpf.profile.off_cpu".to_string()], + ..Default::default() + }); + } else { + warn!( + "Invalid off_cpu_profile regex: {}", + rc.yaml_config.ebpf.off_cpu_profile.regex + ); + } + } + if !rc.yaml_config.ebpf.memory_profile.disabled { + if let Ok(regex) = + Regex::new(rc.yaml_config.ebpf.memory_profile.regex.as_str()) + { + matchers.push(ProcessMatcher { + match_regex: regex, + match_type: ProcessMatchType::ProcessName, + enabled_features: vec!["ebpf.profile.memory".to_string()], + ..Default::default() + }); + } else { + warn!( + "Invalid memory_profile regex: {}", + rc.yaml_config.ebpf.memory_profile.regex + ); + } + } + if rc.yaml_config.os_proc_socket_sync_interval > 0 { matchers.push(ProcessMatcher { - match_regex: rc.yaml_config.ebpf.off_cpu_profile.regex.clone(), - enabled_features: vec!["ebpf.profile.off_cpu".to_string()], + match_regex: Regex::new(".*").unwrap(), + match_type: ProcessMatchType::ProcessName, + enabled_features: vec!["proc.socket_list".to_string()], ..Default::default() }); } + if !rc.yaml_config.ebpf.uprobe_proc_regexp.golang.is_empty() { + if let Ok(regex) = + Regex::new(rc.yaml_config.ebpf.uprobe_proc_regexp.golang.as_str()) + { + matchers.push(ProcessMatcher { + match_regex: regex, + match_type: ProcessMatchType::Cmd, + enabled_features: vec!["ebpf.socket.uprobe.golang".to_string()], + ..Default::default() + }); + } else { + warn!( + "Invalid golang uprobe regex: {}", + rc.yaml_config.ebpf.uprobe_proc_regexp.golang + ); + } + } + if !rc + .yaml_config + .ebpf + .uprobe_proc_regexp + .golang_symbol + .is_empty() + { + if let Ok(regex) = Regex::new( + rc.yaml_config + .ebpf + .uprobe_proc_regexp + .golang_symbol + .as_str(), + ) { + matchers.push(ProcessMatcher { + match_regex: regex, + match_type: ProcessMatchType::Cmd, + enabled_features: vec!["proc.golang_symbol_table".to_string()], + ..Default::default() + }); + } else { + warn!( + "Invalid golang symbol_table uprobe regex: {}", + rc.yaml_config.ebpf.uprobe_proc_regexp.golang_symbol + ); + } + } + if !rc.yaml_config.ebpf.uprobe_proc_regexp.openssl.is_empty() { + if let Ok(regex) = + Regex::new(rc.yaml_config.ebpf.uprobe_proc_regexp.openssl.as_str()) + { + matchers.push(ProcessMatcher { + match_regex: regex, + match_type: ProcessMatchType::Cmd, + enabled_features: vec!["ebpf.socket.uprobe.tls".to_string()], + ..Default::default() + }); + } else { + warn!( + "Invalid tls uprobe regex: {}", + rc.yaml_config.ebpf.uprobe_proc_regexp.openssl + ); + } + } for o in &rc.yaml_config.os_proc_regex { + let Ok(regex) = Regex::new(o.match_regex.as_str()) else { + warn!("Invalid gprocess info regex: {}", o.match_regex); + continue; + }; matchers.push(ProcessMatcher { - match_regex: o.match_regex.clone(), - match_type: o.match_type.clone(), + match_regex: regex, + match_type: ProcessMatchType::from(o.match_type.as_str()), rewrite_name: o.rewrite_name.clone(), - action: o.action.clone(), + action: ProcessMatchAction::from(o.action.as_str()), + enabled_features: vec!["proc.gprocess_info".to_string()], ..Default::default() }); } diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index 0f1856f00ad..477bc9b0048 100755 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -43,19 +43,16 @@ use sysinfo::SystemExt; use sysinfo::{CpuRefreshKind, RefreshKind, System}; use tokio::runtime::Runtime; +#[cfg(any(target_os = "linux", target_os = "android"))] +use super::config::{Ebpf, EbpfFileIoEvent, ProcessMatcher, SymbolTable}; use super::{ config::{ - ApiResources, Config, EbpfFileIoEvent, ExtraLogFields, ExtraLogFieldsInfo, HttpEndpoint, - HttpEndpointMatchRule, OracleConfig, PcapStream, PortConfig, SymbolTable, - TagFilterOperator, UserConfig, YamlConfig, + ApiResources, Config, ExtraLogFields, ExtraLogFieldsInfo, HttpEndpoint, + HttpEndpointMatchRule, OracleConfig, PcapStream, PortConfig, TagFilterOperator, UserConfig, + YamlConfig, }, ConfigError, KubernetesPollerType, }; -#[cfg(any(target_os = "linux", target_os = "android"))] -use super::{ - config::{Ebpf, ProcessMatcher}, - OS_PROC_REGEXP_MATCH_ACTION_ACCEPT, OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME, -}; use crate::common::decapsulate::TunnelType; use crate::dispatcher::recv_engine; use crate::flow_generator::protocol_logs::decode_new_rpc_trace_context_with_type; @@ -75,7 +72,6 @@ use crate::{ use crate::{ dispatcher::recv_engine::af_packet::OptTpacketVersion, ebpf::CAP_LEN_MAX, - platform::ProcRegRewrite, utils::environment::{ get_container_resource_limits, get_ctrl_ip_and_mac, is_tt_workload, set_container_resource_limit, @@ -270,7 +266,6 @@ pub struct OsProcScanConfig { pub os_proc_root: String, pub os_proc_socket_sync_interval: u32, // for sec pub os_proc_socket_min_lifetime: u32, // for sec - pub os_proc_regex: Vec, pub os_app_tag_exec_user: String, pub os_app_tag_exec: Vec, // whether to sync os socket and proc info @@ -1814,34 +1809,6 @@ impl TryFrom<(Config, UserConfig, DynamicConfig)> for ModuleConfig { os_proc_root: conf.inputs.proc.proc_dir_path.clone(), os_proc_socket_sync_interval: conf.inputs.proc.sync_interval.as_secs() as u32, os_proc_socket_min_lifetime: conf.inputs.proc.min_lifetime.as_secs() as u32, - os_proc_regex: { - let mut v = vec![]; - for i in &conf.inputs.proc.process_matcher { - if i.enabled_features - .iter() - .find(|s| s.as_str() == "os.proc.scan") - .is_none() - { - continue; - } - if let Ok(r) = ProcRegRewrite::try_from(i) { - v.push(r); - } - } - - // append the .* at the end for accept the proc whic not match any regexp - v.push( - ProcRegRewrite::try_from(&ProcessMatcher { - match_regex: ".*".into(), - match_type: OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME.into(), - rewrite_name: "".into(), - action: OS_PROC_REGEXP_MATCH_ACTION_ACCEPT.into(), - ..Default::default() - }) - .unwrap(), - ); - v - }, os_app_tag_exec_user: conf.inputs.proc.tag_extraction.exec_username.clone(), os_app_tag_exec: conf.inputs.proc.tag_extraction.script_command.clone(), os_proc_sync_enabled: conf.inputs.proc.enabled, @@ -1853,7 +1820,7 @@ impl TryFrom<(Config, UserConfig, DynamicConfig)> for ModuleConfig { .find(|m| { m.enabled_features .iter() - .find(|s| s.as_str() == "os.proc.scan") + .find(|s| s.as_str() == "proc.gprocess_info") .is_some() && m.only_with_tag }) @@ -2625,6 +2592,46 @@ impl ConfigHandler { ); } + #[cfg(any(target_os = "linux", target_os = "android"))] + if config.inputs.proc.process_matcher != new_config.user_config.inputs.proc.process_matcher + || config.inputs.proc.tag_extraction.exec_username + != new_config + .user_config + .inputs + .proc + .tag_extraction + .exec_username + || config.inputs.proc.tag_extraction.script_command + != new_config + .user_config + .inputs + .proc + .tag_extraction + .script_command + || config.inputs.proc.proc_dir_path != new_config.user_config.inputs.proc.proc_dir_path + { + if let Some(c) = components.as_ref() { + c.process_listener.on_config_change( + &new_config.user_config.inputs.proc.process_matcher, + new_config.user_config.inputs.proc.proc_dir_path.clone(), + new_config + .user_config + .inputs + .proc + .tag_extraction + .exec_username + .clone(), + new_config + .user_config + .inputs + .proc + .tag_extraction + .script_command + .clone(), + ); + } + } + if *config != new_config.user_config { *config = new_config.user_config.clone(); } diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index 9fb7b80ef26..d588efc805f 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -17,17 +17,13 @@ mod config; pub mod handler; +#[cfg(any(target_os = "linux", target_os = "android"))] +pub use config::ApiResources; pub use config::{ AgentIdType, Config, ConfigError, KubernetesPollerType, OracleConfig, PcapStream, ProcessMatcher, PrometheusExtraLabels, RuntimeConfig, UserConfig, K8S_CA_CRT_PATH, }; #[cfg(any(target_os = "linux", target_os = "android"))] -pub use config::{ - ApiResources, OS_PROC_REGEXP_MATCH_ACTION_ACCEPT, OS_PROC_REGEXP_MATCH_ACTION_DROP, - OS_PROC_REGEXP_MATCH_TYPE_CMD, OS_PROC_REGEXP_MATCH_TYPE_PARENT_PROC_NAME, - OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME, OS_PROC_REGEXP_MATCH_TYPE_TAG, -}; -#[cfg(any(target_os = "linux", target_os = "android"))] pub use handler::FlowAccess; pub use handler::{DispatcherConfig, FlowConfig, ModuleConfig, NpbConfig}; diff --git a/agent/src/ebpf/mod.rs b/agent/src/ebpf/mod.rs index a1c2f869b3f..14020838cfb 100644 --- a/agent/src/ebpf/mod.rs +++ b/agent/src/ebpf/mod.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +use crate::platform::ProcessData; + extern crate libc; extern crate trace_utils; @@ -721,6 +723,66 @@ extern "C" { } } +pub fn set_feature_uprobe_golang(pids: &Vec, _: &Vec) { + unsafe { + set_feature_pids( + FEATURE_UPROBE_GOLANG, + pids.as_ptr() as *const i32, + pids.len() as i32, + ); + } +} + +pub fn set_feature_uprobe_golang_symbol(pids: &Vec, _: &Vec) { + unsafe { + set_feature_pids( + FEATURE_UPROBE_GOLANG_SYMBOL, + pids.as_ptr() as *const i32, + pids.len() as i32, + ); + } +} + +pub fn set_feature_uprobe_tls(pids: &Vec, _: &Vec) { + unsafe { + set_feature_pids( + FEATURE_UPROBE_OPENSSL, + pids.as_ptr() as *const i32, + pids.len() as i32, + ); + } +} + +pub fn set_feature_on_cpu(pids: &Vec, _: &Vec) { + unsafe { + set_feature_pids( + FEATURE_PROFILE_ONCPU, + pids.as_ptr() as *const i32, + pids.len() as i32, + ); + } +} + +pub fn set_feature_off_cpu(pids: &Vec, _: &Vec) { + unsafe { + set_feature_pids( + FEATURE_PROFILE_OFFCPU, + pids.as_ptr() as *const i32, + pids.len() as i32, + ); + } +} + +pub fn set_feature_memory(pids: &Vec, _: &Vec) { + unsafe { + set_feature_pids( + FEATURE_PROFILE_MEMORY, + pids.as_ptr() as *const i32, + pids.len() as i32, + ); + } +} + #[no_mangle] extern "C" fn rust_info_wrapper(msg: *const libc::c_char) { unsafe { diff --git a/agent/src/ebpf_dispatcher.rs b/agent/src/ebpf_dispatcher.rs index 90a426c6808..d58e05848ab 100644 --- a/agent/src/ebpf_dispatcher.rs +++ b/agent/src/ebpf_dispatcher.rs @@ -71,7 +71,7 @@ use crate::flow_generator::{flow_map::Config, AppProto, FlowMap}; use crate::integration_collector::Profile; use crate::policy::PolicyGetter; use crate::rpc::get_timestamp; -use crate::utils::stats; +use crate::utils::{process::ProcessListener, stats}; use public::{ buffer::BatchedBox, @@ -463,6 +463,7 @@ pub struct EbpfCollector { counter: Arc, exception_handler: ExceptionHandler, + process_listener: Arc, } static mut SWITCH: bool = false; @@ -614,10 +615,11 @@ impl EbpfCollector { ebpf_profile_sender: DebugSender, policy_getter: PolicyGetter, time_diff: Arc, + process_listener: &ProcessListener, ) -> Result { // ebpf和ebpf collector通信配置初始化 unsafe { - let handle = Self::ebpf_core_init(config); + let handle = Self::ebpf_core_init(process_listener, config); // initialize communication between core and ebpf collector SWITCH = false; SENDER = Some(sender); @@ -631,27 +633,30 @@ impl EbpfCollector { } } - unsafe fn ebpf_core_init(config: &EbpfConfig) -> Result { + unsafe fn ebpf_core_init( + process_listener: &ProcessListener, + config: &EbpfConfig, + ) -> Result { // ebpf core modules init #[allow(unused_mut)] let mut handle = ConfigHandle::default(); ebpf::set_uprobe_golang_enabled(config.ebpf.socket.uprobe.golang.enabled); if config.ebpf.socket.uprobe.golang.enabled { + let feature = "ebpf.socket.uprobe.golang"; + process_listener.register(feature, ebpf::set_feature_uprobe_golang); + let uprobe_proc_regexp = config .process_matcher .iter() .find(|p| { p.enabled_features .iter() - .find(|f| f.eq_ignore_ascii_case("ebpf.socket.uprobe.golang")) + .find(|f| f.eq_ignore_ascii_case(feature)) .is_some() }) - .map(|p| p.match_regex.to_owned()) + .map(|p| p.match_regex.as_str()) .unwrap_or_default(); - info!( - "ebpf set golang uprobe proc regexp: {}", - uprobe_proc_regexp.as_str() - ); + info!("ebpf set golang uprobe proc regexp: {}", uprobe_proc_regexp); ebpf::set_feature_regex( ebpf::FEATURE_UPROBE_GOLANG, CString::new(uprobe_proc_regexp.as_bytes()) @@ -665,20 +670,23 @@ impl EbpfCollector { ebpf::set_uprobe_openssl_enabled(config.ebpf.socket.uprobe.tls.enabled); if config.ebpf.socket.uprobe.tls.enabled { + let feature = "ebpf.socket.uprobe.tls"; + process_listener.register(feature, ebpf::set_feature_uprobe_tls); + let uprobe_proc_regexp = config .process_matcher .iter() .find(|p| { p.enabled_features .iter() - .find(|f| f.eq_ignore_ascii_case("ebpf.socket.uprobe.tls")) + .find(|f| f.eq_ignore_ascii_case(feature)) .is_some() }) - .map(|p| p.match_regex.to_owned()) + .map(|p| p.match_regex.as_str()) .unwrap_or_default(); info!( "ebpf set openssl uprobe proc regexp: {}", - uprobe_proc_regexp.as_str() + uprobe_proc_regexp ); ebpf::set_feature_regex( ebpf::FEATURE_UPROBE_OPENSSL, @@ -692,24 +700,27 @@ impl EbpfCollector { } if config.symbol_table.golang_specific.enabled { + let feature = "proc.golang_symbol_table"; + process_listener.register(feature, ebpf::set_feature_uprobe_golang_symbol); + let uprobe_proc_regexp = config .process_matcher .iter() .find(|p| { p.enabled_features .iter() - .find(|f| f.eq_ignore_ascii_case("input.proc.symbol_table.golang_specific")) + .find(|f| f.eq_ignore_ascii_case(feature)) .is_some() }) - .map(|p| p.match_regex.to_owned()) + .map(|p| p.match_regex.as_str()) .unwrap_or_default(); info!( "ebpf set golang symbol uprobe proc regexp: {}", - uprobe_proc_regexp.as_str() + uprobe_proc_regexp ); ebpf::set_feature_regex( ebpf::FEATURE_UPROBE_GOLANG_SYMBOL, - CString::new(uprobe_proc_regexp.as_str().as_bytes()) + CString::new(uprobe_proc_regexp.as_bytes()) .unwrap() .as_c_str() .as_ptr(), @@ -903,16 +914,19 @@ impl EbpfCollector { } if !on_cpu.disabled { + let feature = "ebpf.profile.on_cpu"; + process_listener.register(feature, ebpf::set_feature_on_cpu); + let on_cpu_regexp = config .process_matcher .iter() .find(|p| { p.enabled_features .iter() - .find(|f| f.eq_ignore_ascii_case("ebpf.profile.on_cpu")) + .find(|f| f.eq_ignore_ascii_case(feature)) .is_some() }) - .map(|p| p.match_regex.to_owned()) + .map(|p| p.match_regex.as_str()) .unwrap_or_default(); ebpf::set_feature_regex( ebpf::FEATURE_PROFILE_ONCPU, @@ -928,18 +942,21 @@ impl EbpfCollector { #[cfg(feature = "extended_profile")] { + let feature = "ebpf.profile.off_cpu"; let off_cpu_regexp = config .process_matcher .iter() .find(|p| { p.enabled_features .iter() - .find(|f| f.eq_ignore_ascii_case("ebpf.profile.off_cpu")) + .find(|f| f.eq_ignore_ascii_case(feature)) .is_some() }) .map(|p| p.match_regex.to_owned()) .unwrap_or_default(); if !off_cpu.disabled { + process_listener.register(feature, ebpf::set_feature_off_cpu); + ebpf::set_feature_regex( ebpf::FEATURE_PROFILE_ONCPU, CString::new(off_cpu_regexp.as_bytes()) @@ -953,6 +970,9 @@ impl EbpfCollector { } if !memory.disabled { + let feature = "ebpf.profile.memory"; + process_listener.register(feature, ebpf::set_feature_memory); + let memory_cpu_regexp = config .process_matcher .iter() @@ -1050,6 +1070,7 @@ impl EbpfCollector { queue_debugger: &QueueDebugger, stats_collector: Arc, exception_handler: ExceptionHandler, + process_listener: &Arc, ) -> Result> { let ebpf_config = config.load(); if ebpf_config.ebpf.disabled { @@ -1075,6 +1096,7 @@ impl EbpfCollector { ebpf_profile_sender, policy_getter, time_diff.clone(), + process_listener, )?; Self::ebpf_on_config_change(ebpf::CAP_LEN_MAX); @@ -1101,6 +1123,7 @@ impl EbpfCollector { get_token_failed: AtomicU64::new(0), }), exception_handler, + process_listener: process_listener.clone(), })) } @@ -1147,7 +1170,7 @@ impl EbpfCollector { as *mut memory_profile::MemoryContext, )); } - if let Ok(handle) = Self::ebpf_core_init(config) { + if let Ok(handle) = Self::ebpf_core_init(&self.process_listener, config) { self.config_handle = handle; } else { warn!("ebpf start_continuous_profiler error."); diff --git a/agent/src/platform/mod.rs b/agent/src/platform/mod.rs index d4f5071229e..d4d5f821ad0 100644 --- a/agent/src/platform/mod.rs +++ b/agent/src/platform/mod.rs @@ -25,7 +25,10 @@ cfg_if::cfg_if! { } #[cfg(any(target_os = "linux", target_os = "android"))] -pub use platform_synchronizer::{ProcRegRewrite, SocketSynchronizer}; +pub use platform_synchronizer::{ + get_container_id, get_os_app_tag_by_exec, OsAppTag, ProcessData, ProcessDataOp, + SocketSynchronizer, +}; mod platform_synchronizer; diff --git a/agent/src/platform/platform_synchronizer/linux.rs b/agent/src/platform/platform_synchronizer/linux.rs index c2e17924197..bb6a28e8a22 100644 --- a/agent/src/platform/platform_synchronizer/linux.rs +++ b/agent/src/platform/platform_synchronizer/linux.rs @@ -16,7 +16,7 @@ use std::{ net::{IpAddr, SocketAddr, SocketAddrV4}, - sync::{Arc, Condvar, Mutex, MutexGuard}, + sync::{Arc, Condvar, Mutex, MutexGuard, RwLock as SysRwLock}, thread, time::Duration, }; @@ -29,10 +29,11 @@ use tokio::runtime::Runtime; use crate::{ common::policy::GpidEntry, config::handler::PlatformAccess, + platform::ProcessData, policy::{PolicyGetter, PolicySetter}, rpc::Session, trident::AgentId, - utils::lru::Lru, + utils::{lru::Lru, process::ProcessListener}, }; use public::{ proto::{ @@ -57,6 +58,19 @@ pub struct SocketSynchronizer { policy_getter: Arc>, policy_setter: PolicySetter, lru_toa_info: Arc>>, + process_listener: Arc, +} + +static mut PIDS: Option>>> = None; + +pub fn get_socket_pids() -> Vec { + unsafe { + if let Some(pids) = PIDS.as_ref() { + pids.read().unwrap().clone() + } else { + vec![] + } + } } impl SocketSynchronizer { @@ -72,6 +86,7 @@ impl SocketSynchronizer { receiver: Receiver>, // toa info cache, Lru lru_toa_info: Arc>>, + process_listener: Arc, ) -> Self { if process_info_enabled(config.load().agent_type) { let lru_toa_info_clone = lru_toa_info.clone(); @@ -93,6 +108,17 @@ impl SocketSynchronizer { session, running: Arc::new(Mutex::new(false)), lru_toa_info, + process_listener, + } + } + + fn set_socket_pids(pids: &Vec, _: &Vec) { + unsafe { + if let Some(last) = PIDS.as_ref() { + *last.write().unwrap() = pids.clone(); + } else { + PIDS = Some(Arc::new(SysRwLock::new(pids.clone()))); + } } } @@ -108,6 +134,9 @@ impl SocketSynchronizer { return; } + self.process_listener + .register("proc.socket_list", Self::set_socket_pids); + let ( runtime, running, @@ -174,6 +203,11 @@ impl SocketSynchronizer { conf_guard.os_proc_scan_conf.os_proc_socket_sync_interval as u64, ); + if sync_interval == Duration::ZERO { + thread::sleep(Duration::from_secs(1)); + continue; + } + // wait for config from server if !conf_guard.os_proc_scan_conf.os_proc_sync_enabled { if !Self::wait_timeout(running_guard, stop_notify.clone(), sync_interval) { @@ -187,11 +221,12 @@ impl SocketSynchronizer { (id.ip.to_string(), id.mac.to_string(), id.team_id.clone()) }; let mut policy_getter = policy_getter.lock().unwrap(); - + let pids = get_socket_pids(); let sock_entries = match get_all_socket( &conf_guard.os_proc_scan_conf, &mut policy_getter, conf_guard.epc_id, + pids, ) { Err(e) => { error!("fetch socket info fail: {}", e); diff --git a/agent/src/platform/platform_synchronizer/linux_process.rs b/agent/src/platform/platform_synchronizer/linux_process.rs index 58b55e726d1..f202cdc7ad3 100644 --- a/agent/src/platform/platform_synchronizer/linux_process.rs +++ b/agent/src/platform/platform_synchronizer/linux_process.rs @@ -18,13 +18,14 @@ use std::os::android::fs::MetadataExt; #[cfg(target_os = "linux")] use std::os::linux::fs::MetadataExt; +#[cfg(any(target_os = "linux", target_os = "android"))] +use std::sync::{Arc, RwLock}; use std::collections::HashSet; use std::path::{PathBuf, MAIN_SEPARATOR}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, os::unix::process::CommandExt, process::Command}; -use envmnt::{ExpandOptions, ExpansionType}; use log::{debug, error}; use nom::AsBytes; use procfs::{process::Process, ProcError, ProcResult}; @@ -32,7 +33,6 @@ use public::bytes::write_u64_be; use public::proto::agent::{ProcessInfo, Tag}; use public::proto::trident; use public::pwd::PasswordInfo; -use regex::Regex; use ring::digest; use serde::Deserialize; @@ -40,16 +40,15 @@ use super::linux_socket::get_proc_netns; use super::proc_scan_hook::proc_scan_hook; use crate::config::handler::OsProcScanConfig; -use crate::config::{ - ProcessMatcher, OS_PROC_REGEXP_MATCH_ACTION_ACCEPT, OS_PROC_REGEXP_MATCH_ACTION_DROP, - OS_PROC_REGEXP_MATCH_TYPE_CMD, OS_PROC_REGEXP_MATCH_TYPE_PARENT_PROC_NAME, - OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME, OS_PROC_REGEXP_MATCH_TYPE_TAG, -}; const CONTAINER_ID_LEN: usize = 64; const SHA1_DIGEST_LEN: usize = 20; -#[derive(Debug, Clone)] +pub trait ProcessDataOp { + fn merge_and_dedup(&mut self); +} + +#[derive(Debug, Clone, PartialEq)] pub struct ProcessData { pub name: String, // the replaced name pub pid: u64, @@ -67,7 +66,36 @@ pub struct ProcessData { pub container_id: String, } +impl ProcessDataOp for Vec { + // NOTICE: the arrry must be ordered. + fn merge_and_dedup(&mut self) { + let mut dest: Vec = vec![]; + + for p in self.into_iter() { + let Some(last) = dest.last_mut() else { + dest.push(p.clone()); + continue; + }; + + if p.pid != last.pid { + dest.push(p.clone()); + continue; + } + + last.merge(p); + } + *self = dest; + } +} + impl ProcessData { + fn merge(&mut self, other: &Self) { + if self.name != other.name && other.name != other.process_name { + self.name = other.name.clone(); + } + self.os_app_tags.extend_from_slice(&other.os_app_tags); + } + // proc data only hash the pid and tag pub fn digest(&self, dist_ctx: &mut digest::Context) { let mut pid = [0u8; 8]; @@ -214,173 +242,7 @@ impl From<&ProcessData> for trident::ProcessInfo { } } -#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)] -pub enum RegExpAction { - Accept, - Drop, -} - -impl Default for RegExpAction { - fn default() -> Self { - Self::Accept - } -} - -#[derive(Clone, Debug)] -pub enum ProcRegRewrite { - // (match reg, action, rewrite string) - Cmd(Regex, RegExpAction, String), - ProcessName(Regex, RegExpAction, String), - ParentProcessName(Regex, RegExpAction), - Tag(Regex, RegExpAction), -} - -impl ProcRegRewrite { - pub fn action(&self) -> RegExpAction { - match self { - ProcRegRewrite::Cmd(_, act, _) => *act, - ProcRegRewrite::ProcessName(_, act, _) => *act, - ProcRegRewrite::ParentProcessName(_, act) => *act, - ProcRegRewrite::Tag(_, act) => *act, - } - } -} - -impl PartialEq for ProcRegRewrite { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::Cmd(lr, lact, ls), Self::Cmd(rr, ract, rs)) => { - lr.as_str() == rr.as_str() && lact == ract && ls == rs - } - (Self::ProcessName(lr, lact, ls), Self::ProcessName(rr, ract, rs)) => { - lr.as_str() == rr.as_str() && lact == ract && ls == rs - } - (Self::ParentProcessName(lr, lact), Self::ParentProcessName(rr, ract)) => { - lr.as_str() == rr.as_str() && lact == ract - } - (Self::Tag(lr, lact), Self::Tag(rr, ract)) => { - lr.as_str() == rr.as_str() && lact == ract - } - _ => false, - } - } -} - -impl Eq for ProcRegRewrite {} - -impl TryFrom<&ProcessMatcher> for ProcRegRewrite { - type Error = regex::Error; - - fn try_from(value: &ProcessMatcher) -> Result { - let re = Regex::new(value.match_regex.as_str())?; - let action = match value.action.as_str() { - "" | OS_PROC_REGEXP_MATCH_ACTION_ACCEPT => RegExpAction::Accept, - OS_PROC_REGEXP_MATCH_ACTION_DROP => RegExpAction::Drop, - _ => return Err(regex::Error::Syntax("action must accept or drop".into())), - }; - let env_rewrite = |r: String| { - envmnt::expand( - r.as_str(), - Some(ExpandOptions { - expansion_type: Some(ExpansionType::Windows), - default_to_empty: true, - }), - ) - }; - - match value.match_type.as_str() { - OS_PROC_REGEXP_MATCH_TYPE_CMD => Ok(Self::Cmd( - re, - action, - env_rewrite(value.rewrite_name.clone()), - )), - "" | OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME => Ok(Self::ProcessName( - re, - action, - env_rewrite(value.rewrite_name.clone()), - )), - OS_PROC_REGEXP_MATCH_TYPE_PARENT_PROC_NAME => Ok(Self::ParentProcessName(re, action)), - OS_PROC_REGEXP_MATCH_TYPE_TAG => Ok(Self::Tag(re, action)), - _ => Err(regex::Error::Syntax( - "regexp match type incorrect".to_string(), - )), - } - } -} - -impl ProcRegRewrite { - pub(super) fn match_and_rewrite_proc( - &self, - proc: &mut ProcessData, - pid_process_map: &PidProcMap, - tags: &HashMap, - match_only: bool, - ) -> bool { - let mut match_replace_fn = - |reg: &Regex, act: &RegExpAction, s: &String, replace: &String| { - if reg.is_match(s.as_str()) { - if act == &RegExpAction::Accept && !replace.is_empty() && !match_only { - proc.name = reg.replace_all(s.as_str(), replace).to_string(); - } - true - } else { - false - } - }; - - match self { - ProcRegRewrite::Cmd(reg, act, replace) => { - match_replace_fn(reg, act, &proc.cmd.join(" "), replace) - } - ProcRegRewrite::ProcessName(reg, act, replace) => { - match_replace_fn(reg, act, &proc.process_name, replace) - } - ProcRegRewrite::ParentProcessName(reg, _) => { - fn match_parent( - proc: &ProcessData, - pid_process_map: &PidProcMap, - reg: &Regex, - ) -> bool { - if proc.ppid == 0 { - return false; - } - - let Some(parent_proc) = pid_process_map.get(&(proc.ppid as u32)) else { - error!( - "pid {} have no parent proc with ppid: {}", - proc.pid, proc.ppid - ); - return false; - }; - if reg.is_match(&parent_proc.process_name.as_str()) { - return true; - } - // recursive match along the parent. - match_parent(parent_proc, pid_process_map, reg) - } - - match_parent(&*proc, pid_process_map, reg) - } - ProcRegRewrite::Tag(reg, _) => { - if let Some(tag) = tags.get(&proc.pid) { - let mut found = false; - for tag_kv in tag.tags.iter() { - let composed = format!("{}:{}", &tag_kv.key, &tag_kv.value); - if reg.is_match(&composed.as_str()) { - found = true; - break; - } - } - found - } else { - false - } - } - } - } -} - -#[derive(Debug, Default, Clone, Deserialize)] +#[derive(Debug, Default, PartialEq, Clone, Deserialize)] pub struct OsAppTagKV { pub key: String, pub value: String, @@ -388,12 +250,13 @@ pub struct OsAppTagKV { #[derive(Default, Deserialize)] pub struct OsAppTag { - pid: u64, + pub pid: u64, // Vec - tags: Vec, + pub tags: Vec, } pub(super) type PidProcMap = HashMap; +static mut PIDS: Option>>> = None; // get the pid and process map // now only use for match parent proc name to filter proc, the proc data in map will not fill the tag and not set username @@ -422,21 +285,38 @@ pub(crate) fn get_all_process(conf: &OsProcScanConfig) -> Vec { ret } +fn get_proc_scan_process_datas() -> Vec { + unsafe { + if let Some(pids) = PIDS.as_ref() { + pids.read().unwrap().clone() + } else { + vec![] + } + } +} + +pub fn set_proc_scan_process_datas(_: &Vec, process_datas: &Vec) { + unsafe { + if let Some(last) = PIDS.as_ref() { + *last.write().unwrap() = process_datas.clone(); + } else { + PIDS = Some(Arc::new(RwLock::new(process_datas.clone()))); + } + } +} + pub(crate) fn get_all_process_in(conf: &OsProcScanConfig, ret: &mut Vec) { - // Hashmap let mut pwd_info = HashMap::new(); - let (user, cmd, proc_root, proc_regexp, tagged_only, now_sec) = ( + let (user, cmd, proc_root, now_sec) = ( conf.os_app_tag_exec_user.as_str(), conf.os_app_tag_exec.as_slice(), conf.os_proc_root.as_str(), - conf.os_proc_regex.as_slice(), - conf.os_proc_sync_tagged_only, SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), ); - + let process_datas = get_proc_scan_process_datas(); let mut tags_map = match get_os_app_tag_by_exec(user, cmd) { Ok(tags) => tags, Err(err) => { @@ -446,76 +326,49 @@ pub(crate) fn get_all_process_in(conf: &OsProcScanConfig, ret: &mut Vec error!("pid {} get root inode fail: {}", proc_data.pid, e), - Ok(inode) => { - if let Some(pwd) = pwd_info.get(&inode) { - proc_data.set_username(&pwd); - } else { - // not in hashmap, parse from /proc/pid/root/etc/passwd - let p = PathBuf::from_iter([ - proc_root, - proc_data.pid.to_string().as_str(), - "root/etc/passwd", - ]); - if let Ok(pwd) = PasswordInfo::new(p) { - proc_data.set_username(&pwd); - pwd_info.insert(inode, pwd); - } - } - } - } + // filter the short live proc + if up_sec < u64::from(conf.os_proc_socket_min_lifetime) { + continue; + } - // fill tags - if let Some(tags) = tags_map.remove(&proc_data.pid) { - proc_data.os_app_tags = tags.tags - } else if tagged_only { - break; + match process_data.get_root_inode(proc_root) { + Err(e) => error!("pid {} get root inode fail: {}", process_data.pid, e), + Ok(inode) => { + if let Some(pwd) = pwd_info.get(&inode) { + process_data.set_username(&pwd); + } else { + // not in hashmap, parse from /proc/pid/root/etc/passwd + let p = PathBuf::from_iter([ + proc_root, + process_data.pid.to_string().as_str(), + "root/etc/passwd", + ]); + if let Ok(pwd) = PasswordInfo::new(p) { + process_data.set_username(&pwd); + pwd_info.insert(inode, pwd); } - - ret.push(proc_data); - break; } } } - fill_child_proc_tag_by_parent(ret.as_mut()); - proc_scan_hook(ret); + + // fill tags + if let Some(tags) = tags_map.remove(&process_data.pid) { + process_data.os_app_tags = tags.tags + } + + ret.push(process_data); } + fill_child_proc_tag_by_parent(ret.as_mut()); + proc_scan_hook(ret); } pub(super) fn get_self_proc() -> ProcResult { @@ -529,7 +382,7 @@ pub(super) fn get_self_proc() -> ProcResult { } // return Hashmap -pub(super) fn get_os_app_tag_by_exec( +pub fn get_os_app_tag_by_exec( username: &str, cmd: &[String], ) -> Result, String> { @@ -636,7 +489,7 @@ fn merge_tag(child_tag: &mut Vec, parent_tag: &[OsAppTagKV]) { } } -fn get_container_id(proc: &Process) -> Option { +pub fn get_container_id(proc: &Process) -> Option { let Ok(cgruop) = proc.cgroups() else { return None; }; diff --git a/agent/src/platform/platform_synchronizer/linux_socket.rs b/agent/src/platform/platform_synchronizer/linux_socket.rs index eec6d237c7c..562ab638a8c 100644 --- a/agent/src/platform/platform_synchronizer/linux_socket.rs +++ b/agent/src/platform/platform_synchronizer/linux_socket.rs @@ -34,16 +34,17 @@ use procfs::{ ProcError, }; -use crate::{config::handler::OsProcScanConfig, policy::PolicyGetter}; +use crate::{ + config::handler::OsProcScanConfig, platform::platform_synchronizer::ProcessData, + policy::PolicyGetter, +}; + use public::{ bytes::read_u32_be, proto::agent::{GpidSyncEntry, RoleType, ServiceProtocol}, + proto::trident::GpidSyncEntry as TridentGpidSyncEntry, }; -use public::proto::trident::GpidSyncEntry as TridentGpidSyncEntry; - -use super::linux_process::{get_all_pid_process_map, get_os_app_tag_by_exec, RegExpAction}; - #[derive(Debug, PartialEq, Eq, Hash)] pub enum Role { Client, @@ -242,6 +243,7 @@ pub(super) fn get_all_socket( conf: &OsProcScanConfig, policy_getter: &mut PolicyGetter, epc_id: u32, + pids: Vec, ) -> Result, ProcError> { // Hashmap let mut inode_pid_fd_map = HashMap::new(); @@ -255,9 +257,7 @@ pub(super) fn get_all_socket( let mut spec_addr_listen_sock = HashSet::new(); let ( - user, - cmd, - tagged_only, + _tagged_only, proc_root, min_sock_lifetime, now_sec, @@ -265,8 +265,6 @@ pub(super) fn get_all_socket( mut udp_entries, mut sock_entries, ) = ( - conf.os_app_tag_exec_user.as_str(), - conf.os_app_tag_exec.as_slice(), conf.os_proc_sync_tagged_only, conf.os_proc_root.as_str(), conf.os_proc_socket_min_lifetime as u64, @@ -278,47 +276,28 @@ pub(super) fn get_all_socket( vec![], vec![], ); - - let tags_map = match get_os_app_tag_by_exec(user, cmd) { - Ok(tags) => tags, - Err(err) => { - error!( - "get process tags by execute cmd `{}` with user {} fail: {}", - cmd.join(" "), - user, - err - ); - HashMap::new() - } - }; - // netns idx increase every time get the new netns id let mut netns_idx = 0u16; - let mut pid_proc_map = get_all_pid_process_map(conf.os_proc_root.as_str()); - - // get all process, and record the open fd and fetch listining socket info - // note that the /proc/pid/net/{tcp,tcp6,udp,udp6} include all the connection in the proc netns, not the process created connection. - for p in procfs::process::all_processes_with_root(proc_root)? { - let Ok(proc) = p else { + for pid in pids { + let process = Process::new(pid as i32); + if let Err(ref e) = process { + warn!("get process(pid: {}) failed: {:?}", pid, e); + continue; + } + let process = process.unwrap(); + let Ok(process_data) = ProcessData::try_from(&process) else { continue; }; - let mut proc_data = { - let Some(proc_data) = pid_proc_map.get_mut(&(proc.pid as u32)) else { - continue; - }; - proc_data.clone() - }; - - let (fds, netns, pid) = match (proc.fd(), get_proc_netns(&proc)) { - (Ok(fds), Ok(netns)) => (fds, netns, proc.pid), + let (fds, netns, pid) = match (process.fd(), get_proc_netns(&process)) { + (Ok(fds), Ok(netns)) => (fds, netns, process.pid), _ => { continue; } }; - let Ok(up_sec) = proc_data.up_sec(now_sec) else { + let Ok(up_sec) = process_data.up_sec(now_sec) else { continue; }; @@ -327,77 +306,139 @@ pub(super) fn get_all_socket( continue; } - for i in conf.os_proc_regex.as_slice() { - if i.match_and_rewrite_proc(&mut proc_data, &pid_proc_map, &tags_map, true) { - if i.action() == RegExpAction::Drop { - break; - } - - if tags_map.get(&(proc.pid as u64)).is_none() && tagged_only { - break; - } - - // when match proc, will record the inode and (pid, fd) map, use for get the connection pid and fd in later. - for fd in fds { - let Ok(f) = fd else { - continue; - }; - if let FDTarget::Socket(fd_inode) = f.target { - inode_pid_fd_map.insert(fd_inode, (pid, f.fd)); - } - } + // when match proc, will record the inode and (pid, fd) map, use for get the connection pid and fd in later. + for fd in fds { + let Ok(f) = fd else { + continue; + }; + if let FDTarget::Socket(fd_inode) = f.target { + inode_pid_fd_map.insert(fd_inode, (pid, f.fd)); + } + } - // break if the netns had been fetched - if netns_id_idx_map.contains_key(&netns) { - break; - }; - - netns_id_idx_map.insert(netns, { - if netns_idx == u16::MAX { - warn!("netns_idx reach u16::Max, set to 0"); - 0 - } else { - netns_idx += 1; - netns_idx - } - }); - - // also recoed the listining socket info, use for determine client or server connection. - // note that proc.{tcp(), tcp6(), udp(), udp6()} include all connection in the proc netns - if let Err(err) = record_tcp_listening_ip_port( - &proc, - Some(netns), - &mut all_iface_listen_sock, - &mut spec_addr_listen_sock, - ) { - error!("pid {} record_tcp_listening_ip_port fail: {}", pid, err); - break; - } + // break if the netns had been fetched + if netns_id_idx_map.contains_key(&netns) { + break; + }; - // record the tcp and udp connection in current netns - // only support ipv4 now, ipv6 dual stack will extra ipv4 addr - match (proc.tcp(), proc.udp()) { - (Ok(tcp), Ok(udp)) => { - tcp_entries.push((tcp, netns)); - udp_entries.push((udp, netns)); - } - _ => error!("pid {} get connection info fail", pid), - } + netns_id_idx_map.insert(netns, { + if netns_idx == u16::MAX { + warn!("netns_idx reach u16::Max, set to 0"); + 0 + } else { + netns_idx += 1; + netns_idx + } + }); + + // also recoed the listining socket info, use for determine client or server connection. + // note that proc.{tcp(), tcp6(), udp(), udp6()} include all connection in the proc netns + if let Err(err) = record_tcp_listening_ip_port( + &process, + Some(netns), + &mut all_iface_listen_sock, + &mut spec_addr_listen_sock, + ) { + error!("pid {} record_tcp_listening_ip_port fail: {}", pid, err); + break; + } - // old kernel have no tcp6/udp6 - match (proc.tcp6(), proc.udp6()) { - (Ok(tcp6), Ok(udp6)) => { - tcp_entries.push((tcp6, netns)); - udp_entries.push((udp6, netns)); - } - _ => {} - } + // record the tcp and udp connection in current netns + // only support ipv4 now, ipv6 dual stack will extra ipv4 addr + match (process.tcp(), process.udp()) { + (Ok(tcp), Ok(udp)) => { + tcp_entries.push((tcp, netns)); + udp_entries.push((udp, netns)); + } + _ => error!("pid {} get connection info fail", pid), + } - break; + // old kernel have no tcp6/udp6 + match (process.tcp6(), process.udp6()) { + (Ok(tcp6), Ok(udp6)) => { + tcp_entries.push((tcp6, netns)); + udp_entries.push((udp6, netns)); } + _ => {} } } + // let mut pid_proc_map = get_all_pid_process_map(conf.os_proc_root.as_str()); + + // // get all process, and record the open fd and fetch listining socket info + // // note that the /proc/pid/net/{tcp,tcp6,udp,udp6} include all the connection in the proc netns, not the process created connection. + // for p in procfs::process::all_processes_with_root(proc_root)? { + // for i in conf.os_proc_regex.as_slice() { + // if i.match_and_rewrite_proc(&mut proc_data, &pid_proc_map, &tags_map, true) { + // if i.action() == RegExpAction::Drop { + // break; + // } + + // if tags_map.get(&(proc.pid as u64)).is_none() && tagged_only { + // break; + // } + + // // when match proc, will record the inode and (pid, fd) map, use for get the connection pid and fd in later. + // for fd in fds { + // let Ok(f) = fd else { + // continue; + // }; + // if let FDTarget::Socket(fd_inode) = f.target { + // inode_pid_fd_map.insert(fd_inode, (pid, f.fd)); + // } + // } + + // // break if the netns had been fetched + // if netns_id_idx_map.contains_key(&netns) { + // break; + // }; + + // netns_id_idx_map.insert(netns, { + // if netns_idx == u16::MAX { + // warn!("netns_idx reach u16::Max, set to 0"); + // 0 + // } else { + // netns_idx += 1; + // netns_idx + // } + // }); + + // // also recoed the listining socket info, use for determine client or server connection. + // // note that proc.{tcp(), tcp6(), udp(), udp6()} include all connection in the proc netns + // if let Err(err) = record_tcp_listening_ip_port( + // &proc, + // Some(netns), + // &mut all_iface_listen_sock, + // &mut spec_addr_listen_sock, + // ) { + // error!("pid {} record_tcp_listening_ip_port fail: {}", pid, err); + // break; + // } + + // // record the tcp and udp connection in current netns + // // only support ipv4 now, ipv6 dual stack will extra ipv4 addr + // match (proc.tcp(), proc.udp()) { + // (Ok(tcp), Ok(udp)) => { + // tcp_entries.push((tcp, netns)); + // udp_entries.push((udp, netns)); + // } + // _ => error!("pid {} get connection info fail", pid), + // } + + // // old kernel have no tcp6/udp6 + // match (proc.tcp6(), proc.udp6()) { + // (Ok(tcp6), Ok(udp6)) => { + // tcp_entries.push((tcp6, netns)); + // udp_entries.push((udp6, netns)); + // } + // _ => {} + // } + + // break; + // } + // } + // } + divide_tcp_entry( epc_id, proc_root, diff --git a/agent/src/platform/platform_synchronizer/mod.rs b/agent/src/platform/platform_synchronizer/mod.rs index b496953db1a..d00df5f1c1a 100644 --- a/agent/src/platform/platform_synchronizer/mod.rs +++ b/agent/src/platform/platform_synchronizer/mod.rs @@ -23,7 +23,7 @@ cfg_if::cfg_if! { mod linux_socket; pub use linux::SocketSynchronizer; - pub use linux_process::{ProcessData, ProcRegRewrite}; + pub use linux_process::{ProcessData, ProcessDataOp, get_container_id, get_os_app_tag_by_exec, OsAppTag}; } else if #[cfg(target_os = "windows")] { pub struct ProcessData {} } diff --git a/agent/src/platform/synchronizer.rs b/agent/src/platform/synchronizer.rs index 3cba07d95ed..0cf2294191c 100644 --- a/agent/src/platform/synchronizer.rs +++ b/agent/src/platform/synchronizer.rs @@ -29,11 +29,17 @@ use parking_lot::RwLock; use tokio::runtime::Runtime; -#[cfg(target_os = "linux")] -use crate::platform::{kubernetes::GenericPoller, LibvirtXmlExtractor}; use crate::{ config::handler::PlatformAccess, exception::ExceptionHandler, rpc::Session, trident::AgentId, }; +#[cfg(target_os = "linux")] +use crate::{ + platform::{ + kubernetes::GenericPoller, + platform_synchronizer::linux_process::set_proc_scan_process_datas, LibvirtXmlExtractor, + }, + utils::process::ProcessListener, +}; use public::proto::agent::{self, Exception}; use public::proto::trident; @@ -83,6 +89,8 @@ pub struct Synchronizer { #[cfg(target_os = "linux")] xml_extractor: Arc, + #[cfg(any(target_os = "linux", target_os = "android"))] + process_listener: Option>, } impl Synchronizer { @@ -113,9 +121,16 @@ impl Synchronizer { kubernetes_poller: Default::default(), #[cfg(target_os = "linux")] xml_extractor, + #[cfg(any(target_os = "linux", target_os = "android"))] + process_listener: None, } } + #[cfg(any(target_os = "linux", target_os = "android"))] + pub fn set_process_listener(&self, process_listener: &Arc) { + process_listener.register("proc.gprocess_info", set_proc_scan_process_datas); + } + #[cfg(target_os = "linux")] pub fn set_kubernetes_poller(&self, poller: Arc) { info!("updating kubernetes poller"); diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 9acef41ba97..c1346695091 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -102,7 +102,7 @@ use crate::{ use crate::{ ebpf_dispatcher::EbpfCollector, platform::SocketSynchronizer, - utils::{environment::core_file_check, lru::Lru}, + utils::{environment::core_file_check, lru::Lru, process::ProcessListener}, }; #[cfg(target_os = "linux")] use crate::{ @@ -1574,6 +1574,8 @@ pub struct AgentComponents { pub tap_interfaces: Vec, pub bpf_options: Arc>, pub last_dispatcher_component_id: usize, + #[cfg(any(target_os = "linux", target_os = "android"))] + pub process_listener: Arc, max_memory: u64, capture_mode: PacketCaptureType, @@ -2058,6 +2060,32 @@ impl AgentComponents { }; let debugger = Debugger::new(context); let queue_debugger = debugger.clone_queue(); + #[cfg(any(target_os = "linux", target_os = "android"))] + let process_listener = Arc::new(ProcessListener::new( + &candidate_config.user_config.inputs.proc.process_matcher, + candidate_config + .user_config + .inputs + .proc + .proc_dir_path + .clone(), + candidate_config + .user_config + .inputs + .proc + .tag_extraction + .exec_username + .clone(), + candidate_config + .user_config + .inputs + .proc + .tag_extraction + .script_command + .clone(), + )); + #[cfg(any(target_os = "linux", target_os = "android"))] + platform_synchronizer.set_process_listener(&process_listener); #[cfg(any(target_os = "linux", target_os = "android"))] let (toa_sender, toa_recv, _) = queue::bounded_with_debug( @@ -2084,6 +2112,7 @@ impl AgentComponents { user_config.processors.packet.toa.cache_size >> 5, user_config.processors.packet.toa.cache_size, ))), + process_listener.clone(), ); let rx_leaky_bucket = Arc::new(LeakyBucket::new(match candidate_config.capture_mode { @@ -2477,6 +2506,7 @@ impl AgentComponents { &queue_debugger, stats_collector.clone(), exception_handler.clone(), + &process_listener, ) { Ok(ebpf_collector) => { synchronizer @@ -2748,6 +2778,8 @@ impl AgentComponents { tap_interfaces, last_dispatcher_component_id: otel_dispatcher_id, bpf_options, + #[cfg(any(target_os = "linux", target_os = "android"))] + process_listener, }) } @@ -2820,6 +2852,8 @@ impl AgentComponents { self.npb_bandwidth_watcher.start(); self.npb_arp_table.start(); + #[cfg(any(target_os = "linux", target_os = "android"))] + self.process_listener.start(); info!("Started agent components."); } @@ -2897,6 +2931,10 @@ impl AgentComponents { if let Some(h) = self.stats_collector.notify_stop() { join_handles.push(h); } + #[cfg(any(target_os = "linux", target_os = "android"))] + if let Some(h) = self.process_listener.notify_stop() { + join_handles.push(h); + } for handle in join_handles { if !handle.is_finished() { diff --git a/agent/src/utils/process/linux.rs b/agent/src/utils/process/linux.rs index a17421186da..b9a24d10afe 100644 --- a/agent/src/utils/process/linux.rs +++ b/agent/src/utils/process/linux.rs @@ -15,16 +15,27 @@ */ use std::{ + collections::HashMap, fs::{self, File, OpenOptions}, io::{self, BufReader, Error, ErrorKind, Read, Result, Write}, net::TcpStream, os::unix::fs::OpenOptionsExt, path::PathBuf, process, + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, Mutex, RwLock, + }, + thread::{self, JoinHandle}, + time::Duration, }; -use log::debug; +use log::{debug, error, info}; use nix::sys::utsname::uname; +use procfs::process::all_processes_with_root; + +use crate::config::ProcessMatcher; +use crate::platform::{get_os_app_tag_by_exec, ProcessData, ProcessDataOp}; //返回当前进程占用内存RSS单位(字节) pub fn get_memory_rss() -> Result { @@ -246,3 +257,210 @@ fn get_num_from_status_file(pattern: &str, value: &str) -> Result { Ok(num) } + +type ProcessListenerCallback = fn(pids: &Vec, process_datas: &Vec); + +struct ProcessNode { + process_matcher: Vec, + + pids: Vec, + process_datas: Vec, + + callback: Option, +} + +pub struct ProcessListener { + features: Arc>>, + running: Arc, + proc_root: Arc>, + user: Arc>, + command: Arc>>, + + thread_handle: Mutex>>, +} + +impl ProcessListener { + const INTERVAL: Duration = Duration::from_secs(10); + + pub fn new( + process_matcher: &Vec, + proc_root: String, + user: String, + command: Vec, + ) -> Self { + let listener = Self { + features: Arc::new(RwLock::new(HashMap::new())), + running: Arc::new(AtomicBool::new(false)), + thread_handle: Mutex::new(None), + proc_root: Arc::new(RwLock::new(proc_root)), + user: Arc::new(RwLock::new(user)), + command: Arc::new(RwLock::new(command)), + }; + + listener.set(process_matcher); + + listener + } + + pub fn on_config_change( + &self, + process_matcher: &Vec, + proc_root: String, + user: String, + command: Vec, + ) { + self.set(process_matcher); + + *self.proc_root.write().unwrap() = proc_root; + *self.user.write().unwrap() = user; + *self.command.write().unwrap() = command; + } + + pub fn set(&self, process_matcher: &Vec) { + let mut features: HashMap = HashMap::new(); + + for matcher in process_matcher.iter() { + for feature in matcher.enabled_features.iter() { + if let Some(node) = features.get_mut(feature) { + node.process_matcher.push(matcher.clone()); + } else { + let _ = features.insert( + feature.to_string(), + ProcessNode { + process_matcher: vec![matcher.clone()], + pids: vec![], + process_datas: vec![], + callback: None, + }, + ); + } + } + } + + *self.features.write().unwrap() = features; + } + + pub fn register(&self, feature: &str, callback: ProcessListenerCallback) { + info!("Process listener register feature {}", feature); + let mut features = self.features.write().unwrap(); + if let Some(node) = features.get_mut(&feature.to_string()) { + node.pids = vec![]; + node.process_datas = vec![]; + node.callback = Some(callback); + } else { + let _ = features.insert( + feature.to_string(), + ProcessNode { + process_matcher: vec![], + pids: vec![], + process_datas: vec![], + callback: Some(callback), + }, + ); + } + } + + pub fn stop(&mut self) { + self.running.store(false, Relaxed); + + if let Some(handler) = self.thread_handle.lock().unwrap().take() { + let _ = handler.join(); + } + } + + fn process( + proc_root: &str, + features: &Arc>>, + user: &String, + command: &[String], + ) { + let mut features = features.write().unwrap(); + let Ok(processes) = all_processes_with_root(proc_root) else { + return; + }; + let tags_map = match get_os_app_tag_by_exec(user, command) { + Ok(tags) => tags, + Err(err) => { + error!( + "get process tags by execute cmd `{}` with user {} fail: {}", + command.join(" "), + user, + err + ); + HashMap::new() + } + }; + let mut current_processes = vec![]; + for process in processes { + if let Err(e) = process { + error!("get process failed: {}", e); + continue; + } + current_processes.push(process.unwrap()); + } + + for (key, value) in features.iter_mut() { + if value.process_matcher.is_empty() || value.callback.is_none() { + continue; + } + + let mut pids = vec![]; + let mut process_datas = vec![]; + + for matcher in &value.process_matcher { + for process in ¤t_processes { + if let Some(process_data) = matcher.get_process_data(process, &tags_map) { + pids.push(process.pid() as u32); + process_datas.push(process_data); + } + } + } + + pids.sort(); + pids.dedup(); + process_datas.sort_by_key(|x| x.pid); + process_datas.merge_and_dedup(); + + if pids != value.pids { + debug!("Feature {} update {} pids {:?}.", key, pids.len(), pids); + value.callback.as_ref().unwrap()(&pids, &process_datas); + value.pids = pids; + value.process_datas = process_datas; + } + } + } + + pub fn start(&self) { + if self.running.swap(true, Relaxed) { + return; + } + info!("Startting process listener ..."); + let features = self.features.clone(); + let running = self.running.clone(); + let proc_root = self.proc_root.clone(); + let user = self.user.clone(); + let command = self.command.clone(); + + running.store(true, Relaxed); + *self.thread_handle.lock().unwrap() = Some( + thread::Builder::new() + .name("process-listener".to_owned()) + .spawn(move || { + while running.load(Relaxed) { + thread::sleep(Self::INTERVAL); + let proc = proc_root.read().unwrap().clone(); + let user = user.read().unwrap().clone(); + let command = command.read().unwrap().clone(); + + Self::process(proc.as_str(), &features, &user, command.as_slice()); + } + }) + .unwrap(), + ); + } + + pub fn notify_stop(&self) -> Option> { + self.running.store(false, Relaxed); + self.thread_handle.lock().unwrap().take() + } +} diff --git a/agent/src/utils/process/process.rs b/agent/src/utils/process/process.rs index 56fd3b1e886..cf553b127ae 100644 --- a/agent/src/utils/process/process.rs +++ b/agent/src/utils/process/process.rs @@ -14,19 +14,8 @@ * limitations under the License. */ -use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; -use std::sync::{Arc, RwLock}; -use std::thread::{self, JoinHandle}; -use std::time::Duration; use std::{cmp::Ordering, fs, io, path::Path, time::SystemTime}; -use log::{debug, warn}; -use regex::Regex; -use sysinfo::{PidExt, ProcessExt, ProcessRefreshKind, System, SystemExt}; - -use crate::config::ProcessMatcher; - pub struct FileAndSizeSum { pub file_infos: Vec, // 文件信息 pub file_sizes_sum: u64, // 文件体积总和,单位:B @@ -105,143 +94,3 @@ pub fn get_file_and_size_sum(dir: &String) -> io::Result { file_and_size_sum.file_infos = file_infos; Ok(file_and_size_sum) } - -type ProcessListenerCallback = fn(pids: Vec); - -struct ProcessNode { - process_matcher: Vec, - - pids: Vec, - - callback: Option, -} - -pub struct ProcessListener { - features: Arc>>, - running: Arc, - - thread_handle: Option>, -} - -impl ProcessListener { - const INTERVAL: Duration = Duration::from_secs(10); - - fn new(process_matcher: &Vec) -> Self { - let listener = Self { - features: Arc::new(RwLock::new(HashMap::new())), - running: Arc::new(AtomicBool::new(false)), - thread_handle: None, - }; - - listener.set(process_matcher); - - listener - } - - fn set(&self, process_matcher: &Vec) { - let mut features = self.features.write().unwrap(); - - for matcher in process_matcher.iter() { - for feature in matcher.enabled_features.iter() { - if let Some(node) = features.get_mut(feature) { - node.process_matcher.push(matcher.clone()); - } else { - let _ = features.insert( - feature.to_string(), - ProcessNode { - process_matcher: vec![matcher.clone()], - pids: vec![], - callback: None, - }, - ); - } - } - } - } - - fn register(&self, feature: &str, callback: ProcessListenerCallback) { - let mut features = self.features.write().unwrap(); - if let Some(node) = features.get_mut(&feature.to_string()) { - node.callback = Some(callback); - } else { - let _ = features.insert( - feature.to_string(), - ProcessNode { - process_matcher: vec![], - pids: vec![], - callback: Some(callback), - }, - ); - } - } - - fn stop(&mut self) { - self.running.store(false, Relaxed); - - if let Some(handler) = self.thread_handle.take() { - let _ = handler.join(); - } - } - - fn process(system: &mut System, features: &Arc>>) { - system.refresh_processes_specifics(ProcessRefreshKind::new()); - - let processes = system.processes(); - let mut features = features.write().unwrap(); - - for (key, value) in features.iter_mut() { - if value.process_matcher.is_empty() || value.callback.is_none() { - continue; - } - - let mut pids = vec![]; - - for matcher in &value.process_matcher { - let Ok(regex) = Regex::new(matcher.match_regex.as_str()) else { - warn!("Invalid process regex: {}", matcher.match_regex.as_str()); - continue; - }; - - for (pid, process) in processes { - // TODO: match_languages match_type match_usernames only_in_container only_with_tag - if regex.is_match(process.name()) { - pids.push(pid.as_u32()); - } - } - } - - pids.sort(); - pids.dedup(); - - if pids != value.pids { - debug!("Feature {} update pids {:?}.", key, pids); - value.callback.as_ref().unwrap()(pids.clone()); - value.pids = pids; - } - } - } - - fn start(&mut self) { - if self.running.load(Relaxed) { - return; - } - - let features = self.features.clone(); - let running = self.running.clone(); - - running.store(true, Relaxed); - self.thread_handle = Some( - thread::Builder::new() - .name("process-listener".to_owned()) - .spawn(move || { - let mut system = System::new(); - - while running.load(Relaxed) { - thread::sleep(Self::INTERVAL); - Self::process(&mut system, &features); - } - }) - .unwrap(), - ); - } -} diff --git a/server/agent_config/template.yaml b/server/agent_config/template.yaml index abb07f66838..9a9e449ed3a 100644 --- a/server/agent_config/template.yaml +++ b/server/agent_config/template.yaml @@ -1291,20 +1291,20 @@ inputs: # unit: # range: [] # enum_options: - # - input.proc.symbol_table.golang_specific + # - proc.gprocess_info + # - proc.golang_symbol_table # - proc.socket_list - # - proc.symbol_table - # - proc.proc_event # XXX + # #- proc.proc_event # - ebpf.socket.uprobe.golang # - ebpf.socket.uprobe.tls - # - ebpf.socket.uprobe.rdma # XXX - # - ebpf.file.io_event - # - ebpf.file.management_event # XXX + # #- ebpf.socket.uprobe.rdma + # #- ebpf.file.io_event + # #- ebpf.file.management_event # - ebpf.profile.on_cpu # - ebpf.profile.off_cpu # - ebpf.profile.memory - # - ebpf.profile.cuda # XXX - # - ebpf.profile.hbm # XXX + # #- ebpf.profile.cuda + # #- ebpf.profile.hbm # modification: agent_restart # ee_feature: false # description: @@ -1317,7 +1317,7 @@ inputs: process_matcher: - match_regex: deepflow-* only_in_container: false - enabled_features: [ebpf.profile.on_cpu, ebpf.profile.off_cpu, os.proc.scan] + enabled_features: [ebpf.profile.on_cpu, ebpf.profile.off_cpu, proc.gprocess_info] # type: section # name: # en: Symbol Table