diff --git a/postgres/src/config.rs b/postgres/src/config.rs index 069f89f4..3c29e6d5 100644 --- a/postgres/src/config.rs +++ b/postgres/src/config.rs @@ -4,7 +4,6 @@ use core::{fmt, iter, mem, str}; use std::{ borrow::Cow, - error, path::{Path, PathBuf}, }; @@ -326,28 +325,6 @@ impl fmt::Debug for Config { } } -#[derive(Debug)] -struct UnknownOption(String); - -impl fmt::Display for UnknownOption { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "unknown option `{}`", self.0) - } -} - -impl error::Error for UnknownOption {} - -#[derive(Debug)] -struct InvalidValue(&'static str); - -impl fmt::Display for InvalidValue { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "invalid value for option `{}`", self.0) - } -} - -impl error::Error for InvalidValue {} - struct Parser<'a> { s: &'a str, it: iter::Peekable>, diff --git a/postgres/src/pipeline.rs b/postgres/src/pipeline.rs index 2e2da394..44b5fa6f 100644 --- a/postgres/src/pipeline.rs +++ b/postgres/src/pipeline.rs @@ -68,10 +68,7 @@ impl Pipeline<'_, true> { /// /// [libpq_link]: https://www.postgresql.org/docs/current/libpq-pipeline-mode.html pub fn new() -> Self { - Self { - columns: VecDeque::new(), - buf: BytesMut::new(), - } + Self::with_capacity(0) } } @@ -85,8 +82,18 @@ impl Pipeline<'_, false> { /// of socket syscall is needed. #[inline] pub fn unsync() -> Self { + Self::with_capacity(0) + } +} + +impl Pipeline<'_, SYNC_MODE> { + /// start a new pipeline with given capacity. + /// capacity represent how many queries will be contained by a single pipeline. a determined cap + /// can possibly reduce memory reallocation when constructing the pipeline. + #[inline] + pub fn with_capacity(cap: usize) -> Self { Self { - columns: VecDeque::new(), + columns: VecDeque::with_capacity(cap), buf: BytesMut::new(), } } @@ -162,7 +169,7 @@ impl Client { } /// streaming response of pipeline. -/// impls [AsyncIterator] trait and can be collected asynchronously. +/// impl [AsyncLendingIterator] trait and can be collected asynchronously. pub struct PipelineStream<'a> { pub(crate) res: Response, pub(crate) columns: VecDeque<&'a [Column]>, @@ -180,7 +187,6 @@ impl<'a> AsyncLendingIterator for PipelineStream<'a> { return Ok(Some(PipelineItem { finished: false, stream: self, - rows_affected: 0, })); } backend::Message::DataRow(_) | backend::Message::CommandComplete(_) => { @@ -203,17 +209,30 @@ impl<'a> AsyncLendingIterator for PipelineStream<'a> { } /// streaming item of certain query inside pipeline's [PipelineStream]. -/// impls [AsyncIterator] and can be used to collect [Row] from item. +/// impl [AsyncLendingIterator] and can be used to collect [Row] from item. pub struct PipelineItem<'a, 'c> { finished: bool, stream: &'a mut PipelineStream<'c>, - rows_affected: u64, } impl PipelineItem<'_, '_> { - /// return the number of rows affected by certain query in pipeline. - pub fn rows_affected(&self) -> u64 { - self.rows_affected + /// collect rows affected by this pipelined query. [Row] information will be ignored. + /// + /// # Panic + /// calling this method on an already finished PipelineItem will cause panic. PipelineItem is marked as finished + /// when its [AsyncLendingIterator::try_next] method returns [Option::None] + pub async fn row_affected(mut self) -> Result { + assert!(!self.finished, "PipelineItem has already finished"); + loop { + match self.stream.res.recv().await? { + backend::Message::DataRow(_) => {} + backend::Message::CommandComplete(body) => { + self.finished = true; + return crate::query::decode::body_to_affected_rows(&body); + } + _ => return Err(Error::UnexpectedMessage), + } + } } } @@ -232,10 +251,7 @@ impl AsyncLendingIterator for PipelineItem<'_, '_> { .expect("PipelineItem must not overflow PipelineStream's columns array"); return Row::try_new(columns, body, &mut self.stream.ranges).map(Some); } - backend::Message::CommandComplete(body) => { - self.finished = true; - self.rows_affected = crate::query::decode::body_to_affected_rows(&body)?; - } + backend::Message::CommandComplete(_) => self.finished = true, _ => return Err(Error::UnexpectedMessage), } }