Skip to content

Commit

Permalink
Unify metrics format (#1060)
Browse files Browse the repository at this point in the history
* temp

* polish metrics log in sls & add pushgateway decoder

* polish metrics log in sls & add pushgateway decoder

* polish codes

* polish codes

* polish codes

* polish codes

* polish codes

* polish codes

* polish codes

* polish codes

* polish codes

* fix lint

* polish codes

* polish codes

* polish codes

* polish codes

* polish codes

* fix unittest
  • Loading branch information
EvanLjp authored Sep 25, 2023
1 parent 2c403c7 commit 286e281
Show file tree
Hide file tree
Showing 38 changed files with 1,483 additions and 1,719 deletions.
3 changes: 2 additions & 1 deletion core/common/LogtailCommonFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,5 @@ DEFINE_FLAG_INT32(fuse_file_max_count, "max file total count from fuse root dir"
DEFINE_FLAG_BOOL(enable_root_path_collection, "", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
"if enable containerd upper dir detect when locating rootfs",
false);
false);
DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
1 change: 1 addition & 0 deletions core/common/LogtailCommonFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ DECLARE_FLAG_STRING(fuse_root_dir);
DECLARE_FLAG_BOOL(enable_root_path_collection);
DECLARE_FLAG_INT32(logtail_alarm_interval);
DECLARE_FLAG_BOOL(enable_containerd_upper_dir_detect);
DECLARE_FLAG_BOOL(enable_sls_metrics_format);
DECLARE_FLAG_BOOL(enable_new_pipeline);
1 change: 1 addition & 0 deletions core/logtail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ static void overwrite_community_edition_flags() {
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
BOOL_FLAG(enable_sls_metrics_format) = false;
}

// Main routine of worker process.
Expand Down
1 change: 1 addition & 0 deletions core/logtail_windows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ static void overwrite_community_edition_flags() {
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
BOOL_FLAG(enable_sls_metrics_format) = false;
}

void do_worker_process() {
Expand Down
1 change: 1 addition & 0 deletions core/plugin/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ LogtailPlugin::LogtailPlugin() {
mPluginCfg["HostIP"] = LogFileProfiler::mIpAddr;
mPluginCfg["Hostname"] = LogFileProfiler::mHostname;
mPluginCfg["EnableContainerdUpperDirDetect"] = BOOL_FLAG(enable_containerd_upper_dir_detect);
mPluginCfg["EnableSlsMetricsFormat"] = BOOL_FLAG(enable_sls_metrics_format);
}

LogtailPlugin::~LogtailPlugin() {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/global_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type GlobalConfig struct {

EnableTimestampNanosecond bool
EnableContainerdUpperDirDetect bool
EnableSlsMetricsFormat bool
}

// LogtailGlobalConfig is the singleton instance of GlobalConfig.
Expand Down
260 changes: 223 additions & 37 deletions pkg/helper/log_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,25 @@ package helper

import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"

"github.com/alibaba/ilogtail/pkg/config"
"github.com/alibaba/ilogtail/pkg/protocol"
"github.com/alibaba/ilogtail/pkg/util"
)

const (
// StaleNaN is a signaling NaN, due to the MSB of the mantissa being 0.
// This value is chosen with many leading 0s, so we have scope to store more
// complicated values in the future. It is 2 rather than 1 to make
// it easier to distinguish from the NormalNaN by a human when debugging.
StaleNaN uint64 = 0x7ff0000000000002
StaleNan = "__STALE_NAN__"
SlsMetricstoreInvalidReplaceCharacter = '_'
)

func CreateLog(t time.Time, configTag map[string]string, logTags map[string]string, fields map[string]string) (*protocol.Log, error) {
Expand Down Expand Up @@ -96,25 +109,112 @@ type MetricLabel struct {
}

// Labels for metric labels
type MetricLabels []MetricLabel
type MetricLabels struct {
keyValues []*MetricLabel
sorted bool
formatStr string
}

func (kv *MetricLabels) clearCache() {
kv.sorted = false
kv.formatStr = ""
}

func (kv *MetricLabels) Len() int {
return len(kv.keyValues)
}

func (l MetricLabels) Len() int {
return len(l)
func (kv *MetricLabels) Swap(i int, j int) {
kv.keyValues[i], kv.keyValues[j] = kv.keyValues[j], kv.keyValues[i]
}

func (l MetricLabels) Swap(i int, j int) {
l[i], l[j] = l[j], l[i]
func (kv *MetricLabels) Less(i int, j int) bool {
return kv.keyValues[i].Name < kv.keyValues[j].Name
}

func (l MetricLabels) Less(i int, j int) bool {
return l[i].Name < l[j].Name
func (kv *MetricLabels) Replace(key, value string) {
findIndex := sort.Search(len(kv.keyValues), func(index int) bool {
return kv.keyValues[index].Name >= key
})
if findIndex < len(kv.keyValues) && kv.keyValues[findIndex].Name == key {
kv.keyValues[findIndex].Value = value
} else {
kv.Append(key, value)
}
kv.clearCache()
}

func (kv *MetricLabels) Clone() *MetricLabels {
if kv == nil {
return &MetricLabels{}
}
var newKeyValues MetricLabels
kv.CloneInto(&newKeyValues)
return &newKeyValues
}

func MinInt(a, b int) int {
if a < b {
return a
func (kv *MetricLabels) CloneInto(dst *MetricLabels) *MetricLabels {
if kv == nil {
return &MetricLabels{}
}
if dst == nil {
return kv.Clone()
}
if len(kv.keyValues) < cap(dst.keyValues) {
dst.keyValues = dst.keyValues[:len(kv.keyValues)]
} else {
dst.keyValues = make([]*MetricLabel, len(kv.keyValues))
}
dst.sorted = kv.sorted
dst.formatStr = kv.formatStr
for i, value := range kv.keyValues {
cp := *value
dst.keyValues[i] = &cp
}
return b
return dst
}

// AppendMap ...
func (kv *MetricLabels) AppendMap(mapVal map[string]string) {
for key, value := range mapVal {
kv.Append(key, value)
}
kv.clearCache()
}

// Append ...
func (kv *MetricLabels) Append(key, value string) {
kv.keyValues = append(kv.keyValues, &MetricLabel{
formatLabelKey(key),
formatLabelValue(value),
})
kv.clearCache()
}

func (kv *MetricLabels) SubSlice(begin, end int) {
kv.keyValues = kv.keyValues[begin:end]
kv.clearCache()
}

func (kv *MetricLabels) String() string {
if kv == nil {
return ""
}
if !kv.sorted || kv.formatStr == "" {
sort.Sort(kv)
var builder strings.Builder
for index, label := range kv.keyValues {
builder.WriteString(label.Name)
builder.WriteString("#$#")
builder.WriteString(label.Value)
if index != len(kv.keyValues)-1 {
builder.WriteByte('|')
}
}
kv.formatStr = builder.String()
kv.sorted = true
}
return kv.formatStr
}

// DefBucket ...
Expand All @@ -131,42 +231,128 @@ type HistogramData struct {
}

// ToMetricLogs ..
func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels MetricLabels) []*protocol.Log {
func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels *MetricLabels) []*protocol.Log {
logs := make([]*protocol.Log, 0, len(hd.Buckets)+2)
sort.Sort(labels)
logs = append(logs, NewMetricLog(name+"_count", timeMs, float64(hd.Count), labels))
logs = append(logs, NewMetricLog(name+"_sum", timeMs, hd.Sum, labels))
for _, v := range hd.Buckets {
newLabels := make(MetricLabels, len(labels), len(labels)+1)
copy(newLabels, labels)
newLabels = append(newLabels, MetricLabel{Name: "le", Value: strconv.FormatFloat(v.Le, 'g', -1, 64)})
sort.Sort(newLabels)
logs = append(logs, NewMetricLog(name+"_bucket", timeMs, strconv.FormatInt(v.Count, 10), newLabels))
}
logs = append(logs, NewMetricLog(name+"_count", timeMs, strconv.FormatInt(hd.Count, 10), labels))
logs = append(logs, NewMetricLog(name+"_sum", timeMs, strconv.FormatFloat(hd.Sum, 'g', -1, 64), labels))
labels.Replace("le", strconv.FormatFloat(v.Le, 'g', -1, 64))
logs = append(logs, NewMetricLog(name+"_bucket", timeMs, float64(v.Count), labels))
}

return logs
}

// NewMetricLog caller must sort labels
func NewMetricLog(name string, timeMs int64, value string, labels []MetricLabel) *protocol.Log {
strTime := strconv.FormatInt(timeMs, 10)
// NewMetricLog create a metric log, time support unix milliseconds and unix nanoseconds.
func NewMetricLog(name string, t int64, value float64, labels *MetricLabels) *protocol.Log {
var valStr string
if math.Float64bits(value) == StaleNaN {
valStr = StaleNan
} else {
valStr = strconv.FormatFloat(value, 'g', -1, 64)
}
return NewMetricLogStringVal(name, t, valStr, labels)
}

// NewMetricLogStringVal create a metric log with val string, time support unix milliseconds and unix nanoseconds.
func NewMetricLogStringVal(name string, t int64, value string, labels *MetricLabels) *protocol.Log {
strTime := strconv.FormatInt(t, 10)
metric := &protocol.Log{}
protocol.SetLogTimeWithNano(metric, uint32(timeMs/1000), uint32((timeMs*1e6)%1e9))
metric.Contents = []*protocol.Log_Content{}
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__name__", Value: name})
switch len(strTime) {
case 13:
protocol.SetLogTimeWithNano(metric, uint32(t/1000), uint32((t*1e6)%1e9))
strTime += "000000"
case 19:
protocol.SetLogTimeWithNano(metric, uint32(t/1e9), uint32(t%1e9))
default:
t = int64(float64(t) * math.Pow10(19-len(strTime)))
strTime = strconv.FormatInt(t, 10)
protocol.SetLogTimeWithNano(metric, uint32(t/1e9), uint32(t%1e9))
}
metric.Contents = make([]*protocol.Log_Content, 0, 4)
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__name__", Value: formatNewMetricName(name)})
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__time_nano__", Value: strTime})
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__labels__", Value: labels.String()})
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__value__", Value: value})
return metric
}

func formatLabelKey(key string) string {
if !config.LogtailGlobalConfig.EnableSlsMetricsFormat {
return key
}
var newKey []byte
for i := 0; i < len(key); i++ {
b := key[i]
if (b >= 'a' && b <= 'z') ||
(b >= 'A' && b <= 'Z') ||
(b >= '0' && b <= '9') ||
b == '_' {
continue
} else {
if newKey == nil {
newKey = []byte(key)
}
newKey[i] = SlsMetricstoreInvalidReplaceCharacter
}
}
if newKey == nil {
return key
}
return util.ZeroCopyBytesToString(newKey)
}

builder := strings.Builder{}
for index, l := range labels {
if index != 0 {
builder.WriteString("|")
func formatLabelValue(value string) string {
if !config.LogtailGlobalConfig.EnableSlsMetricsFormat {
return value
}
var newValue []byte
for i := 0; i < len(value); i++ {
b := value[i]
if b != '|' {
continue
} else {
if newValue == nil {
newValue = []byte(value)
}
newValue[i] = SlsMetricstoreInvalidReplaceCharacter
}
builder.WriteString(l.Name)
builder.WriteString("#$#")
builder.WriteString(l.Value)
}
if newValue == nil {
return value
}
return util.ZeroCopyBytesToString(newValue)
}

func formatNewMetricName(name string) string {
if !config.LogtailGlobalConfig.EnableSlsMetricsFormat {
return name
}
newName := []byte(name)
for i, b := range newName {
if (b >= 'a' && b <= 'z') ||
(b >= 'A' && b <= 'Z') ||
(b >= '0' && b <= '9') ||
b == '_' ||
b == ':' {
continue
} else {
newName[i] = SlsMetricstoreInvalidReplaceCharacter
}
}
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__labels__", Value: builder.String()})
return util.ZeroCopyBytesToString(newName)
}

metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__value__", Value: value})
return metric
// ReplaceInvalidChars analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]")
func ReplaceInvalidChars(in *string) {
for charIndex, char := range *in {
charInt := int(char)
if !((charInt >= 97 && charInt <= 122) || // a-z
(charInt >= 65 && charInt <= 90) || // A-Z
(charInt >= 48 && charInt <= 57) || // 0-9
charInt == 95 || charInt == ':') { // _

*in = (*in)[:charIndex] + "_" + (*in)[charIndex+1:]
}
}
}
36 changes: 36 additions & 0 deletions pkg/helper/log_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package helper

import (
"github.com/stretchr/testify/require"

"testing"
)

func TestMetricLabels_Append(t *testing.T) {
var ml MetricLabels
ml.Append("key2", "val")
ml.Append("key", "val")
log := NewMetricLog("name", 1691646109945, 1, &ml)
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val|key2#$#val" > Contents:<Key:"__value__" Value:"1" > `, log.String())

ml.Replace("key", "val2")

log = NewMetricLog("name", 1691646109945, 1, &ml)
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val" > Contents:<Key:"__value__" Value:"1" > `, log.String())

ml.Replace("key3", "val3")
log = NewMetricLog("name", 1691646109945, 1, &ml)
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val|key3#$#val3" > Contents:<Key:"__value__" Value:"1" > `, log.String())

cloneLabel := ml.Clone()
cloneLabel.Replace("key3", "val4")
log = NewMetricLog("name", 1691646109945, 1, cloneLabel)
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val|key3#$#val4" > Contents:<Key:"__value__" Value:"1" > `, log.String())

log = NewMetricLog("name", 1691646109945, 1, &ml)
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val|key3#$#val3" > Contents:<Key:"__value__" Value:"1" > `, log.String())

log = NewMetricLog("name", 1691646109945, 1, nil)
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"" > Contents:<Key:"__value__" Value:"1" > `, log.String())

}
Loading

0 comments on commit 286e281

Please sign in to comment.