From f95f0cc70e0929abba29fb62be65d28c18d95b97 Mon Sep 17 00:00:00 2001 From: Andrew Hauck Date: Fri, 9 Aug 2024 16:56:27 -0700 Subject: [PATCH] Add support for gRPC-web module to bridge gRPC-web client requests to gRPC server requests --- .bleep | 2 +- pingora-core/src/modules/http/grpc_web.rs | 80 ++++ pingora-core/src/modules/http/mod.rs | 30 ++ .../src/protocols/http/bridge/grpc_web.rs | 341 ++++++++++++++++++ pingora-core/src/protocols/http/bridge/mod.rs | 15 + pingora-core/src/protocols/http/mod.rs | 1 + pingora-core/src/upstreams/peer.rs | 5 - pingora-http/src/lib.rs | 38 ++ pingora-proxy/examples/grpc_web_module.rs | 90 +++++ pingora-proxy/src/lib.rs | 15 +- pingora-proxy/src/proxy_h2.rs | 25 +- 11 files changed, 623 insertions(+), 19 deletions(-) create mode 100644 pingora-core/src/modules/http/grpc_web.rs create mode 100644 pingora-core/src/protocols/http/bridge/grpc_web.rs create mode 100644 pingora-core/src/protocols/http/bridge/mod.rs create mode 100644 pingora-proxy/examples/grpc_web_module.rs diff --git a/.bleep b/.bleep index 3fdcf461..c2d9bce9 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -9b92c0fed7b703c61415414c69ff196e9deb11eb \ No newline at end of file +761f676b044dcf0d34205f96921e3385ffac7810 \ No newline at end of file diff --git a/pingora-core/src/modules/http/grpc_web.rs b/pingora-core/src/modules/http/grpc_web.rs new file mode 100644 index 00000000..85b6ea64 --- /dev/null +++ b/pingora-core/src/modules/http/grpc_web.rs @@ -0,0 +1,80 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; +use crate::protocols::http::bridge::grpc_web::GrpcWebCtx; +use std::ops::{Deref, DerefMut}; + +/// gRPC-web bridge module, this will convert +/// HTTP/1.1 gRPC-web requests to H2 gRPC requests +#[derive(Default)] +pub struct GrpcWebBridge(GrpcWebCtx); + +impl Deref for GrpcWebBridge { + type Target = GrpcWebCtx; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for GrpcWebBridge { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[async_trait] +impl HttpModule for GrpcWebBridge { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } + + async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> { + self.0.request_header_filter(req); + Ok(()) + } + + async fn response_header_filter( + &mut self, + resp: &mut ResponseHeader, + _end_of_stream: bool, + ) -> Result<()> { + self.0.response_header_filter(resp); + Ok(()) + } + + fn response_trailer_filter( + &mut self, + trailers: &mut Option>, + ) -> Result> { + if let Some(trailers) = trailers { + return self.0.response_trailer_filter(trailers); + } + Ok(None) + } +} + +/// The builder for gRPC-web bridge module +pub struct GrpcWeb; + +impl HttpModuleBuilder for GrpcWeb { + fn init(&self) -> Module { + Box::new(GrpcWebBridge::default()) + } +} diff --git a/pingora-core/src/modules/http/mod.rs b/pingora-core/src/modules/http/mod.rs index 02a3e0c3..f4b17ad9 100644 --- a/pingora-core/src/modules/http/mod.rs +++ b/pingora-core/src/modules/http/mod.rs @@ -19,9 +19,11 @@ //! See the [ResponseCompression] module for an example of how to implement a basic module. pub mod compression; +pub mod grpc_web; use async_trait::async_trait; use bytes::Bytes; +use http::HeaderMap; use once_cell::sync::OnceCell; use pingora_error::Result; use pingora_http::{RequestHeader, ResponseHeader}; @@ -61,6 +63,13 @@ pub trait HttpModule { Ok(()) } + fn response_trailer_filter( + &mut self, + _trailers: &mut Option>, + ) -> Result> { + Ok(None) + } + fn as_any(&self) -> &dyn Any; fn as_any_mut(&mut self) -> &mut dyn Any; } @@ -226,6 +235,27 @@ impl HttpModuleCtx { } Ok(()) } + + /// Run the `response_trailer_filter` for all the modules according to their orders. + /// + /// Returns an `Option` which can be used to write response trailers into + /// the response body. Note, if multiple modules attempt to write trailers into + /// the body the last one will be used. + /// + /// Implementors that intend to write trailers into the body need to ensure their filter + /// is using an encoding that supports this. + pub fn response_trailer_filter( + &mut self, + trailers: &mut Option>, + ) -> Result> { + let mut encoded = None; + for filter in self.module_ctx.iter_mut() { + if let Some(buf) = filter.response_trailer_filter(trailers)? { + encoded = Some(buf); + } + } + Ok(encoded) + } } #[cfg(test)] diff --git a/pingora-core/src/protocols/http/bridge/grpc_web.rs b/pingora-core/src/protocols/http/bridge/grpc_web.rs new file mode 100644 index 00000000..b5737e81 --- /dev/null +++ b/pingora-core/src/protocols/http/bridge/grpc_web.rs @@ -0,0 +1,341 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::{BufMut, Bytes, BytesMut}; +use http::{ + header::{CONTENT_LENGTH, CONTENT_TYPE, TRANSFER_ENCODING}, + HeaderMap, +}; +use pingora_error::{ErrorType::ReadError, OrErr, Result}; +use pingora_http::{RequestHeader, ResponseHeader}; + +/// Used for bridging gRPC to gRPC-web and vice-versa. +/// See gRPC-web [spec](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) and +/// gRPC h2 [spec](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) for more details. +#[derive(Default, PartialEq, Debug)] +pub enum GrpcWebCtx { + #[default] + Disabled, + Init, + Upgrade, + Trailers, + Done, +} + +const GRPC: &str = "application/grpc"; +const GRPC_WEB: &str = "application/grpc-web"; + +impl GrpcWebCtx { + pub fn init(&mut self) { + *self = Self::Init; + } + + /// gRPC-web request is fed into this filter, if the module is initialized + /// we attempt to convert it to a gRPC request + pub fn request_header_filter(&mut self, req: &mut RequestHeader) { + if *self != Self::Init { + // not enabled + return; + } + + let content_type = req + .headers + .get(CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or_default(); + + // check we have a valid grpc-web prefix + if !(content_type.len() >= GRPC_WEB.len() + && content_type[..GRPC_WEB.len()].eq_ignore_ascii_case(GRPC_WEB)) + { + // not gRPC-web + return; + } + + // change content type to grpc + let ct = content_type.to_lowercase().replace(GRPC_WEB, GRPC); + req.insert_header(CONTENT_TYPE, ct).expect("insert header"); + + // The 'te' request header is used to detect incompatible proxies + // which are supposed to remove 'te' if it is unsupported. + // This header is required by gRPC over h2 protocol. + // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + req.insert_header("te", "trailers").expect("insert header"); + + // For gRPC requests, EOS (end-of-stream) is indicated by the presence of the + // END_STREAM flag on the last received DATA frame. + // In scenarios where the Request stream needs to be closed + // but no data remains to be sent implementations + // MUST send an empty DATA frame with this flag set. + req.set_send_end_stream(false); + + *self = Self::Upgrade + } + + /// gRPC response is fed into this filter, if the module is in the bridge state + /// attempt to convert the response it to a gRPC-web response + pub fn response_header_filter(&mut self, resp: &mut ResponseHeader) { + if *self != Self::Upgrade { + // not an upgrade + return; + } + + if resp.status.is_informational() { + // proxy informational statuses through + return; + } + + let content_type = resp + .headers + .get(CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or_default(); + + // upstream h2, no reason to normalize case + if !content_type.starts_with(GRPC) { + // not gRPC + *self = Self::Disabled; + return; + } + + // change content type to gRPC-web + let ct = content_type.replace(GRPC, GRPC_WEB); + resp.insert_header(CONTENT_TYPE, ct).expect("insert header"); + + // always use chunked for gRPC-web + resp.remove_header(&CONTENT_LENGTH); + resp.insert_header(TRANSFER_ENCODING, "chunked") + .expect("insert header"); + + *self = Self::Trailers + } + + /// Used to convert gRPC trailers into gRPC-web trailers, note + /// gRPC-web trailers are encoded into the response body so we return + /// the encoded bytes here. + pub fn response_trailer_filter( + &mut self, + resp_trailers: &mut HeaderMap, + ) -> Result> { + /* Trailer header frame and trailer headers + 0 - - 1 - - 2 - - 3 - - 4 - - 5 - - 6 - - 7 - - 8 + | Ind | Length | Headers | <- trailer header indicator, length of headers + | Headers | <- rest is headers + | Headers | + */ + // TODO compressed trailer? + // grpc-web trailers frame head + const GRPC_WEB_TRAILER: u8 = 0x80; + + // number of bytes in trailer header + const GRPC_TRAILER_HEADER_LEN: usize = 5; + + // just some estimate + const DEFAULT_TRAILER_BUFFER_SIZE: usize = 256; + + if *self != Self::Trailers { + // not an upgrade + *self = Self::Disabled; + return Ok(None); + } + + // trailers are expected to arrive all at once encoded into a single trailers frame + // trailers in frame are separated by CRLFs + let mut buf = BytesMut::with_capacity(DEFAULT_TRAILER_BUFFER_SIZE); + let mut trailers = buf.split_off(GRPC_TRAILER_HEADER_LEN); + + // iterate the key/value pairs and encode them into the tmp buffer + for (key, value) in resp_trailers.iter() { + // encode header + trailers.put_slice(key.as_ref()); + trailers.put_slice(b":"); + + // encode value + trailers.put_slice(value.as_ref()); + + // encode header separator + trailers.put_slice(b"\r\n"); + } + + // ensure trailer length within u32 + let len = trailers.len().try_into().or_err_with(ReadError, || { + format!("invalid gRPC trailer length: {}", trailers.len()) + })?; + buf.put_u8(GRPC_WEB_TRAILER); + buf.put_u32(len); + buf.unsplit(trailers); + + *self = Self::Done; + Ok(Some(buf.freeze())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use http::{request::Request, response::Response, Version}; + + #[test] + fn non_grpc_web_request_ignored() { + let request = Request::get("https://pingora.dev/") + .header(CONTENT_TYPE, "application/grpc-we") + .version(Version::HTTP_2) // only set this to verify send_end_stream is configured + .body(()) + .unwrap(); + let mut request = request.into_parts().0.into(); + + let mut filter = GrpcWebCtx::default(); + filter.init(); + filter.request_header_filter(&mut request); + assert_eq!(filter, GrpcWebCtx::Init); + + let headers = &request.headers; + assert_eq!(headers.get("te"), None); + assert_eq!(headers.get("application/grpc"), None); + assert_eq!(request.send_end_stream(), Some(true)); + } + + #[test] + fn grpc_web_request_module_disabled_ignored() { + let request = Request::get("https://pingora.dev/") + .header(CONTENT_TYPE, "application/grpc-web") + .version(Version::HTTP_2) // only set this to verify send_end_stream is configured + .body(()) + .unwrap(); + let mut request = request.into_parts().0.into(); + + // do not init + let mut filter = GrpcWebCtx::default(); + filter.request_header_filter(&mut request); + assert_eq!(filter, GrpcWebCtx::Disabled); + + let headers = &request.headers; + assert_eq!(headers.get("te"), None); + assert_eq!(headers.get(CONTENT_TYPE).unwrap(), "application/grpc-web"); + assert_eq!(request.send_end_stream(), Some(true)); + } + + #[test] + fn grpc_web_request_upgrade() { + let request = Request::get("https://pingora.org/") + .header(CONTENT_TYPE, "application/gRPC-web+thrift") + .version(Version::HTTP_2) // only set this to verify send_end_stream is configured + .body(()) + .unwrap(); + let mut request = request.into_parts().0.into(); + + let mut filter = GrpcWebCtx::default(); + filter.init(); + filter.request_header_filter(&mut request); + assert_eq!(filter, GrpcWebCtx::Upgrade); + + let headers = &request.headers; + assert_eq!(headers.get("te").unwrap(), "trailers"); + assert_eq!( + headers.get(CONTENT_TYPE).unwrap(), + "application/grpc+thrift" + ); + assert_eq!(request.send_end_stream(), Some(false)); + } + + #[test] + fn non_grpc_response_ignored() { + let response = Response::builder() + .header(CONTENT_TYPE, "text/html") + .header(CONTENT_LENGTH, "10") + .body(()) + .unwrap(); + let mut response = response.into_parts().0.into(); + + let mut filter = GrpcWebCtx::Upgrade; + filter.response_header_filter(&mut response); + assert_eq!(filter, GrpcWebCtx::Disabled); + + let headers = &response.headers; + assert_eq!(headers.get(CONTENT_TYPE).unwrap(), "text/html"); + assert_eq!(headers.get(CONTENT_LENGTH).unwrap(), "10"); + } + + #[test] + fn grpc_response_module_disabled_ignored() { + let response = Response::builder() + .header(CONTENT_TYPE, "application/grpc") + .body(()) + .unwrap(); + let mut response = response.into_parts().0.into(); + + let mut filter = GrpcWebCtx::default(); + filter.response_header_filter(&mut response); + assert_eq!(filter, GrpcWebCtx::Disabled); + + let headers = &response.headers; + assert_eq!(headers.get(CONTENT_TYPE).unwrap(), "application/grpc"); + } + + #[test] + fn grpc_response_upgrade() { + let response = Response::builder() + .header(CONTENT_TYPE, "application/grpc+proto") + .header(CONTENT_LENGTH, "0") + .body(()) + .unwrap(); + let mut response = response.into_parts().0.into(); + + let mut filter = GrpcWebCtx::Upgrade; + filter.response_header_filter(&mut response); + assert_eq!(filter, GrpcWebCtx::Trailers); + + let headers = &response.headers; + assert_eq!( + headers.get(CONTENT_TYPE).unwrap(), + "application/grpc-web+proto" + ); + assert_eq!(headers.get(TRANSFER_ENCODING).unwrap(), "chunked"); + assert!(headers.get(CONTENT_LENGTH).is_none()); + } + + #[test] + fn grpc_response_informational_proxied() { + let response = Response::builder().status(100).body(()).unwrap(); + let mut response = response.into_parts().0.into(); + + let mut filter = GrpcWebCtx::Upgrade; + filter.response_header_filter(&mut response); + assert_eq!(filter, GrpcWebCtx::Upgrade); // still upgrade + } + + #[test] + fn grpc_response_trailer_headers_convert_to_byte_buf() { + let mut response = Response::builder() + .header("grpc-status", "0") + .header("grpc-message", "OK") + .body(()) + .unwrap(); + let response = response.headers_mut(); + + let mut filter = GrpcWebCtx::Trailers; + let buf = filter.response_trailer_filter(response).unwrap().unwrap(); + assert_eq!(filter, GrpcWebCtx::Done); + + let expected = b"grpc-status:0\r\ngrpc-message:OK\r\n"; + let expected_len: u32 = expected.len() as u32; // 32 bytes + + // assert the length prefix message frame + // [1 byte (header)| 4 byte (length) | 15 byte (grpc-status:0\r\n) | 17 bytes (grpc-message:OK\r\n)] + assert_eq!(0x80, buf[0]); // frame should start with trailer header + assert_eq!(expected_len.to_be_bytes(), buf[1..5]); // next 4 bytes length of trailer + assert_eq!(expected[..15], buf[5..20]); // grpc-status:0\r\n (15 bytes) + assert_eq!(expected[15..], buf[20..]); // grpc-message:OK\r\n (17 bytes) + } +} diff --git a/pingora-core/src/protocols/http/bridge/mod.rs b/pingora-core/src/protocols/http/bridge/mod.rs new file mode 100644 index 00000000..f40d5f52 --- /dev/null +++ b/pingora-core/src/protocols/http/bridge/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod grpc_web; diff --git a/pingora-core/src/protocols/http/mod.rs b/pingora-core/src/protocols/http/mod.rs index a2d95402..94814ebd 100644 --- a/pingora-core/src/protocols/http/mod.rs +++ b/pingora-core/src/protocols/http/mod.rs @@ -15,6 +15,7 @@ //! HTTP/1.x and HTTP/2 implementation APIs mod body_buffer; +pub mod bridge; pub mod client; pub mod compression; pub mod conditional_filter; diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs index 78eb762e..7b808575 100644 --- a/pingora-core/src/upstreams/peer.rs +++ b/pingora-core/src/upstreams/peer.rs @@ -309,7 +309,6 @@ pub struct PeerOptions { pub tcp_keepalive: Option, pub tcp_recv_buf: Option, pub dscp: Option, - pub no_header_eos: bool, pub h2_ping_interval: Option, // how many concurrent h2 stream are allowed in the same connection pub max_h2_streams: usize, @@ -345,7 +344,6 @@ impl PeerOptions { tcp_keepalive: None, tcp_recv_buf: None, dscp: None, - no_header_eos: false, h2_ping_interval: None, max_h2_streams: 1, extra_proxy_headers: BTreeMap::new(), @@ -397,9 +395,6 @@ impl Display for PeerOptions { if let Some(tcp_keepalive) = &self.tcp_keepalive { write!(f, "tcp_keepalive: {},", tcp_keepalive)?; } - if self.no_header_eos { - write!(f, "no_header_eos: true,")?; - } if let Some(h2_ping_interval) = self.h2_ping_interval { write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?; } diff --git a/pingora-http/src/lib.rs b/pingora-http/src/lib.rs index 71f81ee4..d57998d1 100644 --- a/pingora-http/src/lib.rs +++ b/pingora-http/src/lib.rs @@ -69,6 +69,8 @@ pub struct RequestHeader { header_name_map: Option, // store the raw path bytes only if it is invalid utf-8 raw_path_fallback: Vec, // can also be Box<[u8]> + // whether we send END_STREAM with HEADERS for h2 requests + send_end_stream: bool, } impl AsRef for RequestHeader { @@ -93,6 +95,7 @@ impl RequestHeader { base, header_name_map: None, raw_path_fallback: vec![], + send_end_stream: true, } } @@ -211,6 +214,20 @@ impl RequestHeader { self.base.uri = uri; } + /// Set whether we send an END_STREAM on H2 request HEADERS if body is empty. + pub fn set_send_end_stream(&mut self, send_end_stream: bool) { + self.send_end_stream = send_end_stream; + } + + /// Returns if we support sending an END_STREAM on H2 request HEADERS if body is empty, + /// returns None if not H2. + pub fn send_end_stream(&self) -> Option { + if self.base.version != Version::HTTP_2 { + return None; + } + Some(self.send_end_stream) + } + /// Return the request path in its raw format /// /// Non-UTF8 is supported. @@ -256,6 +273,7 @@ impl Clone for RequestHeader { base: self.as_owned_parts(), header_name_map: self.header_name_map.clone(), raw_path_fallback: self.raw_path_fallback.clone(), + send_end_stream: self.send_end_stream, } } } @@ -268,6 +286,7 @@ impl From for RequestHeader { header_name_map: None, // no illegal path raw_path_fallback: vec![], + send_end_stream: true, } } } @@ -716,4 +735,23 @@ mod tests { let reason = resp.get_reason_phrase().unwrap(); assert_eq!(reason, "OK"); } + + #[test] + fn set_test_send_end_stream() { + let mut req = RequestHeader::build("GET", b"/", None).unwrap(); + req.set_send_end_stream(true); + + // None for requests that are not h2 + assert!(req.send_end_stream().is_none()); + + let mut req = RequestHeader::build("GET", b"/", None).unwrap(); + req.set_version(Version::HTTP_2); + + // Some(true) by default for h2 + assert!(req.send_end_stream().unwrap()); + + req.set_send_end_stream(false); + // Some(false) + assert!(!req.send_end_stream().unwrap()); + } } diff --git a/pingora-proxy/examples/grpc_web_module.rs b/pingora-proxy/examples/grpc_web_module.rs new file mode 100644 index 00000000..ecd11fd1 --- /dev/null +++ b/pingora-proxy/examples/grpc_web_module.rs @@ -0,0 +1,90 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use clap::Parser; + +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_core::{ + modules::http::{ + grpc_web::{GrpcWeb, GrpcWebBridge}, + HttpModules, + }, + prelude::Opt, +}; +use pingora_proxy::{ProxyHttp, Session}; + +/// This example shows how to use the gRPC-web bridge module + +pub struct GrpcWebBridgeProxy; + +#[async_trait] +impl ProxyHttp for GrpcWebBridgeProxy { + type CTX = (); + fn new_ctx(&self) -> Self::CTX {} + + fn init_downstream_modules(&self, modules: &mut HttpModules) { + // Add the gRPC web module + modules.add_module(Box::new(GrpcWeb)) + } + + async fn early_request_filter( + &self, + session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result<()> { + let grpc = session + .downstream_modules_ctx + .get_mut::() + .expect("GrpcWebBridge module added"); + + // initialize gRPC module for this request + grpc.init(); + Ok(()) + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result> { + // this needs to be your gRPC server + let grpc_peer = Box::new(HttpPeer::new( + ("1.1.1.1", 443), + true, + "one.one.one.one".to_string(), + )); + Ok(grpc_peer) + } +} + +// RUST_LOG=INFO cargo run --example grpc_web_module + +fn main() { + env_logger::init(); + + // read command line arguments + let opt = Opt::parse(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut my_proxy = + pingora_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy); + my_proxy.add_tcp("0.0.0.0:6194"); + + my_server.add_service(my_proxy); + my_server.run_forever(); +} diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 890af982..56211708 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -405,7 +405,20 @@ impl Session { self.downstream_modules_ctx .response_body_filter(data, *end)?; } - _ => { /* HttpModules doesn't handle trailer yet */ } + HttpTask::Trailer(trailers) => { + if let Some(buf) = self + .downstream_modules_ctx + .response_trailer_filter(trailers)? + { + // Write the trailers into the body if the filter + // returns a buffer. + // + // Note, this will not work if end of stream has already + // been seen or we've written content-length bytes. + *task = HttpTask::Body(Some(buf), true); + } + } + _ => { /* Done or Failed */ } } } self.downstream_session.response_duplex_vec(tasks).await diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 5216ee9b..07501607 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -124,6 +124,9 @@ impl HttpProxy { session.upstream_compression.request_filter(&req); let body_empty = session.as_mut().is_body_empty(); + // whether we support sending END_STREAM on HEADERS if body is empty + let send_end_stream = req.send_end_stream().expect("req must be h2"); + let mut req: http::request::Parts = req.into(); // H2 requires authority to be set, so copy that from H1 host if that is set @@ -133,27 +136,25 @@ impl HttpProxy { } } - debug!("Request to h2: {:?}", req); + debug!("Request to h2: {req:?}"); - // don't send END_STREAM on HEADERS for no_header_eos - let send_header_eos = !peer.options.no_header_eos && body_empty; + // send END_STREAM on HEADERS + let send_header_eos = send_end_stream && body_empty; + debug!("send END_STREAM on HEADERS: {send_end_stream}"); let req = Box::new(RequestHeader::from(req)); - match client_session.write_request_header(req, send_header_eos) { - Ok(v) => v, - Err(e) => { - return (false, Some(e.into_up())); - } - }; + if let Err(e) = client_session.write_request_header(req, send_header_eos) { + return (false, Some(e.into_up())); + } - // send END_STREAM on empty DATA frame for no_headers_eos - if peer.options.no_header_eos && body_empty { + if !send_end_stream && body_empty { + // send END_STREAM on empty DATA frame match client_session.write_request_body(Bytes::new(), true) { Ok(()) => debug!("sent empty DATA frame to h2"), Err(e) => { return (false, Some(e.into_up())); } - }; + } } client_session.read_timeout = peer.options.read_timeout;