Skip to content

Commit

Permalink
libafl multimachine: disable ratelimiting (AFLplusplus#2558)
Browse files Browse the repository at this point in the history
* disable rate limiting for now

* fix

* clippy

---------

Co-authored-by: Dongjia "toka" Zhang <[email protected]>
  • Loading branch information
rmalmain and tokatoka authored Sep 30, 2024
1 parent 17def03 commit 173aedd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
2 changes: 2 additions & 0 deletions libafl/src/events/broker_hooks/centralized_multi_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ where
.send_interesting_event_to_nodes(&mm_msg)
.await?;

log::debug!("msg sent.");

Ok(())
});

Expand Down
45 changes: 27 additions & 18 deletions libafl/src/events/multi_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
inputs::{Input, NopInput},
};

const MAX_NB_RECEIVED_AT_ONCE: usize = 10;
// const MAX_NB_RECEIVED_AT_ONCE: usize = 100;

#[bitflags(default = SendToParent | SendToChildren)]
#[repr(u8)]
Expand Down Expand Up @@ -394,7 +394,9 @@ where

// 1. Read msg size
let mut node_msg_len: [u8; 4] = [0; 4];
log::debug!("Receiving msg len...");
stream.read_exact(&mut node_msg_len).await?;
log::debug!("msg len received.");
let node_msg_len = u32::from_le_bytes(node_msg_len) as usize;

// 2. Read msg
Expand All @@ -404,7 +406,9 @@ where
unsafe {
node_msg.set_len(node_msg_len);
}
log::debug!("Receiving msg...");
stream.read_exact(node_msg.as_mut_slice()).await?;
log::debug!("msg received.");
let node_msg = node_msg.into_boxed_slice();

Ok(Some(MultiMachineMsg::from_llmp_msg(node_msg)))
Expand All @@ -420,16 +424,19 @@ where
let msg_len = u32::to_le_bytes(serialized_msg.len() as u32);

// 0. Write the dummy byte
log::debug!("Sending dummy byte");
log::debug!("Sending dummy byte...");
stream.write_all(&[DUMMY_BYTE]).await?;
log::debug!("dummy byte sent.");

// 1. Write msg size
log::debug!("Sending msg len");
log::debug!("Sending msg len...");
stream.write_all(&msg_len).await?;
log::debug!("msg len sent.");

// 2. Write msg
log::debug!("Sending msg");
log::debug!("Sending msg...");
stream.write_all(serialized_msg).await?;
log::debug!("msg sent.");

Ok(())
}
Expand Down Expand Up @@ -483,11 +490,11 @@ where
{
let mut ids_to_remove: Vec<NodeId> = Vec::new();
for (child_id, child_stream) in &mut self.children {
log::debug!("Sending to child...");
if (Self::write_msg(child_stream, msg).await).is_err() {
log::debug!("Sending to child {child_id:?}...");
if let Err(err) = Self::write_msg(child_stream, msg).await {
// most likely the child disconnected. drop the connection later on and continue.
log::debug!(
"The child disconnected. We won't try to communicate with it again."
"The child disconnected. We won't try to communicate with it again. Error: {err:?}"
);
ids_to_remove.push(*child_id);
}
Expand All @@ -510,23 +517,25 @@ where
msgs: &mut Vec<MultiMachineMsg<'a, I>>,
) -> Result<(), Error> {
log::debug!("Checking for new events from other nodes...");
let mut nb_received = 0usize;
// let mut nb_received = 0usize;

// Our (potential) parent could have something for us
if let Some(parent) = &mut self.parent {
loop {
// Exit if received a lot of inputs at once.
if nb_received > MAX_NB_RECEIVED_AT_ONCE {
return Ok(());
}
// TODO: this causes problems in some cases, it could freeze all fuzzer instances.
// if nb_received > MAX_NB_RECEIVED_AT_ONCE {
// log::debug!("hitting MAX_NB_RECEIVED_AT_ONCE limit...");
// return Ok(());
// }

log::debug!("Receiving from parent...");
match Self::read_msg(parent).await {
Ok(Some(msg)) => {
log::debug!("Received event from parent");
// The parent has something for us, we store it
msgs.push(msg);
nb_received += 1;
// nb_received += 1;
}

Ok(None) => {
Expand Down Expand Up @@ -562,17 +571,17 @@ where
for (child_id, child_stream) in &mut self.children {
loop {
// Exit if received a lot of inputs at once.
if nb_received > MAX_NB_RECEIVED_AT_ONCE {
return Ok(());
}
// if nb_received > MAX_NB_RECEIVED_AT_ONCE {
// return Ok(());
//}

log::debug!("Receiving from child {:?}...", child_id);
log::debug!("Receiving from child {child_id:?}...");
match Self::read_msg(child_stream).await {
Ok(Some(msg)) => {
// The parent has something for us, we store it
log::debug!("Received event from child!");
msgs.push(msg);
nb_received += 1;
// nb_received += 1;
}

Ok(None) => {
Expand All @@ -584,7 +593,7 @@ where
Err(Error::OsError(e, _, _)) => {
// most likely the parent disconnected. drop the connection
log::error!(
"The parent disconnected. We won't try to communicate with it again."
"The child disconnected. We won't try to communicate with it again."
);
log::error!("Error: {e:?}");
ids_to_remove.push(*child_id);
Expand Down

0 comments on commit 173aedd

Please sign in to comment.