Skip to content

Commit

Permalink
[CONTROLLER/PROMETHEUS] optimizes memory
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengya committed Sep 25, 2023
1 parent 4c16db8 commit 25296f1
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 121 deletions.
6 changes: 6 additions & 0 deletions message/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ message PrometheusMetricLabelRequest {
repeated PrometheusLabelRequest labels = 2;
}

message PrometheusMetricLabel {
optional uint32 metric_name_id = 1;
repeated uint32 label_ids = 2;
}

message PrometheusMetricAPPLabelLayout {
required string metric_name = 1;
required string app_label_name = 2;
Expand Down Expand Up @@ -282,6 +287,7 @@ message SyncPrometheusResponse {
repeated PrometheusLabel labels = 6;
repeated PrometheusMetricTarget metric_targets = 7;
repeated PrometheusTarget targets = 8;
repeated PrometheusMetricLabel metric_labels = 9;
}

service PrometheusDebug {
Expand Down
26 changes: 13 additions & 13 deletions server/controller/prometheus/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func GetSingleton() *Cache {
MetricAndAPPLabelLayout: new(metricAndAPPLabelLayout),
Target: t,
Label: l,
MetricLabel: newMetricLabel(l),
MetricLabel: newMetricLabel(mn, l),
MetricTarget: newMetricTarget(mn, t),
}
})
Expand Down Expand Up @@ -223,9 +223,9 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte {
temp["id_to_key"].(map[int]string)[key.(int)] = marshal(value)
return true
})
for item := range tempCache.Label.keys.Iterator().C {
temp["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
// for item := range tempCache.Label.keys.Iterator().C { // TODO
// temp["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
// }
if len(temp["keys"].(map[string]interface{})) > 0 ||
len(temp["id_to_key"].(map[int]string)) > 0 {
content["label"] = temp
Expand All @@ -240,19 +240,19 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte {
"metric_name_to_label_ids": make(map[string][]int),
"metric_label_detail_keys": make(map[string]interface{}),
}
for item := range tempCache.MetricLabel.labelCache.keys.Iterator().C {
temp["label_cache"].(map[string]interface{})["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
// for item := range tempCache.MetricLabel.labelCache.keys.Iterator().C { // TODO
// temp["label_cache"].(map[string]interface{})["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
// }
tempCache.MetricLabel.labelCache.idToKey.Range(func(key, value any) bool {
temp["label_cache"].(map[string]interface{})["id_to_key"].(map[int]string)[key.(int)] = marshal(value)
return true
})
for k, v := range tempCache.MetricLabel.metricNameToLabelIDs {
temp["metric_name_to_label_ids"].(map[string][]int)[k] = v
}
for item := range tempCache.MetricLabel.metricLabelDetailKeys.Iterator().C {
temp["metric_label_detail_keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
// for k, v := range tempCache.MetricLabel.metricNameToLabelIDs { // TODO
// temp["metric_name_to_label_ids"].(map[string][]int)[k] = v
// }
// for item := range tempCache.MetricLabel.metricLabelDetailKeys.Iterator().C { // TODO
// temp["metric_label_detail_keys"].(map[string]interface{})[marshal(item)] = struct{}{}
// }
if len(temp["label_cache"].(map[string]interface{})["keys"].(map[string]interface{})) > 0 ||
len(temp["label_cache"].(map[string]interface{})["id_to_key"].(map[int]string)) > 0 ||
len(temp["metric_name_to_label_ids"].(map[string][]int)) > 0 ||
Expand Down
30 changes: 20 additions & 10 deletions server/controller/prometheus/cache/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package cache
import (
"sync"

mapset "github.com/deckarep/golang-set/v2"

"github.com/deepflowio/deepflow/message/controller"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
)
Expand All @@ -38,18 +36,26 @@ func NewLabelKey(name, value string) LabelKey {
}

type label struct {
keys mapset.Set[LabelKey]
// keys mapset.Set[LabelKey]
idToKey sync.Map
keyToID sync.Map
}

func newLabel() *label {
return &label{
keys: mapset.NewSet[LabelKey](),
// keys: mapset.NewSet[LabelKey](),
}
}

func (l *label) IfKeyExists(key LabelKey) bool {
return l.keys.Contains(key)
// func (l *label) IfKeyExists(key LabelKey) bool {
// return l.keys.Contains(key)
// }

func (l *label) GetIDByKey(key LabelKey) (int, bool) {
if item, ok := l.keyToID.Load(key); ok {
return item.(int), true
}
return 0, false
}

func (l *label) GetKeyByID(id int) (LabelKey, bool) {
Expand All @@ -61,8 +67,10 @@ func (l *label) GetKeyByID(id int) (LabelKey, bool) {

func (l *label) Add(batch []*controller.PrometheusLabel) {
for _, item := range batch {
l.keys.Add(NewLabelKey(item.GetName(), item.GetValue()))
l.idToKey.Store(int(item.GetId()), NewLabelKey(item.GetName(), item.GetValue()))
k := NewLabelKey(item.GetName(), item.GetValue())
l.keyToID.Store(k, item.GetId())
l.idToKey.Store(int(item.GetId()), k)
// l.keys.Add(NewLabelKey(item.GetName(), item.GetValue()))
}
}

Expand All @@ -72,8 +80,10 @@ func (l *label) refresh(args ...interface{}) error {
return err
}
for _, item := range ls {
l.keys.Add(NewLabelKey(item.Name, item.Value))
l.idToKey.Store(item.ID, NewLabelKey(item.Name, item.Value))
// l.keys.Add(NewLabelKey(item.Name, item.Value))
k := NewLabelKey(item.Name, item.Value)
l.keyToID.Store(k, item.ID)
l.idToKey.Store(item.ID, k)
}
return nil
}
Expand Down
127 changes: 84 additions & 43 deletions server/controller/prometheus/cache/metric_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,85 +17,126 @@
package cache

import (
"github.com/cornelk/hashmap"
mapset "github.com/deckarep/golang-set/v2"

"github.com/deepflowio/deepflow/message/controller"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
)

type MetricLabelDetailKey struct {
MetricName string
LabelName string
LabelValue string
}
// type MetricLabelKey struct {
// MetricNameID int
// LabelID int
// }

func NewMetricLabelDetailKey(metricName, labelName, labelValue string) MetricLabelDetailKey {
return MetricLabelDetailKey{
MetricName: metricName,
LabelName: labelName,
LabelValue: labelValue,
}
}
// func NewMetricLabelKey(metricNameID, labelID int) MetricLabelKey {
// return MetricLabelKey{
// MetricNameID: metricNameID,
// LabelID: labelID,
// }
// }

// type MetricLabelDetailKey struct {
// MetricName string
// LabelName string
// LabelValue string
// }

// func NewMetricLabelDetailKey(metricName, labelName, labelValue string) MetricLabelDetailKey {
// return MetricLabelDetailKey{
// MetricName: metricName,
// LabelName: labelName,
// LabelValue: labelValue,
// }
// }

type metricLabel struct {
labelCache *label
metricLabelDetailKeys mapset.Set[MetricLabelDetailKey] // for metric_label check
metricNameToLabelIDs map[string][]int // only for fully assembled
metricNameCache *metricName
labelCache *label
// metricLabelDetailKeys mapset.Set[MetricLabelDetailKey] // for metric_label check
// metricLabelKeys mapset.Set[MetricLabelKey] // for metric_label check
// metricNameToLabelIDs map[string][]int // only for fully assembled
metricNameIDToLabelIDs *hashmap.Map[int, mapset.Set[int]]
}

func newMetricLabel(l *label) *metricLabel {
func newMetricLabel(mn *metricName, l *label) *metricLabel {
return &metricLabel{
labelCache: l,
metricLabelDetailKeys: mapset.NewSet[MetricLabelDetailKey](),
metricNameToLabelIDs: make(map[string][]int),
metricNameCache: mn,
labelCache: l,
// metricLabelDetailKeys: mapset.NewSet[MetricLabelDetailKey](),
// metricLabelKeys: mapset.NewSet[MetricLabelKey](),
// metricNameToLabelIDs: make(map[string][]int),
metricNameIDToLabelIDs: hashmap.New[int, mapset.Set[int]](),
}
}

func (ml *metricLabel) IfKeyExists(k MetricLabelDetailKey) bool {
return ml.metricLabelDetailKeys.Contains(k)
// func (ml *metricLabel) IfKeyExists(k MetricLabelDetailKey) bool {
// return ml.metricLabelDetailKeys.Contains(k)
// }

func (ml *metricLabel) IfLinked(metricID, labelID int) bool {
if labelIDs, ok := ml.metricNameIDToLabelIDs.Get(metricID); ok {
return labelIDs.(mapset.Set[int]).Contains(labelID)
}
return false
}

func (ml *metricLabel) GetLabelsByMetricName(metricName string) []LabelKey {
var ret []LabelKey
if labelIDs, ok := ml.metricNameToLabelIDs[metricName]; ok {
for _, labelID := range labelIDs {
if labelKey, ok := ml.labelCache.GetKeyByID(labelID); ok {
ret = append(ret, labelKey)
}
}
func (ml *metricLabel) GetLabelsByMetricName(metricName string) []int {
mni, ok := ml.metricNameCache.GetIDByName(metricName)
if !ok {
return nil
}
return ret
labelIDs, ok := ml.metricNameIDToLabelIDs.Get(mni)
return labelIDs.ToSlice()
}

func (mi *metricLabel) GetMetricNameIDToLabelIDs() *hashmap.Map[int, mapset.Set[int]] {
return mi.metricNameIDToLabelIDs
}

func (ml *metricLabel) Add(batch []MetricLabelDetailKey) {
func (ml *metricLabel) Add(batch []*controller.PrometheusMetricLabel) {
for _, item := range batch {
ml.metricLabelDetailKeys.Add(item)
for _, li := range item.GetLabelIds() {
// ml.metricLabelKeys.Add(NewMetricLabelKey(int(item.GetMetricNameId()), int(li)))
ml.metricNameIDToLabelIDs.GetOrInsert(int(item.GetMetricNameId()), mapset.NewSet(int(li)))
}
}
}

func (ml *metricLabel) GetMetricLabelDetailKeys() mapset.Set[MetricLabelDetailKey] {
return ml.metricLabelDetailKeys
}
// func (ml *metricLabel) Add(batch []MetricLabelDetailKey) {
// for _, item := range batch {
// ml.metricLabelDetailKeys.Add(item)
// }
// }

// func (ml *metricLabel) GetMetricLabelDetailKeys() mapset.Set[MetricLabelDetailKey] {
// return ml.metricLabelDetailKeys
// }

func (ml *metricLabel) refresh(args ...interface{}) error {
metricLabels, err := ml.load()
if err != nil {
return err
}
metricNameToLabelIDs := make(map[string][]int)
// metricNameToLabelIDs := make(map[string][]int)
for _, item := range metricLabels {
if lk, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
ml.metricLabelDetailKeys.Add(NewMetricLabelDetailKey(item.MetricName, lk.Name, lk.Value))
}
if _, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
metricNameToLabelIDs[item.MetricName] = append(metricNameToLabelIDs[item.MetricName], item.LabelID)
// if lk, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
// ml.metricLabelDetailKeys.Add(NewMetricLabelDetailKey(item.MetricName, lk.Name, lk.Value))
// }
if mni, ok := ml.metricNameCache.GetIDByName(item.MetricName); ok {
// ml.metricLabelKeys.Add(NewMetricLabelKey(mni, item.LabelID))
ml.metricNameIDToLabelIDs.GetOrInsert(mni, mapset.NewSet(item.LabelID))
}
// if _, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
// metricNameToLabelIDs[item.MetricName] = append(metricNameToLabelIDs[item.MetricName], item.LabelID)
// }
}
ml.metricNameToLabelIDs = metricNameToLabelIDs
// ml.metricNameToLabelIDs = metricNameToLabelIDs
return nil
}

func (ml *metricLabel) load() ([]*mysql.PrometheusMetricLabel, error) {
var metricLabels []*mysql.PrometheusMetricLabel
err := mysql.Db.Find(&metricLabels).Error
err := mysql.Db.Select("metric_name", "label_id").Find(&metricLabels).Error
return metricLabels, err
}
10 changes: 10 additions & 0 deletions server/controller/prometheus/cache/metric_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

type metricName struct {
nameToID sync.Map
idToName sync.Map
}

func (mn *metricName) Get() *sync.Map {
Expand All @@ -38,9 +39,17 @@ func (mn *metricName) GetIDByName(n string) (int, bool) {
return 0, false
}

func (mn *metricName) GetNameByID(id int) (string, bool) {
if name, ok := mn.idToName.Load(id); ok {
return name.(string), true
}
return "", false
}

func (mn *metricName) Add(batch []*controller.PrometheusMetricName) {
for _, item := range batch {
mn.nameToID.Store(item.GetName(), int(item.GetId()))
mn.idToName.Store(int(item.GetId()), item.GetName())
}
}

Expand All @@ -51,6 +60,7 @@ func (mn *metricName) refresh(args ...interface{}) error {
}
for _, item := range metricNames {
mn.nameToID.Store(item.Name, item.ID)
mn.idToName.Store(item.ID, item.Name)
}
return nil
}
Expand Down
8 changes: 5 additions & 3 deletions server/controller/prometheus/encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (e *Encoder) Init(ctx context.Context, cfg *prometheuscfg.Config) {
e.labelValue = newLabelValue(cfg.ResourceMaxID1)
e.label = newLabel()
e.LabelLayout = newLabelLayout(cfg)
e.metricLabel = newMetricLabel(e.label)
e.metricLabel = newMetricLabel(e.metricName, e.label)
e.target = newTarget(cfg.ResourceMaxID1)
e.metricTarget = newMetricTarget(e.target)
e.refreshInterval = time.Duration(cfg.EncoderCacheRefreshInterval) * time.Second
Expand Down Expand Up @@ -205,11 +205,13 @@ func (e *Encoder) encodeLabel(args ...interface{}) error {
}

func (e *Encoder) encodeMetricLabel(args ...interface{}) error {
mls := args[0].([]*controller.PrometheusMetricLabelRequest)
err := e.metricLabel.encode(mls)
resp := args[0].(*controller.SyncPrometheusResponse)
metricLabels := args[1].([]*controller.PrometheusMetricLabelRequest)
mls, err := e.metricLabel.encode(metricLabels)
if err != nil {
return err
}
resp.MetricLabels = mls
return nil
}

Expand Down
Loading

0 comments on commit 25296f1

Please sign in to comment.