Skip to content

Commit

Permalink
vttablet api distributed transaction changes (vitessio#16506)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Aug 7, 2024
1 parent 623e820 commit d042d7c
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 47 deletions.
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var (
VT09024 = errorWithoutState("VT09024", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a unique keyspace id: %v", "Unable to determine the shard for the given row.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.")

VT12001 = errorWithoutState("VT12001", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: %s", "This statement is unsupported by Vitess. Please rewrite your query to use supported syntax.")
VT12002 = errorWithoutState("VT12002", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard foreign keys", "Vitess does not support cross shard foreign keys.")
Expand Down Expand Up @@ -182,6 +183,7 @@ var (
VT09023,
VT09024,
VT10001,
VT10002,
VT12001,
VT12002,
VT13001,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,21 +458,21 @@ func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transa
case querypb.TransactionState_PREPARE:
// If state is PREPARE, make a decision to rollback and
// fallthrough to the rollback workflow.
if err := txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil {
if err = txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil {
return err
}
fallthrough
case querypb.TransactionState_ROLLBACK:
if err := txc.resumeRollback(ctx, target, transaction); err != nil {
if err = txc.resumeRollback(ctx, target, transaction); err != nil {
return err
}
case querypb.TransactionState_COMMIT:
if err := txc.resumeCommit(ctx, target, transaction); err != nil {
if err = txc.resumeCommit(ctx, target, transaction); err != nil {
return err
}
default:
// Should never happen.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid state: %v", transaction.State)
return vterrors.VT13001(fmt.Sprintf("invalid state: %v", transaction.State))
}
return nil
}
Expand Down
40 changes: 28 additions & 12 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {

// If no queries were executed, we just rollback.
if len(conn.TxProperties().Queries) == 0 {
conn.Release(tx.TxRollback)
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return nil
}

// If the connection is tainted, we cannot prepare it. As there could be temporary tables involved.
if conn.IsTainted() {
conn.Release(tx.TxRollback)
return vterrors.VT12001("cannot prepare the transaction on a reserved connection")
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return vterrors.VT10002("cannot prepare the transaction on a reserved connection")
}

err = dte.te.preparedPool.Put(conn, dtid)
Expand All @@ -88,30 +88,34 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
// CommitPrepared commits a prepared transaction. If the operation
// fails, an error counter is incremented and the transaction is
// marked as failed in the redo log.
func (dte *DTExecutor) CommitPrepared(dtid string) error {
func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := dte.te.preparedPool.FetchForCommit(dtid)
var conn *StatefulConnection
conn, err = dte.te.preparedPool.FetchForCommit(dtid)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err)
}
// No connection means the transaction was already committed.
if conn == nil {
return nil
}
// We have to use a context that will never give up,
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer dte.te.txPool.RollbackAndRelease(ctx, conn)
err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid)
if err != nil {
dte.markFailed(ctx, dtid)
defer func() {
if err != nil {
dte.markFailed(ctx, dtid)
log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err)
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil {
return err
}
_, err = dte.te.txPool.Commit(ctx, conn)
if err != nil {
dte.markFailed(ctx, dtid)
if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
return err
}
dte.te.preparedPool.Forget(dtid)
Expand Down Expand Up @@ -207,6 +211,15 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
}
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

// If the connection is tainted, we cannot take a commit decision on it.
if conn.IsTainted() {
dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
})
// return the error, defer call above will roll back the transaction.
return vterrors.VT10002("cannot commit the transaction on a reserved connection")
}

err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT)
if err != nil {
return err
Expand All @@ -228,6 +241,9 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error {
// If the transaction is still open, it will be rolled back.
// Otherwise, it would have been rolled back by other means, like a timeout or vttablet/mysql restart.
dte.te.Rollback(dte.ctx, transactionID)
} else {
// This is a warning because it should not happen in normal operation.
log.Warningf("SetRollback called with no transactionID for dtid %s", dtid)
}

return dte.inTransaction(func(conn *StatefulConnection) error {
Expand Down
85 changes: 77 additions & 8 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"errors"
"fmt"
"reflect"
"strings"
"testing"
"time"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"github.com/stretchr/testify/require"
Expand All @@ -43,11 +45,42 @@ func TestTxExecutorEmptyPrepare(t *testing.T) {
txe, tsv, db := newTestTxExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()

// start a transaction.
txid := newTransaction(tsv, nil)
err := txe.Prepare(txid, "aa")

// taint the connection.
sc, err := tsv.te.txPool.GetAndLock(txid, "taint")
require.NoError(t, err)
sc.Taint(ctx, nil)
sc.Unlock()

err = txe.Prepare(txid, "aa")
require.NoError(t, err)
// Nothing should be prepared.
require.Empty(t, txe.te.preparedPool.conns, "txe.te.preparedPool.conns")
require.False(t, sc.IsInTransaction(), "transaction should be roll back before returning the connection to the pool")
}

func TestExecutorPrepareFailure(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txe, tsv, db := newTestTxExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()

// start a transaction
txid := newTxForPrep(ctx, tsv)

// taint the connection.
sc, err := tsv.te.txPool.GetAndLock(txid, "taint")
require.NoError(t, err)
sc.Taint(ctx, nil)
sc.Unlock()

// try 2pc commit of Metadata Manager.
err = txe.Prepare(txid, "aa")
require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a reserved connection")
}

func TestTxExecutorPrepare(t *testing.T) {
Expand Down Expand Up @@ -82,7 +115,7 @@ func TestDTExecutorPrepareResevedConn(t *testing.T) {
txe.te.Reserve(ctx, nil, txid, nil)

err := txe.Prepare(txid, "aa")
require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection")
require.ErrorContains(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a reserved connection")
}

func TestTxExecutorPrepareNotInTx(t *testing.T) {
Expand Down Expand Up @@ -174,20 +207,31 @@ func TestTxExecutorCommitRedoFail(t *testing.T) {
txe, tsv, db := newTestTxExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()

tl := syslogger.NewTestLogger()
defer tl.Close()

// start a transaction.
txid := newTxForPrep(ctx, tsv)
// Allow all additions to redo logs to succeed

// prepare the transaction
db.AddQueryPattern("insert into _vt\\.redo_state.*", &sqltypes.Result{})
err := txe.Prepare(txid, "bb")
require.NoError(t, err)
defer txe.RollbackPrepared("bb", 0)
db.AddQuery("update _vt.redo_state set state = 'Failed' where dtid = 'bb'", &sqltypes.Result{})

// fail commit prepare as the delete redo query is in rejected query.
db.AddRejectedQuery("delete from _vt.redo_state where dtid = 'bb'", errors.New("delete redo log fail"))
db.AddQuery("update _vt.redo_state set state = 0 where dtid = 'bb'", sqltypes.MakeTestResult(nil))
err = txe.CommitPrepared("bb")
require.Error(t, err)
require.Contains(t, err.Error(), "is not supported")
// A retry should fail differently.
require.ErrorContains(t, err, "delete redo log fail")

// A retry should fail differently as the prepared transaction is marked as failed.
err = txe.CommitPrepared("bb")
require.Error(t, err)
require.Contains(t, err.Error(), "cannot commit dtid bb, state: failed")

require.Contains(t, strings.Join(tl.GetAllLogs(), "|"),
"failed to commit the prepared transaction 'bb' with error: unknown error: delete redo log fail")
}

func TestTxExecutorCommitRedoCommitFail(t *testing.T) {
Expand Down Expand Up @@ -273,6 +317,31 @@ func TestExecutorStartCommit(t *testing.T) {
require.Contains(t, err.Error(), "could not transition to COMMIT: aa")
}

func TestExecutorStartCommitFailure(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txe, tsv, db := newTestTxExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()

// start a transaction
txid := newTxForPrep(ctx, tsv)

// taint the connection.
sc, err := tsv.te.txPool.GetAndLock(txid, "taint")
require.NoError(t, err)
sc.Taint(ctx, nil)
sc.Unlock()

// add rollback state update expectation
rollbackTransition := fmt.Sprintf("update _vt.dt_state set state = %d where dtid = 'aa' and state = %d", int(querypb.TransactionState_ROLLBACK), int(querypb.TransactionState_PREPARE))
db.AddQuery(rollbackTransition, sqltypes.MakeTestResult(nil))

// try 2pc commit of Metadata Manager.
err = txe.StartCommit(txid, "aa")
require.EqualError(t, err, "VT10002: atomic distributed transaction not allowed: cannot commit the transaction on a reserved connection")
}

func TestExecutorSetRollback(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tx_prep_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

var (
errPrepCommitting = errors.New("committing")
errPrepFailed = errors.New("failed")
errPrepCommitting = errors.New("locked for committing")
errPrepFailed = errors.New("failed to commit")
)

// TxPreparedPool manages connections for prepared transactions.
Expand Down
33 changes: 12 additions & 21 deletions go/vt/vttablet/tabletserver/tx_prep_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletserver
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -82,38 +83,28 @@ func TestPrepFetchForCommit(t *testing.T) {
conn := &StatefulConnection{}
got, err := pp.FetchForCommit("aa")
require.NoError(t, err)
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
assert.Nil(t, got)

pp.Put(conn, "aa")
got, err = pp.FetchForCommit("aa")
require.NoError(t, err)
if got != conn {
t.Errorf("pp.Get(aa): %p, want %p", got, conn)
}
assert.Equal(t, conn, got)

_, err = pp.FetchForCommit("aa")
want := "committing"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "locked for committing")

pp.SetFailed("aa")
_, err = pp.FetchForCommit("aa")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "failed to commit")

pp.SetFailed("bb")
_, err = pp.FetchForCommit("bb")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "failed to commit")

pp.Forget("aa")
got, err = pp.FetchForCommit("aa")
require.NoError(t, err)
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
assert.Nil(t, got)
}

func TestPrepFetchAll(t *testing.T) {
Expand Down

0 comments on commit d042d7c

Please sign in to comment.