Skip to content

Commit df7a2f4

Browse files
authored
Use a synchronous mutex for bw/iop_tokens (#946)
1 parent 6513507 commit df7a2f4

File tree

2 files changed

+58
-58
lines changed

2 files changed

+58
-58
lines changed

upstairs/src/lib.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9018,16 +9018,16 @@ pub struct Guest {
90189018
* pulled off will be limited. No setting means they are sent right
90199019
* away.
90209020
*/
9021-
iop_tokens: Mutex<usize>,
9021+
iop_tokens: std::sync::Mutex<usize>,
90229022
bytes_per_iop: Option<usize>,
90239023
iop_limit: Option<usize>,
90249024

90259025
/*
90269026
* Setting a bandwidth limit will also limit the rate at which block
90279027
* reqs are pulled off the queue.
90289028
*/
9029-
bw_tokens: Mutex<usize>, // bytes
9030-
bw_limit: Option<usize>, // bytes per second
9029+
bw_tokens: std::sync::Mutex<usize>, // bytes
9030+
bw_limit: Option<usize>, // bytes per second
90319031
}
90329032

90339033
/*
@@ -9054,11 +9054,11 @@ impl Guest {
90549054
completed: AllocRingBuffer::new(2048),
90559055
}),
90569056

9057-
iop_tokens: Mutex::new(0),
9057+
iop_tokens: std::sync::Mutex::new(0),
90589058
bytes_per_iop: None,
90599059
iop_limit: None,
90609060

9061-
bw_tokens: Mutex::new(0),
9061+
bw_tokens: std::sync::Mutex::new(0),
90629062
bw_limit: None,
90639063
}
90649064
}
@@ -9117,8 +9117,8 @@ impl Guest {
91179117
*/
91189118
async fn consume_req(&self) -> Option<BlockReq> {
91199119
let mut reqs = self.reqs.lock().await;
9120-
let mut bw_tokens = self.bw_tokens.lock().await;
9121-
let mut iop_tokens = self.iop_tokens.lock().await;
9120+
let mut bw_tokens = self.bw_tokens.lock().unwrap();
9121+
let mut iop_tokens = self.iop_tokens.lock().unwrap();
91229122

91239123
self.consume_req_locked(&mut reqs, &mut bw_tokens, &mut iop_tokens)
91249124

@@ -9207,8 +9207,8 @@ impl Guest {
92079207
* IOPs are IO operations per second, so leak tokens to allow that
92089208
* through.
92099209
*/
9210-
pub async fn leak_iop_tokens(&self, tokens: usize) {
9211-
let mut iop_tokens = self.iop_tokens.lock().await;
9210+
pub fn leak_iop_tokens(&self, tokens: usize) {
9211+
let mut iop_tokens = self.iop_tokens.lock().unwrap();
92129212

92139213
if tokens > *iop_tokens {
92149214
*iop_tokens = 0;
@@ -9221,8 +9221,8 @@ impl Guest {
92219221
}
92229222

92239223
// Leak bytes from bandwidth tokens
9224-
pub async fn leak_bw_tokens(&self, bytes: usize) {
9225-
let mut bw_tokens = self.bw_tokens.lock().await;
9224+
pub fn leak_bw_tokens(&self, bytes: usize) {
9225+
let mut bw_tokens = self.bw_tokens.lock().unwrap();
92269226

92279227
if bytes > *bw_tokens {
92289228
*bw_tokens = 0;
@@ -10108,12 +10108,12 @@ async fn up_listen(
1010810108
_ = sleep_until(leak_deadline) => {
1010910109
if let Some(iop_limit) = up.guest.get_iop_limit() {
1011010110
let tokens = iop_limit / (1000 / LEAK_MS);
10111-
up.guest.leak_iop_tokens(tokens).await;
10111+
up.guest.leak_iop_tokens(tokens);
1011210112
}
1011310113

1011410114
if let Some(bw_limit) = up.guest.get_bw_limit() {
1011510115
let tokens = bw_limit / (1000 / LEAK_MS);
10116-
up.guest.leak_bw_tokens(tokens).await;
10116+
up.guest.leak_bw_tokens(tokens);
1011710117
}
1011810118

1011910119
leak_deadline = Instant::now().checked_add(leak_tick).unwrap();

upstairs/src/test.rs

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5406,7 +5406,7 @@ pub(crate) mod up_test {
54065406
assert!(guest.consume_req().await.is_none());
54075407

54085408
// If no IOP limit set, don't track it
5409-
assert_eq!(*guest.iop_tokens.lock().await, 0);
5409+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 0);
54105410

54115411
Ok(())
54125412
}
@@ -5447,21 +5447,21 @@ pub(crate) mod up_test {
54475447
// remains in the queue.
54485448
assert!(guest.consume_req().await.is_none());
54495449
assert!(!guest.reqs.lock().await.is_empty());
5450-
assert_eq!(*guest.iop_tokens.lock().await, 2);
5450+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 2);
54515451

54525452
// Replenish one token, meaning next read can be consumed
5453-
guest.leak_iop_tokens(1).await;
5454-
assert_eq!(*guest.iop_tokens.lock().await, 1);
5453+
guest.leak_iop_tokens(1);
5454+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 1);
54555455

54565456
assert!(guest.consume_req().await.is_some());
54575457
assert!(guest.reqs.lock().await.is_empty());
5458-
assert_eq!(*guest.iop_tokens.lock().await, 2);
5458+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 2);
54595459

5460-
guest.leak_iop_tokens(2).await;
5461-
assert_eq!(*guest.iop_tokens.lock().await, 0);
5460+
guest.leak_iop_tokens(2);
5461+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 0);
54625462

5463-
guest.leak_iop_tokens(16000).await;
5464-
assert_eq!(*guest.iop_tokens.lock().await, 0);
5463+
guest.leak_iop_tokens(16000);
5464+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 0);
54655465

54665466
Ok(())
54675467
}
@@ -5535,21 +5535,21 @@ pub(crate) mod up_test {
55355535
// remains in the queue.
55365536
assert!(guest.consume_req().await.is_none());
55375537
assert!(!guest.reqs.lock().await.is_empty());
5538-
assert_eq!(*guest.bw_tokens.lock().await, 1024 * 1024);
5538+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 1024 * 1024);
55395539

55405540
// Replenish enough tokens, meaning next read can be consumed
5541-
guest.leak_bw_tokens(1024 * 1024 / 2).await;
5542-
assert_eq!(*guest.bw_tokens.lock().await, 1024 * 1024 / 2);
5541+
guest.leak_bw_tokens(1024 * 1024 / 2);
5542+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 1024 * 1024 / 2);
55435543

55445544
assert!(guest.consume_req().await.is_some());
55455545
assert!(guest.reqs.lock().await.is_empty());
5546-
assert_eq!(*guest.bw_tokens.lock().await, 1024 * 1024);
5546+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 1024 * 1024);
55475547

5548-
guest.leak_bw_tokens(1024 * 1024).await;
5549-
assert_eq!(*guest.bw_tokens.lock().await, 0);
5548+
guest.leak_bw_tokens(1024 * 1024);
5549+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 0);
55505550

5551-
guest.leak_bw_tokens(1024 * 1024 * 1024).await;
5552-
assert_eq!(*guest.bw_tokens.lock().await, 0);
5551+
guest.leak_bw_tokens(1024 * 1024 * 1024);
5552+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 0);
55535553

55545554
Ok(())
55555555
}
@@ -5618,21 +5618,21 @@ pub(crate) mod up_test {
56185618
assert!(guest.consume_req().await.is_none());
56195619

56205620
// Assert we've hit the BW limit before IOPS
5621-
assert_eq!(*guest.iop_tokens.lock().await, 438); // 437.5 rounded up
5622-
assert_eq!(*guest.bw_tokens.lock().await, 7000 * 1024);
5621+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 438); // 437.5 rounded up
5622+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 7000 * 1024);
56235623

5624-
guest.leak_iop_tokens(438).await;
5625-
guest.leak_bw_tokens(7000 * 1024).await;
5624+
guest.leak_iop_tokens(438);
5625+
guest.leak_bw_tokens(7000 * 1024);
56265626

56275627
assert!(guest.consume_req().await.is_some());
56285628
assert!(guest.reqs.lock().await.is_empty());
56295629

56305630
// Back to zero
5631-
guest.leak_iop_tokens(438).await;
5632-
guest.leak_bw_tokens(7000 * 1024).await;
5631+
guest.leak_iop_tokens(438);
5632+
guest.leak_bw_tokens(7000 * 1024);
56335633

5634-
assert_eq!(*guest.iop_tokens.lock().await, 0);
5635-
assert_eq!(*guest.bw_tokens.lock().await, 0);
5634+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 0);
5635+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 0);
56365636

56375637
// Validate that IOP limit activates by sending 501 1024b IOs
56385638
for _ in 0..500 {
@@ -5654,17 +5654,17 @@ pub(crate) mod up_test {
56545654
assert!(guest.consume_req().await.is_none());
56555655

56565656
// Assert we've hit the IOPS limit
5657-
assert_eq!(*guest.iop_tokens.lock().await, 500);
5658-
assert_eq!(*guest.bw_tokens.lock().await, 500 * 1024);
5657+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 500);
5658+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 500 * 1024);
56595659

56605660
// Back to zero
5661-
guest.leak_iop_tokens(500).await;
5662-
guest.leak_bw_tokens(500 * 1024).await;
5661+
guest.leak_iop_tokens(500);
5662+
guest.leak_bw_tokens(500 * 1024);
56635663
guest.reqs.lock().await.clear();
56645664

56655665
assert!(guest.reqs.lock().await.is_empty());
5666-
assert_eq!(*guest.iop_tokens.lock().await, 0);
5667-
assert_eq!(*guest.bw_tokens.lock().await, 0);
5666+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 0);
5667+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 0);
56685668

56695669
// From
56705670
// https://aws.amazon.com/premiumsupport/knowledge-center/ebs-calculate-optimal-io-size/:
@@ -5680,8 +5680,8 @@ pub(crate) mod up_test {
56805680
// I mean, it makes sense: now we submit 500 of those to reach both
56815681
// limits at the same time.
56825682
for i in 0..500 {
5683-
assert_eq!(*guest.iop_tokens.lock().await, i);
5684-
assert_eq!(*guest.bw_tokens.lock().await, i * optimal_io_size);
5683+
assert_eq!(*guest.iop_tokens.lock().unwrap(), i);
5684+
assert_eq!(*guest.bw_tokens.lock().unwrap(), i * optimal_io_size);
56855685

56865686
let _ = guest
56875687
.send(BlockOp::Read {
@@ -5693,8 +5693,8 @@ pub(crate) mod up_test {
56935693
assert!(guest.consume_req().await.is_some());
56945694
}
56955695

5696-
assert_eq!(*guest.iop_tokens.lock().await, 500);
5697-
assert_eq!(*guest.bw_tokens.lock().await, 500 * optimal_io_size);
5696+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 500);
5697+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 500 * optimal_io_size);
56985698

56995699
Ok(())
57005700
}
@@ -5723,8 +5723,8 @@ pub(crate) mod up_test {
57235723
})
57245724
.await;
57255725

5726-
assert_eq!(*guest.iop_tokens.lock().await, 0);
5727-
assert_eq!(*guest.bw_tokens.lock().await, 0);
5726+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 0);
5727+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 0);
57285728

57295729
// Even though the first IO is larger than the bandwidth and IOP limit,
57305730
// it should still succeed. The next IO should not, even if it consumes
@@ -5734,25 +5734,25 @@ pub(crate) mod up_test {
57345734
assert!(guest.consume_req().await.is_some());
57355735
assert!(guest.consume_req().await.is_none());
57365736

5737-
assert_eq!(*guest.iop_tokens.lock().await, 20);
5738-
assert_eq!(*guest.bw_tokens.lock().await, 10 * 1024 * 1024);
5737+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 20);
5738+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 10 * 1024 * 1024);
57395739

57405740
// Bandwidth trigger is going to be larger and need more leaking to get
57415741
// down to a point where the zero sized IO can fire.
57425742
for _ in 0..9 {
5743-
guest.leak_iop_tokens(10).await;
5744-
guest.leak_bw_tokens(1024 * 1024).await;
5743+
guest.leak_iop_tokens(10);
5744+
guest.leak_bw_tokens(1024 * 1024);
57455745

57465746
assert!(guest.consume_req().await.is_none());
57475747
}
57485748

5749-
assert_eq!(*guest.iop_tokens.lock().await, 0);
5750-
assert_eq!(*guest.bw_tokens.lock().await, 1024 * 1024);
5749+
assert_eq!(*guest.iop_tokens.lock().unwrap(), 0);
5750+
assert_eq!(*guest.bw_tokens.lock().unwrap(), 1024 * 1024);
57515751

57525752
assert!(guest.consume_req().await.is_none());
57535753

5754-
guest.leak_iop_tokens(10).await;
5755-
guest.leak_bw_tokens(1024 * 1024).await;
5754+
guest.leak_iop_tokens(10);
5755+
guest.leak_bw_tokens(1024 * 1024);
57565756

57575757
// We've leaked 10 KiB worth, it should fire now!
57585758
assert!(guest.consume_req().await.is_some());

0 commit comments

Comments
 (0)