diff --git a/postgres/CHANGES.md b/postgres/CHANGES.md index 2ae54119..bb292a27 100644 --- a/postgres/CHANGES.md +++ b/postgres/CHANGES.md @@ -1,16 +1,18 @@ # unreleased 0.2.0 ## Remove -- remove `prepare`, `query`, `execute`, `query_raw`, `execute_raw`, `query_simple` and `execute_simple` methods from all types. Leave only `Execute` trait as sole query API +- remove `prepare`, `query`, `execute`, `query_raw`, `execute_raw`, `query_simple` and `execute_simple` methods from all types. Leave only `Execute` trait family as sole API ```rust use xitca_postgres::{Client, Execute, RowSimpleStream, RowStream, Statement}; // create a named statement and execute it. on success returns a prepared statement let stmt: StatementGuarded<'_, Client> = Statement::named("SELECT 1").execute(&client).await?; // query with the prepared statement. on success returns an async row stream. - let stream: RowStream<'_> = stmt.query(&client)?; + let stream: RowStream<'_> = stmt.query(&client).await?; // query with raw string sql. - let stream: RowSimpleStream<'_> = "SELECT 1; SELECT 1".query(&client)?; + let stream: RowSimpleStream<'_> = "SELECT 1; SELECT 1".query(&client).await?; // execute raw string sql. let row_affected: u64 = "SELECT 1; SELECT 1".execute(&client).await?; + // execute sql file. + let row_affected: u64 = std::path::Path::new("./foo.sql").execute(&client).await?; ``` - remove `Client::pipeline` and `Pool::pipeline`. `pipeline::Pipeline` type can be execute with `Execute::query` method @@ -23,8 +25,8 @@ // use ExecuteMut trait to add query to pipeline use xitca_postgres::ExecuteMut; - stmt.query(&mut pipe)?; - stmt.query(&mut pipe)?; + stmt.query_mut(&mut pipe)?; + stmt.query_mut(&mut pipe)?; // use Execute trait to start pipeline query let pipe_stream = pipe.query(&client)?; @@ -38,7 +40,7 @@ // prepare a statement. let stmt = Statement::named("SELECT * FROM users WHERE id = $1 AND age = $2", &[Type::INT4, Type::INT4]).execute(&client).await?; // bind statement to typed value and start query - let stream = stmt.bind([9527, 42]).query(&client)?; + let stream = stmt.bind([9527, 42]).query(&client).await?; ``` - query without parameter value can be queried with `Statement` alone. ```rust @@ -46,22 +48,22 @@ // prepare a statement. let stmt = Statement::named("SELECT * FROM users", &[]).execute(&client).await?; // statement have no value params and can be used for query. - let stream = stmt.query(&client)?; + let stream = stmt.query(&client).await?; ``` - `AsyncLendingIterator` is no longer exported from crate's root path. use `iter::AsyncLendingIterator` instead - `query::RowStreamOwned` and `row::RowOwned` are no longer behind `compat` crate feature anymore - `statement::Statement::unnamed` must bind to value parameters with `bind` or `bind_dyn` before calling `Execute` methods. ```rust let stmt = Statement::unnamed("SELECT * FROM users WHERE id = $1", &[Type::INT4]); - let row_stream = stmt.bind([9527]).query(&client)?; + let row_stream = stmt.bind([9527]).query(&client).await?; ``` - `Query::_send_encode_query` method's return type is changed to `Result<(::Output, Response), Error>`. Enabling further simplify of the surface level API at the cost of more internal complexity - `Encode` trait implementation detail change - `IntoStream` trait is renamed to `IntoResponse` with implementation detail change ## Add -- add `Execute` and `ExecuteMut` traits for extending query customization -- add `Prepare::{_prepare_blocking, _get_type_blocking}` +- add `Execute`, `ExecuteMut`, `ExecuteBlocking` traits for extending query customization +- add `Prepare::_get_type_blocking` - add `iter::AsyncLendingIteratorExt` for extending async iterator APIs - add `statement::Statement::{bind, bind_dyn}` methods for binding value parameters to a prepared statement for query - add `query::RowSimpleStreamOwned` diff --git a/postgres/Cargo.toml b/postgres/Cargo.toml index 54da21b1..7ffcecc4 100644 --- a/postgres/Cargo.toml +++ b/postgres/Cargo.toml @@ -48,7 +48,6 @@ rustls-pemfile = { version = "2", optional = true } [dev-dependencies] xitca-postgres = { version = "0.2", features = ["compat"] } xitca-postgres-codegen = "0.1" -async-stream = "0.3" bb8 = "0.8.5" futures = { version = "0.3", default-features = false } rcgen = "0.13" diff --git a/postgres/README.md b/postgres/README.md index 4575e115..1b0faff2 100644 --- a/postgres/README.md +++ b/postgres/README.md @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box> { // bind the prepared statement to parameter values it declared. // when parameters are different Rust types it's suggested to use dynamic binding as following // query with the bind and get an async streaming for database rows on success - let mut stream = stmt.bind_dyn(&[&1i32, &"alice"]).query(&cli)?; + let mut stream = stmt.bind_dyn(&[&1i32, &"alice"]).query(&cli).await?; // use async iterator to visit rows let row = stream.try_next().await?.ok_or("no row found")?; @@ -88,7 +88,7 @@ async fn main() -> Result<(), Box> { assert!(stream.try_next().await?.is_none()); // like execute method. query can be used with raw sql string. - let mut stream = "SELECT id, name FROM foo WHERE name = 'david'".query(&cli)?; + let mut stream = "SELECT id, name FROM foo WHERE name = 'david'".query(&cli).await?; let row = stream.try_next().await?.ok_or("no row found")?; // unlike query with prepared statement. raw sql query would return rows that can only be parsed to Rust string types. @@ -104,13 +104,13 @@ async fn main() -> Result<(), Box> { ## Synchronous API `xitca_postgres::Client` can run outside of tokio async runtime and using blocking API to interact with database ```rust -use xitca_postgres::{Client, Error, Execute}; +use xitca_postgres::{Client, Error, ExecuteBlocking}; fn query(client: &Client) -> Result<(), Error> { // execute sql query with blocking api "SELECT 1".execute_blocking(client)?; - let stream = "SELECT 1".query(client)?; + let stream = "SELECT 1".query_blocking(client)?; // use iterator to visit streaming rows for item in stream { diff --git a/postgres/examples/execute.rs b/postgres/examples/execute.rs index 15d9096c..75ab8369 100644 --- a/postgres/examples/execute.rs +++ b/postgres/examples/execute.rs @@ -5,8 +5,9 @@ use std::{ pin::Pin, }; -use futures::stream::{Stream, TryStreamExt}; -use xitca_postgres::{row::RowOwned, types::Type, Client, Error, Execute, Postgres, RowStreamOwned, Statement}; +use xitca_postgres::{ + iter::AsyncLendingIteratorExt, types::Type, Client, Error, Execute, Postgres, RowStreamOwned, Statement, +}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -45,7 +46,7 @@ async fn main() -> Result<(), Box> { type ExecuteOutput = Pin> + Send + 'c>>; // like the execute but output an async stream iterator that produces database rows. - type QueryOutput = Pin> + Send + 'c>>; + type QueryOutput = Pin> + Send + 'c>>; fn execute(self, cli: &'c Client) -> Self::ExecuteOutput { // move PrepareAndExecute<'p> and &'c Client into an async block. @@ -60,32 +61,15 @@ async fn main() -> Result<(), Box> { } fn query(self, cli: &'c Client) -> Self::QueryOutput { - // async stream macro is used to move prepare statement and query into a single - // streaming type. - Box::pin(async_stream::try_stream! { + Box::pin(async { // prepare statement and query for async iterator of rows - let stmt = Statement::named(self.stmt, self.types) - .execute(cli) - .await?; - let stream = stmt.query(cli)?; - - // async stream macro does not support lending iterator types and we convert - // row stream to an owned version where it does not contain references. - let mut stream = RowStreamOwned::from(stream); - - // futures::stream::TryStreamExt trait is utilized here to produce database rows. - while let Some(item) = stream.try_next().await? { - yield item; - } + let stmt = Statement::named(self.stmt, self.types).execute(cli).await?; + let stream = stmt.query(cli).await?; + // convert borrowed stream to owned stream. as borrowed stream reference the statement this function + // just produced. + Ok(RowStreamOwned::from(stream)) }) } - - // blocking version execute method. it's much simpler to implement than it's async variant. - fn execute_blocking(self, cli: &Client) -> Result { - Statement::named(self.stmt, self.types) - .execute_blocking(cli)? - .execute_blocking(cli) - } } // use the new type to prepare and execute a statement. @@ -103,7 +87,8 @@ async fn main() -> Result<(), Box> { types: &[], } .query(&cli) - // use TryStreamExt trait methods to visit rows and collect column index 0 to integers. + .await? + // use async iterator methods to visit rows and collect column index 0 to integers. .map_ok(|row| row.get::(0)) .try_collect::>() .await?; diff --git a/postgres/examples/macro.rs b/postgres/examples/macro.rs index 60548dd8..3d47c4b5 100644 --- a/postgres/examples/macro.rs +++ b/postgres/examples/macro.rs @@ -13,16 +13,13 @@ async fn main() -> Result<(), Box> { .await?; tokio::spawn(drv.into_future()); - "CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT)" - .execute(&cli) - .await?; - "INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie');" - .execute(&cli) - .await?; + std::path::Path::new("./samples/test.sql").execute(&cli).await?; // this macro is expand into xitca_postgres::statement::Statement::unnamed // it's also possible to utilize xitca-postgres's Execute traits for more customizable macro usage - let mut stream = sql!("SELECT * FROM foo WHERE id = $1 AND name = $2", &1i32, &"alice").query(&cli)?; + let mut stream = sql!("SELECT * FROM foo WHERE id = $1 AND name = $2", &1i32, &"alice") + .query(&cli) + .await?; // the macro also have basic function for sql validation check. try uncomment below to see compile error. // let _ = sql!("SELECT * FRO foo WHERR id = $1 AN name = $2", &1i32, &"alice"); diff --git a/postgres/examples/pooling.rs b/postgres/examples/pooling.rs index ec8be136..6800c936 100644 --- a/postgres/examples/pooling.rs +++ b/postgres/examples/pooling.rs @@ -121,7 +121,7 @@ async fn main() -> Result<(), Box> { // you can forward query to xitca-postgres's client completely. let transaction = conn.conn.transaction().await?; - let mut res = "SELECT 1".query(&transaction)?; + let mut res = "SELECT 1".query(&transaction).await?; let row = res.try_next().await?.ok_or("row not found")?; assert_eq!(Some("1"), row.get(0)); transaction.rollback().await?; @@ -129,7 +129,7 @@ async fn main() -> Result<(), Box> { // or use the new type definition of your pool connection for additional state and functionalities your // connection type could offer let transaction = conn.transaction().await?; - let mut res = "SELECT 1".query(&transaction)?; + let mut res = "SELECT 1".query(&transaction).await?; let row = res.try_next().await?.ok_or("row not found")?; assert_eq!(Some("1"), row.get(0)); transaction.commit().await?; diff --git a/postgres/examples/sync.rs b/postgres/examples/sync.rs index 28bf18b8..40efb113 100644 --- a/postgres/examples/sync.rs +++ b/postgres/examples/sync.rs @@ -1,7 +1,7 @@ //! example of running client in non async environment. use std::future::IntoFuture; -use xitca_postgres::{types::Type, Execute, Postgres, Statement}; +use xitca_postgres::{types::Type, ExecuteBlocking, Postgres, Statement}; fn main() -> Result<(), Box> { // prepare a tokio runtime for client's Driver. @@ -32,7 +32,7 @@ fn main() -> Result<(), Box> { ) .execute_blocking(&cli)?; // query api shares the same convention no matter the context. - let stream = stmt.bind_dyn(&[&4i32, &"david"]).query(&cli)?; + let stream = stmt.bind_dyn(&[&4i32, &"david"]).query_blocking(&cli)?; // async row stream implement IntoIterator trait to convert stream into a sync iterator. for item in stream { diff --git a/postgres/samples/test.sql b/postgres/samples/test.sql new file mode 100644 index 00000000..e12d8538 --- /dev/null +++ b/postgres/samples/test.sql @@ -0,0 +1,2 @@ +CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT); +INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie'); diff --git a/postgres/src/driver.rs b/postgres/src/driver.rs index f47dcd28..8594f41c 100644 --- a/postgres/src/driver.rs +++ b/postgres/src/driver.rs @@ -185,7 +185,7 @@ async fn dns_resolve<'p>(host: &'p str, ports: &'p [u16]) -> Result where Self: Sized, { - /// async outcome of execute. - type ExecuteOutput: Future; - /// iterator outcome of query. - /// - /// by default this type should be matching `C`'s [`Query::_query`] output type. + /// outcome of execute. + /// used for single time database response: number of rows affected by execution for example. + type ExecuteOutput; + + /// outcome of query. + /// used for repeated database response: database rows for example /// /// consider impl [`AsyncLendingIterator`] for async iterator of rows /// consider impl [`Iterator`] for iterator of rows /// + /// for type of statement where no repeated response will returning this type can point to + /// [`Execute::ExecuteOutput`] and it's encouraged to make `query` behave identical to `execute` + /// /// [`AsyncLendingIterator`]: crate::iter::AsyncLendingIterator type QueryOutput; - /// define how a query is executed with async outcome. + /// define how a statement is executed with single time response fn execute(self, cli: &'c C) -> Self::ExecuteOutput; - /// define how a query is executed with iterator of database rows as return type. + /// define how a statement is queried with repeated response fn query(self, cli: &'c C) -> Self::QueryOutput; - - /// blocking version of [`Execute::execute`] - fn execute_blocking(self, cli: &'c C) -> ::Output; } /// mutable version of [`Execute`] trait where C type is mutably borrowed @@ -61,182 +47,23 @@ pub trait ExecuteMut<'c, C> where Self: Sized, { - type ExecuteMutOutput: Future; + type ExecuteMutOutput; type QueryMutOutput; fn execute_mut(self, cli: &'c mut C) -> Self::ExecuteMutOutput; fn query_mut(self, cli: &'c mut C) -> Self::QueryMutOutput; - - fn execute_mut_blocking(self, cli: &'c mut C) -> ::Output; -} - -impl<'s, C> Execute<'_, C> for &'s Statement -where - C: Query, -{ - type ExecuteOutput = ResultFuture; - type QueryOutput = Result, Error>; - - #[inline] - fn execute(self, cli: &C) -> Self::ExecuteOutput { - self.query(cli).map(RowAffected::from).into() - } - - #[inline] - fn query(self, cli: &C) -> Self::QueryOutput { - cli._query(self) - } - - #[inline] - fn execute_blocking(self, cli: &C) -> Result { - let stream = self.query(cli)?; - RowAffected::from(stream).wait() - } } -impl<'s, C> Execute<'_, C> for &'s str +/// blocking version of [`Execute`] for synchronous environment +pub trait ExecuteBlocking<'c, C> where - C: Query, -{ - type ExecuteOutput = ResultFuture; - type QueryOutput = Result; - - #[inline] - fn execute(self, cli: &C) -> Self::ExecuteOutput { - self.query(cli).map(RowAffected::from).into() - } - - #[inline] - fn query(self, cli: &C) -> Self::QueryOutput { - cli._query(self) - } - - #[inline] - fn execute_blocking(self, cli: &C) -> Result { - let stream = self.query(cli)?; - RowAffected::from(stream).wait() - } -} - -type IntoGuardedFuture<'c, C> = IntoGuarded<'c, BoxedFuture<'c, Result>, C>; - -pub struct IntoGuarded<'a, F, C> { - fut: F, - cli: &'a C, -} - -impl<'a, F, C> Future for IntoGuarded<'a, F, C> -where - F: Future> + Unpin, - C: Query, -{ - type Output = Result, Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - Pin::new(&mut this.fut) - .poll(cx) - .map_ok(|stmt| stmt.into_guarded(this.cli)) - } -} - -impl<'c, 's, C> Execute<'c, C> for StatementNamed<'s> -where - C: Prepare + 'c, - 's: 'c, -{ - type ExecuteOutput = ResultFuture>; - type QueryOutput = Self::ExecuteOutput; - - #[inline] - fn execute(self, cli: &'c C) -> Self::ExecuteOutput { - cli._query(StatementCreate::from((self, cli))) - .map(|fut| IntoGuarded { fut, cli }) - .into() - } - - #[inline] - fn query(self, cli: &'c C) -> Self::QueryOutput { - self.execute(cli) - } - - #[inline] - fn execute_blocking(self, cli: &'c C) -> Result, Error> { - let stmt = cli._query(StatementCreateBlocking::from((self, cli)))??; - Ok(stmt.into_guarded(cli)) - } -} - -impl<'s, C, P> Execute<'_, C> for StatementQuery<'s, P> -where - C: Query, - P: AsParams + 's, -{ - type ExecuteOutput = ResultFuture; - type QueryOutput = Result, Error>; - - #[inline] - fn execute(self, cli: &C) -> Self::ExecuteOutput { - self.query(cli).map(RowAffected::from).into() - } - - #[inline] - fn query(self, cli: &C) -> Self::QueryOutput { - cli._query(self) - } - - #[inline] - fn execute_blocking(self, cli: &C) -> Result { - let stream = self.query(cli)?; - RowAffected::from(stream).wait() - } -} - -impl<'s, 'c, C, P> Execute<'c, C> for StatementUnnamedBind<'s, P> -where - C: Prepare + 'c, - P: AsParams + 'c, - 's: 'c, + Self: Sized, { - type ExecuteOutput = ResultFuture; - type QueryOutput = Result, Error>; - - #[inline] - fn execute(self, cli: &C) -> Self::ExecuteOutput { - self.query(cli).map(RowAffected::from).into() - } - - #[inline] - fn query(self, cli: &'c C) -> Self::QueryOutput { - cli._query(StatementUnnamedQuery::from((self, cli))) - } - - #[inline] - fn execute_blocking(self, cli: &C) -> Result { - let stream = self.query(cli)?; - RowAffected::from(stream).wait() - } -} - -pub struct ResultFuture(Result>); - -impl From> for ResultFuture { - fn from(res: Result) -> Self { - Self(res.map_err(Some)) - } -} + type ExecuteOutput; + type QueryOutput; -impl Future for ResultFuture -where - F: Future> + Unpin, -{ - type Output = F::Output; + fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut().0 { - Ok(ref mut res) => Pin::new(res).poll(cx), - Err(ref mut e) => Poll::Ready(Err(e.take().unwrap())), - } - } + fn query_blocking(self, cli: &'c C) -> Self::QueryOutput; } diff --git a/postgres/src/execute/async_impl.rs b/postgres/src/execute/async_impl.rs new file mode 100644 index 00000000..b9f6d207 --- /dev/null +++ b/postgres/src/execute/async_impl.rs @@ -0,0 +1,193 @@ +use core::{ + future::{ready, Future, Ready}, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::{ + driver::codec::AsParams, + error::Error, + prepare::Prepare, + query::{Query, RowAffected, RowSimpleStream, RowStream, RowStreamGuarded}, + statement::{ + Statement, StatementCreate, StatementGuarded, StatementNamed, StatementQuery, StatementUnnamedBind, + StatementUnnamedQuery, + }, + BoxedFuture, +}; + +use super::Execute; + +impl<'s, C> Execute<'_, C> for &'s Statement +where + C: Query, +{ + type ExecuteOutput = ResultFuture; + type QueryOutput = Ready, Error>>; + + #[inline] + fn execute(self, cli: &C) -> Self::ExecuteOutput { + cli._query(self).map(RowAffected::from).into() + } + + #[inline] + fn query(self, cli: &C) -> Self::QueryOutput { + ready(cli._query(self)) + } +} + +impl<'s, C> Execute<'_, C> for &'s str +where + C: Query, +{ + type ExecuteOutput = ResultFuture; + type QueryOutput = Ready>; + + #[inline] + fn execute(self, cli: &C) -> Self::ExecuteOutput { + cli._query(self).map(RowAffected::from).into() + } + + #[inline] + fn query(self, cli: &C) -> Self::QueryOutput { + ready(cli._query(self)) + } +} + +type IntoGuardedFuture<'c, C> = IntoGuarded<'c, BoxedFuture<'c, Result>, C>; + +pub struct IntoGuarded<'a, F, C> { + fut: F, + cli: &'a C, +} + +impl<'a, F, C> Future for IntoGuarded<'a, F, C> +where + F: Future> + Unpin, + C: Query, +{ + type Output = Result, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + Pin::new(&mut this.fut) + .poll(cx) + .map_ok(|stmt| stmt.into_guarded(this.cli)) + } +} + +impl<'c, 's, C> Execute<'c, C> for StatementNamed<'s> +where + C: Prepare + 'c, + 's: 'c, +{ + type ExecuteOutput = ResultFuture>; + type QueryOutput = Self::ExecuteOutput; + + #[inline] + fn execute(self, cli: &'c C) -> Self::ExecuteOutput { + cli._query(StatementCreate::from((self, cli))) + .map(|fut| IntoGuarded { fut, cli }) + .into() + } + + #[inline] + fn query(self, cli: &'c C) -> Self::QueryOutput { + self.execute(cli) + } +} + +impl<'s, C, P> Execute<'_, C> for StatementQuery<'s, P> +where + C: Query, + P: AsParams + 's, +{ + type ExecuteOutput = ResultFuture; + type QueryOutput = Ready, Error>>; + + #[inline] + fn execute(self, cli: &C) -> Self::ExecuteOutput { + cli._query(self).map(RowAffected::from).into() + } + + #[inline] + fn query(self, cli: &C) -> Self::QueryOutput { + ready(cli._query(self)) + } +} + +impl<'s, 'c, C, P> Execute<'c, C> for StatementUnnamedBind<'s, P> +where + C: Prepare + 'c, + P: AsParams + 'c, + 's: 'c, +{ + type ExecuteOutput = ResultFuture; + type QueryOutput = Ready, Error>>; + + #[inline] + fn execute(self, cli: &C) -> Self::ExecuteOutput { + cli._query(StatementUnnamedQuery::from((self, cli))) + .map(RowAffected::from) + .into() + } + + #[inline] + fn query(self, cli: &'c C) -> Self::QueryOutput { + ready(cli._query(StatementUnnamedQuery::from((self, cli)))) + } +} + +impl<'c, C> Execute<'c, C> for &std::path::Path +where + C: Query + Sync + 'c, +{ + type ExecuteOutput = BoxedFuture<'c, Result>; + type QueryOutput = BoxedFuture<'c, Result>; + + #[inline] + fn execute(self, cli: &'c C) -> Self::ExecuteOutput { + let path = self.to_path_buf(); + Box::pin(async move { + tokio::task::spawn_blocking(|| std::fs::read_to_string(path)) + .await + .unwrap()? + .execute(cli) + .await + }) + } + + #[inline] + fn query(self, cli: &'c C) -> Self::QueryOutput { + let path = self.to_path_buf(); + Box::pin(async move { + tokio::task::spawn_blocking(|| std::fs::read_to_string(path)) + .await + .unwrap()? + .query(cli) + .await + }) + } +} + +pub struct ResultFuture(Result>); + +impl From> for ResultFuture { + fn from(res: Result) -> Self { + Self(res.map_err(Some)) + } +} + +impl Future for ResultFuture +where + F: Future> + Unpin, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut().0 { + Ok(ref mut res) => Pin::new(res).poll(cx), + Err(ref mut e) => Poll::Ready(Err(e.take().unwrap())), + } + } +} diff --git a/postgres/src/execute/sync_impl.rs b/postgres/src/execute/sync_impl.rs new file mode 100644 index 00000000..b39403ce --- /dev/null +++ b/postgres/src/execute/sync_impl.rs @@ -0,0 +1,129 @@ +use crate::{ + driver::codec::AsParams, + error::Error, + prepare::Prepare, + query::{Query, RowAffected, RowSimpleStream, RowStream, RowStreamGuarded}, + statement::{ + Statement, StatementCreateBlocking, StatementGuarded, StatementNamed, StatementQuery, StatementUnnamedBind, + StatementUnnamedQuery, + }, +}; + +use super::ExecuteBlocking; + +impl<'s, C> ExecuteBlocking<'_, C> for &'s Statement +where + C: Query, +{ + type ExecuteOutput = Result; + type QueryOutput = Result, Error>; + + #[inline] + fn execute_blocking(self, cli: &C) -> Self::ExecuteOutput { + let stream = self.query_blocking(cli)?; + RowAffected::from(stream).wait() + } + + #[inline] + fn query_blocking(self, cli: &C) -> Self::QueryOutput { + cli._query(self) + } +} + +impl ExecuteBlocking<'_, C> for &str +where + C: Query, +{ + type ExecuteOutput = Result; + type QueryOutput = Result; + + #[inline] + fn execute_blocking(self, cli: &C) -> Self::ExecuteOutput { + let stream = self.query_blocking(cli)?; + RowAffected::from(stream).wait() + } + + #[inline] + fn query_blocking(self, cli: &C) -> Self::QueryOutput { + cli._query(self) + } +} + +impl<'c, 's, C> ExecuteBlocking<'c, C> for StatementNamed<'s> +where + C: Prepare + 'c, + 's: 'c, +{ + type ExecuteOutput = Result, Error>; + type QueryOutput = Self::ExecuteOutput; + + #[inline] + fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput { + let stmt = cli._query(StatementCreateBlocking::from((self, cli)))??; + Ok(stmt.into_guarded(cli)) + } + + #[inline] + fn query_blocking(self, cli: &'c C) -> Self::QueryOutput { + self.execute_blocking(cli) + } +} + +impl<'s, C, P> ExecuteBlocking<'_, C> for StatementQuery<'s, P> +where + C: Query, + P: AsParams + 's, +{ + type ExecuteOutput = Result; + type QueryOutput = Result, Error>; + + #[inline] + fn execute_blocking(self, cli: &C) -> Result { + let stream = self.query_blocking(cli)?; + RowAffected::from(stream).wait() + } + + #[inline] + fn query_blocking(self, cli: &C) -> Self::QueryOutput { + cli._query(self) + } +} + +impl<'s, 'c, C, P> ExecuteBlocking<'c, C> for StatementUnnamedBind<'s, P> +where + C: Prepare + 'c, + P: AsParams + 'c, + 's: 'c, +{ + type ExecuteOutput = Result; + type QueryOutput = Result, Error>; + + #[inline] + fn execute_blocking(self, cli: &C) -> Result { + let stream = self.query_blocking(cli)?; + RowAffected::from(stream).wait() + } + + #[inline] + fn query_blocking(self, cli: &'c C) -> Self::QueryOutput { + cli._query(StatementUnnamedQuery::from((self, cli))) + } +} + +impl<'c, C> ExecuteBlocking<'c, C> for &std::path::Path +where + C: Query + 'c, +{ + type ExecuteOutput = Result; + type QueryOutput = Result; + + #[inline] + fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput { + std::fs::read_to_string(self)?.execute_blocking(cli) + } + + #[inline] + fn query_blocking(self, cli: &'c C) -> Self::QueryOutput { + std::fs::read_to_string(self)?.query_blocking(cli) + } +} diff --git a/postgres/src/lib.rs b/postgres/src/lib.rs index f5f14150..259cf44a 100644 --- a/postgres/src/lib.rs +++ b/postgres/src/lib.rs @@ -32,7 +32,7 @@ pub use self::{ config::Config, driver::Driver, error::Error, - execute::{Execute, ExecuteMut}, + execute::{Execute, ExecuteBlocking, ExecuteMut}, from_sql::FromSqlExt, query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned}, session::Session, @@ -53,7 +53,7 @@ pub mod compat { //! # async fn convert(client: Client) -> Result<(), Error> { //! // prepare a statement and query for rows. //! let stmt = Statement::named("SELECT * from users", &[]).execute(&client).await?; - //! let mut stream = stmt.query(&client)?; + //! let mut stream = stmt.query(&client).await?; //! //! // assuming we want to spawn a tokio async task and handle the stream inside it. //! // but code below would not work as stream is a borrowed type with lending iterator implements. diff --git a/postgres/src/pipeline.rs b/postgres/src/pipeline.rs index c7048433..0806615c 100644 --- a/postgres/src/pipeline.rs +++ b/postgres/src/pipeline.rs @@ -8,14 +8,13 @@ //! - ordered response handling with a single stream type. reduce memory footprint and possibility of deadlock //! //! [`tokio-postgres`]: https://docs.rs/tokio-postgres/latest/tokio_postgres/#pipelining -use core::{ - future::{ready, Ready}, - ops::{Deref, DerefMut, Range}, -}; +use core::ops::{Deref, DerefMut, Range}; use postgres_protocol::message::{backend, frontend}; use xitca_io::bytes::BytesMut; +use crate::ExecuteBlocking; + use super::{ column::Column, driver::codec::{self, encode::Encode, Response}, @@ -241,26 +240,22 @@ where B: DerefMut, E: Encode, { - type ExecuteMutOutput = Ready; + type ExecuteMutOutput = Self::QueryMutOutput; type QueryMutOutput = Result<(), Error>; #[inline] fn execute_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteMutOutput { - ready(self.query_mut(pipe)) + self.query_mut(pipe) } fn query_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryMutOutput { let len = pipe.buf.len(); + self.encode::(&mut pipe.buf) .map(|columns| pipe.columns.push(columns)) // revert back to last pipelined query when encoding error occurred. .inspect_err(|_| pipe.buf.truncate(len)) } - - #[inline] - fn execute_mut_blocking(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryMutOutput { - self.query_mut(pipe) - } } pub struct PipelineQuery<'a, 'b> { @@ -280,8 +275,8 @@ where fn execute(self, cli: &C) -> Self::ExecuteOutput { let res = self.query(cli); Box::pin(async move { - let mut row_affected = 0; let mut res = res?; + let mut row_affected = 0; while let Some(item) = res.try_next().await? { row_affected += item.row_affected().await?; } @@ -289,6 +284,7 @@ where }) } + #[inline] fn query(self, cli: &C) -> Self::QueryOutput { let Pipeline { columns, mut buf } = self; assert!(!buf.is_empty()); @@ -306,9 +302,18 @@ where buf: buf.as_ref(), }) } +} + +impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<'_, C> for Pipeline<'p, B, SYNC_MODE> +where + C: Query, + B: DerefMut, +{ + type ExecuteOutput = Result; + type QueryOutput = Result, Error>; fn execute_blocking(self, cli: &C) -> Result { - let mut res = self.query(cli)?; + let mut res = self.query_blocking(cli)?; let mut row_affected = 0; loop { @@ -331,6 +336,10 @@ where } } } + + fn query_blocking(self, cli: &C) -> Self::QueryOutput { + self.query(cli) + } } /// streaming response of pipeline. diff --git a/postgres/src/pool.rs b/postgres/src/pool.rs index 1f4a939a..b63e5895 100644 --- a/postgres/src/pool.rs +++ b/postgres/src/pool.rs @@ -311,16 +311,6 @@ where fn query_mut(self, cli: &'c mut PoolConnection) -> Self::QueryMutOutput { self.execute_mut(cli) } - - fn execute_mut_blocking(self, cli: &mut PoolConnection) -> ::Output { - match cli.conn().statements.get(self.stmt) { - Some(stmt) => Ok(stmt.clone()), - None => { - let stmt = self.execute_blocking(cli)?.leak(); - Ok(cli.insert_cache(self.stmt, stmt)) - } - } - } } pub enum StatementCacheFuture<'c> { diff --git a/postgres/src/prepare.rs b/postgres/src/prepare.rs index 2f4d018d..a1769b41 100644 --- a/postgres/src/prepare.rs +++ b/postgres/src/prepare.rs @@ -3,7 +3,7 @@ use postgres_types::{Field, Kind, Oid}; use super::{ client::Client, error::{DbError, Error, SqlState}, - execute::Execute, + execute::{Execute, ExecuteBlocking}, iter::AsyncLendingIterator, query::Query, statement::{Statement, StatementNamed}, @@ -29,7 +29,7 @@ impl Prepare for Client { let stmt = self.typeinfo_statement().await?; - let mut rows = stmt.bind([oid]).query(self)?; + let mut rows = stmt.bind([oid]).query(self).await?; let row = rows.try_next().await?.ok_or_else(Error::unexpected)?; let name = row.try_get::(0)?; @@ -75,7 +75,7 @@ impl Prepare for Client { let stmt = self.typeinfo_statement_blocking()?; - let rows = stmt.bind([oid]).query(self)?; + let rows = stmt.bind([oid]).query_blocking(self)?; let row = rows.into_iter().next().ok_or_else(Error::unexpected)??; let name = row.try_get::(0)?; @@ -162,7 +162,7 @@ const TYPEINFO_COMPOSITE_QUERY: StatementNamed = Statement::named( impl Client { async fn get_enum_variants(&self, oid: Oid) -> Result, Error> { let stmt = self.typeinfo_enum_statement().await?; - let mut rows = stmt.bind([oid]).query(self)?; + let mut rows = stmt.bind([oid]).query(self).await?; let mut res = Vec::new(); while let Some(row) = rows.try_next().await? { let variant = row.try_get(0)?; @@ -173,7 +173,7 @@ impl Client { async fn get_composite_fields(&self, oid: Oid) -> Result, Error> { let stmt = self.typeinfo_composite_statement().await?; - let mut rows = stmt.bind([oid]).query(self)?; + let mut rows = stmt.bind([oid]).query(self).await?; let mut fields = Vec::new(); while let Some(row) = rows.try_next().await? { let name = row.try_get(0)?; @@ -240,7 +240,7 @@ impl Client { fn get_enum_variants_blocking(&self, oid: Oid) -> Result, Error> { let stmt = self.typeinfo_enum_statement_blocking()?; stmt.bind([oid]) - .query(self)? + .query_blocking(self)? .into_iter() .map(|row| row?.try_get(0)) .collect() @@ -249,7 +249,7 @@ impl Client { fn get_composite_fields_blocking(&self, oid: Oid) -> Result, Error> { let stmt = self.typeinfo_composite_statement_blocking()?; stmt.bind([oid]) - .query(self)? + .query_blocking(self)? .into_iter() .map(|row| { let row = row?; diff --git a/postgres/src/query/stream.rs b/postgres/src/query/stream.rs index 0b17d810..b9678d4d 100644 --- a/postgres/src/query/stream.rs +++ b/postgres/src/query/stream.rs @@ -87,7 +87,7 @@ async fn try_next<'r>( /// # async fn collect(cli: Client) -> Result<(), Error> { /// // prepare statement and query for some users from database. /// let stmt = Statement::named("SELECT * FROM users", &[]).execute(&cli).await?; -/// let mut stream = stmt.query(&cli)?; +/// let mut stream = stmt.query(&cli).await?; /// /// // assuming users contain name column where it can be parsed to string. /// // then collecting all user name to a collection @@ -97,7 +97,7 @@ async fn try_next<'r>( /// } /// /// // the same operation with owned row stream can be simplified a bit: -/// let stream = stmt.query(&cli)?; +/// let stream = stmt.query(&cli).await?; /// // use extended api on top of AsyncIterator to collect user names to collection /// let strings_2: Vec = RowStreamOwned::from(stream).map_ok(|row| row.get("name")).try_collect().await?; /// diff --git a/postgres/src/statement.rs b/postgres/src/statement.rs index 4975a9a6..eab2c2c8 100644 --- a/postgres/src/statement.rs +++ b/postgres/src/statement.rs @@ -131,7 +131,7 @@ impl Statement { /// let stmt = Statement::named("SELECT * FROM users WHERE id = $1 AND age = $2", &[Type::INT4, Type::INT4]) /// .execute(&cli).await?; /// // bind statement to typed value parameters and start query - /// let row_stream = stmt.bind([9527_i32, 18]).query(&cli)?; + /// let row_stream = stmt.bind([9527_i32, 18]).query(&cli).await?; /// # Ok(()) /// # } /// ``` @@ -397,7 +397,7 @@ mod test { drop(stmt); - let mut stream = stmt_raw.query(&cli).unwrap(); + let mut stream = stmt_raw.query(&cli).await.unwrap(); let e = stream.try_next().await.err().unwrap(); diff --git a/postgres/tests/runtime.rs b/postgres/tests/runtime.rs index 03baa859..1502877b 100644 --- a/postgres/tests/runtime.rs +++ b/postgres/tests/runtime.rs @@ -18,7 +18,7 @@ async fn connect(s: &str) -> Client { async fn smoke_test(s: &str) { let client = connect(s).await; let stmt = Statement::named("SELECT $1::INT", &[]).execute(&client).await.unwrap(); - let mut stream = stmt.bind([1i32]).query(&client).unwrap(); + let mut stream = stmt.bind([1i32]).query(&client).await.unwrap(); let row = stream.try_next().await.unwrap().unwrap(); assert_eq!(row.get::(0), 1i32); } @@ -179,7 +179,7 @@ async fn poll_after_response_finish() { tokio::spawn(drv.into_future()); - let mut stream = "SELECT 1".query(&cli).unwrap(); + let mut stream = "SELECT 1".query(&cli).await.unwrap(); stream.try_next().await.unwrap().unwrap(); @@ -194,11 +194,7 @@ async fn poll_after_response_finish() { async fn query_portal() { let mut client = connect("postgres://postgres:postgres@localhost:5432").await; - "CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT); - INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie');" - .execute(&client) - .await - .unwrap(); + std::path::Path::new("samples/test.sql").execute(&client).await.unwrap(); let transaction = client.transaction().await.unwrap(); @@ -245,6 +241,7 @@ async fn query_unnamed_with_transaction() { ) .bind_dyn(&[&"alice", &20i32, &"bob", &30i32, &"carol", &40i32]) .query(&transaction) + .await .unwrap(); let mut inserted_values = Vec::new(); @@ -268,6 +265,7 @@ async fn query_unnamed_with_transaction() { ) .bind_dyn(&[&"alice", &50i32]) .query(&transaction) + .await .unwrap(); let row = stream.try_next().await.unwrap().unwrap(); @@ -288,6 +286,7 @@ async fn query_unnamed_with_transaction() { let mut stream = Statement::unnamed("UPDATE foo set age = 33", &[]) .bind_dyn(&[]) .query(&transaction) + .await .unwrap(); assert!(stream.try_next().await.unwrap().is_none()); } diff --git a/postgres/tests/sync.rs b/postgres/tests/sync.rs index 9ef1f176..1c2ae2cc 100644 --- a/postgres/tests/sync.rs +++ b/postgres/tests/sync.rs @@ -5,7 +5,7 @@ use xitca_postgres::{ pipeline::Pipeline, statement::Statement, types::Type, - Client, Execute, ExecuteMut, Postgres, + Client, ExecuteBlocking, ExecuteMut, Postgres, }; fn connect() -> Client { @@ -32,7 +32,7 @@ fn query_unnamed() { &[Type::TEXT, Type::INT4, Type::TEXT, Type::INT4, Type::TEXT, Type::INT4], ) .bind_dyn(&[&"alice", &20i32, &"bob", &30i32, &"carol", &40i32]) - .query(&cli) + .query_blocking(&cli) .unwrap() .into_iter();