Skip to content

Commit

Permalink
g3proxy: send small http body along with header
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq committed Nov 17, 2024
1 parent 35b344e commit ce06bfb
Show file tree
Hide file tree
Showing 14 changed files with 370 additions and 114 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions g3proxy/src/escape/direct_fixed/http_forward/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ where
}
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
send_req_header_to_origin(&mut self.inner, req).await
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
8 changes: 6 additions & 2 deletions g3proxy/src/escape/direct_float/http_forward/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,15 @@ where
}
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
if self.bind.is_expired() {
Err(io::Error::other("connection has expired"))
} else {
send_req_header_to_origin(&mut self.inner, req).await
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
}
15 changes: 12 additions & 3 deletions g3proxy/src/escape/proxy_float/peer/http/http_forward/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ where
}
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
if let Some(expire) = &self.config.expire_instant {
let now = Instant::now();
if expire.checked_duration_since(now).is_none() {
Expand All @@ -124,6 +128,7 @@ where
send_req_header_via_proxy(
&mut self.inner,
req,
body,
&self.upstream,
&self.config.append_http_headers,
None,
Expand Down Expand Up @@ -205,13 +210,17 @@ where
}
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
if let Some(expire) = &self.config.expire_instant {
let now = Instant::now();
if expire.checked_duration_since(now).is_none() {
return Err(io::Error::other("connection has expired"));
}
}
send_req_header_to_origin(&mut self.inner, req).await
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
15 changes: 12 additions & 3 deletions g3proxy/src/escape/proxy_float/peer/https/http_forward/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ where
self.inner.reset_stats(Arc::new(wrapper_stats));
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
if let Some(expire) = &self.config.expire_instant {
let now = Instant::now();
if expire.checked_duration_since(now).is_none() {
Expand All @@ -115,6 +119,7 @@ where
send_req_header_via_proxy(
&mut self.inner,
req,
body,
&self.upstream,
&self.config.append_http_headers,
None,
Expand Down Expand Up @@ -184,13 +189,17 @@ where
self.inner.reset_stats(Arc::new(wrapper_stats));
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
if let Some(expire) = &self.config.expire_instant {
let now = Instant::now();
if expire.checked_duration_since(now).is_none() {
return Err(io::Error::other("connection has expired"));
}
}
send_req_header_to_origin(&mut self.inner, req).await
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,17 @@ where
}
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
if let Some(expire) = &self.config.expire_instant {
let now = Instant::now();
if expire.checked_duration_since(now).is_none() {
return Err(io::Error::other("connection has expired"));
}
}
send_req_header_to_origin(&mut self.inner, req).await
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,17 @@ where
self.inner.reset_stats(Arc::new(wrapper_stats));
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
if let Some(expire) = &self.config.expire_instant {
let now = Instant::now();
if expire.checked_duration_since(now).is_none() {
return Err(io::Error::other("connection has expired"));
}
}
send_req_header_to_origin(&mut self.inner, req).await
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
15 changes: 12 additions & 3 deletions g3proxy/src/escape/proxy_http/http_forward/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,16 @@ where
}
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
let userid = self.pass_userid.as_deref();
send_req_header_via_proxy(
&mut self.inner,
req,
body,
&self.upstream,
&self.config.append_http_headers,
userid,
Expand Down Expand Up @@ -202,7 +207,11 @@ where
}
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
send_req_header_to_origin(&mut self.inner, req).await
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
15 changes: 12 additions & 3 deletions g3proxy/src/escape/proxy_https/http_forward/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,16 @@ where
self.inner.reset_stats(Arc::new(wrapper_stats));
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
let userid = self.pass_userid.as_deref();
send_req_header_via_proxy(
&mut self.inner,
req,
body,
&self.upstream,
&self.config.append_http_headers,
userid,
Expand Down Expand Up @@ -181,7 +186,11 @@ where
self.inner.reset_stats(Arc::new(wrapper_stats));
}

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
send_req_header_to_origin(&mut self.inner, req).await
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()> {
send_req_header_to_origin(&mut self.inner, req, body).await
}
}
8 changes: 6 additions & 2 deletions g3proxy/src/module/http_forward/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ pub(crate) trait HttpForwardWrite: AsyncWrite {
user_stats: Vec<Arc<UserUpstreamTrafficStats>>,
);

async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()>;
async fn send_request_header(
&mut self,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()>;
}

#[async_trait]
Expand Down Expand Up @@ -107,6 +111,6 @@ impl AsyncWrite for HttpForwardWriterForAdaptation<'_> {

impl HttpRequestUpstreamWriter<HttpProxyClientRequest> for HttpForwardWriterForAdaptation<'_> {
async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> {
self.inner.send_request_header(req).await
self.inner.send_request_header(req, None).await
}
}
31 changes: 26 additions & 5 deletions g3proxy/src/module/http_forward/connection/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@
* limitations under the License.
*/

use std::io;
use std::io::{self, IoSlice};

use g3_io_ext::LimitedWriteExt;
use g3_types::net::UpstreamAddr;

use bytes::BufMut;
use tokio::io::{AsyncWrite, AsyncWriteExt};

use g3_types::net::UpstreamAddr;

use super::HttpProxyClientRequest;
use crate::module::http_header;

pub(crate) async fn send_req_header_via_proxy<W>(
writer: &mut W,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
upstream: &UpstreamAddr,
append_header_lines: &[String],
pass_userid: Option<&str>,
Expand All @@ -45,16 +47,35 @@ where
}
buf.put_slice(b"\r\n");

writer.write_all(buf.as_ref()).await
send_request_header(writer, buf.as_slice(), body).await
}

pub(crate) async fn send_req_header_to_origin<W>(
writer: &mut W,
req: &HttpProxyClientRequest,
body: Option<&[u8]>,
) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
let buf = req.serialize_for_origin();
writer.write_all(buf.as_ref()).await
send_request_header(writer, buf.as_slice(), body).await
}

async fn send_request_header<W>(
writer: &mut W,
header: &[u8],
body: Option<&[u8]>,
) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
if let Some(body) = body {
writer
.write_all_vectored([IoSlice::new(header), IoSlice::new(body)])
.await?;
Ok(())
} else {
writer.write_all(header).await
}
}
4 changes: 2 additions & 2 deletions g3proxy/src/module/http_forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ mod task;

pub(crate) use connection::{
send_req_header_to_origin, send_req_header_via_proxy, BoxHttpForwardConnection,
BoxHttpForwardReader, BoxHttpForwardWriter, HttpConnectionEofPoller, HttpForwardRead,
HttpForwardWrite, HttpForwardWriterForAdaptation,
BoxHttpForwardReader, HttpConnectionEofPoller, HttpForwardRead, HttpForwardWrite,
HttpForwardWriterForAdaptation,
};
pub(crate) use context::{
BoxHttpForwardContext, DirectHttpForwardContext, FailoverHttpForwardContext,
Expand Down
Loading

0 comments on commit ce06bfb

Please sign in to comment.