From 5d7f3276ffd83714de4b75cd7cab9fc515d85669 Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 11 Dec 2023 11:08:59 +0800 Subject: [PATCH] chore(pgwire): avoid error log for health check (#13849) --- src/utils/pgwire/src/lib.rs | 1 + src/utils/pgwire/src/pg_message.rs | 30 +++++++++++++++++++++++++++-- src/utils/pgwire/src/pg_protocol.rs | 6 ++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/utils/pgwire/src/lib.rs b/src/utils/pgwire/src/lib.rs index e7b487bb42c2b..ca297c170438d 100644 --- a/src/utils/pgwire/src/lib.rs +++ b/src/utils/pgwire/src/lib.rs @@ -18,6 +18,7 @@ #![feature(iterator_try_collect)] #![feature(trusted_len)] #![feature(lazy_cell)] +#![feature(buf_read_has_data_left)] #![expect(clippy::doc_markdown, reason = "FIXME: later")] pub mod error; diff --git a/src/utils/pgwire/src/pg_message.rs b/src/utils/pgwire/src/pg_message.rs index 838edf52e547b..ea976a0d61909 100644 --- a/src/utils/pgwire/src/pg_message.rs +++ b/src/utils/pgwire/src/pg_message.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::ffi::CStr; use std::io::{Error, ErrorKind, IoSlice, Result, Write}; -use byteorder::{BigEndian, ByteOrder}; +use byteorder::{BigEndian, ByteOrder, NetworkEndian}; /// Part of code learned from . use bytes::{Buf, BufMut, Bytes, BytesMut}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -43,6 +43,8 @@ pub enum FeMessage { CancelQuery(FeCancelMessage), Terminate, Flush, + // special msg to detect health check, which represents the client immediately closes the connection cleanly without sending any data. + HealthCheck, } #[derive(Debug)] @@ -293,7 +295,31 @@ impl FeMessage { impl FeStartupMessage { /// Read startup message from the stream. pub async fn read(stream: &mut (impl AsyncRead + Unpin)) -> Result { - let len = stream.read_i32().await?; + let mut buffer1 = vec![0; 1]; + let result = stream.read_exact(&mut buffer1).await; + let filled1 = match result { + Ok(n) => n, + Err(err) => { + // Detect whether it is a health check. + if err.kind() == ErrorKind::UnexpectedEof { + return Ok(FeMessage::HealthCheck); + } else { + return Err(err); + } + } + }; + assert_eq!(filled1, 1); + + let mut buffer2 = vec![0; 3]; + let filled2 = stream.read_exact(&mut buffer2).await?; + assert_eq!(filled2, 3); + + let mut buffer3 = BytesMut::with_capacity(4); + buffer3.put_slice(&buffer1); + buffer3.put_slice(&buffer2); + + let len = NetworkEndian::read_i32(&buffer3); + let protocol_num = stream.read_i32().await?; let payload_len = (len - 8) as usize; if payload_len >= isize::MAX as usize { diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index 2ca58c76b2a5a..81eb83d7c146f 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -343,6 +343,7 @@ where return Err(err.into()); } } + FeMessage::HealthCheck => self.process_health_check(), } self.stream.flush().await?; Ok(()) @@ -584,6 +585,11 @@ where self.is_terminate = true; } + fn process_health_check(&mut self) { + tracing::debug!("health check"); + self.is_terminate = true; + } + fn process_parse_msg(&mut self, msg: FeParseMessage) -> PsqlResult<()> { let sql = cstr_to_str(&msg.sql_bytes).unwrap(); let session = self.session.clone().unwrap();