Skip to content

Commit

Permalink
[Feature][Zeta engine] Added the metrics information of table statist…
Browse files Browse the repository at this point in the history
…ics in multi-table mode apache#6959
  • Loading branch information
hawk9821 committed Jun 28, 2024
1 parent 993bb8d commit 1b7ad46
Showing 1 changed file with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,19 +275,24 @@ public void received(Record<?> record) {
long size = ((SeaTunnelRow) record.getData()).getBytesSize();
sinkWriteBytes.inc(size);
sinkWriteBytesPerSeconds.markEvent(size);
String tableId = ((SeaTunnelRow) record.getData()).getTableId();
if (StringUtils.isNotBlank(tableId)) {
String tableName = TablePath.of(tableId).getTableName();
if (StringUtils.isNotBlank(tableName)) {
Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
if (Objects.nonNull(sinkTableCounter)) {
sinkTableCounter.inc();
} else {
sinkWriteCountPerTable.put(
tableName,
metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName));
try {
String tableId = ((SeaTunnelRow) record.getData()).getTableId();
if (StringUtils.isNotBlank(tableId)) {
String tableName = TablePath.of(tableId).getTableName();
if (StringUtils.isNotBlank(tableName)) {
Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
if (Objects.nonNull(sinkTableCounter)) {
sinkTableCounter.inc();
} else {
sinkWriteCountPerTable.put(
tableName,
metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName));
}
}
}
} catch (Exception e) {
log.error("====================== {}", e.getMessage());
throw new RuntimeException(e);
}
}
}
Expand Down

0 comments on commit 1b7ad46

Please sign in to comment.