From 49f10372c0fb1379f3dde9c4074400de484980a0 Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Wed, 14 Dec 2022 17:19:09 -0800 Subject: [PATCH 1/6] Problem: not using readonly flag for SPI commands We're potentially losing out on some optimizations. Solution: track use of mutating commands In my unscientific benchmarks, I saw a 4-5% performance increase when doing repetitive SELECT queries as read-only vs. read-write. This is an API breaking change (again) but it would avoid leaving performance on the table for little convenience gain. --- pgx-tests/src/tests/bgworker_tests.rs | 4 +- pgx-tests/src/tests/spi_tests.rs | 10 ++-- pgx-tests/src/tests/srf_tests.rs | 4 +- pgx-tests/src/tests/struct_type_tests.rs | 2 +- pgx/src/spi.rs | 63 +++++++++++++----------- 5 files changed, 44 insertions(+), 39 deletions(-) diff --git a/pgx-tests/src/tests/bgworker_tests.rs b/pgx-tests/src/tests/bgworker_tests.rs index 4a537a72d..dcfb78cb7 100644 --- a/pgx-tests/src/tests/bgworker_tests.rs +++ b/pgx-tests/src/tests/bgworker_tests.rs @@ -25,7 +25,7 @@ pub extern "C" fn bgworker(arg: pg_sys::Datum) { if arg > 0 { BackgroundWorker::transaction(|| { Spi::run("CREATE TABLE tests.bgworker_test (v INTEGER);"); - Spi::execute(|client| { + Spi::execute(|mut client| { client.update( "INSERT INTO tests.bgworker_test VALUES ($1);", None, @@ -70,7 +70,7 @@ pub extern "C" fn bgworker_return_value(arg: pg_sys::Datum) { }; while BackgroundWorker::wait_latch(Some(Duration::from_millis(100))) {} BackgroundWorker::transaction(|| { - Spi::execute(|c| { + Spi::execute(|mut c| { c.update( "INSERT INTO tests.bgworker_test_return VALUES ($1)", None, diff --git a/pgx-tests/src/tests/spi_tests.rs b/pgx-tests/src/tests/spi_tests.rs index 8626c4011..7444478f0 100644 --- a/pgx-tests/src/tests/spi_tests.rs +++ b/pgx-tests/src/tests/spi_tests.rs @@ -177,7 +177,7 @@ mod tests { #[pg_test] fn test_inserting_null() { - Spi::execute(|client| { + Spi::execute(|mut client| { client.update("CREATE TABLE 
tests.null_test (id uuid)", None, None); }); let result = Spi::get_one_with_args::( @@ -189,7 +189,7 @@ mod tests { #[pg_test] fn test_cursor() { - Spi::execute(|client| { + Spi::execute(|mut client| { client.update("CREATE TABLE tests.cursor_table (id int)", None, None); client.update( "INSERT INTO tests.cursor_table (id) \ @@ -211,7 +211,7 @@ mod tests { #[pg_test] fn test_cursor_by_name() { - let cursor_name = Spi::connect(|client| { + let cursor_name = Spi::connect(|mut client| { client.update("CREATE TABLE tests.cursor_table (id int)", None, None); client.update( "INSERT INTO tests.cursor_table (id) \ @@ -275,7 +275,7 @@ mod tests { assert_eq!(res.column_name(2).unwrap(), "b"); }); - Spi::execute(|client| { + Spi::execute(|mut client| { let res = client.update("SET TIME ZONE 'PST8PDT'", None, None); assert_eq!(0, res.columns()); @@ -291,7 +291,7 @@ mod tests { #[pg_test] fn test_spi_non_mut() { // Ensures update and cursor APIs do not need mutable reference to SpiClient - Spi::execute(|client| { + Spi::execute(|mut client| { client.update("SELECT 1", None, None); let cursor = client.open_cursor("SELECT 1", None).detach_into_name(); client.find_cursor(&cursor); diff --git a/pgx-tests/src/tests/srf_tests.rs b/pgx-tests/src/tests/srf_tests.rs index eaa6ce73d..26d621fdd 100644 --- a/pgx-tests/src/tests/srf_tests.rs +++ b/pgx-tests/src/tests/srf_tests.rs @@ -177,7 +177,7 @@ mod tests { #[pg_test] fn test_srf_setof_datum_detoasting_with_borrow() { - let cnt = Spi::connect(|client| { + let cnt = Spi::connect(|mut client| { // build up a table with one large column that Postgres will be forced to TOAST client.update("CREATE TABLE test_srf_datum_detoasting AS SELECT array_to_string(array_agg(g),' ') s FROM (SELECT 'a' g FROM generate_series(1, 1000000)) x;", None, None); @@ -195,7 +195,7 @@ mod tests { #[pg_test] fn test_srf_table_datum_detoasting_with_borrow() { - let cnt = Spi::connect(|client| { + let cnt = Spi::connect(|mut client| { // build up a table with one 
large column that Postgres will be forced to TOAST client.update("CREATE TABLE test_srf_datum_detoasting AS SELECT array_to_string(array_agg(g),' ') s FROM (SELECT 'a' g FROM generate_series(1, 1000000)) x;", None, None); diff --git a/pgx-tests/src/tests/struct_type_tests.rs b/pgx-tests/src/tests/struct_type_tests.rs index 5b129435b..779101ff5 100644 --- a/pgx-tests/src/tests/struct_type_tests.rs +++ b/pgx-tests/src/tests/struct_type_tests.rs @@ -128,7 +128,7 @@ mod tests { #[pg_test] fn test_complex_storage_and_retrieval() { - let complex = Spi::connect(|client| { + let complex = Spi::connect(|mut client| { Ok(client.update( "CREATE TABLE complex_test AS SELECT s as id, (s || '.0, 2.0' || s)::complex as value FROM generate_series(1, 1000) s;\ SELECT value FROM complex_test ORDER BY id;", None, None).first().get_one::>()) diff --git a/pgx/src/spi.rs b/pgx/src/spi.rs index a2ec55c53..62de0c45d 100644 --- a/pgx/src/spi.rs +++ b/pgx/src/spi.rs @@ -106,7 +106,22 @@ impl TryFrom for SpiError { pub struct Spi; // TODO: should `'conn` be invariant? -pub struct SpiClient<'conn>(PhantomData<&'conn SpiConnection>); +pub struct SpiClient<'conn> { + phantom: PhantomData<&'conn SpiConnection>, + // This field indicates whether queries should be run as read-only. Unless any `update` has been used, + // `readonly` will be `true`. + // Postgres docs say: + // + // It is generally unwise to mix read-only and read-write commands within a single function + // using SPI; that could result in very confusing behavior, since the read-only queries + // would not see the results of any database updates done by the read-write queries. + // + // TODO: Alternatively, we can detect if the command counter (or something?) has incremented and if yes + // then we set read_only=false, else we can set it to true. + // However, we would still need to remember the previous value, which will be larger than the boolean. 
+ // So, unless somebody will send commands to Postgres bypassing this SPI API, this flag seems sufficient. + readonly: bool, +} /// a struct to manage our SPI connection lifetime struct SpiConnection(PhantomData<*mut ()>); @@ -131,7 +146,7 @@ impl Drop for SpiConnection { impl SpiConnection { /// Return a client that with a lifetime scoped to this connection. fn client(&self) -> SpiClient<'_> { - SpiClient(PhantomData) + SpiClient { phantom: PhantomData, readonly: true } } } @@ -159,8 +174,8 @@ pub struct SpiHeapTupleData { impl Spi { pub fn get_one(query: &str) -> Option { - Spi::connect(|client| { - let result = client.select(query, Some(1), None).first().get_one(); + Spi::connect(|mut client| { + let result = client.update(query, Some(1), None).first().get_one(); Ok(result) }) } @@ -168,8 +183,8 @@ impl Spi { pub fn get_two( query: &str, ) -> (Option, Option) { - Spi::connect(|client| { - let (a, b) = client.select(query, Some(1), None).first().get_two::(); + Spi::connect(|mut client| { + let (a, b) = client.update(query, Some(1), None).first().get_two::(); Ok(Some((a, b))) }) .unwrap() @@ -182,8 +197,8 @@ impl Spi { >( query: &str, ) -> (Option, Option, Option) { - Spi::connect(|client| { - let (a, b, c) = client.select(query, Some(1), None).first().get_three::(); + Spi::connect(|mut client| { + let (a, b, c) = client.update(query, Some(1), None).first().get_three::(); Ok(Some((a, b, c))) }) .unwrap() @@ -193,15 +208,15 @@ impl Spi { query: &str, args: Vec<(PgOid, Option)>, ) -> Option { - Spi::connect(|client| Ok(client.select(query, Some(1), Some(args)).first().get_one())) + Spi::connect(|mut client| Ok(client.update(query, Some(1), Some(args)).first().get_one())) } pub fn get_two_with_args( query: &str, args: Vec<(PgOid, Option)>, ) -> (Option, Option) { - Spi::connect(|client| { - let (a, b) = client.select(query, Some(1), Some(args)).first().get_two::(); + Spi::connect(|mut client| { + let (a, b) = client.update(query, Some(1), 
Some(args)).first().get_two::(); Ok(Some((a, b))) }) .unwrap() @@ -215,9 +230,9 @@ impl Spi { query: &str, args: Vec<(PgOid, Option)>, ) -> (Option, Option, Option) { - Spi::connect(|client| { + Spi::connect(|mut client| { let (a, b, c) = - client.select(query, Some(1), Some(args)).first().get_three::(); + client.update(query, Some(1), Some(args)).first().get_three::(); Ok(Some((a, b, c))) }) .unwrap() @@ -238,7 +253,7 @@ impl Spi { /// /// The statement runs in read/write mode pub fn run_with_args(query: &str, args: Option)>>) { - Spi::execute(|client| { + Spi::execute(|mut client| { client.update(query, None, args); }) } @@ -253,7 +268,7 @@ impl Spi { query: &str, args: Option)>>, ) -> Json { - Spi::connect(|client| { + Spi::connect(|mut client| { let table = client.update(&format!("EXPLAIN (format json) {}", query), None, args).first(); Ok(Some(table.get_one::().expect("failed to get json EXPLAIN result"))) @@ -309,28 +324,18 @@ impl<'a> SpiClient<'a> { limit: Option, args: Option)>>, ) -> SpiTupleTable { - // Postgres docs say: - // - // It is generally unwise to mix read-only and read-write commands within a single function - // using SPI; that could result in very confusing behavior, since the read-only queries - // would not see the results of any database updates done by the read-write queries. - // - // As such, we don't actually set read-only to true here - - // TODO: can we detect if the command counter (or something?) has incremented and if yes - // then we set read_only=false, else we can set it to true? - // Is this even a good idea? 
- self.execute(query, false, limit, args) + self.execute(query, self.readonly, limit, args) } /// perform any query (including utility statements) that modify the database in some way pub fn update( - &self, + &mut self, query: &str, limit: Option, args: Option)>>, ) -> SpiTupleTable { - self.execute(query, false, limit, args) + self.readonly = false; + self.execute(query, self.readonly, limit, args) } fn execute( From 01d1d24d435866446e24d2fea8058bab9b9a5a52 Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Wed, 14 Dec 2022 17:25:54 -0800 Subject: [PATCH 2/6] Problem: cursors may or may not be readonly-friendly However, we always assume them to be non-readonly. Solution: split the API to `open_cursor` and `open_cursor_mut` This way we can ensure we're squeezing out performance in the right place, when feasible. --- pgx-tests/src/tests/spi_tests.rs | 21 +++++++++++++++++++++ pgx/src/spi.rs | 24 +++++++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/pgx-tests/src/tests/spi_tests.rs b/pgx-tests/src/tests/spi_tests.rs index 7444478f0..95d6df4f5 100644 --- a/pgx-tests/src/tests/spi_tests.rs +++ b/pgx-tests/src/tests/spi_tests.rs @@ -209,6 +209,27 @@ mod tests { }); } + #[pg_test] + fn test_cursor_mut() { + Spi::execute(|mut client| { + client.update("CREATE TABLE tests.cursor_table (id int)", None, None); + + let mut portal = client.open_cursor_mut( + "INSERT INTO tests.cursor_table (id) \ + SELECT i FROM generate_series(1, 10) AS t(i) RETURNING id", + None, + ); + + fn sum_all(table: pgx::SpiTupleTable) -> i32 { + table.map(|r| r.by_ordinal(1).unwrap().value::().unwrap()).sum() + } + assert_eq!(sum_all(portal.fetch(3)), 1 + 2 + 3); + assert_eq!(sum_all(portal.fetch(3)), 4 + 5 + 6); + assert_eq!(sum_all(portal.fetch(3)), 7 + 8 + 9); + assert_eq!(sum_all(portal.fetch(3)), 10); + }); + } + #[pg_test] fn test_cursor_by_name() { let cursor_name = Spi::connect(|mut client| { diff --git a/pgx/src/spi.rs b/pgx/src/spi.rs index 
62de0c45d..0f1c0338e 100644 --- a/pgx/src/spi.rs +++ b/pgx/src/spi.rs @@ -412,6 +412,28 @@ impl<'a> SpiClient<'a> { &self, query: &str, args: Option)>>, + ) -> SpiCursor { + self.open_cursor_impl(query, args) + } + + /// Set up a cursor that will execute the specified update (mutating) query + /// + /// Rows may be then fetched using [`SpiCursor::fetch`]. + /// + /// See [`SpiCursor`] docs for usage details. + pub fn open_cursor_mut( + &mut self, + query: &str, + args: Option)>>, + ) -> SpiCursor { + self.readonly = false; + self.open_cursor_impl(query, args) + } + + fn open_cursor_impl( + &self, + query: &str, + args: Option)>>, ) -> SpiCursor { let src = std::ffi::CString::new(query).expect("query contained a null byte"); let args = args.unwrap_or_default(); @@ -447,7 +469,7 @@ impl<'a> SpiClient<'a> { argtypes.as_mut_ptr(), datums.as_mut_ptr(), nulls.as_ptr(), - false, + self.readonly, 0, ) }) From d6695bcda1ff10c24ad8131bf6ead117da97cbaa Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Tue, 20 Dec 2022 10:02:13 -0800 Subject: [PATCH 3/6] Problem: one example fails to compile Solution: fix the signature --- pgx-examples/schemas/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgx-examples/schemas/src/lib.rs b/pgx-examples/schemas/src/lib.rs index 8408159e7..34945059e 100644 --- a/pgx-examples/schemas/src/lib.rs +++ b/pgx-examples/schemas/src/lib.rs @@ -100,7 +100,7 @@ mod tests { #[pg_test] fn test_my_some_schema_type() { - Spi::connect(|c| { + Spi::connect(|mut c| { // "MySomeSchemaType" is in 'some_schema', so it needs to be discoverable c.update("SET search_path TO some_schema,public", None, None); assert_eq!( From a7ed61c7e06c2c3a21c0922bd526fef1d9d85e96 Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Tue, 20 Dec 2022 15:13:42 -0800 Subject: [PATCH 4/6] Rename _phantom to __marker as per @eeeebbbbrrrr's request --- pgx/src/spi.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pgx/src/spi.rs 
b/pgx/src/spi.rs index 04e57950e..f65a5433e 100644 --- a/pgx/src/spi.rs +++ b/pgx/src/spi.rs @@ -303,7 +303,7 @@ impl<'a> Query for &'a str { ) }) .ok_or(Error::PortalIsNull)?; - Ok(SpiCursor { ptr, _phantom: PhantomData }) + Ok(SpiCursor { ptr, __marker: PhantomData }) } } @@ -546,7 +546,7 @@ impl<'a> SpiClient<'a> { let ptr = NonNull::new(unsafe { pg_sys::SPI_cursor_find(name.as_pg_cstr()) }) .ok_or(Error::CursorNotFound(name.to_string()))?; - Ok(SpiCursor { ptr, _phantom: PhantomData }) + Ok(SpiCursor { ptr, __marker: PhantomData }) } } @@ -605,7 +605,7 @@ type CursorName = String; /// ``` pub struct SpiCursor<'client> { ptr: NonNull, - _phantom: PhantomData<&'client SpiClient<'client>>, + __marker: PhantomData<&'client SpiClient<'client>>, } impl SpiCursor<'_> { @@ -803,7 +803,7 @@ impl<'a: 'b, 'b> Query for &'b PreparedStatement<'a> { ) }) .ok_or(Error::PortalIsNull)?; - Ok(SpiCursor { ptr, _phantom: PhantomData }) + Ok(SpiCursor { ptr, __marker: PhantomData }) } } From 2d2cd6f7e524c74a70e4aedc26495fba653c9159 Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Tue, 20 Dec 2022 15:18:35 -0800 Subject: [PATCH 5/6] Problem: passing readonly flag to `Query::execute` It's unnecessary as client already has it (and error-prone) Solution: get it from the client --- pgx/src/spi.rs | 41 +++++++++++++++-------------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/pgx/src/spi.rs b/pgx/src/spi.rs index f65a5433e..12f69b5a2 100644 --- a/pgx/src/spi.rs +++ b/pgx/src/spi.rs @@ -188,7 +188,6 @@ pub trait Query { fn execute( self, client: &SpiClient, - read_only: bool, limit: Option, arguments: Self::Arguments, ) -> Self::Result; @@ -208,11 +207,10 @@ impl<'a> Query for &'a String { fn execute( self, client: &SpiClient, - read_only: bool, limit: Option, arguments: Self::Arguments, ) -> Self::Result { - self.as_str().execute(client, read_only, limit, arguments) + self.as_str().execute(client, limit, arguments) } fn open_cursor<'c: 'cc, 'cc>( @@ 
-237,8 +235,7 @@ impl<'a> Query for &'a str { fn execute( self, - _client: &SpiClient, - read_only: bool, + client: &SpiClient, limit: Option, arguments: Self::Arguments, ) -> Self::Result { @@ -264,13 +261,15 @@ impl<'a> Query for &'a str { argtypes.as_mut_ptr(), datums.as_mut_ptr(), nulls.as_ptr(), - read_only, + client.readonly, limit.unwrap_or(0), ) } } // SAFETY: arguments are prepared above - None => unsafe { pg_sys::SPI_execute(src.as_ptr(), read_only, limit.unwrap_or(0)) }, + None => unsafe { + pg_sys::SPI_execute(src.as_ptr(), client.readonly, limit.unwrap_or(0)) + }, }; SpiClient::prepare_tuple_table(status_code) @@ -471,7 +470,7 @@ impl Spi { impl<'a> SpiClient<'a> { /// perform a SELECT statement pub fn select(&self, query: Q, limit: Option, args: Q::Arguments) -> Q::Result { - self.execute(query, self.readonly, limit, args) + self.execute(query, limit, args) } /// perform any query (including utility statements) that modify the database in some way @@ -482,17 +481,11 @@ impl<'a> SpiClient<'a> { args: Q::Arguments, ) -> Q::Result { self.readonly = false; - self.execute(query, self.readonly, limit, args) + self.execute(query, limit, args) } - fn execute( - &self, - query: Q, - read_only: bool, - limit: Option, - args: Q::Arguments, - ) -> Q::Result { - query.execute(&self, read_only, limit, args) + fn execute(&self, query: Q, limit: Option, args: Q::Arguments) -> Q::Result { + query.execute(&self, limit, args) } fn prepare_tuple_table(status_code: i32) -> SpiTupleTable { @@ -689,11 +682,10 @@ impl<'a> Query for &'a OwnedPreparedStatement { fn execute( self, client: &SpiClient, - read_only: bool, limit: Option, arguments: Self::Arguments, ) -> Self::Result { - (&self.0).execute(client, read_only, limit, arguments) + (&self.0).execute(client, limit, arguments) } fn open_cursor<'c: 'cc, 'cc>( @@ -712,11 +704,10 @@ impl Query for OwnedPreparedStatement { fn execute( self, client: &SpiClient, - read_only: bool, limit: Option, arguments: Self::Arguments, ) -> 
Self::Result { - (&self.0).execute(client, read_only, limit, arguments) + (&self.0).execute(client, limit, arguments) } fn open_cursor<'c: 'cc, 'cc>( @@ -749,8 +740,7 @@ impl<'a: 'b, 'b> Query for &'b PreparedStatement<'a> { fn execute( self, - _client: &SpiClient, - read_only: bool, + client: &SpiClient, limit: Option, arguments: Self::Arguments, ) -> Self::Result { @@ -775,7 +765,7 @@ impl<'a: 'b, 'b> Query for &'b PreparedStatement<'a> { self.plan, datums.as_mut_ptr(), nulls.as_mut_ptr(), - read_only, + client.readonly, limit.unwrap_or(0), ) }; @@ -814,11 +804,10 @@ impl<'a> Query for PreparedStatement<'a> { fn execute( self, client: &SpiClient, - read_only: bool, limit: Option, arguments: Self::Arguments, ) -> Self::Result { - (&self).execute(client, read_only, limit, arguments) + (&self).execute(client, limit, arguments) } fn open_cursor<'c: 'cc, 'cc>( From 1038cb7379e3a067d88cc3c672749c706afb4bfb Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Tue, 20 Dec 2022 15:27:33 -0800 Subject: [PATCH 6/6] Problem: how do we know we enforce readonly? 
Solution: write a test that shows it --- pgx-tests/src/tests/spi_tests.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pgx-tests/src/tests/spi_tests.rs b/pgx-tests/src/tests/spi_tests.rs index 7ff383866..6744fbb79 100644 --- a/pgx-tests/src/tests/spi_tests.rs +++ b/pgx-tests/src/tests/spi_tests.rs @@ -413,4 +413,20 @@ mod tests { fn test_option() { assert!(Spi::get_one::("SELECT NULL::integer").unwrap().is_none()); } + + #[pg_test(error = "CREATE TABLE is not allowed in a non-volatile function")] + fn test_readwrite_in_readonly() { + // This is supposed to run in read-only + Spi::connect(|client| client.select("CREATE TABLE a ()", None, None)); + } + + #[pg_test] + fn test_readwrite_in_select_readwrite() { + Spi::connect(|mut client| { + // This is supposed to switch connection to read-write and run it there + client.update("CREATE TABLE a (id INT)", None, None); + // This is supposed to run in read-write + client.select("INSERT INTO a VALUES (1)", None, None); + }); + } }