Skip to content

Commit

Permalink
feat: add process listener
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed Oct 9, 2024
1 parent d3f6333 commit 6406501
Show file tree
Hide file tree
Showing 15 changed files with 1,037 additions and 597 deletions.
332 changes: 316 additions & 16 deletions agent/src/config/config.rs

Large diffs are not rendered by default.

85 changes: 46 additions & 39 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,16 @@ use sysinfo::SystemExt;
use sysinfo::{CpuRefreshKind, RefreshKind, System};
use tokio::runtime::Runtime;

#[cfg(any(target_os = "linux", target_os = "android"))]
use super::config::{Ebpf, EbpfFileIoEvent, ProcessMatcher, SymbolTable};
use super::{
config::{
ApiResources, Config, EbpfFileIoEvent, ExtraLogFields, ExtraLogFieldsInfo, HttpEndpoint,
HttpEndpointMatchRule, OracleConfig, PcapStream, PortConfig, SymbolTable,
TagFilterOperator, UserConfig, YamlConfig,
ApiResources, Config, ExtraLogFields, ExtraLogFieldsInfo, HttpEndpoint,
HttpEndpointMatchRule, OracleConfig, PcapStream, PortConfig, TagFilterOperator, UserConfig,
YamlConfig,
},
ConfigError, KubernetesPollerType,
};
#[cfg(any(target_os = "linux", target_os = "android"))]
use super::{
config::{Ebpf, ProcessMatcher},
OS_PROC_REGEXP_MATCH_ACTION_ACCEPT, OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME,
};
use crate::common::decapsulate::TunnelType;
use crate::dispatcher::recv_engine;
use crate::flow_generator::protocol_logs::decode_new_rpc_trace_context_with_type;
Expand All @@ -75,7 +72,6 @@ use crate::{
use crate::{
dispatcher::recv_engine::af_packet::OptTpacketVersion,
ebpf::CAP_LEN_MAX,
platform::ProcRegRewrite,
utils::environment::{
get_container_resource_limits, get_ctrl_ip_and_mac, is_tt_workload,
set_container_resource_limit,
Expand Down Expand Up @@ -270,7 +266,6 @@ pub struct OsProcScanConfig {
pub os_proc_root: String,
pub os_proc_socket_sync_interval: u32, // for sec
pub os_proc_socket_min_lifetime: u32, // for sec
pub os_proc_regex: Vec<ProcRegRewrite>,
pub os_app_tag_exec_user: String,
pub os_app_tag_exec: Vec<String>,
// whether to sync os socket and proc info
Expand Down Expand Up @@ -1814,34 +1809,6 @@ impl TryFrom<(Config, UserConfig, DynamicConfig)> for ModuleConfig {
os_proc_root: conf.inputs.proc.proc_dir_path.clone(),
os_proc_socket_sync_interval: conf.inputs.proc.sync_interval.as_secs() as u32,
os_proc_socket_min_lifetime: conf.inputs.proc.min_lifetime.as_secs() as u32,
os_proc_regex: {
let mut v = vec![];
for i in &conf.inputs.proc.process_matcher {
if i.enabled_features
.iter()
.find(|s| s.as_str() == "os.proc.scan")
.is_none()
{
continue;
}
if let Ok(r) = ProcRegRewrite::try_from(i) {
v.push(r);
}
}

// append the .* at the end for accept the proc whic not match any regexp
v.push(
ProcRegRewrite::try_from(&ProcessMatcher {
match_regex: ".*".into(),
match_type: OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME.into(),
rewrite_name: "".into(),
action: OS_PROC_REGEXP_MATCH_ACTION_ACCEPT.into(),
..Default::default()
})
.unwrap(),
);
v
},
os_app_tag_exec_user: conf.inputs.proc.tag_extraction.exec_username.clone(),
os_app_tag_exec: conf.inputs.proc.tag_extraction.script_command.clone(),
os_proc_sync_enabled: conf.inputs.proc.enabled,
Expand All @@ -1853,7 +1820,7 @@ impl TryFrom<(Config, UserConfig, DynamicConfig)> for ModuleConfig {
.find(|m| {
m.enabled_features
.iter()
.find(|s| s.as_str() == "os.proc.scan")
.find(|s| s.as_str() == "proc.gprocess_info")
.is_some()
&& m.only_with_tag
})
Expand Down Expand Up @@ -2625,6 +2592,46 @@ impl ConfigHandler {
);
}

#[cfg(any(target_os = "linux", target_os = "android"))]
if config.inputs.proc.process_matcher != new_config.user_config.inputs.proc.process_matcher
|| config.inputs.proc.tag_extraction.exec_username
!= new_config
.user_config
.inputs
.proc
.tag_extraction
.exec_username
|| config.inputs.proc.tag_extraction.script_command
!= new_config
.user_config
.inputs
.proc
.tag_extraction
.script_command
|| config.inputs.proc.proc_dir_path != new_config.user_config.inputs.proc.proc_dir_path
{
if let Some(c) = components.as_ref() {
c.process_listener.on_config_change(
&new_config.user_config.inputs.proc.process_matcher,
new_config.user_config.inputs.proc.proc_dir_path.clone(),
new_config
.user_config
.inputs
.proc
.tag_extraction
.exec_username
.clone(),
new_config
.user_config
.inputs
.proc
.tag_extraction
.script_command
.clone(),
);
}
}

if *config != new_config.user_config {
*config = new_config.user_config.clone();
}
Expand Down
8 changes: 2 additions & 6 deletions agent/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@
mod config;
pub mod handler;

#[cfg(any(target_os = "linux", target_os = "android"))]
pub use config::ApiResources;
pub use config::{
AgentIdType, Config, ConfigError, KubernetesPollerType, OracleConfig, PcapStream,
ProcessMatcher, PrometheusExtraLabels, RuntimeConfig, UserConfig, K8S_CA_CRT_PATH,
};
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use config::{
ApiResources, OS_PROC_REGEXP_MATCH_ACTION_ACCEPT, OS_PROC_REGEXP_MATCH_ACTION_DROP,
OS_PROC_REGEXP_MATCH_TYPE_CMD, OS_PROC_REGEXP_MATCH_TYPE_PARENT_PROC_NAME,
OS_PROC_REGEXP_MATCH_TYPE_PROC_NAME, OS_PROC_REGEXP_MATCH_TYPE_TAG,
};
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use handler::FlowAccess;
pub use handler::{DispatcherConfig, FlowConfig, ModuleConfig, NpbConfig};

Expand Down
62 changes: 62 additions & 0 deletions agent/src/ebpf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

use crate::platform::ProcessData;

extern crate libc;
extern crate trace_utils;

Expand Down Expand Up @@ -721,6 +723,66 @@ extern "C" {
}
}

pub fn set_feature_uprobe_golang(pids: &Vec<u32>, _: &Vec<ProcessData>) {
unsafe {
set_feature_pids(
FEATURE_UPROBE_GOLANG,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_uprobe_golang_symbol(pids: &Vec<u32>, _: &Vec<ProcessData>) {
unsafe {
set_feature_pids(
FEATURE_UPROBE_GOLANG_SYMBOL,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_uprobe_tls(pids: &Vec<u32>, _: &Vec<ProcessData>) {
unsafe {
set_feature_pids(
FEATURE_UPROBE_OPENSSL,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_on_cpu(pids: &Vec<u32>, _: &Vec<ProcessData>) {
unsafe {
set_feature_pids(
FEATURE_PROFILE_ONCPU,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_off_cpu(pids: &Vec<u32>, _: &Vec<ProcessData>) {
unsafe {
set_feature_pids(
FEATURE_PROFILE_OFFCPU,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_memory(pids: &Vec<u32>, _: &Vec<ProcessData>) {
unsafe {
set_feature_pids(
FEATURE_PROFILE_MEMORY,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

#[no_mangle]
extern "C" fn rust_info_wrapper(msg: *const libc::c_char) {
unsafe {
Expand Down
Loading

0 comments on commit 6406501

Please sign in to comment.