Skip to content

Commit

Permalink
chore(pgwire): avoid error log for health check (#13849)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Dec 11, 2023
1 parent e513e6b commit 5d7f327
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/utils/pgwire/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#![feature(iterator_try_collect)]
#![feature(trusted_len)]
#![feature(lazy_cell)]
#![feature(buf_read_has_data_left)]
#![expect(clippy::doc_markdown, reason = "FIXME: later")]

pub mod error;
Expand Down
30 changes: 28 additions & 2 deletions src/utils/pgwire/src/pg_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::ffi::CStr;
use std::io::{Error, ErrorKind, IoSlice, Result, Write};

use byteorder::{BigEndian, ByteOrder};
use byteorder::{BigEndian, ByteOrder, NetworkEndian};
/// Part of code learned from <https://github.com/zenithdb/zenith/blob/main/zenith_utils/src/pq_proto.rs>.
use bytes::{Buf, BufMut, Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt};
Expand All @@ -43,6 +43,8 @@ pub enum FeMessage {
CancelQuery(FeCancelMessage),
Terminate,
Flush,
// special msg to detect health check, which represents the client immediately closes the connection cleanly without sending any data.
HealthCheck,
}

#[derive(Debug)]
Expand Down Expand Up @@ -293,7 +295,31 @@ impl FeMessage {
impl FeStartupMessage {
/// Read startup message from the stream.
pub async fn read(stream: &mut (impl AsyncRead + Unpin)) -> Result<FeMessage> {
let len = stream.read_i32().await?;
let mut buffer1 = vec![0; 1];
let result = stream.read_exact(&mut buffer1).await;
let filled1 = match result {
Ok(n) => n,
Err(err) => {
// Detect whether it is a health check.
if err.kind() == ErrorKind::UnexpectedEof {
return Ok(FeMessage::HealthCheck);
} else {
return Err(err);
}
}
};
assert_eq!(filled1, 1);

let mut buffer2 = vec![0; 3];
let filled2 = stream.read_exact(&mut buffer2).await?;
assert_eq!(filled2, 3);

let mut buffer3 = BytesMut::with_capacity(4);
buffer3.put_slice(&buffer1);
buffer3.put_slice(&buffer2);

let len = NetworkEndian::read_i32(&buffer3);

let protocol_num = stream.read_i32().await?;
let payload_len = (len - 8) as usize;
if payload_len >= isize::MAX as usize {
Expand Down
6 changes: 6 additions & 0 deletions src/utils/pgwire/src/pg_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ where
return Err(err.into());
}
}
FeMessage::HealthCheck => self.process_health_check(),
}
self.stream.flush().await?;
Ok(())
Expand Down Expand Up @@ -584,6 +585,11 @@ where
self.is_terminate = true;
}

fn process_health_check(&mut self) {
tracing::debug!("health check");
self.is_terminate = true;
}

fn process_parse_msg(&mut self, msg: FeParseMessage) -> PsqlResult<()> {
let sql = cstr_to_str(&msg.sql_bytes).unwrap();
let session = self.session.clone().unwrap();
Expand Down

0 comments on commit 5d7f327

Please sign in to comment.