diff --git a/server/controller/grpc/statsd/statsd.go b/server/controller/grpc/statsd/statsd.go index 81e92653cbb..ca439628a98 100644 --- a/server/controller/grpc/statsd/statsd.go +++ b/server/controller/grpc/statsd/statsd.go @@ -20,7 +20,7 @@ import ( "sync/atomic" "github.com/deepflowio/deepflow/server/libs/stats" - "github.com/op/go-logging" + logging "github.com/op/go-logging" ) var log = logging.MustGetLogger("trisolaris/statsd") @@ -118,17 +118,17 @@ func AddGPIDSendCounter(count uint64) { func Start() { for apiType, name := range ApiTypeToName { grpcCounters[apiType] = NewGrpcCounter() - err := stats.RegisterCountableWithModulePrefix("controller.", "trisolaris", grpcCounters[apiType], stats.OptionStatTags{"grpc_type": name}) + err := stats.RegisterCountableWithModulePrefix("controller_", "trisolaris", grpcCounters[apiType], stats.OptionStatTags{"grpc_type": name}) if err != nil { log.Error(err) } } - err := stats.RegisterCountableWithModulePrefix("controller.", "trisolaris", gpidCounter, stats.OptionStatTags{"grpc_type": "GPIDCount"}) + err := stats.RegisterCountableWithModulePrefix("controller_", "trisolaris", gpidCounter, stats.OptionStatTags{"grpc_type": "GPIDCount"}) if err != nil { log.Error(err) } - err = stats.RegisterCountableWithModulePrefix("controller.", "trisolaris", GetPrometheusLabelIDsCounterSingleton(), stats.OptionStatTags{"grpc_type": "GetPrometheusLabelIDsDetail"}) + err = stats.RegisterCountableWithModulePrefix("controller_", "trisolaris", GetPrometheusLabelIDsCounterSingleton(), stats.OptionStatTags{"grpc_type": "GetPrometheusLabelIDsDetail"}) if err != nil { log.Error(err) } diff --git a/server/ingester/common/common.go b/server/ingester/common/common.go index 8a269b263bd..33aec1c68d8 100644 --- a/server/ingester/common/common.go +++ b/server/ingester/common/common.go @@ -36,7 +36,7 @@ import ( var log = logging.MustGetLogger("common") const ( - MODULE_INGESTER = "ingester." + MODULE_INGESTER = "ingester_" QUEUE_STATS_MODULE_INGESTER = queue.OptionModule(MODULE_INGESTER) ) diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics.go b/server/ingester/ext_metrics/dbwriter/ext_metrics.go index 181dce56c3d..3493a6a1b02 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics.go @@ -55,18 +55,14 @@ func (m *ExtMetrics) DatabaseName() string { func (m *ExtMetrics) TableName() string { if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS { - return m.VTableName + return DEEPFLOW_SYSTEM_TABLE } else { return EXT_METRICS_TABLE } } func (m *ExtMetrics) VirtualTableName() string { - if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS { - return "" - } else { - return m.VTableName - } + return m.VTableName } // Note: The order of Write() must be consistent with the order of append() in Columns. @@ -74,9 +70,9 @@ func (m *ExtMetrics) WriteBlock(block *ckdb.Block) { block.WriteDateTime(m.Timestamp) if m.MsgType != datatype.MESSAGE_TYPE_DFSTATS { m.UniversalTag.WriteBlock(block) - block.Write(m.VTableName) } block.Write( + m.VTableName, m.TagNames, m.TagValues, m.MetricsFloatNames, @@ -91,12 +87,9 @@ func (m *ExtMetrics) Columns() []*ckdb.Column { columns = append(columns, ckdb.NewColumnWithGroupBy("time", ckdb.DateTime)) if m.MsgType != datatype.MESSAGE_TYPE_DFSTATS { columns = zerodoc.GenUniversalTagColumns(columns) - - // FIXME: Currently there is no virtual_table_name column in the deepflow_system database, - // but it will be unified in the future. - columns = append(columns, ckdb.NewColumn("virtual_table_name", ckdb.LowCardinalityString).SetComment("虚拟表名k")) } columns = append(columns, + ckdb.NewColumn("virtual_table_name", ckdb.LowCardinalityString).SetComment("虚拟表名"), ckdb.NewColumn("tag_names", ckdb.ArrayLowCardinalityString).SetComment("额外的tag"), ckdb.NewColumn("tag_values", ckdb.ArrayLowCardinalityString).SetComment("额外的tag对应的值"), ckdb.NewColumn("metrics_float_names", ckdb.ArrayLowCardinalityString).SetComment("额外的float类型metrics"), @@ -115,19 +108,12 @@ func (m *ExtMetrics) GenCKTable(cluster, storagePolicy string, ttl int, coldStor engine := ckdb.MergeTree // order key - orderKeys := []string{} + orderKeys := []string{"virtual_table_name", timeKey} if m.MsgType != datatype.MESSAGE_TYPE_DFSTATS { - // FIXME: Currently there is no virtual_table_name column in the deepflow_system database, - // but it will be unified in the future. - orderKeys = append(orderKeys, "virtual_table_name") - orderKeys = append(orderKeys, timeKey) - // order key in universal tags orderKeys = append(orderKeys, "l3_epc_id") orderKeys = append(orderKeys, "ip4") orderKeys = append(orderKeys, "ip6") - } else { - orderKeys = append(orderKeys, timeKey) } return &ckdb.Table{ diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go b/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go index 125ceb52918..8312f1d66a1 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go @@ -37,10 +37,11 @@ import ( var log = logging.MustGetLogger("ext_metrics.dbwriter") const ( - QUEUE_BATCH_SIZE = 1024 - EXT_METRICS_DB = "ext_metrics" - EXT_METRICS_TABLE = "metrics" - DEEPFLOW_SYSTEM_DB = "deepflow_system" + QUEUE_BATCH_SIZE = 1024 + EXT_METRICS_DB = "ext_metrics" + EXT_METRICS_TABLE = "metrics" + DEEPFLOW_SYSTEM_DB = "deepflow_system" + DEEPFLOW_SYSTEM_TABLE = "deepflow_system" ) type ClusterNode struct { @@ -244,18 +245,7 @@ func NewExtMetricsWriter( db string, config *config.Config) (*ExtMetricsWriter, error) { - // adjust CKWriterConfig ckWriterConfig := config.CKWriterConfig - if msgType == datatype.MESSAGE_TYPE_DFSTATS { - // FIXME: At present, there are hundreds of tables in the deepflow_system database, - // and the amount of data is not large. Adjust the queue size to reduce memory consumption. - // In the future, it is necessary to merge the data tables in deepflow_system with - // reference to the ext_metrics database. - ckWriterConfig.QueueCount = 1 - ckWriterConfig.QueueSize >>= 3 - ckWriterConfig.BatchSize >>= 3 - } - // FlowTagWriter flowTagWriterConfig := baseconfig.CKWriterConfig{ QueueCount: 1, // Allocate one FlowTagWriter for each ExtMetricsWriter. diff --git a/server/ingester/ext_metrics/ext_metrics/ext_metrics.go b/server/ingester/ext_metrics/ext_metrics/ext_metrics.go index e6084951054..829077775f7 100644 --- a/server/ingester/ext_metrics/ext_metrics/ext_metrics.go +++ b/server/ingester/ext_metrics/ext_metrics/ext_metrics.go @@ -74,14 +74,6 @@ func NewExtMetrics(config *config.Config, recv *receiver.Receiver, platformDataM func NewMetricsor(msgType datatype.MessageType, db string, config *config.Config, platformDataManager *grpc.PlatformDataManager, manager *dropletqueue.Manager, recv *receiver.Receiver, platformDataEnabled bool) (*Metricsor, error) { queueCount := config.DecoderQueueCount - if msgType == datatype.MESSAGE_TYPE_DFSTATS { - // FIXME: At present, there are hundreds of tables in the deepflow_system database, - // and the amount of data is not large. Adjust the queue size to reduce memory consumption. - // In the future, it is necessary to merge the data tables in deepflow_system with - // reference to the ext_metrics database. - queueCount = 1 - } - decodeQueues := manager.NewQueues( "1-receive-to-decode-"+msgType.String(), config.DecoderQueueSize, diff --git a/server/libs/receiver/receiver.go b/server/libs/receiver/receiver.go index d9be0979399..fd505c22ee4 100644 --- a/server/libs/receiver/receiver.go +++ b/server/libs/receiver/receiver.go @@ -935,7 +935,7 @@ func (r *Receiver) Start() { go r.ProcessTCPServer() } - stats.RegisterCountableWithModulePrefix("ingester.", "recviver", r) + stats.RegisterCountableWithModulePrefix("ingester_", "recviver", r) } func (r *Receiver) Close() error { diff --git a/server/libs/stats/stats.go b/server/libs/stats/stats.go index 55d53200b4e..bd37a6158f5 100644 --- a/server/libs/stats/stats.go +++ b/server/libs/stats/stats.go @@ -61,7 +61,7 @@ func (s *StatSource) String() string { var ( processName string - processNameJoiner string = "." + processNameJoiner string = "_" hostname string lock sync.Mutex preHooks []func()