diff --git a/flow/connectors/mysql/mysql.go b/flow/connectors/mysql/mysql.go index 123b8bc7a..5b8e96e2e 100644 --- a/flow/connectors/mysql/mysql.go +++ b/flow/connectors/mysql/mysql.go @@ -72,7 +72,6 @@ func (c *MySqlConnector) connect(ctx context.Context) (*client.Conn, error) { } func (c *MySqlConnector) Execute(ctx context.Context, cmd string, args ...interface{}) (*mysql.Result, error) { - slog.Info("mymymy", slog.String("query", cmd), slog.Any("when", time.Now())) reconnects := 3 for { // TODO need new connection if ctx changes between calls, or make upstream PR @@ -103,7 +102,6 @@ func (c *MySqlConnector) ExecuteSelectStreaming(ctx context.Context, cmd string, resultCb client.SelectPerResultCallback, args ...interface{}, ) error { - slog.Info("mymymy stream", slog.String("query", cmd), slog.Any("when", time.Now())) reconnects := 3 for { // TODO need new connection if ctx changes between calls, or make upstream PR @@ -148,6 +146,7 @@ func (c *MySqlConnector) ExecuteSelectStreaming(ctx context.Context, cmd string, return err } } + return nil } } diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index 4517f07f5..437625be9 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -157,19 +157,15 @@ func (c *MySqlConnector) PullQRepRecords( if err != nil { return err } - c.logger.Info("mymy set schema") stream.SetSchema(schema) return nil } onRow := func(row []mysql.FieldValue) error { totalRecords += 1 // TODO can this be batched in onResult or by checking rs at end? - c.logger.Info("mymy getting schema") schema, err := stream.Schema() if err != nil { - c.logger.Error("mymy error schema", slog.Any("error", err)) return err } - c.logger.Info("mymy got schema") record := make([]qvalue.QValue, 0, len(row)) for idx, val := range row { qv, err := QValueFromMysqlFieldValue(schema.Fields[idx].Type, val) @@ -178,9 +174,7 @@ func (c *MySqlConnector) PullQRepRecords( } record = append(record, qv) } - c.logger.Info("mymy append record") stream.Records <- record - c.logger.Info("mymy appended record") return nil } @@ -188,7 +182,6 @@ func (c *MySqlConnector) PullQRepRecords( // this is a full table partition, so just run the query var rs mysql.Result if err := c.ExecuteSelectStreaming(ctx, query, &rs, onRow, onResult); err != nil { - c.logger.Error("mymymy full err", slog.Any("error", err)) return 0, err } } else { @@ -209,13 +202,10 @@ func (c *MySqlConnector) PullQRepRecords( var rs mysql.Result if err := c.ExecuteSelectStreaming(ctx, query, &rs, onRow, onResult, rangeStart, rangeEnd); err != nil { - c.logger.Error("mymymy partial err", slog.Any("error", err)) return 0, err } } - c.logger.Info("mymymy success") - close(stream.Records) return totalRecords, nil }