From 1a004bcc9fa5697b72da9ba573ea02e42ae70612 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Tue, 23 Jul 2024 16:41:27 +0800 Subject: [PATCH] add data types for global speed limit --- Cargo.lock | 1 + lib/g3-io-ext/Cargo.toml | 1 + lib/g3-io-ext/src/io/limited_read.rs | 4 + lib/g3-io-ext/src/io/limited_write.rs | 4 + lib/g3-io-ext/src/lib.rs | 6 +- lib/g3-io-ext/src/limit/datagram.rs | 46 +++- lib/g3-io-ext/src/limit/mod.rs | 3 + lib/g3-io-ext/src/limit/stream.rs | 22 +- .../src/limit/token_bucket/datagram.rs | 224 ++++++++++++++++++ lib/g3-io-ext/src/limit/token_bucket/mod.rs | 21 ++ .../src/limit/token_bucket/stream.rs | 85 +++++++ lib/g3-io-ext/src/quic/limited_socket.rs | 18 +- lib/g3-io-ext/src/udp/ext/unix.rs | 2 +- lib/g3-io-ext/src/udp/recv.rs | 20 +- lib/g3-io-ext/src/udp/send.rs | 20 +- lib/g3-types/src/limit/datagram_speed.rs | 99 ++++++++ lib/g3-types/src/limit/mod.rs | 10 +- lib/g3-types/src/limit/stream_speed.rs | 74 ++++++ 18 files changed, 626 insertions(+), 34 deletions(-) create mode 100644 lib/g3-io-ext/src/limit/token_bucket/datagram.rs create mode 100644 lib/g3-io-ext/src/limit/token_bucket/mod.rs create mode 100644 lib/g3-io-ext/src/limit/token_bucket/stream.rs create mode 100644 lib/g3-types/src/limit/datagram_speed.rs create mode 100644 lib/g3-types/src/limit/stream_speed.rs diff --git a/Cargo.lock b/Cargo.lock index 4750479a8..b20d64920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1281,6 +1281,7 @@ name = "g3-io-ext" version = "0.7.0" dependencies = [ "ahash", + "arc-swap", "bytes", "fastrand", "futures-util", diff --git a/lib/g3-io-ext/Cargo.toml b/lib/g3-io-ext/Cargo.toml index d3af7d074..ffbc7774f 100644 --- a/lib/g3-io-ext/Cargo.toml +++ b/lib/g3-io-ext/Cargo.toml @@ -19,6 +19,7 @@ fastrand.workspace = true bytes.workspace = true ahash.workspace = true smallvec.workspace = true +arc-swap.workspace = true quinn = { workspace = true, optional = true } g3-types.workspace = true g3-resolver = { workspace = true, optional = true } diff --git a/lib/g3-io-ext/src/io/limited_read.rs b/lib/g3-io-ext/src/io/limited_read.rs index 2e672cf9f..69ed92738 100644 --- a/lib/g3-io-ext/src/io/limited_read.rs +++ b/lib/g3-io-ext/src/io/limited_read.rs @@ -118,6 +118,10 @@ impl LimitedReaderState { } } } + StreamLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(())) + } StreamLimitAction::DelayFor(ms) => { self.delay .as_mut() diff --git a/lib/g3-io-ext/src/io/limited_write.rs b/lib/g3-io-ext/src/io/limited_write.rs index 975858c1c..191f99e74 100644 --- a/lib/g3-io-ext/src/io/limited_write.rs +++ b/lib/g3-io-ext/src/io/limited_write.rs @@ -118,6 +118,10 @@ impl LimitedWriterState { Poll::Pending } }, + StreamLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(0)) + } StreamLimitAction::DelayFor(ms) => { self.delay .as_mut() diff --git a/lib/g3-io-ext/src/lib.rs b/lib/g3-io-ext/src/lib.rs index 580ad0ed2..48c884c8c 100644 --- a/lib/g3-io-ext/src/lib.rs +++ b/lib/g3-io-ext/src/lib.rs @@ -26,9 +26,9 @@ pub use cache::{ }; pub use io::*; pub use limit::{ - DatagramLimitAction, DatagramLimiter, GlobalDatagramLimit, GlobalStreamLimit, - LocalDatagramLimiter, LocalStreamLimiter, StreamLimitAction, StreamLimiter, - ThreadedCountLimiter, + DatagramLimitAction, DatagramLimiter, GlobalDatagramLimit, GlobalDatagramLimiter, + GlobalStreamLimit, GlobalStreamLimiter, LocalDatagramLimiter, LocalStreamLimiter, + StreamLimitAction, StreamLimiter, ThreadedCountLimiter, }; pub use listen::{LimitedTcpListener, LimitedTlsListener}; pub use udp::*; diff --git a/lib/g3-io-ext/src/limit/datagram.rs b/lib/g3-io-ext/src/limit/datagram.rs index 4900797e2..b70991a78 100644 --- a/lib/g3-io-ext/src/limit/datagram.rs +++ b/lib/g3-io-ext/src/limit/datagram.rs @@ -16,24 +16,27 @@ use std::sync::Arc; +use tokio::time::Instant; + use super::LocalDatagramLimiter; pub enum DatagramLimitAction { Advance(usize), + DelayUntil(Instant), DelayFor(u64), } pub trait GlobalDatagramLimit { fn check_packet(&self, buf_size: usize) -> DatagramLimitAction; fn check_packets(&self, total_size_v: &[usize]) -> DatagramLimitAction; - fn release_size(&self, size: usize); - fn release_packets(&self, packets: usize); + fn release_bytes(&self, size: usize); + fn release_packets(&self, count: usize); } struct GlobalLimiter { inner: Arc, checked_packets: Option, - checked_size: Option, + checked_bytes: Option, } impl GlobalLimiter { @@ -44,7 +47,7 @@ impl GlobalLimiter { GlobalLimiter { inner, checked_packets: None, - checked_size: None, + checked_bytes: None, } } } @@ -97,14 +100,21 @@ impl DatagramLimiter { pub fn check_packet(&mut self, cur_millis: u64, buf_size: usize) -> DatagramLimitAction { match self.local.check_packet(cur_millis, buf_size) { DatagramLimitAction::Advance(_) => {} - DatagramLimitAction::DelayFor(n) => return DatagramLimitAction::DelayFor(n), + DatagramLimitAction::DelayUntil(t) => return DatagramLimitAction::DelayUntil(t), + DatagramLimitAction::DelayFor(n) => { + return DatagramLimitAction::DelayFor(n); + } }; for limiter in &mut self.global { match limiter.inner.check_packet(buf_size) { DatagramLimitAction::Advance(_) => { limiter.checked_packets = Some(1); - limiter.checked_size = Some(buf_size); + limiter.checked_bytes = Some(buf_size); + } + DatagramLimitAction::DelayUntil(t) => { + self.release_global(); + return DatagramLimitAction::DelayUntil(t); } DatagramLimitAction::DelayFor(n) => { self.release_global(); @@ -123,7 +133,10 @@ impl DatagramLimiter { ) -> DatagramLimitAction { let mut to_advance = match self.local.check_packets(cur_millis, total_size_v) { DatagramLimitAction::Advance(n) => n, - DatagramLimitAction::DelayFor(n) => return DatagramLimitAction::DelayFor(n), + DatagramLimitAction::DelayUntil(t) => return DatagramLimitAction::DelayUntil(t), + DatagramLimitAction::DelayFor(n) => { + return DatagramLimitAction::DelayFor(n); + } }; if self.global.is_empty() { return DatagramLimitAction::Advance(to_advance); @@ -135,6 +148,10 @@ impl DatagramLimiter { to_advance = n; limiter.checked_packets = Some(n); } + DatagramLimitAction::DelayUntil(t) => { + self.release_global(); + return DatagramLimitAction::DelayUntil(t); + } DatagramLimitAction::DelayFor(n) => { self.release_global(); return DatagramLimitAction::DelayFor(n); @@ -143,14 +160,17 @@ impl DatagramLimiter { } if total_size_v.len() > to_advance { - let buf_size = total_size_v[to_advance]; + let buf_size = total_size_v[to_advance - 1]; for limiter in &mut self.global { let checked = limiter.checked_packets.take().unwrap(); if checked > to_advance { limiter.inner.release_packets(checked - to_advance); + limiter + .inner + .release_bytes(total_size_v[checked - 1] - buf_size); } limiter.checked_packets = Some(to_advance); - limiter.checked_size = Some(buf_size); + limiter.checked_bytes = Some(buf_size); } } DatagramLimitAction::Advance(to_advance) @@ -162,8 +182,8 @@ impl DatagramLimiter { break; }; limiter.inner.release_packets(packets); - if let Some(size) = limiter.checked_size.take() { - limiter.inner.release_size(size); + if let Some(size) = limiter.checked_bytes.take() { + limiter.inner.release_bytes(size); } } } @@ -180,9 +200,9 @@ impl DatagramLimiter { limiter.inner.release_packets(checked - packets); } - if let Some(checked) = limiter.checked_size.take() { + if let Some(checked) = limiter.checked_bytes.take() { if checked > size { - limiter.inner.release_size(checked - size); + limiter.inner.release_bytes(checked - size); } } } diff --git a/lib/g3-io-ext/src/limit/mod.rs b/lib/g3-io-ext/src/limit/mod.rs index 92dab7947..dd66f3373 100644 --- a/lib/g3-io-ext/src/limit/mod.rs +++ b/lib/g3-io-ext/src/limit/mod.rs @@ -22,3 +22,6 @@ pub use stream::{GlobalStreamLimit, StreamLimitAction, StreamLimiter}; mod fixed_window; pub use fixed_window::{LocalDatagramLimiter, LocalStreamLimiter, ThreadedCountLimiter}; + +mod token_bucket; +pub use token_bucket::{GlobalDatagramLimiter, GlobalStreamLimiter}; diff --git a/lib/g3-io-ext/src/limit/stream.rs b/lib/g3-io-ext/src/limit/stream.rs index 49aaf6e18..c44d124b0 100644 --- a/lib/g3-io-ext/src/limit/stream.rs +++ b/lib/g3-io-ext/src/limit/stream.rs @@ -16,11 +16,14 @@ use std::sync::Arc; +use tokio::time::Instant; + use super::LocalStreamLimiter; #[derive(Debug, Eq, PartialEq)] pub enum StreamLimitAction { AdvanceBy(usize), + DelayUntil(Instant), DelayFor(u64), } @@ -31,7 +34,7 @@ pub trait GlobalStreamLimit { struct GlobalLimiter { inner: Arc, - checked_size: Option, + checked_bytes: Option, } impl GlobalLimiter { @@ -41,7 +44,7 @@ impl GlobalLimiter { { GlobalLimiter { inner, - checked_size: None, + checked_bytes: None, } } } @@ -88,6 +91,7 @@ impl StreamLimiter { let target = to_advance; let mut to_advance = match self.local.check(cur_millis, to_advance) { StreamLimitAction::AdvanceBy(size) => size, + StreamLimitAction::DelayUntil(t) => return StreamLimitAction::DelayUntil(t), StreamLimitAction::DelayFor(n) => return StreamLimitAction::DelayFor(n), }; @@ -95,7 +99,11 @@ impl StreamLimiter { match limiter.inner.check(to_advance) { StreamLimitAction::AdvanceBy(size) => { to_advance = size; - limiter.checked_size = Some(size); + limiter.checked_bytes = Some(size); + } + StreamLimitAction::DelayUntil(t) => { + self.release_global(); + return StreamLimitAction::DelayUntil(t); } StreamLimitAction::DelayFor(n) => { self.release_global(); @@ -107,11 +115,11 @@ impl StreamLimiter { if target > to_advance { // shrink in time for limiter in &mut self.global { - let checked = limiter.checked_size.take().unwrap(); + let checked = limiter.checked_bytes.take().unwrap(); if checked > to_advance { limiter.inner.release(checked - to_advance); } - limiter.checked_size = Some(to_advance); + limiter.checked_bytes = Some(to_advance); } } StreamLimitAction::AdvanceBy(to_advance) @@ -119,7 +127,7 @@ impl StreamLimiter { pub fn release_global(&mut self) { for limiter in &mut self.global { - let Some(taken) = limiter.checked_size.take() else { + let Some(taken) = limiter.checked_bytes.take() else { break; }; limiter.inner.release(taken); @@ -130,7 +138,7 @@ impl StreamLimiter { self.local.set_advance(size); for limiter in &mut self.global { - let Some(taken) = limiter.checked_size.take() else { + let Some(taken) = limiter.checked_bytes.take() else { break; }; if taken > size { diff --git a/lib/g3-io-ext/src/limit/token_bucket/datagram.rs b/lib/g3-io-ext/src/limit/token_bucket/datagram.rs new file mode 100644 index 000000000..847af0482 --- /dev/null +++ b/lib/g3-io-ext/src/limit/token_bucket/datagram.rs @@ -0,0 +1,224 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::sync::atomic::{AtomicU64, Ordering}; + +use arc_swap::ArcSwap; +use tokio::time::Instant; + +use g3_types::limit::GlobalDatagramSpeedLimitConfig; + +use crate::limit::{DatagramLimitAction, GlobalDatagramLimit}; + +pub struct GlobalDatagramLimiter { + config: ArcSwap, + byte_tokens: AtomicU64, + packet_tokens: AtomicU64, + last_updated: ArcSwap, +} + +impl GlobalDatagramLimiter { + fn add_bytes(&self, size: u64, max_burst: u64) { + let mut cur_tokens = self.byte_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens >= max_burst { + break; + } + let next_tokens = (cur_tokens + size).max(max_burst); + match self.byte_tokens.compare_exchange( + cur_tokens, + next_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => cur_tokens = actual, + } + } + } + + fn add_packets(&self, count: u64, max_burst: u64) { + let mut cur_tokens = self.packet_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens >= max_burst { + break; + } + let next_tokens = (cur_tokens + count).max(max_burst); + match self.packet_tokens.compare_exchange( + cur_tokens, + next_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => cur_tokens = actual, + } + } + } + + fn wait_until(&self) -> Instant { + let last_updated = *self.last_updated.load().as_ref(); + let interval = self.config.load().as_ref().replenish_interval(); + last_updated + interval + } +} + +impl GlobalDatagramLimit for GlobalDatagramLimiter { + fn check_packet(&self, buf_size: usize) -> DatagramLimitAction { + let config = *self.config.load().as_ref(); + + if config.replenish_packets() > 0 { + let mut cur_tokens = self.packet_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens == 0 { + return DatagramLimitAction::DelayUntil(self.wait_until()); + } + let left_tokens = cur_tokens.saturating_sub(1); + match self.packet_tokens.compare_exchange( + cur_tokens, + left_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => cur_tokens = actual, + } + } + } + + if config.replenish_bytes() > 0 { + let mut cur_tokens = self.byte_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens < buf_size as u64 { + if config.replenish_packets() > 0 { + self.add_packets(1, config.max_burst_packets()); + } + return DatagramLimitAction::DelayUntil(self.wait_until()); + } + let left_tokens = cur_tokens - buf_size as u64; + match self.byte_tokens.compare_exchange( + cur_tokens, + left_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => cur_tokens = actual, + } + } + } + + DatagramLimitAction::Advance(1) + } + + fn check_packets(&self, total_size_v: &[usize]) -> DatagramLimitAction { + let config = *self.config.load().as_ref(); + + let mut to_advance = total_size_v.len(); + if config.replenish_packets() > 0 { + let mut cur_tokens = self.packet_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens == 0 { + return DatagramLimitAction::DelayUntil(self.wait_until()); + } + let left_tokens = cur_tokens.saturating_sub(to_advance as u64); + match self.packet_tokens.compare_exchange( + cur_tokens, + left_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => to_advance = (cur_tokens - left_tokens) as usize, + Err(actual) => cur_tokens = actual, + } + } + } + + let mut buf_size = total_size_v[to_advance - 1]; + if config.replenish_bytes() > 0 { + let mut cur_tokens = self.byte_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens == 0 { + if config.replenish_packets() > 0 { + self.add_packets(to_advance as u64, config.max_burst_packets()); + } + return DatagramLimitAction::DelayUntil(self.wait_until()); + } + let left_tokens = cur_tokens.saturating_sub(buf_size as u64); + match self.byte_tokens.compare_exchange( + cur_tokens, + left_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => buf_size = (cur_tokens - left_tokens) as usize, + Err(actual) => cur_tokens = actual, + } + } + } + + if buf_size == total_size_v[to_advance - 1] { + return DatagramLimitAction::Advance(to_advance); + } + + match total_size_v.binary_search(&buf_size) { + Ok(found_index) => { + if config.replenish_packets() > 0 { + // release unneeded packets + self.add_packets( + (to_advance - found_index - 1) as u64, + config.max_burst_packets(), + ); + } + to_advance = found_index + 1; + } + Err(insert_index) => { + if config.replenish_packets() > 0 { + // release unneeded packets + self.add_packets( + (to_advance - insert_index) as u64, + config.max_burst_packets(), + ); + } + to_advance = insert_index; + if config.replenish_bytes() > 0 { + // release unneeded bytes + self.add_bytes( + (buf_size - total_size_v[to_advance - 1]) as u64, + config.max_burst_bytes(), + ); + } + } + } + DatagramLimitAction::Advance(to_advance) + } + + fn release_bytes(&self, size: usize) { + let max_burst = self.config.load().as_ref().max_burst_bytes(); + self.add_bytes(size as u64, max_burst); + } + + fn release_packets(&self, count: usize) { + let max_burst = self.config.load().as_ref().max_burst_packets(); + self.add_packets(count as u64, max_burst); + } +} diff --git a/lib/g3-io-ext/src/limit/token_bucket/mod.rs b/lib/g3-io-ext/src/limit/token_bucket/mod.rs new file mode 100644 index 000000000..4957fcbdd --- /dev/null +++ b/lib/g3-io-ext/src/limit/token_bucket/mod.rs @@ -0,0 +1,21 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod stream; +pub use stream::GlobalStreamLimiter; + +mod datagram; +pub use datagram::GlobalDatagramLimiter; diff --git a/lib/g3-io-ext/src/limit/token_bucket/stream.rs b/lib/g3-io-ext/src/limit/token_bucket/stream.rs new file mode 100644 index 000000000..dd03a2be3 --- /dev/null +++ b/lib/g3-io-ext/src/limit/token_bucket/stream.rs @@ -0,0 +1,85 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::sync::atomic::{AtomicU64, Ordering}; + +use arc_swap::ArcSwap; +use tokio::time::Instant; + +use g3_types::limit::GlobalStreamSpeedLimitConfig; + +use crate::limit::{GlobalStreamLimit, StreamLimitAction}; + +pub struct GlobalStreamLimiter { + config: ArcSwap, + byte_tokens: AtomicU64, + last_updated: ArcSwap, +} + +impl GlobalStreamLimiter { + fn add_bytes(&self, size: u64, max_burst: u64) { + let mut cur_tokens = self.byte_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens >= max_burst { + break; + } + let next_tokens = (cur_tokens + size).max(max_burst); + match self.byte_tokens.compare_exchange( + cur_tokens, + next_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => cur_tokens = actual, + } + } + } + + fn wait_until(&self) -> Instant { + let last_updated = *self.last_updated.load().as_ref(); + let interval = self.config.load().as_ref().replenish_interval(); + last_updated + interval + } +} + +impl GlobalStreamLimit for GlobalStreamLimiter { + fn check(&self, to_advance: usize) -> StreamLimitAction { + let mut cur_tokens = self.byte_tokens.load(Ordering::Acquire); + + loop { + if cur_tokens == 0 { + return StreamLimitAction::DelayUntil(self.wait_until()); + } + let left_tokens = cur_tokens.saturating_sub(to_advance as u64); + match self.byte_tokens.compare_exchange( + cur_tokens, + left_tokens, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return StreamLimitAction::AdvanceBy((cur_tokens - left_tokens) as usize), + Err(actual) => cur_tokens = actual, + } + } + } + + fn release(&self, size: usize) { + let max_burst = self.config.load().as_ref().max_burst_bytes(); + self.add_bytes(size as u64, max_burst); + } +} diff --git a/lib/g3-io-ext/src/quic/limited_socket.rs b/lib/g3-io-ext/src/quic/limited_socket.rs index 542c661fa..aac4f3791 100644 --- a/lib/g3-io-ext/src/quic/limited_socket.rs +++ b/lib/g3-io-ext/src/quic/limited_socket.rs @@ -304,6 +304,14 @@ impl AsyncUdpSocket for LimitedUdpSocket { Err(e) } }, + DatagramLimitAction::DelayUntil(t) => { + l.delay.as_mut().reset(t); + l.poll_delay = true; + Err(io::Error::new( + io::ErrorKind::WouldBlock, + "delayed by rate limiter", + )) + } DatagramLimitAction::DelayFor(ms) => { l.delay .as_mut() @@ -334,15 +342,15 @@ impl AsyncUdpSocket for LimitedUdpSocket { let dur_millis = l.started.elapsed().as_millis() as u64; let mut total_size_v = Vec::with_capacity(meta.len()); let mut total_size = 0; - for m in meta.iter() { - total_size += m.len; + for b in bufs.iter() { + total_size += b.len(); total_size_v.push(total_size); } match l.limit.check_packets(dur_millis, &total_size_v) { DatagramLimitAction::Advance(n) => { match self.inner.poll_recv(cx, &mut bufs[0..n], &mut meta[0..n]) { Poll::Ready(Ok(nr)) => { - let len = total_size_v[nr]; + let len = meta.iter().take(nr).map(|m| m.len).sum(); l.limit.set_advance(nr, len); l.stats.add_recv_packets(nr); l.stats.add_recv_bytes(len); @@ -358,6 +366,10 @@ impl AsyncUdpSocket for LimitedUdpSocket { } } } + DatagramLimitAction::DelayUntil(t) => { + l.delay.as_mut().reset(t); + l.delay.poll_unpin(cx).map(|_| Ok(0)) + } DatagramLimitAction::DelayFor(ms) => { l.delay .as_mut() diff --git a/lib/g3-io-ext/src/udp/ext/unix.rs b/lib/g3-io-ext/src/udp/ext/unix.rs index 42d2495fd..baa4f0f90 100644 --- a/lib/g3-io-ext/src/udp/ext/unix.rs +++ b/lib/g3-io-ext/src/udp/ext/unix.rs @@ -106,7 +106,7 @@ impl From for RawSocketAddr { } pub struct SendMsgHdr<'a, const C: usize> { - iov: [IoSlice<'a>; C], + pub(crate) iov: [IoSlice<'a>; C], c_addr: Option>, pub n_send: usize, } diff --git a/lib/g3-io-ext/src/udp/recv.rs b/lib/g3-io-ext/src/udp/recv.rs index 0c2c7e28e..3ab30c9fc 100644 --- a/lib/g3-io-ext/src/udp/recv.rs +++ b/lib/g3-io-ext/src/udp/recv.rs @@ -119,6 +119,12 @@ where Poll::Pending } }, + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay + .poll_unpin(cx) + .map(|_| Ok((0, SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)))) + } DatagramLimitAction::DelayFor(ms) => { self.delay .as_mut() @@ -156,6 +162,10 @@ where Poll::Pending } }, + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(0)) + } DatagramLimitAction::DelayFor(ms) => { self.delay .as_mut() @@ -186,16 +196,16 @@ where if self.limit.is_set() { let dur_millis = self.started.elapsed().as_millis() as u64; let mut total_size_v = [0usize; C]; - let mut total_size = 0; + let mut total_size = 0usize; for i in 0..C { - total_size += hdr_v[i].n_recv; + total_size += hdr_v[i].iov.iter().map(|v| v.len()).sum::(); total_size_v[i] = total_size; } match self.limit.check_packets(dur_millis, &total_size_v) { DatagramLimitAction::Advance(n) => { match self.inner.poll_batch_recvmsg(cx, &mut hdr_v[0..n]) { Poll::Ready(Ok(count)) => { - let len = total_size_v[count]; + let len = hdr_v.iter().take(count).map(|h| h.n_recv).sum(); self.limit.set_advance(count, len); self.stats.add_recv_packets(count); self.stats.add_recv_bytes(len); @@ -211,6 +221,10 @@ where } } } + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(0)) + } DatagramLimitAction::DelayFor(ms) => { self.delay .as_mut() diff --git a/lib/g3-io-ext/src/udp/send.rs b/lib/g3-io-ext/src/udp/send.rs index 882ccd0c9..88a664e50 100644 --- a/lib/g3-io-ext/src/udp/send.rs +++ b/lib/g3-io-ext/src/udp/send.rs @@ -124,6 +124,10 @@ where Poll::Pending } }, + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(0)) + } DatagramLimitAction::DelayFor(ms) => { self.delay .as_mut() @@ -159,6 +163,10 @@ where Poll::Pending } }, + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(0)) + } DatagramLimitAction::DelayFor(ms) => { self.delay .as_mut() @@ -200,6 +208,10 @@ where Poll::Pending } }, + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(0)) + } DatagramLimitAction::DelayFor(ms) => { self.delay .as_mut() @@ -232,14 +244,14 @@ where let mut total_size_v = [0usize, C]; let mut total_size = 0; for i in 0..C { - total_size += msgs[i].n_send; + total_size += msgs[i].iov.iter().map(|v| v.len()).sum::(); total_size_v[i] = total_size; } match self.limit.check_packets(dur_millis, &total_size_v) { DatagramLimitAction::Advance(n) => { match self.inner.poll_batch_sendmsg(cx, &mut msgs[0..n]) { Poll::Ready(Ok(count)) => { - let len = total_size_v[count]; + let len = msgs.iter().take(count).map(|v| v.n_send).sum(); self.limit.set_advance(count, len); self.stats.add_send_packets(count); self.stats.add_send_bytes(len); @@ -255,6 +267,10 @@ where } } } + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + self.delay.poll_unpin(cx).map(|_| Ok(0)) + } DatagramLimitAction::DelayFor(ms) => { self.delay .as_mut() diff --git a/lib/g3-types/src/limit/datagram_speed.rs b/lib/g3-types/src/limit/datagram_speed.rs new file mode 100644 index 000000000..cb389aabc --- /dev/null +++ b/lib/g3-types/src/limit/datagram_speed.rs @@ -0,0 +1,99 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::time::Duration; + +use anyhow::anyhow; + +#[derive(Clone, Copy, Default, PartialEq, Eq)] +pub struct GlobalDatagramSpeedLimitConfig { + replenish_interval: Duration, + replenish_bytes: u64, + replenish_packets: u64, + max_burst_bytes: u64, + max_burst_packets: u64, +} + +impl GlobalDatagramSpeedLimitConfig { + pub fn per_second(bytes: u64) -> Self { + GlobalDatagramSpeedLimitConfig { + replenish_interval: Duration::from_secs(1), + replenish_bytes: bytes, + replenish_packets: 0, + max_burst_bytes: bytes, + max_burst_packets: 0, + } + } + + #[inline] + pub fn replenish_interval(&self) -> Duration { + self.replenish_interval + } + + pub fn set_replenish_interval(&mut self, interval: Duration) { + self.replenish_interval = interval; + } + + #[inline] + pub fn replenish_bytes(&self) -> u64 { + self.replenish_bytes + } + + pub fn set_replenish_bytes(&mut self, size: u64) { + self.replenish_bytes = size; + } + + #[inline] + pub fn replenish_packets(&self) -> u64 { + self.replenish_packets + } + + pub fn set_replenish_packets(&mut self, count: u64) { + self.replenish_packets = count; + } + + #[inline] + pub fn max_burst_bytes(&self) -> u64 { + self.max_burst_bytes + } + + pub fn set_max_burst_bytes(&mut self, size: u64) { + self.max_burst_bytes = size; + } + + #[inline] + pub fn max_burst_packets(&self) -> u64 { + self.max_burst_packets + } + + pub fn set_max_burst_packets(&mut self, count: u64) { + self.max_burst_packets = count; + } + + pub fn check(&mut self) -> anyhow::Result<()> { + if self.replenish_bytes == 0 && self.replenish_packets == 0 { + return Err(anyhow!("no replenish bytes/packets set")); + } + if self.max_burst_bytes < self.replenish_bytes { + self.max_burst_bytes = self.replenish_bytes; + } + if self.max_burst_packets < self.replenish_packets { + self.max_burst_packets = self.replenish_packets; + } + + Ok(()) + } +} diff --git a/lib/g3-types/src/limit/mod.rs b/lib/g3-types/src/limit/mod.rs index 13debfcc3..1d948991c 100644 --- a/lib/g3-types/src/limit/mod.rs +++ b/lib/g3-types/src/limit/mod.rs @@ -15,7 +15,13 @@ */ mod gauge_semaphore; -mod rate_limit_quota; - pub use gauge_semaphore::{GaugeSemaphore, GaugeSemaphoreAcquireError, GaugeSemaphorePermit}; + +mod rate_limit_quota; pub use rate_limit_quota::RateLimitQuotaConfig; + +mod stream_speed; +pub use stream_speed::GlobalStreamSpeedLimitConfig; + +mod datagram_speed; +pub use datagram_speed::GlobalDatagramSpeedLimitConfig; diff --git a/lib/g3-types/src/limit/stream_speed.rs b/lib/g3-types/src/limit/stream_speed.rs new file mode 100644 index 000000000..6545fefa6 --- /dev/null +++ b/lib/g3-types/src/limit/stream_speed.rs @@ -0,0 +1,74 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::time::Duration; + +use anyhow::anyhow; + +#[derive(Clone, Copy, Default, PartialEq, Eq)] +pub struct GlobalStreamSpeedLimitConfig { + replenish_interval: Duration, + replenish_bytes: u64, + max_burst_bytes: u64, +} + +impl GlobalStreamSpeedLimitConfig { + pub fn per_second(size: u64) -> Self { + GlobalStreamSpeedLimitConfig { + replenish_interval: Duration::from_secs(1), + replenish_bytes: size, + max_burst_bytes: size, + } + } + + #[inline] + pub fn replenish_interval(&self) -> Duration { + self.replenish_interval + } + + pub fn set_replenish_interval(&mut self, interval: Duration) { + self.replenish_interval = interval; + } + + #[inline] + pub fn replenish_bytes(&self) -> u64 { + self.replenish_bytes + } + + pub fn set_replenish_bytes(&mut self, size: u64) { + self.replenish_bytes = size; + } + + #[inline] + pub fn max_burst_bytes(&self) -> u64 { + self.max_burst_bytes + } + + pub fn set_max_burst_bytes(&mut self, size: u64) { + self.max_burst_bytes = size; + } + + pub fn check(&mut self) -> anyhow::Result<()> { + if self.replenish_bytes == 0 { + return Err(anyhow!("no replenish bytes set")); + } + if self.max_burst_bytes < self.replenish_bytes { + self.max_burst_bytes = self.replenish_bytes; + } + + Ok(()) + } +}