Skip to content

Commit

Permalink
I'm an idiot
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 13, 2025
1 parent 1ca7854 commit 4d666ac
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 12 deletions.
3 changes: 1 addition & 2 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (c *MySqlConnector) connect(ctx context.Context) (*client.Conn, error) {
}

func (c *MySqlConnector) Execute(ctx context.Context, cmd string, args ...interface{}) (*mysql.Result, error) {
slog.Info("mymymy", slog.String("query", cmd), slog.Any("when", time.Now()))
reconnects := 3
for {
// TODO need new connection if ctx changes between calls, or make upstream PR
Expand Down Expand Up @@ -103,7 +102,6 @@ func (c *MySqlConnector) ExecuteSelectStreaming(ctx context.Context, cmd string,
resultCb client.SelectPerResultCallback,
args ...interface{},
) error {
slog.Info("mymymy stream", slog.String("query", cmd), slog.Any("when", time.Now()))
reconnects := 3
for {
// TODO need new connection if ctx changes between calls, or make upstream PR
Expand Down Expand Up @@ -148,6 +146,7 @@ func (c *MySqlConnector) ExecuteSelectStreaming(ctx context.Context, cmd string,
return err
}
}
return nil
}
}

Expand Down
10 changes: 0 additions & 10 deletions flow/connectors/mysql/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,15 @@ func (c *MySqlConnector) PullQRepRecords(
if err != nil {
return err
}
c.logger.Info("mymy set schema")
stream.SetSchema(schema)
return nil
}
onRow := func(row []mysql.FieldValue) error {
totalRecords += 1 // TODO can this be batched in onResult or by checking rs at end?
c.logger.Info("mymy getting schema")
schema, err := stream.Schema()
if err != nil {
c.logger.Error("mymy error schema", slog.Any("error", err))
return err
}
c.logger.Info("mymy got schema")
record := make([]qvalue.QValue, 0, len(row))
for idx, val := range row {
qv, err := QValueFromMysqlFieldValue(schema.Fields[idx].Type, val)
Expand All @@ -178,17 +174,14 @@ func (c *MySqlConnector) PullQRepRecords(
}
record = append(record, qv)
}
c.logger.Info("mymy append record")
stream.Records <- record
c.logger.Info("mymy appended record")
return nil
}

if last.FullTablePartition {
// this is a full table partition, so just run the query
var rs mysql.Result
if err := c.ExecuteSelectStreaming(ctx, query, &rs, onRow, onResult); err != nil {
c.logger.Error("mymymy full err", slog.Any("error", err))
return 0, err
}
} else {
Expand All @@ -209,13 +202,10 @@ func (c *MySqlConnector) PullQRepRecords(

var rs mysql.Result
if err := c.ExecuteSelectStreaming(ctx, query, &rs, onRow, onResult, rangeStart, rangeEnd); err != nil {
c.logger.Error("mymymy partial err", slog.Any("error", err))
return 0, err
}
}

c.logger.Info("mymymy success")

close(stream.Records)
return totalRecords, nil
}
Expand Down

0 comments on commit 4d666ac

Please sign in to comment.