diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 20528a5b98d..5bc7399f74e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -328,8 +328,8 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { // Exec executes the query and the related actions. // Example insert statement: // insert into _vt.vreplication -// (workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) -// values ('Resharding', 'keyspace:"ks" shard:"0" tables:"a" tables:"b" ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running')` +// (workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) +// values ('Resharding', 'keyspace:"ks" shard:"0" tables:"a" tables:"b" ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running')` // Example update statement: // update _vt.vreplication set state='Stopped', message='testing stop' where id=1 // Example delete: delete from _vt.vreplication where id=1 diff --git a/go/vt/vttablet/tabletmanager/vreplication/shard_sorter.go b/go/vt/vttablet/tabletmanager/vreplication/shard_sorter.go index 28d41fbda46..07cefaef976 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/shard_sorter.go +++ b/go/vt/vttablet/tabletmanager/vreplication/shard_sorter.go @@ -18,25 +18,25 @@ package vreplication import "strings" -//ShardSorter implements a sort.Sort() function for sorting shard ranges +// ShardSorter implements a sort.Sort() function for sorting shard ranges type ShardSorter []string -//Len implements the required interface for a sorting function +// Len implements the required interface for a sorting function func (s ShardSorter) Len() int { return len(s) } -//Swap implements the required interface for a sorting function +// Swap implements the required interface for a sorting function func (s ShardSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -//Key returns the prefix of a shard range +// Key returns the prefix of a shard range func (s ShardSorter) Key(ind int) string { return strings.Split(s[ind], "-")[0] } -//Less implements the required interface for a sorting function +// Less implements the required interface for a sorting function func (s ShardSorter) Less(i, j int) bool { return s.Key(i) < s.Key(j) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 81842fadbdf..372bb3d03cb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -70,13 +70,13 @@ type vplayer struct { // newVPlayer creates a new vplayer. Parameters: // vreplicator: the outer replicator. It's used for common functions like setState. -// Also used to access the engine for registering journal events. +// Also used to access the engine for registering journal events. // settings: current settings read from _vt.vreplication. // copyState: if set, contains the list of tables yet to be copied, or in the process -// of being copied. If copyState is non-nil, the plans generated make sure that -// replication is only applied to parts that have been copied so far. +// of being copied. If copyState is non-nil, the plans generated make sure that +// replication is only applied to parts that have been copied so far. // pausePos: if set, replication will stop at that position without updating the state to "Stopped". -// This is used by the fastForward function during copying. +// This is used by the fastForward function during copying. func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map[string]*sqltypes.Result, pausePos mysql.Position, phase string) *vplayer { saveStop := true if !pausePos.IsZero() { @@ -279,30 +279,30 @@ func (vp *vplayer) recordHeartbeat() error { // applyEvents is the main thread that applies the events. It has the following use // cases to take into account: -// * Normal transaction that has row mutations. In this case, the transaction -// is committed along with an update of the position. -// * DDL event: the action depends on the OnDDL setting. -// * OTHER event: the current position of the event is saved. -// * JOURNAL event: if the event is relevant to the current stream, invoke registerJournal -// of the engine, and terminate. -// * HEARTBEAT: update SecondsBehindMaster. -// * Empty transaction: The event is remembered as an unsavedEvent. If no commits -// happen for idleTimeout since timeLastSaved, the current position of the unsavedEvent -// is committed (updatePos). -// * An empty transaction: Empty transactions are necessary because the current -// position of that transaction may be the stop position. If so, we have to record it. -// If not significant, we should avoid saving these empty transactions individually -// because they can cause unnecessary churn and binlog bloat. We should -// also not go for too long without saving because we should not fall way behind -// on the current replication position. Additionally, WaitForPos or other external -// agents could be waiting on that specific position by watching the vreplication -// record. -// * A group of transactions: Combine them into a single transaction. -// * Partial transaction: Replay the events received so far and refetch from relay log -// for more. -// * A combination of any of the above: The trickier case is the one where a group -// of transactions come in, with the last one being partial. In this case, all transactions -// up to the last one have to be committed, and the final one must be partially applied. +// - Normal transaction that has row mutations. In this case, the transaction +// is committed along with an update of the position. +// - DDL event: the action depends on the OnDDL setting. +// - OTHER event: the current position of the event is saved. +// - JOURNAL event: if the event is relevant to the current stream, invoke registerJournal +// of the engine, and terminate. +// - HEARTBEAT: update SecondsBehindMaster. +// - Empty transaction: The event is remembered as an unsavedEvent. If no commits +// happen for idleTimeout since timeLastSaved, the current position of the unsavedEvent +// is committed (updatePos). +// - An empty transaction: Empty transactions are necessary because the current +// position of that transaction may be the stop position. If so, we have to record it. +// If not significant, we should avoid saving these empty transactions individually +// because they can cause unnecessary churn and binlog bloat. We should +// also not go for too long without saving because we should not fall way behind +// on the current replication position. Additionally, WaitForPos or other external +// agents could be waiting on that specific position by watching the vreplication +// record. +// - A group of transactions: Combine them into a single transaction. +// - Partial transaction: Replay the events received so far and refetch from relay log +// for more. +// - A combination of any of the above: The trickier case is the one where a group +// of transactions come in, with the last one being partial. In this case, all transactions +// up to the last one have to be committed, and the final one must be partially applied. // // Of the above events, the saveable ones are COMMIT, DDL, and OTHER. Eventhough // A GTID comes as a separate event, it's not saveable until a subsequent saveable diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index fe0547cf8c3..59f2d1f8184 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -94,20 +94,20 @@ type vreplicator struct { // The Filter can be empty: get all rows and columns. // The Filter can be a keyrange, like "-80": get all rows that are within the keyrange. // The Filter can be a select expression. Examples. -// "select * from t", same as an empty Filter, -// "select * from t where in_keyrange('-80')", same as "-80", -// "select * from t where in_keyrange(col1, 'hash', '-80')", -// "select col1, col2 from t where...", -// "select col1, keyspace_id() as ksid from t where...", -// "select id, count(*), sum(price) from t group by id", -// "select * from t where customer_id=1 and val = 'newton'". -// Only "in_keyrange" expressions, integer and string comparisons are supported in the where clause. -// The select expressions can be any valid non-aggregate expressions, -// or count(*), or sum(col). -// If the target column name does not match the source expression, an -// alias like "a+b as targetcol" must be used. -// More advanced constructs can be used. Please see the table plan builder -// documentation for more info. +// "select * from t", same as an empty Filter, +// "select * from t where in_keyrange('-80')", same as "-80", +// "select * from t where in_keyrange(col1, 'hash', '-80')", +// "select col1, col2 from t where...", +// "select col1, keyspace_id() as ksid from t where...", +// "select id, count(*), sum(price) from t group by id", +// "select * from t where customer_id=1 and val = 'newton'". +// Only "in_keyrange" expressions, integer and string comparisons are supported in the where clause. +// The select expressions can be any valid non-aggregate expressions, +// or count(*), or sum(col). +// If the target column name does not match the source expression, an +// alias like "a+b as targetcol" must be used. +// More advanced constructs can be used. Please see the table plan builder +// documentation for more info. func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, vre *Engine) *vreplicator { if *vreplicationHeartbeatUpdateInterval > vreplicationMinimumHeartbeatUpdateInterval { log.Warningf("the supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d",