Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RPC to read the statements to be executed in an unresolved prepared transaction #17131

Merged
15 changes: 15 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## Summary

### Table of Contents

- **[Major Changes](#major-changes)**
- **[RPC Changes](#rpc-changes)**


## <a id="major-changes"/>Major Changes</a>

### <a id="rpc-changes"/>RPC Changes</a>

These are the RPC changes made in this release -

1. `GetTransactionInfo` RPC has been added to both `VtctldServer`, and `TabletManagerClient` interface. These RPCs are used to fecilitate the users in reading the state of an unresolved distributed transaction. This can be useful in debugging what went wrong and how to fix the problem.
2 changes: 2 additions & 0 deletions changelog/22.0/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
## v22.0
* **[22.0.0](22.0.0)**
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Releases
* [22.0](22.0)
* [21.0](21.0)
* [20.0](20.0)
* [19.0](19.0)
Expand Down
56 changes: 56 additions & 0 deletions go/cmd/vtctldclient/command/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ var (

DisableFlagsInUseLine: true,
}

getTransactionInfoOptions = struct {
Dtid string
}{}

// GetTransactionInfo makes a GetTransactionInfo gRPC call to a vtctld.
GetTransactionInfo = &cobra.Command{
Use: "get-info --dtid <dtid>",
Short: "Reads the state of the unresolved transaction by querying each participating shard.",
Aliases: []string{"Read"},
Args: cobra.NoArgs,
RunE: commandGetTransactionInfo,

DisableFlagsInUseLine: true,
}
)

type ConcludeTransactionOutput struct {
Expand All @@ -72,6 +87,12 @@ type ConcludeTransactionOutput struct {
Error string `json:"error,omitempty"`
}

type GetTransactionInfoOutput struct {
Dtid string `json:"dtid"`
Message string `json:"message"`
Error string `json:"error,omitempty"`
}

const (
concludeSuccess = "Successfully concluded the distributed transaction"
concludeFailure = "Failed to conclude the distributed transaction"
Expand All @@ -86,11 +107,13 @@ func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
AbandonAge: unresolvedTransactionsOptions.AbandonAge,
})
if err != nil {
prettyPrintError(err)
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
prettyPrintError(err)
return err
}
fmt.Println(string(data))
Expand Down Expand Up @@ -120,6 +143,36 @@ func commandConcludeTransaction(cmd *cobra.Command, args []string) (err error) {
return err
}

func commandGetTransactionInfo(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

rts, err := client.GetTransactionInfo(commandCtx,
&vtctldatapb.GetTransactionInfoRequest{
Dtid: getTransactionInfoOptions.Dtid,
})

if err != nil || rts == nil {
prettyPrintError(err)
return err
}

fmt.Println(string(rts.String()))
return nil
}

func prettyPrintError(err error) {
if err == nil {
return
}
st := struct {
Error string `json:"error"`
}{
Error: err.Error(),
}
data, _ := cli.MarshalJSON(st)
fmt.Println(string(data))
}

func init() {
GetUnresolvedTransactions.Flags().StringVarP(&unresolvedTransactionsOptions.Keyspace, "keyspace", "k", "", "unresolved transactions list for the given keyspace.")
GetUnresolvedTransactions.Flags().Int64VarP(&unresolvedTransactionsOptions.AbandonAge, "abandon-age", "a", 0, "unresolved transactions list which are older than the specified age(in seconds).")
Expand All @@ -128,5 +181,8 @@ func init() {
ConcludeTransaction.Flags().StringVarP(&concludeTransactionOptions.Dtid, "dtid", "d", "", "conclude transaction for the given distributed transaction ID.")
DistributedTransaction.AddCommand(ConcludeTransaction)

GetTransactionInfo.Flags().StringVarP(&getTransactionInfoOptions.Dtid, "dtid", "d", "", "read transaction state for the given distributed transaction ID.")
DistributedTransaction.AddCommand(GetTransactionInfo)

Root.AddCommand(DistributedTransaction)
}
25 changes: 2 additions & 23 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSettings(t *testing.T) {
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
var wg sync.WaitGroup
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, tt.queries)
twopcutil.RunMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, tt.queries)
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
// Run the vttablet restart to ensure that the transaction needs to be redone.
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestDisruptions(t *testing.T) {
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
var wg sync.WaitGroup
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, append([]string{"begin"}, getMultiShardInsertQueries()...))
twopcutil.RunMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, append([]string{"begin"}, getMultiShardInsertQueries()...))
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
writeCtx, writeCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -261,27 +261,6 @@ func getMultiShardInsertQueries() []string {
return queries
}

// runMultiShardCommitWithDelay runs a multi shard commit and configures it to wait for a certain amount of time in the commit phase.
func runMultiShardCommitWithDelay(t *testing.T, conn *mysql.Conn, commitDelayTime string, wg *sync.WaitGroup, queries []string) {
// Run all the queries to start the transaction.
for _, query := range queries {
utils.Exec(t, conn, query)
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, commitDelayTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
}

func mergeShards(t *testing.T) error {
return twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-80,80-", "40-")
}
Expand Down
70 changes: 69 additions & 1 deletion go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ import (
"vitess.io/vitess/go/vt/callerid"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletpb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
"vitess.io/vitess/go/vt/vttablet/grpctmclient"
)

// TestDTCommit tests distributed transaction commit for insert, update and delete operations
Expand Down Expand Up @@ -1349,7 +1351,12 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {

out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none")
require.NoError(t, err, out)
defer clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
defer func() {
clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
for _, shard := range clusterInstance.Keyspaces[0].Shards {
clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, shard.Vttablets[0].Alias)
}
}()

// After changing the durability policy for the given keyspace to none, we run PRS.
shard := clusterInstance.Keyspaces[0].Shards[2]
Expand All @@ -1369,3 +1376,64 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not")
}

// TestReadTransactionStatus tests that read transaction state rpc works as expected.
func TestReadTransactionStatus(t *testing.T) {
conn, closer := start(t)
defer closer()
defer conn.Close()
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)

// We create a multi shard commit and delay its commit on one of the shards.
// This allows us to query to transaction status and actually get some data back.
var wg sync.WaitGroup
twopcutil.RunMultiShardCommitWithDelay(t, conn, "10", &wg, []string{
"begin",
"insert into twopc_t1(id, col) values(4, 4)",
"insert into twopc_t1(id, col) values(6, 4)",
"insert into twopc_t1(id, col) values(9, 4)",
})
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)

// Create a tablet manager client and use it to read the transaction state.
tmc := grpctmclient.NewClient()
defer tmc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

primaryTablet := getTablet(clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().GrpcPort)
var unresTransaction *querypb.TransactionMetadata
for _, shard := range clusterInstance.Keyspaces[0].Shards {
urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1)
require.NoError(t, err)
if len(urtRes) > 0 {
unresTransaction = urtRes[0]
}
}
require.NotNil(t, unresTransaction)
res, err := tmc.GetTransactionInfo(ctx, primaryTablet, unresTransaction.Dtid)
require.NoError(t, err)
assert.Equal(t, "PREPARED", res.State)
assert.Equal(t, "", res.Message)
assert.Equal(t, []string{"insert into twopc_t1(id, col) values (9, 4)"}, res.Statements)

// Also try running the RPC from vtctld and verify we see the same values.
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction",
"Read",
fmt.Sprintf(`--dtid=%s`, unresTransaction.Dtid),
)
require.NoError(t, err)
require.Contains(t, out, "insert into twopc_t1(id, col) values (9, 4)")
require.Contains(t, out, unresTransaction.Dtid)

// Wait for the commit to have returned.
wg.Wait()
}

func getTablet(tabletGrpcPort int) *tabletpb.Tablet {
portMap := make(map[string]int32)
portMap["grpc"] = int32(tabletGrpcPort)
return &tabletpb.Tablet{Hostname: hostname, PortMap: portMap}
}
22 changes: 22 additions & 0 deletions go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path"
"slices"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -94,6 +95,27 @@ func WriteTestCommunicationFile(t *testing.T, fileName string, content string) {
require.NoError(t, err)
}

// RunMultiShardCommitWithDelay runs a multi shard commit and configures it to wait for a certain amount of time in the commit phase.
func RunMultiShardCommitWithDelay(t *testing.T, conn *mysql.Conn, commitDelayTime string, wg *sync.WaitGroup, queries []string) {
// Run all the queries to start the transaction.
for _, query := range queries {
utils.Exec(t, conn, query)
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
WriteTestCommunicationFile(t, DebugDelayCommitShard, "80-")
WriteTestCommunicationFile(t, DebugDelayCommitTime, commitDelayTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
}

// DeleteFile deletes the file specified.
func DeleteFile(fileName string) {
_ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName))
Expand Down
Loading
Loading