Skip to content

Commit

Permalink
add data types for global speed limit
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Jul 23, 2024
1 parent 2a75d83 commit 1a004bc
Show file tree
Hide file tree
Showing 18 changed files with 626 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/g3-io-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fastrand.workspace = true
bytes.workspace = true
ahash.workspace = true
smallvec.workspace = true
arc-swap.workspace = true
quinn = { workspace = true, optional = true }
g3-types.workspace = true
g3-resolver = { workspace = true, optional = true }
Expand Down
4 changes: 4 additions & 0 deletions lib/g3-io-ext/src/io/limited_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl LimitedReaderState {
}
}
}
StreamLimitAction::DelayUntil(t) => {
self.delay.as_mut().reset(t);
self.delay.poll_unpin(cx).map(|_| Ok(()))
}
StreamLimitAction::DelayFor(ms) => {
self.delay
.as_mut()
Expand Down
4 changes: 4 additions & 0 deletions lib/g3-io-ext/src/io/limited_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl LimitedWriterState {
Poll::Pending
}
},
StreamLimitAction::DelayUntil(t) => {
self.delay.as_mut().reset(t);
self.delay.poll_unpin(cx).map(|_| Ok(0))
}
StreamLimitAction::DelayFor(ms) => {
self.delay
.as_mut()
Expand Down
6 changes: 3 additions & 3 deletions lib/g3-io-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ pub use cache::{
};
pub use io::*;
pub use limit::{
DatagramLimitAction, DatagramLimiter, GlobalDatagramLimit, GlobalStreamLimit,
LocalDatagramLimiter, LocalStreamLimiter, StreamLimitAction, StreamLimiter,
ThreadedCountLimiter,
DatagramLimitAction, DatagramLimiter, GlobalDatagramLimit, GlobalDatagramLimiter,
GlobalStreamLimit, GlobalStreamLimiter, LocalDatagramLimiter, LocalStreamLimiter,
StreamLimitAction, StreamLimiter, ThreadedCountLimiter,
};
pub use listen::{LimitedTcpListener, LimitedTlsListener};
pub use udp::*;
Expand Down
46 changes: 33 additions & 13 deletions lib/g3-io-ext/src/limit/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,27 @@

use std::sync::Arc;

use tokio::time::Instant;

use super::LocalDatagramLimiter;

pub enum DatagramLimitAction {
Advance(usize),
DelayUntil(Instant),
DelayFor(u64),
}

pub trait GlobalDatagramLimit {
fn check_packet(&self, buf_size: usize) -> DatagramLimitAction;
fn check_packets(&self, total_size_v: &[usize]) -> DatagramLimitAction;
fn release_size(&self, size: usize);
fn release_packets(&self, packets: usize);
fn release_bytes(&self, size: usize);
fn release_packets(&self, count: usize);
}

struct GlobalLimiter {
inner: Arc<dyn GlobalDatagramLimit + Send + Sync>,
checked_packets: Option<usize>,
checked_size: Option<usize>,
checked_bytes: Option<usize>,
}

impl GlobalLimiter {
Expand All @@ -44,7 +47,7 @@ impl GlobalLimiter {
GlobalLimiter {
inner,
checked_packets: None,
checked_size: None,
checked_bytes: None,
}
}
}
Expand Down Expand Up @@ -97,14 +100,21 @@ impl DatagramLimiter {
pub fn check_packet(&mut self, cur_millis: u64, buf_size: usize) -> DatagramLimitAction {
match self.local.check_packet(cur_millis, buf_size) {
DatagramLimitAction::Advance(_) => {}
DatagramLimitAction::DelayFor(n) => return DatagramLimitAction::DelayFor(n),
DatagramLimitAction::DelayUntil(t) => return DatagramLimitAction::DelayUntil(t),
DatagramLimitAction::DelayFor(n) => {
return DatagramLimitAction::DelayFor(n);
}
};

for limiter in &mut self.global {
match limiter.inner.check_packet(buf_size) {
DatagramLimitAction::Advance(_) => {
limiter.checked_packets = Some(1);
limiter.checked_size = Some(buf_size);
limiter.checked_bytes = Some(buf_size);
}
DatagramLimitAction::DelayUntil(t) => {
self.release_global();
return DatagramLimitAction::DelayUntil(t);
}
DatagramLimitAction::DelayFor(n) => {
self.release_global();
Expand All @@ -123,7 +133,10 @@ impl DatagramLimiter {
) -> DatagramLimitAction {
let mut to_advance = match self.local.check_packets(cur_millis, total_size_v) {
DatagramLimitAction::Advance(n) => n,
DatagramLimitAction::DelayFor(n) => return DatagramLimitAction::DelayFor(n),
DatagramLimitAction::DelayUntil(t) => return DatagramLimitAction::DelayUntil(t),
DatagramLimitAction::DelayFor(n) => {
return DatagramLimitAction::DelayFor(n);
}
};
if self.global.is_empty() {
return DatagramLimitAction::Advance(to_advance);
Expand All @@ -135,6 +148,10 @@ impl DatagramLimiter {
to_advance = n;
limiter.checked_packets = Some(n);
}
DatagramLimitAction::DelayUntil(t) => {
self.release_global();
return DatagramLimitAction::DelayUntil(t);
}
DatagramLimitAction::DelayFor(n) => {
self.release_global();
return DatagramLimitAction::DelayFor(n);
Expand All @@ -143,14 +160,17 @@ impl DatagramLimiter {
}

if total_size_v.len() > to_advance {
let buf_size = total_size_v[to_advance];
let buf_size = total_size_v[to_advance - 1];
for limiter in &mut self.global {
let checked = limiter.checked_packets.take().unwrap();
if checked > to_advance {
limiter.inner.release_packets(checked - to_advance);
limiter
.inner
.release_bytes(total_size_v[checked - 1] - buf_size);
}
limiter.checked_packets = Some(to_advance);
limiter.checked_size = Some(buf_size);
limiter.checked_bytes = Some(buf_size);
}
}
DatagramLimitAction::Advance(to_advance)
Expand All @@ -162,8 +182,8 @@ impl DatagramLimiter {
break;
};
limiter.inner.release_packets(packets);
if let Some(size) = limiter.checked_size.take() {
limiter.inner.release_size(size);
if let Some(size) = limiter.checked_bytes.take() {
limiter.inner.release_bytes(size);
}
}
}
Expand All @@ -180,9 +200,9 @@ impl DatagramLimiter {
limiter.inner.release_packets(checked - packets);
}

if let Some(checked) = limiter.checked_size.take() {
if let Some(checked) = limiter.checked_bytes.take() {
if checked > size {
limiter.inner.release_size(checked - size);
limiter.inner.release_bytes(checked - size);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions lib/g3-io-ext/src/limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ pub use stream::{GlobalStreamLimit, StreamLimitAction, StreamLimiter};

mod fixed_window;
pub use fixed_window::{LocalDatagramLimiter, LocalStreamLimiter, ThreadedCountLimiter};

mod token_bucket;
pub use token_bucket::{GlobalDatagramLimiter, GlobalStreamLimiter};
22 changes: 15 additions & 7 deletions lib/g3-io-ext/src/limit/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

use std::sync::Arc;

use tokio::time::Instant;

use super::LocalStreamLimiter;

#[derive(Debug, Eq, PartialEq)]
pub enum StreamLimitAction {
AdvanceBy(usize),
DelayUntil(Instant),
DelayFor(u64),
}

Expand All @@ -31,7 +34,7 @@ pub trait GlobalStreamLimit {

struct GlobalLimiter {
inner: Arc<dyn GlobalStreamLimit + Send + Sync>,
checked_size: Option<usize>,
checked_bytes: Option<usize>,
}

impl GlobalLimiter {
Expand All @@ -41,7 +44,7 @@ impl GlobalLimiter {
{
GlobalLimiter {
inner,
checked_size: None,
checked_bytes: None,
}
}
}
Expand Down Expand Up @@ -88,14 +91,19 @@ impl StreamLimiter {
let target = to_advance;
let mut to_advance = match self.local.check(cur_millis, to_advance) {
StreamLimitAction::AdvanceBy(size) => size,
StreamLimitAction::DelayUntil(t) => return StreamLimitAction::DelayUntil(t),
StreamLimitAction::DelayFor(n) => return StreamLimitAction::DelayFor(n),
};

for limiter in &mut self.global {
match limiter.inner.check(to_advance) {
StreamLimitAction::AdvanceBy(size) => {
to_advance = size;
limiter.checked_size = Some(size);
limiter.checked_bytes = Some(size);
}
StreamLimitAction::DelayUntil(t) => {
self.release_global();
return StreamLimitAction::DelayUntil(t);
}
StreamLimitAction::DelayFor(n) => {
self.release_global();
Expand All @@ -107,19 +115,19 @@ impl StreamLimiter {
if target > to_advance {
// shrink in time
for limiter in &mut self.global {
let checked = limiter.checked_size.take().unwrap();
let checked = limiter.checked_bytes.take().unwrap();
if checked > to_advance {
limiter.inner.release(checked - to_advance);
}
limiter.checked_size = Some(to_advance);
limiter.checked_bytes = Some(to_advance);
}
}
StreamLimitAction::AdvanceBy(to_advance)
}

pub fn release_global(&mut self) {
for limiter in &mut self.global {
let Some(taken) = limiter.checked_size.take() else {
let Some(taken) = limiter.checked_bytes.take() else {
break;
};
limiter.inner.release(taken);
Expand All @@ -130,7 +138,7 @@ impl StreamLimiter {
self.local.set_advance(size);

for limiter in &mut self.global {
let Some(taken) = limiter.checked_size.take() else {
let Some(taken) = limiter.checked_bytes.take() else {
break;
};
if taken > size {
Expand Down
Loading

0 comments on commit 1a004bc

Please sign in to comment.