Skip to content

Commit

Permalink
mycdc: cleanup tests for MySQL CDC
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj committed Dec 20, 2024
1 parent a32c417 commit 5a20f00
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 85 deletions.
13 changes: 11 additions & 2 deletions internal/impl/mysql/input_mysql_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,17 @@ func prepSnapshotScannerAndMappers(cols []*sql.ColumnType) (values []any, mapper
return s.Time, nil
}
case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT", "YEAR":
val = new(sql.Null[int])
mapper = snapshotValueMapper[int]
val = new(sql.NullInt64)
mapper = func(v any) (any, error) {
s, ok := v.(*sql.NullInt64)
if !ok {
return nil, fmt.Errorf("expected %T got %T", int64(0), v)
}
if !s.Valid {
return nil, nil
}
return int(s.Int64), nil
}
case "DECIMAL", "NUMERIC":
val = new(sql.NullString)
mapper = stringMapping(func(s string) (any, error) {
Expand Down
169 changes: 86 additions & 83 deletions internal/impl/mysql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (db *testDB) Exec(query string, args ...any) {
}

func setupTestWithMySQLVersion(t *testing.T, version string) (string, *testDB) {
t.Parallel()
integration.CheckSkip(t)
pool, err := dockertest.NewPool("")
require.NoError(t, err)
Expand Down Expand Up @@ -105,14 +106,15 @@ func TestIntegrationMySQLCDC(t *testing.T) {
integration.CheckSkip(t)
var mysqlTestVersions = []string{"8.0", "9.0", "9.1"}
for _, version := range mysqlTestVersions {
dsn, db := setupTestWithMySQLVersion(t, version)
// Create table
db.Exec(`
t.Run(version, func(t *testing.T) {
dsn, db := setupTestWithMySQLVersion(t, version)
// Create table
db.Exec(`
CREATE TABLE IF NOT EXISTS foo (
a INT PRIMARY KEY
)
`)
template := fmt.Sprintf(`
template := fmt.Sprintf(`
mysql_cdc:
dsn: %s
stream_snapshot: false
Expand All @@ -121,84 +123,85 @@ mysql_cdc:
- foo
`, dsn)

cacheConf := fmt.Sprintf(`
cacheConf := fmt.Sprintf(`
label: foocache
file:
directory: %s`, t.TempDir())

streamOutBuilder := service.NewStreamBuilder()
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
require.NoError(t, streamOutBuilder.AddInputYAML(template))

var outBatches []string
var outBatchMut sync.Mutex
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
msgBytes, err := mb[0].AsBytes()
streamOutBuilder := service.NewStreamBuilder()
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
require.NoError(t, streamOutBuilder.AddInputYAML(template))

var outBatches []string
var outBatchMut sync.Mutex
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
msgBytes, err := mb[0].AsBytes()
require.NoError(t, err)
outBatchMut.Lock()
outBatches = append(outBatches, string(msgBytes))
outBatchMut.Unlock()
return nil
}))

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
outBatchMut.Lock()
outBatches = append(outBatches, string(msgBytes))
outBatchMut.Unlock()
return nil
}))

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

go func() {
err = streamOut.Run(context.Background())
go func() {
err = streamOut.Run(context.Background())
require.NoError(t, err)
}()

time.Sleep(time.Second * 5)
for i := 0; i < 1000; i++ {
// Insert 1000 rows
db.Exec("INSERT INTO foo VALUES (?)", i)
}

assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
return len(outBatches) == 1000
}, time.Minute*5, time.Millisecond*100)

require.NoError(t, streamOut.StopWithin(time.Second*10))

streamOutBuilder = service.NewStreamBuilder()
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
require.NoError(t, streamOutBuilder.AddInputYAML(template))

outBatches = nil
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
msgBytes, err := mb[0].AsBytes()
require.NoError(t, err)
outBatchMut.Lock()
outBatches = append(outBatches, string(msgBytes))
outBatchMut.Unlock()
return nil
}))

streamOut, err = streamOutBuilder.Build()
require.NoError(t, err)
}()

time.Sleep(time.Second * 5)
for i := 0; i < 1000; i++ {
// Insert 1000 rows
db.Exec("INSERT INTO foo VALUES (?)", i)
}

assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
return len(outBatches) == 1000
}, time.Minute*5, time.Millisecond*100)
time.Sleep(time.Second)
for i := 1001; i < 2001; i++ {
db.Exec("INSERT INTO foo VALUES (?)", i)
}

require.NoError(t, streamOut.StopWithin(time.Second*10))
go func() {
err = streamOut.Run(context.Background())
require.NoError(t, err)
}()

streamOutBuilder = service.NewStreamBuilder()
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
require.NoError(t, streamOutBuilder.AddInputYAML(template))
assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
return len(outBatches) == 1000
}, time.Minute*5, time.Millisecond*100)

outBatches = nil
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
msgBytes, err := mb[0].AsBytes()
require.NoError(t, err)
outBatchMut.Lock()
outBatches = append(outBatches, string(msgBytes))
outBatchMut.Unlock()
return nil
}))

streamOut, err = streamOutBuilder.Build()
require.NoError(t, err)

time.Sleep(time.Second)
for i := 1001; i < 2001; i++ {
db.Exec("INSERT INTO foo VALUES (?)", i)
}

go func() {
err = streamOut.Run(context.Background())
require.NoError(t, err)
}()

assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
return len(outBatches) == 1000
}, time.Minute*5, time.Millisecond*100)

require.NoError(t, streamOut.StopWithin(time.Second*10))
require.NoError(t, streamOut.StopWithin(time.Second*10))
})
}
}

Expand Down Expand Up @@ -273,28 +276,28 @@ func TestIntegrationMySQLCDCWithCompositePrimaryKeys(t *testing.T) {
dsn, db := setupTestWithMySQLVersion(t, "8.0")
// Create table
db.Exec(`
CREATE TABLE IF NOT EXISTS foo (
a INT,
b INT,
v JSON,
size ENUM('x-small', 'small', 'medium', 'large', 'x-large'),
PRIMARY KEY (a, b)
CREATE TABLE IF NOT EXISTS ` + "`Foo`" + ` (
` + "`A`" + ` INT,
` + "`B`" + ` INT,
PRIMARY KEY (
` + "`A`" + `,
` + "`B`" + `
)
)
`)
// Create control table to ensure we don't stream it
db.Exec(`
CREATE TABLE IF NOT EXISTS foo_non_streamed (
a INT,
b INT,
v JSON,
PRIMARY KEY (a, b)
)
`)

// Insert 1000 rows for initial snapshot streaming
for i := 0; i < 1000; i++ {
db.Exec("INSERT INTO foo VALUES (?, ?, ?, ?)", i, i, `{"json":"data"}`, `large`)
db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?, ?)", i, i, `{"json":"data"}`)
db.Exec("INSERT INTO `Foo` VALUES (?, ?)", i, i)
db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?)", i, i)
}

template := fmt.Sprintf(`
Expand All @@ -304,7 +307,7 @@ mysql_cdc:
snapshot_max_batch_size: 500
checkpoint_cache: foocache
tables:
- foo
- Foo
`, dsn)

cacheConf := fmt.Sprintf(`
Expand Down Expand Up @@ -339,8 +342,8 @@ file:
time.Sleep(time.Second * 5)
for i := 1000; i < 2000; i++ {
// Insert 1000 more rows into each table
db.Exec("INSERT INTO foo VALUES (?, ?, ?, ?)", i, i, `{"json":"data"}`, `x-small`)
db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?, ?)", i, i, `{"json":"data"}`)
db.Exec("INSERT INTO `Foo` VALUES (?, ?)", i, i)
db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?)", i, i)
}

assert.Eventually(t, func() bool {
Expand Down

0 comments on commit 5a20f00

Please sign in to comment.