From 71c4e3d95e5f0fa6f343dd29561a89573af08f10 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Date: Sun, 31 Mar 2024 23:54:15 +0800
Subject: [PATCH] sink(ticdc): add total write bytes counter to sink (#10040)
 (#10566)

close pingcap/tiflow#10114
---
 cdc/sink/black_hole.go         |  4 +-
 cdc/sink/metrics/metrics.go    | 12 +++++-
 cdc/sink/metrics/statistics.go | 12 ++++--
 cdc/sink/mq/mq_flush_worker.go |  6 +--
 cdc/sink/mysql/mysql.go        | 33 ++++++++++-----
 cdc/sink/mysql/mysql_test.go   | 75 ++++++++++++++++++++--------------
 6 files changed, 90 insertions(+), 52 deletions(-)

diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go
index 0379acf8e08..12a17cd5197 100644
--- a/cdc/sink/black_hole.go
+++ b/cdc/sink/black_hole.go
@@ -67,12 +67,12 @@ func (b *blackHoleSink) FlushRowChangedEvents(
 		}, nil
 	}
 	log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts))
-	err := b.statistics.RecordBatchExecution(func() (int, error) {
+	err := b.statistics.RecordBatchExecution(func() (int, int64, error) {
 		// TODO: add some random replication latency
 		accumulated := atomic.LoadUint64(&b.accumulated)
 		lastAccumulated := atomic.SwapUint64(&b.lastAccumulated, accumulated)
 		batchSize := accumulated - lastAccumulated
-		return int(batchSize), nil
+		return int(batchSize), int64(batchSize), nil
 	})
 	b.statistics.PrintStatus(ctx)
 	atomic.StoreUint64(&b.flushing, 0)
diff --git a/cdc/sink/metrics/metrics.go b/cdc/sink/metrics/metrics.go
index c9f39661bfe..32678278cd0 100644
--- a/cdc/sink/metrics/metrics.go
+++ b/cdc/sink/metrics/metrics.go
@@ -42,7 +42,16 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18),
 		}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
 
-	// LargeRowSizeHistogram records the row size of events.
+	// TotalWriteBytesCounter records the total number of bytes written by sink.
+	TotalWriteBytesCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "ticdc",
+			Subsystem: "sink",
+			Name:      "write_bytes_total",
+			Help:      "Total number of bytes written by sink",
+		}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
+
+	// LargeRowSizeHistogram records the size of large rows.
 	LargeRowSizeHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
@@ -122,6 +131,7 @@ var (
 func InitMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(ExecBatchHistogram)
 	registry.MustRegister(ExecTxnHistogram)
+	registry.MustRegister(TotalWriteBytesCounter)
 	registry.MustRegister(ExecDDLHistogram)
 	registry.MustRegister(LargeRowSizeHistogram)
 	registry.MustRegister(ExecutionErrorCounter)
diff --git a/cdc/sink/metrics/statistics.go b/cdc/sink/metrics/statistics.go
index 9c5772579a9..ffc1c0bf464 100644
--- a/cdc/sink/metrics/statistics.go
+++ b/cdc/sink/metrics/statistics.go
@@ -71,6 +71,8 @@ func NewStatistics(ctx context.Context, t sinkType) *Statistics {
 		WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)
 	statistics.metricExecErrCnt = ExecutionErrorCounter.
 		WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID)
+	statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.
+		WithLabelValues(statistics.changefeedID.Namespace, statistics.changefeedID.ID, s)
 
 	// Flush metrics in background for better accuracy and efficiency.
 	changefeedID := statistics.changefeedID
@@ -117,8 +119,9 @@ type Statistics struct {
 	metricExecDDLHis   prometheus.Observer
 	metricExecBatchHis prometheus.Observer
 	metricExecErrCnt   prometheus.Counter
-
-	metricRowSizesHis prometheus.Observer
+	// metricTotalWriteBytesCnt counts the total bytes written by successful batch executions.
+	metricTotalWriteBytesCnt prometheus.Counter
+	metricRowSizesHis        prometheus.Observer
 }
 
 // AddRowsCount records total number of rows needs to flush
@@ -153,15 +156,16 @@ func (b *Statistics) AddDDLCount() {
 }
 
 // RecordBatchExecution records the cost time of batch execution and batch size
-func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error {
+func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
 	startTime := time.Now()
-	batchSize, err := executor()
+	batchSize, batchWriteBytes, err := executor()
 	if err != nil {
 		b.metricExecErrCnt.Inc()
 		return err
 	}
 	b.metricExecTxnHis.Observe(time.Since(startTime).Seconds())
 	b.metricExecBatchHis.Observe(float64(batchSize))
+	b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes))
 	atomic.AddUint64(&b.totalFlushedRows, uint64(batchSize))
 	return nil
 }
diff --git a/cdc/sink/mq/mq_flush_worker.go b/cdc/sink/mq/mq_flush_worker.go
index 0b2f034cc04..4b39100695d 100644
--- a/cdc/sink/mq/mq_flush_worker.go
+++ b/cdc/sink/mq/mq_flush_worker.go
@@ -318,11 +318,11 @@ func (w *flushWorker) sendMessages(ctx context.Context) error {
 			}
 
 			for _, message := range future.Messages {
-				if err := w.statistics.RecordBatchExecution(func() (int, error) {
+				if err := w.statistics.RecordBatchExecution(func() (int, int64, error) {
 					if err := w.producer.AsyncSendMessage(ctx, future.Topic, future.Partition, message); err != nil {
-						return 0, err
+						return 0, 0, err
 					}
-					return message.GetRowsCount(), nil
+					return message.GetRowsCount(), int64(message.Length()), nil
 				}); err != nil {
 					return err
 				}
diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go
index 7c0643a5be6..be4245fcfad 100644
--- a/cdc/sink/mysql/mysql.go
+++ b/cdc/sink/mysql/mysql.go
@@ -689,10 +689,10 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 		failpoint.Inject("MySQLSinkHangLongTime", func() {
 			time.Sleep(time.Hour)
 		})
-		err := s.statistics.RecordBatchExecution(func() (int, error) {
+		err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
 			tx, err := s.db.BeginTx(pctx, nil)
 			if err != nil {
-				return 0, logDMLTxnErr(
+				return 0, 0, logDMLTxnErr(
 					cerror.WrapError(cerror.ErrMySQLTxnError, err),
 					start, s.params.changefeedID, "BEGIN", dmls.rowCount)
 			}
@@ -709,7 +709,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 							start, s.params.changefeedID, query, dmls.rowCount)
 					}
 					cancelFunc()
-					return 0, logDMLTxnErr(
+					return 0, 0, logDMLTxnErr(
 						cerror.WrapError(cerror.ErrMySQLTxnError, err),
 						start, s.params.changefeedID, query, dmls.rowCount)
 				}
@@ -724,7 +724,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 						log.Warn("failed to rollback txn", zap.Error(err))
 					}
 					cancelFunc()
-					return 0, logDMLTxnErr(
+					return 0, 0, logDMLTxnErr(
 						cerror.WrapError(cerror.ErrMySQLTxnError, err),
 						start, s.params.changefeedID, dmls.markSQL, dmls.rowCount)
 				}
@@ -732,11 +732,11 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
 			}
 
 			if err = tx.Commit(); err != nil {
-				return 0, logDMLTxnErr(
+				return 0, 0, logDMLTxnErr(
 					cerror.WrapError(cerror.ErrMySQLTxnError, err),
 					start, s.params.changefeedID, "COMMIT", dmls.rowCount)
 			}
-			return dmls.rowCount, nil
+			return dmls.rowCount, dmls.approximateSize, nil
 		})
 		if err != nil {
 			return errors.Trace(err)
@@ -974,10 +974,11 @@ func hasHandleKey(cols []*model.Column) bool {
 }
 
 type preparedDMLs struct {
-	sqls     []string
-	values   [][]interface{}
-	markSQL  string
-	rowCount int
+	sqls            []string
+	values          [][]interface{}
+	markSQL         string
+	rowCount        int
+	approximateSize int64
 }
 
 // prepareDMLs converts model.RowChangedEvent list to query string list and args list
@@ -992,7 +993,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
 	replaces := make(map[string][][]interface{})
 
 	rowCount := 0
-
+	approximateSize := int64(0)
 	// flush cached batch replace or insert, to keep the sequence of DMLs
 	flushCacheDMLs := func() {
 		if s.params.batchReplaceEnabled && len(replaces) > 0 {
@@ -1039,6 +1040,13 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
 				sql, value := s.batchSingleTxnDmls(txn.Rows, tableInfo, translateToInsert)
 				sqls = append(sqls, sql...)
 				values = append(values, value...)
+
+				for _, stmt := range sql {
+					approximateSize += int64(len(stmt))
+				}
+				for _, row := range txn.Rows {
+					approximateSize += row.ApproximateDataSize
+				}
 				continue
 			}
 		}
@@ -1057,6 +1065,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
 					values = append(values, args)
 					rowCount++
 				}
+				approximateSize += int64(len(query)) + row.ApproximateDataSize
 				continue
 			}
 
@@ -1103,6 +1112,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
 					}
 				}
 			}
+			approximateSize += int64(len(query)) + row.ApproximateDataSize
 		}
 	}
 	flushCacheDMLs()
@@ -1121,6 +1131,7 @@ func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64,
 		// we do not count mark table rows in rowCount.
 	}
 	dmls.rowCount = rowCount
+	dmls.approximateSize = approximateSize
 	return dmls
 }
 
diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go
index 00b4e1d6632..f9b0ef58d35 100644
--- a/cdc/sink/mysql/mysql_test.go
+++ b/cdc/sink/mysql/mysql_test.go
@@ -89,9 +89,10 @@ func TestPrepareDML(t *testing.T) {
 				},
 			},
 			expected: &preparedDMLs{
-				sqls:     []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
-				values:   [][]interface{}{{1, 1}},
-				rowCount: 1,
+				sqls:            []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
+				values:          [][]interface{}{{1, 1}},
+				rowCount:        1,
+				approximateSize: 75,
 			},
 		}, {
 			input: []*model.RowChangedEvent{
@@ -114,9 +115,10 @@ func TestPrepareDML(t *testing.T) {
 				},
 			},
 			expected: &preparedDMLs{
-				sqls:     []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
-				values:   [][]interface{}{{2, 2}},
-				rowCount: 1,
+				sqls:            []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
+				values:          [][]interface{}{{2, 2}},
+				rowCount:        1,
+				approximateSize: 64,
 			},
 		},
 	}
@@ -2310,8 +2312,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
 				sqls: []string{
 					"INSERT INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);",
 				},
-				values:   [][]interface{}{{1, 1}},
-				rowCount: 1,
+				values:          [][]interface{}{{1, 1}},
+				rowCount:        1,
+				approximateSize: 63,
 			},
 		}, {
 			name: "insert with PK",
@@ -2335,9 +2338,10 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
 				},
 			},
 			expected: &preparedDMLs{
-				sqls:     []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
-				values:   [][]interface{}{{1, 1}},
-				rowCount: 1,
+				sqls:            []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
+				values:          [][]interface{}{{1, 1}},
+				rowCount:        1,
+				approximateSize: 52,
 			},
 		}, {
 			name: "update without PK",
@@ -2376,8 +2380,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
 					"UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " +
 						"WHERE `a1`=? AND `a3`=? LIMIT 1;",
 				},
-				values:   [][]interface{}{{3, 3, 2, 2}},
-				rowCount: 1,
+				values:          [][]interface{}{{3, 3, 2, 2}},
+				rowCount:        1,
+				approximateSize: 84,
 			},
 		}, {
 			name: "update with PK",
@@ -2414,8 +2419,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
 			expected: &preparedDMLs{
 				sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " +
 					"WHERE `a1`=? AND `a3`=? LIMIT 1;"},
-				values:   [][]interface{}{{3, 3, 2, 2}},
-				rowCount: 1,
+				values:          [][]interface{}{{3, 3, 2, 2}},
+				rowCount:        1,
+				approximateSize: 73,
 			},
 		}, {
 			name: "batch insert with PK",
@@ -2460,8 +2466,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
 					"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
 					"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
 				},
-				values:   [][]interface{}{{3, 3}, {5, 5}},
-				rowCount: 2,
+				values:          [][]interface{}{{3, 3}, {5, 5}},
+				rowCount:        2,
+				approximateSize: 104,
 			},
 		}, {
 			name: "safe mode on commit ts < replicating ts",
@@ -2488,8 +2495,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
 				sqls: []string{
 					"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
 				},
-				values:   [][]interface{}{{3, 3}},
-				rowCount: 1,
+				values:          [][]interface{}{{3, 3}},
+				rowCount:        1,
+				approximateSize: 53,
 			},
 		}, {
 			name: "safe mode on one row commit ts < replicating ts",
@@ -2534,8 +2542,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
 					"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
 					"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
 				},
-				values:   [][]interface{}{{3, 3}, {5, 5}},
-				rowCount: 2,
+				values:          [][]interface{}{{3, 3}, {5, 5}},
+				rowCount:        2,
+				approximateSize: 106,
 			},
 		},
 	}
@@ -2611,9 +2620,10 @@ func TestPrepareBatchDMLs(t *testing.T) {
 				},
 			},
 			expected: &preparedDMLs{
-				sqls:     []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"},
-				values:   [][]interface{}{{1, "你好", 2, "世界"}},
-				rowCount: 2,
+				sqls:            []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"},
+				values:          [][]interface{}{{1, "你好", 2, "世界"}},
+				rowCount:        2,
+				approximateSize: 73,
 			},
 		},
 		{ // insert event
@@ -2657,9 +2667,10 @@ func TestPrepareBatchDMLs(t *testing.T) {
 				},
 			},
 			expected: &preparedDMLs{
-				sqls:     []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"},
-				values:   [][]interface{}{{1, "你好", 2, "世界"}},
-				rowCount: 2,
+				sqls:            []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"},
+				values:          [][]interface{}{{1, "你好", 2, "世界"}},
+				rowCount:        2,
+				approximateSize: 69,
 			},
 		},
 		// update event
@@ -2737,8 +2748,9 @@ func TestPrepareBatchDMLs(t *testing.T) {
 					"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? " +
 						"WHERE `a1` = ? AND `a3` = ? LIMIT 1",
 				},
-				values:   [][]interface{}{{2, "测试", 1, "开发"}, {4, "北京", 3, "纽约"}},
-				rowCount: 2,
+				values:          [][]interface{}{{2, "测试", 1, "开发"}, {4, "北京", 3, "纽约"}},
+				rowCount:        2,
+				approximateSize: 184,
 			},
 		},
 		// mixed event
@@ -2808,8 +2820,9 @@ func TestPrepareBatchDMLs(t *testing.T) {
 					"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))",
 					"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)",
 				},
-				values:   [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
-				rowCount: 3,
+				values:          [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
+				rowCount:        3,
+				approximateSize: 136,
 			},
 		},
 	}