Skip to content

Commit

Permalink
refactor: enhance limit plugin for rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed May 13, 2024
1 parent 71e9d13 commit f83ae26
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 37 deletions.
14 changes: 7 additions & 7 deletions docs/plugin_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,37 +47,37 @@ category = "stats"

## Limit

可基于cookie、请求头或query参数来限制并发访问,需要注意此限制只是限制并发数而非访问频率,以及若配置的字段获取到的值为空,则不限制。
可基于cookie、请求头或query参数来限制并发访问,需要注意此限制只是限制并发数而非访问频率,以及若配置的字段获取到的值为空,则不限制,支持`inflight``rate`两种限制类型

根据cookie的`bigtree`限制并发数为`10`:

```toml
[plugins.cookieBigTreeLimit]
value = "~bigtree 10"
value = "inflight key=cookie&value=bigtree&max=10"
category = "limit"
```

根据请求头的`X-App`参数限制并发数`10`:

```toml
[plugins.headerAppLimit]
value = ">X-App 10"
value = "inflight key=header&value=X-App&max=10"
category = "limit"
```

根据query中的`app`参数限制并发数`10`:
根据query中的`app`参数限制1秒钟仅能访问`10`:

```toml
[plugins.queryAppLimit]
value = "?query 10"
value = "rate key=query&value=app&max=10&interval=1s"
category = "limit"
```

根据ip限制并发数`10`(ip获取的顺序为X-Forwarded-For --> X-Real-Ip --> Remote Addr):
根据ip限制1分钟最多访问`50`(ip获取的顺序为X-Forwarded-For --> X-Real-Ip --> Remote Addr):

```toml
[plugins.ipLimit]
value = "ip 10"
value = "rate max=10&interval=1m"
category = "limit"
```

Expand Down
146 changes: 116 additions & 30 deletions src/plugin/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use crate::state::State;
use crate::util;
use async_trait::async_trait;
use http::StatusCode;
use humantime::parse_duration;
use log::debug;
use pingora::proxy::Session;
use pingora_limits::inflight::Inflight;
use substring::Substring;
use pingora_limits::rate::Rate;
use std::time::Duration;

#[derive(PartialEq, Debug)]
pub enum LimitTag {
Expand All @@ -37,39 +39,64 @@ pub struct Limiter {
tag: LimitTag,
max: isize,
value: String,
inflight: Inflight,
inflight: Option<Inflight>,
rate: Option<Rate>,
proxy_step: PluginStep,
}

impl Limiter {
pub fn new(value: &str, proxy_step: PluginStep) -> Result<Self> {
debug!("new limit proxy plugin, {value}, {proxy_step:?}");
let (key, max) = value.split_once(' ').ok_or(Error::Invalid {
let (category, limit_value) = value.split_once(' ').ok_or(Error::Invalid {
message: value.to_string(),
})?;
let max = max
.parse::<u32>()
.map_err(|e| Error::ParseInt { source: e })?;
if key.len() < 2 {
return Err(Error::Invalid {
message: key.to_string(),
});

let mut tag = LimitTag::Ip;
let mut key_value = "".to_string();
let mut max = 0;
let mut interval = Duration::from_secs(10);
for item in limit_value.split('&') {
let (key, value) = item.split_once('=').ok_or(Error::Invalid {
message: item.to_string(),
})?;
match key {
"key" => {
tag = match value {
"cookie" => LimitTag::Cookie,
"header" => LimitTag::RequestHeader,
"query" => LimitTag::Query,
_ => LimitTag::Ip,
};
}
"value" => key_value = value.to_string(),
"max" => {
max = value.parse::<isize>().map_err(|e| Error::Invalid {
message: e.to_string(),
})?;
}
"interval" => {
interval = parse_duration(value).map_err(|e| Error::Invalid {
message: e.to_string(),
})?;
}
_ => {}
};
}
let mut inflight = None;
let mut rate = None;
if category == "inflight" {
inflight = Some(Inflight::new());
} else {
rate = Some(Rate::new(interval));
}
let ch = key.substring(0, 1);
let value = key.substring(1, key.len());
let tag = match ch {
"~" => LimitTag::Cookie,
">" => LimitTag::RequestHeader,
"?" => LimitTag::Query,
_ => LimitTag::Ip,
};

Ok(Self {
tag,
proxy_step,
max: max as isize,
value: value.to_string(),
inflight: Inflight::new(),
max,
value: key_value,
inflight,
rate,
})
}
/// Increment `key` by 1. If value gt max, an error will be return.
Expand All @@ -96,14 +123,23 @@ impl Limiter {
if key.is_empty() {
return Ok(());
}
let (guard, value) = self.inflight.incr(key, 1);
let value = if let Some(rate) = &self.rate {
rate.observe(&key, 1);
let value = rate.rate(&key);
value as isize
} else if let Some(inflight) = &self.inflight {
let (guard, value) = inflight.incr(&key, 1);
ctx.guard = Some(guard);
value
} else {
0
};
if value > self.max {
return Err(Error::Exceed {
max: self.max,
value,
});
}
ctx.guard = Some(guard);
Ok(())
}
}
Expand Down Expand Up @@ -141,7 +177,9 @@ mod tests {
use http::StatusCode;
use pingora::proxy::Session;
use pretty_assertions::assert_eq;
use std::time::Duration;
use tokio_test::io::Builder;

async fn new_session() -> Session {
let headers = [
"Host: github.com",
Expand All @@ -161,7 +199,11 @@ mod tests {
}
#[tokio::test]
async fn test_new_cookie_limiter() {
let limiter = Limiter::new("~deviceId 10", PluginStep::Request).unwrap();
let limiter = Limiter::new(
"inflight key=cookie&value=deviceId&max=10",
PluginStep::Request,
)
.unwrap();
assert_eq!(LimitTag::Cookie, limiter.tag);
let mut ctx = State {
..Default::default()
Expand All @@ -173,7 +215,11 @@ mod tests {
}
#[tokio::test]
async fn test_new_req_header_limiter() {
let limiter = Limiter::new(">X-Uuid 10", PluginStep::Request).unwrap();
let limiter = Limiter::new(
"inflight key=header&value=X-Uuid&max=10",
PluginStep::Request,
)
.unwrap();
assert_eq!(LimitTag::RequestHeader, limiter.tag);
let mut ctx = State {
..Default::default()
Expand All @@ -185,7 +231,8 @@ mod tests {
}
#[tokio::test]
async fn test_new_query_limiter() {
let limiter = Limiter::new("?key 10", PluginStep::Request).unwrap();
let limiter =
Limiter::new("inflight key=query&value=key&max=10", PluginStep::Request).unwrap();
assert_eq!(LimitTag::Query, limiter.tag);
let mut ctx = State {
..Default::default()
Expand All @@ -197,7 +244,7 @@ mod tests {
}
#[tokio::test]
async fn test_new_ip_limiter() {
let limiter = Limiter::new("ip 10", PluginStep::Request).unwrap();
let limiter = Limiter::new("inflight max=10", PluginStep::Request).unwrap();
assert_eq!(LimitTag::Ip, limiter.tag);
let mut ctx = State {
..Default::default()
Expand All @@ -208,8 +255,8 @@ mod tests {
assert_eq!(true, ctx.guard.is_some());
}
#[tokio::test]
async fn test_limit() {
let limiter = Limiter::new("ip 0", PluginStep::Request).unwrap();
async fn test_inflight_limit() {
let limiter = Limiter::new("inflight max=0", PluginStep::Request).unwrap();

let headers = ["X-Forwarded-For: 1.1.1.1"].join("\r\n");
let input_header = format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
Expand All @@ -224,12 +271,51 @@ mod tests {
assert_eq!(true, result.is_some());
assert_eq!(StatusCode::TOO_MANY_REQUESTS, result.unwrap().status);

let limiter = Limiter::new("ip 1", PluginStep::Request).unwrap();
let limiter = Limiter::new("inflight max=1", PluginStep::Request).unwrap();
let result = limiter
.handle(&mut session, &mut State::default())
.await
.unwrap();

assert_eq!(true, result.is_none());
}

#[tokio::test]
async fn test_rate_limit() {
let limiter = Limiter::new("rate max=1&interval=1s", PluginStep::Request).unwrap();

let headers = ["X-Forwarded-For: 1.1.1.1"].join("\r\n");
let input_header = format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
let mock_io = Builder::new().read(input_header.as_bytes()).build();
let mut session = Session::new_h1(Box::new(mock_io));
session.read_request().await.unwrap();
let result = limiter
.handle(&mut session, &mut State::default())
.await
.unwrap();

assert_eq!(true, result.is_none());

let _ = limiter
.handle(&mut session, &mut State::default())
.await
.unwrap();

tokio::time::sleep(Duration::from_secs(1)).await;

let result = limiter
.handle(&mut session, &mut State::default())
.await
.unwrap();
assert_eq!(true, result.is_some());
assert_eq!(StatusCode::TOO_MANY_REQUESTS, result.unwrap().status);

tokio::time::sleep(Duration::from_secs(1)).await;

let result = limiter
.handle(&mut session, &mut State::default())
.await
.unwrap();
assert_eq!(true, result.is_none());
}
}

0 comments on commit f83ae26

Please sign in to comment.