Skip to content

Commit

Permalink
[CONTROLLER/PROMETHEUS] optimizes memory
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengya authored and SongZhen0704 committed Sep 26, 2023
1 parent 71ac308 commit 616c320
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 141 deletions.
6 changes: 6 additions & 0 deletions message/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ message PrometheusMetricLabelRequest {
repeated PrometheusLabelRequest labels = 2;
}

message PrometheusMetricLabel {
optional uint32 metric_name_id = 1;
repeated uint32 label_ids = 2;
}

message PrometheusMetricAPPLabelLayout {
required string metric_name = 1;
required string app_label_name = 2;
Expand Down Expand Up @@ -282,6 +287,7 @@ message SyncPrometheusResponse {
repeated PrometheusLabel labels = 6;
repeated PrometheusMetricTarget metric_targets = 7;
repeated PrometheusTarget targets = 8;
repeated PrometheusMetricLabel metric_labels = 9;
}

service PrometheusDebug {
Expand Down
42 changes: 15 additions & 27 deletions server/controller/prometheus/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/op/go-logging"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -67,7 +68,7 @@ func GetSingleton() *Cache {
MetricAndAPPLabelLayout: new(metricAndAPPLabelLayout),
Target: t,
Label: l,
MetricLabel: newMetricLabel(l),
MetricLabel: newMetricLabel(mn, l),
MetricTarget: newMetricTarget(mn, t),
}
})
Expand Down Expand Up @@ -216,47 +217,34 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte {
}
getLabel := func() {
temp := map[string]interface{}{
"keys": make(map[string]interface{}),
"key_to_id": make(map[string]interface{}),
"id_to_key": make(map[int]string),
}
tempCache.Label.idToKey.Range(func(key, value any) bool {
temp["key_to_id"].(map[string]interface{})[marshal(key)] = value
return true
})
tempCache.Label.idToKey.Range(func(key, value any) bool {
temp["id_to_key"].(map[int]string)[key.(int)] = marshal(value)
return true
})
for item := range tempCache.Label.keys.Iterator().C {
temp["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
if len(temp["keys"].(map[string]interface{})) > 0 ||

if len(temp["key_to_id"].(map[string]interface{})) > 0 ||
len(temp["id_to_key"].(map[int]string)) > 0 {
content["label"] = temp
}
}
getMetricLabel := func() {
temp := map[string]interface{}{
"label_cache": map[string]interface{}{
"keys": make(map[string]interface{}),
"id_to_key": make(map[int]string),
},
"metric_name_to_label_ids": make(map[string][]int),
"metric_label_detail_keys": make(map[string]interface{}),
"metric_name_id_to_label_ids": make(map[int][]int),
}
for item := range tempCache.MetricLabel.labelCache.keys.Iterator().C {
temp["label_cache"].(map[string]interface{})["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
tempCache.MetricLabel.labelCache.idToKey.Range(func(key, value any) bool {
temp["label_cache"].(map[string]interface{})["id_to_key"].(map[int]string)[key.(int)] = marshal(value)

tempCache.MetricLabel.metricNameIDToLabelIDs.Range(func(i int, s mapset.Set[int]) bool {
temp["metric_name_id_to_label_ids"].(map[int][]int)[i] = s.ToSlice()
return true
})
for k, v := range tempCache.MetricLabel.metricNameToLabelIDs {
temp["metric_name_to_label_ids"].(map[string][]int)[k] = v
}
for item := range tempCache.MetricLabel.metricLabelDetailKeys.Iterator().C {
temp["metric_label_detail_keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
if len(temp["label_cache"].(map[string]interface{})["keys"].(map[string]interface{})) > 0 ||
len(temp["label_cache"].(map[string]interface{})["id_to_key"].(map[int]string)) > 0 ||
len(temp["metric_name_to_label_ids"].(map[string][]int)) > 0 ||
len(temp["metric_label_detail_keys"].(map[string]interface{})) > 0 {

if len(temp["metric_name_id_to_label_ids"].(map[int][]int)) > 0 {
content["metric_label"] = temp
}
}
Expand Down
25 changes: 13 additions & 12 deletions server/controller/prometheus/cache/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package cache
import (
"sync"

mapset "github.com/deckarep/golang-set/v2"

"github.com/deepflowio/deepflow/message/controller"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
)
Expand All @@ -38,18 +36,19 @@ func NewLabelKey(name, value string) LabelKey {
}

type label struct {
keys mapset.Set[LabelKey]
idToKey sync.Map
keyToID sync.Map
}

func newLabel() *label {
return &label{
keys: mapset.NewSet[LabelKey](),
}
return &label{}
}

func (l *label) IfKeyExists(key LabelKey) bool {
return l.keys.Contains(key)
func (l *label) GetIDByKey(key LabelKey) (int, bool) {
if item, ok := l.keyToID.Load(key); ok {
return item.(int), true
}
return 0, false
}

func (l *label) GetKeyByID(id int) (LabelKey, bool) {
Expand All @@ -61,8 +60,9 @@ func (l *label) GetKeyByID(id int) (LabelKey, bool) {

func (l *label) Add(batch []*controller.PrometheusLabel) {
for _, item := range batch {
l.keys.Add(NewLabelKey(item.GetName(), item.GetValue()))
l.idToKey.Store(int(item.GetId()), NewLabelKey(item.GetName(), item.GetValue()))
k := NewLabelKey(item.GetName(), item.GetValue())
l.keyToID.Store(k, int(item.GetId()))
l.idToKey.Store(int(item.GetId()), k)
}
}

Expand All @@ -72,8 +72,9 @@ func (l *label) refresh(args ...interface{}) error {
return err
}
for _, item := range ls {
l.keys.Add(NewLabelKey(item.Name, item.Value))
l.idToKey.Store(item.ID, NewLabelKey(item.Name, item.Value))
k := NewLabelKey(item.Name, item.Value)
l.keyToID.Store(k, item.ID)
l.idToKey.Store(item.ID, k)
}
return nil
}
Expand Down
83 changes: 37 additions & 46 deletions server/controller/prometheus/cache/metric_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,85 +17,76 @@
package cache

import (
"github.com/cornelk/hashmap"
mapset "github.com/deckarep/golang-set/v2"

"github.com/deepflowio/deepflow/message/controller"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
)

type MetricLabelDetailKey struct {
MetricName string
LabelName string
LabelValue string
}

func NewMetricLabelDetailKey(metricName, labelName, labelValue string) MetricLabelDetailKey {
return MetricLabelDetailKey{
MetricName: metricName,
LabelName: labelName,
LabelValue: labelValue,
}
}

type metricLabel struct {
labelCache *label
metricLabelDetailKeys mapset.Set[MetricLabelDetailKey] // for metric_label check
metricNameToLabelIDs map[string][]int // only for fully assembled
metricNameCache *metricName
labelCache *label

metricNameIDToLabelIDs *hashmap.Map[int, mapset.Set[int]]
}

func newMetricLabel(l *label) *metricLabel {
func newMetricLabel(mn *metricName, l *label) *metricLabel {
return &metricLabel{
labelCache: l,
metricLabelDetailKeys: mapset.NewSet[MetricLabelDetailKey](),
metricNameToLabelIDs: make(map[string][]int),
metricNameCache: mn,
labelCache: l,

metricNameIDToLabelIDs: hashmap.New[int, mapset.Set[int]](),
}
}

func (ml *metricLabel) IfKeyExists(k MetricLabelDetailKey) bool {
return ml.metricLabelDetailKeys.Contains(k)
func (ml *metricLabel) IfLinked(metricID, labelID int) bool {
if labelIDs, ok := ml.metricNameIDToLabelIDs.Get(metricID); ok {
return labelIDs.(mapset.Set[int]).Contains(labelID)
}
return false
}

func (ml *metricLabel) GetLabelsByMetricName(metricName string) []LabelKey {
var ret []LabelKey
if labelIDs, ok := ml.metricNameToLabelIDs[metricName]; ok {
for _, labelID := range labelIDs {
if labelKey, ok := ml.labelCache.GetKeyByID(labelID); ok {
ret = append(ret, labelKey)
}
}
func (ml *metricLabel) GetLabelsByMetricName(metricName string) []int {
mni, ok := ml.metricNameCache.GetIDByName(metricName)
if !ok {
log.Debugf("metric_name: %s id not found", metricName)
return nil
}
if labelIDs, ok := ml.metricNameIDToLabelIDs.Get(mni); ok {
return labelIDs.ToSlice()
}
return ret
log.Debugf("metric_name: %s label_ids not found", metricName)
return []int{}
}

func (ml *metricLabel) Add(batch []MetricLabelDetailKey) {
for _, item := range batch {
ml.metricLabelDetailKeys.Add(item)
}
func (mi *metricLabel) GetMetricNameIDToLabelIDs() *hashmap.Map[int, mapset.Set[int]] {
return mi.metricNameIDToLabelIDs
}

func (ml *metricLabel) GetMetricLabelDetailKeys() mapset.Set[MetricLabelDetailKey] {
return ml.metricLabelDetailKeys
func (ml *metricLabel) Add(batch []*controller.PrometheusMetricLabel) {
for _, item := range batch {
for _, li := range item.GetLabelIds() {
ml.metricNameIDToLabelIDs.GetOrInsert(int(item.GetMetricNameId()), mapset.NewSet(int(li)))
}
}
}

func (ml *metricLabel) refresh(args ...interface{}) error {
metricLabels, err := ml.load()
if err != nil {
return err
}
metricNameToLabelIDs := make(map[string][]int)
for _, item := range metricLabels {
if lk, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
ml.metricLabelDetailKeys.Add(NewMetricLabelDetailKey(item.MetricName, lk.Name, lk.Value))
}
if _, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
metricNameToLabelIDs[item.MetricName] = append(metricNameToLabelIDs[item.MetricName], item.LabelID)
if mni, ok := ml.metricNameCache.GetIDByName(item.MetricName); ok {
ml.metricNameIDToLabelIDs.GetOrInsert(mni, mapset.NewSet(item.LabelID))
}
}
ml.metricNameToLabelIDs = metricNameToLabelIDs
return nil
}

func (ml *metricLabel) load() ([]*mysql.PrometheusMetricLabel, error) {
var metricLabels []*mysql.PrometheusMetricLabel
err := mysql.Db.Find(&metricLabels).Error
err := mysql.Db.Select("metric_name", "label_id").Find(&metricLabels).Error
return metricLabels, err
}
10 changes: 10 additions & 0 deletions server/controller/prometheus/cache/metric_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

type metricName struct {
nameToID sync.Map
idToName sync.Map
}

func (mn *metricName) Get() *sync.Map {
Expand All @@ -38,9 +39,17 @@ func (mn *metricName) GetIDByName(n string) (int, bool) {
return 0, false
}

func (mn *metricName) GetNameByID(id int) (string, bool) {
if name, ok := mn.idToName.Load(id); ok {
return name.(string), true
}
return "", false
}

func (mn *metricName) Add(batch []*controller.PrometheusMetricName) {
for _, item := range batch {
mn.nameToID.Store(item.GetName(), int(item.GetId()))
mn.idToName.Store(int(item.GetId()), item.GetName())
}
}

Expand All @@ -51,6 +60,7 @@ func (mn *metricName) refresh(args ...interface{}) error {
}
for _, item := range metricNames {
mn.nameToID.Store(item.Name, item.ID)
mn.idToName.Store(item.ID, item.Name)
}
return nil
}
Expand Down
10 changes: 6 additions & 4 deletions server/controller/prometheus/encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (e *Encoder) Init(ctx context.Context, cfg *prometheuscfg.Config) {
e.labelValue = newLabelValue(cfg.ResourceMaxID1)
e.label = newLabel()
e.LabelLayout = newLabelLayout(cfg)
e.metricLabel = newMetricLabel(e.label)
e.metricLabel = newMetricLabel(e.metricName, e.label)
e.target = newTarget(cfg.ResourceMaxID1)
e.metricTarget = newMetricTarget(e.target)
e.refreshInterval = time.Duration(cfg.EncoderCacheRefreshInterval) * time.Second
Expand Down Expand Up @@ -143,7 +143,7 @@ func (e *Encoder) Encode(req *controller.SyncPrometheusRequest) (*controller.Syn
AppendErrGroup(eg, e.encodeMetricName, resp, req.GetMetricNames())
AppendErrGroup(eg, e.encodeLabelName, resp, req.GetLabelNames())
AppendErrGroup(eg, e.encodeLabelValue, resp, req.GetLabelValues())
AppendErrGroup(eg, e.encodeMetricLabel, req.GetMetricLabels())
AppendErrGroup(eg, e.encodeMetricLabel, resp, req.GetMetricLabels())
AppendErrGroup(eg, e.encodeMetricTarget, resp, req.GetMetricTargets())
err = eg.Wait()
return resp, err
Expand Down Expand Up @@ -205,11 +205,13 @@ func (e *Encoder) encodeLabel(args ...interface{}) error {
}

func (e *Encoder) encodeMetricLabel(args ...interface{}) error {
mls := args[0].([]*controller.PrometheusMetricLabelRequest)
err := e.metricLabel.encode(mls)
resp := args[0].(*controller.SyncPrometheusResponse)
metricLabels := args[1].([]*controller.PrometheusMetricLabelRequest)
mls, err := e.metricLabel.encode(metricLabels)
if err != nil {
return err
}
resp.MetricLabels = mls
return nil
}

Expand Down
Loading

0 comments on commit 616c320

Please sign in to comment.