From 28ea2c62de5fd7b9c8d4fcd03decfd0e0e1db09e Mon Sep 17 00:00:00 2001 From: Eloi DEMOLIS Date: Tue, 7 Jan 2025 18:18:46 +0100 Subject: [PATCH] Refactor backend logic with an Origin struct I feel like going it's nowhere... Backend logic is messy and unclear, buffers are needlessly kept alive, borrowing subsets of the HTTP session is harder and harder, reseting default answers request is way too hard for what it's worth (it's probably broken right now), gauges are desynched and we are cloning again and again String ids... Signed-off-by: Eloi DEMOLIS --- command/assets/custom_200.html | 1 - command/assets/custom_404.html | 1 - command/assets/custom_503.html | 1 - command/src/state.rs | 1 + e2e/src/http_utils/mod.rs | 15 +- e2e/src/tests/tests.rs | 20 +- lib/src/http.rs | 25 +- lib/src/https.rs | 23 +- lib/src/lib.rs | 4 +- lib/src/protocol/kawa_h1/answers.rs | 83 +++-- lib/src/protocol/kawa_h1/editor.rs | 2 + lib/src/protocol/kawa_h1/mod.rs | 544 +++++++++++++--------------- lib/src/router/mod.rs | 5 +- lib/src/tcp.rs | 12 +- 14 files changed, 360 insertions(+), 377 deletions(-) diff --git a/command/assets/custom_200.html b/command/assets/custom_200.html index 73e5926d9..12f2ca492 100644 --- a/command/assets/custom_200.html +++ b/command/assets/custom_200.html @@ -1,5 +1,4 @@ HTTP/1.1 200 OK -%Content-Length: %CONTENT_LENGTH Sozu-Id: %REQUEST_ID

%CLUSTER_ID Custom 200

diff --git a/command/assets/custom_404.html b/command/assets/custom_404.html index 8302d4319..e331b285e 100644 --- a/command/assets/custom_404.html +++ b/command/assets/custom_404.html @@ -1,6 +1,5 @@ HTTP/1.1 404 Not Found Cache-Control: no-cache -Connection: close Sozu-Id: %REQUEST_ID

My own 404 error page

diff --git a/command/assets/custom_503.html b/command/assets/custom_503.html index ac12b8825..4484f8a4e 100644 --- a/command/assets/custom_503.html +++ b/command/assets/custom_503.html @@ -1,7 +1,6 @@ HTTP/1.1 503 Service Unavailable Cache-Control: no-cache Connection: close -%Content-Length: %CONTENT_LENGTH Sozu-Id: %REQUEST_ID

MyCluster: 503 Service Unavailable

diff --git a/command/src/state.rs b/command/src/state.rs index 9413fa203..0d6524b23 100644 --- a/command/src/state.rs +++ b/command/src/state.rs @@ -1724,6 +1724,7 @@ mod tests { hostname: String::from("test.local"), path: PathRule::prefix(String::from("/abc")), address: SocketAddress::new_v4(0, 0, 0, 0, 8080), + required_auth: Some(false), redirect: Some(RedirectPolicy::Forward.into()), redirect_scheme: Some(RedirectScheme::UseSame.into()), ..Default::default() diff --git a/e2e/src/http_utils/mod.rs b/e2e/src/http_utils/mod.rs index 9e6248df2..345ebc1a3 100644 --- a/e2e/src/http_utils/mod.rs +++ b/e2e/src/http_utils/mod.rs @@ -24,12 +24,17 @@ pub fn http_request, S2: Into, S3: Into, S4: In ) } -pub fn immutable_answer(status: u16) -> String { +pub fn immutable_answer(status: u16, content_length: bool) -> String { + let content_length = if content_length { + "\r\nContent-Length: 0" + } else { + "" + }; match status { - 400 => String::from("HTTP/1.1 400 Bad Request\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"), - 404 => String::from("HTTP/1.1 404 Not Found\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"), - 502 => String::from("HTTP/1.1 502 Bad Gateway\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"), - 503 => String::from("HTTP/1.1 503 Service Unavailable\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"), + 400 => format!("HTTP/1.1 400 Bad Request\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"), + 404 => format!("HTTP/1.1 404 Not Found\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"), + 502 => format!("HTTP/1.1 502 Bad Gateway\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"), + 503 => format!("HTTP/1.1 503 Service Unavailable\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"), _ => unimplemented!() } } diff --git a/e2e/src/tests/tests.rs b/e2e/src/tests/tests.rs index 89727f53e..4fb54f908 100644 --- a/e2e/src/tests/tests.rs +++ b/e2e/src/tests/tests.rs @@ -645,10 +645,10 @@ fn try_http_behaviors() -> State { .to_http(None) .unwrap(); http_config.answers = BTreeMap::from([ - ("400".to_string(), immutable_answer(400)), - ("404".to_string(), immutable_answer(404)), - ("502".to_string(), immutable_answer(502)), - ("503".to_string(), immutable_answer(503)), + ("400".to_string(), immutable_answer(400, false)), + ("404".to_string(), immutable_answer(404, false)), + ("502".to_string(), immutable_answer(502, false)), + ("503".to_string(), immutable_answer(503, false)), ]); worker.send_proxy_request_type(RequestType::AddHttpListener(http_config)); @@ -671,7 +671,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("response: {response:?}"); - assert_eq!(response, Some(immutable_answer(404))); + assert_eq!(response, Some(immutable_answer(404, true))); assert_eq!(client.receive(), None); worker.send_proxy_request_type(RequestType::AddHttpFrontend(RequestHttpFrontend { @@ -686,7 +686,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("response: {response:?}"); - assert_eq!(response, Some(immutable_answer(503))); + assert_eq!(response, Some(immutable_answer(503, true))); assert_eq!(client.receive(), None); let back_address = create_local_address(); @@ -706,7 +706,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("response: {response:?}"); - assert_eq!(response, Some(immutable_answer(400))); + assert_eq!(response, Some(immutable_answer(400, true))); assert_eq!(client.receive(), None); let mut backend = SyncBackend::new("backend", back_address, "TEST\r\n\r\n"); @@ -723,7 +723,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("request: {request:?}"); println!("response: {response:?}"); - assert_eq!(response, Some(immutable_answer(502))); + assert_eq!(response, Some(immutable_answer(502, true))); assert_eq!(client.receive(), None); info!("expecting 200"); @@ -786,7 +786,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("request: {request:?}"); println!("response: {response:?}"); - assert_eq!(response, Some(immutable_answer(503))); + assert_eq!(response, Some(immutable_answer(503, true))); assert_eq!(client.receive(), None); worker.send_proxy_request_type(RequestType::RemoveBackend(RemoveBackend { @@ -984,7 +984,7 @@ fn try_https_redirect() -> State { client.connect(); client.send(); let answer = client.receive(); - let expected_answer = format!("{answer_301_prefix}https://example.com/redirected?true\r\n\r\n"); + let expected_answer = format!("{answer_301_prefix}https://example.com/redirected?true\r\nContent-Length: 0\r\n\r\n"); assert_eq!(answer, Some(expected_answer)); State::Success diff --git a/lib/src/http.rs b/lib/src/http.rs index a430c1f17..8c2b20c07 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -209,11 +209,14 @@ impl HttpSession { } } - fn upgrade_http(&mut self, http: Http) -> Option { + fn upgrade_http( + &mut self, + mut http: Http, + ) -> Option { debug!("http switching to ws"); - let front_token = self.frontend_token; - let back_token = match http.backend_token { - Some(back_token) => back_token, + let frontend_token = self.frontend_token; + let origin = match http.origin.take() { + Some(origin) => origin, None => { warn!( "Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'", @@ -223,7 +226,7 @@ impl HttpSession { } }; - let ws_context = http.websocket_context(); + let websocket_context = http.websocket_context(); let mut container_frontend_timeout = http.container_frontend_timeout; let mut container_backend_timeout = http.container_backend_timeout; container_frontend_timeout.reset(); @@ -237,25 +240,25 @@ impl HttpSession { let mut pipe = Pipe::new( backend_buffer, - http.context.backend_id, - http.backend_socket, - http.backend, + Some(origin.backend_id), + Some(origin.socket), + Some(origin.backend), Some(container_backend_timeout), Some(container_frontend_timeout), http.context.cluster_id, http.request_stream.storage.buffer, - front_token, + frontend_token, http.frontend_socket, self.listener.clone(), Protocol::HTTP, http.context.id, http.context.session_address, - ws_context, + websocket_context, ); pipe.frontend_readiness.event = http.frontend_readiness.event; pipe.backend_readiness.event = http.backend_readiness.event; - pipe.set_back_token(back_token); + pipe.set_back_token(origin.token); gauge_add!("protocol.http", -1); gauge_add!("protocol.ws", 1); diff --git a/lib/src/https.rs b/lib/src/https.rs index abaf162c0..58e632ef3 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -329,11 +329,14 @@ impl HttpsSession { } } - fn upgrade_http(&self, http: Http) -> Option { + fn upgrade_http( + &self, + mut http: Http, + ) -> Option { debug!("https switching to wss"); let front_token = self.frontend_token; - let back_token = match http.backend_token { - Some(back_token) => back_token, + let origin = match http.origin.take() { + Some(origin) => origin, None => { warn!( "Could not upgrade https request on cluster '{:?}' ({:?}) using backend '{:?}' into secure websocket for request '{}'", @@ -343,7 +346,7 @@ impl HttpsSession { } }; - let ws_context = http.websocket_context(); + let websocket_context = http.websocket_context(); let mut container_frontend_timeout = http.container_frontend_timeout; let mut container_backend_timeout = http.container_backend_timeout; container_frontend_timeout.reset(); @@ -357,9 +360,9 @@ impl HttpsSession { let mut pipe = Pipe::new( backend_buffer, - http.context.backend_id, - http.backend_socket, - http.backend, + Some(origin.backend_id), + Some(origin.socket), + Some(origin.backend), Some(container_backend_timeout), Some(container_frontend_timeout), http.context.cluster_id, @@ -367,15 +370,15 @@ impl HttpsSession { front_token, http.frontend_socket, self.listener.clone(), - Protocol::HTTP, + Protocol::HTTPS, http.context.id, http.context.session_address, - ws_context, + websocket_context, ); pipe.frontend_readiness.event = http.frontend_readiness.event; pipe.backend_readiness.event = http.backend_readiness.event; - pipe.set_back_token(back_token); + pipe.set_back_token(origin.token); gauge_add!("protocol.https", -1); gauge_add!("protocol.wss", 1); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 311e83f16..eb858aafe 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -953,7 +953,6 @@ pub struct SessionMetrics { pub service_start: Option, pub wait_start: Instant, - pub backend_id: Option, pub backend_start: Option, pub backend_connected: Option, pub backend_stop: Option, @@ -971,7 +970,6 @@ impl SessionMetrics { bout: 0, service_start: None, wait_start: Instant::now(), - backend_id: None, backend_start: None, backend_connected: None, backend_stop: None, @@ -1072,7 +1070,7 @@ impl SessionMetrics { time!("request_time", request_time.as_millis()); time!("service_time", service_time.as_millis()); - if let Some(backend_id) = self.backend_id.as_ref() { + if let Some(backend_id) = context.backend_id { if let Some(backend_response_time) = self.backend_response_time() { record_backend_metrics!( context.cluster_id.as_str_or("-"), diff --git a/lib/src/protocol/kawa_h1/answers.rs b/lib/src/protocol/kawa_h1/answers.rs index 6a8ae2b76..a4b4651ad 100644 --- a/lib/src/protocol/kawa_h1/answers.rs +++ b/lib/src/protocol/kawa_h1/answers.rs @@ -1,7 +1,7 @@ use crate::{protocol::http::DefaultAnswer, sozu_command::state::ClusterId}; use kawa::{ - h1::NoCallbacks, AsBuffer, Block, BodySize, Buffer, Chunk, Kawa, Kind, Pair, ParsingPhase, - ParsingPhaseMarker, StatusLine, Store, + h1::NoCallbacks, AsBuffer, Block, BodySize, Buffer, Chunk, Flags, Kawa, Kind, Pair, + ParsingPhase, ParsingPhaseMarker, StatusLine, Store, }; use nom::AsBytes; use std::{ @@ -11,6 +11,8 @@ use std::{ str::from_utf8_unchecked, }; +use super::parser::compare_no_case; + #[derive(Clone)] pub struct SharedBuffer(Rc<[u8]>); @@ -34,6 +36,8 @@ pub enum TemplateError { InvalidTemplate(ParsingPhase), #[error("unexpected status code: {0}")] InvalidStatusCode(u16), + #[error("unexpected size info: {0:?}")] + InvalidSizeInfo(BodySize), #[error("streaming is not supported in templates")] UnsupportedStreaming, #[error("template variable {0} is not allowed in headers")] @@ -68,6 +72,7 @@ pub struct Replacement { // TODO: rename for clarity, for instance HttpAnswerTemplate pub struct Template { status: u16, + keep_alive: bool, kawa: DefaultAnswerStream, body_replacements: Vec, header_replacements: Vec, @@ -126,6 +131,9 @@ impl Template { if !kawa.is_main_phase() { return Err(TemplateError::InvalidTemplate(kawa.parsing_phase)); } + if kawa.body_size != BodySize::Empty { + return Err(TemplateError::InvalidSizeInfo(kawa.body_size)); + } let status = if let StatusLine::Response { code, .. } = &kawa.detached.status_line { if let Some(expected_code) = status { if expected_code != *code { @@ -141,16 +149,35 @@ impl Template { let mut header_replacements = Vec::new(); let mut body_replacements = Vec::new(); let mut body_size = 0; + let mut keep_alive = true; let mut used_once = Vec::new(); for mut block in kawa.blocks.into_iter() { match &mut block { Block::ChunkHeader(_) => return Err(TemplateError::UnsupportedStreaming), + Block::Flags(Flags { + end_header: true, .. + }) => { + header_replacements.push(Replacement { + block_index: blocks.len(), + typ: ReplacementType::ContentLength, + }); + blocks.push_back(Block::Header(Pair { + key: Store::Static(b"Content-Length"), + val: Store::Static(b"PLACEHOLDER"), + })); + blocks.push_back(block); + } Block::StatusLine | Block::Cookies | Block::Flags(_) => { blocks.push_back(block); } Block::Header(Pair { key, val }) => { let val_data = val.data(buf); let key_data = key.data(buf); + if compare_no_case(key_data, b"connection") + && compare_no_case(val_data, b"close") + { + keep_alive = false; + } if let Some(b'%') = val_data.first() { for variable in &variables { if &val_data[1..] == variable.name.as_bytes() { @@ -168,11 +195,7 @@ impl Template { } used_once.push(var_index); } - ReplacementType::ContentLength => { - if let Some(b'%') = key_data.first() { - *key = Store::new_slice(buf, &key_data[1..]); - } - } + ReplacementType::ContentLength => {} } header_replacements.push(Replacement { block_index: blocks.len(), @@ -240,6 +263,7 @@ impl Template { kawa.blocks = blocks; Ok(Self { status, + keep_alive, kawa, body_replacements, header_replacements, @@ -306,7 +330,6 @@ pub struct HttpAnswers { } // const HEADERS: &str = "Connection: close\r -// Content-Length: 0\r // Sozu-Id: %REQUEST_ID\r // \r"; // const STYLE: &str = "