Skip to content

Commit

Permalink
[querier] update cache
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Sep 6, 2023
1 parent c05df96 commit e60b96a
Show file tree
Hide file tree
Showing 6 changed files with 708 additions and 45 deletions.
114 changes: 84 additions & 30 deletions server/querier/app/prometheus/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache

import (
"math"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -47,6 +48,7 @@ func (c *CacheItem) Size() uint64 {
return unsafeSize(c.data)
}

// it should called under Lock
func (c *CacheItem) replace(d *common.Result) bool {
new_size := unsafeSize(d)
if new_size <= config.Cfg.Prometheus.Cache.CacheItemSize {
Expand All @@ -61,8 +63,11 @@ func (c *CacheItem) replace(d *common.Result) bool {
}

func (c *CacheItem) Hit(start int64, end int64) CacheHit {
c.rwLock.RLock()
defer c.rwLock.RUnlock()

// outside cache: cache miss
if end < c.startTime || start > c.endTime {
if end <= c.startTime || start >= c.endTime {
return CacheMiss
}

Expand All @@ -78,19 +83,21 @@ func (c *CacheItem) Hit(start int64, end int64) CacheHit {
}

// deviation: fix up start time, cache hit right
if end <= c.endTime && end > c.startTime {
if end <= c.endTime && end > c.startTime && start < c.startTime {
return CacheHitPart
}

// deviation: fix up end time, cache hit left
if start < c.endTime && start >= c.startTime {
if start < c.endTime && start >= c.startTime && end > c.endTime {
return CacheHitPart
}
return CacheMiss
}

func (c *CacheItem) Deviation(start int64, end int64) (int64, int64) {
// only cache hit part needs to re-calculate deviation
c.rwLock.RLock()
defer c.rwLock.RUnlock()

// both side out of cache, query all
if start < c.startTime && end > c.endTime {
Expand All @@ -99,17 +106,17 @@ func (c *CacheItem) Deviation(start int64, end int64) (int64, int64) {

// if the deviation between start & c.start > maxAllowDeviation, directy query all data to replace cache
// add left data
if end <= c.endTime && end > c.startTime {
if math.Abs(float64(c.startTime-start)) > config.Cfg.Prometheus.Cache.CacheMaxAllowDeviation {
if end <= c.endTime && end > c.startTime && start < c.startTime {
if math.Abs(float64(c.startTime-start)/1000) > config.Cfg.Prometheus.Cache.CacheMaxAllowDeviation {
return start, end
} else {
return start, c.startTime
}
}

// add right data
if start < c.endTime && start >= c.startTime {
if math.Abs(float64(c.endTime-end)) > config.Cfg.Prometheus.Cache.CacheMaxAllowDeviation {
if start < c.endTime && start >= c.startTime && end > c.endTime {
if math.Abs(float64(c.endTime-end)/1000) > config.Cfg.Prometheus.Cache.CacheMaxAllowDeviation {
return start, end
} else {
return c.endTime, end
Expand All @@ -120,12 +127,17 @@ func (c *CacheItem) Deviation(start int64, end int64) (int64, int64) {
}

func (c *CacheItem) Data() *common.Result {
c.rwLock.RLock()
defer c.rwLock.RUnlock()

return c.data
}

func (c *CacheItem) Merge(start, end int64, data *common.Result) (*common.Result, bool) {
if data == nil || len(data.Values) == 0 {
return data, true
func (c *CacheItem) MergeCache(start, end int64, cache *common.Result, query *common.Result) (*common.Result, bool) {
log.Debugf("cache merged, query range: [%d-%d], cache range: [%d-%d]", start, end, c.startTime, c.endTime)
if query == nil || len(query.Values) == 0 {
log.Debugf("cache data is nil: %v, query data is nil: %v", cache == nil, query == nil)
return cache, true
}

// re-calculate cache time, because other session may already update cache
Expand All @@ -134,48 +146,84 @@ func (c *CacheItem) Merge(start, end int64, data *common.Result) (*common.Result

if start >= c.startTime && end <= c.endTime {
// not merge
log.Debugf("cache not merge, cache: [%d-%d], query: [%d-%d]", c.startTime, c.endTime, start, end)
return c.data, true
log.Debugf("cache full hit, will not merge, cache: [%d-%d], query: [%d-%d]", c.startTime, c.endTime, start, end)
return cache, true
}
// no matter c.replace(data) success or not, we should return data
// extern both side

// why need to extern left/right here: because other session may already update cache item during sql query
// so we should re-calculate cache time range here
// but the `data` merge into cache not completely equals to `data` back to api call

// extern both side
if start < c.startTime && end > c.endTime {
log.Debugf("cache extern both side, cache: [%d-%d], query: [%d-%d]", c.startTime, c.endTime, start, end)
c.startTime = start
c.endTime = end
return data, c.replace(data)
return query, c.replace(query)
}

mergeResult := &common.Result{Columns: query.Columns, Schemas: query.Schemas}
// extern left
if end <= c.endTime && end > c.startTime {
// cached: [0, N]
// replaced: [-X, Y] (0<Y<=N, X<0)
if end <= c.endTime && end > c.startTime && start < c.startTime {
if math.Abs(float64(c.startTime-start)/1000) > config.Cfg.Prometheus.Cache.CacheMaxAllowDeviation {
log.Debugf("cache replace due to deviation too large, cache: [%d-%d], query: [%d-%d]", c.startTime, c.endTime, start, end)
c.startTime = start
c.endTime = end
// in deviation case, will query all data [-X,Y], not only extern data [-X,0]
// replace cache data with query data
mergeResult.Values = query.Values
} else {
log.Debugf("cache merge extern left, cache: [%d-%d], query: [%d-%d]", c.startTime, c.endTime, start, end)
c.startTime = start
data.Values = append(data.Values, c.data.Values...)
// in not deviation case, query data [-X,0], merge into [-X,N]
// note that `Values` is order by time DESC, so extern `left` should append into right
mergeResult.Values = append(cache.Values, query.Values...)
}

if len(mergeResult.Values) > 0 {
fv := mergeResult.Values[0].([]interface{})
lv := mergeResult.Values[len(mergeResult.Values)-1].([]interface{})
if len(fv) > 0 && len(lv) > 0 {
log.Debugf("merge result, data range [%v-%v]", lv[0], fv[0])
}
}
return data, c.replace(data)

return mergeResult, c.replace(mergeResult)
}

// extern right
if start < c.endTime && start >= c.startTime {
// cached: [0, N]
// replaced: [X,Y] (0<=X<N, Y>N)
if start < c.endTime && start >= c.startTime && end > c.endTime {
if math.Abs(float64(c.endTime-end)/1000) > config.Cfg.Prometheus.Cache.CacheMaxAllowDeviation {
log.Debugf("cache replace due to deviation too large, cache: [%d-%d], query: [%d-%d]", c.startTime, c.endTime, start, end)
c.startTime = start
c.endTime = end
// in deviation case, will query all data [X,Y], not only extern data [N,Y]
// replace cache data with query data
mergeResult.Values = query.Values
} else {
log.Debugf("cache merge extern right, cache: [%d-%d], query: [%d-%d]", c.startTime, c.endTime, start, end)
c.endTime = end
data.Values = append(c.data.Values, data.Values...)
// in not deviation case, query data [N,Y], merge into [0,Y]
// note that `Values` is order by time DESC, so extern `right` should append into left
mergeResult.Values = append(query.Values, cache.Values...)
}

if len(mergeResult.Values) > 0 {
fv := mergeResult.Values[0].([]interface{})
lv := mergeResult.Values[len(mergeResult.Values)-1].([]interface{})
if len(fv) > 0 && len(lv) > 0 {
log.Debugf("merge result, data range [%v-%v]", lv[0], fv[0])
}
}
return data, c.replace(data)

return mergeResult, c.replace(mergeResult)
}

return data, true
return query, true
}

type RemoteReadQueryCache struct {
Expand Down Expand Up @@ -204,26 +252,27 @@ func NewRemoteReadQueryCache() *RemoteReadQueryCache {
return s
}

func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, data *common.Result, item *CacheItem) *common.Result {
func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, item *CacheItem, cache *common.Result, query *common.Result) *common.Result {
if req == nil || len(req.Queries) == 0 {
return nil
return cache
}
q := req.Queries[0]
if q.Hints.Func == "series" {
return data
return query
}

key, _, start, end := promRequestToCacheKey(q)
start, end = timeAlign(start, end)
start = timeAlign(start)
if item == nil {
// cache miss
s.cache.Add(key, &CacheItem{startTime: start, endTime: end, data: data, rwLock: &sync.RWMutex{}})
return data
item = &CacheItem{startTime: start, endTime: end, data: query, rwLock: &sync.RWMutex{}}
s.cache.Add(key, item)
return query
} else {
// cache hit, merge data
atomic.AddUint64(&s.counter.Stats.CacheMerge, 1)
t1 := time.Now()
result, ok := item.Merge(start, end, data)
result, ok := item.MergeCache(start, end, cache, query)
d := time.Since(t1)
atomic.AddUint64(&s.counter.Stats.CacheMergeDuration, uint64(d.Seconds()))
if !ok {
Expand Down Expand Up @@ -255,7 +304,12 @@ func (s *RemoteReadQueryCache) Get(req *prompb.ReadRequest) (*CacheItem, CacheHi

// for query api, cache query samples
key, metric, start, end := promRequestToCacheKey(q)
start, end = timeAlign(start, end)
if strings.Contains(metric, "__") {
// for DeepFlow Native metrics, don't use cache
return nil, CacheMiss, metric, start, end
}

start = timeAlign(start)
item, ok := s.cache.Get(key)
if !ok {
atomic.AddUint64(&s.counter.Stats.CacheMiss, 1)
Expand Down
Loading

0 comments on commit e60b96a

Please sign in to comment.