Atomic Transactions correctness with PRS, ERS and MySQL & Vttablet Restarts #16553

Merged Aug 21, 2024 · 37 commits (changes shown from 35 commits)

Commits
e922f5f  test: add the basic framework for 2pc fuzzer testing (GuptaManan100, Jul 25, 2024)
68dab94  test: generate the queries for the transactions (GuptaManan100, Jul 25, 2024)
baa764f  test: add verification logic to see if all the transactions were inde… (GuptaManan100, Jul 25, 2024)
e0efd76  test: add multiple threads tests (GuptaManan100, Jul 25, 2024)
8d9f8c7  test: rollback transactions if any query fails (GuptaManan100, Jul 25, 2024)
2cfa733  feat: fix deletions from table (GuptaManan100, Jul 26, 2024)
8c0bb8a  test: make update set values a member of the fuzzer struct (GuptaManan100, Jul 26, 2024)
7fd96a5  test: split the information into more columns instead of overloading … (GuptaManan100, Jul 26, 2024)
a382456  feat: use a different connection for verification (GuptaManan100, Jul 26, 2024)
813d5dd  feat: address review comments (GuptaManan100, Jul 29, 2024)
745a491  cleanup remove unrequired changes (GuptaManan100, Jul 29, 2024)
56a4c09  Merge remote-tracking branch 'upstream/main' into fuzzer-disruptions (GuptaManan100, Jul 29, 2024)
8c6fdfd  feat: add disruptions to fuzzer (GuptaManan100, Aug 7, 2024)
9326e5b  test: fix close function and add more logs (GuptaManan100, Aug 7, 2024)
25aa54f  feat: refactor redo logic for prepared transactions to make them prs,… (GuptaManan100, Aug 7, 2024)
e92436c  Merge remote-tracking branch 'upstream/main' into fuzzer-disruptions (harshit-gangal, Aug 7, 2024)
ba6ec7f  chore: use local variables (harshit-gangal, Aug 7, 2024)
586f993  Merge remote-tracking branch 'upstream/main' into fuzzer-disruptions (GuptaManan100, Aug 12, 2024)
7237a29  test: move disruption test to stress package and run writer threads t… (GuptaManan100, Aug 12, 2024)
9893c34  feat: fix where we rollback prepared transactions, and fix SetReadOnl… (GuptaManan100, Aug 12, 2024)
743c1c3  feat: fix the point where we mark the pool usable again (GuptaManan100, Aug 13, 2024)
4731618  feat: handle vttablet restarts and add testing for it (GuptaManan100, Aug 13, 2024)
3faec64  Merge remote-tracking branch 'upstream/main' into fuzzer-disruptions (GuptaManan100, Aug 14, 2024)
66208b7  test: read the unresolved transactions and wait for them to complete … (GuptaManan100, Aug 14, 2024)
691cd0d  test: fix unit tests to reflect the fixes (GuptaManan100, Aug 14, 2024)
62876ea  feat: fix bug that caused twoPC pool to never be open (GuptaManan100, Aug 14, 2024)
db2cc1f  test: make tests more robust to failures (GuptaManan100, Aug 14, 2024)
f91872c  feat: ignore error for servers that don't know super_read_only variable (GuptaManan100, Aug 14, 2024)
dc72f0d  test: run ers disruption in the end, because it can sometimes lead to… (GuptaManan100, Aug 14, 2024)
6f74090  comments: add comments explaining the code (GuptaManan100, Aug 14, 2024)
328850f  feat: add design doc with all the explanation of the code (GuptaManan100, Aug 14, 2024)
2ec319e  chore: remove error from return parameter from redo prepared function (harshit-gangal, Aug 19, 2024)
20e9775  chore: close pool in opposite order of opening (harshit-gangal, Aug 19, 2024)
2317f69  chore: move non-stress test to different package (harshit-gangal, Aug 19, 2024)
250e5b0  comment: remove comments that are not required (GuptaManan100, Aug 20, 2024)
8716c2c  Merge remote-tracking branch 'upstream/main' into fuzzer-disruptions (GuptaManan100, Aug 21, 2024)
226933c  comment: fix typing errors in docs and comments (GuptaManan100, Aug 21, 2024)
37 changes: 37 additions & 0 deletions doc/design-docs/AtomicTransactionsWithDisruptions.md
@@ -0,0 +1,37 @@
# Handling disruptions in atomic transactions

## Overview

This document describes how atomic transactions are made resilient in the face of disruptions. The basic design and components involved in an atomic transaction are described in [TwoPhaseCommitDesign.md](./TwoPhaseCommitDesign.md). This document walks through each disruption that can happen in a running cluster and how atomic transactions are engineered to handle it without breaking their guarantee of atomicity.

## `PlannedReparentShard` and `EmergencyReparentShard`

For both Planned and Emergency reparents, we call `DemotePrimary` on the primary tablet. For a Planned reparent this call has to succeed, while for an Emergency reparent the call is allowed to fail (for example, if the primary is unreachable), and we proceed further regardless.

As part of the `DemotePrimary` flow, when we transition the tablet to a non-serving state, we wait for all transactions to complete (in `TxEngine.shutdownLocked()` we have `te.txPool.WaitForEmpty()`). If the user has specified a shutdown grace period, then once that time has elapsed, we go ahead and forcefully kill all running queries, and only then roll back the prepared transactions. It is crucial that we roll back the prepared transactions only after all other writes have been killed, because rolling back a prepared transaction releases the locks it was holding. If a conflicting write were still in progress, it could potentially go through and cause data corruption, since we would not be able to prepare the transaction again. All the code to kill queries can be found in `stateManager.terminateAllQueries()`.
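Because this ordering is the crux of the safety argument, here is a minimal Go sketch of the shutdown sequence. All names are illustrative stand-ins (hypothetical hooks), not the actual Vitess APIs:

```go
package tx

import "time"

// txEngine is a stand-in for Vitess's TxEngine; every field is a
// hypothetical hook named after the behavior described above.
type txEngine struct {
	waitForEmpty        func() // blocks until no open transactions remain
	terminateAllQueries func() // forcefully kills all running queries
	rollbackPrepared    func() // rolls back all prepared transactions
}

// shutdown waits for transactions to drain, kills stragglers once the
// grace period expires, and only then rolls back prepared transactions,
// so their row locks are released only when no conflicting write can run.
func (te *txEngine) shutdown(gracePeriod time.Duration) {
	done := make(chan struct{})
	go func() {
		te.waitForEmpty()
		close(done)
	}()
	select {
	case <-done:
		// All transactions concluded on their own.
	case <-time.After(gracePeriod):
		// Grace period expired: kill every remaining query first...
		te.terminateAllQueries()
	}
	// ...and only afterwards release the prepared transactions' locks.
	te.rollbackPrepared()
}
```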

The steps outlined above ensure that we either wait for all prepared transactions to conclude, or roll them back safely so that they can be prepared again on the new primary.

On the new primary, when we call `PromoteReplica`, we redo all the prepared transactions before we allow any new writes to go through. This ensures that the new primary is in the same state the old primary was in before the reparent. The code for redoing the prepared transactions can be found in `TxEngine.RedoPreparedTransactions()`.

If everything goes as described above, there is no reason for the redo of prepared transactions to fail. But in case something unexpected happens and re-preparing a transaction fails, we still allow the vttablet to accept new writes, because we decided that the availability of the tablet is more important. We will, however, build tooling and metrics so that users are notified of these failures and can handle them as they see fit.
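A minimal sketch of that promotion ordering follows; both parameters are hypothetical stand-ins for `TxEngine.RedoPreparedTransactions()` and the code that opens the tablet for writes, not the real signatures:

```go
package tx

import "log"

// promoteSketch shows only the order of operations on the new primary.
func promoteSketch(redoPrepared func() error, acceptWrites func()) {
	if err := redoPrepared(); err != nil {
		// Availability wins: log the failure and keep serving, relying on
		// tooling and metrics to surface the stuck transactions.
		log.Printf("failed to redo prepared transactions: %v", err)
	}
	// New writes are admitted only after the redo attempt.
	acceptWrites()
}
```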

While a Planned reparent is an operation where all the processes are running fine, an Emergency reparent is called when something has gone wrong with the cluster. Because we call `DemotePrimary` in parallel with `StopReplicationAndBuildStatusMap`, we can run into a case wherein the primary tries to write something to the binlog after all the replicas have stopped replicating. If we were running without semi-sync, the primary could then commit a prepared transaction and return success to the vtgate trying to commit it. The vtgate would conclude that the transaction is safe to conclude and remove all of its metadata. However, since the commit never got replicated, the new primary would re-prepare the transaction and wait for a coordinator to either commit or roll it back, which would never happen. Essentially, we would have a transaction stuck in the prepared state on a shard indefinitely. To avoid this situation, it is essential to run with semi-sync: any write acknowledged as a success to the caller must then have been replicated to at least one replica, which guarantees that the transaction commit is also present on the new primary.
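As a concrete anchor, the end-to-end fuzzer in this PR runs with exactly this requirement in place: the test keyspace is created with a `semi_sync` durability policy and two replicas per shard (excerpted from the `TestMain` change further down in this diff):

```go
keyspace := &cluster.Keyspace{
	Name:             keyspaceName,
	SchemaSQL:        SchemaSQL,
	VSchema:          VSchema,
	SidecarDBName:    sidecarDBName,
	DurabilityPolicy: "semi_sync", // every acknowledged write is on at least one replica
}
// Two replicas per shard, so a semi-sync acker is always available.
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
	return 1
}
```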

## MySQL Restarts

When MySQL restarts, it loses all ongoing transactions, which includes all prepared transactions, because the transaction logs are not persistent across restarts. This is a MySQL limitation with no way around it. At the Vitess level, however, we must ensure that we can commit the prepared transactions without failures even when MySQL restarts.

Vttablet has code to detect MySQL failures and call `stateManager.checkMySQL()`, which transitions the tablet to a NotConnected state. This prevents any writes from going through until the vttablet has transitioned back to a serving state.

We cannot, however, rely on `checkMySQL` to ensure that no conflicting writes go through. The time between the MySQL restart and the vttablet transitioning to a NotConnected state can be large, and during this window the vttablet would still be accepting writes, some of which could conflict with the prepared transactions.

To handle this, we rely on the fact that when MySQL restarts, it comes back up with super-read-only turned on, meaning no writes can go through. VTOrc registers this as an issue and fixes it by calling `UndoDemotePrimary`. As part of that call, before we set MySQL to read-write, we ensure that all the prepared transactions are redone while MySQL is still read-only. We use the dba pool (which has admin permissions) to prepare the transactions. This is safe because we know that no conflicting writes can go through until we set MySQL to read-write. The code that sets MySQL to read-write after redoing prepared transactions can be found in `TabletManager.redoPreparedTransactionsAndSetReadWrite()`.

Handling MySQL restarts is the only reason we needed to add code that redoes prepared transactions whenever MySQL transitions from super-read-only to read-write. Even though we only need this in `UndoDemotePrimary`, it is not necessarily `UndoDemotePrimary` that sets MySQL to read-write: if the user notices that the tablet is read-only before VTOrc has a chance to fix it, they can manually call `SetReadWrite` on the tablet. Therefore, the safest option was to always check whether we need to redo the prepared transactions whenever MySQL transitions from super-read-only to read-write.
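A sketch of that single choke point, under the assumption that all three parameters are illustrative stand-ins for the logic in `TabletManager.redoPreparedTransactionsAndSetReadWrite()`:

```go
package tx

import "log"

// setReadWriteSketch captures the invariant described above: every
// transition from super-read-only to read-write redoes the prepared
// transactions first.
func setReadWriteSketch(isSuperReadOnly func() bool, redoPrepared func() error, setReadWrite func() error) error {
	if isSuperReadOnly() {
		// Redo through the dba pool: admin connections are exempt from
		// super_read_only, so the prepared transactions re-acquire their
		// locks while ordinary writes are still fenced off.
		if err := redoPrepared(); err != nil {
			log.Printf("failed to redo prepared transactions: %v", err)
		}
	}
	// Only now do ordinary writes become possible again.
	return setReadWrite()
}
```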

## Vttablet Restarts

When Vttablet restarts, all its previous connections are dropped. It starts in a non-serving state, and then, after reading the shard and tablet records from the topo server, it transitions to a serving state.
As part of this transition, we need to ensure that we redo the prepared transactions before we start accepting any writes. This is done as part of the `TxEngine.transition` function when we transition to the `AcceptingReadWrite` state, calling the same code for redoing the prepared transactions that we call for MySQL restarts, PRS, and ERS.
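A compact sketch of that transition, again with illustrative names standing in for the real `TxEngine.transition` logic:

```go
package tx

type txEngineState int

const (
	notServing txEngineState = iota
	acceptingReadOnly
	acceptingReadWrite
)

// engine is a stand-in for TxEngine; both function fields are hypothetical.
type engine struct {
	redoPrepared func()
	openPools    func(txEngineState)
}

// transition redoes prepared transactions before the pools start
// admitting writes when entering the AcceptingReadWrite state.
func (e *engine) transition(state txEngineState) {
	if state == acceptingReadWrite {
		// Same redo path as after MySQL restarts, PRS, and ERS.
		e.redoPrepared()
	}
	e.openPools(state)
}
```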
1 change: 1 addition & 0 deletions go/test/endtoend/transaction/twopc/main_test.go
@@ -111,6 +111,7 @@ func start(t *testing.T) (*mysql.Conn, func()) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
cleanup(t)

return conn, func() {
conn.Close()
19 changes: 11 additions & 8 deletions go/test/endtoend/transaction/twopc/schema.sql
@@ -1,18 +1,21 @@
-create table twopc_user (
-    id bigint,
+create table twopc_user
+(
+    id bigint,
     name varchar(64),
     primary key (id)
 ) Engine=InnoDB;

-create table twopc_music (
-    id varchar(64),
+create table twopc_music
+(
+    id varchar(64),
     user_id bigint,
-    title varchar(64),
+    title   varchar(64),
     primary key (id)
 ) Engine=InnoDB;

-create table twopc_t1 (
-    id bigint,
+create table twopc_t1
+(
+    id bigint,
     col bigint,
-    primary key (id, col)
+    primary key (id)
 ) Engine=InnoDB;
@@ -14,20 +14,27 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

-package fuzzer
+package stress

import (
"context"
"fmt"
"os"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/vt/log"
)

var (
@@ -67,10 +74,12 @@ var (
// Moreover, the threadIDs of rows for a given update set in the 3 shards should be the same to ensure that conflicting transactions got committed in the same exact order.
func TestTwoPCFuzzTest(t *testing.T) {
testcases := []struct {
-	name           string
-	threads        int
-	updateSets     int
-	timeForTesting time.Duration
+	name                  string
+	threads               int
+	updateSets            int
+	timeForTesting        time.Duration
+	clusterDisruptions    []func()
+	disruptionProbability []int
}{
{
name: "Single Thread - Single Set",
@@ -90,15 +99,24 @@ func TestTwoPCFuzzTest(t *testing.T) {
updateSets: 15,
timeForTesting: 5 * time.Second,
},
{
name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL and Vttablet restart disruptions",
threads: 15,
updateSets: 15,
timeForTesting: 5 * time.Second,
clusterDisruptions: []func(){prs, ers, mysqlRestarts, vttabletRestarts},
disruptionProbability: []int{5, 5, 5, 5},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
conn, closer := start(t)
defer closer()
-	fz := newFuzzer(tt.threads, tt.updateSets)
+	fz := newFuzzer(tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability)

fz.initialize(t, conn)
conn.Close()
// Start the fuzzer.
fz.start(t)

@@ -108,8 +126,12 @@ func TestTwoPCFuzzTest(t *testing.T) {
// Signal the fuzzer to stop.
fz.stop()

// Wait for all transactions to be resolved.
waitForResults(t, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second)
// Verify that all the transactions run were actually atomic and no data issues have occurred.
fz.verifyTransactionsWereAtomic(t)

log.Errorf("Verification complete. All good!")
})
}
}
@@ -176,14 +198,20 @@ type fuzzer struct {
wg sync.WaitGroup
// updateRowVals are the rows that we use to ensure 1 update on each shard with the same increment.
updateRowsVals [][]int
// clusterDisruptions are the cluster level disruptions that can happen in a running cluster.
clusterDisruptions []func()
// disruptionProbability is the chance for the disruption to happen. We check this every 100 milliseconds.
disruptionProbability []int
}

// newFuzzer creates a new fuzzer struct.
-func newFuzzer(threads int, updateSets int) *fuzzer {
+func newFuzzer(threads int, updateSets int, clusterDisruptions []func(), disruptionProbability []int) *fuzzer {
fz := &fuzzer{
-	threads:    threads,
-	updateSets: updateSets,
-	wg:         sync.WaitGroup{},
+	threads:               threads,
+	updateSets:            updateSets,
+	wg:                    sync.WaitGroup{},
+	clusterDisruptions:    clusterDisruptions,
+	disruptionProbability: disruptionProbability,
}
// Initially the fuzzer thread is stopped.
fz.shouldStop.Store(true)
@@ -202,12 +230,16 @@ func (fz *fuzzer) stop() {
func (fz *fuzzer) start(t *testing.T) {
// We mark the fuzzer thread to be running now.
fz.shouldStop.Store(false)
-	fz.wg.Add(fz.threads)
+	// fz.threads is the count of fuzzer threads, plus one disruption thread.
+	fz.wg.Add(fz.threads + 1)
for i := 0; i < fz.threads; i++ {
go func() {
fz.runFuzzerThread(t, i)
}()
}
go func() {
fz.runClusterDisruptionThread(t)
}()
}

// runFuzzerThread is used to run a thread of the fuzzer.
@@ -308,3 +340,108 @@ func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string {
})
return queries
}

// runClusterDisruptionThread runs the cluster level disruptions in a separate thread.
func (fz *fuzzer) runClusterDisruptionThread(t *testing.T) {
// Whenever we finish running this thread, we should mark that the thread has stopped.
defer func() {
fz.wg.Done()
}()

for {
// If disruption thread is marked to be stopped, then we should exit this go routine.
if fz.shouldStop.Load() == true {
return
}
// Run a potential disruption
fz.runClusterDisruption(t)
time.Sleep(100 * time.Millisecond)
}

}

// runClusterDisruption tries to run a single cluster disruption.
func (fz *fuzzer) runClusterDisruption(t *testing.T) {
for idx, prob := range fz.disruptionProbability {
if rand.Intn(100) < prob {
fz.clusterDisruptions[idx]()
return
}
}
}

/*
Cluster Level Disruptions for the fuzzer
*/

func prs() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
newPrimary := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Running PRS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias)
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias)
if err != nil {
log.Errorf("error running PRS - %v", err)
}
}

func ers() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
newPrimary := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Running ERS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias)
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias)
if err != nil {
log.Errorf("error running ERS - %v", err)
}
}

func vttabletRestarts() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
tablet := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Restarting vttablet for - %v/%v - %v", keyspaceName, shard.Name, tablet.Alias)
err := tablet.VttabletProcess.TearDown()
if err != nil {
log.Errorf("error stopping vttablet - %v", err)
return
}
tablet.VttabletProcess.ServingStatus = "SERVING"
for {
err = tablet.VttabletProcess.Setup()
if err == nil {
return
}
// Sometimes vttablets fail to connect to the topo server due to a minor blip there.
// We don't want to fail the test, so we retry setting up the vttablet.
log.Errorf("error restarting vttablet - %v", err)
time.Sleep(1 * time.Second)
}
}

func mysqlRestarts() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
tablet := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Restarting MySQL for - %v/%v tablet - %v", keyspaceName, shard.Name, tablet.Alias)
pidFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.pid", tablet.TabletUID))
pidBytes, err := os.ReadFile(pidFile)
if err != nil {
// We can't read the file which means the PID file does not exist
// The server must have stopped
return
}
pid, err := strconv.Atoi(strings.TrimSpace(string(pidBytes)))
if err != nil {
log.Errorf("Error in conversion to integer: %v", err)
return
}
err = syscallutil.Kill(pid, syscall.SIGKILL)
if err != nil {
log.Errorf("Error in killing process: %v", err)
}
}
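A usage note on the disruption hooks above: a disruption is just a `func()` paired with a per-tick probability, so the fuzzer can be pointed at any subset of them. A hypothetical invocation (`t` being the enclosing `*testing.T`):

```go
// Hypothetical: fuzz with only MySQL restarts, fired with a 10% chance
// on each 100ms tick of the disruption thread.
fz := newFuzzer(4, 4, []func(){mysqlRestarts}, []int{10})
fz.start(t)
```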
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

-package fuzzer
+package stress

import (
"context"
@@ -75,12 +75,13 @@ func TestMain(m *testing.M) {

// Start keyspace
keyspace := &cluster.Keyspace{
-		Name:          keyspaceName,
-		SchemaSQL:     SchemaSQL,
-		VSchema:       VSchema,
-		SidecarDBName: sidecarDBName,
+		Name:             keyspaceName,
+		SchemaSQL:        SchemaSQL,
+		VSchema:          VSchema,
+		SidecarDBName:    sidecarDBName,
+		DurabilityPolicy: "semi_sync",
}
-	if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil {
+	if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

@@ -113,4 +114,5 @@ func cleanup(t *testing.T) {

utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert")
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update")
utils.ClearOutTable(t, vtParams, "twopc_t1")
}
@@ -12,3 +12,9 @@ create table twopc_fuzzer_insert (
key(col),
primary key (id, col)
) Engine=InnoDB;

create table twopc_t1 (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;