Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CONTROLLER/PROMETHEUS] optimizes memory #4400

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions message/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ message PrometheusMetricLabelRequest {
repeated PrometheusLabelRequest labels = 2;
}

message PrometheusMetricLabel {
optional uint32 metric_name_id = 1;
repeated uint32 label_ids = 2;
}

message PrometheusMetricAPPLabelLayout {
required string metric_name = 1;
required string app_label_name = 2;
Expand Down Expand Up @@ -282,6 +287,7 @@ message SyncPrometheusResponse {
repeated PrometheusLabel labels = 6;
repeated PrometheusMetricTarget metric_targets = 7;
repeated PrometheusTarget targets = 8;
repeated PrometheusMetricLabel metric_labels = 9;
}

service PrometheusDebug {
Expand Down
42 changes: 15 additions & 27 deletions server/controller/prometheus/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/op/go-logging"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -67,7 +68,7 @@ func GetSingleton() *Cache {
MetricAndAPPLabelLayout: new(metricAndAPPLabelLayout),
Target: t,
Label: l,
MetricLabel: newMetricLabel(l),
MetricLabel: newMetricLabel(mn, l),
MetricTarget: newMetricTarget(mn, t),
}
})
Expand Down Expand Up @@ -216,47 +217,34 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte {
}
getLabel := func() {
temp := map[string]interface{}{
"keys": make(map[string]interface{}),
"key_to_id": make(map[string]interface{}),
"id_to_key": make(map[int]string),
}
tempCache.Label.idToKey.Range(func(key, value any) bool {
temp["key_to_id"].(map[string]interface{})[marshal(key)] = value
return true
})
tempCache.Label.idToKey.Range(func(key, value any) bool {
temp["id_to_key"].(map[int]string)[key.(int)] = marshal(value)
return true
})
for item := range tempCache.Label.keys.Iterator().C {
temp["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
if len(temp["keys"].(map[string]interface{})) > 0 ||

if len(temp["key_to_id"].(map[string]interface{})) > 0 ||
len(temp["id_to_key"].(map[int]string)) > 0 {
content["label"] = temp
}
}
getMetricLabel := func() {
temp := map[string]interface{}{
"label_cache": map[string]interface{}{
"keys": make(map[string]interface{}),
"id_to_key": make(map[int]string),
},
"metric_name_to_label_ids": make(map[string][]int),
"metric_label_detail_keys": make(map[string]interface{}),
"metric_name_id_to_label_ids": make(map[int][]int),
}
for item := range tempCache.MetricLabel.labelCache.keys.Iterator().C {
temp["label_cache"].(map[string]interface{})["keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
tempCache.MetricLabel.labelCache.idToKey.Range(func(key, value any) bool {
temp["label_cache"].(map[string]interface{})["id_to_key"].(map[int]string)[key.(int)] = marshal(value)

tempCache.MetricLabel.metricNameIDToLabelIDs.Range(func(i int, s mapset.Set[int]) bool {
temp["metric_name_id_to_label_ids"].(map[int][]int)[i] = s.ToSlice()
return true
})
for k, v := range tempCache.MetricLabel.metricNameToLabelIDs {
temp["metric_name_to_label_ids"].(map[string][]int)[k] = v
}
for item := range tempCache.MetricLabel.metricLabelDetailKeys.Iterator().C {
temp["metric_label_detail_keys"].(map[string]interface{})[marshal(item)] = struct{}{}
}
if len(temp["label_cache"].(map[string]interface{})["keys"].(map[string]interface{})) > 0 ||
len(temp["label_cache"].(map[string]interface{})["id_to_key"].(map[int]string)) > 0 ||
len(temp["metric_name_to_label_ids"].(map[string][]int)) > 0 ||
len(temp["metric_label_detail_keys"].(map[string]interface{})) > 0 {

if len(temp["metric_name_id_to_label_ids"].(map[int][]int)) > 0 {
content["metric_label"] = temp
}
}
Expand Down
25 changes: 13 additions & 12 deletions server/controller/prometheus/cache/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package cache
import (
"sync"

mapset "github.com/deckarep/golang-set/v2"

"github.com/deepflowio/deepflow/message/controller"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
)
Expand All @@ -38,18 +36,19 @@ func NewLabelKey(name, value string) LabelKey {
}

type label struct {
keys mapset.Set[LabelKey]
idToKey sync.Map
keyToID sync.Map
}

func newLabel() *label {
return &label{
keys: mapset.NewSet[LabelKey](),
}
return &label{}
}

func (l *label) IfKeyExists(key LabelKey) bool {
return l.keys.Contains(key)
func (l *label) GetIDByKey(key LabelKey) (int, bool) {
if item, ok := l.keyToID.Load(key); ok {
return item.(int), true
}
return 0, false
}

func (l *label) GetKeyByID(id int) (LabelKey, bool) {
Expand All @@ -61,8 +60,9 @@ func (l *label) GetKeyByID(id int) (LabelKey, bool) {

func (l *label) Add(batch []*controller.PrometheusLabel) {
for _, item := range batch {
l.keys.Add(NewLabelKey(item.GetName(), item.GetValue()))
l.idToKey.Store(int(item.GetId()), NewLabelKey(item.GetName(), item.GetValue()))
k := NewLabelKey(item.GetName(), item.GetValue())
l.keyToID.Store(k, int(item.GetId()))
l.idToKey.Store(int(item.GetId()), k)
}
}

Expand All @@ -72,8 +72,9 @@ func (l *label) refresh(args ...interface{}) error {
return err
}
for _, item := range ls {
l.keys.Add(NewLabelKey(item.Name, item.Value))
l.idToKey.Store(item.ID, NewLabelKey(item.Name, item.Value))
k := NewLabelKey(item.Name, item.Value)
l.keyToID.Store(k, item.ID)
l.idToKey.Store(item.ID, k)
}
return nil
}
Expand Down
83 changes: 37 additions & 46 deletions server/controller/prometheus/cache/metric_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,85 +17,76 @@
package cache

import (
"github.com/cornelk/hashmap"
mapset "github.com/deckarep/golang-set/v2"

"github.com/deepflowio/deepflow/message/controller"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
)

type MetricLabelDetailKey struct {
MetricName string
LabelName string
LabelValue string
}

func NewMetricLabelDetailKey(metricName, labelName, labelValue string) MetricLabelDetailKey {
return MetricLabelDetailKey{
MetricName: metricName,
LabelName: labelName,
LabelValue: labelValue,
}
}

type metricLabel struct {
labelCache *label
metricLabelDetailKeys mapset.Set[MetricLabelDetailKey] // for metric_label check
metricNameToLabelIDs map[string][]int // only for fully assembled
metricNameCache *metricName
labelCache *label

metricNameIDToLabelIDs *hashmap.Map[int, mapset.Set[int]]
}

func newMetricLabel(l *label) *metricLabel {
func newMetricLabel(mn *metricName, l *label) *metricLabel {
return &metricLabel{
labelCache: l,
metricLabelDetailKeys: mapset.NewSet[MetricLabelDetailKey](),
metricNameToLabelIDs: make(map[string][]int),
metricNameCache: mn,
labelCache: l,

metricNameIDToLabelIDs: hashmap.New[int, mapset.Set[int]](),
}
}

func (ml *metricLabel) IfKeyExists(k MetricLabelDetailKey) bool {
return ml.metricLabelDetailKeys.Contains(k)
func (ml *metricLabel) IfLinked(metricID, labelID int) bool {
if labelIDs, ok := ml.metricNameIDToLabelIDs.Get(metricID); ok {
return labelIDs.(mapset.Set[int]).Contains(labelID)
}
return false
}

func (ml *metricLabel) GetLabelsByMetricName(metricName string) []LabelKey {
var ret []LabelKey
if labelIDs, ok := ml.metricNameToLabelIDs[metricName]; ok {
for _, labelID := range labelIDs {
if labelKey, ok := ml.labelCache.GetKeyByID(labelID); ok {
ret = append(ret, labelKey)
}
}
func (ml *metricLabel) GetLabelsByMetricName(metricName string) []int {
mni, ok := ml.metricNameCache.GetIDByName(metricName)
if !ok {
log.Debugf("metric_name: %s id not found", metricName)
return nil
}
if labelIDs, ok := ml.metricNameIDToLabelIDs.Get(mni); ok {
return labelIDs.ToSlice()
}
return ret
log.Debugf("metric_name: %s label_ids not found", metricName)
return []int{}
}

func (ml *metricLabel) Add(batch []MetricLabelDetailKey) {
for _, item := range batch {
ml.metricLabelDetailKeys.Add(item)
}
func (mi *metricLabel) GetMetricNameIDToLabelIDs() *hashmap.Map[int, mapset.Set[int]] {
return mi.metricNameIDToLabelIDs
}

func (ml *metricLabel) GetMetricLabelDetailKeys() mapset.Set[MetricLabelDetailKey] {
return ml.metricLabelDetailKeys
func (ml *metricLabel) Add(batch []*controller.PrometheusMetricLabel) {
for _, item := range batch {
for _, li := range item.GetLabelIds() {
ml.metricNameIDToLabelIDs.GetOrInsert(int(item.GetMetricNameId()), mapset.NewSet(int(li)))
}
}
}

func (ml *metricLabel) refresh(args ...interface{}) error {
metricLabels, err := ml.load()
if err != nil {
return err
}
metricNameToLabelIDs := make(map[string][]int)
for _, item := range metricLabels {
if lk, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
ml.metricLabelDetailKeys.Add(NewMetricLabelDetailKey(item.MetricName, lk.Name, lk.Value))
}
if _, ok := ml.labelCache.GetKeyByID(item.LabelID); ok {
metricNameToLabelIDs[item.MetricName] = append(metricNameToLabelIDs[item.MetricName], item.LabelID)
if mni, ok := ml.metricNameCache.GetIDByName(item.MetricName); ok {
ml.metricNameIDToLabelIDs.GetOrInsert(mni, mapset.NewSet(item.LabelID))
}
}
ml.metricNameToLabelIDs = metricNameToLabelIDs
return nil
}

func (ml *metricLabel) load() ([]*mysql.PrometheusMetricLabel, error) {
var metricLabels []*mysql.PrometheusMetricLabel
err := mysql.Db.Find(&metricLabels).Error
err := mysql.Db.Select("metric_name", "label_id").Find(&metricLabels).Error
return metricLabels, err
}
10 changes: 10 additions & 0 deletions server/controller/prometheus/cache/metric_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

type metricName struct {
nameToID sync.Map
idToName sync.Map
}

func (mn *metricName) Get() *sync.Map {
Expand All @@ -38,9 +39,17 @@ func (mn *metricName) GetIDByName(n string) (int, bool) {
return 0, false
}

func (mn *metricName) GetNameByID(id int) (string, bool) {
if name, ok := mn.idToName.Load(id); ok {
return name.(string), true
}
return "", false
}

func (mn *metricName) Add(batch []*controller.PrometheusMetricName) {
for _, item := range batch {
mn.nameToID.Store(item.GetName(), int(item.GetId()))
mn.idToName.Store(int(item.GetId()), item.GetName())
}
}

Expand All @@ -51,6 +60,7 @@ func (mn *metricName) refresh(args ...interface{}) error {
}
for _, item := range metricNames {
mn.nameToID.Store(item.Name, item.ID)
mn.idToName.Store(item.ID, item.Name)
}
return nil
}
Expand Down
10 changes: 6 additions & 4 deletions server/controller/prometheus/encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (e *Encoder) Init(ctx context.Context, cfg *prometheuscfg.Config) {
e.labelValue = newLabelValue(cfg.ResourceMaxID1)
e.label = newLabel()
e.LabelLayout = newLabelLayout(cfg)
e.metricLabel = newMetricLabel(e.label)
e.metricLabel = newMetricLabel(e.metricName, e.label)
e.target = newTarget(cfg.ResourceMaxID1)
e.metricTarget = newMetricTarget(e.target)
e.refreshInterval = time.Duration(cfg.EncoderCacheRefreshInterval) * time.Second
Expand Down Expand Up @@ -143,7 +143,7 @@ func (e *Encoder) Encode(req *controller.SyncPrometheusRequest) (*controller.Syn
AppendErrGroup(eg, e.encodeMetricName, resp, req.GetMetricNames())
AppendErrGroup(eg, e.encodeLabelName, resp, req.GetLabelNames())
AppendErrGroup(eg, e.encodeLabelValue, resp, req.GetLabelValues())
AppendErrGroup(eg, e.encodeMetricLabel, req.GetMetricLabels())
AppendErrGroup(eg, e.encodeMetricLabel, resp, req.GetMetricLabels())
AppendErrGroup(eg, e.encodeMetricTarget, resp, req.GetMetricTargets())
err = eg.Wait()
return resp, err
Expand Down Expand Up @@ -205,11 +205,13 @@ func (e *Encoder) encodeLabel(args ...interface{}) error {
}

func (e *Encoder) encodeMetricLabel(args ...interface{}) error {
mls := args[0].([]*controller.PrometheusMetricLabelRequest)
err := e.metricLabel.encode(mls)
resp := args[0].(*controller.SyncPrometheusResponse)
metricLabels := args[1].([]*controller.PrometheusMetricLabelRequest)
mls, err := e.metricLabel.encode(metricLabels)
if err != nil {
return err
}
resp.MetricLabels = mls
return nil
}

Expand Down
Loading
Loading