Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle columnIdx out range data size #689

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions canal/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ func (r *RowsEvent) handleUnsigned() {

for i := 0; i < len(r.Rows); i++ {
for _, columnIdx := range r.Table.UnsignedColumns {
// ignore when current column index gets out of old data size.
// when new fields added to the table but the data is in old schema.
if columnIdx >= len(r.Rows[i]) {
continue
}

// Best practice: new columns should be added to the the end of table existing columns
// like : table(id,name,addr) => table(id,name,addr,age) , here age is added the the end
// Bad practice: table(id,name,addr) => table(id,name,age,addr) , here age is added before the addr
// The result of bad practice will cause the following logic be problematic

switch value := r.Rows[i][columnIdx].(type) {
case int8:
r.Rows[i][columnIdx] = uint8(value)
Expand Down
12 changes: 5 additions & 7 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *Canal) runSyncBinlog() error {
}
case *replication.RowsEvent:
// we only focus row based event
err = c.handleRowsEvent(ev)
err = c.handleRowsEvent(ev.Header, e)
if err != nil {
e := errors.Cause(err)
// if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal
Expand Down Expand Up @@ -245,9 +245,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
atomic.StoreUint32(c.delay, newDelay)
}

func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
ev := e.Event.(*replication.RowsEvent)

func (c *Canal) handleRowsEvent(eventHeader *replication.EventHeader, ev *replication.RowsEvent) error {
// Caveat: table may be altered at runtime.
schema := string(ev.Table.Schema)
table := string(ev.Table.Table)
Expand All @@ -257,17 +255,17 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
return err
}
var action string
switch e.Header.EventType {
switch eventHeader.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
action = UpdateAction
default:
return errors.Errorf("%s not supported now", e.Header.EventType)
return errors.Errorf("%s not supported now", eventHeader.EventType)
}
events := newRowsEvent(t, action, ev.Rows, e.Header)
events := newRowsEvent(t, action, ev.Rows, eventHeader)
return c.eventHandler.OnRow(events)
}

Expand Down