Skip to content

Commit

Permalink
ingester] all deepflow_system data store in a table
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Oct 17, 2023
1 parent b86ed6d commit 2a904f7
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 49 deletions.
8 changes: 4 additions & 4 deletions server/controller/grpc/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"sync/atomic"

"github.com/deepflowio/deepflow/server/libs/stats"
"github.com/op/go-logging"
logging "github.com/op/go-logging"
)

var log = logging.MustGetLogger("trisolaris/statsd")
Expand Down Expand Up @@ -118,17 +118,17 @@ func AddGPIDSendCounter(count uint64) {
func Start() {
for apiType, name := range ApiTypeToName {
grpcCounters[apiType] = NewGrpcCounter()
err := stats.RegisterCountableWithModulePrefix("controller.", "trisolaris", grpcCounters[apiType], stats.OptionStatTags{"grpc_type": name})
err := stats.RegisterCountableWithModulePrefix("controller_", "trisolaris", grpcCounters[apiType], stats.OptionStatTags{"grpc_type": name})
if err != nil {
log.Error(err)
}
}

err := stats.RegisterCountableWithModulePrefix("controller.", "trisolaris", gpidCounter, stats.OptionStatTags{"grpc_type": "GPIDCount"})
err := stats.RegisterCountableWithModulePrefix("controller_", "trisolaris", gpidCounter, stats.OptionStatTags{"grpc_type": "GPIDCount"})
if err != nil {
log.Error(err)
}
err = stats.RegisterCountableWithModulePrefix("controller.", "trisolaris", GetPrometheusLabelIDsCounterSingleton(), stats.OptionStatTags{"grpc_type": "GetPrometheusLabelIDsDetail"})
err = stats.RegisterCountableWithModulePrefix("controller_", "trisolaris", GetPrometheusLabelIDsCounterSingleton(), stats.OptionStatTags{"grpc_type": "GetPrometheusLabelIDsDetail"})
if err != nil {
log.Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/ingester/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
var log = logging.MustGetLogger("common")

const (
MODULE_INGESTER = "ingester."
MODULE_INGESTER = "ingester_"
QUEUE_STATS_MODULE_INGESTER = queue.OptionModule(MODULE_INGESTER)
)

Expand Down
24 changes: 5 additions & 19 deletions server/ingester/ext_metrics/dbwriter/ext_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,24 @@ func (m *ExtMetrics) DatabaseName() string {

func (m *ExtMetrics) TableName() string {
if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS {
return m.VTableName
return DEEPFLOW_SYSTEM_TABLE
} else {
return EXT_METRICS_TABLE
}
}

func (m *ExtMetrics) VirtualTableName() string {
if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS {
return ""
} else {
return m.VTableName
}
return m.VTableName
}

// Note: The order of Write() must be consistent with the order of append() in Columns.
func (m *ExtMetrics) WriteBlock(block *ckdb.Block) {
block.WriteDateTime(m.Timestamp)
if m.MsgType != datatype.MESSAGE_TYPE_DFSTATS {
m.UniversalTag.WriteBlock(block)
block.Write(m.VTableName)
}
block.Write(
m.VTableName,
m.TagNames,
m.TagValues,
m.MetricsFloatNames,
Expand All @@ -91,12 +87,9 @@ func (m *ExtMetrics) Columns() []*ckdb.Column {
columns = append(columns, ckdb.NewColumnWithGroupBy("time", ckdb.DateTime))
if m.MsgType != datatype.MESSAGE_TYPE_DFSTATS {
columns = zerodoc.GenUniversalTagColumns(columns)

// FIXME: Currently there is no virtual_table_name column in the deepflow_system database,
// but it will be unified in the future.
columns = append(columns, ckdb.NewColumn("virtual_table_name", ckdb.LowCardinalityString).SetComment("虚拟表名k"))
}
columns = append(columns,
ckdb.NewColumn("virtual_table_name", ckdb.LowCardinalityString).SetComment("虚拟表名"),
ckdb.NewColumn("tag_names", ckdb.ArrayLowCardinalityString).SetComment("额外的tag"),
ckdb.NewColumn("tag_values", ckdb.ArrayLowCardinalityString).SetComment("额外的tag对应的值"),
ckdb.NewColumn("metrics_float_names", ckdb.ArrayLowCardinalityString).SetComment("额外的float类型metrics"),
Expand All @@ -115,19 +108,12 @@ func (m *ExtMetrics) GenCKTable(cluster, storagePolicy string, ttl int, coldStor
engine := ckdb.MergeTree

// order key
orderKeys := []string{}
orderKeys := []string{"virtual_table_name", timeKey}
if m.MsgType != datatype.MESSAGE_TYPE_DFSTATS {
// FIXME: Currently there is no virtual_table_name column in the deepflow_system database,
// but it will be unified in the future.
orderKeys = append(orderKeys, "virtual_table_name")
orderKeys = append(orderKeys, timeKey)

// order key in universal tags
orderKeys = append(orderKeys, "l3_epc_id")
orderKeys = append(orderKeys, "ip4")
orderKeys = append(orderKeys, "ip6")
} else {
orderKeys = append(orderKeys, timeKey)
}

return &ckdb.Table{
Expand Down
20 changes: 5 additions & 15 deletions server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ import (
var log = logging.MustGetLogger("ext_metrics.dbwriter")

const (
QUEUE_BATCH_SIZE = 1024
EXT_METRICS_DB = "ext_metrics"
EXT_METRICS_TABLE = "metrics"
DEEPFLOW_SYSTEM_DB = "deepflow_system"
QUEUE_BATCH_SIZE = 1024
EXT_METRICS_DB = "ext_metrics"
EXT_METRICS_TABLE = "metrics"
DEEPFLOW_SYSTEM_DB = "deepflow_system"
DEEPFLOW_SYSTEM_TABLE = "deepflow_system"
)

type ClusterNode struct {
Expand Down Expand Up @@ -244,18 +245,7 @@ func NewExtMetricsWriter(
db string,
config *config.Config) (*ExtMetricsWriter, error) {

// adjust CKWriterConfig
ckWriterConfig := config.CKWriterConfig
if msgType == datatype.MESSAGE_TYPE_DFSTATS {
// FIXME: At present, there are hundreds of tables in the deepflow_system database,
// and the amount of data is not large. Adjust the queue size to reduce memory consumption.
// In the future, it is necessary to merge the data tables in deepflow_system with
// reference to the ext_metrics database.
ckWriterConfig.QueueCount = 1
ckWriterConfig.QueueSize >>= 3
ckWriterConfig.BatchSize >>= 3
}

// FlowTagWriter
flowTagWriterConfig := baseconfig.CKWriterConfig{
QueueCount: 1, // Allocate one FlowTagWriter for each ExtMetricsWriter.
Expand Down
8 changes: 0 additions & 8 deletions server/ingester/ext_metrics/ext_metrics/ext_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ func NewExtMetrics(config *config.Config, recv *receiver.Receiver, platformDataM

func NewMetricsor(msgType datatype.MessageType, db string, config *config.Config, platformDataManager *grpc.PlatformDataManager, manager *dropletqueue.Manager, recv *receiver.Receiver, platformDataEnabled bool) (*Metricsor, error) {
queueCount := config.DecoderQueueCount
if msgType == datatype.MESSAGE_TYPE_DFSTATS {
// FIXME: At present, there are hundreds of tables in the deepflow_system database,
// and the amount of data is not large. Adjust the queue size to reduce memory consumption.
// In the future, it is necessary to merge the data tables in deepflow_system with
// reference to the ext_metrics database.
queueCount = 1
}

decodeQueues := manager.NewQueues(
"1-receive-to-decode-"+msgType.String(),
config.DecoderQueueSize,
Expand Down
2 changes: 1 addition & 1 deletion server/libs/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ func (r *Receiver) Start() {
go r.ProcessTCPServer()
}

stats.RegisterCountableWithModulePrefix("ingester.", "recviver", r)
stats.RegisterCountableWithModulePrefix("ingester_", "recviver", r)
}

func (r *Receiver) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion server/libs/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *StatSource) String() string {

var (
processName string
processNameJoiner string = "."
processNameJoiner string = "_"
hostname string
lock sync.Mutex
preHooks []func()
Expand Down

0 comments on commit 2a904f7

Please sign in to comment.