diff --git a/libs/proxy/tokio-postgres2/src/connect.rs b/libs/proxy/tokio-postgres2/src/connect.rs index e0cb69748d50..29548afcc623 100644 --- a/libs/proxy/tokio-postgres2/src/connect.rs +++ b/libs/proxy/tokio-postgres2/src/connect.rs @@ -1,11 +1,9 @@ use crate::client::SocketConfig; -use crate::codec::BackendMessage; use crate::config::Host; use crate::connect_raw::connect_raw; use crate::connect_socket::connect_socket; use crate::tls::{MakeTlsConnect, TlsConnect}; use crate::{Client, Config, Connection, Error, RawConnection}; -use postgres_protocol2::message::backend::Message; use tokio::net::TcpStream; use tokio::sync::mpsc; @@ -43,7 +41,7 @@ where let RawConnection { stream, parameters, - delayed_notice, + delayed_notice: _, process_id, secret_key, } = connect_raw(socket, tls, config).await?; @@ -63,13 +61,7 @@ where secret_key, ); - // delayed notices are always sent as "Async" messages. - let delayed = delayed_notice - .into_iter() - .map(|m| BackendMessage::Async(Message::NoticeResponse(m))) - .collect(); - - let connection = Connection::new(stream, delayed, parameters, receiver); + let connection = Connection::new(stream, parameters, receiver); Ok((client, connection)) } diff --git a/libs/proxy/tokio-postgres2/src/connection.rs b/libs/proxy/tokio-postgres2/src/connection.rs index f478717e0d2d..0761bf0e1e7f 100644 --- a/libs/proxy/tokio-postgres2/src/connection.rs +++ b/libs/proxy/tokio-postgres2/src/connection.rs @@ -1,11 +1,10 @@ use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; -use crate::error::DbError; use crate::maybe_tls_stream::MaybeTlsStream; use crate::{AsyncMessage, Error, Notification}; use bytes::BytesMut; use fallible_iterator::FallibleIterator; use futures_util::{ready, Sink, Stream}; -use log::{info, trace}; +use log::trace; use postgres_protocol2::message::backend::Message; use postgres_protocol2::message::frontend; use std::collections::{HashMap, VecDeque}; @@ -55,7 +54,7 @@ pub struct Connection { /// HACK: we need this in the Neon Proxy to forward params. pub parameters: HashMap, receiver: mpsc::UnboundedReceiver, - pending_responses: VecDeque, + pending_responses: Option, responses: VecDeque, state: State, } @@ -67,7 +66,6 @@ where { pub(crate) fn new( stream: Framed, PostgresCodec>, - pending_responses: VecDeque, parameters: HashMap, receiver: mpsc::UnboundedReceiver, ) -> Connection { @@ -75,7 +73,7 @@ where stream, parameters, receiver, - pending_responses, + pending_responses: None, responses: VecDeque::new(), state: State::Active, } @@ -85,7 +83,7 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll>> { - if let Some(message) = self.pending_responses.pop_front() { + if let Some(message) = self.pending_responses.take() { trace!("retrying pending response"); return Poll::Ready(Some(Ok(message))); } @@ -109,9 +107,8 @@ where }; let (mut messages, request_complete) = match message { - BackendMessage::Async(Message::NoticeResponse(body)) => { - let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?; - return Poll::Ready(Ok(AsyncMessage::Notice(error))); + BackendMessage::Async(Message::NoticeResponse(_)) => { + continue; } BackendMessage::Async(Message::NotificationResponse(body)) => { let notification = Notification { @@ -160,7 +157,7 @@ where } Poll::Pending => { self.responses.push_front(response); - self.pending_responses.push_back(BackendMessage::Normal { + self.pending_responses = Some(BackendMessage::Normal { messages, request_complete, }); @@ -328,11 +325,7 @@ where type Output = Result<(), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while let Some(message) = ready!(self.poll_message(cx)?) { - if let AsyncMessage::Notice(notice) = message { - info!("{}: {}", notice.severity(), notice.message()); - } - } + while ready!(self.poll_message(cx)?).is_some() {} Poll::Ready(Ok(())) } } diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs index 9155dd82792a..88abd8533471 100644 --- a/libs/proxy/tokio-postgres2/src/lib.rs +++ b/libs/proxy/tokio-postgres2/src/lib.rs @@ -6,7 +6,6 @@ pub use crate::client::{Client, SocketConfig}; pub use crate::config::Config; pub use crate::connect_raw::RawConnection; pub use crate::connection::Connection; -use crate::error::DbError; pub use crate::error::Error; pub use crate::generic_client::GenericClient; pub use crate::query::RowStream; @@ -100,10 +99,6 @@ impl Notification { #[derive(Debug, Clone)] #[non_exhaustive] pub enum AsyncMessage { - /// A notice. - /// - /// Notices use the same format as errors, but aren't "errors" per-se. - Notice(DbError), /// A notification. /// /// Connections can subscribe to notifications with the `LISTEN` command. diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 447103edce53..3561a98a505e 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -124,9 +124,6 @@ pub(crate) fn poll_client( let message = ready!(connection.poll_message(cx)); match message { - Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session_id, "notice: {}", notice); - } Some(Ok(AsyncMessage::Notification(notif))) => { warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received"); } diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index c51a2bc9babb..304b806162fb 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -227,9 +227,6 @@ pub(crate) fn poll_client( let message = ready!(connection.poll_message(cx)); match message { - Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session_id, "notice: {}", notice); - } Some(Ok(AsyncMessage::Notification(notif))) => { warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received"); }