diff --git a/core/common/LogtailCommonFlags.cpp b/core/common/LogtailCommonFlags.cpp index 7deff9303c..c771b521a4 100644 --- a/core/common/LogtailCommonFlags.cpp +++ b/core/common/LogtailCommonFlags.cpp @@ -94,4 +94,5 @@ DEFINE_FLAG_INT32(fuse_file_max_count, "max file total count from fuse root dir" DEFINE_FLAG_BOOL(enable_root_path_collection, "", false); DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect, "if enable containerd upper dir detect when locating rootfs", - false); \ No newline at end of file + false); +DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false); \ No newline at end of file diff --git a/core/common/LogtailCommonFlags.h b/core/common/LogtailCommonFlags.h index 398542e510..22c73a2abb 100644 --- a/core/common/LogtailCommonFlags.h +++ b/core/common/LogtailCommonFlags.h @@ -70,4 +70,5 @@ DECLARE_FLAG_STRING(fuse_root_dir); DECLARE_FLAG_BOOL(enable_root_path_collection); DECLARE_FLAG_INT32(logtail_alarm_interval); DECLARE_FLAG_BOOL(enable_containerd_upper_dir_detect); +DECLARE_FLAG_BOOL(enable_sls_metrics_format); DECLARE_FLAG_BOOL(enable_new_pipeline); \ No newline at end of file diff --git a/core/logtail.cpp b/core/logtail.cpp index 0ff4672feb..8e9538b68c 100644 --- a/core/logtail.cpp +++ b/core/logtail.cpp @@ -111,6 +111,7 @@ static void overwrite_community_edition_flags() { INT32_FLAG(data_server_port) = 443; BOOL_FLAG(enable_env_ref_in_config) = true; BOOL_FLAG(enable_containerd_upper_dir_detect) = true; + BOOL_FLAG(enable_sls_metrics_format) = false; } // Main routine of worker process. diff --git a/core/logtail_windows.cpp b/core/logtail_windows.cpp index 8883bf0627..3e75794944 100644 --- a/core/logtail_windows.cpp +++ b/core/logtail_windows.cpp @@ -64,6 +64,7 @@ static void overwrite_community_edition_flags() { INT32_FLAG(data_server_port) = 443; BOOL_FLAG(enable_env_ref_in_config) = true; BOOL_FLAG(enable_containerd_upper_dir_detect) = true; + BOOL_FLAG(enable_sls_metrics_format) = false; } void do_worker_process() { diff --git a/core/plugin/LogtailPlugin.cpp b/core/plugin/LogtailPlugin.cpp index b4fcea5522..17169499ec 100644 --- a/core/plugin/LogtailPlugin.cpp +++ b/core/plugin/LogtailPlugin.cpp @@ -52,6 +52,7 @@ LogtailPlugin::LogtailPlugin() { mPluginCfg["HostIP"] = LogFileProfiler::mIpAddr; mPluginCfg["Hostname"] = LogFileProfiler::mHostname; mPluginCfg["EnableContainerdUpperDirDetect"] = BOOL_FLAG(enable_containerd_upper_dir_detect); + mPluginCfg["EnableSlsMetricsFormat"] = BOOL_FLAG(enable_sls_metrics_format); } LogtailPlugin::~LogtailPlugin() { diff --git a/pkg/config/global_config.go b/pkg/config/global_config.go index 0d53860b35..73f614d833 100644 --- a/pkg/config/global_config.go +++ b/pkg/config/global_config.go @@ -37,6 +37,7 @@ type GlobalConfig struct { EnableTimestampNanosecond bool EnableContainerdUpperDirDetect bool + EnableSlsMetricsFormat bool } // LogtailGlobalConfig is the singleton instance of GlobalConfig. diff --git a/pkg/helper/log_helper.go b/pkg/helper/log_helper.go index b96ec05d1d..3e6c787a8c 100644 --- a/pkg/helper/log_helper.go +++ b/pkg/helper/log_helper.go @@ -16,12 +16,25 @@ package helper import ( "fmt" + "math" "sort" "strconv" "strings" "time" + "github.com/alibaba/ilogtail/pkg/config" "github.com/alibaba/ilogtail/pkg/protocol" + "github.com/alibaba/ilogtail/pkg/util" +) + +const ( + // StaleNaN is a signaling NaN, due to the MSB of the mantissa being 0. + // This value is chosen with many leading 0s, so we have scope to store more + // complicated values in the future. It is 2 rather than 1 to make + // it easier to distinguish from the NormalNaN by a human when debugging. + StaleNaN uint64 = 0x7ff0000000000002 + StaleNan = "__STALE_NAN__" + SlsMetricstoreInvalidReplaceCharacter = '_' ) func CreateLog(t time.Time, configTag map[string]string, logTags map[string]string, fields map[string]string) (*protocol.Log, error) { @@ -96,25 +109,112 @@ type MetricLabel struct { } // Labels for metric labels -type MetricLabels []MetricLabel +type MetricLabels struct { + keyValues []*MetricLabel + sorted bool + formatStr string +} + +func (kv *MetricLabels) clearCache() { + kv.sorted = false + kv.formatStr = "" +} + +func (kv *MetricLabels) Len() int { + return len(kv.keyValues) +} -func (l MetricLabels) Len() int { - return len(l) +func (kv *MetricLabels) Swap(i int, j int) { + kv.keyValues[i], kv.keyValues[j] = kv.keyValues[j], kv.keyValues[i] } -func (l MetricLabels) Swap(i int, j int) { - l[i], l[j] = l[j], l[i] +func (kv *MetricLabels) Less(i int, j int) bool { + return kv.keyValues[i].Name < kv.keyValues[j].Name } -func (l MetricLabels) Less(i int, j int) bool { - return l[i].Name < l[j].Name +func (kv *MetricLabels) Replace(key, value string) { + findIndex := sort.Search(len(kv.keyValues), func(index int) bool { + return kv.keyValues[index].Name >= key + }) + if findIndex < len(kv.keyValues) && kv.keyValues[findIndex].Name == key { + kv.keyValues[findIndex].Value = value + } else { + kv.Append(key, value) + } + kv.clearCache() +} + +func (kv *MetricLabels) Clone() *MetricLabels { + if kv == nil { + return &MetricLabels{} + } + var newKeyValues MetricLabels + kv.CloneInto(&newKeyValues) + return &newKeyValues } -func MinInt(a, b int) int { - if a < b { - return a +func (kv *MetricLabels) CloneInto(dst *MetricLabels) *MetricLabels { + if kv == nil { + return &MetricLabels{} + } + if dst == nil { + return kv.Clone() + } + if len(kv.keyValues) < cap(dst.keyValues) { + dst.keyValues = dst.keyValues[:len(kv.keyValues)] + } else { + dst.keyValues = make([]*MetricLabel, len(kv.keyValues)) + } + dst.sorted = kv.sorted + dst.formatStr = kv.formatStr + for i, value := range kv.keyValues { + cp := *value + dst.keyValues[i] = &cp } - return b + return dst +} + +// AppendMap ... +func (kv *MetricLabels) AppendMap(mapVal map[string]string) { + for key, value := range mapVal { + kv.Append(key, value) + } + kv.clearCache() +} + +// Append ... +func (kv *MetricLabels) Append(key, value string) { + kv.keyValues = append(kv.keyValues, &MetricLabel{ + formatLabelKey(key), + formatLabelValue(value), + }) + kv.clearCache() +} + +func (kv *MetricLabels) SubSlice(begin, end int) { + kv.keyValues = kv.keyValues[begin:end] + kv.clearCache() +} + +func (kv *MetricLabels) String() string { + if kv == nil { + return "" + } + if !kv.sorted || kv.formatStr == "" { + sort.Sort(kv) + var builder strings.Builder + for index, label := range kv.keyValues { + builder.WriteString(label.Name) + builder.WriteString("#$#") + builder.WriteString(label.Value) + if index != len(kv.keyValues)-1 { + builder.WriteByte('|') + } + } + kv.formatStr = builder.String() + kv.sorted = true + } + return kv.formatStr } // DefBucket ... @@ -131,42 +231,128 @@ type HistogramData struct { } // ToMetricLogs .. -func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels MetricLabels) []*protocol.Log { +func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels *MetricLabels) []*protocol.Log { logs := make([]*protocol.Log, 0, len(hd.Buckets)+2) - sort.Sort(labels) + logs = append(logs, NewMetricLog(name+"_count", timeMs, float64(hd.Count), labels)) + logs = append(logs, NewMetricLog(name+"_sum", timeMs, hd.Sum, labels)) for _, v := range hd.Buckets { - newLabels := make(MetricLabels, len(labels), len(labels)+1) - copy(newLabels, labels) - newLabels = append(newLabels, MetricLabel{Name: "le", Value: strconv.FormatFloat(v.Le, 'g', -1, 64)}) - sort.Sort(newLabels) - logs = append(logs, NewMetricLog(name+"_bucket", timeMs, strconv.FormatInt(v.Count, 10), newLabels)) - } - logs = append(logs, NewMetricLog(name+"_count", timeMs, strconv.FormatInt(hd.Count, 10), labels)) - logs = append(logs, NewMetricLog(name+"_sum", timeMs, strconv.FormatFloat(hd.Sum, 'g', -1, 64), labels)) + labels.Replace("le", strconv.FormatFloat(v.Le, 'g', -1, 64)) + logs = append(logs, NewMetricLog(name+"_bucket", timeMs, float64(v.Count), labels)) + } + return logs } -// NewMetricLog caller must sort labels -func NewMetricLog(name string, timeMs int64, value string, labels []MetricLabel) *protocol.Log { - strTime := strconv.FormatInt(timeMs, 10) +// NewMetricLog create a metric log, time support unix milliseconds and unix nanoseconds. +func NewMetricLog(name string, t int64, value float64, labels *MetricLabels) *protocol.Log { + var valStr string + if math.Float64bits(value) == StaleNaN { + valStr = StaleNan + } else { + valStr = strconv.FormatFloat(value, 'g', -1, 64) + } + return NewMetricLogStringVal(name, t, valStr, labels) +} + +// NewMetricLogStringVal create a metric log with val string, time support unix milliseconds and unix nanoseconds. +func NewMetricLogStringVal(name string, t int64, value string, labels *MetricLabels) *protocol.Log { + strTime := strconv.FormatInt(t, 10) metric := &protocol.Log{} - protocol.SetLogTimeWithNano(metric, uint32(timeMs/1000), uint32((timeMs*1e6)%1e9)) - metric.Contents = []*protocol.Log_Content{} - metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__name__", Value: name}) + switch len(strTime) { + case 13: + protocol.SetLogTimeWithNano(metric, uint32(t/1000), uint32((t*1e6)%1e9)) + strTime += "000000" + case 19: + protocol.SetLogTimeWithNano(metric, uint32(t/1e9), uint32(t%1e9)) + default: + t = int64(float64(t) * math.Pow10(19-len(strTime))) + strTime = strconv.FormatInt(t, 10) + protocol.SetLogTimeWithNano(metric, uint32(t/1e9), uint32(t%1e9)) + } + metric.Contents = make([]*protocol.Log_Content, 0, 4) + metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__name__", Value: formatNewMetricName(name)}) metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__time_nano__", Value: strTime}) + metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__labels__", Value: labels.String()}) + metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__value__", Value: value}) + return metric +} + +func formatLabelKey(key string) string { + if !config.LogtailGlobalConfig.EnableSlsMetricsFormat { + return key + } + var newKey []byte + for i := 0; i < len(key); i++ { + b := key[i] + if (b >= 'a' && b <= 'z') || + (b >= 'A' && b <= 'Z') || + (b >= '0' && b <= '9') || + b == '_' { + continue + } else { + if newKey == nil { + newKey = []byte(key) + } + newKey[i] = SlsMetricstoreInvalidReplaceCharacter + } + } + if newKey == nil { + return key + } + return util.ZeroCopyBytesToString(newKey) +} - builder := strings.Builder{} - for index, l := range labels { - if index != 0 { - builder.WriteString("|") +func formatLabelValue(value string) string { + if !config.LogtailGlobalConfig.EnableSlsMetricsFormat { + return value + } + var newValue []byte + for i := 0; i < len(value); i++ { + b := value[i] + if b != '|' { + continue + } else { + if newValue == nil { + newValue = []byte(value) + } + newValue[i] = SlsMetricstoreInvalidReplaceCharacter } - builder.WriteString(l.Name) - builder.WriteString("#$#") - builder.WriteString(l.Value) + } + if newValue == nil { + return value + } + return util.ZeroCopyBytesToString(newValue) +} +func formatNewMetricName(name string) string { + if !config.LogtailGlobalConfig.EnableSlsMetricsFormat { + return name + } + newName := []byte(name) + for i, b := range newName { + if (b >= 'a' && b <= 'z') || + (b >= 'A' && b <= 'Z') || + (b >= '0' && b <= '9') || + b == '_' || + b == ':' { + continue + } else { + newName[i] = SlsMetricstoreInvalidReplaceCharacter + } } - metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__labels__", Value: builder.String()}) + return util.ZeroCopyBytesToString(newName) +} - metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__value__", Value: value}) - return metric +// ReplaceInvalidChars analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]") +func ReplaceInvalidChars(in *string) { + for charIndex, char := range *in { + charInt := int(char) + if !((charInt >= 97 && charInt <= 122) || // a-z + (charInt >= 65 && charInt <= 90) || // A-Z + (charInt >= 48 && charInt <= 57) || // 0-9 + charInt == 95 || charInt == ':') { // _ + + *in = (*in)[:charIndex] + "_" + (*in)[charIndex+1:] + } + } } diff --git a/pkg/helper/log_helper_test.go b/pkg/helper/log_helper_test.go new file mode 100644 index 0000000000..388b53dbae --- /dev/null +++ b/pkg/helper/log_helper_test.go @@ -0,0 +1,36 @@ +package helper + +import ( + "github.com/stretchr/testify/require" + + "testing" +) + +func TestMetricLabels_Append(t *testing.T) { + var ml MetricLabels + ml.Append("key2", "val") + ml.Append("key", "val") + log := NewMetricLog("name", 1691646109945, 1, &ml) + require.Equal(t, `Time:1691646109 Contents: Contents: Contents: Contents: `, log.String()) + + ml.Replace("key", "val2") + + log = NewMetricLog("name", 1691646109945, 1, &ml) + require.Equal(t, `Time:1691646109 Contents: Contents: Contents: Contents: `, log.String()) + + ml.Replace("key3", "val3") + log = NewMetricLog("name", 1691646109945, 1, &ml) + require.Equal(t, `Time:1691646109 Contents: Contents: Contents: Contents: `, log.String()) + + cloneLabel := ml.Clone() + cloneLabel.Replace("key3", "val4") + log = NewMetricLog("name", 1691646109945, 1, cloneLabel) + require.Equal(t, `Time:1691646109 Contents: Contents: Contents: Contents: `, log.String()) + + log = NewMetricLog("name", 1691646109945, 1, &ml) + require.Equal(t, `Time:1691646109 Contents: Contents: Contents: Contents: `, log.String()) + + log = NewMetricLog("name", 1691646109945, 1, nil) + require.Equal(t, `Time:1691646109 Contents: Contents: Contents: Contents: `, log.String()) + +} diff --git a/pkg/helper/metrics_helper.go b/pkg/helper/metrics_helper.go deleted file mode 100644 index 5b3179c23b..0000000000 --- a/pkg/helper/metrics_helper.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2021 iLogtail Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package helper - -import ( - "sort" - "strconv" - "strings" - "time" - - "github.com/alibaba/ilogtail/pkg/pipeline" -) - -var metricKeys []string - -// KeyValue ... -type KeyValue struct { - Key string - Value string -} - -// KeyValues ... -type KeyValues struct { - keyValues []KeyValue -} - -func (kv *KeyValues) Len() int { return len(kv.keyValues) } -func (kv *KeyValues) Swap(i, j int) { - kv.keyValues[i], kv.keyValues[j] = kv.keyValues[j], kv.keyValues[i] -} -func (kv *KeyValues) Less(i, j int) bool { return kv.keyValues[i].Key < kv.keyValues[j].Key } - -// Sort ... -func (kv *KeyValues) Sort() { sort.Sort(kv) } - -// Replace ... -func (kv *KeyValues) Replace(key, value string) { - findIndex := sort.Search(len(kv.keyValues), func(index int) bool { - return kv.keyValues[index].Key >= key - }) - if findIndex < len(kv.keyValues) && kv.keyValues[findIndex].Key == key { - kv.keyValues[findIndex].Value = value - } -} - -// AppendMap ... -func (kv *KeyValues) AppendMap(mapVal map[string]string) { - for key, value := range mapVal { - kv.keyValues = append(kv.keyValues, KeyValue{ - Key: key, - Value: value, - }) - } -} - -// Append ... -func (kv *KeyValues) Append(key, value string) { - kv.keyValues = append(kv.keyValues, KeyValue{ - key, - value, - }) -} - -// Clone ... -func (kv *KeyValues) Clone() KeyValues { - var newKeyValues KeyValues - newKeyValues.keyValues = make([]KeyValue, len(kv.keyValues)) - copy(newKeyValues.keyValues, kv.keyValues) - return newKeyValues -} - -func (kv *KeyValues) String() string { - var builder strings.Builder - kv.labelToStringBuilder(&builder) - return builder.String() -} - -func (kv *KeyValues) labelToStringBuilder(sb *strings.Builder) { - if sb.Len() != 0 { - sb.WriteByte('|') - } - for index, label := range kv.keyValues { - sb.WriteString(label.Key) - sb.WriteString("#$#") - sb.WriteString(label.Value) - if index != len(kv.keyValues)-1 { - sb.WriteByte('|') - } - } -} - -// MakeMetric ... -func MakeMetric(name string, labels string, timeNano int64, value float64) ([]string, []string) { - values := make([]string, 4) - values[0] = name - values[1] = labels - values[2] = strconv.FormatInt(timeNano, 10) - values[3] = strconv.FormatFloat(value, 'g', -1, 64) - return metricKeys, values -} - -// AddMetric to the collector. -func AddMetric(collector pipeline.Collector, - name string, - time time.Time, - labels string, - value float64) { - keys, vals := MakeMetric(name, labels, time.UnixNano(), value) - collector.AddDataArray(nil, keys, vals, time) -} - -// ReplaceInvalidChars analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]") -func ReplaceInvalidChars(in *string) { - - for charIndex, char := range *in { - charInt := int(char) - if !((charInt >= 97 && charInt <= 122) || // a-z - (charInt >= 65 && charInt <= 90) || // A-Z - (charInt >= 48 && charInt <= 57) || // 0-9 - charInt == 95 || charInt == ':') { // _ - - *in = (*in)[:charIndex] + "_" + (*in)[charIndex+1:] - } - } -} - -func init() { - metricKeys = append(metricKeys, "__name__") - metricKeys = append(metricKeys, "__labels__") - metricKeys = append(metricKeys, "__time_nano__") - metricKeys = append(metricKeys, "__value__") -} diff --git a/pkg/protocol/decoder/influxdb/decoder.go b/pkg/protocol/decoder/influxdb/decoder.go index 581487d5e9..d08395d2ed 100644 --- a/pkg/protocol/decoder/influxdb/decoder.go +++ b/pkg/protocol/decoder/influxdb/decoder.go @@ -18,7 +18,6 @@ import ( "fmt" "net/http" "strconv" - "strings" "time" "github.com/influxdata/influxdb/models" @@ -27,15 +26,12 @@ import ( imodels "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/protocol/decoder/common" + "github.com/alibaba/ilogtail/pkg/util" ) const ( - metricNameKey = "__name__" - labelsKey = "__labels__" - timeNanoKey = "__time_nano__" - valueKey = "__value__" - typeKey = "__type__" - fieldNameKey = "__field__" + typeKey = "__type__" + fieldNameKey = "__field__" ) const ( @@ -162,14 +158,6 @@ func (d *Decoder) parsePointFieldsToMetricValues(p models.Point) (imodels.Metric func (d *Decoder) parsePointsToLogs(points []models.Point, req *http.Request) []*protocol.Log { db := req.FormValue("db") - contentLen := 4 - if d.FieldsExtend && len(db) > 0 { - contentLen++ - } - if d.FieldsExtend { - contentLen += 2 - } - logs := make([]*protocol.Log, 0, len(points)) for _, s := range points { fields, err := s.Fields() @@ -209,62 +197,25 @@ func (d *Decoder) parsePointsToLogs(points []models.Point, req *http.Request) [] } else { name = string(s.Name()) + ":" + field } - - if !d.FieldsExtend { - helper.ReplaceInvalidChars(&name) - } - var builder strings.Builder - for index, v := range s.Tags() { - if index != 0 { - builder.WriteByte('|') - } - if !d.FieldsExtend { - key := string(v.Key) - helper.ReplaceInvalidChars(&key) - builder.WriteString(key) - } else { - builder.Write(v.Key) - } - builder.WriteString("#$#") - builder.WriteString(string(v.Value)) + var labels helper.MetricLabels + for _, v := range s.Tags() { + labels.Append(util.ZeroCopyBytesToString(v.Key), util.ZeroCopyBytesToString(v.Value)) } - contents := make([]*protocol.Log_Content, 0, contentLen) - contents = append(contents, &protocol.Log_Content{ - Key: metricNameKey, - Value: name, - }, &protocol.Log_Content{ - Key: labelsKey, - Value: builder.String(), - }, &protocol.Log_Content{ - Key: timeNanoKey, - Value: strconv.FormatInt(s.UnixNano(), 10), - }, &protocol.Log_Content{ - Key: valueKey, - Value: value, - }) + metricLog := helper.NewMetricLogStringVal(name, s.UnixNano(), value, &labels) if d.FieldsExtend { - contents = append(contents, &protocol.Log_Content{ - Key: typeKey, - Value: valueType, - }, &protocol.Log_Content{ - Key: fieldNameKey, - Value: field, - }) - } - if d.FieldsExtend && len(db) > 0 { - contents = append(contents, &protocol.Log_Content{ - Key: tagDB, - Value: db, - }) - } + metricLog.Contents = append(metricLog.Contents, + &protocol.Log_Content{Key: typeKey, Value: valueType}, + &protocol.Log_Content{Key: fieldNameKey, Value: field}) + if len(db) > 0 { + metricLog.Contents = append(metricLog.Contents, &protocol.Log_Content{ + Key: tagDB, + Value: db, + }) - log := &protocol.Log{ - Contents: contents, + } } - protocol.SetLogTimeWithNano(log, uint32(s.Time().Unix()), uint32(s.Time().Nanosecond())) - logs = append(logs, log) - + logs = append(logs, metricLog) } } return logs diff --git a/pkg/protocol/decoder/influxdb/decoder_test.go b/pkg/protocol/decoder/influxdb/decoder_test.go index 9e737c96b8..777a072d4b 100644 --- a/pkg/protocol/decoder/influxdb/decoder_test.go +++ b/pkg/protocol/decoder/influxdb/decoder_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/alibaba/ilogtail/pkg/config" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/protocol" ) @@ -69,10 +70,11 @@ cpu.load,host.dd=server02,region=uswest tt="xx",value=3 1434055562000010000 func TestFieldsExtend(t *testing.T) { cases := []struct { - enableFieldsExtend bool - data string - wantLogs []*protocol.Log - wantErr bool + enableFieldsExtend bool + enableSlsMetricsFormat bool + data string + wantLogs []*protocol.Log + wantErr bool }{ { enableFieldsExtend: true, @@ -112,9 +114,10 @@ func TestFieldsExtend(t *testing.T) { }, }, { - enableFieldsExtend: false, - data: txtWithDotNames, - wantErr: false, + enableFieldsExtend: false, + enableSlsMetricsFormat: true, + data: txtWithDotNames, + wantErr: false, wantLogs: []*protocol.Log{ { Contents: []*protocol.Log_Content{ @@ -138,6 +141,7 @@ func TestFieldsExtend(t *testing.T) { for _, testCase := range cases { decoder := &Decoder{FieldsExtend: testCase.enableFieldsExtend} + config.LogtailGlobalConfig.EnableSlsMetricsFormat = testCase.enableSlsMetricsFormat logs, err := decoder.Decode([]byte(txtWithDotNames), &http.Request{}, nil) if testCase.wantErr { assert.NotNil(t, err) diff --git a/pkg/protocol/decoder/prometheus/decoder.go b/pkg/protocol/decoder/prometheus/decoder.go index 9fb46b85c1..5ae8e7f0b6 100644 --- a/pkg/protocol/decoder/prometheus/decoder.go +++ b/pkg/protocol/decoder/prometheus/decoder.go @@ -19,8 +19,6 @@ import ( "fmt" "io" "net/http" - "sort" - "strconv" "strings" "github.com/gogo/protobuf/proto" @@ -30,6 +28,7 @@ import ( "github.com/richardartoul/molecule" "github.com/richardartoul/molecule/src/codec" + "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/protocol/decoder/common" @@ -68,35 +67,6 @@ type Decoder struct { AllowUnsafeMode bool } -func parseLabels(metric model.Metric) (metricName, labelsValue string) { - labels := (model.LabelSet)(metric) - - lns := make(model.LabelPairs, 0, len(labels)) - for k, v := range labels { - lns = append(lns, &model.LabelPair{ - Name: k, - Value: v, - }) - } - sort.Sort(lns) - var builder strings.Builder - labelCount := 0 - for _, label := range lns { - if label.Name == model.MetricNameLabel { - metricName = string(label.Value) - continue - } - if labelCount != 0 { - builder.WriteByte('|') - } - builder.WriteString(string(label.Name)) - builder.WriteString("#$#") - builder.WriteString(string(label.Value)) - labelCount++ - } - return metricName, builder.String() -} - // Decode impl func (d *Decoder) Decode(data []byte, req *http.Request, tags map[string]string) (logs []*protocol.Log, err error) { if req.Header.Get(contentEncodingKey) == snappyEncoding && @@ -125,29 +95,18 @@ func (d *Decoder) decodeInExpFmt(data []byte, _ *http.Request) (logs []*protocol break } for _, sample := range *s { - metricName, labelsValue := parseLabels(sample.Metric) - log := &protocol.Log{ - Contents: []*protocol.Log_Content{ - { - Key: metricNameKey, - Value: metricName, - }, - { - Key: labelsKey, - Value: labelsValue, - }, - { - Key: timeNanoKey, - Value: strconv.FormatInt(sample.Timestamp.UnixNano(), 10), - }, - { - Key: valueKey, - Value: strconv.FormatFloat(float64(sample.Value), 'g', -1, 64), - }, - }, + var name string + var labels helper.MetricLabels + labelsSet := (model.LabelSet)(sample.Metric) + for k, v := range labelsSet { + if k == model.MetricNameLabel { + name = string(v) + continue + } + labels.Append(string(k), string(v)) } - protocol.SetLogTimeWithNano(log, uint32(sample.Timestamp.Unix()), uint32(sample.Timestamp.UnixNano()%1e9)) - logs = append(logs, log) + metricLog := helper.NewMetricLog(name, sample.Timestamp.UnixNano(), float64(sample.Value), &labels) + logs = append(logs, metricLog) } } @@ -166,63 +125,31 @@ func (d *Decoder) decodeInRemoteWriteFormat(data []byte, req *http.Request) (log } db := req.FormValue(metaDBKey) - contentLen := 4 - if len(db) > 0 { - contentLen++ - } - for _, m := range metrics.Timeseries { - metricName, labelsValue := d.parsePbLabels(m.Labels) + var metricName string + var labels helper.MetricLabels + for _, label := range m.Labels { + if label.Name == model.MetricNameLabel { + metricName = label.Value + continue + } + labels.Append(label.Name, label.Value) + } for _, sample := range m.Samples { - contents := make([]*protocol.Log_Content, 0, contentLen) - contents = append(contents, &protocol.Log_Content{ - Key: metricNameKey, - Value: metricName, - }, &protocol.Log_Content{ - Key: labelsKey, - Value: labelsValue, - }, &protocol.Log_Content{ - Key: timeNanoKey, - Value: strconv.FormatInt(sample.Timestamp*1e6, 10), - }, &protocol.Log_Content{ - Key: valueKey, - Value: strconv.FormatFloat(sample.Value, 'g', -1, 64), - }) + metricLog := helper.NewMetricLog(metricName, sample.Timestamp, sample.Value, &labels) if len(db) > 0 { - contents = append(contents, &protocol.Log_Content{ + metricLog.Contents = append(metricLog.Contents, &protocol.Log_Content{ Key: tagDB, Value: db, }) } - - log := &protocol.Log{ - Contents: contents, - } - protocol.SetLogTimeWithNano(log, uint32(model.Time(sample.Timestamp).Unix()), uint32(model.Time(sample.Timestamp).UnixNano()%1e9)) - logs = append(logs, log) + logs = append(logs, metricLog) } } return logs, nil } -func (d *Decoder) parsePbLabels(labels []prompb.Label) (metricName, labelsValue string) { - var builder strings.Builder - for _, label := range labels { - if label.Name == model.MetricNameLabel { - metricName = label.Value - continue - } - if builder.Len() > 0 { - builder.WriteByte('|') - } - builder.WriteString(label.Name) - builder.WriteString("#$#") - builder.WriteString(label.Value) - } - return metricName, builder.String() -} - func (d *Decoder) DecodeV2(data []byte, req *http.Request) (groups []*models.PipelineGroupEvents, err error) { if len(data) == 0 { return nil, nil diff --git a/pkg/protocol/decoder/statsd/decoder.go b/pkg/protocol/decoder/statsd/decoder.go index eaad67fd1a..9822eaa122 100644 --- a/pkg/protocol/decoder/statsd/decoder.go +++ b/pkg/protocol/decoder/statsd/decoder.go @@ -18,9 +18,6 @@ import ( "bytes" "context" "net/http" - "sort" - "strconv" - "strings" "time" "github.com/alibaba/ilogtail/pkg/helper" @@ -30,43 +27,18 @@ import ( "github.com/alibaba/ilogtail/pkg/protocol/decoder/common" dogstatsd "github.com/narqo/go-dogstatsd-parser" - "github.com/prometheus/common/model" -) - -const ( - metricNameKey = "__name__" - labelsKey = "__labels__" - timeNanoKey = "__time_nano__" - valueKey = "__value__" ) type Decoder struct { Time time.Time } -func parseLabels(metric *dogstatsd.Metric) (labelsValue string) { - lns := make(model.LabelPairs, 0, len(metric.Tags)) +func parseLabels(metric *dogstatsd.Metric) *helper.MetricLabels { + var labels helper.MetricLabels for k, v := range metric.Tags { - lns = append(lns, &model.LabelPair{ - Name: model.LabelName(k), - Value: model.LabelValue(v), - }) + labels.Append(k, v) } - sort.Sort(lns) - var builder strings.Builder - labelCount := 0 - for _, label := range lns { - if labelCount != 0 { - builder.WriteByte('|') - } - k := string(label.Name) - helper.ReplaceInvalidChars(&k) - builder.WriteString(k) - builder.WriteString("#$#") - builder.WriteString(string(label.Value)) - labelCount++ - } - return builder.String() + return &labels } func (d *Decoder) Decode(data []byte, req *http.Request, tags map[string]string) (logs []*protocol.Log, err error) { @@ -85,28 +57,7 @@ func (d *Decoder) Decode(data []byte, req *http.Request, tags map[string]string) } continue } - helper.ReplaceInvalidChars(&m.Name) - log := &protocol.Log{ - Contents: []*protocol.Log_Content{ - { - Key: metricNameKey, - Value: m.Name, - }, - { - Key: labelsKey, - Value: parseLabels(m), - }, - { - Key: timeNanoKey, - Value: strconv.FormatInt(now.UnixNano(), 10), - }, - { - Key: valueKey, - Value: strconv.FormatFloat(m.Value.(float64), 'g', -1, 64), - }, - }, - } - protocol.SetLogTimeWithNano(log, uint32(now.Unix()), uint32(now.Nanosecond())) + log := helper.NewMetricLog(m.Name, now.UnixNano(), m.Value.(float64), parseLabels(m)) logs = append(logs, log) } return diff --git a/plugins/input/example/metric_checkpoint_example.go b/plugins/input/example/metric_checkpoint_example.go index a30723470d..81f03343ca 100644 --- a/plugins/input/example/metric_checkpoint_example.go +++ b/plugins/input/example/metric_checkpoint_example.go @@ -29,8 +29,7 @@ import ( type MetricsCheckpointExample struct { counter int gauge int - commonLabels helper.KeyValues - labels string + commonLabels helper.MetricLabels context pipeline.Context } @@ -48,7 +47,6 @@ func (m *MetricsCheckpointExample) Init(context pipeline.Context) (int, error) { m.commonLabels.Append("hostname", util.GetHostName()) m.commonLabels.Append("ip", util.GetIPAddress()) // convert the commonLabels to string to reduce memory cost because the labels is the fixed value. - m.labels = m.commonLabels.String() return 0, nil } @@ -65,8 +63,8 @@ func (m *MetricsCheckpointExample) Collect(collector pipeline.Collector) error { m.gauge = rand.Intn(100) // collect the metrics - helper.AddMetric(collector, "example_counter", time.Now(), m.labels, float64(m.counter)) - helper.AddMetric(collector, "example_gauge", time.Now(), m.labels, float64(m.gauge)) + collector.AddRawLog(helper.NewMetricLog("example_counter", time.Now().UnixNano(), float64(m.counter), &m.commonLabels)) + collector.AddRawLog(helper.NewMetricLog("example_gauge", time.Now().UnixNano(), float64(m.gauge), &m.commonLabels)) _ = m.context.SaveCheckPointObject("metric_checkpoint_example", &m.counter) return nil } diff --git a/plugins/input/example/metric_example.go b/plugins/input/example/metric_example.go index 54d7c70c2f..e778ac5bfc 100644 --- a/plugins/input/example/metric_example.go +++ b/plugins/input/example/metric_example.go @@ -29,7 +29,7 @@ import ( type MetricsExample struct { counter int gauge int - commonLabels helper.KeyValues + commonLabels helper.MetricLabels labels string } @@ -58,8 +58,8 @@ func (m *MetricsExample) Collect(collector pipeline.Collector) error { m.gauge = rand.Intn(100) //nolint:gosec // collect the metrics - helper.AddMetric(collector, "example_counter", time.Now(), m.labels, float64(m.counter)) - helper.AddMetric(collector, "example_gauge", time.Now(), m.labels, float64(m.gauge)) + collector.AddRawLog(helper.NewMetricLog("example_counter", time.Now().UnixNano(), float64(m.counter), &m.commonLabels)) + collector.AddRawLog(helper.NewMetricLog("example_gauge", time.Now().UnixNano(), float64(m.gauge), &m.commonLabels)) return nil } diff --git a/plugins/input/httpserver/input_http_server.go b/plugins/input/httpserver/input_http_server.go index 511fbc4198..b4bfb79303 100644 --- a/plugins/input/httpserver/input_http_server.go +++ b/plugins/input/httpserver/input_http_server.go @@ -197,7 +197,7 @@ func (s *ServiceHTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) { case common.ProtocolSLS: w.Header().Set("x-log-requestid", "1234567890abcde") w.WriteHeader(http.StatusOK) - case common.ProtocolPyroscope: + case common.ProtocolPyroscope, common.ProtocolPrometheus: // do nothing default: w.WriteHeader(http.StatusNoContent) @@ -275,8 +275,11 @@ func (s *ServiceHTTP) start() error { s.listener = listener s.server = server go func() { - logger.Info(s.context.GetRuntimeContext(), "http server start", s.Address) - _ = server.Serve(listener) + logger.Info(s.context.GetRuntimeContext(), "http server start", s.Address, "listener", listener.Addr().String()) + err := server.Serve(listener) + if err != nil { + logger.Error(s.context.GetRuntimeContext(), "INIT_SERVER_ARMAR", "err", err.Error()) + } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.ShutdownTimeoutSec)*time.Second) defer cancel() _ = server.Shutdown(ctx) diff --git a/plugins/input/mock/input_mock.go b/plugins/input/mock/input_mock.go index 10353d143a..4fe6d760d4 100644 --- a/plugins/input/mock/input_mock.go +++ b/plugins/input/mock/input_mock.go @@ -32,23 +32,20 @@ type InputMock struct { Index int64 OpenPrometheusPattern bool - context pipeline.Context - labelStr string + context pipeline.Context + labels helper.MetricLabels } func (r *InputMock) Init(context pipeline.Context) (int, error) { r.context = context - var labels helper.KeyValues if r.OpenPrometheusPattern { for k, v := range r.Tags { - labels.Append(k, v) + r.labels.Append(k, v) } for k, v := range r.Fields { - labels.Append(k, fmt.Sprint(v)) + r.labels.Append(k, fmt.Sprint(v)) } } - labels.Sort() - r.labelStr = labels.String() return 0, nil } @@ -59,7 +56,7 @@ func (r *InputMock) Description() string { func (r *InputMock) Collect(collector pipeline.Collector) error { r.Index++ if r.OpenPrometheusPattern { - helper.AddMetric(collector, "metrics_mock", time.Now(), r.labelStr, float64(r.Index)) + collector.AddRawLog(helper.NewMetricLog("metrics_mock", time.Now().UnixNano(), float64(r.Index), &r.labels)) } else { // original log pattern. fields := make(map[string]string) diff --git a/plugins/input/netping/netping.go b/plugins/input/netping/netping.go index 659f0ab36e..3fce8ddfc0 100644 --- a/plugins/input/netping/netping.go +++ b/plugins/input/netping/netping.go @@ -51,7 +51,7 @@ const ( type Result struct { Valid bool // if the result is meaningful for count - Label string + Label *helper.MetricLabels Type string Total int Success int @@ -68,12 +68,12 @@ type Result struct { HTTPRTMs int HTTPResponseSize int HasHTTPSCert bool - HTTPSCertLabels string + HTTPSCertLabels *helper.MetricLabels HTTPSCertTTLDay int } type ResolveResult struct { - Label string + Label *helper.MetricLabels Success bool RTMs float64 } @@ -283,12 +283,12 @@ func (m *NetPing) Collect(collector pipeline.Collector) error { for i := 0; i < resolveCounter; i++ { result := <-m.resolveChannel if result.Success { - helper.AddMetric(collector, "dns_resolve_rt_ms", nowTs, result.Label, result.RTMs) - helper.AddMetric(collector, "dns_resolve_success", nowTs, result.Label, 1) - helper.AddMetric(collector, "dns_resolve_failed", nowTs, result.Label, 0) + m.addMetric(collector, "dns_resolve_rt_ms", &nowTs, result.Label, result.RTMs) + m.addMetric(collector, "dns_resolve_success", &nowTs, result.Label, 1) + m.addMetric(collector, "dns_resolve_failed", &nowTs, result.Label, 0) } else { - helper.AddMetric(collector, "dns_resolve_success", nowTs, result.Label, 0) - helper.AddMetric(collector, "dns_resolve_failed", nowTs, result.Label, 1) + m.addMetric(collector, "dns_resolve_success", &nowTs, result.Label, 0) + m.addMetric(collector, "dns_resolve_failed", &nowTs, result.Label, 1) } } } @@ -320,24 +320,24 @@ func (m *NetPing) Collect(collector pipeline.Collector) error { continue } - helper.AddMetric(collector, fmt.Sprintf("%s_total", result.Type), nowTs, result.Label, float64(result.Total)) - helper.AddMetric(collector, fmt.Sprintf("%s_success", result.Type), nowTs, result.Label, float64(result.Success)) - helper.AddMetric(collector, fmt.Sprintf("%s_failed", result.Type), nowTs, result.Label, float64(result.Failed)) + m.addMetric(collector, fmt.Sprintf("%s_total", result.Type), &nowTs, result.Label, float64(result.Total)) + m.addMetric(collector, fmt.Sprintf("%s_success", result.Type), &nowTs, result.Label, float64(result.Success)) + m.addMetric(collector, fmt.Sprintf("%s_failed", result.Type), &nowTs, result.Label, float64(result.Failed)) if (result.Type == PingTypeIcmp || result.Type == PingTypeTcping) && result.Success > 0 { - helper.AddMetric(collector, fmt.Sprintf("%s_rtt_min_ms", result.Type), nowTs, result.Label, result.MinRTTMs) - helper.AddMetric(collector, fmt.Sprintf("%s_rtt_max_ms", result.Type), nowTs, result.Label, result.MaxRTTMs) - helper.AddMetric(collector, fmt.Sprintf("%s_rtt_avg_ms", result.Type), nowTs, result.Label, result.AvgRTTMs) - helper.AddMetric(collector, fmt.Sprintf("%s_rtt_total_ms", result.Type), nowTs, result.Label, result.TotalRTTMs) - helper.AddMetric(collector, fmt.Sprintf("%s_rtt_stddev_ms", result.Type), nowTs, result.Label, result.StdDevRTTMs) + m.addMetric(collector, fmt.Sprintf("%s_rtt_min_ms", result.Type), &nowTs, result.Label, result.MinRTTMs) + m.addMetric(collector, fmt.Sprintf("%s_rtt_max_ms", result.Type), &nowTs, result.Label, result.MaxRTTMs) + m.addMetric(collector, fmt.Sprintf("%s_rtt_avg_ms", result.Type), &nowTs, result.Label, result.AvgRTTMs) + m.addMetric(collector, fmt.Sprintf("%s_rtt_total_ms", result.Type), &nowTs, result.Label, result.TotalRTTMs) + m.addMetric(collector, fmt.Sprintf("%s_rtt_stddev_ms", result.Type), &nowTs, result.Label, result.StdDevRTTMs) } else if result.Type == PingTypeHttping { if result.Success > 0 { - helper.AddMetric(collector, fmt.Sprintf("%s_rt_ms", result.Type), nowTs, result.Label, float64(result.HTTPRTMs)) - helper.AddMetric(collector, fmt.Sprintf("%s_response_bytes", result.Type), nowTs, result.Label, float64(result.HTTPResponseSize)) + m.addMetric(collector, fmt.Sprintf("%s_rt_ms", result.Type), &nowTs, result.Label, float64(result.HTTPRTMs)) + m.addMetric(collector, fmt.Sprintf("%s_response_bytes", result.Type), &nowTs, result.Label, float64(result.HTTPResponseSize)) } if result.HasHTTPSCert { - helper.AddMetric(collector, fmt.Sprintf("%s_cert_ttl_days", result.Type), nowTs, result.HTTPSCertLabels, float64(result.HTTPSCertTTLDay)) + m.addMetric(collector, fmt.Sprintf("%s_cert_ttl_days", result.Type), &nowTs, result.HTTPSCertLabels, float64(result.HTTPSCertTTLDay)) } } } @@ -345,6 +345,10 @@ func (m *NetPing) Collect(collector pipeline.Collector) error { return nil } +func (m *NetPing) addMetric(collector pipeline.Collector, name string, t *time.Time, labels *helper.MetricLabels, val float64) { + collector.AddRawLog(helper.NewMetricLog(name, t.UnixNano(), val, labels)) +} + func (m *NetPing) evaluteDNSResolve(host string) { success := true start := time.Now() @@ -362,7 +366,7 @@ func (m *NetPing) evaluteDNSResolve(host string) { m.resolveHostMap.Store(host, ips[n].String()) } - var label helper.KeyValues + var label helper.MetricLabels label.Append("dns_name", host) label.Append("src", m.ip) label.Append("src_host", m.hostname) @@ -373,7 +377,7 @@ func (m *NetPing) evaluteDNSResolve(host string) { m.resolveChannel <- &ResolveResult{ Success: success, RTMs: float64(rt.Milliseconds()), - Label: label.String(), + Label: &label, } } @@ -391,7 +395,7 @@ func (m *NetPing) getRealTarget(target string) string { func (m *NetPing) doICMPing(config *ICMPConfig) { // prepare labels - var label helper.KeyValues + var label helper.MetricLabels label.Append("name", config.Name) label.Append("src", config.Src) label.Append("dst", config.Target) @@ -409,7 +413,7 @@ func (m *NetPing) doICMPing(config *ICMPConfig) { Type: PingTypeIcmp, Total: config.Count, Failed: config.Count, - Label: label.String(), + Label: &label, } return } @@ -428,7 +432,7 @@ func (m *NetPing) doICMPing(config *ICMPConfig) { Type: PingTypeIcmp, Total: config.Count, Failed: config.Count, - Label: label.String(), + Label: &label, } return } @@ -441,7 +445,7 @@ func (m *NetPing) doICMPing(config *ICMPConfig) { m.resultChannel <- &Result{ Valid: true, - Label: label.String(), + Label: &label, Type: PingTypeIcmp, Total: pinger.Count, Success: stats.PacketsRecv, @@ -471,7 +475,7 @@ func evaluteTcping(target string, port int, timeout time.Duration) (time.Duratio func (m *NetPing) doTCPing(config *TCPConfig) { // prepare labels - var label helper.KeyValues + var label helper.MetricLabels label.Append("name", config.Name) label.Append("src", config.Src) label.Append("dst", config.Target) @@ -525,7 +529,7 @@ func (m *NetPing) doTCPing(config *TCPConfig) { m.resultChannel <- &Result{ Valid: true, - Label: label.String(), + Label: &label, Type: PingTypeTcping, Total: config.Count, Success: len(rtts), @@ -540,7 +544,7 @@ func (m *NetPing) doTCPing(config *TCPConfig) { func (m *NetPing) doHTTPing(config *HTTPConfig) { // prepare labels - var label helper.KeyValues + var label helper.MetricLabels label.Append("name", config.Name) label.Append("src", config.Src) label.Append("url", config.Target) @@ -579,7 +583,7 @@ func (m *NetPing) doHTTPing(config *HTTPConfig) { m.resultChannel <- &Result{ Valid: true, Type: PingTypeHttping, - Label: label.String(), + Label: &label, Total: 1, Success: 0, Failed: 1, @@ -595,7 +599,7 @@ func (m *NetPing) doHTTPing(config *HTTPConfig) { m.resultChannel <- &Result{ Valid: true, Type: PingTypeHttping, - Label: label.String(), + Label: &label, Total: 1, Success: 0, Failed: 1, @@ -611,7 +615,7 @@ func (m *NetPing) doHTTPing(config *HTTPConfig) { m.resultChannel <- &Result{ Valid: true, Type: PingTypeHttping, - Label: label.String(), + Label: &label, Total: 1, Success: 0, Failed: 1, @@ -638,7 +642,7 @@ func (m *NetPing) doHTTPing(config *HTTPConfig) { var certTTLDay int var hasCert bool - var certLabel helper.KeyValues + var certLabel helper.MetricLabels if resp.TLS != nil { for _, v := range resp.TLS.PeerCertificates { @@ -662,7 +666,7 @@ func (m *NetPing) doHTTPing(config *HTTPConfig) { m.resultChannel <- &Result{ Valid: true, Type: PingTypeHttping, - Label: label.String(), + Label: &label, Total: 1, Success: successCount, Failed: 1 - successCount, @@ -670,7 +674,7 @@ func (m *NetPing) doHTTPing(config *HTTPConfig) { HTTPResponseSize: len(respBody), HasHTTPSCert: hasCert, - HTTPSCertLabels: certLabel.String(), + HTTPSCertLabels: &certLabel, HTTPSCertTTLDay: certTTLDay, } diff --git a/plugins/input/netping/netping_test.go b/plugins/input/netping/netping_test.go index 15211d2f70..960a860785 100644 --- a/plugins/input/netping/netping_test.go +++ b/plugins/input/netping/netping_test.go @@ -165,8 +165,7 @@ func TestDoICMPing(t *testing.T) { netPing.doICMPing(&config1) res1 := <-netPing.resultChannel fmt.Println(res1) - - assert.Equal(t, true, strings.Contains(res1.Label, "src#$#|dst#$#8.8.8.8")) + assert.Equal(t, true, strings.Contains(res1.Label.String(), "dst#$#8.8.8.8|name#$#|src#$#|")) assert.Equal(t, true, res1.Valid) assert.Equal(t, 3, res1.Total) assert.Equal(t, 3, res1.Success+res1.Failed) diff --git a/plugins/input/process/input_process.go b/plugins/input/process/input_process.go index 7a52bf7269..898ee2247a 100644 --- a/plugins/input/process/input_process.go +++ b/plugins/input/process/input_process.go @@ -52,7 +52,7 @@ type InputProcess struct { context pipeline.Context lastProcesses map[int]processCache regexpList []*regexp.Regexp - commonLabels helper.KeyValues + commonLabels helper.MetricLabels collectTime time.Time } @@ -90,7 +90,7 @@ func (ip *InputProcess) Collect(collector pipeline.Collector) error { return err } for _, pc := range matchedProcesses { - labels := pc.Labels(ip.commonLabels) + labels := pc.Labels(&ip.commonLabels) // add necessary metrics ip.addCPUMetrics(pc, labels, collector) ip.addMemMetrics(pc, labels, collector) @@ -218,52 +218,57 @@ func (ip *InputProcess) filterTopAndThresholdMatchedProcesses(processList []proc return } -func (ip *InputProcess) addCPUMetrics(pc processCache, labels string, collector pipeline.Collector) { +func (ip *InputProcess) addCPUMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) { if percentage := pc.GetProcessStatus().CPUPercentage; percentage != nil { - helper.AddMetric(collector, "process_cpu_percent", ip.collectTime, labels, percentage.TotalPercentage) - helper.AddMetric(collector, "process_cpu_stime_percent", ip.collectTime, labels, percentage.STimePercentage) - helper.AddMetric(collector, "process_cpu_utime_percent", ip.collectTime, labels, percentage.UTimePercentage) + + ip.addMetric(collector, "process_cpu_percent", &ip.collectTime, labels, percentage.TotalPercentage) + ip.addMetric(collector, "process_cpu_stime_percent", &ip.collectTime, labels, percentage.STimePercentage) + ip.addMetric(collector, "process_cpu_utime_percent", &ip.collectTime, labels, percentage.UTimePercentage) } } -func (ip *InputProcess) addMemMetrics(pc processCache, labels string, collector pipeline.Collector) { +func (ip *InputProcess) addMetric(collector pipeline.Collector, name string, t *time.Time, labels *helper.MetricLabels, val float64) { + collector.AddRawLog(helper.NewMetricLog(name, t.UnixNano(), val, labels)) +} + +func (ip *InputProcess) addMemMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) { if mem := pc.GetProcessStatus().Memory; mem != nil { - helper.AddMetric(collector, "process_mem_rss", ip.collectTime, labels, float64(mem.Rss)) - helper.AddMetric(collector, "process_mem_swap", ip.collectTime, labels, float64(mem.Swap)) - helper.AddMetric(collector, "process_mem_vsz", ip.collectTime, labels, float64(mem.Vsz)) - helper.AddMetric(collector, "process_mem_data", ip.collectTime, labels, float64(mem.Data)) + ip.addMetric(collector, "process_mem_rss", &ip.collectTime, labels, float64(mem.Rss)) + ip.addMetric(collector, "process_mem_swap", &ip.collectTime, labels, float64(mem.Swap)) + ip.addMetric(collector, "process_mem_vsz", &ip.collectTime, labels, float64(mem.Vsz)) + ip.addMetric(collector, "process_mem_data", &ip.collectTime, labels, float64(mem.Data)) } } -func (ip *InputProcess) addThreadMetrics(pc processCache, labels string, collector pipeline.Collector) { +func (ip *InputProcess) addThreadMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) { if pc.FetchThreads() { - helper.AddMetric(collector, "process_threads", ip.collectTime, labels, float64(pc.GetProcessStatus().ThreadsNum)) + ip.addMetric(collector, "process_threads", &ip.collectTime, labels, float64(pc.GetProcessStatus().ThreadsNum)) } } -func (ip *InputProcess) addOpenFilesMetrics(pc processCache, labels string, collector pipeline.Collector) { +func (ip *InputProcess) addOpenFilesMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) { if pc.FetchFds() { - helper.AddMetric(collector, "process_fds", ip.collectTime, labels, float64(pc.GetProcessStatus().FdsNum)) + ip.addMetric(collector, "process_fds", &ip.collectTime, labels, float64(pc.GetProcessStatus().FdsNum)) } } -func (ip *InputProcess) addNetIOMetrics(pc processCache, labels string, collector pipeline.Collector) { +func (ip *InputProcess) addNetIOMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) { if pc.FetchNetIO() { net := pc.GetProcessStatus().NetIO - helper.AddMetric(collector, "process_net_in_bytes", ip.collectTime, labels, float64(net.InBytes)) - helper.AddMetric(collector, "process_net_in_packet", ip.collectTime, labels, float64(net.InPacket)) - helper.AddMetric(collector, "process_net_out_bytes", ip.collectTime, labels, float64(net.OutBytes)) - helper.AddMetric(collector, "process_net_out_packet", ip.collectTime, labels, float64(net.OutPacket)) + ip.addMetric(collector, "process_net_in_bytes", &ip.collectTime, labels, float64(net.InBytes)) + ip.addMetric(collector, "process_net_in_packet", &ip.collectTime, labels, float64(net.InPacket)) + ip.addMetric(collector, "process_net_out_bytes", &ip.collectTime, labels, float64(net.OutBytes)) + ip.addMetric(collector, "process_net_out_packet", &ip.collectTime, labels, float64(net.OutPacket)) } } -func (ip *InputProcess) addIOMetrics(pc processCache, labels string, collector pipeline.Collector) { +func (ip *InputProcess) addIOMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) { if pc.FetchIO() { io := pc.GetProcessStatus().IO - helper.AddMetric(collector, "process_read_bytes", ip.collectTime, labels, float64(io.ReadeBytes)) - helper.AddMetric(collector, "process_write_bytes", ip.collectTime, labels, float64(io.WriteBytes)) - helper.AddMetric(collector, "process_read_count", ip.collectTime, labels, float64(io.ReadCount)) - helper.AddMetric(collector, "process_write_count", ip.collectTime, labels, float64(io.WriteCount)) + ip.addMetric(collector, "process_read_bytes", &ip.collectTime, labels, float64(io.ReadeBytes)) + ip.addMetric(collector, "process_write_bytes", &ip.collectTime, labels, float64(io.WriteBytes)) + ip.addMetric(collector, "process_read_count", &ip.collectTime, labels, float64(io.ReadCount)) + ip.addMetric(collector, "process_write_count", &ip.collectTime, labels, float64(io.WriteCount)) } } diff --git a/plugins/input/process/input_process_cache.go b/plugins/input/process/input_process_cache.go index 31d1acb958..6a12b4d8cc 100644 --- a/plugins/input/process/input_process_cache.go +++ b/plugins/input/process/input_process_cache.go @@ -27,7 +27,7 @@ type processCache interface { GetExe() string GetCmdLine() string FetchCoreCount() int64 - Labels(values helper.KeyValues) string + Labels(values *helper.MetricLabels) *helper.MetricLabels GetProcessStatus() *processStatus // FetchCore fetch the core exported status, such as CPU and Memory. FetchCore() bool @@ -41,12 +41,12 @@ type ( // processMeta contains the stable process meta data. processMeta struct { maxLabelLength int - labels string // The custom labels - lastFetchTime time.Time // The last fetch stat time - nowFetchTime time.Time // The fetch stat time - fetchCoreCount int64 // Auto increment, the max value supports running 1462356043387 years when the fetching frequency is 5s - cmdline string // The command line - exe string // The absolute path of the executable command + labels *helper.MetricLabels // The custom labels + lastFetchTime time.Time // The last fetch stat time + nowFetchTime time.Time // The fetch stat time + fetchCoreCount int64 // Auto increment, the max value supports running 1462356043387 years when the fetching frequency is 5s + cmdline string // The command line + exe string // The absolute path of the executable command } // processStatus contains the dynamic process status. diff --git a/plugins/input/process/input_process_cache_linux.go b/plugins/input/process/input_process_cache_linux.go index 705808bb7e..7d592b904f 100644 --- a/plugins/input/process/input_process_cache_linux.go +++ b/plugins/input/process/input_process_cache_linux.go @@ -141,10 +141,10 @@ func (pc *processCacheLinux) GetCmdLine() string { return pc.meta.cmdline } -func (pc *processCacheLinux) Labels(customLabels helper.KeyValues) string { - if len(pc.meta.labels) == 0 { +func (pc *processCacheLinux) Labels(customLabels *helper.MetricLabels) *helper.MetricLabels { + if pc.meta.labels == nil { if pc.stat == nil && !pc.fetchStat() { - return "" + return nil } processLabels := customLabels.Clone() processLabels.Append("pid", strconv.Itoa(pc.GetPid())) @@ -155,8 +155,7 @@ func (pc *processCacheLinux) Labels(customLabels helper.KeyValues) string { } else { processLabels.Append("comm", comm) } - processLabels.Sort() - pc.meta.labels = processLabels.String() + pc.meta.labels = processLabels } return pc.meta.labels } diff --git a/plugins/input/process/input_process_cache_other.go b/plugins/input/process/input_process_cache_other.go index de365691e0..8586c755e2 100644 --- a/plugins/input/process/input_process_cache_other.go +++ b/plugins/input/process/input_process_cache_other.go @@ -124,8 +124,8 @@ func (pc *processCacheOther) FetchCoreCount() int64 { return pc.meta.fetchCoreCount } -func (pc *processCacheOther) Labels(customLabels helper.KeyValues) string { - if pc.meta.labels == "" { +func (pc *processCacheOther) Labels(customLabels *helper.MetricLabels) *helper.MetricLabels { + if pc.meta.labels == nil { if name := pc.getCommName(); name != "" { processLabels := customLabels.Clone() processLabels.Append("pid", strconv.Itoa(pc.GetPid())) @@ -135,8 +135,7 @@ func (pc *processCacheOther) Labels(customLabels helper.KeyValues) string { } else { processLabels.Append("comm", name) } - processLabels.Sort() - pc.meta.labels = processLabels.String() + pc.meta.labels = processLabels } } return pc.meta.labels diff --git a/plugins/input/process/input_process_test.go b/plugins/input/process/input_process_test.go index 0b682f5dbb..39a802683e 100644 --- a/plugins/input/process/input_process_test.go +++ b/plugins/input/process/input_process_test.go @@ -58,8 +58,8 @@ func (t *TestProcessCache) FetchCoreCount() int64 { return t.fetchCoreCount } -func (t *TestProcessCache) Labels(values helper.KeyValues) string { - return "" +func (t *TestProcessCache) Labels(values *helper.MetricLabels) *helper.MetricLabels { + return nil } func (t *TestProcessCache) GetProcessStatus() *processStatus { // nolint:revive diff --git a/plugins/input/prometheus/sls_writer.go b/plugins/input/prometheus/sls_writer.go index dfd1373ada..4530626730 100644 --- a/plugins/input/prometheus/sls_writer.go +++ b/plugins/input/prometheus/sls_writer.go @@ -15,136 +15,25 @@ package prometheus import ( - "math" - "sort" - "strconv" - "strings" - "time" - "unsafe" - - "github.com/alibaba/ilogtail/pkg/pipeline" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/alibaba/ilogtail/pkg/helper" + "github.com/alibaba/ilogtail/pkg/pipeline" ) -const StaleNan = "__STALE_NAN__" - -var prometheusKeys []string - -const ( - // NormalNaN is a quiet NaN. This is also math.NaN(). - NormalNaN uint64 = 0x7ff8000000000001 - - // StaleNaN is a signaling NaN, due to the MSB of the mantissa being 0. - // This value is chosen with many leading 0s, so we have scope to store more - // complicated values in the future. It is 2 rather than 1 to make - // it easier to distinguish from the NormalNaN by a human when debugging. - StaleNaN uint64 = 0x7ff0000000000002 -) - -// IsStaleNaN returns true when the provided NaN value is a stale marker. -func IsStaleNaN(v float64) bool { - return math.Float64bits(v) == StaleNaN -} - -func init() { - prometheusKeys = append(prometheusKeys, "__name__") - prometheusKeys = append(prometheusKeys, "__labels__") - prometheusKeys = append(prometheusKeys, "__time_nano__") - prometheusKeys = append(prometheusKeys, "__value__") -} - -func formatLabelKey(key string) string { - var newKey []byte - for i := 0; i < len(key); i++ { - b := key[i] - if (b >= 'a' && b <= 'z') || - (b >= 'A' && b <= 'Z') || - (b >= '0' && b <= '9') || - b == '_' { - continue - } else { - if newKey == nil { - newKey = []byte(key) - } - newKey[i] = '_' - } - } - if newKey == nil { - return key - } - return string(newKey) -} - -func formatLabelValue(value string) string { - var newValue []byte - for i := 0; i < len(value); i++ { - b := value[i] - if b != '|' { - continue - } else { - if newValue == nil { - newValue = []byte(value) - } - newValue[i] = '_' - } - } - if newValue == nil { - return value - } - return string(newValue) -} - -func formatNewMetricName(name string) string { - newName := []byte(name) - for i, b := range newName { - if (b >= 'a' && b <= 'z') || - (b >= 'A' && b <= 'Z') || - (b >= '0' && b <= '9') || - b == '_' || - b == ':' { - continue - } else { - newName[i] = '_' - } - } - return *(*string)(unsafe.Pointer(&newName)) -} - func appendTSDataToSlsLog(c pipeline.Collector, wr *prompbmarshal.WriteRequest) { - values := make([]string, 4) for i := range wr.Timeseries { ts := &wr.Timeseries[i] - sort.Slice(ts.Labels, func(i, j int) bool { - return ts.Labels[i].Name < ts.Labels[j].Name - }) - i := 0 - var labelsBuilder strings.Builder + var name string + var labels helper.MetricLabels for _, label := range ts.Labels { if label.Name == "__name__" { - // __name__ - // must call new for metric name, because Name is the reference of raw buffer - values[0] = formatNewMetricName(label.Value) + name = label.Value continue } - if i != 0 { - labelsBuilder.WriteByte('|') - } - labelsBuilder.WriteString(formatLabelKey(label.Name)) - labelsBuilder.WriteString("#$#") - labelsBuilder.WriteString(formatLabelValue(label.Value)) - i++ + labels.Append(label.Name, label.Value) } - values[1] = labelsBuilder.String() - for _, sample := range ts.Samples { - values[2] = strconv.FormatInt(sample.Timestamp, 10) - if IsStaleNaN(sample.Value) { - values[3] = StaleNan - } else { - values[3] = strconv.FormatFloat(sample.Value, 'g', -1, 64) - } - c.AddDataArray(nil, prometheusKeys, values, time.Unix(sample.Timestamp/(1e3), 0)) + c.AddRawLog(helper.NewMetricLog(name, sample.Timestamp, sample.Value, &labels)) } } diff --git a/plugins/input/skywalkingv2/jvm_metric_handle.go b/plugins/input/skywalkingv2/jvm_metric_handle.go index c786b3f145..d49340edaf 100644 --- a/plugins/input/skywalkingv2/jvm_metric_handle.go +++ b/plugins/input/skywalkingv2/jvm_metric_handle.go @@ -16,7 +16,6 @@ package skywalkingv2 import ( "context" - "strconv" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/pipeline" @@ -51,10 +50,15 @@ func (j *JVMMetricServiceHandle) Collect(ctx context.Context, metrics *agent.JVM func toMetricStoreFormat(metric *agent.JVMMetric, service string, serviceInstance string, host string) []*protocol.Log { var logs []*protocol.Log - cpuUsage := helper.NewMetricLog("skywalking_jvm_cpu_usage", metric.Time, - strconv.FormatFloat(metric.GetCpu().UsagePercent, 'f', 6, 64), []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + var labels helper.MetricLabels + labels.Append("service", service) + labels.Append("serviceInstance", serviceInstance) + labels.Append("host", host) + + cpuUsage := helper.NewMetricLog("skywalking_jvm_cpu_usage", metric.Time, metric.GetCpu().UsagePercent, &labels) logs = append(logs, cpuUsage) + memLabels := labels.Clone() for _, mem := range metric.Memory { var memType string if mem.IsHeap { @@ -62,64 +66,51 @@ func toMetricStoreFormat(metric *agent.JVMMetric, service string, serviceInstanc } else { memType = "nonheap" } - memCommitted := helper.NewMetricLog("skywalking_jvm_memory_committed", metric.GetTime(), - strconv.FormatInt(mem.Committed, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memLabels.Replace("type", memType) + + memCommitted := helper.NewMetricLog("skywalking_jvm_memory_committed", metric.GetTime(), float64(mem.Committed), memLabels) logs = append(logs, memCommitted) - memInit := helper.NewMetricLog("skywalking_jvm_memory_init", metric.GetTime(), - strconv.FormatInt(mem.Init, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memInit := helper.NewMetricLog("skywalking_jvm_memory_init", metric.GetTime(), float64(mem.Init), memLabels) logs = append(logs, memInit) - memMax := helper.NewMetricLog("skywalking_jvm_memory_max", metric.GetTime(), - strconv.FormatInt(mem.Max, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memMax := helper.NewMetricLog("skywalking_jvm_memory_max", metric.GetTime(), float64(mem.Max), memLabels) logs = append(logs, memMax) - memUsed := helper.NewMetricLog("skywalking_jvm_memory_used", metric.GetTime(), - strconv.FormatInt(mem.Used, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memUsed := helper.NewMetricLog("skywalking_jvm_memory_used", metric.GetTime(), float64(mem.Used), memLabels) logs = append(logs, memUsed) } - for _, gc := range metric.Gc { - gcTime := helper.NewMetricLog("skywalking_jvm_gc_time", metric.GetTime(), - strconv.FormatInt(gc.Time, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "phrase", Value: gc.Phrase.String()}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) - logs = append(logs, gcTime) - - phrase := "Old" - if gc.Phrase.Number() == agent.GCPhrase_NEW.Number() { - phrase = "Young" - } - - gcCount := helper.NewMetricLog("skywalking_jvm_gc_count", metric.GetTime(), - strconv.FormatInt(gc.Count, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "phrase", Value: phrase}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) - logs = append(logs, gcCount) - } for _, memPool := range metric.MemoryPool { - memPoolCommitted := helper.NewMetricLog("skywalking_jvm_memory_pool_committed", metric.GetTime(), - strconv.FormatInt(memPool.Commited, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memLabels.Replace("type", memPool.Type.String()) + memPoolCommitted := helper.NewMetricLog("skywalking_jvm_memory_pool_committed", metric.GetTime(), float64(memPool.Commited), memLabels) logs = append(logs, memPoolCommitted) - memPoolInit := helper.NewMetricLog("skywalking_jvm_memory_pool_init", metric.GetTime(), - strconv.FormatInt(memPool.Init, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memPoolInit := helper.NewMetricLog("skywalking_jvm_memory_pool_init", metric.GetTime(), float64(memPool.Init), memLabels) logs = append(logs, memPoolInit) - memPoolMax := helper.NewMetricLog("skywalking_jvm_memory_pool_max", metric.GetTime(), - strconv.FormatInt(memPool.Max, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memPoolMax := helper.NewMetricLog("skywalking_jvm_memory_pool_max", metric.GetTime(), float64(memPool.Max), memLabels) logs = append(logs, memPoolMax) - memPoolUsed := helper.NewMetricLog("skywalking_jvm_memory_pool_used", - metric.GetTime(), strconv.FormatInt(memPool.Used, 10), - []helper.MetricLabel{{Name: "host", Value: host}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memPoolUsed := helper.NewMetricLog("skywalking_jvm_memory_pool_used", metric.GetTime(), float64(memPool.Used), memLabels) logs = append(logs, memPoolUsed) } + gcLabels := labels.CloneInto(memLabels) + for _, gc := range metric.Gc { + memLabels.Replace("phrase", gc.Phrase.String()) + gcTime := helper.NewMetricLog("skywalking_jvm_gc_time", metric.GetTime(), float64(gc.Time), gcLabels) + logs = append(logs, gcTime) + + phrase := "Old" + if gc.Phrase.Number() == agent.GCPhrase_NEW.Number() { + phrase = "Young" + } + memLabels.Replace("phrase", phrase) + + gcCount := helper.NewMetricLog("skywalking_jvm_gc_count", metric.GetTime(), float64(gc.Count), gcLabels) + logs = append(logs, gcCount) + } + return logs } diff --git a/plugins/input/skywalkingv3/clr_metric_handle.go b/plugins/input/skywalkingv3/clr_metric_handle.go index 4703045054..7aa92d3c39 100644 --- a/plugins/input/skywalkingv3/clr_metric_handle.go +++ b/plugins/input/skywalkingv3/clr_metric_handle.go @@ -16,7 +16,6 @@ package skywalkingv3 import ( "context" - "strconv" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/pipeline" @@ -52,50 +51,45 @@ func (c *CLRMetricHandler) Collect(ctx context.Context, req *agent.CLRMetricColl func (c *CLRMetricHandler) toMetricStoreFormat(metric *agent.CLRMetric, service string, serviceInstance string) []*protocol.Log { var logs []*protocol.Log - cpuUsage := helper.NewMetricLog("skywalking_clr_cpu_usage", metric.Time, - strconv.FormatFloat(metric.GetCpu().UsagePercent, 'f', 6, 64), []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + var labels helper.MetricLabels + labels.Append("service", service) + labels.Append("serviceInstance", serviceInstance) + cpuUsage := helper.NewMetricLog("skywalking_clr_cpu_usage", metric.Time, metric.GetCpu().UsagePercent, &labels) logs = append(logs, cpuUsage) gc := metric.Gc - gen0GcCount := helper.NewMetricLog("skywalking_clr_gc_count", metric.GetTime(), - strconv.FormatInt(gc.Gen0CollectCount, 10), - []helper.MetricLabel{{Name: "gen", Value: "gen0"}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + gcLabels := labels.Clone() + gcLabels.Append("gen", "gen0") + gen0GcCount := helper.NewMetricLog("skywalking_clr_gc_count", metric.GetTime(), float64(gc.Gen0CollectCount), gcLabels) logs = append(logs, gen0GcCount) - gen1GcCount := helper.NewMetricLog("skywalking_clr_gc_count", metric.GetTime(), - strconv.FormatInt(gc.Gen1CollectCount, 10), - []helper.MetricLabel{{Name: "gen", Value: "gen1"}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + gcLabels.Replace("gen", "gen1") + gen1GcCount := helper.NewMetricLog("skywalking_clr_gc_count", metric.GetTime(), float64(gc.Gen1CollectCount), gcLabels) logs = append(logs, gen1GcCount) - gen2GcCount := helper.NewMetricLog("skywalking_clr_gc_count", metric.GetTime(), - strconv.FormatInt(gc.Gen2CollectCount, 10), - []helper.MetricLabel{{Name: "gen", Value: "gen2"}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + gcLabels.Replace("gen", "gen2") + gen2GcCount := helper.NewMetricLog("skywalking_clr_gc_count", metric.GetTime(), float64(gc.Gen2CollectCount), gcLabels) logs = append(logs, gen2GcCount) - heapMemory := helper.NewMetricLog("skywalking_clr_heap_memory", metric.GetTime(), - strconv.FormatInt(gc.HeapMemory, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + heapMemory := helper.NewMetricLog("skywalking_clr_heap_memory", metric.GetTime(), float64(gc.HeapMemory), &labels) logs = append(logs, heapMemory) thread := metric.Thread - availableCompletionPortThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), - strconv.FormatInt(int64(thread.AvailableCompletionPortThreads), 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: "availableCompletionPortThreads"}}) + threadLabels := labels.CloneInto(gcLabels) + threadLabels.Append("type", "availableCompletionPortThreads") + availableCompletionPortThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), float64(thread.AvailableCompletionPortThreads), threadLabels) logs = append(logs, availableCompletionPortThreads) - availableWorkerThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), - strconv.FormatInt(int64(thread.AvailableWorkerThreads), 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: "availableWorkerThreads"}}) + threadLabels.Replace("type", "availableWorkerThreads") + availableWorkerThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), float64(thread.AvailableWorkerThreads), threadLabels) logs = append(logs, availableWorkerThreads) - maxCompletionPortThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), - strconv.FormatInt(int64(thread.MaxCompletionPortThreads), 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: "maxCompletionPortThreads"}}) + threadLabels.Replace("type", "maxCompletionPortThreads") + maxCompletionPortThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), float64(thread.MaxCompletionPortThreads), threadLabels) logs = append(logs, maxCompletionPortThreads) - maxWorkerThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), - strconv.FormatInt(int64(thread.MaxWorkerThreads), 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: "maxWorkerThreads"}}) + threadLabels.Replace("type", "maxWorkerThreads") + maxWorkerThreads := helper.NewMetricLog("skywalking_clr_threads", metric.GetTime(), float64(thread.MaxWorkerThreads), threadLabels) logs = append(logs, maxWorkerThreads) return logs } diff --git a/plugins/input/skywalkingv3/jvm_metric_handle.go b/plugins/input/skywalkingv3/jvm_metric_handle.go index b3118aa22d..72fc4b649e 100644 --- a/plugins/input/skywalkingv3/jvm_metric_handle.go +++ b/plugins/input/skywalkingv3/jvm_metric_handle.go @@ -16,7 +16,6 @@ package skywalkingv3 import ( "context" - "strconv" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/pipeline" @@ -52,10 +51,14 @@ func (h *JVMMetricHandler) Collect(ctx context.Context, req *skywalking.JVMMetri func (h *JVMMetricHandler) toMetricStoreFormat(metric *skywalking.JVMMetric, service string, serviceInstance string) []*protocol.Log { var logs []*protocol.Log - cpuUsage := helper.NewMetricLog("skywalking_jvm_cpu_usage", metric.Time, - strconv.FormatFloat(metric.GetCpu().UsagePercent, 'f', 6, 64), []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + var labels helper.MetricLabels + labels.Append("service", service) + labels.Append("serviceInstance", serviceInstance) + + cpuUsage := helper.NewMetricLog("skywalking_jvm_cpu_usage", metric.Time, metric.GetCpu().UsagePercent, &labels) logs = append(logs, cpuUsage) + memLabels := labels.Clone() for _, mem := range metric.Memory { var memType string if mem.IsHeap { @@ -63,71 +66,52 @@ func (h *JVMMetricHandler) toMetricStoreFormat(metric *skywalking.JVMMetric, ser } else { memType = "nonheap" } - memCommitted := helper.NewMetricLog("skywalking_jvm_memory_committed", metric.GetTime(), - strconv.FormatInt(mem.Committed, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memLabels.Replace("type", memType) + memCommitted := helper.NewMetricLog("skywalking_jvm_memory_committed", metric.GetTime(), float64(mem.Committed), memLabels) logs = append(logs, memCommitted) - memInit := helper.NewMetricLog("skywalking_jvm_memory_init", metric.GetTime(), - strconv.FormatInt(mem.Init, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memInit := helper.NewMetricLog("skywalking_jvm_memory_init", metric.GetTime(), float64(mem.Init), memLabels) logs = append(logs, memInit) - memMax := helper.NewMetricLog("skywalking_jvm_memory_max", metric.GetTime(), - strconv.FormatInt(mem.Max, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memMax := helper.NewMetricLog("skywalking_jvm_memory_max", metric.GetTime(), float64(mem.Max), memLabels) logs = append(logs, memMax) - memUsed := helper.NewMetricLog("skywalking_jvm_memory_used", metric.GetTime(), - strconv.FormatInt(mem.Used, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memType}}) + memUsed := helper.NewMetricLog("skywalking_jvm_memory_used", metric.GetTime(), float64(mem.Used), memLabels) logs = append(logs, memUsed) } - for _, gc := range metric.Gc { - gcTime := helper.NewMetricLog("skywalking_jvm_gc_time", metric.GetTime(), - strconv.FormatInt(gc.Time, 10), - []helper.MetricLabel{{Name: "phrase", Value: gc.Phrase.String()}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) - logs = append(logs, gcTime) - - gcCount := helper.NewMetricLog("skywalking_jvm_gc_count", metric.GetTime(), - strconv.FormatInt(gc.Count, 10), - []helper.MetricLabel{{Name: "phrase", Value: gc.Phrase.String()}, {Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) - logs = append(logs, gcCount) - } - for _, memPool := range metric.MemoryPool { - memPoolCommitted := helper.NewMetricLog("skywalking_jvm_memory_pool_committed", metric.GetTime(), - strconv.FormatInt(memPool.Committed, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memLabels.Replace("type", memPool.Type.String()) + memPoolCommitted := helper.NewMetricLog("skywalking_jvm_memory_pool_committed", metric.GetTime(), float64(memPool.Committed), memLabels) logs = append(logs, memPoolCommitted) - memPoolInit := helper.NewMetricLog("skywalking_jvm_memory_pool_init", metric.GetTime(), - strconv.FormatInt(memPool.Init, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memPoolInit := helper.NewMetricLog("skywalking_jvm_memory_pool_init", metric.GetTime(), float64(memPool.Init), memLabels) logs = append(logs, memPoolInit) - memPoolMax := helper.NewMetricLog("skywalking_jvm_memory_pool_max", metric.GetTime(), - strconv.FormatInt(memPool.Max, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memPoolMax := helper.NewMetricLog("skywalking_jvm_memory_pool_max", metric.GetTime(), float64(memPool.Max), memLabels) logs = append(logs, memPoolMax) - memPoolUsed := helper.NewMetricLog("skywalking_jvm_memory_pool_used", - metric.GetTime(), strconv.FormatInt(memPool.Used, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}, {Name: "type", Value: memPool.Type.String()}}) + memPoolUsed := helper.NewMetricLog("skywalking_jvm_memory_pool_used", metric.GetTime(), float64(memPool.Used), memLabels) logs = append(logs, memPoolUsed) } - threadsLive := helper.NewMetricLog("skywalking_jvm_threads_live", metric.GetTime(), - strconv.FormatInt(metric.Thread.LiveCount, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + gcLabels := labels.CloneInto(memLabels) + for _, gc := range metric.Gc { + gcLabels.Replace("phrase", gc.Phrase.String()) + gcTime := helper.NewMetricLog("skywalking_jvm_gc_time", metric.GetTime(), float64(gc.Time), gcLabels) + logs = append(logs, gcTime) + + gcCount := helper.NewMetricLog("skywalking_jvm_gc_count", metric.GetTime(), float64(gc.Count), gcLabels) + logs = append(logs, gcCount) + } + + threadsLive := helper.NewMetricLog("skywalking_jvm_threads_live", metric.GetTime(), float64(metric.Thread.LiveCount), &labels) logs = append(logs, threadsLive) - threadsDaemon := helper.NewMetricLog("skywalking_jvm_threads_daemon", metric.GetTime(), - strconv.FormatInt(metric.Thread.DaemonCount, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + + threadsDaemon := helper.NewMetricLog("skywalking_jvm_threads_daemon", metric.GetTime(), float64(metric.Thread.DaemonCount), &labels) logs = append(logs, threadsDaemon) - threadsPeak := helper.NewMetricLog("skywalking_jvm_threads_peak", metric.GetTime(), - strconv.FormatInt(metric.Thread.PeakCount, 10), - []helper.MetricLabel{{Name: "service", Value: service}, {Name: "serviceInstance", Value: serviceInstance}}) + + threadsPeak := helper.NewMetricLog("skywalking_jvm_threads_peak", metric.GetTime(), float64(metric.Thread.PeakCount), &labels) logs = append(logs, threadsPeak) + return logs } diff --git a/plugins/input/skywalkingv3/jvm_metric_handle_test.go b/plugins/input/skywalkingv3/jvm_metric_handle_test.go index 91a467b072..ff8a748f4f 100644 --- a/plugins/input/skywalkingv3/jvm_metric_handle_test.go +++ b/plugins/input/skywalkingv3/jvm_metric_handle_test.go @@ -48,6 +48,9 @@ func validate(expectedResultPath string, result []*protocol.Log, t *testing.T) { jsonBytes, _ := json.MarshalIndent(result, "", " ") fmt.Println(string(jsonBytes)) expected, _ := ioutil.ReadFile(filepath.Clean(expectedResultPath)) + temp := make([]*protocol.Log, 0, 16) + json.Unmarshal(expected, &temp) + expected, _ = json.MarshalIndent(temp, "", " ") if !bytes.Equal(jsonBytes, expected) { t.Fail() } diff --git a/plugins/input/skywalkingv3/meter_handle.go b/plugins/input/skywalkingv3/meter_handle.go index 78d4ab6b31..09f14f389c 100644 --- a/plugins/input/skywalkingv3/meter_handle.go +++ b/plugins/input/skywalkingv3/meter_handle.go @@ -17,8 +17,6 @@ package skywalkingv3 import ( "io" "math" - "sort" - "strconv" "time" "github.com/alibaba/ilogtail/pkg/helper" @@ -100,29 +98,29 @@ func handleMeterData(context pipeline.Context, collector pipeline.Collector, met if singleValue != nil { value := singleValue.Value name := singleValue.Name - labels := make(helper.MetricLabels, 0, len(singleValue.Labels)+2) + var labels helper.MetricLabels for _, l := range singleValue.Labels { - labels = append(labels, helper.MetricLabel{Name: l.Name, Value: l.Value}) + labels.Append(l.Name, l.Value) } - labels = append(labels, helper.MetricLabel{Name: "service", Value: service}) - labels = append(labels, helper.MetricLabel{Name: "serviceInstance", Value: serviceInstance}) - sort.Sort(labels) - metricLog := helper.NewMetricLog(name, ts, strconv.FormatFloat(value, 'g', -1, 64), labels) + labels.Append("service", service) + labels.Append("serviceInstance", serviceInstance) + + metricLog := helper.NewMetricLog(name, ts, value, &labels) // logger.Info("meter", meterData) collector.AddRawLog(metricLog) } histogramData := meterData.GetHistogram() if histogramData != nil { - labels := make(helper.MetricLabels, 0, len(histogramData.Labels)+2) + var labels helper.MetricLabels + labels.Append("service", service) + labels.Append("serviceInstance", serviceInstance) for _, l := range histogramData.Labels { - labels = append(labels, helper.MetricLabel{Name: l.Name, Value: l.Value}) + labels.Append(l.Name, l.Value) } - labels = append(labels, helper.MetricLabel{Name: "service", Value: service}) - labels = append(labels, helper.MetricLabel{Name: "serviceInstance", Value: serviceInstance}) hd := convertHistogramData(histogramData) - logs := hd.ToMetricLogs(histogramData.Name, ts, labels) + logs := hd.ToMetricLogs(histogramData.Name, ts, &labels) for _, logIns := range logs { collector.AddRawLog(logIns) } diff --git a/plugins/input/skywalkingv3/testdata/clr_metrics.json b/plugins/input/skywalkingv3/testdata/clr_metrics.json index 789b448d0e..c0e76d49f7 100644 --- a/plugins/input/skywalkingv3/testdata/clr_metrics.json +++ b/plugins/input/skywalkingv3/testdata/clr_metrics.json @@ -1,200 +1,200 @@ [ - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_cpu_usage" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "0.500000" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_gc_count" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "gen#$#gen0|service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "1" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_gc_count" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "gen#$#gen1|service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "2" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_gc_count" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "gen#$#gen2|service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "3" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_heap_memory" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "123456" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_threads" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#availableCompletionPortThreads" - }, - { - "Key": "__value__", - "Value": "999" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_threads" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#availableWorkerThreads" - }, - { - "Key": "__value__", - "Value": "888" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_threads" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#maxCompletionPortThreads" - }, - { - "Key": "__value__", - "Value": "777" - } - ], - "Time_ns": 0 - }, - { - "Time": 15, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_clr_threads" - }, - { - "Key": "__time_nano__", - "Value": "15000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#maxWorkerThreads" - }, - { - "Key": "__value__", - "Value": "666" - } - ], - "Time_ns": 0 - } + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_cpu_usage" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "0.5" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_gc_count" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "gen#$#gen0|service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "1" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_gc_count" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "gen#$#gen1|service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "2" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_gc_count" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "gen#$#gen2|service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "3" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_heap_memory" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "123456" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_threads" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#availableCompletionPortThreads" + }, + { + "Key": "__value__", + "Value": "999" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_threads" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#availableWorkerThreads" + }, + { + "Key": "__value__", + "Value": "888" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_threads" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#maxCompletionPortThreads" + }, + { + "Key": "__value__", + "Value": "777" + } + ], + "Time_ns": 0 + }, + { + "Time": 1500000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_clr_threads" + }, + { + "Key": "__time_nano__", + "Value": "1500000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#maxWorkerThreads" + }, + { + "Key": "__value__", + "Value": "666" + } + ], + "Time_ns": 0 + } ] \ No newline at end of file diff --git a/plugins/input/skywalkingv3/testdata/jvm_metrics.json b/plugins/input/skywalkingv3/testdata/jvm_metrics.json index a8ee6b698a..5a1ae25508 100644 --- a/plugins/input/skywalkingv3/testdata/jvm_metrics.json +++ b/plugins/input/skywalkingv3/testdata/jvm_metrics.json @@ -1,530 +1,530 @@ [ - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_cpu_usage" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "50.000000" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_committed" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" - }, - { - "Key": "__value__", - "Value": "4" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_init" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" - }, - { - "Key": "__value__", - "Value": "1" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_max" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" - }, - { - "Key": "__value__", - "Value": "9" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_used" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" - }, - { - "Key": "__value__", - "Value": "7" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_committed" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" - }, - { - "Key": "__value__", - "Value": "4" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_init" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" - }, - { - "Key": "__value__", - "Value": "1" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_max" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" - }, - { - "Key": "__value__", - "Value": "9" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_used" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" - }, - { - "Key": "__value__", - "Value": "7" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_gc_time" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "phrase#$#NEW|service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "123" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_gc_count" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "phrase#$#NEW|service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "12" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_gc_time" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "phrase#$#OLD|service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "123" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_gc_count" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "phrase#$#OLD|service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "12" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_committed" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "7" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_init" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "1" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_max" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "9" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_used" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "4" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_committed" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "7" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_init" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "1" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_max" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "9" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_memory_pool_used" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" - }, - { - "Key": "__value__", - "Value": "4" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_threads_live" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "1" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_threads_daemon" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "2" - } - ], - "Time_ns": 0 - }, - { - "Time": 10, - "Contents": [ - { - "Key": "__name__", - "Value": "skywalking_jvm_threads_peak" - }, - { - "Key": "__time_nano__", - "Value": "10000" - }, - { - "Key": "__labels__", - "Value": "service#$#service_1|serviceInstance#$#instance_1" - }, - { - "Key": "__value__", - "Value": "3" - } - ], - "Time_ns": 0 - } + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_cpu_usage" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "50" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_committed" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" + }, + { + "Key": "__value__", + "Value": "4" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_init" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" + }, + { + "Key": "__value__", + "Value": "1" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_max" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" + }, + { + "Key": "__value__", + "Value": "9" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_used" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#nonheap" + }, + { + "Key": "__value__", + "Value": "7" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_committed" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" + }, + { + "Key": "__value__", + "Value": "4" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_init" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" + }, + { + "Key": "__value__", + "Value": "1" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_max" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" + }, + { + "Key": "__value__", + "Value": "9" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_used" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#heap" + }, + { + "Key": "__value__", + "Value": "7" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_committed" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "7" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_init" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "1" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_max" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "9" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_used" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#NEWGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "4" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_committed" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "7" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_init" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "1" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_max" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "9" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_memory_pool_used" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1|type#$#OLDGEN_USAGE" + }, + { + "Key": "__value__", + "Value": "4" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_gc_time" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "phrase#$#NEW|service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "123" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_gc_count" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "phrase#$#NEW|service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "12" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_gc_time" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "phrase#$#OLD|service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "123" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_gc_count" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "phrase#$#OLD|service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "12" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_threads_live" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "1" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_threads_daemon" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "2" + } + ], + "Time_ns": 0 + }, + { + "Time": 1000000000, + "Contents": [ + { + "Key": "__name__", + "Value": "skywalking_jvm_threads_peak" + }, + { + "Key": "__time_nano__", + "Value": "1000000000000000000" + }, + { + "Key": "__labels__", + "Value": "service#$#service_1|serviceInstance#$#instance_1" + }, + { + "Key": "__value__", + "Value": "3" + } + ], + "Time_ns": 0 + } ] \ No newline at end of file diff --git a/plugins/input/skywalkingv3/testdata/meter_histogram.json b/plugins/input/skywalkingv3/testdata/meter_histogram.json index db3b178357..6e5df38c5e 100644 --- a/plugins/input/skywalkingv3/testdata/meter_histogram.json +++ b/plugins/input/skywalkingv3/testdata/meter_histogram.json @@ -1,156 +1,156 @@ [ - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_histogram_metric_bucket" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#50|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "5" - } - ], - "Time_ns": 789000000 - }, - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_histogram_metric_bucket" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#88.8|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "9" - } - ], - "Time_ns": 789000000 - }, - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_histogram_metric_bucket" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#90|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "12" - } - ], - "Time_ns": 789000000 - }, - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_histogram_metric_bucket" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#100|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "14" - } - ], - "Time_ns": 789000000 - }, - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_histogram_metric_bucket" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#+Inf|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "15" - } - ], - "Time_ns": 789000000 - }, - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_histogram_metric_count" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "15" - } - ], - "Time_ns": 789000000 - }, - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_histogram_metric_sum" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "746.9" - } - ], - "Time_ns": 789000000 - } + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_histogram_metric_count" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "15" + } + ], + "Time_ns": 0 + }, + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_histogram_metric_sum" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "746.9" + } + ], + "Time_ns": 0 + }, + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_histogram_metric_bucket" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#50|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "5" + } + ], + "Time_ns": 0 + }, + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_histogram_metric_bucket" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#88.8|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "9" + } + ], + "Time_ns": 0 + }, + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_histogram_metric_bucket" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#90|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "12" + } + ], + "Time_ns": 0 + }, + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_histogram_metric_bucket" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#100|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "14" + } + ], + "Time_ns": 0 + }, + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_histogram_metric_bucket" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|le#$#+Inf|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "15" + } + ], + "Time_ns": 0 + } ] \ No newline at end of file diff --git a/plugins/input/skywalkingv3/testdata/meter_singlevalue.json b/plugins/input/skywalkingv3/testdata/meter_singlevalue.json index c727d23b15..941fcb2fa9 100644 --- a/plugins/input/skywalkingv3/testdata/meter_singlevalue.json +++ b/plugins/input/skywalkingv3/testdata/meter_singlevalue.json @@ -1,24 +1,24 @@ [ - { - "Time": 123456, - "Contents": [ - { - "Key": "__name__", - "Value": "i_am_singleValue_metric" - }, - { - "Key": "__time_nano__", - "Value": "123456789" - }, - { - "Key": "__labels__", - "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|service#$#service_111|serviceInstance#$#instance_222" - }, - { - "Key": "__value__", - "Value": "123" - } - ], - "Time_ns": 789000000 - } + { + "Time": 1234567890, + "Contents": [ + { + "Key": "__name__", + "Value": "i_am_singleValue_metric" + }, + { + "Key": "__time_nano__", + "Value": "1234567890000000000" + }, + { + "Key": "__labels__", + "Value": "Hahaha#$#test|a#$#aaa|ip#$#1.2.3.4|service#$#service_111|serviceInstance#$#instance_222" + }, + { + "Key": "__value__", + "Value": "123" + } + ], + "Time_ns": 0 + } ] \ No newline at end of file diff --git a/plugins/input/systemv2/input_system_linux.go b/plugins/input/systemv2/input_system_linux.go index acf64094e8..78da22e1de 100644 --- a/plugins/input/systemv2/input_system_linux.go +++ b/plugins/input/systemv2/input_system_linux.go @@ -111,7 +111,7 @@ func (r *InputSystem) Init(context pipeline.Context) (int, error) { func (r *InputSystem) CollectTCPStats(collector pipeline.Collector, stat *net.ProtoCountersStat) { if !r.TCP { - r.addMetric(collector, "protocol_tcp_established", r.commonLabelsStr, float64(stat.Stats["CurrEstab"])) + r.addMetric(collector, "protocol_tcp_established", &r.commonLabels, float64(stat.Stats["CurrEstab"])) return } @@ -138,7 +138,7 @@ func (r *InputSystem) CollectTCPStats(collector pipeline.Collector, stat *net.Pr tcpStats[tcpRxQueuedBytes] += line.RxQueue } for s, num := range tcpStats { - r.addMetric(collector, "protocol_tcp_"+s.String(), r.commonLabelsStr, float64(num)) + r.addMetric(collector, "protocol_tcp_"+s.String(), &r.commonLabels, float64(num)) } } @@ -164,8 +164,8 @@ func (r *InputSystem) CollectOpenFD(collector pipeline.Collector) { } allocated, _ := strconv.ParseFloat(string(parts[0]), 64) maximum, _ := strconv.ParseFloat(string(parts[2]), 64) - r.addMetric(collector, "fd_allocated", r.commonLabelsStr, allocated) - r.addMetric(collector, "fd_max", r.commonLabelsStr, maximum) + r.addMetric(collector, "fd_allocated", &r.commonLabels, allocated) + r.addMetric(collector, "fd_max", &r.commonLabels, maximum) } // CollectDiskUsage use `/proc/1/mounts` to find the device rather than `proc/self/mounts` @@ -198,12 +198,10 @@ func (r *InputSystem) CollectDiskUsage(collector pipeline.Collector) { logger.Debug(r.context.GetRuntimeContext(), "ignore disk path", text) continue } - newLabels := r.commonLabels.Clone() - newLabels.Append("path", parts[1]) - newLabels.Append("device", parts[0]) - newLabels.Append("fs_type", parts[2]) - newLabels.Sort() - labels := newLabels.String() + labels := r.commonLabels.Clone() + labels.Append("path", parts[1]) + labels.Append("device", parts[0]) + labels.Append("fs_type", parts[2]) // wrapper with mountedpath because of using unix statfs rather than proc file system. usage, err := disk.Usage(helper.GetMountedFilePath(parts[1])) if err == nil { diff --git a/plugins/input/systemv2/input_system_others.go b/plugins/input/systemv2/input_system_others.go index c94e947712..269c88634e 100644 --- a/plugins/input/systemv2/input_system_others.go +++ b/plugins/input/systemv2/input_system_others.go @@ -30,7 +30,7 @@ func (r *InputSystem) Init(context pipeline.Context) (int, error) { } func (r *InputSystem) CollectTCPStats(collector pipeline.Collector, stat *net.ProtoCountersStat) { - r.addMetric(collector, "protocol_tcp_established", r.commonLabelsStr, float64(stat.Stats["CurrEstab"])) + r.addMetric(collector, "protocol_tcp_established", &r.commonLabels, float64(stat.Stats["CurrEstab"])) } func (r *InputSystem) CollectOpenFD(collector pipeline.Collector) { @@ -48,12 +48,10 @@ func (r *InputSystem) CollectDiskUsage(collector pipeline.Collector) { logger.Debug(r.context.GetRuntimeContext(), "ignore disk path", part.Mountpoint) continue } - newLabels := r.commonLabels.Clone() - newLabels.Append("path", part.Mountpoint) - newLabels.Append("device", part.Device) - newLabels.Append("fs_type", part.Fstype) - newLabels.Sort() - labels := newLabels.String() + labels := r.commonLabels.Clone() + labels.Append("path", part.Mountpoint) + labels.Append("device", part.Device) + labels.Append("fs_type", part.Fstype) usage, err := disk.Usage(part.Mountpoint) if err == nil { diff --git a/plugins/input/systemv2/input_system_v2.go b/plugins/input/systemv2/input_system_v2.go index 2ef39751e0..bbc048ef56 100644 --- a/plugins/input/systemv2/input_system_v2.go +++ b/plugins/input/systemv2/input_system_v2.go @@ -67,8 +67,7 @@ type InputSystem struct { lastDiskStat disk.IOCountersStat lastDiskStatAll map[string]disk.IOCountersStat lastDiskTime time.Time - commonLabels helper.KeyValues - commonLabelsStr string + commonLabels helper.MetricLabels collectTime time.Time context pipeline.Context excludeDiskFsTypeRegex *regexp.Regexp @@ -103,17 +102,11 @@ func (r *InputSystem) CommonInit(context pipeline.Context) (int, error) { for key, val := range r.Labels { r.commonLabels.Append(key, val) } - r.commonLabels.Sort() - r.commonLabelsStr = r.commonLabels.String() return 0, nil } -func (r *InputSystem) addMetric(collector pipeline.Collector, - name string, - labels string, - value float64) { - keys, vals := helper.MakeMetric(name, labels, r.collectTime.UnixNano(), value) - collector.AddDataArray(nil, keys, vals, r.collectTime) +func (r *InputSystem) addMetric(collector pipeline.Collector, name string, labels *helper.MetricLabels, value float64) { + collector.AddRawLog(helper.NewMetricLog(name, r.collectTime.UnixNano(), value, labels)) } func (r *InputSystem) CollectCore(collector pipeline.Collector) { @@ -126,11 +119,11 @@ func (r *InputSystem) CollectCore(collector pipeline.Collector) { // load stat loadStat, err := load.Avg() if err == nil { - r.addMetric(collector, "system_load1", r.commonLabelsStr, loadStat.Load1) - r.addMetric(collector, "system_load5", r.commonLabelsStr, loadStat.Load5) - r.addMetric(collector, "system_load15", r.commonLabelsStr, loadStat.Load15) + r.addMetric(collector, "system_load1", &r.commonLabels, loadStat.Load1) + r.addMetric(collector, "system_load5", &r.commonLabels, loadStat.Load5) + r.addMetric(collector, "system_load15", &r.commonLabels, loadStat.Load15) } - r.addMetric(collector, "system_boot_time", r.commonLabelsStr, float64(r.lastInfo.BootTime)) + r.addMetric(collector, "system_boot_time", &r.commonLabels, float64(r.lastInfo.BootTime)) } func (r *InputSystem) CollectCPU(collector pipeline.Collector) { @@ -141,7 +134,7 @@ func (r *InputSystem) CollectCPU(collector pipeline.Collector) { for _, c := range cpuInfo { ncpus += c.Cores } - r.addMetric(collector, "cpu_count", r.commonLabelsStr, float64(ncpus)) + r.addMetric(collector, "cpu_count", &r.commonLabels, float64(ncpus)) if err == nil && len(cpuStat) > 0 { cpuBusy := cpuStat[0].GuestNice + cpuStat[0].Guest + cpuStat[0].Nice + cpuStat[0].Softirq + cpuStat[0].Irq + cpuStat[0].User + cpuStat[0].System @@ -164,16 +157,16 @@ func (r *InputSystem) CollectCPU(collector pipeline.Collector) { deltaTotal := cpuTotal - r.lastCPUTotal if r.CPUPercent && !r.lastCPUTime.IsZero() && deltaTotal > 0 { - r.addMetric(collector, "cpu_util", r.commonLabelsStr, 100*(cpuBusy-r.lastCPUBusy)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_wait_util", r.commonLabelsStr, 100*(cpuStat[0].Iowait-r.lastCPUStat.Iowait)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_sys_util", r.commonLabelsStr, 100*(cpuStat[0].System-r.lastCPUStat.System)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_user_util", r.commonLabelsStr, 100*(cpuStat[0].User-r.lastCPUStat.User)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_irq_util", r.commonLabelsStr, 100*(cpuStat[0].Irq-r.lastCPUStat.Irq)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_softirq_util", r.commonLabelsStr, 100*(cpuStat[0].Softirq-r.lastCPUStat.Softirq)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_nice_util", r.commonLabelsStr, 100*(cpuStat[0].Nice-r.lastCPUStat.Nice)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_steal_util", r.commonLabelsStr, 100*(cpuStat[0].Steal-r.lastCPUStat.Steal)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_guest_util", r.commonLabelsStr, 100*(cpuStat[0].Guest-r.lastCPUStat.Guest)/deltaTotal*cpuShareFactor) - r.addMetric(collector, "cpu_guestnice_util", r.commonLabelsStr, 100*(cpuStat[0].GuestNice-r.lastCPUStat.GuestNice)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_util", &r.commonLabels, 100*(cpuBusy-r.lastCPUBusy)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_wait_util", &r.commonLabels, 100*(cpuStat[0].Iowait-r.lastCPUStat.Iowait)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_sys_util", &r.commonLabels, 100*(cpuStat[0].System-r.lastCPUStat.System)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_user_util", &r.commonLabels, 100*(cpuStat[0].User-r.lastCPUStat.User)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_irq_util", &r.commonLabels, 100*(cpuStat[0].Irq-r.lastCPUStat.Irq)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_softirq_util", &r.commonLabels, 100*(cpuStat[0].Softirq-r.lastCPUStat.Softirq)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_nice_util", &r.commonLabels, 100*(cpuStat[0].Nice-r.lastCPUStat.Nice)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_steal_util", &r.commonLabels, 100*(cpuStat[0].Steal-r.lastCPUStat.Steal)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_guest_util", &r.commonLabels, 100*(cpuStat[0].Guest-r.lastCPUStat.Guest)/deltaTotal*cpuShareFactor) + r.addMetric(collector, "cpu_guestnice_util", &r.commonLabels, 100*(cpuStat[0].GuestNice-r.lastCPUStat.GuestNice)/deltaTotal*cpuShareFactor) } r.lastCPUTime = time.Now() @@ -187,25 +180,23 @@ func (r *InputSystem) CollectMem(collector pipeline.Collector) { // mem stat memStat, err := mem.VirtualMemory() if err == nil { - r.addMetric(collector, "mem_util", r.commonLabelsStr, memStat.UsedPercent) - r.addMetric(collector, "mem_cache", r.commonLabelsStr, float64(memStat.Cached)) - r.addMetric(collector, "mem_free", r.commonLabelsStr, float64(memStat.Free)) - r.addMetric(collector, "mem_available", r.commonLabelsStr, float64(memStat.Available)) - r.addMetric(collector, "mem_used", r.commonLabelsStr, float64(memStat.Used)) - r.addMetric(collector, "mem_total", r.commonLabelsStr, float64(memStat.Total)) + r.addMetric(collector, "mem_util", &r.commonLabels, memStat.UsedPercent) + r.addMetric(collector, "mem_cache", &r.commonLabels, float64(memStat.Cached)) + r.addMetric(collector, "mem_free", &r.commonLabels, float64(memStat.Free)) + r.addMetric(collector, "mem_available", &r.commonLabels, float64(memStat.Available)) + r.addMetric(collector, "mem_used", &r.commonLabels, float64(memStat.Used)) + r.addMetric(collector, "mem_total", &r.commonLabels, float64(memStat.Total)) } swapStat, err := mem.SwapMemory() if err == nil { - r.addMetric(collector, "mem_swap_util", r.commonLabelsStr, swapStat.UsedPercent) + r.addMetric(collector, "mem_swap_util", &r.commonLabels, swapStat.UsedPercent) } } func (r *InputSystem) collectOneDisk(collector pipeline.Collector, name string, timeDeltaSec float64, last, now *disk.IOCountersStat) { - newLabels := r.commonLabels.Clone() - newLabels.Append("disk", name) - newLabels.Sort() - labels := newLabels.String() + labels := r.commonLabels.Clone() + labels.Append("disk", name) r.addMetric(collector, "disk_rbps", labels, float64(now.ReadBytes-last.ReadBytes)/timeDeltaSec) r.addMetric(collector, "disk_wbps", labels, float64(now.WriteBytes-last.WriteBytes)/timeDeltaSec) r.addMetric(collector, "disk_riops", labels, float64(now.ReadCount-last.ReadCount)/timeDeltaSec) @@ -272,10 +263,8 @@ func (r *InputSystem) CollectDisk(collector pipeline.Collector) { } func (r *InputSystem) collectOneNet(collector pipeline.Collector, name string, timeDeltaSec float64, last, now *net.IOCountersStat) { - newLabels := r.commonLabels.Clone() - newLabels.Append("interface", name) - newLabels.Sort() - labels := newLabels.String() + labels := r.commonLabels.Clone() + labels.Append("interface", name) r.addMetric(collector, "net_in", labels, float64(now.BytesRecv-last.BytesRecv)/timeDeltaSec) r.addMetric(collector, "net_out", labels, float64(now.BytesSent-last.BytesSent)/timeDeltaSec) r.addMetric(collector, "net_in_pkt", labels, float64(now.PacketsRecv-last.PacketsRecv)/timeDeltaSec) @@ -367,13 +356,13 @@ func (r *InputSystem) CollectProtocol(collector pipeline.Collector) { deltaTotalOutSegs := protoCounterStats[i].Stats[totalOutSegField] - r.lastProtoAll[i].Stats[totalOutSegField] deltaTotalInSegs := protoCounterStats[i].Stats[totalInSegField] - r.lastProtoAll[i].Stats[totalInSegField] - r.addMetric(collector, "protocol_tcp_outsegs", r.commonLabelsStr, float64(deltaTotalOutSegs)) - r.addMetric(collector, "protocol_tcp_insegs", r.commonLabelsStr, float64(deltaTotalInSegs)) - r.addMetric(collector, "protocol_tcp_retran_segs", r.commonLabelsStr, float64(deltaRetransSegs)) + r.addMetric(collector, "protocol_tcp_outsegs", &r.commonLabels, float64(deltaTotalOutSegs)) + r.addMetric(collector, "protocol_tcp_insegs", &r.commonLabels, float64(deltaTotalInSegs)) + r.addMetric(collector, "protocol_tcp_retran_segs", &r.commonLabels, float64(deltaRetransSegs)) if deltaTotalOutSegs <= 0 { - r.addMetric(collector, "protocol_tcp_retran_util", r.commonLabelsStr, 0.) + r.addMetric(collector, "protocol_tcp_retran_util", &r.commonLabels, 0.) } else { - r.addMetric(collector, "protocol_tcp_retran_util", r.commonLabelsStr, 100*float64(deltaRetransSegs)/float64(deltaTotalOutSegs)) + r.addMetric(collector, "protocol_tcp_retran_util", &r.commonLabels, 100*float64(deltaRetransSegs)/float64(deltaTotalOutSegs)) } } diff --git a/plugins/processor/appender/processor_appender.go b/plugins/processor/appender/processor_appender.go index 31576ff6b8..a6f956e6e8 100644 --- a/plugins/processor/appender/processor_appender.go +++ b/plugins/processor/appender/processor_appender.go @@ -92,7 +92,7 @@ func (p *ProcessorAppender) processField(c *protocol.Log_Content) { c.Value += r if p.SortLabels { labels := strings.Split(c.Value, "|") - var keyValue helper.KeyValues + var keyValue helper.MetricLabels for _, labelStr := range labels { kv := strings.SplitN(labelStr, "#$#", 2) if len(kv) == 2 { @@ -100,7 +100,6 @@ func (p *ProcessorAppender) processField(c *protocol.Log_Content) { } } if keyValue.Len() > 0 { - keyValue.Sort() c.Value = keyValue.String() } }