Skip to content

Commit

Permalink
Add more fine grained timings for messaging
Browse files Browse the repository at this point in the history
Take nu_cores from mpi when run with mpi.
  • Loading branch information
janekdererste committed Feb 8, 2024
1 parent 1cf95dd commit 4e54ce2
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 7 deletions.
4 changes: 3 additions & 1 deletion assets/equil/equil-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ modules:
type: Output
output_dir: /Users/janek/Documents/rust_q_sim/equil/output/size
profiling: !CSV
level: TRACE
routing:
type: Routing
mode: UsePlans
simulation:
type: Simulation
start_time: 0
end_time: 86400
end_time: 10
sample_size: 1.0
stuck_threshold: 1000

1 change: 0 additions & 1 deletion src/simulation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::simulation::config::VertexWeight::InLinkCapacity;
pub struct CommandLineArgs {
#[arg(long, short)]
pub config_path: String,

#[arg(long, short)]
pub num_parts: Option<u32>,
}
Expand Down
4 changes: 3 additions & 1 deletion src/simulation/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ pub fn run_mpi() {
mpi_communicator: world,
};

let args = CommandLineArgs::parse();
let mut args = CommandLineArgs::parse();
// override the num part argument, with the number of processes mpi has started.
args.num_parts = Some(world.size() as u32);
let config = Config::from_file(&args);

let _guards = logging::init_logging(&config, comm.rank().to_string().as_str());
Expand Down
18 changes: 16 additions & 2 deletions src/simulation/messaging/communication/communicators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use mpi::datatype::PartitionMut;
use mpi::point_to_point::{Destination, Source};
use mpi::topology::{Communicator, SystemCommunicator};
use mpi::{Count, Rank};
use tracing::{debug, info, instrument};
use tracing::{debug, info, instrument, span, Level};

use crate::simulation::wire_types::messages::{SimMessage, SyncMessage, TravelTimesMessage};

Expand Down Expand Up @@ -197,7 +197,7 @@ pub struct MpiSimCommunicator {
}

impl SimCommunicator for MpiSimCommunicator {
#[instrument(level = "trace", skip_all, fields(rank = self.rank()))]
#[instrument(level = "trace", skip(self, on_msg), fields(rank = self.rank()))]
fn send_receive_vehicles<F>(
&self,
out_messages: HashMap<u32, SyncMessage>,
Expand All @@ -207,6 +207,8 @@ impl SimCommunicator for MpiSimCommunicator {
) where
F: FnMut(SyncMessage),
{
let send_span = span!(Level::TRACE, "send_msgs", rank = self.rank(), now = now);
let send_time = send_span.enter();
let buf_msg: Vec<_> = out_messages
.into_iter()
.map(|(to, m)| (to, SimMessage::from_sync_message(m).serialize()))
Expand Down Expand Up @@ -235,11 +237,19 @@ impl SimCommunicator for MpiSimCommunicator {
.immediate_send(scope, buf);
reqs.add(req);
}
drop(send_time);

let receive_span = span!(Level::TRACE, "receive_msgs", rank = self.rank(), now = now);
let handle_span = span!(Level::TRACE, "handle_msgs", rank = self.rank(), now = now);
// Use blocking MPI_recv here, since we don't have anything to do if there are no other
// messages.
while !expected_vehicle_messages.is_empty() {
// measure the wait time for receiving
let receive_time = receive_span.enter();
let (encoded_msg, _status) = self.mpi_communicator.any_process().receive_vec();
drop(receive_time);

let handle_time = handle_span.enter();
let msg = SimMessage::deserialize(&encoded_msg).sync_message();
let from_rank = msg.from_process;

Expand All @@ -251,12 +261,16 @@ impl SimCommunicator for MpiSimCommunicator {
}

on_msg(msg);
drop(handle_time);
}

// wait here, so that all requests finish. This is necessary, because a process might send
// more messages than it receives. This happens, if a process sends messages to remote
// partitions (teleported legs) but only receives messages from neighbor partitions.
// this also accounts for wait times
let receive_time = receive_span.enter();
reqs.wait_all(&mut Vec::new());
drop(receive_time)
});
}

Expand Down
3 changes: 3 additions & 0 deletions src/simulation/network/sim_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashSet;
use nohash_hasher::{IntMap, IntSet};
use rand::rngs::ThreadRng;
use rand::{thread_rng, Rng};
use tracing::instrument;

use crate::simulation::config;
use crate::simulation::id::Id;
Expand Down Expand Up @@ -215,6 +216,7 @@ impl SimNetworkPartition {
}
}

#[instrument(level = "trace", skip(self), fields(rank = self.partition))]
pub fn move_links(&mut self, now: u32) -> (Vec<Vehicle>, Vec<SplitStorage>) {
let mut storage_cap: Vec<_> = Vec::new();
let mut vehicles: Vec<_> = Vec::new();
Expand Down Expand Up @@ -282,6 +284,7 @@ impl SimNetworkPartition {
false
}

#[instrument(level = "trace", skip(self), fields(rank = self.partition))]
pub fn move_nodes(&mut self, events: &mut EventsPublisher, now: u32) -> Vec<Vehicle> {
let mut exited_vehicles = Vec::new();
let new_active_nodes: IntSet<_> = self
Expand Down
3 changes: 1 addition & 2 deletions src/simulation/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ where
}
}

#[instrument(level = "trace", skip(self, agent), fields(rank = self.net_message_broker.rank()))]
fn departure(&mut self, mut agent: Person, now: u32) -> Vehicle {
//here, current element counter is going to be increased
agent.advance_plan();
Expand Down Expand Up @@ -208,7 +207,7 @@ where
}
}

#[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))]
//#[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))]
fn move_nodes(&mut self, now: u32) {
let exited_vehicles = self.network.move_nodes(&mut self.events, now);

Expand Down

0 comments on commit 4e54ce2

Please sign in to comment.