Skip to content

Commit

Permalink
Change logic of storage capacity update for split links. In links onl…
Browse files Browse the repository at this point in the history
…y report their releases, while out links use the consumed capacity until they are notified about released vehicles from downstream
  • Loading branch information
janekdererste committed Mar 7, 2024
1 parent 4e54ce2 commit 24f2931
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 58 deletions.
14 changes: 7 additions & 7 deletions src/simulation/messaging/communication/message_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::rc::Rc;

use crate::simulation::messaging::communication::communicators::SimCommunicator;
use crate::simulation::network::global_network::Network;
use crate::simulation::network::sim_network::{SimNetworkPartition, SplitStorage};
use crate::simulation::network::sim_network::{SimNetworkPartition, StorageUpdate};
use crate::simulation::wire_types::messages::{
StorageCap, SyncMessage, TravelTimesMessage, Vehicle,
};
Expand Down Expand Up @@ -87,15 +87,15 @@ where
message.add_veh(vehicle);
}

pub fn add_cap(&mut self, cap: SplitStorage, now: u32) {
pub fn add_cap_update(&mut self, cap: StorageUpdate, now: u32) {
let rank = self.rank();
let message = self
.out_messages
.entry(cap.from_part)
.or_insert_with(|| SyncMessage::new(now, rank, cap.from_part));
message.add_storage_cap(StorageCap {
link_id: cap.link_id,
value: cap.used,
value: cap.released,
});
}

Expand Down Expand Up @@ -177,7 +177,7 @@ mod tests {
NetMessageBroker, TravelTimesMessageBroker,
};
use crate::simulation::network::global_network::{Link, Network, Node};
use crate::simulation::network::sim_network::{SimNetworkPartition, SplitStorage};
use crate::simulation::network::sim_network::{SimNetworkPartition, StorageUpdate};
use crate::simulation::wire_types::messages::{TravelTimesMessage, Vehicle};
use crate::test_utils::create_agent;

Expand Down Expand Up @@ -374,10 +374,10 @@ mod tests {
let mut broker = create_net_message_broker(communicator);
// add a storage cap message for link 4, which connects parts 1 -> 2
if broker.rank() == 2 {
broker.add_cap(
SplitStorage {
broker.add_cap_update(
StorageUpdate {
link_id: 4,
used: 42.0,
released: 42.0,
from_part: 1,
},
0,
Expand Down
47 changes: 32 additions & 15 deletions src/simulation/network/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::simulation::config;
use crate::simulation::id::Id;
use crate::simulation::network::flow_cap::Flowcap;
use crate::simulation::network::global_network::Node;
use crate::simulation::network::sim_network::SplitStorage;
use crate::simulation::network::sim_network::StorageUpdate;
use crate::simulation::network::storage_cap::StorageCap;
use crate::simulation::network::stuck_timer::StuckTimer;
use crate::simulation::wire_types::messages::Vehicle;
Expand Down Expand Up @@ -324,14 +324,13 @@ impl SplitOutLink {
}
}

pub fn set_used_storage_cap(&mut self, value: f32) {
self.storage_cap.clear();
self.storage_cap.consume(value);
pub fn apply_storage_cap_update(&mut self, released: f32) {
self.storage_cap.consume(-released);
self.storage_cap.apply_updates();
}

pub fn take_veh(&mut self) -> VecDeque<Vehicle> {
self.storage_cap.clear();
self.storage_cap.apply_updates();
std::mem::take(&mut self.q)
}

Expand All @@ -355,22 +354,21 @@ impl SplitInLink {
}
}

pub fn storage_cap_updates(&self) -> Option<SplitStorage> {
if self.has_updates() {
let used = self.local_link.storage_cap.currently_used()
- self.local_link.storage_cap.released();
Some(SplitStorage {
pub fn storage_cap_updates(&self) -> Option<StorageUpdate> {
if self.has_released() {
let released = self.local_link.storage_cap.released();
Some(StorageUpdate {
link_id: self.local_link.id.internal(),
used,
released,
from_part: self.from_part,
})
} else {
None
}
}

pub fn has_updates(&self) -> bool {
self.local_link.storage_cap.consumed() > 0. || self.local_link.storage_cap.released() > 0.
pub fn has_released(&self) -> bool {
self.local_link.storage_cap.released() > 0.
}
}

Expand Down Expand Up @@ -663,10 +661,29 @@ mod out_link_tests {
let taken_2 = result.pop_front().unwrap();
assert_eq!(id2, taken_2.id);

// make sure storage capacity is released
assert_eq!(0., link.used_storage());
// make sure storage capacity is not released
assert_eq!(2., link.used_storage());
} else {
panic!("expected out link")
}
}

#[test]
fn update_storage_caps() {
// set up the link, so that we consume two units of storage.
let mut cap = StorageCap::new(100., 1., 1., 1., 1.);
cap.consume(2.);
cap.apply_updates();
let mut out_link = SplitOutLink {
id: Id::new_internal(0),
to_part: 1,
q: Default::default(),
storage_cap: cap,
};

assert_eq!(2., out_link.storage_cap.currently_used());
out_link.apply_storage_cap_update(2.);

assert_eq!(0., out_link.storage_cap.currently_used());
}
}
37 changes: 16 additions & 21 deletions src/simulation/network/sim_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use super::{
link::{LocalLink, SimLink, SplitInLink, SplitOutLink},
};

pub struct SplitStorage {
pub struct StorageUpdate {
pub link_id: u64,
pub from_part: u32,
pub used: f32,
pub released: f32,
}

#[derive(Debug)]
Expand Down Expand Up @@ -206,19 +206,19 @@ impl SimNetworkPartition {
Self::activate_link(&mut self.active_links, link.id().internal());
}

pub fn update_storage_caps(&mut self, storage_caps: Vec<StorageCap>) {
pub fn apply_storage_cap_updates(&mut self, storage_caps: Vec<StorageCap>) {
for cap in storage_caps {
if let SimLink::Out(link) = self.links.get_mut(&cap.link_id).unwrap() {
link.set_used_storage_cap(cap.value);
link.apply_storage_cap_update(cap.value);
} else {
panic!("only expecting ids for split out links ")
}
}
}

#[instrument(level = "trace", skip(self), fields(rank = self.partition))]
pub fn move_links(&mut self, now: u32) -> (Vec<Vehicle>, Vec<SplitStorage>) {
let mut storage_cap: Vec<_> = Vec::new();
pub fn move_links(&mut self, now: u32) -> (Vec<Vehicle>, Vec<StorageUpdate>) {
let mut storage_cap_updates: Vec<_> = Vec::new();
let mut vehicles: Vec<_> = Vec::new();
let mut deactivate: IntSet<u64> = IntSet::default();

Expand All @@ -227,7 +227,7 @@ impl SimNetworkPartition {
let is_active = match link {
SimLink::Local(ll) => Self::move_local_link(ll, &mut self.active_nodes, now),
SimLink::In(il) => {
Self::move_in_link(il, &mut self.active_nodes, &mut storage_cap, now)
Self::move_in_link(il, &mut self.active_nodes, &mut storage_cap_updates, now)
}
SimLink::Out(ol) => Self::move_out_link(ol, &mut vehicles),
};
Expand All @@ -244,7 +244,7 @@ impl SimNetworkPartition {
// vehicles leaving this partition are no longer part of the veh count
self.veh_counter -= vehicles.len();

(vehicles, storage_cap)
(vehicles, storage_cap_updates)
}

fn move_local_link(link: &mut LocalLink, active_nodes: &mut IntSet<u64>, now: u32) -> bool {
Expand All @@ -263,14 +263,14 @@ impl SimNetworkPartition {
fn move_in_link(
link: &mut SplitInLink,
active_nodes: &mut IntSet<u64>,
storage_caps: &mut Vec<SplitStorage>,
storage_cap_updates: &mut Vec<StorageUpdate>,
now: u32,
) -> bool {
// if anything has changed on the link, we want to report the updated storage capacity to the
// upstream partition. This must be done before we call 'move_local' link which erases the book
// keeping of what was released and consumed during the current simulation time step.
if let Some(split) = link.storage_cap_updates() {
storage_caps.push(split);
if let Some(cap_update) = link.storage_cap_updates() {
storage_cap_updates.push(cap_update);
}

Self::move_local_link(&mut link.local_link, active_nodes, now)
Expand Down Expand Up @@ -810,19 +810,14 @@ mod tests {
assert_eq!(0, storage_caps.len());

// now place vehicle on network and collect storage caps again.
// as things have changed, the used storage capacity should be
// reported by the split link.
// in links only report their releases. Therfore, no storage cap
// updates should be collected
net2.send_veh_en_route(vehicle, None, 0);
let (_, storage_caps) = net2.move_links(0);
assert_eq!(1, storage_caps.len());
let storage_cap = storage_caps.first().unwrap();
assert_eq!(split_link_id.internal(), storage_cap.link_id);
assert_approx_eq!(100., storage_cap.used, 0.0001);

// now, in the next time step, nothing has changed on the link. It should therefore not
// report any storage capacities
let _ = net2.move_nodes(&mut publisher, 1);
let (_, storage_caps) = net2.move_links(1);
let _ = net2.move_nodes(&mut publisher, 0);
let (_, storage_caps) = net2.move_links(0);
assert_eq!(0, storage_caps.len());

// Now, test whether storage caps are emitted to upstream partitions as well
Expand All @@ -836,7 +831,7 @@ mod tests {
assert_eq!(1, storage_caps.len());
let storage_cap = storage_caps.first().unwrap();
assert_eq!(split_link_id.internal(), storage_cap.link_id);
assert_approx_eq!(0., storage_cap.used, 0.00001);
assert_approx_eq!(100., storage_cap.released, 0.00001);
}

#[test]
Expand Down
11 changes: 0 additions & 11 deletions src/simulation/network/storage_cap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ impl StorageCap {
self.released
}

pub fn consumed(&self) -> f32 {
self.consumed
}

/// Consumes storage capacity on a link
///
/// This method should be called when a vehicle enters a link.
Expand All @@ -82,13 +78,6 @@ impl StorageCap {
self.consumed = 0.0;
}

/// Clears used, consumed and released storage capacity of a link. This is used when updating SplitOutLinks
pub fn clear(&mut self) {
self.used = 0.;
self.consumed = 0.;
self.released = 0.;
}

/// Tests whether there is storage capacity available on the link.
pub fn is_available(&self) -> bool {
let available_cap = self.max - self.currently_used();
Expand Down
9 changes: 5 additions & 4 deletions src/simulation/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,20 +235,21 @@ where

#[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))]
fn move_links(&mut self, now: u32) {
let (vehicles, storage_cap) = self.network.move_links(now);
let (vehicles, storage_cap_updates) = self.network.move_links(now);

for veh in vehicles {
self.net_message_broker.add_veh(veh, now);
}

for cap in storage_cap {
self.net_message_broker.add_cap(cap, now);
for cap in storage_cap_updates {
self.net_message_broker.add_cap_update(cap, now);
}

let sync_messages = self.net_message_broker.send_recv(now);

for msg in sync_messages {
self.network.update_storage_caps(msg.storage_capacities);
self.network
.apply_storage_cap_updates(msg.storage_capacities);

for veh in msg.vehicles {
let veh_type_id = Id::get(veh.r#type);
Expand Down

0 comments on commit 24f2931

Please sign in to comment.