Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: support plugin #626

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ dm_integration_test_build: retool_setup
-coverpkg=github.com/pingcap/dm/... \
-o bin/dm-tracer.test github.com/pingcap/dm/cmd/dm-tracer \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -c $(TEST_RACE_FLAG) -cover -covermode=atomic \
-coverpkg=github.com/pingcap/dm/... \
-o bin/dm-syncer.test github.com/pingcap/dm/cmd/dm-syncer \
|| { $(FAILPOINT_DISABLE); exit 1; }
CGO_ENABLED=1 GO111MODULE=on go build -o bin/dm-syncer ./cmd/dm-syncer
CGO_ENABLED=1 GO111MODULE=on go build -o bin/demo.so -buildmode=plugin ./syncer/plugin/demo/demo.go
$(FAILPOINT_DISABLE)
tests/prepare_tools.sh

Expand All @@ -173,7 +171,7 @@ integration_test: check_third_party_binary
@which bin/dm-master.test
@which bin/dm-worker.test
@which bin/dm-tracer.test
@which bin/dm-syncer.test
@which bin/dm-syncer
tests/run.sh $(CASE)

compatibility_test: check_third_party_binary
Expand Down
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:le
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high],"fail to handle shard ddl %v in optimistic mode, because schema conflict detected"
ErrSyncerLoadPlugin,[code=36063:class=sync-unit:scope=internal:level=high],"fail to load plugin from %s"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
Expand Down
7 changes: 7 additions & 0 deletions cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type commonConfig struct {
EnableANSIQuotes bool
TimezoneStr string

PluginPath string

SyncerConfigFormat bool
}

Expand All @@ -77,6 +79,7 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask
MaxRetry: c.MaxRetry,
EnableANSIQuotes: c.EnableANSIQuotes,
TimezoneStr: c.TimezoneStr,
PluginPath: c.PluginPath,
}

cfg.FlagSet = flag.NewFlagSet("dm-syncer", flag.ContinueOnError)
Expand Down Expand Up @@ -271,6 +274,8 @@ type syncerConfig struct {
TimezoneStr string `toml:"timezone" json:"timezone"`
Timezone *time.Location `json:"-"`

PluginPath string `toml:"plugin-path" json:"plugin-path"`

printVersion bool
}

Expand Down Expand Up @@ -349,6 +354,8 @@ func (oc *syncerConfig) convertToNewFormat() (*config.SubTaskConfig, error) {
Timezone: oc.TimezoneStr,
From: oc.From,
To: oc.To,

PluginPath: oc.PluginPath,
}

for _, rule := range oc.RouteRules {
Expand Down
2 changes: 2 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ type SubTaskConfig struct {
PprofAddr string `toml:"pprof-addr" json:"pprof-addr"`
StatusAddr string `toml:"status-addr" json:"status-addr"`

PluginPath string `toml:"plugin-path" json:"plugin-path"`

ConfigFile string `toml:"-" json:"config-file"`

// still needed by Syncer / Loader bin
Expand Down
3 changes: 3 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ const (
codeSyncerUnitGenBWList
codeSyncerUnitHandleDDLFailed
codeSyncerShardDDLConflict
codeSyncerLoadPlugin
)

// DM-master error code
Expand Down Expand Up @@ -878,6 +879,8 @@ var (
ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s")
ErrSyncerShardDDLConflict = New(codeSyncerShardDDLConflict, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle shard ddl %v in optimistic mode, because schema conflict detected")

ErrSyncerLoadPlugin = New(codeSyncerLoadPlugin, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to load plugin from %s")

// DM-master error
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
ErrMasterSQLOpNotSupport = New(codeMasterSQLOpNotSupport, ClassDMMaster, ScopeInternal, LevelMedium, "op %s not supported")
Expand Down
4 changes: 2 additions & 2 deletions syncer/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func (s *Syncer) handleQueryEventOptimistic(
return err
}

if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
if detected, err := s.execError.Detected(); detected {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
}

for _, table := range onlineDDLTableNames {
Expand Down
156 changes: 156 additions & 0 deletions syncer/plugin/demo/demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"database/sql"
"fmt"
"strings"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/log"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/siddontang/go-mysql/replication"
"go.uber.org/zap"
)

// DemoPlugin is a demo to show how to use plugin
type DemoPlugin struct {
db *sql.DB
}

// NewPlugin creates a new DemoPlugin
func NewPlugin() interface{} {
return &DemoPlugin{}
}

// Init implements Plugin's Init
func (dp *DemoPlugin) Init(cfg *config.SubTaskConfig) error {
log.Info("demo plugin initialize")

dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true",
cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port)
db, err := sql.Open("mysql", dsn)
if err != nil {
return err
}
dp.db = db

return nil
}

// HandleDDLJobResult implements Plugin's HandleDDLJobResult
// for example:
// ev.Query is `ALTER TABLE test.t1 MODIFY COLUMN name varchar(50);`
// error is `unsupported modify column length 50 is less than origin 100`
func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) error {
if err == nil {
return nil
}

log.Info("demo plugin handle ddl job result", zap.String("query", string(ev.Query)), zap.Error(err))

if !strings.Contains(err.Error(), "unsupported modify column length") {
log.Info("don't contain error message \"unsupported modify column length\"")
return nil
}

stmt, err := parser.New().ParseOneStmt(string(ev.Query), "", "")
if err != nil {
log.Info("parser failed", zap.Error(err))
return err
}

schema := string(ev.Schema)

switch st := stmt.(type) {
case *ast.AlterTableStmt:
switch st.Specs[0].Tp {
case ast.AlterTableModifyColumn:
originColName := st.Specs[0].NewColumns[0].Name.Name.O
tmpColName := fmt.Sprintf("%s_tmp", st.Specs[0].NewColumns[0].Name)

// get origin column from ast, originCol is `name varchar(50)`
var sb strings.Builder
st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb))
originCol := sb.String()

// generate tmp column, tmpCol is `name_tmp varchar(50)`
st.Specs[0].NewColumns[0].Name.Name = model.NewCIStr(tmpColName)
var sb2 strings.Builder
st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb2))
tmpCol := sb2.String()

// get table infomation, used to get primary key and unique key
ctx := context.Background()
tableInfo, err := dbutil.GetTableInfo(ctx, dp.db, schema, st.Table.Name.O, "")
if err != nil {
log.Info("get table information failed", zap.Error(err))
return err
}
keys, _ := dbutil.SelectUniqueOrderKey(tableInfo)
keysList := strings.Join(keys, ", ")

addColSQL := fmt.Sprintf("alter table `%s`.`%s` add column %s after %s", schema, st.Table.Name.O, tmpCol, originColName)
log.Info("execute", zap.String("sql", addColSQL))
_, err = dp.db.ExecContext(ctx, addColSQL)
if err != nil {
log.Info("execute sql failed", zap.String("sql", addColSQL), zap.Error(err))
return err
}

insertSQL := fmt.Sprintf("replace into `%s`.`%s`(%s, %s) SELECT %s, %s AS %s FROM `%s`.`%s`;", schema, st.Table.Name.O, keysList, tmpColName, keysList, originColName, tmpColName, schema, st.Table.Name.O)
log.Info("execute", zap.String("sql", insertSQL))
_, err = dp.db.ExecContext(ctx, insertSQL)
if err != nil {
log.Info("execute sql failed", zap.String("sql", insertSQL), zap.Error(err))
return err
}

dropColSQL := fmt.Sprintf("alter table `%s`.`%s` drop column %s", schema, st.Table.Name.O, originColName)
log.Info("execute", zap.String("sql", dropColSQL))
_, err = dp.db.ExecContext(ctx, dropColSQL)
if err != nil {
log.Info("execute sql failed", zap.String("sql", dropColSQL), zap.Error(err))
return err
}

changeColSQL := fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN %s %s;", schema, st.Table.Name.O, tmpColName, originCol)
log.Info("execute", zap.String("sql", changeColSQL))
_, err = dp.db.ExecContext(ctx, changeColSQL)
if err != nil {
log.Info("execute sql failed", zap.String("sql", changeColSQL), zap.Error(err))
return err
}
default:
log.Info("unhandle ddl type")
return nil
}
default:
log.Info("unhandle ddl type")
return nil
}

return nil
}

// HandleDMLJobResult implements Plugin's HandleDMLJobResult
func (dp *DemoPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error {
return err
}
86 changes: 86 additions & 0 deletions syncer/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package plugin

import (
"plugin"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/terror"
"github.com/siddontang/go-mysql/replication"
)

var (
createPluginFunc = "NewPlugin"
)

// LoadPlugin loads plugin by plugin's file path
func LoadPlugin(filepath string) (Plugin, error) {
if len(filepath) == 0 {
return new(NilPlugin), nil
}

p, err := plugin.Open(filepath)
if err != nil {
return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath)
}

pluginSymbol, err := p.Lookup(createPluginFunc)
if err != nil {
return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath)
}

newPlugin, ok := pluginSymbol.(func() interface{})
if !ok {
return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath)
}

plg := newPlugin()
plg2, ok := plg.(Plugin)
if !ok {
return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath)
}

return plg2, nil
}

// Plugin is a struct of plugin used in syncer unit
type Plugin interface {
// Init do some init job
Init(cfg *config.SubTaskConfig) error

// HandleDDLJobResult handles the result of ddl job
HandleDDLJobResult(ev *replication.QueryEvent, err error) error

// HandleDMLJobResult handles the result of dml job
HandleDMLJobResult(ev *replication.RowsEvent, err error) error
}

// NilPlugin is a plugin which do nothing
type NilPlugin struct{}

// Init implements Plugin's Init
func (n *NilPlugin) Init(cfg *config.SubTaskConfig) error {
return nil
}

// HandleDDLJobResult implements Plugin's HandleDDLJobResult
func (n *NilPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) error {
return err
}

// HandleDMLJobResult implements Plugin's HandleDMLJobResult
func (n *NilPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error {
return err
}
Loading