Skip to content

Commit

Permalink
reduce size of PipelineItem. (#987)
Browse files Browse the repository at this point in the history
* reduce size of PipelineItem.

* clippy fix.
  • Loading branch information
fakeshadow authored Mar 14, 2024
1 parent 1234aee commit 2eaaa03
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 39 deletions.
23 changes: 0 additions & 23 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use core::{fmt, iter, mem, str};

use std::{
borrow::Cow,
error,
path::{Path, PathBuf},
};

Expand Down Expand Up @@ -326,28 +325,6 @@ impl fmt::Debug for Config {
}
}

#[derive(Debug)]
struct UnknownOption(String);

impl fmt::Display for UnknownOption {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "unknown option `{}`", self.0)
}
}

impl error::Error for UnknownOption {}

#[derive(Debug)]
struct InvalidValue(&'static str);

impl fmt::Display for InvalidValue {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "invalid value for option `{}`", self.0)
}
}

impl error::Error for InvalidValue {}

struct Parser<'a> {
s: &'a str,
it: iter::Peekable<str::CharIndices<'a>>,
Expand Down
48 changes: 32 additions & 16 deletions postgres/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ impl Pipeline<'_, true> {
///
/// [libpq_link]: https://www.postgresql.org/docs/current/libpq-pipeline-mode.html
pub fn new() -> Self {
Self {
columns: VecDeque::new(),
buf: BytesMut::new(),
}
Self::with_capacity(0)
}
}

Expand All @@ -85,8 +82,18 @@ impl Pipeline<'_, false> {
/// of socket syscall is needed.
#[inline]
pub fn unsync() -> Self {
Self::with_capacity(0)
}
}

impl<const SYNC_MODE: bool> Pipeline<'_, SYNC_MODE> {
/// start a new pipeline with given capacity.
/// capacity represent how many queries will be contained by a single pipeline. a determined cap
/// can possibly reduce memory reallocation when constructing the pipeline.
#[inline]
pub fn with_capacity(cap: usize) -> Self {
Self {
columns: VecDeque::new(),
columns: VecDeque::with_capacity(cap),
buf: BytesMut::new(),
}
}
Expand Down Expand Up @@ -162,7 +169,7 @@ impl Client {
}

/// streaming response of pipeline.
/// impls [AsyncIterator] trait and can be collected asynchronously.
/// impl [AsyncLendingIterator] trait and can be collected asynchronously.
pub struct PipelineStream<'a> {
pub(crate) res: Response,
pub(crate) columns: VecDeque<&'a [Column]>,
Expand All @@ -180,7 +187,6 @@ impl<'a> AsyncLendingIterator for PipelineStream<'a> {
return Ok(Some(PipelineItem {
finished: false,
stream: self,
rows_affected: 0,
}));
}
backend::Message::DataRow(_) | backend::Message::CommandComplete(_) => {
Expand All @@ -203,17 +209,30 @@ impl<'a> AsyncLendingIterator for PipelineStream<'a> {
}

/// streaming item of certain query inside pipeline's [PipelineStream].
/// impls [AsyncIterator] and can be used to collect [Row] from item.
/// impl [AsyncLendingIterator] and can be used to collect [Row] from item.
pub struct PipelineItem<'a, 'c> {
finished: bool,
stream: &'a mut PipelineStream<'c>,
rows_affected: u64,
}

impl PipelineItem<'_, '_> {
/// return the number of rows affected by certain query in pipeline.
pub fn rows_affected(&self) -> u64 {
self.rows_affected
/// collect rows affected by this pipelined query. [Row] information will be ignored.
///
/// # Panic
/// calling this method on an already finished PipelineItem will cause panic. PipelineItem is marked as finished
/// when its [AsyncLendingIterator::try_next] method returns [Option::None]
pub async fn row_affected(mut self) -> Result<u64, Error> {
assert!(!self.finished, "PipelineItem has already finished");
loop {
match self.stream.res.recv().await? {
backend::Message::DataRow(_) => {}
backend::Message::CommandComplete(body) => {
self.finished = true;
return crate::query::decode::body_to_affected_rows(&body);
}
_ => return Err(Error::UnexpectedMessage),
}
}
}
}

Expand All @@ -232,10 +251,7 @@ impl AsyncLendingIterator for PipelineItem<'_, '_> {
.expect("PipelineItem must not overflow PipelineStream's columns array");
return Row::try_new(columns, body, &mut self.stream.ranges).map(Some);
}
backend::Message::CommandComplete(body) => {
self.finished = true;
self.rows_affected = crate::query::decode::body_to_affected_rows(&body)?;
}
backend::Message::CommandComplete(_) => self.finished = true,
_ => return Err(Error::UnexpectedMessage),
}
}
Expand Down

0 comments on commit 2eaaa03

Please sign in to comment.