Skip to content

Commit

Permalink
log exits
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 10, 2025
1 parent 2512106 commit 37bec71
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion flow/connectors/mysql/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *MySqlConnector) PullQRepRecords(

// testing
schema, _, _ := strings.Cut(config.WatermarkTable, ".")
rs, err := c.Execute(ctx, fmt.Sprintf("show tables from %s", schema))
rs, err := c.Execute(ctx, "show tables from "+schema)
if err != nil {
return 0, fmt.Errorf("mymymy err %w", err)
}
Expand All @@ -195,6 +195,7 @@ func (c *MySqlConnector) PullQRepRecords(
// this is a full table partition, so just run the query
var rs mysql.Result
if err := c.ExecuteSelectStreaming(ctx, query, &rs, onRow, onResult); err != nil {
c.logger.Error("mymymy full err", slog.Any("error", err))
return 0, err
}
} else {
Expand All @@ -215,10 +216,13 @@ func (c *MySqlConnector) PullQRepRecords(

var rs mysql.Result
if err := c.ExecuteSelectStreaming(ctx, query, &rs, onRow, onResult, rangeStart, rangeEnd); err != nil {
c.logger.Error("mymymy partial err", slog.Any("error", err))
return 0, err
}
}

c.logger.Info("mymymy success")

close(stream.Records)
return totalRecords, nil
}
Expand Down

0 comments on commit 37bec71

Please sign in to comment.