diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index d485c930b77..857ba538ebe 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -98,6 +98,7 @@ var ( VT09024 = errorWithoutState("VT09024", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a unique keyspace id: %v", "Unable to determine the shard for the given row.") VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.") + VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.") VT12001 = errorWithoutState("VT12001", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: %s", "This statement is unsupported by Vitess. Please rewrite your query to use supported syntax.") VT12002 = errorWithoutState("VT12002", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard foreign keys", "Vitess does not support cross shard foreign keys.") @@ -182,6 +183,7 @@ var ( VT09023, VT09024, VT10001, + VT10002, VT12001, VT12002, VT13001, diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index f8b08def10c..e388740ee6a 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -458,21 +458,21 @@ func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transa case querypb.TransactionState_PREPARE: // If state is PREPARE, make a decision to rollback and // fallthrough to the rollback workflow. - if err := txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil { + if err = txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil { return err } fallthrough case querypb.TransactionState_ROLLBACK: - if err := txc.resumeRollback(ctx, target, transaction); err != nil { + if err = txc.resumeRollback(ctx, target, transaction); err != nil { return err } case querypb.TransactionState_COMMIT: - if err := txc.resumeCommit(ctx, target, transaction); err != nil { + if err = txc.resumeCommit(ctx, target, transaction); err != nil { return err } default: // Should never happen. - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid state: %v", transaction.State) + return vterrors.VT13001(fmt.Sprintf("invalid state: %v", transaction.State)) } return nil } diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index edf4438b8b2..5f4e7644766 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -63,14 +63,14 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { // If no queries were executed, we just rollback. if len(conn.TxProperties().Queries) == 0 { - conn.Release(tx.TxRollback) + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) return nil } // If the connection is tainted, we cannot prepare it. As there could be temporary tables involved. if conn.IsTainted() { - conn.Release(tx.TxRollback) - return vterrors.VT12001("cannot prepare the transaction on a reserved connection") + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) + return vterrors.VT10002("cannot prepare the transaction on a reserved connection") } err = dte.te.preparedPool.Put(conn, dtid) @@ -88,30 +88,34 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { // CommitPrepared commits a prepared transaction. If the operation // fails, an error counter is incremented and the transaction is // marked as failed in the redo log. -func (dte *DTExecutor) CommitPrepared(dtid string) error { +func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now()) - conn, err := dte.te.preparedPool.FetchForCommit(dtid) + var conn *StatefulConnection + conn, err = dte.te.preparedPool.FetchForCommit(dtid) if err != nil { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err) } + // No connection means the transaction was already committed. if conn == nil { return nil } // We have to use a context that will never give up, // even if the original context expires. ctx := trace.CopySpan(context.Background(), dte.ctx) - defer dte.te.txPool.RollbackAndRelease(ctx, conn) - err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid) - if err != nil { - dte.markFailed(ctx, dtid) + defer func() { + if err != nil { + dte.markFailed(ctx, dtid) + log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err) + } + dte.te.txPool.RollbackAndRelease(ctx, conn) + }() + if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil { return err } - _, err = dte.te.txPool.Commit(ctx, conn) - if err != nil { - dte.markFailed(ctx, dtid) + if _, err = dte.te.txPool.Commit(ctx, conn); err != nil { return err } dte.te.preparedPool.Forget(dtid) @@ -207,6 +211,15 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error { } defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn) + // If the connection is tainted, we cannot take a commit decision on it. + if conn.IsTainted() { + dte.inTransaction(func(conn *StatefulConnection) error { + return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) + }) + // return the error, defer call above will roll back the transaction. + return vterrors.VT10002("cannot commit the transaction on a reserved connection") + } + err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT) if err != nil { return err @@ -228,6 +241,9 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error { // If the transaction is still open, it will be rolled back. // Otherwise, it would have been rolled back by other means, like a timeout or vttablet/mysql restart. dte.te.Rollback(dte.ctx, transactionID) + } else { + // This is a warning because it should not happen in normal operation. + log.Warningf("SetRollback called with no transactionID for dtid %s", dtid) } return dte.inTransaction(func(conn *StatefulConnection) error { diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index 448dd63bf5a..045496eb4b8 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -21,9 +21,11 @@ import ( "errors" "fmt" "reflect" + "strings" "testing" "time" + "vitess.io/vitess/go/event/syslogger" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" "github.com/stretchr/testify/require" @@ -43,11 +45,42 @@ func TestTxExecutorEmptyPrepare(t *testing.T) { txe, tsv, db := newTestTxExecutor(t, ctx) defer db.Close() defer tsv.StopService() + + // start a transaction. txid := newTransaction(tsv, nil) - err := txe.Prepare(txid, "aa") + + // taint the connection. + sc, err := tsv.te.txPool.GetAndLock(txid, "taint") + require.NoError(t, err) + sc.Taint(ctx, nil) + sc.Unlock() + + err = txe.Prepare(txid, "aa") require.NoError(t, err) // Nothing should be prepared. require.Empty(t, txe.te.preparedPool.conns, "txe.te.preparedPool.conns") + require.False(t, sc.IsInTransaction(), "transaction should be roll back before returning the connection to the pool") +} + +func TestExecutorPrepareFailure(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + txe, tsv, db := newTestTxExecutor(t, ctx) + defer db.Close() + defer tsv.StopService() + + // start a transaction + txid := newTxForPrep(ctx, tsv) + + // taint the connection. + sc, err := tsv.te.txPool.GetAndLock(txid, "taint") + require.NoError(t, err) + sc.Taint(ctx, nil) + sc.Unlock() + + // try 2pc commit of Metadata Manager. + err = txe.Prepare(txid, "aa") + require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a reserved connection") } func TestTxExecutorPrepare(t *testing.T) { @@ -82,7 +115,7 @@ func TestDTExecutorPrepareResevedConn(t *testing.T) { txe.te.Reserve(ctx, nil, txid, nil) err := txe.Prepare(txid, "aa") - require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection") + require.ErrorContains(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a reserved connection") } func TestTxExecutorPrepareNotInTx(t *testing.T) { @@ -174,20 +207,31 @@ func TestTxExecutorCommitRedoFail(t *testing.T) { txe, tsv, db := newTestTxExecutor(t, ctx) defer db.Close() defer tsv.StopService() + + tl := syslogger.NewTestLogger() + defer tl.Close() + + // start a transaction. txid := newTxForPrep(ctx, tsv) - // Allow all additions to redo logs to succeed + + // prepare the transaction db.AddQueryPattern("insert into _vt\\.redo_state.*", &sqltypes.Result{}) err := txe.Prepare(txid, "bb") require.NoError(t, err) - defer txe.RollbackPrepared("bb", 0) - db.AddQuery("update _vt.redo_state set state = 'Failed' where dtid = 'bb'", &sqltypes.Result{}) + + // fail commit prepare as the delete redo query is in rejected query. + db.AddRejectedQuery("delete from _vt.redo_state where dtid = 'bb'", errors.New("delete redo log fail")) + db.AddQuery("update _vt.redo_state set state = 0 where dtid = 'bb'", sqltypes.MakeTestResult(nil)) err = txe.CommitPrepared("bb") - require.Error(t, err) - require.Contains(t, err.Error(), "is not supported") - // A retry should fail differently. + require.ErrorContains(t, err, "delete redo log fail") + + // A retry should fail differently as the prepared transaction is marked as failed. err = txe.CommitPrepared("bb") require.Error(t, err) require.Contains(t, err.Error(), "cannot commit dtid bb, state: failed") + + require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), + "failed to commit the prepared transaction 'bb' with error: unknown error: delete redo log fail") } func TestTxExecutorCommitRedoCommitFail(t *testing.T) { @@ -273,6 +317,31 @@ func TestExecutorStartCommit(t *testing.T) { require.Contains(t, err.Error(), "could not transition to COMMIT: aa") } +func TestExecutorStartCommitFailure(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + txe, tsv, db := newTestTxExecutor(t, ctx) + defer db.Close() + defer tsv.StopService() + + // start a transaction + txid := newTxForPrep(ctx, tsv) + + // taint the connection. + sc, err := tsv.te.txPool.GetAndLock(txid, "taint") + require.NoError(t, err) + sc.Taint(ctx, nil) + sc.Unlock() + + // add rollback state update expectation + rollbackTransition := fmt.Sprintf("update _vt.dt_state set state = %d where dtid = 'aa' and state = %d", int(querypb.TransactionState_ROLLBACK), int(querypb.TransactionState_PREPARE)) + db.AddQuery(rollbackTransition, sqltypes.MakeTestResult(nil)) + + // try 2pc commit of Metadata Manager. + err = txe.StartCommit(txid, "aa") + require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot commit the transaction on a reserved connection") +} + func TestExecutorSetRollback(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go index 89547570cfc..22e0ce295c0 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go @@ -23,8 +23,8 @@ import ( ) var ( - errPrepCommitting = errors.New("committing") - errPrepFailed = errors.New("failed") + errPrepCommitting = errors.New("locked for committing") + errPrepFailed = errors.New("failed to commit") ) // TxPreparedPool manages connections for prepared transactions. diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go index a1cf50edb56..cd2b5a180c1 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go @@ -19,6 +19,7 @@ package tabletserver import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -82,38 +83,28 @@ func TestPrepFetchForCommit(t *testing.T) { conn := &StatefulConnection{} got, err := pp.FetchForCommit("aa") require.NoError(t, err) - if got != nil { - t.Errorf("Get(aa): %v, want nil", got) - } + assert.Nil(t, got) + pp.Put(conn, "aa") got, err = pp.FetchForCommit("aa") require.NoError(t, err) - if got != conn { - t.Errorf("pp.Get(aa): %p, want %p", got, conn) - } + assert.Equal(t, conn, got) + _, err = pp.FetchForCommit("aa") - want := "committing" - if err == nil || err.Error() != want { - t.Errorf("FetchForCommit err: %v, want %s", err, want) - } + assert.ErrorContains(t, err, "locked for committing") + pp.SetFailed("aa") _, err = pp.FetchForCommit("aa") - want = "failed" - if err == nil || err.Error() != want { - t.Errorf("FetchForCommit err: %v, want %s", err, want) - } + assert.ErrorContains(t, err, "failed to commit") + pp.SetFailed("bb") _, err = pp.FetchForCommit("bb") - want = "failed" - if err == nil || err.Error() != want { - t.Errorf("FetchForCommit err: %v, want %s", err, want) - } + assert.ErrorContains(t, err, "failed to commit") + pp.Forget("aa") got, err = pp.FetchForCommit("aa") require.NoError(t, err) - if got != nil { - t.Errorf("Get(aa): %v, want nil", got) - } + assert.Nil(t, got) } func TestPrepFetchAll(t *testing.T) {