Skip to content

Commit

Permalink
GetMySqlRows
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 9, 2025
1 parent c183444 commit 53c6489
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 25 deletions.
10 changes: 5 additions & 5 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (c *MySqlConnector) PullRecords(
items := model.NewRecordItems(len(row))
for idx, val := range row {
fd := schema.Columns[idx]
items.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
items.AddColumn(fd.Name, QValueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}

recordCount += 1
Expand All @@ -335,13 +335,13 @@ func (c *MySqlConnector) PullRecords(
oldItems := model.NewRecordItems(len(oldRow))
for idx, val := range oldRow {
fd := schema.Columns[idx]
oldItems.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
oldItems.AddColumn(fd.Name, QValueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}
newRow := ev.Rows[idx+1]
newItems := model.NewRecordItems(len(newRow))
for idx, val := range ev.Rows[idx+1] {
fd := schema.Columns[idx]
newItems.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
newItems.AddColumn(fd.Name, QValueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}

recordCount += 1
Expand All @@ -360,7 +360,7 @@ func (c *MySqlConnector) PullRecords(
items := model.NewRecordItems(len(row))
for idx, val := range row {
fd := schema.Columns[idx]
items.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
items.AddColumn(fd.Name, QValueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}

recordCount += 1
Expand All @@ -380,7 +380,7 @@ func (c *MySqlConnector) PullRecords(
return nil
}

func qvalueFromMysqlRowEvent(mytype byte, qkind qvalue.QValueKind, val any) qvalue.QValue {
func QValueFromMysqlRowEvent(mytype byte, qkind qvalue.QValueKind, val any) qvalue.QValue {
// TODO signedness, in ev.Table, need to extend QValue system
// See go-mysql row_event.go for mapping
switch val := val.(type) {
Expand Down
21 changes: 20 additions & 1 deletion flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,26 @@ func qkindFromMysql(ty uint8) (qvalue.QValueKind, error) {
}
}

func qvalueFromMysqlFieldValue(qkind qvalue.QValueKind, fv mysql.FieldValue) (qvalue.QValue, error) {
func QRecordSchemaFromMysqlFields(fields []*mysql.Field) (qvalue.QRecordSchema, error) {
schema := make([]qvalue.QField, 0, len(rs.Fields))
for _, field := range rs.Fields {
qkind, err := qkindFromMysql(field.Type)
if err != nil {
return qvalue.QRecordSchema{}, err
}

schema = append(schema, qvalue.QField{
Name: string(field.Name),
Type: qkind,
Precision: 0, // TODO numerics
Scale: 0, // TODO numerics
Nullable: (field.Flag & mysql.NOT_NULL_FLAG) == 0,
})
}
return qvalue.QRecordSchema{Fields: schema}, nil
}

func QValueFromMysqlFieldValue(qkind qvalue.QValueKind, fv mysql.FieldValue) (qvalue.QValue, error) {
// TODO fill this in, maybe contribute upstream, figvure out how numeric etc fit in
switch v := fv.Value().(type) {
case nil:
Expand Down
22 changes: 5 additions & 17 deletions flow/connectors/mysql/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func (c *MySqlConnector) GetQRepPartitions(
return partitionHelper.GetPartitions(), nil
}

// TODO use ExecuteStreamingSelect
func (c *MySqlConnector) PullQRepRecords(
ctx context.Context,
config *protos.QRepConfig,
Expand All @@ -154,22 +153,11 @@ func (c *MySqlConnector) PullQRepRecords(

totalRecords := 0
onResult := func(rs *mysql.Result) error {
schema := make([]qvalue.QField, 0, len(rs.Fields))
for _, field := range rs.Fields {
qkind, err := qkindFromMysql(field.Type)
if err != nil {
return err
}

schema = append(schema, qvalue.QField{
Name: string(field.Name),
Type: qkind,
Precision: 0, // TODO numerics
Scale: 0, // TODO numerics
Nullable: (field.Flag & mysql.NOT_NULL_FLAG) == 0,
})
schema, err := QRecordSchemaFromMysqlFields(rs.Fields)
if err != nil {
return err
}
stream.SetSchema(qvalue.QRecordSchema{Fields: schema})
stream.SetSchema(schema)
return nil
}
onRow := func(row []mysql.FieldValue) error {
Expand All @@ -180,7 +168,7 @@ func (c *MySqlConnector) PullQRepRecords(
}
record := make([]qvalue.QValue, 0, len(row))
for idx, val := range row {
qv, err := qvalueFromMysqlFieldValue(schema.Fields[idx].Type, val)
qv, err := QValueFromMysqlFieldValue(schema.Fields[idx].Type, val)
if err != nil {
return err
}
Expand Down
31 changes: 29 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,38 @@ func GetPgRows(conn *connpostgres.PostgresConnector, suffix string, table string
}

func GetMySqlRows(conn *connmysql.MySqlConnector, suffix string, table string, cols string) (*model.QRecordBatch, error) {
// TODO mysql
return nil, nil
rs, err := conn.Execute(
context.Background(),
fmt.Sprintf(`SELECT %s FROM e2e_test_%s.%s ORDER BY id`, cols, suffix, connpostgres.QuoteIdentifier(table)),
)

schema, err := connmysql.QRecordSchemaFromMysqlFields(rs.Fields)
if err != nil {
return nil, err
}

batch := &model.QRecordBatch{
Schema: schema,
Records: nil,
}

for _, row := range rs.Values {
record := make([]qvalue.QValue, 0, len(row))
for idx, val := range row {
qv, err := connmysql.QValueFromMysqlFieldValue(schema.Fields[idx].Type, val)
if err != nil {
return nil, err
}
record = append(record, qv)
}
batch.Records = append(batch.Records, record)
}

return batch, nil
}

func GetSuiteSourceRows[TSource connectors.Connector](suite Suite[TSource], table string, cols string) (*model.QRecordBatch, error) {
// TODO move to SuiteSource
switch conn := any(suite.Connector()).(type) {
case *connpostgres.PostgresConnector:
return GetPgRows(conn, suite.Suffix(), table, cols)
Expand Down

0 comments on commit 53c6489

Please sign in to comment.