Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce size of PipelineItem. #987

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use core::{fmt, iter, mem, str};

use std::{
borrow::Cow,
error,
path::{Path, PathBuf},
};

Expand Down Expand Up @@ -326,28 +325,6 @@ impl fmt::Debug for Config {
}
}

#[derive(Debug)]
struct UnknownOption(String);

impl fmt::Display for UnknownOption {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "unknown option `{}`", self.0)
}
}

impl error::Error for UnknownOption {}

#[derive(Debug)]
struct InvalidValue(&'static str);

impl fmt::Display for InvalidValue {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "invalid value for option `{}`", self.0)
}
}

impl error::Error for InvalidValue {}

struct Parser<'a> {
s: &'a str,
it: iter::Peekable<str::CharIndices<'a>>,
Expand Down
48 changes: 32 additions & 16 deletions postgres/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ impl Pipeline<'_, true> {
///
/// [libpq_link]: https://www.postgresql.org/docs/current/libpq-pipeline-mode.html
pub fn new() -> Self {
Self {
columns: VecDeque::new(),
buf: BytesMut::new(),
}
Self::with_capacity(0)
}
}

Expand All @@ -85,8 +82,18 @@ impl Pipeline<'_, false> {
/// of socket syscall is needed.
#[inline]
pub fn unsync() -> Self {
Self::with_capacity(0)
}
}

impl<const SYNC_MODE: bool> Pipeline<'_, SYNC_MODE> {
/// start a new pipeline with given capacity.
/// capacity represent how many queries will be contained by a single pipeline. a determined cap
/// can possibly reduce memory reallocation when constructing the pipeline.
#[inline]
pub fn with_capacity(cap: usize) -> Self {
Self {
columns: VecDeque::new(),
columns: VecDeque::with_capacity(cap),
buf: BytesMut::new(),
}
}
Expand Down Expand Up @@ -162,7 +169,7 @@ impl Client {
}

/// streaming response of pipeline.
/// impls [AsyncIterator] trait and can be collected asynchronously.
/// impl [AsyncLendingIterator] trait and can be collected asynchronously.
pub struct PipelineStream<'a> {
pub(crate) res: Response,
pub(crate) columns: VecDeque<&'a [Column]>,
Expand All @@ -180,7 +187,6 @@ impl<'a> AsyncLendingIterator for PipelineStream<'a> {
return Ok(Some(PipelineItem {
finished: false,
stream: self,
rows_affected: 0,
}));
}
backend::Message::DataRow(_) | backend::Message::CommandComplete(_) => {
Expand All @@ -203,17 +209,30 @@ impl<'a> AsyncLendingIterator for PipelineStream<'a> {
}

/// streaming item of certain query inside pipeline's [PipelineStream].
/// impls [AsyncIterator] and can be used to collect [Row] from item.
/// impl [AsyncLendingIterator] and can be used to collect [Row] from item.
pub struct PipelineItem<'a, 'c> {
finished: bool,
stream: &'a mut PipelineStream<'c>,
rows_affected: u64,
}

impl PipelineItem<'_, '_> {
/// return the number of rows affected by certain query in pipeline.
pub fn rows_affected(&self) -> u64 {
self.rows_affected
/// collect rows affected by this pipelined query. [Row] information will be ignored.
///
/// # Panic
/// calling this method on an already finished PipelineItem will cause panic. PipelineItem is marked as finished
/// when its [AsyncLendingIterator::try_next] method returns [Option::None]
pub async fn row_affected(mut self) -> Result<u64, Error> {
assert!(!self.finished, "PipelineItem has already finished");
loop {
match self.stream.res.recv().await? {
backend::Message::DataRow(_) => {}
backend::Message::CommandComplete(body) => {
self.finished = true;
return crate::query::decode::body_to_affected_rows(&body);
}
_ => return Err(Error::UnexpectedMessage),
}
}
}
}

Expand All @@ -232,10 +251,7 @@ impl AsyncLendingIterator for PipelineItem<'_, '_> {
.expect("PipelineItem must not overflow PipelineStream's columns array");
return Row::try_new(columns, body, &mut self.stream.ranges).map(Some);
}
backend::Message::CommandComplete(body) => {
self.finished = true;
self.rows_affected = crate::query::decode::body_to_affected_rows(&body)?;
}
backend::Message::CommandComplete(_) => self.finished = true,
_ => return Err(Error::UnexpectedMessage),
}
}
Expand Down
Loading