sink(ticdc): add total write bytes counter to sink (pingcap#10040) (p…
ti-chi-bot authored Mar 31, 2024
1 parent fe4bdc2 commit 71c4e3d
Showing 6 changed files with 90 additions and 52 deletions.
4 changes: 2 additions & 2 deletions cdc/sink/black_hole.go
@@ -67,12 +67,12 @@ func (b *blackHoleSink) FlushRowChangedEvents(
}, nil
}
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts))
err := b.statistics.RecordBatchExecution(func() (int, error) {
err := b.statistics.RecordBatchExecution(func() (int, int64, error) {
// TODO: add some random replication latency
accumulated := atomic.LoadUint64(&b.accumulated)
lastAccumulated := atomic.SwapUint64(&b.lastAccumulated, accumulated)
batchSize := accumulated - lastAccumulated
return int(batchSize), nil
return int(batchSize), int64(batchSize), nil
})
b.statistics.PrintStatus(ctx)
atomic.StoreUint64(&b.flushing, 0)
12 changes: 11 additions & 1 deletion cdc/sink/metrics/metrics.go
@@ -42,7 +42,16 @@ var (
Buckets: prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18),
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`

// LargeRowSizeHistogram records the row size of events.
// TotalWriteBytesCounter records the total number of bytes written by sink.
TotalWriteBytesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "write_bytes_total",
Help: "Total number of bytes written by sink",
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`

// LargeRowSizeHistogram records size of large rows.
LargeRowSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
@@ -122,6 +131,7 @@ var (
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ExecBatchHistogram)
registry.MustRegister(ExecTxnHistogram)
registry.MustRegister(TotalWriteBytesCounter)
registry.MustRegister(ExecDDLHistogram)
registry.MustRegister(LargeRowSizeHistogram)
registry.MustRegister(ExecutionErrorCounter)
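For reference, the counter added above is exposed as `ticdc_sink_write_bytes_total` (namespace `ticdc`, subsystem `sink`) and carries the same `namespace`/`changefeed`/`type` labels as the other sink metrics. Below is a minimal, self-contained sketch of registering and bumping such a counter with client_golang; the label values are illustrative, not taken from the diff.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Mirrors the counter added in metrics.go; exposed as ticdc_sink_write_bytes_total.
var totalWriteBytesCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "write_bytes_total",
		Help:      "Total number of bytes written by sink",
	}, []string{"namespace", "changefeed", "type"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(totalWriteBytesCounter)

	// Hypothetical label values: default namespace, changefeed "cf-1", MySQL sink.
	totalWriteBytesCounter.WithLabelValues("default", "cf-1", "mysql").Add(4096)
	fmt.Println("recorded 4096 bytes for cf-1")
}
```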
12 changes: 8 additions & 4 deletions cdc/sink/metrics/statistics.go
@@ -71,6 +71,8 @@ func NewStatistics(ctx context.Context, t sinkType) *Statistics {
WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)
statistics.metricExecErrCnt = ExecutionErrorCounter.
WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID)
statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.
WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)

// Flush metrics in background for better accuracy and efficiency.
changefeedID := statistics.changefeedID
@@ -117,8 +119,9 @@ type Statistics struct {
metricExecDDLHis prometheus.Observer
metricExecBatchHis prometheus.Observer
metricExecErrCnt prometheus.Counter

metricRowSizesHis prometheus.Observer
// Counter for total bytes of DML.
metricTotalWriteBytesCnt prometheus.Counter
metricRowSizesHis prometheus.Observer
}

// AddRowsCount records total number of rows needs to flush
@@ -153,15 +156,16 @@ func (b *Statistics) AddDDLCount() {
}

// RecordBatchExecution records the cost time of batch execution and batch size
func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error {
func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
startTime := time.Now()
batchSize, err := executor()
batchSize, batchWriteBytes, err := executor()
if err != nil {
b.metricExecErrCnt.Inc()
return err
}
b.metricExecTxnHis.Observe(time.Since(startTime).Seconds())
b.metricExecBatchHis.Observe(float64(batchSize))
b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes))
atomic.AddUint64(&b.totalFlushedRows, uint64(batchSize))
return nil
}
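The only API-visible change here is the executor closure's return values: it now reports the batch's write bytes alongside its row count, and `RecordBatchExecution` feeds that second value into the new counter on success. A toy sketch of the contract follows; the names are illustrative, not the real TiCDC types.

```go
package main

import "fmt"

// recordBatchExecution mimics the updated Statistics.RecordBatchExecution:
// the executor reports (rowCount, writeBytes, error), and metrics are only
// updated when the executor succeeds.
func recordBatchExecution(executor func() (int, int64, error)) error {
	rows, writeBytes, err := executor()
	if err != nil {
		// The real method bumps the execution-error counter here.
		return err
	}
	fmt.Printf("batch: %d rows, %d bytes\n", rows, writeBytes)
	return nil
}

func main() {
	_ = recordBatchExecution(func() (int, int64, error) {
		// A sink would flush its pending batch here and report what it wrote.
		return 100, 4096, nil
	})
}
```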
6 changes: 3 additions & 3 deletions cdc/sink/mq/mq_flush_worker.go
@@ -318,11 +318,11 @@ func (w *flushWorker) sendMessages(ctx context.Context) error {
}

for _, message := range future.Messages {
if err := w.statistics.RecordBatchExecution(func() (int, error) {
if err := w.statistics.RecordBatchExecution(func() (int, int64, error) {
if err := w.producer.AsyncSendMessage(ctx, future.Topic, future.Partition, message); err != nil {
return 0, err
return 0, 0, err
}
return message.GetRowsCount(), nil
return message.GetRowsCount(), int64(message.Length()), nil
}); err != nil {
return err
}
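In the MQ path the byte count comes from `message.Length()`, recorded per message right after it is handed to the producer. Below is a stand-in sketch of that closure shape, assuming `Length()` is the encoded byte size of the message; the stand-in type is not the real codec message.

```go
package main

import "fmt"

// mqMessage is a stand-in for the MQ codec message used in mq_flush_worker.go.
type mqMessage struct {
	rows       int
	key, value []byte
}

func (m *mqMessage) GetRowsCount() int { return m.rows }

// Length is assumed here to be the encoded payload size in bytes.
func (m *mqMessage) Length() int { return len(m.key) + len(m.value) }

func main() {
	msg := &mqMessage{rows: 3, key: []byte("k1"), value: []byte(`{"a":1}`)}
	executor := func() (int, int64, error) {
		// The real worker calls producer.AsyncSendMessage here and returns
		// (0, 0, err) if the send fails.
		return msg.GetRowsCount(), int64(msg.Length()), nil
	}
	rows, bytes, _ := executor()
	fmt.Println(rows, bytes) // 3 9
}
```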
33 changes: 22 additions & 11 deletions cdc/sink/mysql/mysql.go
@@ -689,10 +689,10 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
failpoint.Inject("MySQLSinkHangLongTime", func() {
time.Sleep(time.Hour)
})
err := s.statistics.RecordBatchExecution(func() (int, error) {
err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
tx, err := s.db.BeginTx(pctx, nil)
if err != nil {
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, "BEGIN", dmls.rowCount)
}
@@ -709,7 +709,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
start, s.params.changefeedID, query, dmls.rowCount)
}
cancelFunc()
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount)
}
@@ -724,19 +724,19 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
log.Warn("failed to rollback txn", zap.Error(err))
}
cancelFunc()
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, dmls.markSQL, dmls.rowCount)
}
cancelFunc()
}

if err = tx.Commit(); err != nil {
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, "COMMIT", dmls.rowCount)
}
return dmls.rowCount, nil
return dmls.rowCount, dmls.approximateSize, nil
})
if err != nil {
return errors.Trace(err)
@@ -974,10 +974,11 @@ func hasHandleKey(cols []*model.Column) bool {
}

type preparedDMLs struct {
sqls []string
values [][]interface{}
markSQL string
rowCount int
sqls []string
values [][]interface{}
markSQL string
rowCount int
approximateSize int64
}

// prepareDMLs converts model.RowChangedEvent list to query string list and args list
@@ -992,7 +993,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
replaces := make(map[string][][]interface{})

rowCount := 0

approximateSize := int64(0)
// flush cached batch replace or insert, to keep the sequence of DMLs
flushCacheDMLs := func() {
if s.params.batchReplaceEnabled && len(replaces) > 0 {
@@ -1039,6 +1040,13 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
sql, value := s.batchSingleTxnDmls(txn.Rows, tableInfo, translateToInsert)
sqls = append(sqls, sql...)
values = append(values, value...)

for _, stmt := range sql {
approximateSize += int64(len(stmt))
}
for _, row := range txn.Rows {
approximateSize += row.ApproximateDataSize
}
continue
}
}
@@ -1057,6 +1065,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
values = append(values, args)
rowCount++
}
approximateSize += int64(len(query)) + row.ApproximateDataSize
continue
}

@@ -1103,6 +1112,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
}
}
}
approximateSize += int64(len(query)) + row.ApproximateDataSize
}
}
flushCacheDMLs()
@@ -1121,6 +1131,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
// we do not count mark table rows in rowCount.
}
dmls.rowCount = rowCount
dmls.approximateSize = approximateSize
return dmls
}

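In the MySQL sink, `prepareDMLs` now accumulates an `approximateSize` for each prepared batch: the byte length of every generated SQL statement plus the `ApproximateDataSize` of every row it covers, and `execDMLWithMaxRetries` reports that total through the executor closure. A simplified, self-contained illustration of the accounting rule is below; the types and names are illustrative.

```go
package main

import "fmt"

// row is a stand-in for model.RowChangedEvent with only the field that
// matters for the size accounting.
type row struct {
	ApproximateDataSize int64
}

// approximateSizeOf applies the same rule prepareDMLs uses: SQL text length
// for every statement plus ApproximateDataSize for every row.
func approximateSizeOf(sqls []string, rows []row) int64 {
	var size int64
	for _, stmt := range sqls {
		size += int64(len(stmt))
	}
	for _, r := range rows {
		size += r.ApproximateDataSize
	}
	return size
}

func main() {
	sqls := []string{"REPLACE INTO `t`(`a`) VALUES (?);"} // 33 bytes
	rows := []row{{ApproximateDataSize: 42}}
	fmt.Println(approximateSizeOf(sqls, rows)) // 33 + 42 = 75
}
```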
75 changes: 44 additions & 31 deletions cdc/sink/mysql/mysql_test.go
@@ -89,9 +89,10 @@ func TestPrepareDML(t *testing.T) {
},
},
expected: &preparedDMLs{
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
approximateSize: 75,
},
}, {
input: []*model.RowChangedEvent{
@@ -114,9 +115,10 @@
},
},
expected: &preparedDMLs{
sqls: []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
values: [][]interface{}{{2, 2}},
rowCount: 1,
sqls: []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
values: [][]interface{}{{2, 2}},
rowCount: 1,
approximateSize: 64,
},
},
}
@@ -2310,8 +2312,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
sqls: []string{
"INSERT INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);",
},
values: [][]interface{}{{1, 1}},
rowCount: 1,
values: [][]interface{}{{1, 1}},
rowCount: 1,
approximateSize: 63,
},
}, {
name: "insert with PK",
@@ -2335,9 +2338,10 @@
},
},
expected: &preparedDMLs{
sqls: []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
sqls: []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
approximateSize: 52,
},
}, {
name: "update without PK",
@@ -2376,8 +2380,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
"UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1;",
},
values: [][]interface{}{{3, 3, 2, 2}},
rowCount: 1,
values: [][]interface{}{{3, 3, 2, 2}},
rowCount: 1,
approximateSize: 84,
},
}, {
name: "update with PK",
@@ -2414,8 +2419,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
expected: &preparedDMLs{
sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1;"},
values: [][]interface{}{{3, 3, 2, 2}},
rowCount: 1,
values: [][]interface{}{{3, 3, 2, 2}},
rowCount: 1,
approximateSize: 73,
},
}, {
name: "batch insert with PK",
@@ -2460,8 +2466,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
},
values: [][]interface{}{{3, 3}, {5, 5}},
rowCount: 2,
values: [][]interface{}{{3, 3}, {5, 5}},
rowCount: 2,
approximateSize: 104,
},
}, {
name: "safe mode on commit ts < replicating ts",
@@ -2488,8 +2495,9 @@
sqls: []string{
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
},
values: [][]interface{}{{3, 3}},
rowCount: 1,
values: [][]interface{}{{3, 3}},
rowCount: 1,
approximateSize: 53,
},
}, {
name: "safe mode on one row commit ts < replicating ts",
@@ -2534,8 +2542,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
},
values: [][]interface{}{{3, 3}, {5, 5}},
rowCount: 2,
values: [][]interface{}{{3, 3}, {5, 5}},
rowCount: 2,
approximateSize: 106,
},
},
}
@@ -2611,9 +2620,10 @@ func TestPrepareBatchDMLs(t *testing.T) {
},
},
expected: &preparedDMLs{
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
approximateSize: 73,
},
},
{ // insert event
@@ -2657,9 +2667,10 @@ func TestPrepareBatchDMLs(t *testing.T) {
},
},
expected: &preparedDMLs{
sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
approximateSize: 69,
},
},
// update event
@@ -2737,8 +2748,9 @@ func TestPrepareBatchDMLs(t *testing.T) {
"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? " +
"WHERE `a1` = ? AND `a3` = ? LIMIT 1",
},
values: [][]interface{}{{2, "测试", 1, "开发"}, {4, "北京", 3, "纽约"}},
rowCount: 2,
values: [][]interface{}{{2, "测试", 1, "开发"}, {4, "北京", 3, "纽约"}},
rowCount: 2,
approximateSize: 184,
},
},
// mixed event
@@ -2808,8 +2820,9 @@ func TestPrepareBatchDMLs(t *testing.T) {
"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))",
"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)",
},
values: [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
rowCount: 3,
values: [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
rowCount: 3,
approximateSize: 136,
},
},
}
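The expected `approximateSize` values in these tests follow directly from that accounting rule; the rows in these cases appear to carry no `ApproximateDataSize`, so each expectation reduces to the SQL text length. A quick standalone check for the first two `TestPrepareDML` cases:

```go
package main

import "fmt"

func main() {
	// First TestPrepareDML case: expected approximateSize 75.
	fmt.Println(len("DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"))
	// Second case: expected approximateSize 64.
	fmt.Println(len("REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"))
}
```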
