Skip to content

Commit

Permalink
track position more accurately, avoid clearing it
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 17, 2025
1 parent 22e1106 commit 3ab0360
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 77 deletions.
38 changes: 25 additions & 13 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,47 +163,49 @@ func (c *MySqlConnector) startSyncer() *replication.BinlogSyncer {
})
}

func (c *MySqlConnector) startStreaming(pos string) (*replication.BinlogSyncer, *replication.BinlogStreamer, mysql.GTIDSet, error) {
func (c *MySqlConnector) startStreaming(
pos string,
) (*replication.BinlogSyncer, *replication.BinlogStreamer, mysql.GTIDSet, mysql.Position, error) {
if rest, isFile := strings.CutPrefix(pos, "!f:"); isFile {
comma := strings.LastIndexByte(rest, ',')
if comma == -1 {
return nil, nil, nil, fmt.Errorf("no comma in file/pos offset %s", pos)
return nil, nil, nil, mysql.Position{}, fmt.Errorf("no comma in file/pos offset %s", pos)
}
offset, err := strconv.ParseUint(rest[comma+1:], 16, 32)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid offset in file<D-o>pos offset %s: %w", pos, err)
return nil, nil, nil, mysql.Position{}, fmt.Errorf("invalid offset in file/pos offset %s: %w", pos, err)
}
return c.startCdcStreamingFilePos(rest[:comma], uint32(offset))
return c.startCdcStreamingFilePos(mysql.Position{Name: rest[:comma], Pos: uint32(offset)})
} else {
gset, err := mysql.ParseGTIDSet(c.config.Flavor, pos)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, mysql.Position{}, err
}
return c.startCdcStreamingGtid(gset)
}
}

func (c *MySqlConnector) startCdcStreamingFilePos(
lastOffsetName string, lastOffsetPos uint32,
) (*replication.BinlogSyncer, *replication.BinlogStreamer, mysql.GTIDSet, error) {
pos mysql.Position,
) (*replication.BinlogSyncer, *replication.BinlogStreamer, mysql.GTIDSet, mysql.Position, error) {
syncer := c.startSyncer()
stream, err := syncer.StartSync(mysql.Position{Name: lastOffsetName, Pos: lastOffsetPos})
stream, err := syncer.StartSync(pos)
if err != nil {
syncer.Close()
}
return syncer, stream, nil, err
return syncer, stream, nil, pos, err
}

func (c *MySqlConnector) startCdcStreamingGtid(
gset mysql.GTIDSet,
) (*replication.BinlogSyncer, *replication.BinlogStreamer, mysql.GTIDSet, error) {
) (*replication.BinlogSyncer, *replication.BinlogStreamer, mysql.GTIDSet, mysql.Position, error) {
// https://hevodata.com/learn/mysql-gtids-and-replication-set-up
syncer := c.startSyncer()
stream, err := syncer.StartSyncGTID(gset)
if err != nil {
syncer.Close()
}
return syncer, stream, gset, err
return syncer, stream, gset, mysql.Position{}, err
}

func (c *MySqlConnector) ReplPing(context.Context) error {
Expand Down Expand Up @@ -249,12 +251,16 @@ func (c *MySqlConnector) PullRecords(
) error {
defer req.RecordStream.Close()

syncer, mystream, gset, err := c.startStreaming(req.LastOffset.Text)
syncer, mystream, gset, pos, err := c.startStreaming(req.LastOffset.Text)
if err != nil {
return err
}
defer syncer.Close()

if gset == nil {
req.RecordStream.UpdateLatestCheckpointText(fmt.Sprintf("!f:%s,%x", pos.Name, pos.Pos))
}

var fetchedBytesCounter metric.Int64Counter
if otelManager != nil {
var err error
Expand Down Expand Up @@ -285,11 +291,17 @@ func (c *MySqlConnector) PullRecords(
}

// TODO if gset == nil update pos with event.Header.LogPos
if gset == nil && event.Header.LogPos > 0 {
pos.Pos = max(pos.Pos, event.Header.LogPos)
req.RecordStream.UpdateLatestCheckpointText(fmt.Sprintf("!f:%s,%x", pos.Name, pos.Pos))
}

switch ev := event.Event.(type) {
case *replication.RotateEvent:
if gset == nil && event.Header.Timestamp != 0 {
req.RecordStream.UpdateLatestCheckpointText(fmt.Sprintf("!f:%s,%x", string(ev.NextLogName), ev.Position))
pos.Name = string(ev.NextLogName)
pos.Pos = uint32(ev.Position)
req.RecordStream.UpdateLatestCheckpointText(fmt.Sprintf("!f:%s,%x", pos.Name, pos.Pos))
}
case *replication.MariadbGTIDEvent:
if gset != nil {
Expand Down
123 changes: 59 additions & 64 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ type CDCBatchInfo struct {
}

func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_flows(flow_name,latest_lsn_at_source,latest_lsn_at_target) VALUES($1,0,0)
ON CONFLICT DO NOTHING`, flowJobName)
if err != nil {
ON CONFLICT DO NOTHING`, flowJobName,
); err != nil {
return fmt.Errorf("error while inserting flow into cdc_flows: %w", err)
}
return nil
Expand All @@ -36,10 +36,10 @@ func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName stri
func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
latestLSNAtSource int64,
) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2",
uint64(latestLSNAtSource), flowJobName)
if err != nil {
uint64(latestLSNAtSource), flowJobName,
); err != nil {
return fmt.Errorf("[source] error while updating flow in cdc_flows: %w", err)
}
return nil
Expand All @@ -48,10 +48,10 @@ func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool,
func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
latestLSNAtTarget int64,
) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2",
uint64(latestLSNAtTarget), flowJobName)
if err != nil {
uint64(latestLSNAtTarget), flowJobName,
); err != nil {
return fmt.Errorf("[target] error while updating flow in cdc_flows: %w", err)
}
return nil
Expand All @@ -60,12 +60,12 @@ func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool,
func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
batchInfo CDCBatchInfo,
) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn,
start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`,
flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, 0,
uint64(batchInfo.BatchEndlSN), batchInfo.StartTime)
if err != nil {
uint64(batchInfo.BatchEndlSN), batchInfo.StartTime,
); err != nil {
return fmt.Errorf("error while inserting batch into cdc_batch: %w", err)
}
return nil
Expand All @@ -80,10 +80,10 @@ func UpdateNumRowsAndEndLSNForCDCBatch(
numRows uint32,
batchEndCheckpoint model.CdcCheckpoint,
) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_batches SET rows_in_batch=$1,batch_end_lsn=$2,batch_end_lsn_text=$3 WHERE flow_name=$4 AND batch_id=$5",
numRows, uint64(batchEndCheckpoint.ID), batchEndCheckpoint.Text, flowJobName, batchID)
if err != nil {
numRows, uint64(batchEndCheckpoint.ID), batchEndCheckpoint.Text, flowJobName, batchID,
); err != nil {
return fmt.Errorf("error while updating batch in cdc_batch: %w", err)
}
return nil
Expand All @@ -95,12 +95,12 @@ func UpdateEndTimeForCDCBatch(
flowJobName string,
batchID int64,
) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
`UPDATE peerdb_stats.cdc_batches
SET end_time = NOW()
WHERE flow_name = $1 AND batch_id <= $2 AND end_time IS NULL`,
flowJobName, batchID)
if err != nil {
flowJobName, batchID,
); err != nil {
return fmt.Errorf("error while updating batch in cdc_batch: %w", err)
}
return nil
Expand All @@ -114,8 +114,7 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa
return fmt.Errorf("error while beginning transaction for inserting statistics into cdc_batch_table: %w", err)
}
defer func() {
err = insertBatchTablesTx.Rollback(context.Background())
if err != pgx.ErrTxClosed && err != nil {
if err := insertBatchTablesTx.Rollback(context.Background()); err != pgx.ErrTxClosed && err != nil {
shared.LoggerFromCtx(ctx).Error("error during transaction rollback",
slog.Any("error", err),
slog.String(string(shared.FlowNameKey), flowJobName))
Expand All @@ -126,21 +125,19 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa
inserts := rowCounts.InsertCount.Load()
updates := rowCounts.UpdateCount.Load()
deletes := rowCounts.DeleteCount.Load()
_, err = insertBatchTablesTx.Exec(ctx,
if _, err := insertBatchTablesTx.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_batch_table
(flow_name,batch_id,destination_table_name,num_rows,
insert_count,update_count,delete_count)
VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`,
flowJobName, batchID, destinationTableName,
inserts+updates+deletes, inserts, updates, deletes)
if err != nil {
inserts+updates+deletes, inserts, updates, deletes,
); err != nil {
return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err)
}
}
err = insertBatchTablesTx.Commit(ctx)
if err != nil {
return fmt.Errorf("error while committing transaction for inserting statistics into cdc_batch_table: %w",
err)
if err := insertBatchTablesTx.Commit(ctx); err != nil {
return fmt.Errorf("error while committing transaction for inserting statistics into cdc_batch_table: %w", err)
}
return nil
}
Expand All @@ -154,11 +151,11 @@ func InitializeQRepRun(
parentMirrorName string,
) error {
flowJobName := config.GetFlowJobName()
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid,source_table,destination_table,parent_mirror_name)"+
" VALUES($1,$2,$3,$4,$5) ON CONFLICT DO NOTHING",
flowJobName, runUUID, config.WatermarkTable, config.DestinationTableIdentifier, parentMirrorName)
if err != nil {
flowJobName, runUUID, config.WatermarkTable, config.DestinationTableIdentifier, parentMirrorName,
); err != nil {
return fmt.Errorf("error while inserting qrep run in qrep_runs: %w", err)
}

Expand All @@ -172,21 +169,21 @@ func InitializeQRepRun(
}

func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET start_time=$1, fetch_complete=true WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
time.Now(), runUUID,
); err != nil {
return fmt.Errorf("error while updating start time for run_uuid %s in qrep_runs: %w", runUUID, err)
}

return nil
}

func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET end_time=$1, consolidate_complete=true WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
time.Now(), runUUID,
); err != nil {
return fmt.Errorf("error while updating end time for run_uuid %s in qrep_runs: %w", runUUID, err)
}

Expand All @@ -199,7 +196,7 @@ func AppendSlotSizeInfo(
peerName string,
slotInfo *protos.SlotInfo,
) error {
_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
"INSERT INTO peerdb_stats.peer_slot_size"+
"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size, wal_status) "+
"VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING;",
Expand All @@ -210,8 +207,7 @@ func AppendSlotSizeInfo(
slotInfo.ConfirmedFlushLSN,
slotInfo.LagInMb,
slotInfo.WalStatus,
)
if err != nil {
); err != nil {
return fmt.Errorf("error while upserting row for slot_size: %w", err)
}

Expand Down Expand Up @@ -260,13 +256,13 @@ func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName
return fmt.Errorf("unknown range type: %v", x)
}

_, err := pool.Exec(ctx,
if _, err := pool.Exec(ctx,
`INSERT INTO peerdb_stats.qrep_partitions
(flow_name,run_uuid,partition_uuid,partition_start,partition_end,restart_count,parent_mirror_name)
VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
restart_count=qrep_partitions.restart_count+1`,
flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, 0, parentMirrorName)
if err != nil {
flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, 0, parentMirrorName,
); err != nil {
return fmt.Errorf("error while inserting qrep partition in qrep_partitions: %w", err)
}

Expand All @@ -280,9 +276,10 @@ func UpdateStartTimeForPartition(
partition *protos.QRepPartition,
startTime time.Time,
) error {
_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, startTime, runUUID, partition.PartitionId)
if err != nil {
if _, err := pool.Exec(ctx,
`UPDATE peerdb_stats.qrep_partitions SET start_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`,
startTime, runUUID, partition.PartitionId,
); err != nil {
return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
}
return nil
Expand All @@ -291,9 +288,10 @@ func UpdateStartTimeForPartition(
func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string,
partition *protos.QRepPartition, rowsInPartition int64,
) error {
_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2
WHERE run_uuid=$3 AND partition_uuid=$4`, time.Now(), rowsInPartition, runUUID, partition.PartitionId)
if err != nil {
if _, err := pool.Exec(ctx,
`UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2 WHERE run_uuid=$3 AND partition_uuid=$4`,
time.Now(), rowsInPartition, runUUID, partition.PartitionId,
); err != nil {
return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
}
return nil
Expand All @@ -302,9 +300,10 @@ func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Poo
func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string,
partition *protos.QRepPartition,
) error {
_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId)
if err != nil {
if _, err := pool.Exec(ctx,
`UPDATE peerdb_stats.qrep_partitions SET end_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`,
time.Now(), runUUID, partition.PartitionId,
); err != nil {
return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
}
return nil
Expand All @@ -313,37 +312,33 @@ func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID
func UpdateRowsSyncedForPartition(ctx context.Context, pool *pgxpool.Pool, rowsSynced int, runUUID string,
partition *protos.QRepPartition,
) error {
_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId)
if err != nil {
if _, err := pool.Exec(ctx,
`UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1 WHERE run_uuid=$2 AND partition_uuid=$3`,
rowsSynced, runUUID, partition.PartitionId,
); err != nil {
return fmt.Errorf("error while updating rows_synced in qrep_partitions: %w", err)
}
return nil
}

func DeleteMirrorStats(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error {
_, err := pool.Exec(ctx, `DELETE FROM peerdb_stats.qrep_partitions WHERE parent_mirror_name = $1`, flowJobName)
if err != nil {
if _, err := pool.Exec(ctx, `DELETE FROM peerdb_stats.qrep_partitions WHERE parent_mirror_name = $1`, flowJobName); err != nil {
return fmt.Errorf("error while deleting qrep_partitions: %w", err)
}

_, err = pool.Exec(ctx, `DELETE FROM peerdb_stats.qrep_runs WHERE parent_mirror_name = $1`, flowJobName)
if err != nil {
if _, err := pool.Exec(ctx, `DELETE FROM peerdb_stats.qrep_runs WHERE parent_mirror_name = $1`, flowJobName); err != nil {
return fmt.Errorf("error while deleting qrep_runs: %w", err)
}

_, err = pool.Exec(ctx, `DELETE FROM peerdb_stats.cdc_batches WHERE flow_name = $1`, flowJobName)
if err != nil {
if _, err := pool.Exec(ctx, `DELETE FROM peerdb_stats.cdc_batches WHERE flow_name = $1`, flowJobName); err != nil {
return fmt.Errorf("error while deleting cdc_batches: %w", err)
}

_, err = pool.Exec(ctx, `DELETE FROM peerdb_stats.cdc_batch_table WHERE flow_name = $1`, flowJobName)
if err != nil {
if _, err := pool.Exec(ctx, `DELETE FROM peerdb_stats.cdc_batch_table WHERE flow_name = $1`, flowJobName); err != nil {
return fmt.Errorf("error while deleting cdc_batch_table: %w", err)
}

_, err = pool.Exec(ctx, `DELETE FROM peerdb_stats.cdc_flows WHERE flow_name = $1`, flowJobName)
if err != nil {
if _, err := pool.Exec(ctx, `DELETE FROM peerdb_stats.cdc_flows WHERE flow_name = $1`, flowJobName); err != nil {
return fmt.Errorf("error while deleting cdc_flows: %w", err)
}

Expand Down

0 comments on commit 3ab0360

Please sign in to comment.