From 35d0a647812d003176a206935d0474ef8d826d9d Mon Sep 17 00:00:00 2001 From: Isabel Atkinson Date: Thu, 20 May 2021 15:09:36 -0400 Subject: [PATCH 1/6] add transaction example to ClientSession doc --- src/client/session/mod.rs | 55 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 644315f0e..a7573c52f 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -37,6 +37,57 @@ lazy_static! { /// /// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread /// or process at a time. +/// +/// ## Transactions +/// Replica set transactions are supported on MongoDB 4.0+. Transactions are associated with a +/// `ClientSession`. To begin a transaction, call [ClientSession::start_transaction] on a +/// `ClientSession`. The `ClientSession` must be passed to operations to be executed within the +/// transaction. +/// +/// ```rust +/// # use mongodb::{ +/// # bson::doc, +/// # error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, +/// # Client, +/// # ClientSession, +/// # Collection, +/// # }; +/// # async fn do_stuff() -> Result<()> { +/// # let client = Client::with_uri_str("mongodb://example.com").await?; +/// # let coll = client.database("foo").collection("bar"); +/// # let mut session = client.start_session(None).await?; +/// session.start_transaction(None).await?; +/// // A "TransientTransactionError" label indicates that the entire transaction can be retried +/// // with a reasonable expectation that it will succeed. +/// while let Err(error) = execute_transaction(&coll, &mut session).await { +/// if !error.contains_label(TRANSIENT_TRANSACTION_ERROR) { +/// break; +/// } +/// } +/// # Ok(()) +/// # } +/// +/// async fn execute_transaction(coll: &Collection, session: &mut ClientSession) -> Result<()> { +/// coll.insert_one_with_session(doc! { "x": 1 }, None, session).await?; +/// coll.delete_one_with_session(doc! { "y": 2 }, None, session).await?; +/// // An "UnknownTransactionCommitResult" label indicates that it is unknown whether the +/// // commit has satisfied the write concern associated with the transaction. If an error +/// // with this label is returned, it is safe to retry the commit until the write concern is +/// // satisfied or an error without the label is returned. +/// loop { +/// let result = session.commit_transaction().await; +/// if let Err(ref error) = result { +/// if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { +/// continue; +/// } +/// } +/// result? +/// } +/// } +/// ``` +// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded +// transactions are supported on 4.2+ +/// Note: transactions are currently not supported on sharded clusters. #[derive(Clone, Debug)] pub struct ClientSession { cluster_time: Option, @@ -344,14 +395,14 @@ impl ClientSession { /// # let coll = client.database("foo").collection::("bar"); /// # let mut session = client.start_session(None).await?; /// session.start_transaction(None).await?; - /// match execute_transaction(coll, &mut session).await { + /// match execute_transaction(&coll, &mut session).await { /// Ok(_) => session.commit_transaction().await?, /// Err(_) => session.abort_transaction().await?, /// } /// # Ok(()) /// # } /// - /// async fn execute_transaction(coll: Collection, session: &mut ClientSession) -> Result<()> { + /// async fn execute_transaction(coll: &Collection, session: &mut ClientSession) -> Result<()> { /// coll.insert_one_with_session(doc! { "x": 1 }, None, session).await?; /// coll.delete_one_with_session(doc! { "y": 2 }, None, session).await?; /// Ok(()) From 55b4e0ea73b126d441a50a1aa51b7beef212d4f6 Mon Sep 17 00:00:00 2001 From: Isabel Atkinson Date: Thu, 27 May 2021 16:27:01 -0400 Subject: [PATCH 2/6] RUST-813 Add documentation examples for transactions --- tests/transaction_examples.rs | 89 +++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/transaction_examples.rs diff --git a/tests/transaction_examples.rs b/tests/transaction_examples.rs new file mode 100644 index 000000000..773622f1c --- /dev/null +++ b/tests/transaction_examples.rs @@ -0,0 +1,89 @@ +#![allow(dead_code)] + +extern crate mongodb; + +use futures::Future; +use mongodb::{ + bson::{doc, Document}, + error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, + options::{Acknowledgment, TransactionOptions, WriteConcern}, + ClientSession, +}; + +async fn execute_transaction(session: &mut ClientSession) -> Result<()> { + let client = session.client(); + let employees = client.database("hr").collection::("employees"); + let events = client + .database("reporting") + .collection::("events"); + + employees + .update_one_with_session( + doc! { "employee": 3 }, + doc! { "$set": { "status": "Inactive" } }, + None, + session, + ) + .await?; + events + .insert_one_with_session( + doc! { "employee": 3, "status": { "new": "Inactive", "old": "Active" } }, + None, + session, + ) + .await?; + + commit_with_retry(session).await +} + +async fn execute_transaction_with_retry( + execute_transaction: F, + session: &mut ClientSession, +) -> Result<()> +where + F: Fn(&mut ClientSession) -> G, + G: Future>, +{ + while let Err(err) = execute_transaction(session).await { + println!("Transaction aborted. Error returned during transaction."); + if err.contains_label(TRANSIENT_TRANSACTION_ERROR) { + println!("Encountered TransientTransactionError, retrying transaction."); + continue; + } else { + return Err(err); + } + } + Ok(()) +} + +async fn commit_with_retry(session: &mut ClientSession) -> Result<()> { + loop { + match session.commit_transaction().await { + Ok(()) => { + println!("Transaction committed."); + return Ok(()); + } + Err(err) => { + if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { + println!( + "Encountered UnknownTransactionCommitResult, retrying commit operation." + ); + continue; + } else { + println!("Encountered non-retryable error during commit."); + return Err(err); + } + } + } + } +} + +async fn update_employee_info(session: &mut ClientSession) -> Result<()> { + // TODO RUST-824 add snapshot read concern + let transaction_options = TransactionOptions::builder() + .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build()) + .build(); + session.start_transaction(transaction_options).await?; + + execute_transaction_with_retry(execute_transaction, session).await +} From 6a6e8a179e8fc66b698933c926508817c76fbe95 Mon Sep 17 00:00:00 2001 From: Isabel Atkinson Date: Thu, 27 May 2021 17:01:52 -0400 Subject: [PATCH 3/6] refactor --- tests/transaction_examples.rs | 82 ++++++++++++++++------------------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/tests/transaction_examples.rs b/tests/transaction_examples.rs index 773622f1c..52bbd098a 100644 --- a/tests/transaction_examples.rs +++ b/tests/transaction_examples.rs @@ -10,32 +10,6 @@ use mongodb::{ ClientSession, }; -async fn execute_transaction(session: &mut ClientSession) -> Result<()> { - let client = session.client(); - let employees = client.database("hr").collection::("employees"); - let events = client - .database("reporting") - .collection::("events"); - - employees - .update_one_with_session( - doc! { "employee": 3 }, - doc! { "$set": { "status": "Inactive" } }, - None, - session, - ) - .await?; - events - .insert_one_with_session( - doc! { "employee": 3, "status": { "new": "Inactive", "old": "Active" } }, - None, - session, - ) - .await?; - - commit_with_retry(session).await -} - async fn execute_transaction_with_retry( execute_transaction: F, session: &mut ClientSession, @@ -57,25 +31,45 @@ where } async fn commit_with_retry(session: &mut ClientSession) -> Result<()> { - loop { - match session.commit_transaction().await { - Ok(()) => { - println!("Transaction committed."); - return Ok(()); - } - Err(err) => { - if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { - println!( - "Encountered UnknownTransactionCommitResult, retrying commit operation." - ); - continue; - } else { - println!("Encountered non-retryable error during commit."); - return Err(err); - } - } + while let Err(err) = session.commit_transaction().await { + if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { + println!( + "Encountered UnknownTransactionCommitResult, retrying commit operation." + ); + continue; + } else { + println!("Encountered non-retryable error during commit."); + return Err(err); } } + println!("Transaction committed."); + Ok(()) +} + +async fn execute_employee_info_transaction(session: &mut ClientSession) -> Result<()> { + let client = session.client(); + let employees = client.database("hr").collection::("employees"); + let events = client + .database("reporting") + .collection::("events"); + + employees + .update_one_with_session( + doc! { "employee": 3 }, + doc! { "$set": { "status": "Inactive" } }, + None, + session, + ) + .await?; + events + .insert_one_with_session( + doc! { "employee": 3, "status": { "new": "Inactive", "old": "Active" } }, + None, + session, + ) + .await?; + + commit_with_retry(session).await } async fn update_employee_info(session: &mut ClientSession) -> Result<()> { @@ -85,5 +79,5 @@ async fn update_employee_info(session: &mut ClientSession) -> Result<()> { .build(); session.start_transaction(transaction_options).await?; - execute_transaction_with_retry(execute_transaction, session).await + execute_transaction_with_retry(execute_employee_info_transaction, session).await } From 6f3ac9cf25dbfaec5bcb53155134ce6709e9d46b Mon Sep 17 00:00:00 2001 From: Isabel Atkinson Date: Thu, 27 May 2021 17:03:37 -0400 Subject: [PATCH 4/6] add snapshot read concern --- tests/transaction_examples.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/transaction_examples.rs b/tests/transaction_examples.rs index 52bbd098a..cf30518e2 100644 --- a/tests/transaction_examples.rs +++ b/tests/transaction_examples.rs @@ -6,7 +6,7 @@ use futures::Future; use mongodb::{ bson::{doc, Document}, error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, - options::{Acknowledgment, TransactionOptions, WriteConcern}, + options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern}, ClientSession, }; @@ -33,9 +33,7 @@ where async fn commit_with_retry(session: &mut ClientSession) -> Result<()> { while let Err(err) = session.commit_transaction().await { if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { - println!( - "Encountered UnknownTransactionCommitResult, retrying commit operation." - ); + println!("Encountered UnknownTransactionCommitResult, retrying commit operation."); continue; } else { println!("Encountered non-retryable error during commit."); @@ -73,8 +71,8 @@ async fn execute_employee_info_transaction(session: &mut ClientSession) -> Resul } async fn update_employee_info(session: &mut ClientSession) -> Result<()> { - // TODO RUST-824 add snapshot read concern let transaction_options = TransactionOptions::builder() + .read_concern(ReadConcern::snapshot()) .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build()) .build(); session.start_transaction(transaction_options).await?; From 42607c43faf2889b509eb423f7418cb1840767d9 Mon Sep 17 00:00:00 2001 From: Isabel Atkinson Date: Fri, 28 May 2021 13:41:18 -0400 Subject: [PATCH 5/6] cr feedback --- src/client/options/resolver_config.rs | 4 +- src/client/session/mod.rs | 43 ++++++++++++++----- src/runtime/mod.rs | 2 +- .../spec/initial_dns_seedlist_discovery.rs | 2 +- src/test/util/failpoint.rs | 2 +- tests/transaction_examples.rs | 2 - 6 files changed, 37 insertions(+), 18 deletions(-) diff --git a/src/client/options/resolver_config.rs b/src/client/options/resolver_config.rs index 5bb43b7e8..67d556c2a 100644 --- a/src/client/options/resolver_config.rs +++ b/src/client/options/resolver_config.rs @@ -13,7 +13,7 @@ impl ResolverConfig { /// Creates a default configuration, using 1.1.1.1, 1.0.0.1 and 2606:4700:4700::1111, /// 2606:4700:4700::1001 (thank you, Cloudflare). /// - /// Please see: https://www.cloudflare.com/dns/ + /// Please see: pub fn cloudflare() -> Self { ResolverConfig { inner: TrustDnsResolverConfig::cloudflare(), @@ -34,7 +34,7 @@ impl ResolverConfig { /// Creates a configuration, using 9.9.9.9, 149.112.112.112 and 2620:fe::fe, 2620:fe::fe:9, the /// “secure” variants of the quad9 settings (thank you, Quad9). /// - /// Please see: https://www.quad9.net/faq/ + /// Please see: pub fn quad9() -> Self { ResolverConfig { inner: TrustDnsResolverConfig::quad9(), diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index a7573c52f..1287e0e2a 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -39,24 +39,34 @@ lazy_static! { /// or process at a time. /// /// ## Transactions +/// Transactions are used to execute a series of operations across multiple documents and +/// collections atomically. For more information about when and how to use transactions in MongoDB, +/// see the [manual](https://docs.mongodb.com/manual/core/transactions/). +/// /// Replica set transactions are supported on MongoDB 4.0+. Transactions are associated with a -/// `ClientSession`. To begin a transaction, call [ClientSession::start_transaction] on a +/// `ClientSession`. To begin a transaction, call [`ClientSession::start_transaction`] on a /// `ClientSession`. The `ClientSession` must be passed to operations to be executed within the /// transaction. /// /// ```rust -/// # use mongodb::{ -/// # bson::doc, -/// # error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, -/// # Client, -/// # ClientSession, -/// # Collection, -/// # }; +/// use mongodb::{ +/// bson::doc, +/// error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, +/// options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern}, +/// # Client, +/// ClientSession, +/// Collection, +/// }; +/// /// # async fn do_stuff() -> Result<()> { /// # let client = Client::with_uri_str("mongodb://example.com").await?; /// # let coll = client.database("foo").collection("bar"); -/// # let mut session = client.start_session(None).await?; -/// session.start_transaction(None).await?; +/// let mut session = client.start_session(None).await?; +/// let options = TransactionOptions::builder() +/// .read_concern(ReadConcern::majority()) +/// .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build()) +/// .build(); +/// session.start_transaction(options).await?; /// // A "TransientTransactionError" label indicates that the entire transaction can be retried /// // with a reasonable expectation that it will succeed. /// while let Err(error) = execute_transaction(&coll, &mut session).await { @@ -87,7 +97,7 @@ lazy_static! { /// ``` // TODO RUST-122 Remove this note and adjust the above description to indicate that sharded // transactions are supported on 4.2+ -/// Note: transactions are currently not supported on sharded clusters. +/// Note: the driver does not currently support transactions on sharded clusters. #[derive(Clone, Debug)] pub struct ClientSession { cluster_time: Option, @@ -244,6 +254,10 @@ impl ClientSession { /// be passed into each operation within the transaction; otherwise, the operation will be /// executed outside of the transaction. /// + /// Errors returned from operations executed within a transaction may include a + /// [`crate::error::TRANSIENT_TRANSACTION_ERROR`] label. This label indicates that the entire + /// transaction can be retried with a reasonable expectation that it will succeed. + /// /// Transactions are supported on MongoDB 4.0+. The Rust driver currently only supports /// transactions on replica sets. /// @@ -327,6 +341,13 @@ impl ClientSession { /// Commits the transaction that is currently active on this session. /// + /// + /// This method may return an error with a [`crate::error::UNKNOWN_TRANSACTION_COMMIT_RESULT`] + /// label. This label indicates that it is unknown whether the commit has satisfied the write + /// concern associated with the transaction. If an error with this label is returned, it is + /// safe to retry the commit until the write concern is satisfied or an error without the label + /// is returned. + /// /// ```rust /// # use mongodb::{bson::{doc, Document}, error::Result, Client, ClientSession}; /// # diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 3c5af3ba7..733934585 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -146,7 +146,7 @@ impl AsyncRuntime { } /// Create a new `Interval` that yields with interval of `duration`. - /// See: https://docs.rs/tokio/latest/tokio/time/fn.interval.html + /// See: pub(crate) fn interval(self, duration: Duration) -> Interval { match self { #[cfg(feature = "tokio-runtime")] diff --git a/src/test/spec/initial_dns_seedlist_discovery.rs b/src/test/spec/initial_dns_seedlist_discovery.rs index 7c9d5f6be..f35d3dd2a 100644 --- a/src/test/spec/initial_dns_seedlist_discovery.rs +++ b/src/test/spec/initial_dns_seedlist_discovery.rs @@ -44,7 +44,7 @@ async fn run() { // "encoded-userinfo-and-db.json" specifies a database name with a question mark which is // disallowed on Windows. See - // https://docs.mongodb.com/manual/reference/limits/#restrictions-on-db-names + // if let Some(ref mut options) = test_file.parsed_options { if options.db.as_deref() == Some("mydb?") && cfg!(target_os = "windows") { options.db = Some("mydb".to_string()); diff --git a/src/test/util/failpoint.rs b/src/test/util/failpoint.rs index 5cc83a4ae..6368cb015 100644 --- a/src/test/util/failpoint.rs +++ b/src/test/util/failpoint.rs @@ -23,7 +23,7 @@ impl FailPoint { } /// Create a failCommand failpoint. - /// See https://github.com/mongodb/mongo/wiki/The-%22failCommand%22-fail-point for more info. + /// See for more info. pub fn fail_command( fail_commands: &[&str], mode: FailPointMode, diff --git a/tests/transaction_examples.rs b/tests/transaction_examples.rs index cf30518e2..18e72051e 100644 --- a/tests/transaction_examples.rs +++ b/tests/transaction_examples.rs @@ -1,7 +1,5 @@ #![allow(dead_code)] -extern crate mongodb; - use futures::Future; use mongodb::{ bson::{doc, Document}, From a97addfa40c471d3089ddaf54189014600835017 Mon Sep 17 00:00:00 2001 From: Isabel Atkinson Date: Tue, 1 Jun 2021 14:12:34 -0400 Subject: [PATCH 6/6] refactor to eliminate closure --- tests/transaction_examples.rs | 59 ++++++++++++++++------------------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/tests/transaction_examples.rs b/tests/transaction_examples.rs index 18e72051e..aeef81192 100644 --- a/tests/transaction_examples.rs +++ b/tests/transaction_examples.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] +#![cfg(not(feature = "sync"))] -use futures::Future; +// START TRANSACTIONS EXAMPLE use mongodb::{ bson::{doc, Document}, error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, @@ -8,15 +9,18 @@ use mongodb::{ ClientSession, }; -async fn execute_transaction_with_retry( - execute_transaction: F, - session: &mut ClientSession, -) -> Result<()> -where - F: Fn(&mut ClientSession) -> G, - G: Future>, -{ - while let Err(err) = execute_transaction(session).await { +async fn update_employee_info(session: &mut ClientSession) -> Result<()> { + let transaction_options = TransactionOptions::builder() + .read_concern(ReadConcern::snapshot()) + .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build()) + .build(); + session.start_transaction(transaction_options).await?; + + execute_transaction_with_retry(session).await +} + +async fn execute_transaction_with_retry(session: &mut ClientSession) -> Result<()> { + while let Err(err) = execute_employee_info_transaction(session).await { println!("Transaction aborted. Error returned during transaction."); if err.contains_label(TRANSIENT_TRANSACTION_ERROR) { println!("Encountered TransientTransactionError, retrying transaction."); @@ -28,20 +32,6 @@ where Ok(()) } -async fn commit_with_retry(session: &mut ClientSession) -> Result<()> { - while let Err(err) = session.commit_transaction().await { - if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { - println!("Encountered UnknownTransactionCommitResult, retrying commit operation."); - continue; - } else { - println!("Encountered non-retryable error during commit."); - return Err(err); - } - } - println!("Transaction committed."); - Ok(()) -} - async fn execute_employee_info_transaction(session: &mut ClientSession) -> Result<()> { let client = session.client(); let employees = client.database("hr").collection::("employees"); @@ -68,12 +58,17 @@ async fn execute_employee_info_transaction(session: &mut ClientSession) -> Resul commit_with_retry(session).await } -async fn update_employee_info(session: &mut ClientSession) -> Result<()> { - let transaction_options = TransactionOptions::builder() - .read_concern(ReadConcern::snapshot()) - .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build()) - .build(); - session.start_transaction(transaction_options).await?; - - execute_transaction_with_retry(execute_employee_info_transaction, session).await +async fn commit_with_retry(session: &mut ClientSession) -> Result<()> { + while let Err(err) = session.commit_transaction().await { + if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { + println!("Encountered UnknownTransactionCommitResult, retrying commit operation."); + continue; + } else { + println!("Encountered non-retryable error during commit."); + return Err(err); + } + } + println!("Transaction committed."); + Ok(()) } +// END TRANSACTIONS EXAMPLE