diff --git a/sstxn/sstxn.go b/sstxn/sstxn.go index ed47471c..008313d8 100644 --- a/sstxn/sstxn.go +++ b/sstxn/sstxn.go @@ -24,6 +24,7 @@ package sstxn import ( + "context" "errors" "fmt" @@ -54,6 +55,8 @@ func (nilLogger) Criticalf(message string, args ...interface{}) {} var _ Logger = nilLogger{} +var ErrTimeout = fmt.Errorf("transaction failed after retrying for 120 seconds") + // A Runner applies operations as part of a transaction onto any number // of collections within a database. See the Run method for details. type Runner struct { @@ -105,7 +108,7 @@ func NewRunner(db *mgo.Database, logger Logger) *Runner { // // Any number of transactions may be run concurrently, with one // runner or many. -func (r *Runner) Run(ops []txn.Op, id bson.ObjectId, info interface{}) (err error) { +func (r *Runner) Run(ctx context.Context, ops []txn.Op, id bson.ObjectId, info interface{}) (err error) { const efmt = "error in transaction op %d: %s" for i := range ops { op := &ops[i] @@ -133,21 +136,19 @@ func (r *Runner) Run(ops []txn.Op, id bson.ObjectId, info interface{}) (err erro id = bson.NewObjectId() } - // Sometimes the mongo server will return an error code 112 (write conflict). - // This is a signal the transaction needs to be retried. - // We'll retry 3 times but not forever. 
- for i := 0; i < 3; i++ { - err = r.runTxn(ops, id) - if err == errWriteConflict { - r.logger.Tracef("attempt %d retrying txn ops", i) - continue + for { + err := r.runTxn(ops, id) + if err != errWriteConflict { + return err } - break - } - if err == errWriteConflict { - err = txn.ErrAborted + select { + case <-ctx.Done(): + r.logger.Debugf("transaction cancelled by caller or timeout reached, ops '%+v'", ops) + return ctx.Err() + default: + } + r.logger.Tracef("retrying txn ops '%+v'", ops) } - return err } func (r *Runner) runTxn(ops []txn.Op, id bson.ObjectId) error { @@ -474,7 +475,7 @@ func (r *Runner) updateLog(ops []txn.Op, revnos []int64, txnId bson.ObjectId) er // // Saved documents are in the format: // -// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}} +// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}} // // The document revision is the value of the txn-revno field after // the change has been applied. Negative values indicate the document diff --git a/sstxn/sstxn_test.go b/sstxn/sstxn_test.go index 0ab53300..e16979b4 100644 --- a/sstxn/sstxn_test.go +++ b/sstxn/sstxn_test.go @@ -3,6 +3,7 @@ package sstxn_test import ( + "context" "flag" "fmt" "sync" @@ -152,17 +153,17 @@ func (s *S) TestDocExists(c *C) { Assert: txn.DocMissing, }} - err = s.runner.Run(exists, "", nil) + err = s.runner.Run(context.Background(), exists, "", nil) c.Assert(err, IsNil) - err = s.runner.Run(missing, "", nil) + err = s.runner.Run(context.Background(), missing, "", nil) c.Assert(err, Equals, txn.ErrAborted) err = s.accounts.RemoveId(0) c.Assert(err, IsNil) - err = s.runner.Run(exists, "", nil) + err = s.runner.Run(context.Background(), exists, "", nil) c.Assert(err, Equals, txn.ErrAborted) - err = s.runner.Run(missing, "", nil) + err = s.runner.Run(context.Background(), missing, "", nil) c.Assert(err, IsNil) } @@ -181,7 +182,7 @@ func (s *S) TestInsert(c *C) { // server side transactions, trying to insert a doc that already // exists aborts the entire transaction, so we no longer hide //
that fact. - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, Equals, txn.ErrAborted) var account Account @@ -190,7 +191,7 @@ func (s *S) TestInsert(c *C) { c.Assert(account.Balance, Equals, 300) ops[0].Id = 1 - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) err = s.accounts.FindId(1).One(&account) @@ -215,7 +216,7 @@ func (s *S) TestInsertStructID(c *C) { Insert: M{"balance": 800}, }} - err := s.runner.Run(ops, "", nil) + err := s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) n, err := s.accounts.Find(nil).Count() @@ -233,7 +234,7 @@ func (s *S) TestRemove(c *C) { Remove: true, }} - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) err = s.accounts.FindId(0).One(nil) @@ -241,7 +242,7 @@ func (s *S) TestRemove(c *C) { // Removing a non-existing doc does not abort the transaction, // so we preserve the behavior. - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) } @@ -258,7 +259,7 @@ func (s *S) TestUpdate(c *C) { Update: M{"$inc": M{"balance": 100}}, }} - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) var account Account @@ -282,7 +283,7 @@ func (s *S) TestInsertUpdate(c *C) { Update: M{"$inc": M{"balance": 100}}, }} - err := s.runner.Run(ops, "", nil) + err := s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) var account Account @@ -295,7 +296,7 @@ func (s *S) TestInsertUpdate(c *C) { // was causing the Update to happen, ignoring the invalid Insert. // We no longer treat Insert as a no-op, so we don't // run the Update. 
- err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, Equals, txn.ErrAborted) err = s.accounts.FindId(0).One(&account) @@ -317,7 +318,7 @@ func (s *S) TestUpdateInsert(c *C) { Insert: M{"balance": 200}, }} - err := s.runner.Run(ops, "", nil) + err := s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) var account Account @@ -325,7 +326,7 @@ func (s *S) TestUpdateInsert(c *C) { c.Assert(err, IsNil) c.Assert(account.Balance, Equals, 200) - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) err = s.accounts.FindId(0).One(&account) @@ -348,7 +349,7 @@ func (s *S) TestInsertRemoveInsert(c *C) { Insert: M{"_id": 0, "balance": 300}, }} - err := s.runner.Run(ops, "", nil) + err := s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) var account Account @@ -388,7 +389,7 @@ func (s *S) TestErrors(c *C) { txn.SetChaos(txn.Chaos{KillChance: 1.0}) for _, op := range tests { c.Logf("op: %v", op) - err := s.runner.Run([]txn.Op{op}, "", nil) + err := s.runner.Run(context.Background(), []txn.Op{op}, "", nil) c.Assert(err, ErrorMatches, "error in transaction op 0: .*") } } @@ -401,7 +402,7 @@ func (s *S) TestAssertRefusesUpdate(c *C) { Assert: bson.D{{"balance", 200}}, Update: bson.D{{"$inc", bson.D{{"balance", 100}}}}, }} - c.Assert(s.runner.Run(ops, "", nil), Equals, txn.ErrAborted) + c.Assert(s.runner.Run(context.Background(), ops, "", nil), Equals, txn.ErrAborted) var account Account c.Assert(s.accounts.FindId(0).One(&account), IsNil) c.Assert(account.Balance, Equals, 100) @@ -419,7 +420,7 @@ func (s *S) TestAssertNestedOr(c *C) { Update: bson.D{{"$inc", bson.D{{"balance", 100}}}}, }} - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) var account Account @@ -427,7 +428,7 @@ func (s *S) TestAssertNestedOr(c *C) { c.Assert(err, IsNil) c.Assert(account.Balance, Equals, 200) - 
err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, Equals, txn.ErrAborted) ops2 := []txn.Op{{ @@ -436,11 +437,11 @@ func (s *S) TestAssertNestedOr(c *C) { Assert: bson.D{{"balance", 200}}, Update: bson.D{{"$set", bson.D{{"balance", 300}}}}, }} - err = s.runner.Run(ops2, "", nil) + err = s.runner.Run(context.Background(), ops2, "", nil) c.Assert(err, IsNil) // Now that we're at 300, the original ops should apply again - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) err = s.accounts.FindId(0).One(&account) @@ -456,7 +457,7 @@ func (s *S) TestInsertInvalidAssert(c *C) { Insert: bson.D{{"balance", 100}}, }} - err := s.runner.Run(ops, "", nil) + err := s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, ErrorMatches, `Insert can only Assert txn.DocMissing not \[\{balance 100\}\]`) ops = []txn.Op{{ @@ -466,7 +467,7 @@ func (s *S) TestInsertInvalidAssert(c *C) { Insert: bson.D{{"balance", 100}}, }} - err = s.runner.Run(ops, "", nil) + err = s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, ErrorMatches, "Insert can only Assert txn.DocMissing not txn.DocExists") } @@ -480,7 +481,7 @@ func (s *S) TestVerifyFieldOrdering(c *C) { Insert: fields, }} - err := s.runner.Run(ops, "", nil) + err := s.runner.Run(context.Background(), ops, "", nil) c.Assert(err, IsNil) var d bson.D @@ -522,7 +523,7 @@ func (s *S) TestChangeLog(c *C) { Insert: M{"accounts": []int64{0, 1}}, }} id := bson.NewObjectId() - err := s.runner.Run(ops, id, nil) + err := s.runner.Run(context.Background(), ops, id, nil) c.Assert(err, IsNil) type IdList []interface{} @@ -548,7 +549,7 @@ func (s *S) TestChangeLog(c *C) { Update: M{"$inc": M{"balance": 100}}, }} id = bson.NewObjectId() - err = s.runner.Run(ops, id, nil) + err = s.runner.Run(context.Background(), ops, id, nil) c.Assert(err, IsNil) m = nil @@ -568,7 +569,7 @@ func (s *S) TestChangeLog(c *C) { Remove: true, }} 
id = bson.NewObjectId() - err = s.runner.Run(ops, id, nil) + err = s.runner.Run(context.Background(), ops, id, nil) c.Assert(err, IsNil) m = nil @@ -613,7 +614,7 @@ func (s *S) TestInsertStressTest(c *C) { "added-by": i, }, }} - err := runner.Run(ops, "", nil) + err := runner.Run(context.Background(), ops, "", nil) if err != txn.ErrAborted { c.Check(err, IsNil) } @@ -643,7 +644,7 @@ func (s *S) TestConcurrentUpdateTest(c *C) { for j := 0; j < count; j++ { db := session.DB("test") runner := sstxn.NewRunner(db, &testLogger{c}) - err := runner.Run([]txn.Op{{ + err := runner.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: txn.DocExists, @@ -672,10 +673,10 @@ func (s *S) TestConcurrentInsertPreAssertFailure(c *C) { Insert: bson.M{"foo": "bar"}, }} runner1.SetStartHook(func() { - err := runner2.Run(ops, "", nil) + err := runner2.Run(context.Background(), ops, "", nil) c.Check(err, IsNil) }) - err := runner1.Run(ops, "", nil) + err := runner1.Run(context.Background(), ops, "", nil) c.Check(err, Equals, txn.ErrAborted) } @@ -692,10 +693,10 @@ func (s *S) TestConcurrentInsertPostAssertFailure(c *C) { Insert: bson.M{"foo": "bar"}, }} runner1.SetPostAssertHook(func() { - err := runner2.Run(ops, "", nil) + err := runner2.Run(context.Background(), ops, "", nil) c.Check(err, IsNil) }) - err := runner1.Run(ops, "", nil) + err := runner1.Run(context.Background(), ops, "", nil) c.Check(err, Equals, txn.ErrAborted) } @@ -705,14 +706,14 @@ func (s *S) TestConcurrentUpdatePreAssertFailure(c *C) { session2 := s.session.Copy() defer session2.Close() runner2 := sstxn.NewRunner(s.db.With(session2), logger) - runner1.Run([]txn.Op{{ + runner1.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: txn.DocMissing, Insert: bson.M{"balance": 0}, }}, "", nil) runner1.SetStartHook(func() { - err := runner2.Run([]txn.Op{{ + err := runner2.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: txn.DocExists, @@ -720,7 +721,7 @@ func (s *S) 
TestConcurrentUpdatePreAssertFailure(c *C) { }}, "", nil) c.Check(err, IsNil) }) - err := runner1.Run([]txn.Op{{ + err := runner1.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: bson.M{"balance": 0}, @@ -735,14 +736,14 @@ func (s *S) TestConcurrentUpdatePostAssertFailure(c *C) { session2 := s.session.Copy() defer session2.Close() runner2 := sstxn.NewRunner(s.db.With(session2), logger) - runner1.Run([]txn.Op{{ + runner1.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: txn.DocMissing, Insert: bson.M{"balance": 0}, }}, "", nil) runner1.SetPostAssertHook(func() { - err := runner2.Run([]txn.Op{{ + err := runner2.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: txn.DocExists, @@ -750,7 +751,7 @@ func (s *S) TestConcurrentUpdatePostAssertFailure(c *C) { }}, "", nil) c.Check(err, IsNil) }) - err := runner1.Run([]txn.Op{{ + err := runner1.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: bson.M{"balance": 0}, @@ -766,7 +767,7 @@ func (s *S) TestConcurrentRemoveUpdatePostAssertFailure(c *C) { session2 := s.session.Copy() defer session2.Close() runner2 := sstxn.NewRunner(s.db.With(session2), logger) - err := runner1.Run([]txn.Op{{ + err := runner1.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: txn.DocMissing, @@ -774,7 +775,7 @@ func (s *S) TestConcurrentRemoveUpdatePostAssertFailure(c *C) { }}, "", nil) c.Assert(err, IsNil) runner1.SetPostAssertHook(func() { - err := runner2.Run([]txn.Op{{ + err := runner2.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Assert: txn.DocExists, @@ -782,12 +783,14 @@ func (s *S) TestConcurrentRemoveUpdatePostAssertFailure(c *C) { }}, "", nil) c.Check(err, IsNil) }) - err = runner1.Run([]txn.Op{{ + err = runner1.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Remove: true, }}, "", nil) - c.Assert(err, Equals, txn.ErrAborted) + // Since we are getting a WriteConflict, we retry for 120 seconds + // and then fail with timeout error. 
+ c.Assert(err, Equals, sstxn.ErrTimeout) } type NotMarshallable struct { @@ -807,7 +810,7 @@ func (s *S) TestNotMarshallableUpdate(c *C) { c.Assert(err, IsNil) logger := &testLogger{c} runner := sstxn.NewRunner(s.db, logger) - err = runner.Run([]txn.Op{{ + err = runner.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Update: NotMarshallable{Error: fmt.Errorf("cannot marshall for update")}, @@ -824,7 +827,7 @@ func (s *S) TestNotMarshallableUpdate(c *C) { func (s *S) TestNotMarshallableInsert(c *C) { logger := &testLogger{c} runner := sstxn.NewRunner(s.db, logger) - err := runner.Run([]txn.Op{{ + err := runner.Run(context.Background(), []txn.Op{{ C: "accounts", Id: 0, Insert: NotMarshallable{Error: fmt.Errorf("cannot marshall for insert")}, @@ -844,7 +847,7 @@ type testStruct struct { func (s *S) TestInsertValueFromStructForcedID(c *C) { // If someone only sets the Value part of a document, we still want to make // sure the _id of the inserted document is correct. - c.Assert(s.runner.Run([]txn.Op{{ + c.Assert(s.runner.Run(context.Background(), []txn.Op{{ C: "accounts", Id: "1", Assert: txn.DocMissing, diff --git a/txn/txn.go b/txn/txn.go index bd6e0095..d91508c4 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -2,11 +2,11 @@ // // For details check the following blog post: // -// http://blog.labix.org/2012/08/22/multi-doc-transactions-for-mongodb -// +// http://blog.labix.org/2012/08/22/multi-doc-transactions-for-mongodb package txn import ( + "context" crand "crypto/rand" "encoding/binary" "fmt" @@ -301,7 +301,7 @@ var ErrAborted = fmt.Errorf("transaction aborted") // // Any number of transactions may be run concurrently, with one // runner or many. 
-func (r *Runner) Run(ops []Op, id bson.ObjectId, info interface{}) (err error) { +func (r *Runner) Run(ctx context.Context, ops []Op, id bson.ObjectId, info interface{}) (err error) { const efmt = "error in transaction op %d: %s" for i := range ops { op := &ops[i] @@ -399,7 +399,7 @@ func (r *Runner) Resume(id bson.ObjectId) (err error) { // // Saved documents are in the format: // -// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}} +// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}} // // The document revision is the value of the txn-revno field after // the change has been applied. Negative values indicate the document