Skip to content

Commit

Permalink
Column allowlist for MySQL and MSSQL (#501)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 26, 2024
1 parent 7c2e5a9 commit 2a4bc1b
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 0 deletions.
7 changes: 7 additions & 0 deletions config/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type MSSQLTable struct {
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (m *MSSQL) ToDSN() string {
Expand Down Expand Up @@ -95,6 +97,11 @@ func (m *MSSQL) Validate() error {
if stringutil.Empty(table.Name, table.Schema) {
return fmt.Errorf("table name and schema must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
20 changes: 20 additions & 0 deletions config/mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ func TestMSSQL_Validate(t *testing.T) {

assert.ErrorContains(t, m.Validate(), "table name and schema must be passed in")
}
{
// Exclude and include columns at the same time
m := &MSSQL{
Host: "host",
Port: 1,
Username: "username",
Password: "password",
Database: "database",
Tables: []*MSSQLTable{
{
Name: "name",
Schema: "schema",
IncludeColumns: []string{"foo"},
ExcludeColumns: []string{"bar"},
},
},
}

assert.ErrorContains(t, m.Validate(), "cannot exclude and include columns at the same time")
}
{
// Valid
m := &MSSQL{
Expand Down
7 changes: 7 additions & 0 deletions config/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type MySQLTable struct {
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (m *MySQLTable) GetBatchSize() uint {
Expand Down Expand Up @@ -91,6 +93,11 @@ func (m *MySQL) Validate() error {
if table.Name == "" {
return fmt.Errorf("table name must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
11 changes: 11 additions & 0 deletions config/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ func TestMySQL_Validate(t *testing.T) {
c.Tables = append(c.Tables, &MySQLTable{})
assert.ErrorContains(t, c.Validate(), "table name must be passed in")
}
{
// exclude and include at the same time
c := createValidConfig()
c.Tables = append(c.Tables, &MySQLTable{
Name: "foo",
IncludeColumns: []string{"foo"},
ExcludeColumns: []string{"bar"},
})

assert.ErrorContains(t, c.Validate(), "cannot exclude and include columns at the same time")
}
}

func TestMySQL_ToDSN(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions sources/mssql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ func NewMSSQLAdapter(db *sql.DB, dbName string, tableCfg config.MSSQLTable) (MSS
return MSSQLAdapter{}, fmt.Errorf("failed to load metadata for table %s.%s: %w", tableCfg.Schema, tableCfg.Name, err)
}

// Exclude columns (if any) from the table metadata
columns, err := column.FilterOutExcludedColumns(table.Columns(), tableCfg.ExcludeColumns, table.PrimaryKeys())
if err != nil {
return MSSQLAdapter{}, err
}

// Include columns (if any) from the table metadata
columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys())
if err != nil {
return MSSQLAdapter{}, err
}

fieldConverters := make([]transformer.FieldConverter, len(columns))
for i, col := range columns {
converter, err := valueConverterForType(col.Type, col.Opts)
Expand Down
7 changes: 7 additions & 0 deletions sources/mysql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ func NewMySQLAdapter(db *sql.DB, dbName string, tableCfg config.MySQLTable) (MyS
return MySQLAdapter{}, fmt.Errorf("failed to load metadata for table %q: %w", tableCfg.Name, err)
}

// Exclude columns (if any) from the table metadata
columns, err := column.FilterOutExcludedColumns(table.Columns, tableCfg.ExcludeColumns, table.PrimaryKeys)
if err != nil {
return MySQLAdapter{}, err
}

// Include columns (if any) from the table metadata
columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys)
if err != nil {
return MySQLAdapter{}, err
}

return newMySQLAdapter(db, dbName, *table, columns, tableCfg.ToScannerConfig(defaultErrorRetries))
}

Expand Down

0 comments on commit 2a4bc1b

Please sign in to comment.