Skip to content

Commit

Permalink
[Agent] Remove pointers from ebpf collector #3967
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa authored and rvql committed Aug 28, 2023
1 parent 583c9bf commit 01487a5
Showing 1 changed file with 24 additions and 42 deletions.
66 changes: 24 additions & 42 deletions agent/src/ebpf_dispatcher/ebpf_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::ffi::{CStr, CString};
use std::slice;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
Expand Down Expand Up @@ -54,34 +54,22 @@ use public::{
};

pub struct EbpfCounter {
rx: u64,
rx: AtomicU64,
}

impl EbpfCounter {
fn reset(&mut self) {
self.rx = 0;
fn get_rx(&self) -> u64 {
self.rx.swap(0, Ordering::Relaxed)
}
}

#[derive(Clone, Copy)]
pub struct SyncEbpfCounter {
counter: *mut EbpfCounter,
counter: Arc<EbpfCounter>,
}

impl SyncEbpfCounter {
fn counter(&self) -> &mut EbpfCounter {
unsafe { &mut *self.counter }
}
}

unsafe impl Send for SyncEbpfCounter {}
unsafe impl Sync for SyncEbpfCounter {}

impl OwnedCountable for SyncEbpfCounter {
fn get_counters(&self) -> Vec<Counter> {
let rx = self.counter().rx;
self.counter().reset();

let rx = self.counter.get_rx();
let ebpf_counter = unsafe { ebpf::socket_tracer_stats() };

vec![
Expand Down Expand Up @@ -183,13 +171,14 @@ impl OwnedCountable for SyncEbpfCounter {
}
}

#[derive(Clone)]
struct EbpfDispatcher {
dispatcher_id: usize,
time_diff: Arc<AtomicI64>,

receiver: Receiver<Box<MetaPacket<'static>>>,
receiver: Arc<Receiver<Box<MetaPacket<'static>>>>,

pause: AtomicBool,
pause: Arc<AtomicBool>,

// 策略查询
policy_getter: PolicyGetter,
Expand All @@ -209,7 +198,7 @@ struct EbpfDispatcher {
impl EbpfDispatcher {
const FLOW_MAP_SIZE: usize = 1 << 14;

fn run(&mut self, sync_counter: SyncEbpfCounter) {
fn run(&self, counter: Arc<EbpfCounter>) {
let mut flow_map = FlowMap::new(
self.dispatcher_id as u32,
self.flow_output.clone(),
Expand Down Expand Up @@ -247,7 +236,7 @@ impl EbpfDispatcher {
}

for mut packet in batch.drain(..) {
sync_counter.counter().rx += 1;
counter.rx.fetch_add(1, Ordering::Relaxed);

packet.timestamp_adjust(self.time_diff.load(Ordering::Relaxed));
packet.set_loopback_mac(ebpf_config.ctrl_mac);
Expand All @@ -258,16 +247,7 @@ impl EbpfDispatcher {
}

pub struct SyncEbpfDispatcher {
dispatcher: *mut EbpfDispatcher,
}

unsafe impl Sync for SyncEbpfDispatcher {}
unsafe impl Send for SyncEbpfDispatcher {}

impl SyncEbpfDispatcher {
fn dispatcher(&self) -> &mut EbpfDispatcher {
unsafe { &mut *self.dispatcher }
}
pause: Arc<AtomicBool>,
}

impl FlowAclListener for SyncEbpfDispatcher {
Expand All @@ -281,7 +261,7 @@ impl FlowAclListener for SyncEbpfDispatcher {
_: &Vec<Arc<crate::_Cidr>>,
_: &Vec<Arc<crate::_Acl>>,
) -> Result<(), String> {
self.dispatcher().pause.store(false, Ordering::Relaxed);
self.pause.store(false, Ordering::Relaxed);
Ok(())
}

Expand All @@ -294,7 +274,7 @@ pub struct EbpfCollector {
thread_dispatcher: EbpfDispatcher,
thread_handle: Option<JoinHandle<()>>,

counter: EbpfCounter,
counter: Arc<EbpfCounter>,
}

static mut SWITCH: bool = false;
Expand Down Expand Up @@ -690,7 +670,7 @@ impl EbpfCollector {
thread_dispatcher: EbpfDispatcher {
dispatcher_id,
time_diff,
receiver,
receiver: Arc::new(receiver),
policy_getter,
config,
log_parser_config,
Expand All @@ -700,22 +680,24 @@ impl EbpfCollector {
flow_map_config,
stats_collector,
collector_config,
pause: AtomicBool::new(true),
pause: Arc::new(AtomicBool::new(true)),
},
thread_handle: None,
counter: EbpfCounter { rx: 0 },
counter: Arc::new(EbpfCounter {
rx: AtomicU64::new(0),
}),
}))
}

pub fn get_sync_counter(&self) -> SyncEbpfCounter {
SyncEbpfCounter {
counter: &self.counter as *const EbpfCounter as *mut EbpfCounter,
counter: self.counter.clone(),
}
}

pub fn get_sync_dispatcher(&self) -> SyncEbpfDispatcher {
SyncEbpfDispatcher {
dispatcher: &self.thread_dispatcher as *const EbpfDispatcher as *mut EbpfDispatcher,
pause: self.thread_dispatcher.pause.clone(),
}
}

Expand All @@ -742,12 +724,12 @@ impl EbpfCollector {
SWITCH = true;
}

let sync_dispatcher = self.get_sync_dispatcher();
let sync_counter = self.get_sync_counter();
let sync_counter = self.counter.clone();
let dispatcher = self.thread_dispatcher.clone();
self.thread_handle = Some(
thread::Builder::new()
.name("ebpf-collector".to_owned())
.spawn(move || sync_dispatcher.dispatcher().run(sync_counter))
.spawn(move || dispatcher.run(sync_counter))
.unwrap(),
);

Expand Down

0 comments on commit 01487a5

Please sign in to comment.