Skip to content

Commit

Permalink
Handle this entirely in vstream manager
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Aug 23, 2024
1 parent 0cd4e18 commit 5a543d2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
29 changes: 24 additions & 5 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func (vs *vstream) sendEvents(ctx context.Context) {

send := func(evs []*binlogdatapb.VEvent) error {
if err := vs.send(evs); err != nil {
log.Errorf("DEBUG: send error: %v", err)
vs.once.Do(func() {
vs.setError(err)
})
Expand All @@ -337,6 +338,7 @@ func (vs *vstream) sendEvents(ctx context.Context) {
})
return
}
log.Errorf("DEBUG: really sent all events: %v", evs)
resetHeartbeat()
case t := <-heartbeat:
now := t.UnixNano()
Expand Down Expand Up @@ -608,7 +610,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for _, event := range events {
for i, event := range events {
switch event.Type {
case binlogdatapb.VEventType_FIELD:
// Update table names and send.
Expand Down Expand Up @@ -658,22 +660,37 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
// Journal events are not sent to clients by default, but only when
// StopOnReshard is set.
if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
sendevents = append(sendevents, event)
// Include our own commit event to complete the BEGIN->JOURNAL-COMMIT
// sequence in the stream.
sendevents := append(sendevents, &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT})
// Read any subsequent events until we get the VGTID->COMMIT events that
// always follow the JOURNAL event which is generated as a result of
// an autocommit insert into the _vt.resharding_journal table on the
// tablet. This batch of events we're currently processing may not
// contain these events.
log.Errorf("DEBUG: JOURNAL event received for %s/%s: remaining events: %d",
sgtid.Keyspace, sgtid.Shard, len(events)-(i+1))
for j := i + 1; j < len(events); j++ {
log.Errorf("DEBUG: appending event: %+v", events[j])
sendevents = append(sendevents, events[j])
if events[j].Type == binlogdatapb.VEventType_COMMIT {
break
}
}
eventss = append(eventss, sendevents)
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
}
eventss = nil
sendevents = nil
// We're going to be stopping the stream anyway, so we pause to give clients
// time to recv the journal event before the stream's context is cancelled.
// If they client doesn't recv the journal event, they'll have to resume from
// the last ShardGtid they received before the journal event.
time.Sleep(2 * time.Second)
}
je, err := vs.getJournalEvent(ctx, sgtid, journal)
if err != nil {
Expand Down Expand Up @@ -761,6 +778,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
defer vs.mu.Unlock()

// Send all chunks while holding the lock.
log.Errorf("DEBUG: sending all events: %v", eventss)
for _, events := range eventss {
if err := vs.getError(); err != nil {
return err
Expand Down Expand Up @@ -811,6 +829,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
case <-ctx.Done():
return nil
case vs.eventCh <- events:
log.Errorf("DEBUG: sent all events: %v", eventss)
}
}
return nil
Expand Down
14 changes: 9 additions & 5 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) {

// Cancel stops the streaming.
func (vs *vstreamer) Cancel() {
// Try and send our buffered events first.
vs.cancel()
}

Expand Down Expand Up @@ -210,8 +211,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog

// Only the following patterns are possible:
// BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks.
// BEGIN->JOURNAL
// ->GTID->COMMIT. This is a special case where the journal is sent immediately as some consumers stop on reshard events.
// BEGIN->JOURNAL->GTID->COMMIT
// GTID->DDL
// GTID->OTHER
// HEARTBEAT is issued if there's inactivity, which is likely
Expand All @@ -227,12 +227,15 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
vevent.Shard = vs.vse.shard

switch vevent.Type {
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD:
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD,
binlogdatapb.VEventType_JOURNAL:
// We never have to send GTID, BEGIN, FIELD events on their own.
// A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT.
// So, we don't have to send it right away.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER,
binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION, binlogdatapb.VEventType_JOURNAL:
// COMMIT, DDL, JOURNAL, OTHER and HEARTBEAT must be immediately sent.
binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION:
// COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent.
// Although unlikely, it's possible to get a HEARTBEAT in the middle
// of a transaction. If so, we still send the partial transaction along
// with the heartbeat.
Expand Down Expand Up @@ -627,6 +630,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Type: binlogdatapb.VEventType_VERSION,
}
vevents = append(vevents, vevent)

} else {
vevents, err = vs.processRowEvent(vevents, plan, rows)
}
Expand Down
16 changes: 6 additions & 10 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1731,16 +1731,12 @@ func TestJournal(t *testing.T) {
"commit",
},
// External table events don't get sent.
output: [][]string{
{
`begin`,
`type:JOURNAL journal:{id:1 migration_type:SHARDS}`,
},
{
`gtid`,
`commit`,
},
},
output: [][]string{{
`begin`,
`type:JOURNAL journal:{id:1 migration_type:SHARDS}`,
`gtid`,
`commit`,
}},
}}
runCases(t, nil, testcases, "", nil)
}
Expand Down

0 comments on commit 5a543d2

Please sign in to comment.