diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 7c49de58c9b..6c538a48f75 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -18,7 +18,6 @@ package workflow import ( "context" - "encoding/json" "errors" "fmt" "math" @@ -41,11 +40,9 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/ptr" - "vitess.io/vitess/go/sets" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" - "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" @@ -58,7 +55,6 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/schematools" - "vitess.io/vitess/go/vt/vtctl/workflow/common" "vitess.io/vitess/go/vt/vtctl/workflow/vexec" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" @@ -406,546 +402,28 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows span.Annotate("include_logs", req.IncludeLogs) span.Annotate("shards", req.Shards) - readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{} - if req.Workflow != "" { - readReq.IncludeWorkflows = []string{req.Workflow} + w := &workflowFetcher{ + ts: s.ts, + tmc: s.tmc, + parser: s.SQLParser(), + logger: s.Logger(), } - if req.ActiveOnly { - readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped} - } - - // Guards access to the maps used throughout. - m := sync.Mutex{} - shards, err := common.GetShards(ctx, s.ts, req.Keyspace, req.Shards) + workflowsByShard, err := w.fetchWorkflowsByShard(ctx, req) if err != nil { return nil, err } - results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards)) - readWorkflowsEg, readWorkflowsCtx := errgroup.WithContext(ctx) - for _, shard := range shards { - readWorkflowsEg.Go(func() error { - si, err := s.ts.GetShard(readWorkflowsCtx, req.Keyspace, shard) - if err != nil { - return err - } - if si.PrimaryAlias == nil { - return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, req.Keyspace, shard) - } - primary, err := s.ts.GetTablet(readWorkflowsCtx, si.PrimaryAlias) - if err != nil { - return err - } - if primary == nil { - return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, shard, topoproto.TabletAliasString(si.PrimaryAlias)) - } - // Clone the request so that we can set the correct DB name for tablet. - req := readReq.CloneVT() - wres, err := s.tmc.ReadVReplicationWorkflows(readWorkflowsCtx, primary.Tablet, req) - if err != nil { - return err - } - m.Lock() - defer m.Unlock() - results[primary] = wres - return nil - }) - } - if readWorkflowsEg.Wait() != nil { - return nil, err - } - - copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(results)) - - fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error { - span, ctx := trace.NewSpan(ctx, "workflow.Server.fetchCopyStates") - defer span.Finish() - - span.Annotate("keyspace", req.Keyspace) - span.Annotate("shard", tablet.Shard) - span.Annotate("tablet_alias", tablet.AliasString()) - - copyStates, err := s.getWorkflowCopyStates(ctx, tablet, streamIds) - if err != nil { - return err - } - m.Lock() - defer m.Unlock() - - for _, copyState := range copyStates { - shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId) - copyStatesByShardStreamId[shardStreamId] = append( - copyStatesByShardStreamId[shardStreamId], - copyState, - ) - } - - return nil - } - - fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx) - for tablet, result := range results { - tablet := tablet // loop closure - - streamIds := make([]int32, 0, len(result.Workflows)) - for _, wf := range result.Workflows { - for _, stream := range wf.Streams { - streamIds = append(streamIds, stream.Id) - } - } - - if len(streamIds) == 0 { - continue - } - - fetchCopyStatesEg.Go(func() error { - return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds) - }) - } - - if err := fetchCopyStatesEg.Wait(); err != nil { + copyStatesByShardStreamId, err := w.fetchCopyStatesByShardStream(ctx, workflowsByShard) + if err != nil { return nil, err } - workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results)) - sourceKeyspaceByWorkflow := make(map[string]string, len(results)) - sourceShardsByWorkflow := make(map[string]sets.Set[string], len(results)) - targetKeyspaceByWorkflow := make(map[string]string, len(results)) - targetShardsByWorkflow := make(map[string]sets.Set[string], len(results)) - maxVReplicationLagByWorkflow := make(map[string]float64, len(results)) - maxVReplicationTransactionLagByWorkflow := make(map[string]float64, len(results)) - - // We guarantee the following invariants when this function is called for a - // given workflow: - // - workflow.Name != "" (more precisely, ".Name is set 'properly'") - // - workflowsMap[workflow.Name] == workflow - // - sourceShardsByWorkflow[workflow.Name] != nil - // - targetShardsByWorkflow[workflow.Name] != nil - // - workflow.ShardStatuses != nil - scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse, tablet *topo.TabletInfo) error { - // This is not called concurrently, but we still protect the maps to ensure - // that we're concurrency-safe in the face of future changes (e.g. where other - // things are running concurrently with this which also access these maps). - m.Lock() - defer m.Unlock() - for _, rstream := range res.Streams { - // The value in the pos column can be compressed and thus not - // have a valid GTID consisting of valid UTF-8 characters so we - // have to decode it so that it's properly decompressed first - // when needed. - pos := rstream.Pos - if pos != "" { - mpos, err := binlogplayer.DecodePosition(pos) - if err != nil { - return err - } - pos = mpos.String() - } - - cells := strings.Split(res.Cells, ",") - for i := range cells { - cells[i] = strings.TrimSpace(cells[i]) - } - options := res.Options - if options != "" { - if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil { - return err - } - } - stream := &vtctldatapb.Workflow_Stream{ - Id: int64(rstream.Id), - Shard: tablet.Shard, - Tablet: tablet.Alias, - BinlogSource: rstream.Bls, - Position: pos, - StopPosition: rstream.StopPos, - State: rstream.State.String(), - DbName: tablet.DbName(), - TabletTypes: res.TabletTypes, - TabletSelectionPreference: res.TabletSelectionPreference, - Cells: cells, - TransactionTimestamp: rstream.TransactionTimestamp, - TimeUpdated: rstream.TimeUpdated, - Message: rstream.Message, - Tags: strings.Split(res.Tags, ","), - RowsCopied: rstream.RowsCopied, - ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{ - ComponentThrottled: rstream.ComponentThrottled, - TimeThrottled: rstream.TimeThrottled, - }, - } - - // Merge in copy states, which we've already fetched. - shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id) - if copyState, ok := copyStatesByShardStreamId[shardStreamId]; ok { - stream.CopyStates = copyState - } - - if rstream.TimeUpdated == nil { - rstream.TimeUpdated = &vttimepb.Time{} - } - - switch { - case strings.Contains(strings.ToLower(stream.Message), "error"): - stream.State = binlogdatapb.VReplicationWorkflowState_Error.String() - case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0: - stream.State = binlogdatapb.VReplicationWorkflowState_Copying.String() - case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && int64(time.Now().Second())-rstream.TimeUpdated.Seconds > 10: - stream.State = binlogdatapb.VReplicationWorkflowState_Lagging.String() - } - - shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString()) - shardStream, ok := workflow.ShardStreams[shardStreamKey] - if !ok { - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - - si, err := s.ts.GetShard(ctx, req.Keyspace, tablet.Shard) - if err != nil { - return err - } - - shardStream = &vtctldatapb.Workflow_ShardStream{ - Streams: nil, - TabletControls: si.TabletControls, - IsPrimaryServing: si.IsPrimaryServing, - } - - workflow.ShardStreams[shardStreamKey] = shardStream - } - - shardStream.Streams = append(shardStream.Streams, stream) - sourceShardsByWorkflow[workflow.Name].Insert(stream.BinlogSource.Shard) - targetShardsByWorkflow[workflow.Name].Insert(tablet.Shard) - - if ks, ok := sourceKeyspaceByWorkflow[workflow.Name]; ok && ks != stream.BinlogSource.Keyspace { - return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, ks, stream.BinlogSource.Keyspace) - } - - sourceKeyspaceByWorkflow[workflow.Name] = stream.BinlogSource.Keyspace - - if ks, ok := targetKeyspaceByWorkflow[workflow.Name]; ok && ks != tablet.Keyspace { - return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, ks, tablet.Keyspace) - } - - targetKeyspaceByWorkflow[workflow.Name] = tablet.Keyspace - - if stream.TimeUpdated == nil { - stream.TimeUpdated = &vttimepb.Time{} - } - timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0) - vreplicationLag := time.Since(timeUpdated) - - // MaxVReplicationLag represents the time since we last processed any event - // in the workflow. - if currentMaxLag, ok := maxVReplicationLagByWorkflow[workflow.Name]; ok { - if vreplicationLag.Seconds() > currentMaxLag { - maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds() - } - } else { - maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds() - } - - workflow.WorkflowType = res.WorkflowType.String() - workflow.WorkflowSubType = res.WorkflowSubType.String() - workflow.DeferSecondaryKeys = res.DeferSecondaryKeys - - // MaxVReplicationTransactionLag estimates the actual statement processing lag - // between the source and the target. If we are still processing source events it - // is the difference b/w current time and the timestamp of the last event. If - // heartbeats are more recent than the last event, then the lag is the time since - // the last heartbeat as there can be an actual event immediately after the - // heartbeat, but which has not yet been processed on the target. - // We don't allow switching during the copy phase, so in that case we just return - // a large lag. All timestamps are in seconds since epoch. - if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0 - } - if rstream.TransactionTimestamp == nil { - rstream.TransactionTimestamp = &vttimepb.Time{} - } - lastTransactionTime := rstream.TransactionTimestamp.Seconds - if rstream.TimeHeartbeat == nil { - rstream.TimeHeartbeat = &vttimepb.Time{} - } - lastHeartbeatTime := rstream.TimeHeartbeat.Seconds - if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64 - } else { - if lastTransactionTime == 0 /* no new events after copy */ || - lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ { - - lastTransactionTime = lastHeartbeatTime - } - now := time.Now().Unix() /* seconds since epoch */ - transactionReplicationLag := float64(now - lastTransactionTime) - if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag - } - } - } - - return nil - } - - for tablet, result := range results { - // In the old implementation, we knew we had at most one (0 <= N <= 1) - // workflow for each shard primary we queried. There might be multiple - // rows (streams) comprising that workflow, so we would aggregate the - // rows for a given primary into a single value ("the workflow", - // ReplicationStatusResult in the old types). - // - // In this version, we have many (N >= 0) workflows for each shard - // primary we queried, so we need to determine if each row corresponds - // to a workflow we're already aggregating, or if it's a workflow we - // haven't seen yet for that shard primary. We use the workflow name to - // dedupe for this. - for _, wfres := range result.Workflows { - workflowName := wfres.Workflow - workflow, ok := workflowsMap[workflowName] - if !ok { - workflow = &vtctldatapb.Workflow{ - Name: workflowName, - ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{}, - } - - workflowsMap[workflowName] = workflow - sourceShardsByWorkflow[workflowName] = sets.New[string]() - targetShardsByWorkflow[workflowName] = sets.New[string]() - } - - if err := scanWorkflow(ctx, workflow, wfres, tablet); err != nil { - return nil, err - } - } - } - - var ( - fetchLogsWG sync.WaitGroup - vrepLogQuery = strings.TrimSpace(` -SELECT - id, - vrepl_id, - type, - state, - message, - created_at, - updated_at, - count -FROM - _vt.vreplication_log -WHERE vrepl_id IN %a -ORDER BY - vrepl_id ASC, - id ASC -`) - ) - - fetchStreamLogs := func(ctx context.Context, workflow *vtctldatapb.Workflow) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.fetchStreamLogs") - defer span.Finish() - - span.Annotate("keyspace", req.Keyspace) - span.Annotate("workflow", workflow.Name) - - vreplIDs := make([]int64, 0, len(workflow.ShardStreams)) - for _, shardStream := range maps.Values(workflow.ShardStreams) { - for _, stream := range shardStream.Streams { - vreplIDs = append(vreplIDs, stream.Id) - } - } - idsBV, err := sqltypes.BuildBindVariable(vreplIDs) - if err != nil { - return - } - - query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV) - if err != nil { - return - } - - vx := vexec.NewVExec(req.Keyspace, workflow.Name, s.ts, s.tmc, s.SQLParser()) - results, err := vx.QueryContext(ctx, query) - if err != nil { - // Note that we do not return here. If there are any query results - // in the map (i.e. some tablets returned successfully), we will - // still try to read log rows from them on a best-effort basis. But, - // we will also pre-emptively record the top-level fetch error on - // every stream in every shard in the workflow. Further processing - // below may override the error message for certain streams. - for _, streams := range workflow.ShardStreams { - for _, stream := range streams.Streams { - stream.LogFetchError = err.Error() - } - } - } - - for target, p3qr := range results { - qr := sqltypes.Proto3ToResult(p3qr) - shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString()) - - ss, ok := workflow.ShardStreams[shardStreamKey] - if !ok || ss == nil { - continue - } - - streams := ss.Streams - streamIdx := 0 - markErrors := func(err error) { - if streamIdx >= len(streams) { - return - } - - streams[streamIdx].LogFetchError = err.Error() - } - - for _, row := range qr.Rows { - id, err := row[0].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - streamID, err := row[1].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - typ := row[2].ToString() - state := row[3].ToString() - message := row[4].ToString() - - createdAt, err := time.Parse("2006-01-02 15:04:05", row[5].ToString()) - if err != nil { - markErrors(err) - continue - } - - updatedAt, err := time.Parse("2006-01-02 15:04:05", row[6].ToString()) - if err != nil { - markErrors(err) - continue - } - - count, err := row[7].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - streamLog := &vtctldatapb.Workflow_Stream_Log{ - Id: id, - StreamId: streamID, - Type: typ, - State: state, - CreatedAt: &vttimepb.Time{ - Seconds: createdAt.Unix(), - }, - UpdatedAt: &vttimepb.Time{ - Seconds: updatedAt.Unix(), - }, - Message: message, - Count: count, - } - - // Earlier, in the main loop where we called scanWorkflow for - // each _vt.vreplication row, we also sorted each ShardStreams - // slice by ascending id, and our _vt.vreplication_log query - // ordered by (stream_id ASC, id ASC), so we can walk the - // streams in index order in O(n) amortized over all the rows - // for this tablet. - for streamIdx < len(streams) { - stream := streams[streamIdx] - if stream.Id < streamLog.StreamId { - streamIdx++ - continue - } - - if stream.Id > streamLog.StreamId { - s.Logger().Warningf("Found stream log for nonexistent stream: %+v", streamLog) - // This can happen on manual/failed workflow cleanup so move to the next log. - break - } - - // stream.Id == streamLog.StreamId - stream.Logs = append(stream.Logs, streamLog) - break - } - } - } - } - - workflows := make([]*vtctldatapb.Workflow, 0, len(workflowsMap)) - - for name, workflow := range workflowsMap { - sourceShards, ok := sourceShardsByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no source shards", name) - } - - sourceKeyspace, ok := sourceKeyspaceByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no source keyspace", name) - } - - targetShards, ok := targetShardsByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no target shards", name) - } - - targetKeyspace, ok := targetKeyspaceByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no target keyspace", name) - } - - maxVReplicationLag, ok := maxVReplicationLagByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no tracked vreplication lag", name) - } - - maxVReplicationTransactionLag, ok := maxVReplicationTransactionLagByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no tracked vreplication transaction lag", name) - } - - workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{ - Keyspace: sourceKeyspace, - Shards: sets.List(sourceShards), - } - - workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{ - Keyspace: targetKeyspace, - Shards: sets.List(targetShards), - } - - workflow.MaxVReplicationLag = int64(maxVReplicationLag) - workflow.MaxVReplicationTransactionLag = int64(maxVReplicationTransactionLag) - - // Sort shard streams by stream_id ASC, to support an optimization - // in fetchStreamLogs below. - for _, shardStreams := range workflow.ShardStreams { - sort.Slice(shardStreams.Streams, func(i, j int) bool { - return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id - }) - } - - workflows = append(workflows, workflow) - - if req.IncludeLogs { - // Fetch logs for all streams associated with this workflow in the background. - fetchLogsWG.Add(1) - go func(ctx context.Context, workflow *vtctldatapb.Workflow) { - defer fetchLogsWG.Done() - fetchStreamLogs(ctx, workflow) - }(ctx, workflow) - } + workflows, err := w.buildWorkflows(ctx, workflowsByShard, copyStatesByShardStreamId, req) + if err != nil { + return nil, err } - // Wait for all the log fetchers to finish. - fetchLogsWG.Wait() - return &vtctldatapb.GetWorkflowsResponse{ Workflows: workflows, }, nil @@ -1080,51 +558,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN return ts, state, nil } -func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.getWorkflowCopyStates") - defer span.Finish() - - span.Annotate("keyspace", tablet.Keyspace) - span.Annotate("shard", tablet.Shard) - span.Annotate("tablet_alias", tablet.AliasString()) - span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds)) - - idsBV, err := sqltypes.BuildBindVariable(streamIds) - if err != nil { - return nil, err - } - query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)", - idsBV, idsBV) - if err != nil { - return nil, err - } - qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query) - if err != nil { - return nil, err - } - - result := sqltypes.Proto3ToResult(qr) - if result == nil { - return nil, nil - } - - copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows)) - for i, row := range result.Rows { - streamId, err := row[0].ToInt64() - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err) - } - // These string fields are technically varbinary, but this is close enough. - copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{ - StreamId: streamId, - Table: row[1].ToString(), - LastPk: row[2].ToString(), - } - } - - return copyStates, nil -} - // LookupVindexCreate creates the lookup vindex in the specified // keyspace and creates a VReplication workflow to backfill that // vindex from the keyspace to the target/lookup table specified. @@ -1545,7 +978,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } } if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms - if err := s.setupInitialDeniedTables(ctx, ts); err != nil { + if err := setupInitialDeniedTables(ctx, ts); err != nil { return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards") } } @@ -1600,7 +1033,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl }) } -func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateRequest, mz *materializer) error { +func validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateRequest, mz *materializer) error { if mz.IsMultiTenantMigration() { switch { case req.NoRoutingRules: @@ -1612,7 +1045,7 @@ func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateReque return nil } -func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error { +func setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error { if ts.MigrationType() != binlogdatapb.MigrationType_TABLES { return nil } @@ -1630,7 +1063,7 @@ func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitch } func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string) error { - if err := s.validateRoutingRuleFlags(req, mz); err != nil { + if err := validateRoutingRuleFlags(req, mz); err != nil { return err } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index dbe06ab1a47..26d722f1de0 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -2470,7 +2470,7 @@ func TestGetWorkflowsStreamLogs(t *testing.T) { }, sourceShards, targetShards) logResult := sqltypes.MakeTestResult( - sqltypes.MakeTestFields("id|vrepl_id|type|state|message|created_at|updated_at|`count`", "int64|int64|varchar|varchar|varchar|varchar|varchar|int64"), + sqltypes.MakeTestFields("id|vrepl_id|type|state|message|created_at|updated_at|count", "int64|int64|varchar|varchar|varchar|varchar|varchar|int64"), "1|0|State Change|Running|test message for non-existent 1|2006-01-02 15:04:05|2006-01-02 15:04:05|1", "2|0|State Change|Stopped|test message for non-existent 2|2006-01-02 15:04:06|2006-01-02 15:04:06|1", "3|1|State Change|Running|log message|2006-01-02 15:04:07|2006-01-02 15:04:07|1", @@ -2499,3 +2499,63 @@ func TestGetWorkflowsStreamLogs(t *testing.T) { assert.Equal(t, gotLogs[0].State, "Running") assert.Equal(t, gotLogs[0].Id, int64(3)) } + +func TestWorkflowStatus(t *testing.T) { + ctx := context.Background() + + sourceKeyspace := "source_keyspace" + targetKeyspace := "target_keyspace" + workflow := "test_workflow" + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: sourceKeyspace, + TargetKeyspace: targetKeyspace, + Workflow: workflow, + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + tablesResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name", "varchar"), "table1", "table2") + te.tmc.expectVRQuery(200, "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", tablesResult) + + tablesTargetCopyResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|table_rows|data_length", "varchar|int64|int64"), "table1|50|500", "table2|100|250") + te.tmc.expectVRQuery(200, "select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_target_keyspace' and table_name in ('table1','table2')", tablesTargetCopyResult) + + tablesSourceCopyResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|table_rows|data_length", "varchar|int64|int64"), "table1|100|1000", "table2|200|500") + te.tmc.expectVRQuery(100, "select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_source_keyspace' and table_name in ('table1','table2')", tablesSourceCopyResult) + + te.tmc.expectVRQuery(200, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{}) + + res, err := te.ws.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{ + Keyspace: targetKeyspace, + Workflow: workflow, + Shards: targetShards, + }) + + assert.NoError(t, err) + + require.NotNil(t, res.TableCopyState) + + stateTable1 := res.TableCopyState["table1"] + stateTable2 := res.TableCopyState["table2"] + require.NotNil(t, stateTable1) + require.NotNil(t, stateTable2) + + assert.Equal(t, int64(100), stateTable1.RowsTotal) + assert.Equal(t, int64(200), stateTable2.RowsTotal) + assert.Equal(t, int64(50), stateTable1.RowsCopied) + assert.Equal(t, int64(100), stateTable2.RowsCopied) + assert.Equal(t, float32(50), stateTable1.RowsPercentage) + assert.Equal(t, float32(50), stateTable2.RowsPercentage) +} diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go new file mode 100644 index 00000000000..da0ee5dfec7 --- /dev/null +++ b/go/vt/vtctl/workflow/workflows.go @@ -0,0 +1,672 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +This file provides functions for fetching and retrieving information about VReplication workflows + +At the moment it is used by the `GetWorkflows` function in `server.go and includes functionality to +get the following: +- Fetch workflows by shard +- Fetch copy states by shard stream +- Build workflows with metadata +- Fetch stream logs +*/ + +package workflow + +import ( + "context" + "encoding/json" + "fmt" + "math" + "sort" + "strings" + "sync" + "time" + + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" + + "vitess.io/vitess/go/sets" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/workflow/common" + "vitess.io/vitess/go/vt/vtctl/workflow/vexec" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" +) + +// workflowFetcher is responsible for fetching and retrieving information +// about VReplication workflows. +type workflowFetcher struct { + ts *topo.Server + tmc tmclient.TabletManagerClient + + logger logutil.Logger + parser *sqlparser.Parser +} + +type workflowMetadata struct { + sourceKeyspace string + sourceShards sets.Set[string] + targetKeyspace string + targetShards sets.Set[string] + maxVReplicationLag float64 + maxVReplicationTransactionLag float64 +} + +var vrepLogQuery = strings.TrimSpace(` +SELECT + id, + vrepl_id, + type, + state, + message, + created_at, + updated_at, + count +FROM + _vt.vreplication_log +WHERE vrepl_id IN %a +ORDER BY + vrepl_id ASC, + id ASC +`) + +func (wf *workflowFetcher) fetchWorkflowsByShard( + ctx context.Context, + req *vtctldatapb.GetWorkflowsRequest, +) (map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{} + if req.Workflow != "" { + readReq.IncludeWorkflows = []string{req.Workflow} + } + if req.ActiveOnly { + readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped} + } + + m := sync.Mutex{} + + shards, err := common.GetShards(ctx, wf.ts, req.Keyspace, req.Shards) + if err != nil { + return nil, err + } + + results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards)) + + err = wf.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error { + primary, err := wf.ts.GetTablet(ctx, si.PrimaryAlias) + if err != nil { + return err + } + if primary == nil { + return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, si.ShardName(), topoproto.TabletAliasString(si.PrimaryAlias)) + } + // Clone the request so that we can set the correct DB name for tablet. + req := readReq.CloneVT() + wres, err := wf.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req) + if err != nil { + return err + } + m.Lock() + defer m.Unlock() + results[primary] = wres + return nil + }) + if err != nil { + return nil, err + } + + return results, nil +} + +func (wf *workflowFetcher) fetchCopyStatesByShardStream( + ctx context.Context, + workflowsByShard map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, +) (map[string][]*vtctldatapb.Workflow_Stream_CopyState, error) { + m := sync.Mutex{} + + copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(workflowsByShard)) + + fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchCopyStates") + defer span.Finish() + + span.Annotate("shard", tablet.Shard) + span.Annotate("tablet_alias", tablet.AliasString()) + + copyStates, err := wf.getWorkflowCopyStates(ctx, tablet, streamIds) + if err != nil { + return err + } + + m.Lock() + defer m.Unlock() + + for _, copyState := range copyStates { + shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId) + copyStatesByShardStreamId[shardStreamId] = append( + copyStatesByShardStreamId[shardStreamId], + copyState, + ) + } + + return nil + } + + fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx) + for tablet, result := range workflowsByShard { + streamIds := make([]int32, 0, len(result.Workflows)) + for _, wf := range result.Workflows { + for _, stream := range wf.Streams { + streamIds = append(streamIds, stream.Id) + } + } + + if len(streamIds) == 0 { + continue + } + + fetchCopyStatesEg.Go(func() error { + return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds) + }) + } + if err := fetchCopyStatesEg.Wait(); err != nil { + return nil, err + } + + return copyStatesByShardStreamId, nil +} + +func (wf *workflowFetcher) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.getWorkflowCopyStates") + defer span.Finish() + + span.Annotate("keyspace", tablet.Keyspace) + span.Annotate("shard", tablet.Shard) + span.Annotate("tablet_alias", tablet.AliasString()) + span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds)) + + idsBV, err := sqltypes.BuildBindVariable(streamIds) + if err != nil { + return nil, err + } + query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)", + idsBV, idsBV) + if err != nil { + return nil, err + } + qr, err := wf.tmc.VReplicationExec(ctx, tablet.Tablet, query) + if err != nil { + return nil, err + } + + result := sqltypes.Proto3ToResult(qr) + if result == nil { + return nil, nil + } + + copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows)) + for i, row := range result.Named().Rows { + streamId, err := row["vrepl_id"].ToInt64() + if err != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err) + } + // These string fields are technically varbinary, but this is close enough. + copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{ + StreamId: streamId, + Table: row["table_name"].ToString(), + LastPk: row["lastpk"].ToString(), + } + } + + return copyStates, nil +} + +func (wf *workflowFetcher) buildWorkflows( + ctx context.Context, + results map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, + copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, + req *vtctldatapb.GetWorkflowsRequest, +) ([]*vtctldatapb.Workflow, error) { + workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results)) + workflowMetadataMap := make(map[string]*workflowMetadata, len(results)) + + for tablet, result := range results { + // In the old implementation, we knew we had at most one (0 <= N <= 1) + // workflow for each shard primary we queried. There might be multiple + // rows (streams) comprising that workflow, so we would aggregate the + // rows for a given primary into a single value ("the workflow", + // ReplicationStatusResult in the old types). + // + // In this version, we have many (N >= 0) workflows for each shard + // primary we queried, so we need to determine if each row corresponds + // to a workflow we're already aggregating, or if it's a workflow we + // haven't seen yet for that shard primary. We use the workflow name to + // dedupe for this. + for _, wfres := range result.Workflows { + workflowName := wfres.Workflow + workflow, ok := workflowsMap[workflowName] + if !ok { + workflow = &vtctldatapb.Workflow{ + Name: workflowName, + ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{}, + } + + workflowsMap[workflowName] = workflow + workflowMetadataMap[workflowName] = &workflowMetadata{ + sourceShards: sets.New[string](), + targetShards: sets.New[string](), + } + } + + metadata := workflowMetadataMap[workflowName] + err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace) + if err != nil { + return nil, err + } + } + } + + for name, workflow := range workflowsMap { + meta := workflowMetadataMap[name] + updateWorkflowWithMetadata(workflow, meta) + + // Sort shard streams by stream_id ASC, to support an optimization + // in fetchStreamLogs below. + for _, shardStreams := range workflow.ShardStreams { + sort.Slice(shardStreams.Streams, func(i, j int) bool { + return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id + }) + } + } + + if req.IncludeLogs { + var fetchLogsWG sync.WaitGroup + + for _, workflow := range workflowsMap { + // Fetch logs for all streams associated with this workflow in the background. + fetchLogsWG.Add(1) + go func(ctx context.Context, workflow *vtctldatapb.Workflow) { + defer fetchLogsWG.Done() + wf.fetchStreamLogs(ctx, req.Keyspace, workflow) + }(ctx, workflow) + } + + // Wait for all the log fetchers to finish. + fetchLogsWG.Wait() + } + + return maps.Values(workflowsMap), nil +} + +func (wf *workflowFetcher) scanWorkflow( + ctx context.Context, + workflow *vtctldatapb.Workflow, + res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse, + tablet *topo.TabletInfo, + meta *workflowMetadata, + copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, + keyspace string, +) error { + shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString()) + shardStream, ok := workflow.ShardStreams[shardStreamKey] + if !ok { + ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + si, err := wf.ts.GetShard(ctx, keyspace, tablet.Shard) + if err != nil { + return err + } + + shardStream = &vtctldatapb.Workflow_ShardStream{ + Streams: nil, + TabletControls: si.TabletControls, + IsPrimaryServing: si.IsPrimaryServing, + } + + workflow.ShardStreams[shardStreamKey] = shardStream + } + + for _, rstream := range res.Streams { + // The value in the pos column can be compressed and thus not + // have a valid GTID consisting of valid UTF-8 characters so we + // have to decode it so that it's properly decompressed first + // when needed. + pos := rstream.Pos + if pos != "" { + mpos, err := binlogplayer.DecodePosition(pos) + if err != nil { + return err + } + pos = mpos.String() + } + + cells := strings.Split(res.Cells, ",") + for i := range cells { + cells[i] = strings.TrimSpace(cells[i]) + } + options := res.Options + if options != "" { + if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil { + return err + } + } + + stream := &vtctldatapb.Workflow_Stream{ + Id: int64(rstream.Id), + Shard: tablet.Shard, + Tablet: tablet.Alias, + BinlogSource: rstream.Bls, + Position: pos, + StopPosition: rstream.StopPos, + State: rstream.State.String(), + DbName: tablet.DbName(), + TabletTypes: res.TabletTypes, + TabletSelectionPreference: res.TabletSelectionPreference, + Cells: cells, + TransactionTimestamp: rstream.TransactionTimestamp, + TimeUpdated: rstream.TimeUpdated, + Message: rstream.Message, + Tags: strings.Split(res.Tags, ","), + RowsCopied: rstream.RowsCopied, + ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{ + ComponentThrottled: rstream.ComponentThrottled, + TimeThrottled: rstream.TimeThrottled, + }, + } + + // Merge in copy states, which we've already fetched. + shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id) + if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok { + stream.CopyStates = copyStates + } + + if rstream.TimeUpdated == nil { + rstream.TimeUpdated = &vttimepb.Time{} + } + + stream.State = getStreamState(stream, rstream) + + shardStream.Streams = append(shardStream.Streams, stream) + + meta.sourceShards.Insert(stream.BinlogSource.Shard) + meta.targetShards.Insert(tablet.Shard) + + if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace { + return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace) + } + + meta.sourceKeyspace = stream.BinlogSource.Keyspace + + if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace { + return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace) + } + + meta.targetKeyspace = tablet.Keyspace + + if stream.TimeUpdated == nil { + stream.TimeUpdated = &vttimepb.Time{} + } + timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0) + vreplicationLag := time.Since(timeUpdated) + + // MaxVReplicationLag represents the time since we last processed any event + // in the workflow. + if vreplicationLag.Seconds() > meta.maxVReplicationLag { + meta.maxVReplicationLag = vreplicationLag.Seconds() + } + + workflow.WorkflowType = res.WorkflowType.String() + workflow.WorkflowSubType = res.WorkflowSubType.String() + workflow.DeferSecondaryKeys = res.DeferSecondaryKeys + + // MaxVReplicationTransactionLag estimates the actual statement processing lag + // between the source and the target. If we are still processing source events it + // is the difference b/w current time and the timestamp of the last event. If + // heartbeats are more recent than the last event, then the lag is the time since + // the last heartbeat as there can be an actual event immediately after the + // heartbeat, but which has not yet been processed on the target. + // We don't allow switching during the copy phase, so in that case we just return + // a large lag. All timestamps are in seconds since epoch. + if rstream.TransactionTimestamp == nil { + rstream.TransactionTimestamp = &vttimepb.Time{} + } + lastTransactionTime := rstream.TransactionTimestamp.Seconds + if rstream.TimeHeartbeat == nil { + rstream.TimeHeartbeat = &vttimepb.Time{} + } + lastHeartbeatTime := rstream.TimeHeartbeat.Seconds + if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() { + meta.maxVReplicationTransactionLag = math.MaxInt64 + } else { + if lastTransactionTime == 0 /* no new events after copy */ || + lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ { + + lastTransactionTime = lastHeartbeatTime + } + now := time.Now().Unix() /* seconds since epoch */ + transactionReplicationLag := float64(now - lastTransactionTime) + if transactionReplicationLag > meta.maxVReplicationTransactionLag { + meta.maxVReplicationTransactionLag = transactionReplicationLag + } + } + } + + return nil +} + +func updateWorkflowWithMetadata(workflow *vtctldatapb.Workflow, meta *workflowMetadata) { + workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{ + Keyspace: meta.sourceKeyspace, + Shards: sets.List(meta.sourceShards), + } + + workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{ + Keyspace: meta.targetKeyspace, + Shards: sets.List(meta.targetShards), + } + + workflow.MaxVReplicationLag = int64(meta.maxVReplicationLag) + workflow.MaxVReplicationTransactionLag = int64(meta.maxVReplicationTransactionLag) +} + +func (wf *workflowFetcher) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchStreamLogs") + defer span.Finish() + + span.Annotate("keyspace", keyspace) + span.Annotate("workflow", workflow.Name) + + vreplIDs := make([]int64, 0, len(workflow.ShardStreams)) + for _, shardStream := range maps.Values(workflow.ShardStreams) { + for _, stream := range shardStream.Streams { + vreplIDs = append(vreplIDs, stream.Id) + } + } + idsBV, err := sqltypes.BuildBindVariable(vreplIDs) + if err != nil { + return + } + + query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV) + if err != nil { + return + } + + vx := vexec.NewVExec(keyspace, workflow.Name, wf.ts, wf.tmc, wf.parser) + results, err := vx.QueryContext(ctx, query) + if err != nil { + // Note that we do not return here. If there are any query results + // in the map (i.e. some tablets returned successfully), we will + // still try to read log rows from them on a best-effort basis. But, + // we will also pre-emptively record the top-level fetch error on + // every stream in every shard in the workflow. Further processing + // below may override the error message for certain streams. + for _, streams := range workflow.ShardStreams { + for _, stream := range streams.Streams { + stream.LogFetchError = err.Error() + } + } + } + + for target, p3qr := range results { + qr := sqltypes.Proto3ToResult(p3qr) + shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString()) + + ss, ok := workflow.ShardStreams[shardStreamKey] + if !ok || ss == nil { + continue + } + + streams := ss.Streams + streamIdx := 0 + markErrors := func(err error) { + if streamIdx >= len(streams) { + return + } + + streams[streamIdx].LogFetchError = err.Error() + } + + for _, row := range qr.Named().Rows { + id, err := row["id"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + streamID, err := row["vrepl_id"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + typ := row["type"].ToString() + state := row["state"].ToString() + message := row["message"].ToString() + + createdAt, err := time.Parse("2006-01-02 15:04:05", row["created_at"].ToString()) + if err != nil { + markErrors(err) + continue + } + + updatedAt, err := time.Parse("2006-01-02 15:04:05", row["updated_at"].ToString()) + if err != nil { + markErrors(err) + continue + } + + count, err := row["count"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + streamLog := &vtctldatapb.Workflow_Stream_Log{ + Id: id, + StreamId: streamID, + Type: typ, + State: state, + CreatedAt: &vttimepb.Time{ + Seconds: createdAt.Unix(), + }, + UpdatedAt: &vttimepb.Time{ + Seconds: updatedAt.Unix(), + }, + Message: message, + Count: count, + } + + // Earlier, in buildWorkflows, we sorted each ShardStreams + // slice by ascending id, and our _vt.vreplication_log query + // ordered by (stream_id ASC, id ASC), so we can walk the + // streams in index order in O(n) amortized over all the rows + // for this tablet. + for streamIdx < len(streams) { + stream := streams[streamIdx] + if stream.Id < streamLog.StreamId { + streamIdx++ + continue + } + + if stream.Id > streamLog.StreamId { + wf.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog) + // This can happen on manual/failed workflow cleanup so move to the next log. + break + } + + // stream.Id == streamLog.StreamId + stream.Logs = append(stream.Logs, streamLog) + break + } + } + } +} + +func (wf *workflowFetcher) forAllShards( + ctx context.Context, + keyspace string, + shards []string, + f func(ctx context.Context, shard *topo.ShardInfo) error, +) error { + eg, egCtx := errgroup.WithContext(ctx) + for _, shard := range shards { + eg.Go(func() error { + si, err := wf.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return err + } + if si.PrimaryAlias == nil { + return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, keyspace, shard) + } + + if err := f(egCtx, si); err != nil { + return err + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return err + } + return nil +} + +func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream) string { + switch { + case strings.Contains(strings.ToLower(stream.Message), "error"): + return binlogdatapb.VReplicationWorkflowState_Error.String() + case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0: + return binlogdatapb.VReplicationWorkflowState_Copying.String() + case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && int64(time.Now().Second())-rstream.TimeUpdated.Seconds > 10: + return binlogdatapb.VReplicationWorkflowState_Lagging.String() + } + return rstream.State.String() +} diff --git a/go/vt/vtctl/workflow/workflows_test.go b/go/vt/vtctl/workflow/workflows_test.go new file mode 100644 index 00000000000..2015c8d1b7c --- /dev/null +++ b/go/vt/vtctl/workflow/workflows_test.go @@ -0,0 +1,260 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/vttime" + "vitess.io/vitess/go/vt/topo" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" +) + +func TestGetStreamState(t *testing.T) { + testCases := []struct { + name string + stream *vtctldatapb.Workflow_Stream + rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream + want string + }{ + { + name: "error state", + stream: &vtctldatapb.Workflow_Stream{ + Message: "test error", + }, + want: "Error", + }, + { + name: "copying state", + stream: &vtctldatapb.Workflow_Stream{ + State: "Running", + CopyStates: []*vtctldatapb.Workflow_Stream_CopyState{ + { + Table: "table1", + }, + }, + }, + want: "Copying", + }, + { + name: "lagging state", + stream: &vtctldatapb.Workflow_Stream{ + State: "Running", + }, + rstream: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + TimeUpdated: &vttime.Time{ + Seconds: int64(time.Now().Second()) - 11, + }, + }, + want: "Lagging", + }, + { + name: "non-running and error free", + stream: &vtctldatapb.Workflow_Stream{ + State: "Stopped", + }, + rstream: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + State: binlogdata.VReplicationWorkflowState_Stopped, + }, + want: "Stopped", + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + state := getStreamState(tt.stream, tt.rstream) + assert.Equal(t, tt.want, state) + }) + } +} + +func TestGetWorkflowCopyStates(t *testing.T) { + ctx := context.Background() + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "source_keyspace", + TargetKeyspace: "target_keyspace", + Workflow: "test_workflow", + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + wf := workflowFetcher{ + ts: te.ws.ts, + tmc: te.tmc, + } + + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + } + + query := "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)" + te.tmc.expectVRQuery(100, query, sqltypes.MakeTestResult( + sqltypes.MakeTestFields("vrepl_id|table_name|lastpk", "int64|varchar|varchar"), + "1|table1|2", "1|table2|1", + )) + + copyStates, err := wf.getWorkflowCopyStates(ctx, &topo.TabletInfo{ + Tablet: tablet, + }, []int32{1}) + assert.NoError(t, err) + assert.Len(t, copyStates, 2) + + state1 := &vtctldatapb.Workflow_Stream_CopyState{ + Table: "table1", + LastPk: "2", + StreamId: 1, + } + state2 := &vtctldatapb.Workflow_Stream_CopyState{ + Table: "table2", + LastPk: "1", + StreamId: 1, + } + assert.Contains(t, copyStates, state1) + assert.Contains(t, copyStates, state2) +} + +func TestFetchCopyStatesByShardStream(t *testing.T) { + ctx := context.Background() + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "source_keyspace", + TargetKeyspace: "target_keyspace", + Workflow: "test_workflow", + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + wf := workflowFetcher{ + ts: te.ws.ts, + tmc: te.tmc, + } + + tablet := &topodatapb.Tablet{ + Shard: "-80", + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + } + tablet2 := &topodatapb.Tablet{ + Shard: "80-", + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + } + + query := "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1, 2) and id in (select max(id) from _vt.copy_state where vrepl_id in (1, 2) group by vrepl_id, table_name)" + te.tmc.expectVRQuery(100, query, sqltypes.MakeTestResult( + sqltypes.MakeTestFields("vrepl_id|table_name|lastpk", "int64|varchar|varchar"), + "1|table1|2", "2|table2|1", "2|table1|1", + )) + + te.tmc.expectVRQuery(101, query, sqltypes.MakeTestResult( + sqltypes.MakeTestFields("vrepl_id|table_name|lastpk", "int64|varchar|varchar"), + "1|table1|2", "1|table2|1", + )) + + ti := &topo.TabletInfo{ + Tablet: tablet, + } + ti2 := &topo.TabletInfo{ + Tablet: tablet2, + } + + readVReplicationResponse := map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ + ti: { + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + }, { + Id: 2, + }, + }, + }, + }, + }, + ti2: { + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + }, { + Id: 2, + }, + }, + }, + }, + }, + } + copyStatesByStreamId, err := wf.fetchCopyStatesByShardStream(ctx, readVReplicationResponse) + assert.NoError(t, err) + + copyStates1 := copyStatesByStreamId["-80/1"] + copyStates2 := copyStatesByStreamId["-80/2"] + copyStates3 := copyStatesByStreamId["80-/1"] + + require.NotNil(t, copyStates1) + require.NotNil(t, copyStates2) + require.NotNil(t, copyStates3) + + assert.Len(t, copyStates1, 1) + assert.Len(t, copyStates2, 2) + assert.Len(t, copyStates3, 2) + + assert.Nil(t, copyStatesByStreamId["80-/2"]) +}